Testing real-time data processing with Flume + Kafka + Spark Streaming

Flume: pulls data from the data source

Kafka: mainly acts as a buffer for the data Flume pulls in, so bursts do not overwhelm downstream processing

Spark Streaming: processes the data

I. Flume pulls the data

1. Configure reading of the source data file

In the conf directory under the Flume installation, create a test.properties file that configures the data source, with the following content:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: continuously tail the source file
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/hjh/spark/test.txt
# wait 10s before restarting the command if it dies (typical setting; adjust as needed)
a1.sources.r1.restartThrottle = 10000
a1.sources.r1.logStdErr = true
#a1.sources.r1.restart = true
a1.sources.r1.channels = c1

# Channel: buffer events in memory (capacities below are typical settings; adjust as needed)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.keepalive = 30

# Sink: publish events to Kafka; 9092 is Kafka's default broker port
a1.sinks.k1.type = org.apache.flume.plugins.KafkaSink
a1.sinks.k1.metadata.broker.list = 192.168.22.7:9092,192.168.22.8:9092,192.168.22.9:9092
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
a1.sinks.k1.request.required.acks = 1
a1.sinks.k1.max.message.size = 1000000
a1.sinks.k1.producer.type = sync
a1.sinks.k1.custom.encoding = UTF-8
a1.sinks.k1.custom.topic.name = test
a1.sinks.k1.channel = c1
a1.sinks.k1.product.source.name =
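
Note that org.apache.flume.plugins.KafkaSink is not part of stock Flume; it appears to come from a separate Flume-to-Kafka plugin jar (which also defines the custom.* and product.* properties above) and has to be placed on Flume's classpath. Flume 1.6 and later ship an official Kafka sink, org.apache.flume.sink.kafka.KafkaSink, whose property names differ.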

The path of the source file to read is set by:

a1.sources.r1.command = tail -F /home/hadoop/hjh/spark/test.txt

The Kafka topic that the read data is sent to is set by:

a1.sinks.k1.custom.topic.name=test

2. Start Flume to read the data

nohup bin/flume-ng agent -c conf -f conf/test.properties -n a1 -Dflume.root.logger=INFO,console &

II. Kafka buffers the data

1. Start the ZooKeeper service (the standalone ZooKeeper bundled with Kafka)

bin/zookeeper-server-start.sh config/zookeeper.properties

2. Start the Kafka service

nohup bin/kafka-server-start.sh config/server.properties &

3. Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

In a cluster setup, replace localhost with the cluster's master address. (2181 is ZooKeeper's default client port; a replication factor and partition count of 1 are enough for this test.)

4. List the Kafka topics

bin/kafka-topics.sh --list --zookeeper localhost:2181
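
Optionally, you can confirm that Flume's events are actually reaching the topic before wiring in Spark by attaching Kafka's console consumer (the old ZooKeeper-based consumer, which matches the Kafka 0.8-era setup used here; again, 2181 is ZooKeeper's default port):

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Lines appended to the source file should show up here.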

III. Spark Streaming processes the data

1. Test with the example bundled with Spark

From the Spark directory, run:

bin/run-example org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 my-consumer-group test 1

Replace zoo01,zoo02,zoo03 with your cluster's ZooKeeper addresses; the trailing 1 is the number of consumer threads per topic, which KafkaWordCount expects as its fourth argument.
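
For reference, the core of what KafkaWordCount does boils down to roughly the following (a minimal sketch against the receiver-based KafkaUtils.createStream API of the Spark 1.x spark-streaming-kafka module; the bundled example additionally keeps a sliding-window count, and the object name KafkaWordCountSketch is just illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCountSketch {
  def main(args: Array[String]): Unit = {
    // args: <zkQuorum> <consumer group> <comma-separated topics> <threads per topic>
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCountSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(2))   // 2-second micro-batches

    // one receiver, with numThreads consumer threads per topic
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // each Kafka record arrives as a (key, message) pair; keep only the message text
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // split each line into words and count them within the batch
    val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}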

2. Append data to the source file

echo "test test" >> test.txt

Spark Streaming will count the words in the source data and print the result.
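
For the line appended above, the next micro-batch printed to the console should include a pair of the form (test,2).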