Storm总结
阅读原文时间:2021年04月20日阅读:1

spout把AMQP队列中消息填充到storm中。队列中的每个消息被作为一个Tuple提交。消息将会被确定或拒绝,一旦这拓扑完全处理了(正确或者失败)相应的Tuple。【如果需要确保所有的信息被可靠性的处理,要令AMQPSpout从not设为‘exclusive’或‘auto-delete’的队列中消费,否则,如果spout任务崩溃或者重启,这个队列就会被删除并丢弃其中所有的消息,当任务持续进行的时候,任何消息都会发布。不过目前还没有处理不正常的消息】如果spout worker对消息序列化处理失败,该spout worker会崩溃。

scheme用于将AMQP中的消息反序列化为一个Tuple。

nextTuple 从队列中提交下一个消息作为Tuple。如果没有消息准备提交就绪,这个程序就会稍等一段时间,避免了在spout worker上密切循环。

open 连接到AMQP broker,声明队列 并订阅要传入的消息。

declareOutputFields根据提供的Scheme声明该spout要输出的域。

spout中的方法:

  1. open

当一个task被初始化的时候调用此open方法,一般都会在此方法中对发送tuple的对象SpoutOutputCollector和配置对象topologyContext。

  1. declareOutputFields

此方法用于声明当前Spout的tuple发送流。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的,其中的参数包括了发送的域Fields。

  1. getComponentConfiguration

此方法用于声明针对当前组件的特殊的Configuration配置。

  1. nextTuple

这是Spout类中最重要的一个方法。发射一个Tuple到Topology都是通过这个方法实现的。

  1. ack,    fail

bolt中的方法:

  1. prepare

此方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。执行在execute方法之前。

  1. declareOutputFields

用于声明当前Bolt发送的Tuple中包含的字段。

  1. execute

此方法用于存放对Tuple的处理方法。具体的发送也是通过emit方法来完成的。此时,有两种情况:emit方法中有两个参数,emit方法中有一个参数。

(1)一个参数,此唯一的参数是发送到下游bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple是另起的一个新的Tuple树

(2)两个参数,第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,两者仍属于同一颗Tuple树。即,如果下游的Bolt处理Tuple失败,则会向上传递到当前Bolt,当前Bolt根据旧的Tuple流继续往上游传递,申请重发失败的Tuple。保证Tuple处理的可靠性。

spoutscheme定义的是Tuple中有哪些fieldsconsumerDeclarator定义的是怎么连接Mqexchange之类。

Storm框架组成部分:

Topology(拓扑):一个实时应用的计算任务被打包作为Topology发布,这同Hadoop的MapReduce任务相似。 
Spout(数据流的生产者):Storm中的消息源,用于为Topology生产消息(数据),一般是从外部数据源(如Message Queue、RDBMS、NoSQL、Realtime Log)不间断地读取数据并发送给Topology消息(tuple元组)。 
Stream(数据流):产生的数据(tuple元组)。Tuple是有数据格式的,在同一个流中,Tuple的数据格式应该都是一样的;不同流中的数据格式可能相同,也可能不同 

Bolt(运算):Storm中的消息处理者,用于为Topology进行消息的处理,Bolt可以执行过滤,聚合, 查询数据库等操作,而且可以一级一级的进行处理。 
Stream grouping:在Bolt任务中定义的Stream进行区分。 
Task:每个Spout或者Bolt在集群执行许多任务。 
Worker:Topology跨一个或多个Worker节点的进程执行。

Storm 分布式计算结构称为 topology(拓扑),由 stream(数据流), spout(数据流的生成者), bolt(运算)组成。一个Topology就是由一个或者多个的Spout和Bolt组成。将Spout和Bolt称之为组件(Components)

Storm 的核心数据结构是 tuple tuple 表,Stream tuple 列。 spout 代表了一个 Storm topology 的主要数据入口,充当采集器的角色,连接到数据源,将数据转化为一个个 tuple,并将 tuple 作为数据流进行发射。

bolt 可以理解为计算程序中的运算或者函数,将一个或者多个数据流作为输入,对数据实施运算后,选择性地输出一个或者多个数据流。 bolt 可以订阅多个由 spout 或者其他bolt 发射的数据流,这样就可以建立复杂的数据流转换网络。

topology中节点之间的联系表明了元组数据是怎样去传送的。

在topology中的每个节点都是并行运行的。因此在你的topology中,你可以为每个节点指定并行运行的数量,然后storm集群将会产生相应数量的线程来执行。

一个topology无限运行,直到你杀掉它才会停止。Storm将自动地重新分配失败过的任务。此外,Storm保证不会有数据丢失,即便是机器挂掉,消息被丢弃。

代码中用 setSpout 和 setBolt 方法定义了节点。这些方法的输入是 一个用户指定的id,包含处理逻辑的对象,你希望该节点并行计算的数量(可选,它表明了会有多少线程会通过storm集群来执行这个组件(spout或bolt))。

setBolt 返回一个 InputDeclarer 对象,它用来定义bolt的输入

"shuffleGrouping(shuffle分组)" 指元组数据 将会 随机分布地 从输入任务 到bolt任务中。在多个组件(spout或bolt)之间,这里有很多数据分组的方式。输入的定义可以是链式的,bolt可以指定多个源。

prepare 方法提供了一个 OutputCollector 对象,它用来发出元组数据给下游节点。元组数据可以在任意时间从bolt发出 -- 可以在 prepare,execute, 或者 cleanup 方法,或者 在另一个线程,异步地发送。 这里的 prepare 实现很简单,初始化并保存了 OutputCollector 的引用,该引用将在接下来的execute 方法中使用。

