N21_Week x_Storm_01_Standalone Practice

   For the past two weeks we have not followed the course schedule; the company needed us, so the whole team has been experimenting with big data. The homework still has to be done, though, and I don't know which week's assignment will cover Storm, so I am writing up these experiments as I go (flume, kafka, spark, and the rest will be filled in gradually this week) and will polish this post once the actual assignment topic is known.

Experiment Goals

  Understand how Storm works, and deepen that understanding with a standalone Storm experiment, in preparation for the big-data work ahead.

  Learn what Topology, spout, bolt, Nimbus, and Supervisor are, and how to use them.

  This post does not cover the underlying theory or detailed explanations; those are easy to look up.

Experiment Task

  1473770496158165.jpg

  1473770824893767.jpg

    RandomSpout class: reads data from an external source and emits it wrapped in a tuple; here it simulates that by picking a random goods name from the goods array, wrapping it in a tuple, and sending it on;

    UpperBolt class: converts each received goods name to uppercase and emits it;

    SuffixBolt class: appends a suffix to the goods name, then writes the result to a file;

    TopoMain class: describes the topology structure, builds the topology, and submits it to the cluster;
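Before diving into the Storm classes, the end-to-end transformation these three components perform can be sketched in plain Java, with no Storm dependency. The class and method names below are illustrative only, not part of the actual topology code:

```java
import java.util.Random;

// Plain-Java simulation of the three-stage pipeline:
// pick a random goods name, uppercase it, append a suffix.
public class PipelineSketch {

    static final String[] GOODS = {"iphone", "xiaomi", "meizu", "zhongxing",
            "huawei", "moto", "sumsung", "simens"};
    static final Random RANDOM = new Random();

    // RandomSpout stage: emit one random goods name
    static String nextTuple() {
        return GOODS[RANDOM.nextInt(GOODS.length)];
    }

    // UpperBolt stage: convert to uppercase
    static String toUpper(String word) {
        return word.toUpperCase();
    }

    // SuffixBolt stage: append the suffix
    static String addSuffix(String word) {
        return word + "_suffix";
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            System.out.println(addSuffix(toUpper(nextTuple())));
        }
    }
}
```

In Storm, each of these stages becomes its own component connected by tuples, so each stage can be scaled independently.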

RandomSpout.java

// Reads data from an external source, wraps it in a tuple, and emits it
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RandomSpout extends BaseRichSpout {

    SpoutOutputCollector collector = null;
    String[] goods = {"iphone", "xiaomi", "meizu", "zhongxing", "huawei", "moto", "sumsung", "simens"};

    /**
     * Fetches the next message and sends it to the downstream component.
     * Storm calls this method in a loop (the most important method here).
     *
     * Picks a random goods name from the goods array, wraps it in a tuple, and emits it.
     */
    @Override
    public void nextTuple() {
        // pick a random goods name
        Random random = new Random();
        String good = goods[random.nextInt(goods.length)];
        // wrap it in a tuple and emit it
        collector.emit(new Values(good));
        // sleep 500 ms
        Utils.sleep(500);
    }

    // initialization; called only once, at startup
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Declares the schema of the emitted tuples.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("src_word"));   // field 1: the goods name
    }
}

UpperBolt.java

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Converts each received goods name to uppercase and emits it.
 */
public class UpperBolt extends BaseBasicBolt {

    /**
     * execute: invoked once per incoming message.
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // get the data from the tuple: the original goods name
        String src_word = tuple.getString(0);
        // convert to uppercase
        String upper_word = src_word.toUpperCase();
        // emit downstream
        collector.emit(new Values(upper_word));
    }

    // declare the fields of the tuples this bolt emits
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upper_word"));
    }
}

SuffixBolt.java

import java.io.FileWriter;
import java.util.Map;
import java.util.UUID;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/**
 * Appends a suffix to each goods name, then writes the result to a file.
 * @author zhouyong
 */
public class SuffixBolt extends BaseBasicBolt {

    FileWriter fileWriter = null;

    // initialization; called once per bolt instance
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            fileWriter = new FileWriter("/home/hadoop/" + UUID.randomUUID());
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // get the data sent by the upstream component out of the tuple
        String upper_word = tuple.getString(0);
        // append the suffix to the goods name
        String result = upper_word + "_suffix";
        try {
            fileWriter.append(result);
            fileWriter.append("\n");
            fileWriter.flush();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // declare the fields of the tuples this bolt emits
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing is emitted, so no fields to declare
    }
}
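SuffixBolt's write pattern (one uniquely named file per bolt instance, opened once in prepare(); append and flush per tuple) can be exercised in isolation with plain Java. This sketch writes to the system temp directory instead of /home/hadoop/ so it runs anywhere; the class name is illustrative:

```java
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.UUID;

public class WriterSketch {

    // mirrors SuffixBolt: a UUID-named file, appended to line by line
    public static Path writeWords(String[] words) throws IOException {
        Path out = Paths.get(System.getProperty("java.io.tmpdir"), "demo-" + UUID.randomUUID());
        FileWriter fileWriter = new FileWriter(out.toFile());
        for (String w : words) {
            fileWriter.append(w);
            fileWriter.append("\n");
            fileWriter.flush();   // flush after every line, as SuffixBolt does
        }
        fileWriter.close();
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path p = writeWords(new String[]{"IPHONE_suffix", "MOTO_suffix"});
        List<String> lines = Files.readAllLines(p);
        System.out.println(lines);
    }
}
```

Flushing after every line keeps the file current for inspection while the topology runs, at the cost of more I/O than batched writes.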

Main class: TopoMain.java

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

/**
 * Describes the topology structure, then builds the topology and submits it to the cluster.
 * @author zhouyong
 */
public class TopoMain {

    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        // set RandomSpout as the message source
        // arguments: unique id, spout instance, parallelism hint
        topologyBuilder.setSpout("randomspout", new RandomSpout(), 4);

        // add the UpperBolt processing component, subscribed to randomspout's messages
        topologyBuilder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");

        // add the SuffixBolt processing component, subscribed to upperbolt's messages
        topologyBuilder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");

        // build the topology
        StormTopology topo = topologyBuilder.createTopology();

        // create a Storm configuration object
        Config conf = new Config();
        // number of worker processes the cluster should start for this topology
        conf.setNumWorkers(4);
        conf.setDebug(true);
        conf.setNumAckers(0);

        // submit the topology to the Storm cluster
        StormSubmitter.submitTopology("demotopo", conf, topo);
    }
}
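shuffleGrouping above means each tuple is sent to a randomly chosen task of the downstream bolt, so load spreads roughly evenly across the four instances. A plain-Java sketch of that routing idea (Storm's real implementation differs in detail; the class name is illustrative):

```java
import java.util.Random;

public class ShuffleSketch {

    // route `tuples` messages across `tasks` downstream instances at random
    public static int[] route(int tuples, int tasks, long seed) {
        Random r = new Random(seed);
        int[] counts = new int[tasks];
        for (int i = 0; i < tuples; i++) {
            counts[r.nextInt(tasks)]++;   // pick a random target task per tuple
        }
        return counts;
    }

    public static void main(String[] args) {
        // 10000 tuples over 4 tasks, matching the parallelism hint of 4 above
        int[] counts = route(10000, 4, 42L);
        for (int i = 0; i < counts.length; i++) {
            System.out.println("task " + i + ": " + counts[i] + " tuples");
        }
    }
}
```

With enough tuples each task receives roughly a quarter of the load, which is why all four SuffixBolt output files keep growing at about the same rate.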

Package these four Java classes into a jar named demotopo.jar.

1473771658505019.jpg

Environment Deployment

1. Install ZooKeeper;

2. Install Storm;

On CentOS 6.5, we install both ZooKeeper and Storm under /opt/hadoop/.

Installing ZooKeeper

ZooKeeper version: zookeeper-3.4.8.tar.gz

For a standalone ZooKeeper deployment, extracting the tarball is essentially all that is needed (copy conf/zoo_sample.cfg to conf/zoo.cfg if zoo.cfg does not exist); the default settings need no changes.

Starting ZooKeeper

1> Start ZooKeeper

# ./zkServer.sh start

2> Check that ZooKeeper is running

# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config:
/opt/hadoop/zookeeper/zookeeper/bin/../conf/zoo.cfg
Mode: standalone

 1473772366993930.jpg

Installing Storm

1> Version: apache-storm-0.9.6.tar.gz

2> Installation and configuration

   After extracting the tarball to /opt/hadoop/storm/, edit storm.yaml at /opt/hadoop/storm/apache-storm-0.9.6/conf/storm.yaml:

 storm.zookeeper.servers:
     - 127.0.0.1
 storm.zookeeper.port: 2181
 nimbus.host: "127.0.0.1"
 storm.local.dir: "/tmp/storm"
 supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

Note: pay close attention to leading and trailing whitespace here; if the YAML is malformed, Storm will fail validation at startup.


3> Start the master node: nimbus

# ./storm nimbus                          \\ foreground start; the window must stay open
## bin/storm nimbus 1>/dev/null 2>&1 &    \\ background start

Check that it started:
# jps
10790 nimbus
11030 worker
10870 supervisor
8457 QuorumPeerMain
11366 Jps
11023 worker
11255 core
11018 worker
11019 worker

Screenshot of nimbus running in a foreground window:

1473772984288521.jpg


4> Start the web UI

# bin/storm ui                            \\ foreground start; the window must stay open
## bin/storm ui 1>/dev/null 2>&1 &        \\ background start
# jps
11255 core

http://172.31.3.148:8080/index.html  

       1473773190267950.jpg


5> Start the worker node: supervisor

# ./storm supervisor                         \\ foreground start
## ./storm supervisor 1>/dev/null 2>&1 &     \\ background start
# jps
10790 nimbus
11030 worker
10870 supervisor

1473773344524546.jpg

Use jps to verify that all services are running:

[root@localhost apache-storm-0.9.6]# jps
10790 nimbus
11030 worker
10870 supervisor
8457 QuorumPeerMain
11366 Jps
11023 worker
11255 core
11018 worker
11019 worker


Submitting the Topology

1> Upload the jar

Upload demotopo.jar to the Storm installation directory /opt/hadoop/storm/:

1473773781461054.jpg

2> Submit the jar to Storm for execution

Command format: storm jar [jar path] [package.main-class] [Storm host] [Storm port] [topology name] [args]; everything after the main class is passed to it as arguments.

# ./storm jar ../../demotopo.jar cn.itcast.storm.TopoMain
Running: /usr/local/java/bin/java -client -Dstorm.options= -Dstorm.home=/opt/hadoop/storm/apache-storm-0.9.6 -Dstorm.log.dir=/opt/hadoop/storm/apache-storm-0.9.6/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/hadoop/storm/apache-storm-0.9.6/lib/carbonite-1.4.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jetty-util-6.1.26.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/compojure-1.1.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jgrapht-core-0.9.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.logging-0.2.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jetty-6.1.26.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clj-time-0.4.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-codec-1.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/servlet-api-2.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/storm-core-0.9.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/hiccup-0.3.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clj-stacktrace-0.2.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-lang-2.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/chill-java-0.3.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/kryo-2.21.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/logback-classic-1.0.13.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/minlog-1.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/objenesis-1.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clout-1.0.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.cli-0.2.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-devel-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-jetty-adapter-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-servlet-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/snakeyaml-1.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/log4j-over-slf4j-1.6.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/core.incubator-0.1.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-core-1.1.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/asm-4.0.jar:/opt/hadoop/storm/apac
he-storm-0.9.6/lib/math.numeric-tower-0.0.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/disruptor-2.10.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/joda-time-2.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-exec-1.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jline-2.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/slf4j-api-1.7.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.macro-0.1.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clojure-1.5.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-fileupload-1.2.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/reflectasm-1.07-shaded.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/logback-core-1.0.13.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-io-2.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-logging-1.1.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/json-simple-1.1.jar:../../demotopo.jar:/opt/hadoop/storm/apache-storm-0.9.6/conf:/opt/hadoop/storm/apache-storm-0.9.6/bin -Dstorm.jar=../../demotopo.jar cn.itcast.storm.TopoMain
654  [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
708  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar ../../demotopo.jar to assigned location: /opt/hadoop/storm/apache-storm-0.9.6/nimbus/inbox/stormjar-c392c9d8-ebaf-47e8-be98-6d98526e82a8.jar
752  [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/hadoop/storm/apache-storm-0.9.6/nimbus/inbox/stormjar-c392c9d8-ebaf-47e8-be98-6d98526e82a8.jar
752  [main] INFO  backtype.storm.StormSubmitter - Submitting topology demotopo in distributed mode with conf {"topology.workers":4,"topology.acker.executors":0,"topology.debug":true}
973  [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: demotopo

3> Verify

Check that the topology (named demotopo) is running:

      1473774107376858.jpg

Check that four files have been created under /home/hadoop/:

  1473774207108853.jpg

  These four files correspond to the four SuffixBolt instances (parallelism hint 4 set in TopoMain, which also configures four worker processes); each instance opens its own file in prepare().

Check that the file sizes keep growing:

   1473774307357070.jpg

Check that the words in each file are random and that every line ends with the _suffix suffix:

  1473774437982428.jpg

Note: the attachment is the jar used in this experiment; rename the .rar extension back to .jar.

N21_第x周_Storm_01_单机实践篇demotopo.rar

Original article by 365. If reposting, please credit the source: http://www.178linux.com/46231