jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

storm笔记本

作者:jasper | 分类:storm | 标签:   | 阅读 1465 次 | 发布:2015-04-25 12:30 a.m.

之前玩过一段时间的storm,虽然现在换方案改用spark了。但是还是积累了一些相关的知识,准备将之梳理出来,以防忘记。当然在此只是记录一些比较细节的地方,并不会面面俱到。

用处

适用于做大量数据的实时流式计算,区别于hadoop是做事后计算的。据我所知,阿里的双十一看板就是基于storm做的实时计算。支持集群管理,依赖于zookeeper做工作节点的管理和任务分配。支持多种数据在各处理节点间自由流动,基于Netty的高效传输机制,支持随机、广播、分组的路由。ack机制保证了数据高可靠性,数据流动在某个节点处理失败,可以引发数据从源头开始重传的功能。

基本组件

worker

worker 进程用于执行topology,一个topology可以包含一个或多个worker(并行的跑在不同的machine上),但一个worker只能对应于一个topology。
woker的数目可以通过TOPOLOGY_WORKER参数设定,也可以通过setNumTasks()函数进行设定。

executor

woker派生出的一个线程,一个worker可用包含一个或多个executor, 每个component (spout或bolt)至少对应于一个executor, 一个executor只能对应于一个component。
executor的数目一般通过setBolt和setSpout的参数进行设定。对于特殊的acker executors其值等于"topology.workers"。

task

真正的程序执行者。在在storm0.8之前,worker中每一个spout/bolt的线程称为一个task,在storm0.8之后,task不再与物理线程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。一个executor线程可以执行一个或多个tasks ,但一般默认每个executor只执行一个task,一个component的task数是不会改变的, 但是一个componet的executer数目是会发生变化的。
task的数目通过可以通过TOPOLOGY_TASKS参数设定,也可以通过setNumTasks()函数进行设定,如果不设定该值,那么task的数目将默认与executor的数目保持一致。

对于这几个基本组件,我还想说:

  1. 对各数目的设置,一般说来优先级为:defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration。

  2. 调整正在运行的topology的并行度
    使用如下命令 :storm rebalance topology_name -n N -e spout_name=N -e bolt_name=N
    例如 storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10。 注意这里修改的仅仅是wokers数目(5)和executors数目(3 和10),另外executors的数目受限于tasks的数目,即no.executor<=no.task,例如 builder.setBolt("split", new SplitSentence(), 4).setNumTasks(3);那么此处的4不再起作用。

  3. 对于storm ui中展示数据的理解:

这里的Executors的数目等于总的线程数,slots则是由参数supervisor.slots.ports决定的,该参数用于指定在某台机器上运行的woker进程端口数,即有几个端口就会运行几个worker,默认情况下会启动四个端口[6700,6701,6702,6703]。
supervisor.slots.ports: - 6700
- 6701
- 6702
- 6703
一定注意在每一项的开始时要加空格,冒号后也必须要加空格。

各组件的作用

1、【nimbus进程】storm集群工作的全局指挥官。
(1)通过thrift接口,监听并接收client对topology的submit,将topology代码保存到本地目录/nimbus/stormdist/下
(2)为client提交的topology计算任务分配,根据集群worker资源情况,计算出topology的spout和bolt的task应该如何在worker间分配,任务分配结果写入zookeeper
(3)通过thrift接口,监听supervisor的下载topology代码的请求,并提供下载
(4)通过thrift接口,监听ui对统计信息的读取,从zookeeper上读取统计信息,返回给ui
(5)若进程退出后,立即在本机重启,则不影响集群运行。

2、【supervisor进程】storm集群的资源管理者,按需启动worker进程。
(1)定时从zookeeper检查是否有代码未下载到本地的新topology,定时删除旧topology代码
(2)根据nimbus的任务分配结果,在本机按需启动1个或多个worker进程,监控守护所有的worker进程。
(3)若进程退出,立即在本机重启,则不影响集群运行。

3、【worker进程】storm集群的任务构造者,构造spout或bolt的task实例,启动executor线程。
(1)根据zookeeper上分配的task,在本进程中启动1个或多个executor线程,将构造好的task实例交给executor去运行(死循环调用spout.nextTuple()或bolt.execute()方法。
(2)向zookeeper写入心跳
(3)维持传输队列,发送tuple到其他的worker
(4)若进程退出,立即在本机重启,则不影响集群运行。

4、【executor线程】storm集群的任务执行者,循环执行task代码。
(1)执行1个或多个task(每个task对应spout或bolt的1个并行度),将输出加入到worker里的tuple队列
(2)执行storm内部线程acker,负责发送消息处理状态给对应spoult所在的worker

与KAFKA的集成

在storm与外部做集成里面,最常用的当属kafka了。用spout作为一个kfka的消费者,然后把数据传给bolt去做处理。而且spout这一部分已经不需要用户自己写了,因为storm官方已经有封装storm-kafka,但是对于storm-kafka想提示两点:

  1. spout可以根据用户设置的startOffsetTime值来读取offset(-2 从kafka头开始 -1 是从最新的开始 0 从ZK记录的开始)
  2. offset的记录是记录在storm的zookeeper上,而非像kafka通常的消费者一样,将offset记录在kafka的zookeeper之上。

HA的实现

  1. storm ui如果挂掉,没有任何影响,因为它只是提供一个概况预览。发现时起来即可;
  2. nimbus如果挂掉,也是暂时没有影响,为什么说是“暂时”,因为如果同时supervisor挂掉一个,就会出现类似于“脑裂”的问题;
  3. supervisor挂掉,没有影响,也不影响已存在的Worker进程;
  4. worker挂掉,没有影响,因为nimbus会根据心跳探测到,然后重新rebalance。

常见问题

列举一下我遇到的几个问题,具体报错记不得了:
1. 配置文件重复的错误,这是因为在开发的项目中引入了storm的配置文件,导致和生产上冲突导致,删除项目中的即可;
2. supervisor中报host找不到的错误,这是因为storm中默认是用hostname而非ip通信,在/etc/hhosts中增加其他机器的mapping即可;
3. 有时候发现supervisor总是起不来,可能是之前异常退出导致,要先删除work dir里面的内容;
4. NotSerializableException序列化的错误,详见另一篇文章从一个storm的序列化错误谈谈component的生命周期

应该还有一些吧,哎呀记不起来了。。。


转载请注明出处:http://www.opscoder.info/storm_note.html

其他分类: