N21_第x周_Storm_01_单机实践篇

   这2周没有按马哥安排的课程走,因公司需要,大家一直在试尝大数据这块。作业不能不做,也不知道马哥哪周的作业会有storm,只好先将这段时间的实验慢慢记录下来(其它flume、kafka、spark等本周会慢慢补充),等知道具体的作业题目后,再完善。

实验目的

  了解storm的原理,并用storm单机版实验加深理解,为后面的大数据做准备。

  了解Topology、spout、bolt、Nimbus、Suppervisor,怎么用。

  本篇不涉及原理及相关解释,可以度娘。   

实验题目

  1473770496158165.jpg

  1473770824893767.jpg

    RandomSpout类:读取外部数据并封装为tuple发送出去,模拟从goods数据中随机取一个商品名称封装到tuple中发送出去;

    UpperBolt类:将收到的原始商品名称,转换成大写再发送出去;

    SuffixBolt类:给商品名称添加后缀,然后将数据写入文件中;

    TopoMain类:描述topology的结构,以及创建topology并提交给集群;

RandomSpout.java

// 读取外部数据并封装为tuple发送出去
public class RandomSpout extends BaseRichSpout{
SpoutOutputCollector collector = null;
String [] goods = {"iphone","xiaomi","meizu","zhongxing","huawei","moto","sumsung","simens"};
/**
 *  获取消息并发送给下一个组件的方法,会被storm不断地调用(最重要的一个方法)
 * 
 *  从goods数据中随机取一个商品名称封装到tuple中发送出去
 * */
@Override
public void nextTuple() {
// 随机取到一个商品名称
Random random = new Random();
String good = goods[random.nextInt(goods.length)];
//封装到tuple中发送出去
collector.emit(new Values(good));
//休眠500毫秒
Utils.sleep(500);
}
//进行初始化,只在开始的时候调用一次
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
/**
 *  定义tuple的scheme
 * */
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("src_word"));   //第一个商品名称
}
}

UpperBolt.java

/**
 *  将收到的原始商品名称,转换成大写再发送出去
 * */
public class UpperBolt extends BaseBasicBolt{
/**
 *  execute:每来一次消息,就会被执行一次
 * */
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
// 从tuple中拿到我们的数据----原始商品名
String src_word = tuple.getString(0);
//转换成大写
String upper_word = src_word.toUpperCase();
//发送出去
collector.emit(new Values(upper_word));
}
//声明bolt组件要发送tuple的字段定义
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("upper_word"));
}
}

SuffixBolt.java

/**
 *  给商品名称添加后缀,然后将数据写入文件中
 *  @author zhouyong
 * */
public class SuffixBolt extends BaseBasicBolt {
FileWriter fileWriter = null;
//初始化方法,会被调用一次
@Override
public void prepare(Map stormConf, TopologyContext context) {
try{
fileWriter = new FileWriter("/home/hadoop/" + UUID.randomUUID());
}catch(Exception ex){
ex.printStackTrace();
}
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
// 从消息元组tuple中拿到上一个组件发送过来的数据
String upper_word = tuple.getString(0);
//给商品名称添加后缀
String result = upper_word + "_suffix";
try{
fileWriter.append(result);
fileWriter.append("\n");
fileWriter.flush();
}catch(Exception ex){
ex.printStackTrace();
}
}
//声明该组件要发送出去的tuple的字段定义
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
// TODO Auto-generated method stub
}
}

主类TopoMain.java

/**
 *  描述topology的结构,以及创建topology并提交给集群
 *  @author zhouyong
 * */
public class TopoMain {
public static void main(String [] args) throws Exception{
 TopologyBuilder topologyBuilder = new TopologyBuilder();
 
 //设置消息源组件为RandomSpout
 //唯为标识,spout实例,并发数
 topologyBuilder.setSpout("randomspout", new RandomSpout(), 4);
 
 //设置逻辑处理组件UpperBolt,并指定接收randomspout的消息
 topologyBuilder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
 
 //设置逻辑处理组件SuffixBolt,并指定接收upperbolt的消息
 topologyBuilder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
 
 //创建一个topology
 StormTopology topo = topologyBuilder.createTopology();
 
 //创建一个storm的配置参数对象
 Config conf = new Config();
 //设置storm集群为这个topo启动的进程数
 conf.setNumWorkers(4);
 conf.setDebug(true);
 conf.setNumAckers(0);
 
 //提交topo到storm集群中
 StormSubmitter.submitTopology("demotopo", conf, topo);
 
}
}

