Storm技术基础(一)

KinglyJn      2017-05-13

数据分析概述

# 数据分析一定是基于时段的

批处理:时段跨度相对很大(一般>=1day, hadoop)
实时处理:时间跨度很小(一般<=1s,GB or TB, storm[毫秒级别]、spark-streaming[秒级别])


storm简介

底层:clojure,基于jvm的高级面向函数式的编程语言,
     阿里巴巴在storm的基础上使用java代替的了clojure的核心,并在性能上做了优化,产生了jstorm
     目前storm和jstorm都由apache管理

学习版本:0.9.6


下载、安装、配置

# 下载
http://www.apache.org/dyn/closer.lua/storm/apache-storm-0.9.6/apache-storm-0.9.6.tar.gz

# 前提
jdk1.7+
zookeeper集群(重启zk:bin/zkServer.sh restart)
python-2.6.6版以上胶水

# 解压修改配置文件
storm_env.ini 

storm.yaml
    sotrm.zookeeper.servers:
        zookeeper服务器列表
    storm.zookeeper.port:
        zookeeper服务连接端口
    storm.zookeeper.session.timeout:
        客户端连接zookeeper超时时间

    storm.local.dir:
        指定storm本地文件存放目录,存放任务jar以及少量的状态信息
    storm.cluster.mode:
        集群的运行模式(distributed|local)
    
    nimbus.host:
        nimbus服务器地址
        在storm1.0之后,该参数改为nimbus.seeds,用于配置主控节点的地址,可以配置多个
    nimbus.task.timeout.secs:
        判断task存活的心跳超时时间(storm支持多级别的task容错,而这个参数就是针对task界别容错的,
        storm判断task是否存活的心跳时间,超过了这个心跳时间,storm会进行故障转移,重新启动这个task)
    nimbus.task.launch.secs:
        task启动时的一个特殊超时设置,在拓扑任务任务启动后,第一次发送心跳前,
        将会使用这个值临时替代心跳,这是因为第一次启动时task需要较长的时间响应,
        所以这个值通常会大于task的正常心跳值
    nimbus.supervisor.timeout.secs:
        判断supervisor是否存活的心跳超时时间(storm针对supervisor容错的参数配置,
        storm判断supervisor是否存活的心跳时间,超过了这个时间,storm会认为这个
        supervisor失效了,进而进行相应的故障转移,将该节点上的任务全部转移到其他工作节点上)
    
    ui.port:
        指定storm-ui的web服务端口,默认是8080

    drpc.servers:
        分布式rpc的服务列表,以便DRPCSpout知道和谁进行通讯
    drpc.port:
        分布式rpc的服务端口

    supervisor.slots.ports
        指定supervisor节点启动一些工作进程时worker所使用的默认端口,若worker
        数大于这里配置的端口数,则其余的worker会使用其他端口作为自己的进程端口。
    supervisor.worker.timeout.secs:
        判断worker是否存活的心跳超时时间(storm针对worker容错的参数配置,
        storm检测心跳间隔超过了这个时间没得到响应,storm会进行worker重启)
    supervisor.worker.start.timeout.secs:
        类似于task的初始心跳(nimbus.task.launch.secs)


# 配置示例:
==========================================
storm.zookeeper.servers:
     - "nimbusz"
     - "supervisor01z" 
     - "supervisor02z" 

storm.local.dir: "/home/ubuntu/storm-local"
nimbus.seeds: ["nimbusz"]
ui.port: 8081

supervisor.slots.ports:
     - 6700
     - 6701
     - 6702

drpc.servers:
     - "nimbusz"
==========================================


storm安装部署架构


    nimbus----->zk集群---->【supervisor01进程---worker123进程---executor123线程】
                     ---->【supervisor02】
                     ---->【supervisor03】

# numbus主节点:
    1. 接收客户端提交的任务请求,任务由numbus进行分配(只是将分配信息提交到zk集群上,
       即在zk相应的znode节点上写入任务分配信息;supervisor查看这些znode上的任务
       分配信息,获取分配给它的任务)
    2. 监控整个集群的状态(从zk集群中相应的znode上读取supervisor和worker的状态信息)
    3. 容错(当任务在某些supervisor节点上运行的时候,由于supervisor进程失效,nimbus
       可以将这些任务重新分配给其他supervisor运行)

