JStorm 源码解析:worker 的启动和运行机制

栏目: 编程工具 · 发布时间: 5年前

内容简介:上一篇我们分析了 supervisor 节点的启动和运行过程,提及到 supervisor 的核心工作就是基于 ZK 从 nimbus 节点领取分配给它的任务,并启动 worker 执行。一个 worker 就是一个 JVM 进程,运行在 supervisor 节点上,多个 task 可以同时运行在一个 worker 进程之中,每个 task 都对应一个线程。Worker 进程的启动位于 Worker 类中,前面我们在分析 supervisor 节点的启动过程时提及到了对于 Worker 类 main 函

上一篇我们分析了 supervisor 节点的启动和运行过程,提及到 supervisor 的核心工作就是基于 ZK 从 nimbus 节点领取分配给它的任务,并启动 worker 执行。一个 worker 就是一个 JVM 进程,运行在 supervisor 节点上,多个 task 可以同时运行在一个 worker 进程之中,每个 task 都对应一个线程。

Worker 进程的启动位于 Worker 类中,前面我们在分析 supervisor 节点的启动过程时提及到了对于 Worker 类 main 函数的触发,supervisor 在启动相应 worker 进程时会指定 topologyId、supervisorId、workerPort、workerId,以及 classpath 等参数,worker 在拿到这些参数之后会先获取当前机器上端口对应的老进程,并逐一 kill 掉,然后调用 Worker#mk_worker 方法创建并启动对应的 worker 实例,该方法的核心实现如下:

Worker w = new Worker(conf, context, topologyId, supervisorId, port, workerId, jarPath);
return w.execute();

Worker 类仅包含一个实例属性 WorkerData,它封装了所有与 worker 运行相关的属性,实例化 Worker 对象的过程也是初始化 WorkerData 属性的过程,该过程主要包含以下工作:

workers/${worker_id}/pids

初始化完成之后会调用 Worker#execute 方法创建并启动 worker 进程,该方法主要的执行流程可以概括如下:

  1. 为当前 worker 创建并启动一个 socket 连接,用于接收消息并分发给名下的 task 线程
  2. 启动一个线程用于维护当前 worker 状态变更时,更新与其它 worker 之间的连接关系
  3. 启动一个线程用于定期获取当前 topology 在 ZK 上的基本信息,当 topology 状态发生变更时触发本地相应操作
  4. 启动一个线程循环消费当前 worker 的 tuple 队列发送给对应的下游 task 线程
  5. 启动一个线程用于定期更新本地的 worker 心跳信息
  6. 创建并启动当前 worker 下所有的 task 任务

方法实现如下:

public WorkerShutdown execute() throws Exception {
    List<AsyncLoopThread> threads = new ArrayList<>();

    // 1. 为 worker 创建一个 socket 连接,接收和分发消息给对应的 task
    AsyncLoopThread controlRvThread = this.startDispatchThread();
    threads.add(controlRvThread);

    // 2. 创建线程用于在 worker 关闭或者新启动时更新与其他 worker 之间的连接信息
    RefreshConnections refreshConn = this.makeRefreshConnections();
    AsyncLoopThread refreshConnLoopThread = new AsyncLoopThread(refreshConn, false, Thread.MIN_PRIORITY, true);
    threads.add(refreshConnLoopThread);

    // 3. 获取 topology 在 ZK 上的状态,当状态发生变更时更新本地 task 状态
    RefreshActive refreshZkActive = new RefreshActive(workerData);
    AsyncLoopThread refreshZk = new AsyncLoopThread(refreshZkActive, false, Thread.MIN_PRIORITY, true);
    threads.add(refreshZk);

    // 4. 创建一个线程循环消费 tuple 队列发送给对应的下游 task
    DrainerCtrlRunnable drainerCtrlRunnable = new DrainerCtrlRunnable(workerData, MetricDef.SEND_THREAD);
    AsyncLoopThread controlSendThread = new AsyncLoopThread(drainerCtrlRunnable, false, Thread.MAX_PRIORITY, true);
    threads.add(controlSendThread);

    // Sync heartbeat to Apsara Container
    AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkWorkerInstance(workerData.getStormConf());
    if (syncContainerHbThread != null) {
        threads.add(syncContainerHbThread);
    }

    JStormMetricsReporter metricReporter = new JStormMetricsReporter(workerData);
    metricReporter.init();
    workerData.setMetricsReporter(metricReporter);

    // 5. 更新本地心跳信息
    RunnableCallback heartbeatFn = new WorkerHeartbeatRunnable(workerData);
    AsyncLoopThread hb = new AsyncLoopThread(heartbeatFn, false, null, Thread.NORM_PRIORITY, true);
    threads.add(hb);

    // 6. 创建并启动当前 worker 下所有的 task
    List<TaskShutdownDaemon> shutdownTasks = this.createTasks();
    workerData.setShutdownTasks(shutdownTasks);

    List<AsyncLoopThread> serializeThreads = workerData.setSerializeThreads();
    threads.addAll(serializeThreads);
    List<AsyncLoopThread> deserializeThreads = workerData.setDeserializeThreads();
    threads.addAll(deserializeThreads);

    return new WorkerShutdown(workerData, threads);
}

