Storm

Storm

概述

Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。被称作“实时的hadoop”。
Storm有很多使用场景:如实时分析,在线机器学习,持续计算, 分布式RPC,ETL等等。
Storm支持水平扩展,具有高容错性,保证每个消息都会得到处理,而且处理速度很快。
Storm的部署和运维都很便捷,而且更为重要的是可以使用任意编程语言来开发应用。

Storm的特点

  • 简单的编程模型:
    在大数据处理方面相信大家对hadoop已经耳熟能详,基于Google Map/Reduce来实现的Hadoop为开发者提供了map、reduce原语,使并行批处理程序变得非常地简单。
    同样,Storm也为大数据 的实时计算提供了一些简单优美的原语,这大大降低了开发并行实时处理的任务的复杂性,帮助你快速、高效的开发应用。
  • 水平扩展:
    在Storm集群中真正运行topology的主要有三个实体:工作进程、线程和任务。Storm集群中的每台机器上都可以运行多个工作进程,每个工作进程又可创建多个线程,每个线程可以执行多个任务,任务是真正进行数据处理的实体,我们开发的spout、bolt就是作为一个或者多个任务的方式执行的。
    计算任务在多个线程、进程和服务器之间并行进行,支持灵活的水平扩展。
  • 支持多种编程语言:
    你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
  • 高可靠性:
    Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
    spout发出的消息后续可能会触发产生成千上万条消息,可以形象的理解为一棵消息树,其中spout发出的消息为树根,Storm会跟踪这棵消息树的处理情况,只有当这棵消息树中的所有消息都被处理了,Storm才会认为spout发出的这个消息已经被“完全处理”。如果这棵消息树中的任何一个消息处理失败了,或者整棵消息树在限定的时间内没有“完全处理”,那么spout发出的消息就会重发。
  • 高容错性:
    Storm会管理工作进程和节点的故障。
    如果在消息处理过程中出了一些异常,Storm会重新安排这个出问题的处理单元。Storm保证一个处理单元永远运行(除非你显式杀掉这个处理单元)。
    当然,如果处理单元中存储了中间状态,那么当处理单元重新被Storm启动的时候,需要应用自己处理中间状态的恢复。
  • 本地模式:
    Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。

Storm是如何工作的

对于一个Storm集群,有两类节点:主节点master node和工作节点worker nodes。


Master Node ZK Cluster Worker Node Topology
v v v v
+-----+ +---------------------+
| ZK1 | |Supervisor |Worker*N | --> Spout -> Bolt
| | +---------------------+
+------+ +-----+ --->
|Nimbus| ---> | ZK2 | +---------------------+
+------+ | | |Supervisor |Worker*N |
+-----+ +---------------------+
| ZK..| --->
| | +---------------------+
+-----+ |Supervisor |Worker*N |
+---------------------+
  • 主节点(master node)运行着一个叫做Nimbus的守护进程。这个守护进程负责在集群中分发代码,为工作节点分配任务,并监控故障。
  • 每个工作节(worker nodes)点都运行着一个Supervisor守护进程, Supervisor作为拓扑的一部分运行在工作节点上。一个Storm拓扑结构在不同的机器上运行着众多的工作节点。
  • 每个Supervisor中运行着多个Workers进程,每个Worker进程中运行着多个Executor线程。每个Executor线程会循环调用Task实例(Task是Spout/Bolt的实例)的nextTupleexecute方法。Storm默认是1个(Spout/Bolt)只生成1个Task。
  • 数据的分发和处理分别是Spout和Bolt, Spout和Bolt由Stream Grouping连接起来的节点网络被称为: Topology(拓扑)。

Storm拓扑结构

下图是Storm各组件之间的数据交互图,可以看出Nimbus和Supervisor之间没有直接交互。Storm所有元数据信息/状态都是保存在Zookeeper上,
Nimbus在Zookeeper上保存所有的集群状态,单个守护进程可以是无状态的而且失效或重启时不会影响整个系统的健康。 如果Supervisor因故障出现问题而无法运行Topology,Nimbus会第一时间感知到,并重新分配Topology到其它可用的Supervisor上运行
Worker之间则通过Netty传送数据。

Nimbus_Supervisor

