JStorm 源码分析 - Worker 的启动与运行

栏目: 编程工具 · 发布时间: 5年前

内容简介:对一个 topology,JStorm 最终会调度成一个或多个 worker,每个 worker 即为一个真正的操作系统执行进程,分布到一个集群的一台或者多台机器上并行执行。 而每个 worker 中,又可以有多个 task,分别代表一个执行线程。每个 task 就是上面提到的组件(component)的实现,要么是 spout 要么是 bolt 。如上图所示, worker 是一个独立 JVM 的进程, 它其实是由 Supervisor 通过命令行执行 Worker#main 方法来启动. worker

对一个 topology,JStorm 最终会调度成一个或多个 worker,每个 worker 即为一个真正的操作系统执行进程,分布到一个集群的一台或者多台机器上并行执行。 而每个 worker 中,又可以有多个 task,分别代表一个执行线程。每个 task 就是上面提到的组件(component)的实现,要么是 spout 要么是 bolt 。

JStorm 源码分析 - Worker 的启动与运行

如上图所示, worker 是一个独立 JVM 的进程, 它其实是由 Supervisor 通过命令行执行 Worker#main 方法来启动. worker 进程内部, 运行着许多线程, 包括: Task 线程、序列化/反序列化线程等. 其对应的代码为: com.alibaba.jstorm.daemon.worker.Worker

注意: Storm 与 JStorm 的 worker 模型有所不同, JStorm 移除了 executor 的概念, 详见“ JStorm 中 task 与 executor 的关系 ”.

启动

worker 有两种被启动的方式, 这个在 supervisor 一节已经提到.

  • (本地调试) 调用 mk_worker 方法来启动
  • (线上部署) 通过命令行调用 main, 在新的 JVM 中启动 Worker

JStorm 源码分析 - Worker 的启动与运行

在 mk_worker 中, 执行了如下操作:

  1. 打日志( sb 这个变量名我很喜欢)
  2. new Worker ,并设置必要的属性 ( 这里设置的属性真的太多了, 得再开一个章节细讲 ). 总的来说是以下事情:
    • 算出当前worker会启动的task
    • todo
  3. 执行 execute 在 execute 方法内, worker 完成了初始化和启动 task 的工作, 主要做了以下事情:
    1. worker 之间互连并启动一个线程监控变化,如果worker任务变更会与启停 worker 重连
    2. 监控 topology 是否 active 并将这个状态赋给 storm-active-atom 变量,task 根据这个变量决定是否调用 spout 的 nextTuple
    3. worker 启动线程来执行具体的 tasks

让我们详细看一下 execute 方法内部

在第一步中: 创建并启动分发控制信息的线程

// create recv connection, reduce the count of netty client reconnect
AsyncLoopThread controlRvthread = startDispatchThread();
threads.add(controlRvthread);
  1. 使用Disruptor的队列建立了一个属于worker的 recvControlQueue . 不过到目前为止,没有足够的信息告诉我们这个队列是在worker的生命周期中起到什么作用的.
  2. 对recvControlQueue 的性能指标进行跟踪( 使用dropwizard/metrics,详见jstorm metric )
  3. 使用netty创建了一个server, 对指定端口进行监听. NettyServer 会将收到的消息放入 recvControlQueue 和 deserializeQueues ( 忘了 deserializeQueues 是啥? 看下这篇“tuple 在 整个拓扑中的流转过程” )中
  4. 新建并启动了一个 VirtulPortCtrlDispatch , 这又是个啥东西呢. 似乎只是用来在启动的过程中, 向各个 task 发送控制信息. 上面所述的 recvControlQueue 中的元素会按照taskId 被分发到 VirtulPortCtrlDispatch 持有的 controlQueues 存放的task的 ctrlQueue 中. 也就是 recvControlQueue 中装的东东会被放到 VirtulPortCtrlDispatch.controlQueues 的 ctrlQueue中.那么问题来了, recvControlQueue 中的元素又是哪来的呢? (这个控制信息猜测是后续的)

JStorm 源码分析 - Worker 的启动与运行

在第二步中: 创建并启动”维护task之间连接”的线程

创建了一个 RefreshConnections .

JStorm 源码分析 - Worker 的启动与运行

为了配置这个类,需要从上下文中取出本次拓扑的拓扑结构,以及当前 worker 将会启动的 task 的taskid 列表 然后通过这两个信息算出当前 worker 会和那些 task 进行连接,不区分是本 worker 和其他 worker 主要工作: 该类会在 worker 的运行过程中,定时去做下面这些事: 维护 task 之间的通信(目前使用的是 netty ):包括新建连接,移除旧的连接。(因为 worker 是可能挂掉掉并重启的,此时 task 之间的链接就需要重新维护) 其中会去计算出 task 的下游 task 在哪个 worker 中. 然后启动 NettyClient 去 connect 下游 worke r 的 port.

在第三步中:创建并启动维护zk状态的线程

在第四步中:创建并启动发送控制信息的线程(DrainerCtrlRunable)这名字取得挺有意思的, 排水控制?

JStorm 源码分析 - Worker 的启动与运行

主要工作:

//从transferCtrlQueue里消费
super(workerData.getTransferCtrlQueue (), idStr)

这家伙会发送控制信息

TaskMessage message = new TaskMessage(TaskMessage.CONTROL_MESSAGE, targetTask, tupleMessage);
conn.sendDirect(message);

第五步中:创建并启动心跳维护线程

??维护心跳,谁的心跳?怎么维护?为什么要维护?

第六步中:为worker创建一个metric统计的类

(如果后续datacore想要维护好性能的话,可以考虑加入这个东西来统计各个因子的数据。但是这个对代码改动还挺多的,因此还是看情况吧)

第七步中: 创建并启动task线程. (终于创建了task)

JStorm 源码分析 - Worker 的启动与运行

这里已经创建并启动了 task 线程.

第八步中:创建并启动n个序列化,反序列化线程。

JStorm 源码分析 - Worker 的启动与运行 这个n还挺讲究的,有专门的算法,代码先贴在下面,可以考虑下为啥要这么计算

最后一步: 初始化步骤结束

经过哐哐哐一通创建,现在有了好多个不同的线程,这些线程被塞到workData里,然后一起返回给了 mk_worker 方法的调用者。 sd.join是说,当worker所创建的所有线程都运行结束后,worker线程才结束.

JStorm 源码分析 - Worker 的启动与运行

那么到这里,worker已经完成了所有启动需要的操作.

运行时

todo


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据化管理

数据化管理

黄成明 (@数据化管理) / 电子工业出版社 / 2014-7 / 59.90元

《数据化管理:洞悉零售及电子商务运营》讲述了两个年轻人在大公司销售、商品、电商、数据等部门工作的故事,通过大量案例深入浅出地讲解了数据意识和零售思维。作者将各种数据分析方法融入到具体的业务场景中,最终形成数据化管理模型,从而帮助企业提高运营管理能力。 《数据化管理:洞悉零售及电子商务运营》全部案例均基于Excel,每个人都能快速上手应用并落地。一起来看看 《数据化管理》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具