初识Storm之HelloWorld程序源码
阅读原文时间:2023年09月07日阅读:4

1. 新建一个Maven项目,pom.xml代码如下:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Storm "Hello World" topology. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yg</groupId>
    <artifactId>storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>storm</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <!-- "provided" keeps storm-core out of the assembled jar; the runtime
             environment is expected to supply it. -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.3</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- Builds a single jar containing the project classes plus all
                 non-provided dependencies. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Entry point of this project (was a placeholder). -->
                            <mainClass>com.yg.storm.topologies.HelloWorldTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <!-- Bind the assembly to the package phase so "mvn package"
                         produces the jar-with-dependencies artifact. -->
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

2.新建HelloWorldSpout.java,代码如下:

package com.yg.storm.spouts;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class HelloWorldSpout extends BaseRichSpout{

/\*\*  
 \* 功能:随机生成字符串  
 \* 实现:先产生一个1-10随机整数,再不断产生一个1-10随机整数,若两者  
 \* 相等,则发射hello world,否则发送其他字符串  
 \*/  
private static final long serialVersionUID = -5698117627723074157L;  
private static final int MAX\_RANDOM = 10;  
private int referenceRandom;  
private SpoutOutputCollector collector;

//构造函数  
public HelloWorldSpout(){  
    //产生第一个随机数  
    final Random rand  = new Random();  
    referenceRandom = rand.nextInt(MAX\_RANDOM);  
}

//在spout加载时,打开一些资源(只在spout加载的时候执行一次)  
@Override  
public void open(Map conf,  
        TopologyContext context,  
        SpoutOutputCollector collector) {  
    this.collector = collector;

}

//核心方法,storm会不断调用该方法,也就是方法执行完后会马上重置并再次执行  
@Override  
public void nextTuple() {

    Utils.sleep(1000);//停滞一秒  
    final Random rand  = new Random();  
    int instanceRandom = rand.nextInt(MAX\_RANDOM);  
    if (referenceRandom == instanceRandom){  
        collector.emit(new Values("Hello World"));//有顺序的  
    } else {  
        collector.emit(new Values("Other Random Word"));  
    }  
}

//声明Tuple的字段名,有顺序的  
@Override  
public void declareOutputFields(OutputFieldsDeclarer declarer) {  
    declarer.declare(new Fields("sentence"));

}

}

3.新建HelloWorldBolt.java,代码如下:

package com.yg.storm.bolts;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class HelloWorldBolt extends BaseRichBolt{

/\*\*  
 \* 功能:就收到spout发送的数据,打印并统计hello world的数量  
 \* 实现:打印,创建计数变量用于统计hello world  
 \*/  
private static final long serialVersionUID = -5061906223048521415L;  
private int myCount = 0;//计数变量,不能在execute函数中初始化  
private TopologyContext context;//上下文变量  
private OutputCollector collector;

//相当于spout中的open  
@Override  
public void prepare(Map stormConf,  
        TopologyContext context,  
        OutputCollector collector) {  
    this.context = context;  
    this.collector = collector;  
}

//相当于spout中的nextTuple  
@Override  
public void execute(Tuple input) {  
    //拿到数据,用字段名取出  
    String text = input.getStringByField("sentence");  
    System.out.println("One tuple gets in: " + context.getThisTaskId() + text);  
    if ("Hello World".equals(text)){  
        myCount++;  
        System.out.println("Found a Hello World! My count is now:" + myCount);  
    }  
    collector.ack(input);//处理完成要通知Storm  

// collector.fail(input);//处理失败要通知Storm

}

@Override  
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}  

}

4.新建HelloWorldTopology.java,代码如下:

package com.yg.storm.topologies;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import com.yg.storm.bolts.HelloWorldBolt;
import com.yg.storm.spouts.HelloWorldSpout;

public class HelloWorldTopology {

//可以向main传递一个参数作为集群模式下的Topology的名字,若没有传入参数则使用本地模式  
public static void main(String\[\] args) {

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("hlSpout", new HelloWorldSpout());  
    builder.setBolt("hlBolt", new HelloWorldBolt())  
    .shuffleGrouping("hlSpout");

    Config conf = new Config();

    if (args != null && args.length > 0){  
        //集群模式提交  
        conf.setNumWorkers(3);

        try {  
            StormSubmitter.submitTopology(args\[0\], conf, builder.createTopology());  
        } catch (AlreadyAliveException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        } catch (InvalidTopologyException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        } catch (AuthorizationException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }

    } else {  
        //本地模式提交  
        LocalCluster cluster = new LocalCluster();  
        cluster.submitTopology("test", conf, builder.createTopology());  
        Utils.sleep(1000\*60);  
        cluster.killTopology("test");  
        cluster.shutdown();

    }  
}

}

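直接本地运行HelloWorldTopology类即可(不传入任何参数时使用本地模式)。注意:pom.xml中storm-core的scope为provided,如果本地运行时提示找不到Storm相关的类,需要让运行环境的classpath包含provided依赖,或在本地调试时临时把scope改为compile。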

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章