Storm主要概念

这些术语的字面意义翻译如下,由于这个工具的名字叫Storm,这些术语一律按照气象名词解释

nimbus 雨云,主节点的守护进程,负责为工作节点分发任务。
spout 龙卷,读取原始数据为bolt提供数据
bolt 雷电,从spout或其它bolt接收数据,并处理数据,处理结果可作为其它bolt的数据源或最终结果

下面的术语跟气象就没有关系了

topology 拓扑结构,Storm的一个任务单元
define field(s) 定义域,由spout或bolt提供,被bolt接收

在服务架构上来看, Storm分为 Master Node(即Nimbus), Zookeeper, Worker Node(Supervisor+Worker)

在编程逻辑上来看, Storm分为 Spout(分发Tuple流), Bolt(处理Tupe数据), Stream Grouping(前两者的数据分发规则), 以及Topology

Nimbus & Supervisor

Storm集群由一个主节点和多个工作节点组成。主节点运行了一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。每个工作节点都运行了一个名为“Supervisor”的守护进程,用于监听工作,开始并终止工作进程。Nimbus和Supervisor都能快速失败,而且是无状态的,这样一来它们就变得十分健壮,两者的协调工作是由Apache ZooKeeper来完成的。

Stream

Stream是一个数据流的抽象。这是一个没有边界的 Tuple序列,而这些Tuple序列会以一种分布式的方式并行地创建和处理。

对消息流的定义主要就是对消息流里面的tuple 进行定义,为了更好地使用tuple,需要给tuple 里的每个字段取一个名字,并且不同的tuple 字段对应的类型要相同,即两个tuple 的第一个字段类型相同,第二个字段类型相同。
默认情况下,tuple 的字段类型可以为integer、long、short、byte、string、double、float、boolean 和byte array 等基本类型,也可以自定义类型,只需要实现相应的序列化接口。

每一个消息流在定义的时候需要被分配一个id,最常见的消息流是单向的消息流,在Storm 中OutputFieldsDeclarer 定义了一些方法,让你可以定义一个Stream 而不用指定这个id。在这种情况下,这个Stream 会有个默认的id: 1。

Topologies

Topology是由Stream Grouping连接起来的Spout和Bolt节点网络。
在 Storm 中,一个实时计算应用程序的逻辑被封装在一个称为Topology 的对象中,也称为计算拓扑。
Topology 有点类似于Hadoop 中的MapReduce Job,但是它们之间的关键区别在于,一个MapReduce Job 最终总是会结束的,然而一个Storm 的Topology 会一直运行。
在逻辑上,一个Topology 是由一些Spout(消息的发送者)和Bolt(消息的处理者)组成图状结构,而链接Spouts 和Bolts 的则是Stream Groupings。

Spouts

Spout 是一个 topology(拓扑)中 streams 的源头. 通常 Spout 会从外部数据源读取 Tuple,然后把他们发送到拓扑中(如 Kestel 队列, 或者 Twitter API). Spout 可以是 可靠的 或 不可靠的. 可靠的 Spout 在 Storm 处理失败的时候能够重新发送(emit)失败的 Tuple, 不可靠的 Spout 一旦把一个 Tuple 发送出去就撒手不管了.

Spout 可以发送多个流. 可以使用 OutputFieldsDeclarer 的 declareStream 方法定义多个流,
在 SpoutOutputCollector 对象的 emit 方法中指定要发送到的 stream .

Spout 中的最主要的方法是 nextTuple():
nextTuple 要么向 topology(拓扑)中发送一个新的 Tuple,
要么在没有 Tuple 需要发送的情况下直接返回.
对于任何 Spout 实现, nextTuple 方法都必须非阻塞的, 因为 Storm 在一个线程中调用所有的 Spout 方法.

Spout 的另外几个重要的方法是 ack()fail().
这些方法在 Storm 检测到 Spout 发送出去的 Tuple 被成功处理或者处理失败的时候调用.

Bolts