execute 方法从该bolt的输入中接收一个元组数据, ExclamationBolt 对象提取元组中的第一个字段,并追加字符串 "!!!" 。如果你实现的bolt订阅了多个输入源,你可以用 Tuple 中的 Tuple#getSourceComponent 方法来获取你当前读取的这个元组数据来自于哪个源。

在 execute 方法中,还有一点东西需要说明。 即输入的元组作为第一个参数 发出 ,然后在最后一行中发出确认消息。这是Storm保证可靠性的API的一部分,它保证数据不会丢失,这在之后的教程会说明。

Topology的名称是为了标识topology,以便你之后可以停掉它。一个topology将无限期运行,直到你停掉它。

topology的配置可以从多个方面调整topology运行时的形态。这里给出了两个最为常见的配置:

  1. TOPOLOGY_WORKERS (用 setNumWorkers方法来设置) 表明你希望在storm集群中分配多少进程来执行你的topology。每个在topology中的组件(spout 或 bolt)将会被分配多个线程去执行。线程数的设置是通过组件的 setBolt 和 setSpout 方法。这些线程存在于worker进程中。 每个worker进程包含了处理一些组件的一些线程,例如,你横跨集群指定了300个线程处理所有的组件,且指定了50个worker进程。也就是说,每个工作进程将执行6个线程, 其中的每一个可能又属于不同的组件。调整topology的性能需要通过调整每个组件的并行线程数 和 工作进程中运行的线程数量。

  2. TOPOLOGY_DEBUG (用 setDebug方法来设置), 当设为true的时候,它将告诉Storm打印组件发出的每条信息。这在本地模式测试topology的时候很有用处。但是当你的topology在集群中运行的时候,或许你应该关掉它。

  3. 场景不同

hadoopstorm擅长处理的方面不同。经常在网上看到一些对于大数据处理框架到底是使用hadoop还是storm的讨论,个人认为这要根据实际情况而定,如果需要分析统计大量的数据,且对实时性要求不高就使用hadoop;如果实时要求高,且数据量大的话,还是要使用storm

2、主从结构对比

目前分布式处理框架最主流的就是master/slave架构,master负责任务的接受与分配,slave负责任务的执行。storm和hadoop都不例外。不同的是:

hadoop着重于计算和数据存储,storm只着重于计算。那么区别在什么地方呢?

hadoop:在hadoop中,由于同时着重于计算和存储,同时存在两个master/slave机制。数据计算使用的使用的是MapReduce框架,master节点称之为JobTracker,slave节点称之为TaskTracker。数据存储使用的是HDFS,master节点称之为NameNode,slave节点称之为DataNode。

Storm:由于Storm只着重于计算,因此只有一套计算的master/slave机制。master称之为Nimbus,slave称之为supervisor。对应的是hadoop的分布式计算框架mapreduce,以下是二者的对比。下图列出了二者在计算架构上的对比:

3、任务工作方式对比

在Hadoop中,一个MapReduce任务我们称之为一个job,在storm中,一个任务我们称之为Topology

Hadoop中,Mapreduce Job提交之后,任务执行完成之后就会自动结束。

Storm中,一个Topology会一直运行下去,这是因为Storm是一个实时计算平台,需要不断的处理最新的记录,计算出最新的结果,因此当然不能停止。

4、集群组织方式对比

在Hadoop中,集群中的每个节点是通过配置文件指定的。

storm采用了另外一种方式,通过zookeeper来指定集群的节点。具体来说,每个storm节点在启动的时候都会连接zookeeper,将自己的ip和端口等信息写入zookeeper中。这样每个节点只要读取指定目录下的数据,既可以感知集群中其他节点的存在。

在后面,我们将会详细的介绍Storm是如何使用Zookeeper的。

图中星号标记的就是使用到的worker

nimbus分配任务时,它是不关心有几个supervisor的,其关心的是有多少个worker

一个worker同时只能运行一个Topology。如果此时我们想提交另外一个Topology,不管指定使用几个worker来执行,都只能在剩余的12个worker中来选择了!

  1. 数据处理方式对比

hadoop批处理,storm增量处理。

有好几种不同的stream grouping:

  • 最简单的grouping是shuffle grouping, 它随机发给任何一个task。上面例子里面RandomSentenceSpout和SplitSentence之间用的就是shuffle grouping, shuffle grouping对各个task的tuple分配的比较均匀。
  • 一种更有趣的grouping是fields grouping, SplitSentence和WordCount之间使用的就是fields grouping, 这种grouping机制保证相同field值的tuple会去同一个task, 这对于WordCount来说非常关键,如果同一个单词不去同一个task, 那么统计出来的单词次数就不对了。

fields grouping是stream合并,stream聚合以及很多其它场景的基础。在背后呢, fields grouping使用的一致性哈希来分配tuple的。

还有一些其它类型的stream grouping. 你可以在Concepts一章里更详细的了解。

下面是一些常用的 “路由选择” 机制:

Storm的Grouping即消息的Partition机制。当一个Tuple被发送时,如何确定将它发送个某个(些)Task来处理??

l ShuffleGrouping:随机选择一个Task来发送。
l FiledGrouping:根据Tuple中Fields来做一致性hash,相同hash值的Tuple被发送到相同的Task。
l AllGrouping:广播发送,将每一个Tuple发送到所有的Task。
l GlobalGrouping:所有的Tuple会被发送到某个Bolt中的id最小的那个Task。
l NoneGrouping:不关心Tuple发送给哪个Task来处理,等价于ShuffleGrouping。
l DirectGrouping:直接将Tuple发送到指定的Task来处理。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器