一. 消息接收与分发

Storm 会为 worker 基于 Netty 创建并返回一个 socket 连接用于接收消息,同时 worker 与名下所有 task 之间会维持一个传输队列,并启动一个线程循环消费接收到的消息投递给对应 task 的传输队列中。该过程位于 Worker#startDispatchThread 方法中,该方法实现如下(去掉了一些非关键代码):

private AsyncLoopThread startDispatchThread() {
    IContext context = workerData.getContext(); // 获取消息上下文:NettyContext
    String topologyId = workerData.getTopologyId();

    // 1. 创建一个接收消息的消息队列(disruptor)
    Map stormConf = workerData.getStormConf();
    long timeout = JStormUtils.parseLong(stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10); // 默认 10ms
    WaitStrategy waitStrategy = new TimeoutBlockingWaitStrategy(timeout, TimeUnit.MILLISECONDS); // 10ms
    int queueSize = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_CTRL_BUFFER_SIZE), 256);
    DisruptorQueue recvControlQueue = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI, queueSize, waitStrategy, false, 0, 0);

    // 2. 为当前 worker 基于 Netty 创建并返回一个 Socket 连接用于接收消息
    IConnection recvConnection = context.bind(
            topologyId, workerData.getPort(), workerData.getDeserializeQueues(), recvControlQueue, false, workerData.getTaskIds());
    workerData.setRecvConnection(recvConnection);

    // 3. 启动一个线程循环消费 worker 接收到的消息,并应用 DisruptorRunnable.onEvent 方法,
    //    最终调用的是 VirtualPortCtrlDispatch.handleEvent 方法,将消息投递给指定 task 的消息队列
    RunnableCallback recvControlDispatcher = new VirtualPortCtrlDispatch(
            workerData, recvConnection, recvControlQueue, MetricDef.RECV_THREAD);
    return new AsyncLoopThread(recvControlDispatcher, false, Thread.MAX_PRIORITY, true);
}

这里的消息队列底层都依赖于 disruptor 实现,最终对于接收到的消息都会调用 VirtualPortCtrlDispatch#handleEvent 方法进行处理:

public void handleEvent(Object event, boolean endOfBatch) throws Exception {
    TaskMessage message = (TaskMessage) event;
    int task = message.task(); // 获取当前消息对应的 taskId

    // 消息反序列化
    Object tuple = null;
    try {
        // there might be errors when calling update_topology
        tuple = this.deserialize(message.message(), task);
    } catch (Throwable e) {
        if (Utils.exceptionCauseIsInstanceOf(KryoException.class, e)) {
            throw new RuntimeException(e);
        }
        LOG.warn("serialize msg error", e);
    }

    // 获取 taskId 对应的消息通道
    DisruptorQueue queue = controlQueues.get(task);
    if (queue == null) {
        LOG.warn("Received invalid control message for task-{}, Dropping...{} ", task, tuple);
        return;
    }
    if (tuple != null) {
        // 将消息投递给对应的 task 传输队列
        queue.publish(tuple);
    }
}