所有消息处理的逻辑都在Bolt 中完成,在Bolt 中可以完成如过滤、分类、聚集、计算、查询数据库等操作。
Bolt 可以做简单的消息处理操作,例如,Bolt 可以不做任何操作,只是将接收到的消息转发给其他的Bolt。
Bolt 也可以做复杂的消息流的处理,这需要很多个Bolt。
在实际使用中,一条消息往往需要经过多个处理步骤,例如,计算一个点击数在前十的广告,首先需要对所有同学的成绩进行排序,然后在排序过的成绩中选出前十名的
成绩的同学。所以在一个Topology 中,往往有很多个Bolt,从而形成了复杂的流处理网络。

  • 使用OutputFieldsDeclarer.declareStream定义Stream。
  • 使用OutputCollector.emit来选择要发射的Stream。

Bolts的主要方法是execute()。在该方法里,Bolts以Tuple作为输入, 使用OutputCollector来发送Tuple, 通过调用OutputCollector.ack()通知这个Tuple的发射者Spout。
Bolts可以发射多条消息流。

Stream Groupings

Stream Grouping 就是用来定义一个Stream 应该如何分配给 Bolts 上面的多个Tasks。Storm里有7种类型的Stream Grouping:

  • Shuffle Grouping 随机分组,随机派发Stream里面的 Tuple ,保证每个Bolt接收到的 Tuple 数量大致相同。
  • Fields Grouping 按字段分组,以id举例。具有相同id的 Tuple 会被分到相同的Bolt中的一个Task,而不同id的 Tuple 会被分到不同的Bolt中的Task。
  • Direct Grouping 直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个Task处理这个消息。
    只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息 Tuple 必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的Task的id(OutputCollector.emit方法也会返回Task的id)。
  • All Grouping 广播,对于每一个 Tuple ,所有的 Bolts 都会收到。
  • Global Grouping 全局分组,这个 Tuple 被分配到Storm中的一个Bolt的其中一个 Task。具体一点就是分配给id值最低的那个 Task。
  • Non Grouping 不分组,Stream不关心到底谁会收到它的 Tuple 。目前这种分组和Shuffle Grouping是一样的效果,有一点不同的是Storm会把这个Bolt放到这个Bolt的订阅者同一个线程中去执行。
  • Local or Shuffle Grouping 如果目标Bolt有一个或者多个Task在同一个工作进程中, Tuple 将会被随机发射给这些Tasks。否则,和普通的Shuffle Grouping行为一致。

上面几种Streaming Group的内置实现中,最常用的应该是 Shuffle Grouping、Fields Grouping、Direct Grouping这三种,
使用其它的也能满足特定的应用需求。

另外,Storm还提供了用户自定义 Streaming Grouping 接口,如果上述 Streaming Grouping 都无法满足实际业务需求,也可以自己实现,只需要实现backtype.storm.grouping.CustomStreamGrouping接口,该接口定义了如下方法:

List<Integer> chooseTasks(int taskId, List<Object> values)

API

Spout

WordReader类实现了IRichSpout接口。我们将在第四章看到更多细节。
WordReader负责从文件按行读取文本,并把文本行提供给第一个bolt。

public class WordReader implements IRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false;
private TopologyContext context;

public boolean isDistributed() {return false;}

// 成功会调用ack
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
public void close() {}

// 失败会调用fail
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}

/* open是第一个被调用的spout方法
* TopologyContext context: 拓扑的上下文
* SpoutOutputCollector collector: 向Bolt发布数据
* 在这里我们创建了一个FileReader对象,用来读取文件
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
}
this.collector = collector;
}

/* nextTuple()会在同一个循环内被ack()和fail()周期性的调用。没有任务时它必须释放对线程的控制,其它方法才有机会得以执行。
* 读取文件每一行, 调用collector.emit() 向bolts发布待处理的数据
*/
public void nextTuple() {
/**
* 这个方法会不断的被调用,直到整个文件都读完了,我们将等待并返回。
*/
if(completed){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//什么也不做
}
return;
}
String str;
//创建reader
BufferedReader reader = new BufferedReader(fileReader);
try{
//读所有文本行
while((str = reader.readLine()) != null){
// 按行发布一个新值
this.collector.emit(new Values(str),str);
}
}catch(Exception e){
throw new RuntimeException("Error reading tuple",e);
}finally{
completed = true;
}
}

