1. 新建一个Maven项目,pom.xml代码如下:
<groupId>com.yg</groupId>
<artifactId>storm</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>storm</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.path.to.main.Class</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
2.新建HelloWorldSpout.java,代码如下:
package com.yg.storm.spouts;
import java.util.Map;
import java.util.Random;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
public class HelloWorldSpout extends BaseRichSpout{
/\*\*
\* 功能:随机生成字符串
\* 实现:先产生一个1-10随机整数,再不断产生一个1-10随机整数,若两者
\* 相等,则发射hello world,否则发送其他字符串
\*/
private static final long serialVersionUID = -5698117627723074157L;
private static final int MAX\_RANDOM = 10;
private int referenceRandom;
private SpoutOutputCollector collector;
//构造函数
public HelloWorldSpout(){
//产生第一个随机数
final Random rand = new Random();
referenceRandom = rand.nextInt(MAX\_RANDOM);
}
//在spout加载时,打开一些资源(只在spout加载的时候执行一次)
@Override
public void open(Map conf,
TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}
//核心方法,storm会不断调用该方法,也就是方法执行完后会马上重置并再次执行
@Override
public void nextTuple() {
Utils.sleep(1000);//停滞一秒
final Random rand = new Random();
int instanceRandom = rand.nextInt(MAX\_RANDOM);
if (referenceRandom == instanceRandom){
collector.emit(new Values("Hello World"));//有顺序的
} else {
collector.emit(new Values("Other Random Word"));
}
}
//声明Tuple的字段名,有顺序的
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
3.新建HelloWorldBolt.java,代码如下:
package com.yg.storm.bolts;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
public class HelloWorldBolt extends BaseRichBolt{
/\*\*
\* 功能:就收到spout发送的数据,打印并统计hello world的数量
\* 实现:打印,创建计数变量用于统计hello world
\*/
private static final long serialVersionUID = -5061906223048521415L;
private int myCount = 0;//计数变量,不能在execute函数中初始化
private TopologyContext context;//上下文变量
private OutputCollector collector;
//相当于spout中的open
@Override
public void prepare(Map stormConf,
TopologyContext context,
OutputCollector collector) {
this.context = context;
this.collector = collector;
}
//相当于spout中的nextTuple
@Override
public void execute(Tuple input) {
//拿到数据,用字段名取出
String text = input.getStringByField("sentence");
System.out.println("One tuple gets in: " + context.getThisTaskId() + text);
if ("Hello World".equals(text)){
myCount++;
System.out.println("Found a Hello World! My count is now:" + myCount);
}
collector.ack(input);//处理完成要通知Storm
// collector.fail(input);//处理失败要通知Storm
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
4.新建HelloWorldTopolog.java,代码如下:
package com.yg.storm.topologies;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import com.yg.storm.bolts.HelloWorldBolt;
import com.yg.storm.spouts.HelloWorldSpout;
public class HelloWorldTopology {
//可以向main传递一个参数作为集群模式下的Topology的名字,若没有传入参数则使用本地模式
public static void main(String\[\] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("hlSpout", new HelloWorldSpout());
builder.setBolt("hlBolt", new HelloWorldBolt())
.shuffleGrouping("hlSpout");
Config conf = new Config();
if (args != null && args.length > 0){
//集群模式提交
conf.setNumWorkers(3);
try {
StormSubmitter.submitTopology(args\[0\], conf, builder.createTopology());
} catch (AlreadyAliveException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvalidTopologyException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (AuthorizationException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
//本地模式提交
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(1000\*60);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
直接本地运行HelloWorldTopology类即可.
手机扫一扫
移动阅读更方便
你可能感兴趣的文章