将这4个java类打包成jar包,jar包名称为demotopo.jar。

1473771658505019.jpg

环境部署

1,安装zookeeper;

2,安装storm;

CentOS6.5,我们统一将zookeeper和storm安装到/opt/hadoop/下。

安装zookeeper

zookeeper版本:zookeeper-3.4.8.tar.gz

单机版部署zookeeper,只要解压就可以了,可以不做配置上的修改。

启动zookeeper

1>, 启动zookeeper

# ./zkServer.sh start

2>, 检查zookeeper是否成功

# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config:
/opt/hadoop/zookeeper/zookeeper/bin/../conf/zoo.cfg
Mode: standalone

 1473772366993930.jpg

安装storm

1>,版本:apache-storm-0.9.6.tar.gz

2>,安装配置

   解压到/opt/hadoop/storm/后,配置storm.yaml,

文件在/usr/local/storm/conf/storm.yaml,内容:
 storm.zookeeper.servers:
     - 127.0.0.1
 storm.zookeeper.port: 2181
 nimbus.host: "127.0.0.1"
 storm.local.dir: "/tmp/storm"
 supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

注意: 这里要特别注意前后空格问题,否则在启动时会不通过。


3>, 启动主节点:nimbus

# ./storm nimbus\\前台启动,窗口不能关闭
## bin/storm nimbus 1>/dev/null 2>&1 &\\后台启动
检查是否启动
# jps
10790 nimbus
11030 worker
10870 supervisor
8457 QuorumPeerMain
11366 Jps
11023 worker
11255 core
11018 worker
11019 worker

前台窗口启动主节点nimbus截图:

1473772984288521.jpg


4>,启动一个前端UI

# bin/storm ui\\前端启动,窗口不能关
## bin/storm ui 1>/dev/null 2>&1 &\\后台启动
# jps
11255 core

http://172.31.3.148:8080/index.html  

       1473773190267950.jpg


5>,启动从节点: supervisor

# ./storm supervisor\\前端启动
## ./storm supervisor\\后台启动
# jps
10790 nimbus
11030 worker
10870 supervisor

1473773344524546.jpg

用jps检查下所有服务是否都正常:

[root@localhost apache-storm-0.9.6]# jps
10790 nimbus
11030 worker
10870 supervisor
8457 QuorumPeerMain
11366 Jps
11023 worker
11255 core
11018 worker
11019 worker

——-

提交Topologies

1>,上传jar包

将demotopo.jar包上传到storm的安装目录/opt/hadoop/storm/下:

1473773781461054.jpg

2>,将jar包发送给storm去执行

命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】【stormIP地址】【storm端口】【拓扑名称】【参数】

# ./storm jar ../../demotopo.jar cn.itcast.storm.TopoMain