/**
* 声明输入域"word"
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}

Bolt

下面的WordNormalizer实现了接口backtype.storm.topology.IRichBolt,该Bolt负责得到并标准化每行文本。
它把文本行切分成单词,大写转化成小写,去掉头尾空白符。

public class WordNormalizer implements IRichBolt{
private OutputCollector collector;
/**
* 这个spout结束时(集群关闭的时候),调用此方法
*/
public void cleanup(){}
/**
* 声明: 这个bolt只会发布“word”域
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
/**
* collector用来向Bolt发布数据
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
/** 具体的处理方法
* bolt从单词文件接收到文本行,全部转化成小写,并切分它,从中得到所有单词。
* 最后,每次都调用collector.ack()确认已经处理
*/
public void execute(Tuple input){
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word=word.toLowerCase();
//发布这个单词
List a = new ArrayList();
a.add(input);
collector.emit(a,new Values(word));
}
}
//对collector做出应答
collector.ack(input);
}
}

Topology

public class TopologyMain {

public static void main(String[] args) throws InterruptedException {

//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());

// 在spout和bolts之间通过shuffleGrouping方法连接:
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),1)
.fieldsGrouping("word-normalizer", new Fields("word"));

//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);

/* Topology run
* 在生产环境中,拓扑会持续运行,
* 这里用LocalCluster创建本地调试运行
*/
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
// create and submit:
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();
}
}

Worker & Executor & Task

  • 每个 Worker Node 有一个 Supervisor, Supervisor 管理着N个 Worker 进程, 每个 Worker 进程是一个JVM进程, 有自己的端口号。
  • 一个 Topology 可能会在一个或者多个 Worker 进程里面执行,每个工作进程执行整个 Topology 的一部分
  • 每个 Worker进程中运行着多个 Executor 线程。
  • 每个 Executor线程中运行着若干个相同的 Task (可以理解为Spout/Bolt), Executor线程会执行Task 。

用代码说明:

// 创建 Topology的配置
Config conf = new Config();
conf.setNumWorkers(2); // 改Topology使用2个worker进程

// 设置Spout/Bolt的并行度(parallelism), 也即每个Spout/Bolt需要几个Executor线程来跑
topologyBuilder.setSpout(“blue-spout”, new BlueSpout(), 2); // parallelism hint为2

// 这个 Bolt除了设置并行度=2, 还设置了Task数量=4 (这个Bolt生成4个Task对象)
topologyBuilder.setBolt(“green-bolt”, new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping(“blue-spout”);

topologyBuilder.setBolt(“yellow-bolt”, new YellowBolt(), 6) .shuffleGrouping(“green-bolt”);

// 提交配置
StormSubmitter.submitTopology( “mytopology”, conf, topologyBuilder.createTopology() );

说明:

  • 上面定义了一个 拥有两个Worker进程的 Topology
  • 上面定义了3个Component: 1个spout叫做BlueSpout,2个bolt分别叫 GreenBoltYellowBoltBlueSpout发送它的输出到GreenBoltGreenBolt又把它的输出发到 YellowBolt
  • 上面3个Component的并行度(线程数)分别是 2 + 2 + 6 = 10, 每个worker进程产生10 / 2 = 5条线程。
  • GreenBolt特别指定了生成4个Task

下图中可以看到Topology的两个Worker进程平均分配任务;
黄/绿/蓝色Task的数量分别是6, 4, 2, 也就是总共12个Task对象;
Task外面灰色轮廓是Executor线程, 注意2个绿色Task由一个Executor执行;

安装

  • 准备jdk, zookeeper集群
  • 修改storm.yaml配置文件
    • storm.zookeeper.servers: Storm集群使用的Zookeeper集群地址。
    • 如果Zookeeper没有使用默认端口,那么还需要修改storm.zookeeper.port
    • storm.local.dir: Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给予足够的访问权限。

命令行

  • 启动Nimbus: storm nimbus
  • 启动Supervisor: storm supervisor
  • 启动UI: storm ui
  • 提交Topologies: storm jar [jar路径] [拓扑包名.拓扑类名] [storm IP地址] [storm端口] [拓扑名称] [参数]
  • 停止Topologies: storm kill [拓扑名称]

参考: