1. The pom.xml of the Maven project is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.yg</groupId>
  <artifactId>storm</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>storm</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.1.3</version>
      <!-- provided: the Storm cluster supplies storm-core at runtime, so it must stay out of the fat jar -->
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>0.10.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>1.1.3</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <!-- placeholder: replace with the fully qualified name of your topology class -->
              <mainClass>com.path.to.main.Class</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
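A side note on the build (the exact commands are my assumption about the usual workflow, not spelled out in the original): the maven-assembly-plugin with the jar-with-dependencies descriptor produces the fat jar required for cluster submission, e.g. via mvn clean package assembly:single, which yields target/storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar.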
2. The source of KafkaSpout.java is as follows:
package com.yg.storm.kafka.spouts;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class KafkaSpout extends BaseRichSpout {
    private static final long serialVersionUID = 7582771881226024741L;
    private KafkaConsumer<String, String> consumer;
    private final String TOPIC = "myTopic";
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop211:9092,hadoop212:9092,hadoop213:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(TOPIC)); // subscribe to the topic once, at spout startup
    }

    @Override
    public void nextTuple() {
        // Storm calls nextTuple() in a loop on the spout's executor thread, so it must not
        // block in an infinite loop or sleep. Poll once per call; note that the argument
        // to poll() is a timeout in milliseconds, not a record count.
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            collector.emit(new Values(record.value())); // emit the message value downstream
        }
    }

    @Override
    public void close() {
        consumer.close(); // release the Kafka connection when the spout shuts down
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
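To see the spout emit anything, the topic needs messages in it. Below is a minimal test producer sketch (not part of the original post; the class name and package are hypothetical, and it assumes the same broker list and topic configured above):

package com.yg.storm.kafka.producers; // hypothetical package for this test helper

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HelloWorldProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop211:9092,hadoop212:9092,hadoop213:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 10; i++) {
            // every message is exactly "Hello World", so the bolt's counter should reach 10
            producer.send(new ProducerRecord<String, String>("myTopic", "Hello World"));
        }
        producer.close(); // flush pending sends and release the connection
    }
}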
3. The source of HelloWorldBolt.java is as follows:
package com.yg.storm.bolts;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class HelloWorldBolt extends BaseRichBolt {
    /**
     * Function: receives the data sent by the spout, prints it, and counts
     * how many "Hello World" messages arrive.
     * Implementation: print each tuple and keep a counter for "Hello World".
     */
    private static final long serialVersionUID = -5061906223048521415L;
    private int myCount = 0; // counter; must not be (re)initialized inside execute()
    private TopologyContext context; // topology context
    private OutputCollector collector;

    // the counterpart of open() in a spout
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.context = context;
        this.collector = collector;
    }

    // the counterpart of nextTuple() in a spout
    @Override
    public void execute(Tuple input) {
        // read the tuple's content by field name
        String text = input.getStringByField("sentence");
        System.out.println("One tuple gets in, task " + context.getThisTaskId() + ": " + text);
        if ("Hello World".equals(text)) {
            myCount++;
            System.out.println("Found a Hello World! My count is now: " + myCount);
        }
        collector.ack(input); // tell Storm the tuple was processed successfully
        // collector.fail(input); // tell Storm the tuple failed and should be replayed
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt is a sink and emits nothing, so no output fields are declared
    }
}
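Because a BaseRichBolt must ack or fail every tuple itself, a common simplification (sketched here for comparison; this class is not part of the original post) is to extend BaseBasicBolt instead, which acks automatically when execute() returns and fails the tuple if execute() throws a FailedException:

package com.yg.storm.bolts;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class HelloWorldBasicBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    private int myCount = 0;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // no explicit ack: BaseBasicBolt acks automatically when execute() returns
        if ("Hello World".equals(input.getStringByField("sentence"))) {
            System.out.println("Found a Hello World! My count is now: " + (++myCount));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // sink bolt: nothing to declare
    }
}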
4. The source of KafkaHelloWorldTopology.java is as follows:
package com.yg.storm.kafka.topologies;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import com.yg.storm.bolts.HelloWorldBolt;

public class KafkaHelloWorldTopology {
    // Pass one argument to main() to use it as the topology name in cluster mode;
    // with no arguments, the topology runs in local mode.
    public static void main(String[] args) {
        final String brokerZkStr = "hadoop211:2181,hadoop212:2181,hadoop213:2181";
        final String topic = "myTopic";
        BrokerHosts brokerHosts = new ZkHosts(brokerZkStr);
        // // Alternatively, use the plain KafkaConfig:
        // KafkaConfig kafkaConfig = new KafkaConfig(
        //     brokerHosts,
        //     topic
        // );
        // Use SpoutConfig, which extends KafkaConfig with a few extra settings.
        SpoutConfig spoutConfig = new SpoutConfig(
            brokerHosts,   // the ZooKeeper ensemble of the Kafka cluster to read from
            topic,         // the Kafka topic to consume
            "/HWTopo",     // the ZooKeeper root path where the spout keeps its working state
            "kafkaspout"); // a unique ID identifying this consumer
        // plug in the custom scheme
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("bolt1", new HelloWorldBolt()).shuffleGrouping("spout");
        Config conf = new Config();
        // Map<String, String> map = new HashMap<String, String>();
        // map.put("metadata.broker.list", "hadoop211:9092,hadoop212:9092,hadoop213:9092");
        // map.put("serializer.class", "kafka.serializer.StringEncoder");
        // conf.put("kafka.broker.properties", map);
        // conf.put("topic", topic);
        if (args != null && args.length > 0) {
            // submit to the cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // run in local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("SchemeTopo", conf, builder.createTopology());
            Utils.sleep(1000000);
            cluster.killTopology("SchemeTopo");
            cluster.shutdown();
        }
    }
}
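Note that this topology wires in org.apache.storm.kafka.KafkaSpout from the storm-kafka dependency, not the hand-written com.yg.storm.kafka.spouts.KafkaSpout from section 2; since the custom spout also declares a "sentence" field, it could be swapped in with builder.setSpout("spout", new com.yg.storm.kafka.spouts.KafkaSpout()). To run on a real cluster, build the fat jar and submit it with something like storm jar target/storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.yg.storm.kafka.topologies.KafkaHelloWorldTopology myTopoName (the jar name follows from the artifactId and version in the pom.xml).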
5. The source of the custom scheme class MessageScheme.java is as follows:
package com.yg.storm.kafka.topologies;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;

import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Defines how Storm deserializes the messages it consumes from Kafka.
/*
 * MultiScheme is an interface that dictates how the ByteBuffer consumed
 * from Kafka gets transformed into a Storm tuple.
 * It also controls the naming of your output field.
 *
 * The default RawMultiScheme just takes the ByteBuffer and returns a tuple
 * with the ByteBuffer converted to a byte[].
 * The name of the outputField is "bytes".
 * There are alternative implementations like SchemeAsMultiScheme and
 * KeyValueSchemeAsMultiScheme which can convert the ByteBuffer to String.
 */
public class MessageScheme implements Scheme {
    private static final long serialVersionUID = 1033379821285531859L;

    @Override
    public List<Object> deserialize(ByteBuffer buffer) {
        try {
            // decode the raw Kafka message bytes as a UTF-8 string
            Charset charset = Charset.forName("UTF-8");
            CharsetDecoder decoder = charset.newDecoder();
            CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            String sentence = charBuffer.toString();
            return new Values(sentence);
        } catch (Exception e) {
            System.out.println(e);
            return null; // a null return signals that the message could not be decoded
        }
    }

    @Override
    public Fields getOutputFields() {
        // must match the field name the bolt reads with getStringByField("sentence")
        return new Fields("sentence");
    }
}
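For plain string messages the custom class is optional: storm-kafka already ships org.apache.storm.kafka.StringScheme, which does essentially the same UTF-8 decoding but names its output field "str", so with spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()) the bolt would have to read getStringByField("str") instead of "sentence".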
To try it out, simply run the KafkaHelloWorldTopology class directly in local mode.