# ./storm jar ../../demotopo.jar cn.itcast.storm.TopoMain
Running: /usr/local/java/bin/java -client -Dstorm.options= -Dstorm.home=/opt/hadoop/storm/apache-storm-0.9.6 -Dstorm.log.dir=/opt/hadoop/storm/apache-storm-0.9.6/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/hadoop/storm/apache-storm-0.9.6/lib/carbonite-1.4.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jetty-util-6.1.26.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/compojure-1.1.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jgrapht-core-0.9.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.logging-0.2.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jetty-6.1.26.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clj-time-0.4.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-codec-1.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/servlet-api-2.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/storm-core-0.9.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/hiccup-0.3.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clj-stacktrace-0.2.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-lang-2.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/chill-java-0.3.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/kryo-2.21.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/logback-classic-1.0.13.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/minlog-1.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/objenesis-1.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clout-1.0.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.cli-0.2.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-devel-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-jetty-adapter-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-servlet-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/snakeyaml-1.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/log4j-over-slf4j-1.6.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/core.incubator-0.1.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-core-1.1.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/asm-4.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/math.numeric-tower-0.0.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/disruptor-2.10.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/joda-time-2.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-exec-1.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jline-2.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/slf4j-api-1.7.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.macro-0.1.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clojure-1.5.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-fileupload-1.2.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/reflectasm-1.07-shaded.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/logback-core-1.0.13.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-io-2.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-logging-1.1.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/json-simple-1.1.jar:../../demotopo.jar:/opt/hadoop/storm/apache-storm-0.9.6/conf:/opt/hadoop/storm/apache-storm-0.9.6/bin -Dstorm.jar=../../demotopo.jar cn.itcast.storm.TopoMain
654  [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
708  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar ../../demotopo.jar to assigned location: /opt/hadoop/storm/apache-storm-0.9.6/nimbus/inbox/stormjar-c392c9d8-ebaf-47e8-be98-6d98526e82a8.jar
752  [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/hadoop/storm/apache-storm-0.9.6/nimbus/inbox/stormjar-c392c9d8-ebaf-47e8-be98-6d98526e82a8.jar
752  [main] INFO  backtype.storm.StormSubmitter - Submitting topology demotopo in distributed mode with conf {"topology.workers":4,"topology.acker.executors":0,"topology.debug":true}
973  [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: demotopo

3>,检查

检查Topology(名为demotopo)是否运行

      1473774107376858.jpg

检查/home/hadoop/下是否生成了4个文件

  1473774207108853.jpg

  这里的4个worker是在配置文件中配置的。

检查文件大小是否在不停变化

   1473774307357070.jpg

检查文件中单词是否随机,每个单词后是否以suffix结尾

  1473774437982428.jpg

注:附件是实验jar包,请将.rar改为.jar。

N21_第x周_Storm_01_单机实践篇demotopo.rar

原创文章,作者:365,如若转载,请注明出处:http://www.178linux.com/46231

(0)
365365
上一篇 2016-09-15 22:21
下一篇 2016-09-15 22:21

相关推荐

  • 第六周总结

    请详细总结vim编辑器的使用并完成以下练习题 vim: 模块化的编辑器   基本模式: 编辑模式,命令模式 输入模式 末行模式: 打开文件: # vim [options] [file..] +#:打开文件后,直接让光标处于第#行的行首; +/PATTERN:打开文件后,直接让光标处于第一个被PATTERN匹配到的行的行首; 模式转换: 编辑模式:…

    Linux干货 2017-08-07
  • Linux权限管理练习

    1、当用户xiaoming对/testdir 目录无执行权限时,意味着无法做哪些操作? 无法cd切换进入此目录,无法创建文件,无法删除文件,无法查看里面文件的内容,只能ls列出目录下的内容 2、当用户xiaoqiang对/testdir 目录无读权限时,意味着无法做哪些操作? 无法ls查看目录下的内容 3、当用户wangcai 对/testdir 目录无写权…

    Linux干货 2016-08-05
  • find文件:就是这么简单

    概述 由于Linux一切皆文件,我们的日常运维工作其实就是与文件打交道的事,如何能够快速而有效地找到我们需要的文件呢?这是个令人头疼的问题。幸运是,Linux为用户提供了强大的查找工具——find。find通过遍历指定路径完成文件查找,它的的工作特点: 精确查找——多查询条件组合,精确匹配; 实时查找——遍历指定路径; 查找速度稍慢——由于需要遍历路径,速度…

    Linux干货 2016-08-18
  • N25_第一周博客作业

    一、计算机组成及其功能      1、CPU:          运算器:计算功能,对数据进行加工处理的部件。          控制器:负责从存储器读取指令,控制计算器之间的运行状态和结果;     &nbs…

    Linux干货 2016-12-04
  • etc的常见问答

    1、复制/etc/skel目录为/home/tuser1,要求/home/tuser1及其内部文件的属组和其它用户均没有任何访问权限。 [root@localhost tuser1]# cp -r /etc/skel/ /home/tuser1 [root@localhost tuser1]# chmod -R go= /home/tuser1/ [root…

    2017-12-26
  • N24期第四周作业

    1、复制/etc/skel目录为/home/tuser1,要求/home/tuser1及其内部文件的属组和其它用户均没有任何访问权限。 2、编辑/etc/group文件,添加组hadoop。 vim /etc/group,增加此行 3、手动编辑/etc/passwd文件新增一行,添加用户hadoop,其基本组ID为hadoop组的id号;其家目录为/home…

    Linux干货 2016-11-22