二. 创建并启动用于维护 worker 之间连接关系的线程

在这一步会创建一个 RefreshConnections 对象,它继承了 RunnableCallback 类,所以同样是被异步循环线程模型接管(按照指定间隔循环调用其 RefreshConnections#run 方法),storm 会定期检测 ZK 上的 topology 任务分配信息是否有更新,如果有比本地更新的任务分配(依赖于任务分配时间戳进行判定)则会判断新任务分配的类型来相应的更新本地的信息。

如果当前的任务分配类型仅仅是更新集群上已有的 topology,则 storm 会遍历通知各个 task 执行相应的更新操作,同时会回调已注册的所有更新监听器以更新配置信息,实现如下:

// 当前任务分配已经更新且是更新 topology 操作,则通知所有的 task
List<TaskShutdownDaemon> taskShutdowns = workerData.getShutdownTasks();
Map newConf = StormConfig.read_supervisor_topology_conf(conf, topologyId);
workerData.getStormConf().putAll(newConf);
for (TaskShutdownDaemon taskSD : taskShutdowns) {
    // 通知所有的 task
    taskSD.update(newConf);
}
// disable/enable metrics on the fly
workerData.getUpdateListener().update(newConf); // 回调更新监听器,更新配置
workerData.setAssignmentType(AssignmentType.UpdateTopology);

如果当前是更新以外的任务分配类型(Assign、ScaleTopology),则 storm 会从新的任务分配信息中分别获取新增的、待删除的,以及需要更新的 taskId 列表,并执行相应的创建、删除,以及更新 task 操作,同时会更新 worker 上所有 task 的下游 task 列表信息。部分代码实现如下:

// 获取新增的 taskId 列表
Set<Integer> addedTasks = this.getAddedTasks(assignment);
// 获取待删除的 taskId 列表
Set<Integer> removedTasks = this.getRemovedTasks(assignment);
// 获取待更新的 taskId 列表
Set<Integer> updatedTasks = this.getUpdatedTasks(assignment);

// 基于新任务分配信息更新 workerData
workerData.updateWorkerData(assignment);
workerData.updateKryoSerializer();

// 关闭需要移除的 task
this.shutdownTasks(removedTasks);
// 创建新增的 task
this.createTasks(addedTasks);
// 更新已有需要被更新的 task
this.updateTasks(updatedTasks);

// 更新当前 worker 上所有 task 的下游 task 列表信息
Set<Integer> tmpOutboundTasks = Worker.worker_output_tasks(workerData);
if (!outboundTasks.equals(tmpOutboundTasks)) {
    for (int taskId : tmpOutboundTasks) {
        if (!outboundTasks.contains(taskId)) {
            workerData.addOutboundTaskStatusIfAbsent(taskId);
        }
    }
    for (int taskId : workerData.getOutboundTaskStatus().keySet()) {
        if (!tmpOutboundTasks.contains(taskId)) {
            workerData.removeOutboundTaskStatus(taskId);
        }
    }
    workerData.setOutboundTasks(tmpOutboundTasks);
    outboundTasks = tmpOutboundTasks;
}
workerData.setAssignmentType(AssignmentType.Assign);

三. 创建并启动定期获取 topology 基本信息的线程

在这一步会创建一个 RefreshActive 对象,它同样继承了 RunnableCallback 类,所以同样也是被异步循环线程模型接管(按照指定间隔循环调用其 RefreshActive#run 方法),storm 会定期获取当前 topology 在 ZK 上的基本信息,当 topology 状态发生变更时触发本地执行相应的操作。

如果 topology 状态信息变为 active、upgrading,或者 rollback 时,storm 会依次将本地 task 的状态设置为 TaskStatus.RUN ,如果当前 task 对应的组件是 spout,则会触发 ISpout#activate 方法。如果当前 topology 状态不为 inactive 时,storm 会依次将本地的 task 状态设置为 TaskStatus.PAUSE ,如果当前 task 对应的组件是 spout,则会触发 ISpout#deactivate 方法。最后更新本地记录的 topology 状态。相关实现如下:

if (newTopologyStatus.equals(StatusType.active) // 激活
        || newTopologyStatus.equals(StatusType.upgrading) // 灰度
        || newTopologyStatus.equals(StatusType.rollback)) { // 回滚
    for (TaskShutdownDaemon task : tasks) {
        if (task.getTask().getTaskStatus().isInit()) {
            task.getTask().getTaskStatus().setStatus(TaskStatus.RUN);
        } else {
            task.active();
        }
    }
} else if (oldTopologyStatus == null || !oldTopologyStatus.equals(StatusType.inactive)) {
    for (TaskShutdownDaemon task : tasks) {
        if (task.getTask().getTaskStatus().isInit()) {
            task.getTask().getTaskStatus().setStatus(TaskStatus.PAUSE);
        } else {
            task.deactive();
        }
    }
}
workerData.setTopologyStatus(newTopologyStatus);

四. 创建并启动循环消费 worker tuple 队列的线程

在这一步会创建一个 DrainerCtrlRunnable 对象,它同样继承了 RunnableCallback 类,所以同样也是被异步循环线程模型接管(按照指定间隔循环调用其 DrainerCtrlRunnable#run 方法),storm 会循环消费当前 worker 的 tuple 队列 transferCtrlQueue,并最终调用 DrainerCtrlRunnable#handleEvent 方法对拿到的消息进行处理,该方法的实现如下:

public void handleEvent(Object event, boolean endOfBatch) throws Exception {
    if (event == null) {
        return;
    }
    ITupleExt tuple = (ITupleExt) event;
    int targetTask = tuple.getTargetTaskId();

    // 获取与下游 task 的连接
    IConnection conn = this.getConnection(targetTask);
    if (conn != null) {
        byte[] tupleMessage = null;
        try {
            // there might be errors when calling update_topology
            tupleMessage = this.serialize(tuple); // 序列化数据
        } catch (Throwable e) {
            // 省略异常处理
        }
        // 基于 netty 发送数据
        TaskMessage message = new TaskMessage(TaskMessage.CONTROL_MESSAGE, targetTask, tupleMessage);
        conn.sendDirect(message);
    }
}

方法的逻辑比较简单,拿到当前 tuple 对应的下游 taskId,然后与之建立连接(netty)并将 tuple 发送给它。

五. 创建并启动当前 worker 下所有的 task 线程

方法 Worker#createTasks 用于为当前 worker 下的所有 task 任务创建一个 Task 对象,并为每个 task 启动一个线程执行,同时为每个 task 任务创建一个 TaskShutdownDaemon 对象用于管理对应的 task 线程,方法的实现如下:

private List<TaskShutdownDaemon> createTasks() throws Exception {
    List<TaskShutdownDaemon> shutdownTasks = new ArrayList<>();

    // 获取当前 worker 下所有的 taskId
    Set<Integer> taskIds = workerData.getTaskIds();

    Set<Thread> threads = new HashSet<>();
    List<Task> taskArrayList = new ArrayList<>();
    for (int taskId : taskIds) {
        // 创建并启动 task
        Task task = new Task(workerData, taskId);
        Thread thread = new Thread(task);
        threads.add(thread);
        taskArrayList.add(task);
        thread.start(); // 启动 task
    }
    for (Thread thread : threads) {
        thread.join();
    }
    for (Task t : taskArrayList) {
        shutdownTasks.add(t.getTaskShutdownDameon());
    }
    return shutdownTasks;
}

Task 类实现了 Runnable 接口,其 run 方法中简单调用了 Task#execute 方法,该方法首先会向系统 bolt 发送一条“startup”消息,然后依据当前的组件类型创建对应的任务执行器,创建的过程位于 Task#mkExecutor 方法中:

public BaseExecutors mkExecutor() {
    BaseExecutors baseExecutor = null;

    if (taskObj instanceof IBolt) {
        if (taskId == topologyContext.getTopologyMasterId()) {
            baseExecutor = new TopologyMasterBoltExecutors(this);
        } else {
            baseExecutor = new BoltExecutors(this);
        }
    } else if (taskObj instanceof ISpout) {
        if (this.isSingleThread(stormConf)) {
            baseExecutor = new SingleThreadSpoutExecutors(this);
        } else {
            baseExecutor = new MultipleThreadSpoutExecutors(this);
        }
    }

    return baseExecutor;
}

BaseExecutors 类是一个 RunnableCallback 类,所以其 run 方法会被异步循环调用。继承自 BaseExecutors 类有 5 个(如下),而 Task#mkExecutor 方法基于组件类型分别选择了相应的实现类进行实例化。

  • BoltExecutors
  • TopologyMasterBoltExecutors
  • SpoutExecutors
  • SingleThreadSpoutExecutors
  • MultipleThreadSpoutExecutors

先来看一下 BoltExecutors 和 TopologyMasterBoltExecutors,这是 bolt 组件的任务执行器,其中 TopologyMasterBoltExecutors 继承自 BoltExecutors,所以接下来我们主要来看一下 BoltExecutors 的实现。BoltExecutors 类的 run 方法实现如下:

public void run() {
    if (!isFinishInit) {
        // 执行初始化操作,主要是调用了 IBolt.prepare 方法
        this.initWrapper();
    }
    while (!taskStatus.isShutdown()) {
        try {
            // 循环消费当前 task 的消息队列
            this.consumeExecuteQueue();
        } catch (Throwable e) {
            // 省略异常处理逻辑
        }
    }
}

方法首先会判定是否完成了初始化操作,如果未完成则会调用 BaseExecutors#initWrapper 执行初始化,这期间主要是调用了 IBolt#prepare 方法,这也是我们在实现一个 bolt 时执行初始化的方法。如果当前 task 线程没有被销毁,则会一直循环调用 BoltExecutors#consumeExecuteQueue 消费当前 task 的消息队列。前面的分析我们知道 worker 会对接收到的消息按照 taskId 投递给对应 task 的消息队列,而消息队列的消费过程就在这里发生。针对接收到消息会逐条进行处理,这里最终调用的是 BoltExecutors#onEvent 方法,处理的消息就是我们熟悉的 Tuple 对象,而该方法的核心就是调用 IBolt#execute 方法,也就是调用用户自定义的策略对收到的 tuple 进行处理。

再来看一下 SingleThreadSpoutExecutors 和 MultipleThreadSpoutExecutors,这两类都继承自 SpoutExecutors 类,区别仅在于对于消息的附加处理和正常的业务逻辑是否位于同一个线程中,而核心逻辑都是调用 ISpout#nextTuple 方法,也就是执行用户自定义的业务逻辑。

针对 worker 的运行机制就分析到这里,但是 storm 对于消息的处理并没有结束,下一篇我们将一起探寻 ack 机制,看看 storm 如何保证消息至少被执行一次(at least once)。

(本篇完)

转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处

本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据之美

数据之美

邱南森 (Nathan Yau) / 张伸 / 中国人民大学出版社 / 2014-2-1 / CNY 89.00

这是一本教我们如何制作完美可视化图表,挖掘大数据背后意义的书。作者认为,可视化是一种媒介,向我们揭示了数据背后的故事。他循序渐进、深入浅出地道出了数据可视化的步骤和思想。本书让我们知道了如何理解数据可视化,如何探索数据的模式和寻找数据间的关联,如何选择适合自己的数据和目的的可视化方式,有哪些我们可以利用的可视化工具以及这些工具各有怎样的利弊。 作者给我们提供了丰富的可视化信息以及查看、探索数......一起来看看 《数据之美》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

URL 编码/解码
URL 编码/解码

URL 编码/解码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试