# supervisor从节点:
    1. 接收nimbus分配的任务,负责启动worker工作进程(其本身并不是执行任务工作的进程),
       worker的容错也由supervsior进程负责
    2. 需要将自己的运行状态信息(心跳信息)汇报给zk集群(写入到某个znode)


启动nimbus进程

# 需要在配置文件中nimbus.host参数指定的服务器上启动

阻塞启动:bin/storm nimbus
后台启动:nohup bin/storm nimbus >> /dev/null 2>&1 &

jps查看java进程:
    config_value 表示正在启动读取配置文件中
    numbus 表示numbus启动完成进程

ps查看进程
    ps -ef |grep daemon.nimbus


启动ui进程(需要在nimbus节点上启动)

nohup bin/storm ui >> /dev/null 2>&1 &

jps查看java进程:
    config_value 表示正在启动读取配置文件中
    core 表示ui启动完成进程

ps查看进程
    ps -ef |grep daemon.ui.core

netstat查看通信端口
    netstat -lntpu | grep 8081(或其他你指定端口)


启动supervisor进程

nohup bin/storm supervisor >> /dev/null 2>&1 &

jps查看java进程:    
    config_value 表示正在启动读取配置文件中
    supervisor 表示supervisor启动完成进程

ps查看进程:
    ps -ef |grep daemon.supervisor


启动logviewer进程

# logviewer进程负责收集每个supervisor节点的日志(需要在每个supervisor节点上启动)
nohup bin/storm logviewer >> /dev/null 2>&1 &

jps查看java进程:    
    config_value 表示正在启动读取配置文件中
    logviewer 表示logviewer启动完成进程

ps查看进程:
    ps -ef |grep daemon.logviewer


worker进程和executor线程

# worker进程负责启动executor线程去真正执行客户端提交到storm集群上的
# 任务(task:包含spout和bolt),但它并不是常驻进程,不能通过手动启动。
# worker进程启动以后,也会定期将自己的状态信息汇报给zk集群


nimbus提交和执行拓扑任务

# 拓扑任务架构[本质是一个DAG,即有向无环图]

数据来源(如kafka)---> 数据喷头[spout]----tuple----> bolt12345...

# 使用官方提供的案例进行拓扑任务的提交
bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.WordCountTopology wordcount

examples/storm-starter/storm-starter-topologies-0.9.4.jar:表示拓扑任务jar包
storm.starter.WordCountTopology:表示拓扑任务主类
wordcount:表示随意指定的拓扑任务名称


