Storm and Kafka Integration: Java API Source Code

1. The pom.xml of the Maven project is as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com.yg</groupId>  
<artifactId>storm</artifactId>  
<version>0.0.1-SNAPSHOT</version>  
<packaging>jar</packaging>

<name>storm</name>  
<url>http://maven.apache.org</url>

<properties>  
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
</properties>

<dependencies>

    <dependency>  
        <groupId>org.apache.storm</groupId>  
        <artifactId>storm-core</artifactId>  
        <version>1.1.3</version>  
        <scope>provided</scope>  
    </dependency>

    <dependency>  
        <groupId>org.apache.kafka</groupId>  
        <artifactId>kafka_2.12</artifactId>  
        <version>0.10.2.1</version>  
    </dependency>

    <dependency>  
        <groupId>org.apache.storm</groupId>  
        <artifactId>storm-kafka</artifactId>  
        <version>1.1.3</version>  
    </dependency>

</dependencies>

<build>  
    <plugins>  
        <plugin>  
            <artifactId>maven-assembly-plugin</artifactId>  
            <configuration>  
                <descriptorRefs>  
                    <descriptorRef>jar-with-dependencies</descriptorRef>  
                </descriptorRefs>  
                <archive>  
                    <manifest>  
                        <mainClass>com.path.to.main.Class</mainClass>  
                    </manifest>  
                </archive>  
            </configuration>  
        </plugin>

        <plugin>  
            <groupId>org.apache.maven.plugins</groupId>  
            <artifactId>maven-compiler-plugin</artifactId>  
            <version>3.5</version>  
            <configuration>  
                <source>1.8</source>  
                <target>1.8</target>  
            </configuration>  
        </plugin>

    </plugins>  
</build>

</project>
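With the assembly plugin configured above, a command such as `mvn clean package assembly:single` should produce a jar-with-dependencies suitable for `storm jar` submission; replace the `com.path.to.main.Class` placeholder with the actual topology main class (here that would be `com.yg.storm.kafka.topologies.KafkaHelloWorldTopology`).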

2. KafkaSpout.java is as follows:

package com.yg.storm.kafka.spouts;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class KafkaSpout extends BaseRichSpout {

private static final long serialVersionUID = 7582771881226024741L;  
private KafkaConsumer<String, String> consumer;  
final private String TOPIC = "myTopic";  
SpoutOutputCollector collector;

@Override  
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {  
    this.collector = collector;

    Properties props = new Properties();  
    props.put("bootstrap.servers", "hadoop211:9092,hadoop212:9092,hadoop213:9092");  
    props.put("group.id", "test");  
    props.put("enable.auto.commit", "true");  
    props.put("auto.commit.interval.ms", "1000");  
    props.put("session.timeout.ms", "30000");  
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    consumer = new KafkaConsumer<String, String>(props);  
    consumer.subscribe(Arrays.asList(TOPIC));

}

@Override  
public void nextTuple() {  
    // nextTuple() must not block: poll once per call and hand control back to Storm
    // (the subscription was already set up in open())
    ConsumerRecords<String, String> records = consumer.poll(100); // wait up to 100 ms for new records  
    for (ConsumerRecord<String, String> record : records) {  
        String key = record.key();  
        String value = record.value();  
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), key, value);  
        collector.emit(new Values(value)); // emit the message value downstream  
    }  
}

// called when the spout shuts down; release the Kafka consumer
@Override  
public void close() {  
    consumer.close();  
}

@Override  
public void declareOutputFields(OutputFieldsDeclarer declarer) {  
    // declare the single output field; the bolt reads the value by this name  
    declarer.declare(new Fields("sentence"));

}

}
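Note that the topology in item 4 below wires in the storm-kafka KafkaSpout rather than this hand-written spout. To try the custom spout on its own, a minimal local-mode wiring could look like the following sketch; the class name and component ids are made up for illustration:

package com.yg.storm.kafka.topologies;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import com.yg.storm.bolts.HelloWorldBolt;

public class CustomKafkaSpoutTopology {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // the hand-written spout from section 2 (fully qualified to avoid confusion with storm-kafka's KafkaSpout)
        builder.setSpout("kafka-spout", new com.yg.storm.kafka.spouts.KafkaSpout());
        builder.setBolt("hello-bolt", new HelloWorldBolt()).shuffleGrouping("kafka-spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("CustomKafkaSpoutTopo", new Config(), builder.createTopology());
        Utils.sleep(60 * 1000); // let it run for a minute before shutting down
        cluster.killTopology("CustomKafkaSpoutTopo");
        cluster.shutdown();
    }
}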

3. HelloWorldBolt.java is as follows:

package com.yg.storm.bolts;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class HelloWorldBolt extends BaseRichBolt{

/**  
 * Function: receives the tuples emitted by the spout, prints them,
 * and counts how many "Hello World" messages arrive.
 * Implementation: print each tuple and keep a counter for "Hello World".
 */  
private static final long serialVersionUID = -5061906223048521415L;  
private int myCount = 0; // counter; do not (re)initialize it inside execute()  
private TopologyContext context; // topology context  
private OutputCollector collector;

// the bolt counterpart of a spout's open()  
@Override  
public void prepare(Map stormConf,  
        TopologyContext context,  
        OutputCollector collector) {  
    this.context = context;  
    this.collector = collector;  
}

// the bolt counterpart of a spout's nextTuple()  
@Override  
public void execute(Tuple input) {  
    // read the incoming value by its declared field name  
    String text = input.getStringByField("sentence");  
    System.out.println("One tuple gets in, task " + context.getThisTaskId() + ": " + text);  
    if ("Hello World".equals(text)){  
        myCount++;  
        System.out.println("Found a Hello World! My count is now:" + myCount);  
    }  
    collector.ack(input); // tell Storm the tuple was processed successfully  

// collector.fail(input); // tell Storm the tuple failed and should be replayed

}

@Override  
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}  

}
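To exercise the counting logic there must be messages on myTopic. A throwaway producer sketch is shown below, assuming the same broker list used in KafkaSpout; the class is hypothetical and not part of the original project:

package com.yg.storm.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HelloWorldProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop211:9092,hadoop212:9092,hadoop213:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // every message is "Hello World", so HelloWorldBolt should end up counting 10
                producer.send(new ProducerRecord<>("myTopic", Integer.toString(i), "Hello World"));
            }
        }
    }
}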

4. KafkaHelloWorldTopology.java is as follows:

package com.yg.storm.kafka.topologies;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import com.yg.storm.bolts.HelloWorldBolt;

public class KafkaHelloWorldTopology {

// An argument passed to main is used as the topology name in cluster mode; with no arguments the topology runs in local mode  
public static void main(String[] args) {

        final String brokerZkStr = "hadoop211:2181,hadoop212:2181,hadoop213:2181";  
        final String topic  = "myTopic";

        BrokerHosts brokerHosts = new ZkHosts(brokerZkStr);

//        // Alternative: use a plain KafkaConfig (SpoutConfig extends it)
//        KafkaConfig kafkaConfig = new KafkaConfig(
//                brokerHosts,
//                topic
//        );

        // Use SpoutConfig, which extends KafkaConfig and adds a few extra settings  
        SpoutConfig spoutConfig = new SpoutConfig(  
                brokerHosts,    // ZooKeeper list of the Kafka cluster Storm connects to  
                topic,          // Kafka topic Storm will consume  
                "/HWTopo",      // ZooKeeper root path Storm uses as its working directory while consuming  
                "kafkaspout");  // id that identifies this consumer in ZooKeeper

        // plug in the custom Scheme defined in section 5  
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        TopologyBuilder builder = new TopologyBuilder();  
        builder.setSpout("spout", new KafkaSpout(spoutConfig));  
        builder.setBolt("bolt1", new HelloWorldBolt()).shuffleGrouping("spout");

        Config conf = new Config();  

// Map map = new HashMap();
//
// map.put("metadata.broker.list", "hadoop211:9092,hadoop212:9092,hadoop213:9092");
// map.put("serializer.class", "kafka.serializer.StringEncoder");
// conf.put("kafka.broker.properties", map);
// conf.put("topic", topic);

        if(args != null && args.length > 0) {  
            // submit to a cluster  
            try {  
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        } else {  
            // run in local mode  
            LocalCluster cluster = new LocalCluster();  
            cluster.submitTopology("SchemeTopo", conf, builder.createTopology());  
            Utils.sleep(1000000);  
            cluster.killTopology("SchemeTopo");  
            cluster.shutdown();  
        }

    }

}

5. The custom Scheme implementation MessageScheme.java is as follows:

package com.yg.storm.kafka.topologies;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;

import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Defines how Storm deserializes the messages it consumes from Kafka
/*
* MultiScheme is an interface that dictates how the ByteBuffer consumed
* from Kafka gets transformed into a storm tuple.
* It also controls the naming of your output field.
*
* The default RawMultiScheme just takes the ByteBuffer and returns a tuple
* with the ByteBuffer converted to a byte[].
* The name of the outputField is "bytes".
* There are alternative implementations like SchemeAsMultiScheme and
* KeyValueSchemeAsMultiScheme which can convert the ByteBuffer to String.
*
*/

public class MessageScheme implements Scheme {

private static final long serialVersionUID = 1033379821285531859L;

@Override  
public List<Object> deserialize(ByteBuffer buffer) {

    try {  
        Charset charset = Charset.forName("UTF-8");  
        CharsetDecoder decoder = charset.newDecoder();  
        CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());

        String sentence = charBuffer.toString();  
        return new Values(sentence);

    } catch (Exception e) {  
        System.out.println(e);  
        return null;  
    }  
}

@Override  
public Fields getOutputFields() {  
    return new Fields("sentence");  
}

}
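Note that the field name "sentence" declared in getOutputFields() is exactly what HelloWorldBolt looks up with input.getStringByField("sentence"); if the two names do not match, the field lookup fails at runtime.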

To test, run the KafkaHelloWorldTopology class directly in local mode; make sure the topic myTopic exists and that the ZooKeeper and Kafka broker addresses match your environment.