# 启动日志
...
361  [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
370  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar apache-storm-0.9.4/examples/storm-starter/storm-starter-topologies-0.9.4.jar to assigned location: /home/ubuntu/storm-local/nimbus/inbox/stormjar-e8fb8969-5476-446e-9f3a-20ce4b73dc89.jar
Start uploading file 'apache-storm-0.9.4/examples/storm-starter/storm-starter-topologies-0.9.4.jar' to '/home/ubuntu/storm-local/nimbus/inbox/stormjar-e8fb8969-5476-446e-9f3a-20ce4b73dc89.jar' (3248682 bytes)
[==================================================] 3248682 / 3248682
File 'apache-storm-0.9.4/examples/storm-starter/storm-starter-topologies-0.9.4.jar' uploaded to '/home/ubuntu/storm-local/nimbus/inbox/stormjar-e8fb8969-5476-446e-9f3a-20ce4b73dc89.jar' (3248682 bytes)
452  [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/ubuntu/storm-local/nimbus/inbox/stormjar-e8fb8969-5476-446e-9f3a-20ce4b73dc89.jar
452  [main] INFO  backtype.storm.StormSubmitter - Submitting topology wordcount in distributed mode with conf {"topology.workers":3,"topology.debug":true}
697  [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: wordcount


停止拓扑任务的执行

# 拓扑任务提交到storm集群上运行后,除非手动执行kill命令,否则将一直永远运行下去
# 而相比之下,mapreduce任务把数据处理完就终止了

# 其中wordcount是你指定的拓扑任务名称
bin/storm kill wordcount


# 停止日志
...
671  [main] INFO  backtype.storm.thrift - Connecting to Nimbus at nimbusz:6627
709  [main] INFO  backtype.storm.command.kill-topology - Killed topology: wordcount


storm常用命令

active          激活指定的任务
classpath       打印storm的classpath
dev-zookeeper   启动一个新的zookeeper,这种情况只用于开发或测试
drpc            启动一个drpc进程
jar             运行自己的拓扑任务
kill            通过任务名kill掉一个任务(storm使用杀进程方法关闭任务,cleanup不能执行)
deactivate      暂停storm的任务
rebalance       节点扩展之后进行负载均衡
list            列出正在运行的topolofies和状态
localconfvalue  打印出具体配置参数在本地storm配置文件中的值
remoteconfvalue 打印出具体配置参数在storm集群中的值
nimbus          启动一个nimbus进程
supervisor      启动一个supervisor进程
ui              启动ui监控页面ui的后台进程
help            命令解释及操作提示


在zk中查看nimbus和supervisor提交的信息

[zk: localhost:2181(CONNECTED) 1] ls /
[storm, zookeeper]

[zk: localhost:2181(CONNECTED) 2] ls /storm
[workerbeats, errors, supervisors, storms, assignments]

[zk: localhost:2181(CONNECTED) 3] ls /storm/workerbeats
[wordcount-2-1496654788]
[zk: localhost:2181(CONNECTED) 5] ls /storm/workerbeats/wordcount-2-1496654788
[83b7f23f-ad0b-45bd-85b8-500fd475fac3-6700, c3b030e5-a37a-485a-b96c-dc566a43fe9e-6700]

[zk: localhost:2181(CONNECTED) 4] ls /storm/errors     
[wordcount-2-1496654788]
[zk: localhost:2181(CONNECTED) 2] ls /storm/errors/wordcount-2-1496654788
[split, count, __acker, spout]

[zk: localhost:2181(CONNECTED) 3] ls /storm/supervisors
[c3b030e5-a37a-485a-b96c-dc566a43fe9e, 83b7f23f-ad0b-45bd-85b8-500fd475fac3]

[zk: localhost:2181(CONNECTED) 6] ls /storm/storms                                          
[wordcount-2-1496654788]

[zk: localhost:2181(CONNECTED) 9] ls /storm/assignments
[wordcount-2-1496654788]


# workerbeats:表示worker工作进程的工作状态信息
# errors:表示topology正在运行过程中出现异常的task信息,
         #方便nimbus重新将运行出错的任务进行重新分配
# supervisors:表示supervisor节点的状态信息
# storms:表示topology的基本配置信息
# assignments:表示任务的分配信息,nimbus节点接收到客户端提交的拓扑任务后会
              #对任务进行分配(将分配信息存储到znode中);supervisor节点
              #读取znode中的assignments信息,拿到分配给自己的任务,然后
              #supervisor启动worker进程执行具体的任务,worker进程将使用
              #多个executor线程进行工作


拓扑任务的分组策略

  • shuffleGrouping(随机分组)
  • fieldsGrouping(按照字段分组,在这里即是同一个单词只能发送给一个Bolt)
  • allGrouping(广播发送,即每一个Tuple,每一个Bolt都会收到)
  • globalGrouping(全局分组,将Tuple分配到task id值最低的task里面)
  • noneGrouping(随机分派)
  • directGrouping(直接分组,指定Tuple与Bolt的对应发送关系)
  • Local or shuffle Grouping
  • customGrouping (自定义的Grouping)


storm的编程模型,javaAPI构建拓扑任务(以wordcount任务为例)

注意:所有类最好实现序列化接口,最好发射string类型数据

技术选型使用storm,依靠storm的实时性以及大规模数据的特点。在spout中是随机发送内置语句作为数据源;使用一个bolt进行语句切分,将句子切分成单词发射出去;使用一个bolt订阅切分的单词tuple,进行单词统计,并且选用安字段分组的策略,词频实时排序,把topN实时发射出去;使用一个bolt将结果打印到log中;拓扑结构为 RandomSentenceSpout—->WordNormalizerBold—->WordCountBolt—->PrintBolt


拓扑任务驱动类

package com.demo.wordcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

/**
 * 驱动类(构造拓扑)
 * @author zhangqingli
 *
 */
public class WordcountTopology {

	public static void main(String[] args) {
		TopologyBuilder builder = new TopologyBuilder();
		
		//线程数为2
		builder.setSpout("randomSentenceSpout", new RandomSentenceSpout(), 2); 
		//线程数为2,使用随机分组,订阅的消息为randomSentenceSpout
		builder.setBolt("wordNomalizerBolt", new WordNomalizerBolt(), 2).shuffleGrouping("randomSentenceSpout"); 
		//线程数为2,使用按字段分组,订阅的消息为wordNomalizerBolt
		builder.setBolt("wordCountBolt", new WordCountBolt(), 2).fieldsGrouping("wordNomalizerBolt", new Fields("word")); 
		//线程数为1,使用全局分组(慎用,数据量可能很大),订阅的消息为wordCountBolt
		builder.setBolt("printBolt", new PrintBolt(), 1).globalGrouping("wordCountBolt");
		//设置一些参数
		Config stormConf = new Config(); //在spout或bolt中可以通过conf map入参获取
		stormConf.setDebug(false);
		stormConf.setNumAckers(1);
		
		//通过是否有参数来控制是否启动集群(或本地方式运行)
		if(args!=null && args.length>0) {
			//最好一台机器上的一个topology只使用一个worker,主要原因时减少了worker之间的数据传输
			stormConf.setNumWorkers(1);
			try {
				StormSubmitter.submitTopology(args[0], stormConf, builder.createTopology());
			} catch (Exception e) {
				e.printStackTrace();
			}
		} else {
			stormConf.setMaxTaskParallelism(1);
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("wordcount", stormConf, builder.createTopology());
			
			//本地集群需要手动关闭
			Utils.sleep(60000);
			localCluster.killTopology("wordcount");
			localCluster.shutdown();
		}
	}
}


spout组件

package com.demo.wordcount;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Spout组件(继承BaseRichSpout 或 实现IRichSpout)
 * @author zhangqingli
 *
 */
public class RandomSentenceSpout implements IRichSpout {
	private static final long serialVersionUID = 1L;
	
	private SpoutOutputCollector spoutOutputCollector;
	private Map<String, Values> tupleMap; //tuple缓存器

	/**
	 * 初始化方法
	 * 首先被调用,且只被调用一次,一般用于初始化变量,如SpoutOutputCollector发射器
	 */
	@Override
	@SuppressWarnings("rawtypes")
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 		  //conf表示驱动类中的stormConf
		spoutOutputCollector = collector;
		tupleMap = new HashMap<>();
	}
	
	/**
	 * 收尾方法
	 * 一般用于释放资源
	 */
	@Override
	public void close() {
		 
	}
	
	/**
	 * 激活方法
	 * 当拓扑任务被暂停重新激活时会调用该方法,一般该方法默认即可
	 */
	@Override
	public void activate() {
		
	}
	
	/**
	 * 暂停方法
	 * 当拓扑任务被暂停时会调用该方法,一般该方法默认即可
	 */
	@Override
	public void deactivate() {
		
	}
	
	/**
	 * 核心方法
	 * 会被循环调用,实现如何从数据源上获取数据和向后面的组件bolt发射数据的逻辑
	 */
	@Override
	public void nextTuple() {
		Utils.sleep(5000);
		String[] sentences = new String[]{
				"Implement a parser wrapper by extending",
				"the FeedNormalizer Parser class and",
				"overriding the public methods. Also note", 
				"the helper methods in the root Parser",
				"object to make mapping of output from", 
				"the particular parser to the Feed object easier."
		};
		String sentence = sentences[new Random().nextInt(sentences.length)];
		
		String msgId = UUID.randomUUID().toString(); //msgId(消息保障机制用)
		Values values = new Values(sentence.trim().toLowerCase()); //tuple
		tupleMap.put(msgId, values); //将tuple添加到自定义缓存中
		spoutOutputCollector.emit(values, msgId); //发射
		
		System.err.println(Thread.currentThread() + ",发射的语句:" + sentence);
		System.err.println(Thread.currentThread() + ",tupleMap缓存:" + tupleMap);
	}
	
	
	/**
	 * 消息可靠性保障ack方法
	 * 当拓扑任务启用了消息可靠性保障机制,当某个tuple在拓扑任务上处理成功后,调用ack方法执行一些处理逻辑
	 * 
	 * 启用消息的确认机制步骤:
	 * 在第一个spout或bolt中:
	 * 		1. 发射器发射tuple时需要指定一个msgId
	 * 		2. 使用map缓存所发射的tuple(key为msgId,value为emit_values)
	 * 		3. 在ack方法中(此时已经能够确认消息发射成功)将缓存从map中移除
	 * 		4. 在fail方法中(此时已经能够确认消息发射失败)进行重试或其他补救操作
	 * 在下一个spout或bolt中:(如果需要继续往后面组件发射消息)
	 * 		1. 需要锚定前面的tuple【collector.emit(inputTuple, new Values("xxx"))】
	 * 		2. 消息处理成功调用收集器的ack方法【collector.ack(inputTuple)】
	 * 		3. 消息处理过程有异常调用收集器的fail方法【collector.fail(inputTuple)】
	 * 
	 * 对性能的影响:
	 * 		如果启用了消息保障性机制,一定会对拓扑任务的执行的效率产生影响,如果性能影响超过可接受范围,
	 * 		解决方法是增加acker组件的并发度(增加执行acker任务的executor线程个数),即可以通过设置
	 * 		stormConf.setNumAckers(5)来做。
	 * 
	 * 关于锚定:
	 * 所谓的锚定(anchoring)是指为 tupleTree 中增加一个新的节点。在bolt中你可以通过 
	 * collector.emit(tuple,new Values(xx)) 将该tuple发送到下个bolt,此时new Value 
	 * 就被锚定在该 tuple 上了。你可以认为 tuple tree 之间link的建立称为锚定。
	 * 
	 */
	@Override
	public void ack(Object msgId) {
		System.err.println(Thread.currentThread() + ",消息发射成功msgId:" + msgId);
		//确认发射成功,将tuple从缓存中移除
		tupleMap.remove(msgId);
	}

	/**
	 * 消息可靠性保障fail方法
	 * 当拓扑任务启用了消息可靠性保障机制,当某个tuple在拓扑任务上处理失败或超时后,
	 * 调用fail方法执行一些处理逻辑
	 * 
	 * 关于成功与失败:
	 * 在Storm的一个Topology中,Spout通过 SpoutOutputCollector 的emit()方法
	 * 发射一个tuple(源)即消息。而后经过在该Topology中定义的多个Bolt处理时,可能
	 * 会产生一个或多个新的Tuple。源Tuple和新产生的Tuple便构成了一个Tuple树。当整
	 * 棵树被处理完成,才算一个Tuple被完全处理,其中任何一个节点的Tuple处理失败或超
	 * 时,则整棵树失败。关于消息在storm中的超时默认为30秒,具体参看 defaults.yaml 
	 * 中的 topology.message.timeout.secs: 30 的配置。同时,你也可以在定义
	 * topology时,通过conf.setMessageTimeoutSecs方法指定超时时间。storm所谓的
	 * 消息可靠性指的是storm保证每个tuple都能被toplology完全处理。而且处理的结果要
	 * 么成功要么失败。出现失败的原因可能有两种即节点处理失败或者处理超时。
	 * 
	 * 有三种方法可以去掉可靠性:
	 * 1. 第一是把Config.TOPOLOGY_ACKERS 设置成 0. 在这种情况下,storm会在spout
	 *    发射一个tuple之后马上调用spout的ack方法。也就是说这个tuple树不会被跟踪。
	 * 2. 第二个方法是在tuple层面去掉可靠性。 你可以在发射tuple的时候不指定messageid
	 *    来达到不跟粽某个特定的spout tuple的目的。
	 * 3. 最后一个方法是如果你对于一个tuple树里面的某一部分到底成不成功不是很关心,那么
	 *    可以在发射这些tuple的时候unanchor它们。这样这些tuple就不在tuple树里面,也
	 *    就不会被跟踪了。
	 */
	@Override
	public void fail(Object msgId) {
		System.err.println(Thread.currentThread() + ",消息发射失败msgId:" + msgId);
		//消息发射失败,重试
		Values values = tupleMap.get(msgId);
		spoutOutputCollector.emit(values, msgId);
	}

	/**
	 * 声明向后面发射的tuple的key
	 * 
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}

	/**
	 * 设置spout组件专用的参数
	 */
	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}


bolt1组件(负责获取spout组件发送的消息,并将消息切分成单词向后发射)

package com.demo.wordcount;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordNomalizerBolt implements IRichBolt {
	private static final long serialVersionUID = 1L;

	private OutputCollector outputCollector;
	
	/**
	 * worker启动时初始化方法
	 * 功能类似于spout的open
	 */
	@Override
	@SuppressWarnings("rawtypes")
	public void prepare(Map paramMap, TopologyContext paramTopologyContext,
			OutputCollector paramOutputCollector) {
		outputCollector = paramOutputCollector;
	}

	/**
	 * 接收一个tuple并执行逻辑处理,发射出去
	 */
	@Override
	public void execute(Tuple paramTuple) {
		
		try {
			String sentence = paramTuple.getStringByField("sentence");
			String[] words = sentence.split(" ");
			
			for (String word : words) {
				//outputCollector.emit(new Values(word));
                //启用消息保障机制,需要锚定接收到的tuple
				outputCollector.emit(paramTuple, new Values(word)); 
			}
			
			//确认消息处理成功
			outputCollector.ack(paramTuple);
		} catch (Exception e) {
			e.printStackTrace();
			//确认消息处理失败
			outputCollector.fail(paramTuple); 
		}
	}

	/**
	 * 收尾方法(不保证一定被调用,因为我们停止拓扑任务的时候一般是采取杀进程的方法)
	 * 一般用于释放资源
	 */
	@Override
	public void cleanup() {
		
	}

	/**
	 * 声明向后面发射的tuple的key
	 * 
	 */
	@Override
	public void declareOutputFields(
			OutputFieldsDeclarer paramOutputFieldsDeclarer) {
		paramOutputFieldsDeclarer.declare(new Fields("word"));
	}

	/**
	 * 设置spout组件专用的参数
	 */
	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}


bolt2组件(负责统计出前10个单词,并把统计结果向后发射)

package com.demo.wordcount;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt implements IRichBolt {
	private static final long serialVersionUID = 1L;
	private OutputCollector outputCollector;
	private Map<String, Integer> counterMap;
	
	@Override
	public void cleanup() {
		
	}

	@Override
	public void execute(Tuple inputTuple) {
		String str = inputTuple.getStringByField("word");
		
		if (!counterMap.containsKey(str)) {
			counterMap.put(str, 1);
		} else {
			Integer c = counterMap.get(str) + 1;
			counterMap.put(str, c);
		}
		
		//只取词频最大的前十个
		int num = 10;
		int length = 0;
		
		//对map进行排序
		counterMap = sortByMap(counterMap);
		
		if (counterMap.size() > num) {
			length = num;
		} else {
			length = counterMap.size();
		}
		
		//增量统计
		String countResult = null;
		int count = 0;
		for (String key : counterMap.keySet()) {
			if (count >= length) {
				break;
			}
			if (count==0) {
				countResult = "["+ key + ":" + counterMap.get(key) +"]";
			} else {
				countResult = countResult + ", ["+ key + ":" + counterMap.get(key) +"]";
			}
			count++;
		}
		
		countResult = "The first " + num + ":" + countResult;
        //该部分不启用消息保障机制,不需要锚定接收到的tuple
		outputCollector.emit(new Values(countResult)); 
		outputCollector.ack(inputTuple);
	}

	@Override
	@SuppressWarnings("rawtypes")
	public void prepare(Map stormConf, TopologyContext context, 
                        OutputCollector collector) {
		outputCollector = collector;
		counterMap = new HashMap<String, Integer>();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("countResult"));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

	/*
	 * 根据map的value对map进行排序
	 */
	public Map<String, Integer> sortByMap(Map<String, Integer> map) {
		List<Entry<String, Integer>> list = 
          	new LinkedList<Entry<String, Integer>>(map.entrySet());
		Collections.sort(list, new Comparator<Entry<String, Integer>>() {
			@Override
			public int compare(Entry<String, Integer> o1,
					Entry<String, Integer> o2) {
				return o2.getValue().compareTo(o1.getValue());
			}
		});
		Map<String, Integer> result = new LinkedHashMap<String, Integer>();
		for (Entry<String, Integer> entry : list) {
			result.put(entry.getKey(), entry.getValue());
		}
		return result;
	}
}


bolt3组件(负责实时打印单词统计结果)

package com.demo.wordcount;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class PrintBolt extends BaseBasicBolt {
	private static final long serialVersionUID = 1L;
	
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String msg = tuple.getString(0);
		if (msg != null) {
			System.out.println(Thread.currentThread() + "," + msg);
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}


pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.keyllo.demo</groupId>
	<artifactId>zdemo-storm-wordcount</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<!-- 公共参数信息 -->
	<properties>
		<java.version>1.7</java.version>
		<storm.version>1.1.0</storm.version>
	</properties>

	<!-- 依赖包信息 -->
	<dependencies>
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-core</artifactId>
			<version>${storm.version}</version>
			<!-- keep storm out of the jar-with-dependencies -->
			<scope>provided</scope>
		</dependency>
	</dependencies>


	<!-- 构建配置信息 -->
	<build>
		<finalName>zdemo-storm-wordcount</finalName>
		<plugins>
			<!-- 编译插件 -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.5.1</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
				</configuration>
			</plugin>

			<!-- 使用shade打包 -->
			<plugin>  
			    <groupId>org.apache.maven.plugins</groupId>  
			    <artifactId>maven-shade-plugin</artifactId>  
			    <version>2.3</version>  
			    <executions>  
			        <execution>  
			            <phase>package</phase>  
			            <goals>  
			                <goal>shade</goal>  
			            </goals>  
			            <configuration>  
			                <transformers>  
			                    <transformer  
			                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">  
			                        <mainClass>com.demo.wordcount.WordcountTopology</mainClass>  
			                    </transformer>  
			                    <transformer  
			                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">  
			                        <resource>META-INF/spring.handlers</resource>  
			                    </transformer>  
			                    <transformer  
			                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">  
			                        <resource>META-INF/spring.schemas</resource>  
			                    </transformer>  
			                </transformers>  
			            </configuration>  
			        </execution>  
			    </executions>  
			</plugin>  
		</plugins>

		<resources>
			<resource>
				<directory>src/main/resources</directory>
				<filtering>true</filtering>
				<includes>
					<include>**/*.xml</include>
					<include>**/*.properties</include>
				</includes>
			</resource>
		</resources>
	</build>
</project>


使用maven打包拓扑任务,并将其发布到集群环境

# maven打包,生成zdemo-storm-wordcount.jar
mvn clean package

# 将zdemo-storm-wordcount.jar上传到nimbus主机,执行以下命令启动拓扑任务
xxx/bin/storm jar zdemo-storm-wordcount.jar com.demo.wordcount.WordCountTopology wordcount

# ok,大功告成


storm的并发

worker进程的并发度:表示拓扑任务由多少个worker进程来执行

// 可以通过stormConfig来设置worker进程数(默认为1个)
// 一般情况下,最好一台机器上的一个topology只使用一个worker,主要原因时减少了worker之间的数据传输
stormConfig.setNumWorkers(2)

executor线程的并发度:表示spout或bolt由多少个线程来执行

TopologyBuilder builder = new TopologyBuilder();
//线程数为2
builder.setSpout("randomSentenceSpout", new RandomSentenceSpout(), 2); 

//线程数为2,使用随机分组,订阅的消息为randomSentenceSpout
builder.setBolt("wordNomalizerBolt", 
                new WordNomalizerBolt(), 2).shuffleGrouping("randomSentenceSpout");

task任务的并发度:表示n个task由m个executor线程来执行(并不是并发执行而是轮流执行!)

//表示该bolt有12个task来运行,12个task由3个executor线程来运行
//相当于每个executor线程要运行4个task,这4个task只能executor线程上轮流执行
builder.setBolt("wordNomalizerBolt", 
                new WordNomalizerBolt(), 3)
  				.setNumTasks(12)
  				.shuffleGrouping("randomSentenceSpout"); 


Trident

概念

  • 它是storm高层次的抽象,在trident中保留了是spout组件,但是不再有bolt组件,将之前storm在bolt中实现的数据逻辑抽象成一系列的操作(operation),比如过滤、函数、分组统计等。当然在实际运行的过程中,trident还会将这些操作转换为bolt组件。
  • trident封装好了消息可靠性保障机制。
  • trident中有批次的概念,它可以将固定条数的tuple划分为一个个批次进行处理,每个批次都有一个有序的唯一编号,更新统计结果状态要严格按照批次顺序进行更新。
  • 由trident的批次概念,我们衍生出了trident的事务处理机制(类似于关系数据库事务控制的概念)。在trident中事务控制被划分为3个级别:
    • NON-Transactional:非事务控制,允许同一个批次内的tuple部分处理成功,失败的tuple可以在后面的其他多个批次内进行重试(也有可能不进行重试),可能会导致被成功处理多次
    • Transactional:严格的事务控制,要求批次内处理失败的tuple只能在本批次内进行重试(即以相同的批次号进行重试,其他后面的批次需要等待你这次批次处理成功才能继续往下进行,如果tuple一直重试不成功,则就会将整个程序挂起),这种级别的事务是没有容错的事务控制。
    • Opaque-Transactional:透明事务控制,针对Transactional事务控制的缺点设计。批次内的tuple梳理完

Tags:


Share: