**Spark详解(07) - SparkStreaming
**
Spark Streaming用于流式数据的处理。
Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、HDFS等。
数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。
而结果也能保存在很多地方,如HDFS、数据库等。
SparkCore => RDD
SparkSQL => DataFrame、DataSet
Spark Streaming使用离散化流(Discretized Stream)作为抽象表示,叫作Dstream
DStream是随时间推移而收到的数据的序列。
在DStream内部,每个时间区间收到的数据都作为RDD存在,而DStream是由这些RDD所组成的序列(因此得名"离散化")。
所以简单来讲,DStream就是对RDD在实时数据处理场景的一种封装。
整体`架构图
SparkStreaming架构图
Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数"spark.streaming.receiver.maxRate"的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。
为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure):根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
通过属性"spark.streaming.backpressure.enabled"来控制是否启用背压机制,默认值false,即不启用。
**易用
**
**容错
**
**易整合到Spark体系
**
需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数
1)添加依赖
2)编写代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming01_WordCount {
def main(args: Array[String]): Unit = {
//1._初始化_Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
//2._初始化_SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
//3._通过监控端口创建_DStream,读进来的数据为一行行
val lineDStream = ssc.socketTextStream("hadoop102", 9999)
//3.1 将每一行数据做切分,形成一个个单词
val wordDStream = lineDStream.flatMap(_.split(" "))
//3.2 将单词映射成元组(__word,1)
val wordToOneDStream = wordDStream.map((_, 1))
//3.3 将相同的单词次数做统计
val wordToSumDStream = wordToOneDStream.reduceByKey(_+_)
//3.4 打印
wordToSumDStream.print()
//4 启动__SparkStreamingContext
ssc.start()
// 将主线程阻塞,主线程不退出
ssc.awaitTermination()
}
}
3)更改日志打印级别
将log4j.properties文件添加到resources里面,就能更改打印日志的级别为error
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
4)启动程序并通过netcat发送数据:
[hadoop@hadoop102 ~]$ nc -lk 9999
hello spark
5)在Idea控制台输出如下内容:
-------------------------------------------
Time: 1602731772000 ms
-------------------------------------------
(hello,1)
(spark,1)
注意:目前用的算子,只能处理本批次数据的累加,不能统计所有批次总的单词个数。
DStream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。
在内部实现上,每一批次的数据封装成一个RDD,一系列连续的RDD组成了DStream。对这些RDD的转换是由Spark引擎来计算。
说明:DStream中批次与批次之间计算相互独立。如果批次设置时间小于计算时间会出现计算任务叠加情况,需要多分配资源。通常情况,批次设置时间要大于计算时间。
测试方法:
使用ssc.queueStream(queueOfRDDs)来创建DStream
将每一个推送到这个队列中的RDD,都会作为一个DStream处理。
需求:循环创建几个RDD,将RDD放入队列。通过SparkStreaming创建DStream,计算WordCount
编写代码
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object SparkStreaming02_RDDStream {
def main(args: Array[String]): Unit = {
//1._初始化_Spark配置信息
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
//2._初始化_SparkStreamingContext
val ssc = new StreamingContext(conf, Seconds(4))
//3._创建_RDD队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//4._创建_QueueInputDStream
// oneAtATime = true 默认,一次读取队列里面的一个数据
// oneAtATime = false_,_ 按照设定的时间,读取队列里面数据
val inputDStream = ssc.queueStream(rddQueue, oneAtATime = false)
//5._处理队列中的_RDD数据
val sumDStream = inputDStream.reduce(_+_)
//6._打印结果_
sumDStream.print()
//7._启动任务_
ssc.start()
//8._循环创建并向_RDD队列中放入__RDD
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 5)
Thread.sleep(2000)
}
ssc.awaitTermination()
}
}
结果展示(oneAtATime = false)
-------------------------------------------
Time: 1603347444000 ms
-------------------------------------------
15
-------------------------------------------
Time: 1603347448000 ms
-------------------------------------------
30
-------------------------------------------
Time: 1603347452000 ms
-------------------------------------------
30
说明:如果一个批次中有多个RDD进入队列,最终计算前都会合并到一个RDD计算
需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。
需求:自定义数据源,实现监控某个端口号,获取该端口号内容。
使用自定义的数据源采集数据
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming03_CustomerReceiver {
def main(args: Array[String]): Unit = {
//1._初始化_Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
//2._初始化_SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(5))
//3._创建自定义_receiver的__Streaming
val lineDStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))
//4._将每一行数据做切分,形成一个个单词_
val wordDStream = lineDStream.flatMap(_.split(" "))
//5._将单词映射成元组(_word,1)
val wordToOneDStream = wordDStream.map((_, 1))
//6._将相同的单词次数做统计_
val wordToSumDStream = wordToOneDStream.reduceByKey(_ + _)
//7._打印_
wordToSumDStream.print()
//8._启动_SparkStreamingContext
ssc.start()
ssc.awaitTermination()
}
}
2)自定义数据源
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
// 最初启动的时候,调用该方法,作用为:读数据并将数据发送给__Spark
override def onStart(): Unit = {
new Thread("Socket Receiver") {
override def run() {
receive()
}
}.start()
}
// 读数据并将数据发送给__Spark
def receive(): Unit = {
// 创建一个__Socket
val socket: Socket = new Socket(host, port)
// 创建一个__BufferedReader用于读取端口传来的数据
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
// 读取数据
var input: String = reader.readLine()
//_当_receiver没有关闭并且输入数据不为空,则循环发送数据给__Spark
while (!isStopped() && input != null) {
store(input)
input = reader.readLine()
}
// 如果循环结束,则关闭资源
reader.close()
socket.close()
//_重启接收任务_
restart("restart")
}
override def onStop(): Unit = {}
}
3)测试
先启动nc服务,再启动SparkStreaming程序
[hadoop@hadoop102 ~]$ nc -lk 9999
hello spark
ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题:接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用。
DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。
注意:目前spark3.0.0以上版本只有Direct模式。
http://spark.apache.org/docs/2.4.7/streaming-kafka-integration.html
http://spark.apache.org/docs/3.0.0/streaming-kafka-0-10-integration.html
总结:不同版本的offset存储位置
0-8 ReceiverAPI offset默认存储在:Zookeeper中
0-8 DirectAPI offset默认存储在:CheckPoint
手动维护:MySQL等有事务的存储系统
0-10 DirectAPI offset默认存储在:_consumer_offsets系统主题
手动维护:MySQL等有事务的存储系统
1)需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
2)导入依赖
3)编写代码
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming04_DirectAuto {
def main(args: Array[String]): Unit = {
//1._创建_SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")
//2._创建_StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
//3._定义_Kafka参数:kafka集群地址、消费者组名称、key序列化、value序列化
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "group_1",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
//4._读取_Kafka数据创建__DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent, //_优先位置_
ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara)// 消费策略:(订阅多个主题,配置参数)
)
//5._将每条消息的_KV取出
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
//6._计算_WordCount
valueDStream.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()
//7._开启任务_
ssc.start()
ssc.awaitTermination()
}
}
4)测试
(1)分别启动Zookeeper和Kafka集群
zk.sh start
kf.sh start
(2)创建一个Kafka的Topic主题testTopic,两个分区
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --create --replication-factor 1 --partitions 2 --topic testTopic
(3)查看Topic列表
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka -list
(4)查看Topic详情
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --describe --topic testTopic
(5)创建Kafka生产者
bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic testTopic
Hello spark
Hello spark
(6)创建Kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic testTopic
5)查看_consumer_offsets主题中存储的offset
bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --describe --group group_1
GROUP TOPIC PARTITION CURRENT-OFFSET
LOG-END-OFFSET
Group_1 testTopic 0 13 13
在生产者中生产数据,再次观察offset变化
DStream上的操作与RDD的类似,分为转换和输出两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
无状态转化操作:就是把RDD转化操作应用到DStream每个批次上,每个批次相互独立,自己算自己的。
DStream的部分无状态转化操作列在了下表中,都是DStream自己的API。
注意,针对键值对的DStream转化操作,要添加import StreamingContext._才能在Scala中使用,比如reduceByKey()。
函数名称
目的
Scala示例
函数签名
map()
对DStream中的每个元素应用给定函数,返回由各元素输出的元素组成的DStream。
ds.map(x=>x + 1)
f: (T) -> U
flatMap()
对DStream中的每个元素应用给定函数,返回由各元素输出的迭代器组成的DStream。
ds.flatMap(x => x.split(" "))
f: T -> Iterable[U]
filter()
返回由给定DStream中通过筛选的元素组成的DStream
ds.filter(x => x != 1)
f: T -> Boolean
repartition()
改变DStream的分区数
ds.repartition(10)
N / A
reduceByKey()
将每个批次中键相同的记录规约。
ds.reduceByKey( (x, y) => x + y)
f: T, T -> T
groupByKey()
将每个批次中的记录根据键分组。
ds.groupByKey()
N / A
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD批次组成,且无状态转化操作是分别应用到每个RDD批次上的。
需求:通过Transform可以将DStream每一批次的数据直接转换为RDD的算子操作。
有状态转化操作:计算当前批次RDD时,需要用到历史RDD的数据。
updateStateByKey()用于键值对形式的DStream,可以记录历史批次状态。例如可以实现累加WordCount。
updateStateByKey()参数中需要传递一个函数,在函数内部可以根据需求对新数据和历史状态进行整合处理,返回一个新的DStream。
注意:使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。
checkpoint小文件过多
checkpoint记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次
0)需求:更新版的WordCount
1)编写代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object sparkStreaming06_updateStateByKey {
// 定义更新状态方法,参数__seq为当前批次单词次数,state为以往批次单词次数
val updateFunc = (seq: Seq[Int], state: Option[Int]) => {
// 当前批次数据累加
val currentCount = seq.sum
// 历史批次数据累加结果
val previousCount = state.getOrElse(0)
// 总的数据累加
Some(currentCount + previousCount)
}
def createSCC(): StreamingContext = {
//1 创建__SparkConf
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
//2 创建__StreamingContext
val ssc = new StreamingContext(conf, Seconds(3))
ssc.checkpoint("./ck")
//3 获取一行数据
val lines = ssc.socketTextStream("hadoop102", 9999)
//4 切割
val words = lines.flatMap(_.split(" "))
//5 统计单词
val wordToOne = words.map(word => (word, 1))
//6 使用__updateStateByKey来更新状态,统计从运行开始以来单词总的次数
val stateDstream = wordToOne.updateStateByKey[Int](updateFunc)
stateDstream.print()
ssc
}
def main(args: Array[String]): Unit = {
val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck",()=>createSCC())
//7 开启任务
ssc.start()
ssc.awaitTermination()
}
}
2)启动程序并向9999端口发送数据
nc -lk 9999
hello hadoop
hello hadoop
3)结果展示
-------------------------------------------
Time: 1603441344000 ms
-------------------------------------------
(hello,1)
(hadoop,1)
-------------------------------------------
Time: 1603441347000 ms
-------------------------------------------
(hello,2)
(hadoop,2)
4)原理说明
Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
窗口时长:计算内容的时间范围;
滑动步长:隔多久触发一次计算。
注意:这两者都必须为采集批次大小的整数倍。
如下图所示WordCount案例:窗口大小为批次的2倍,滑动步等于批次大小。
1)基本语法:window(windowLength, slideInterval): 基于对源DStream窗口的批次进行计算返回一个新的DStream。
2)需求:统计WordCount:3秒一个批次,窗口12秒,滑步6秒。
3)代码编写:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming07_window {
def main(args: Array[String]): Unit = {
// 1 初始化__SparkStreamingContext
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
val ssc = new StreamingContext(conf, Seconds(3))
// 2 通过监控端口创建__DStream,读进来的数据为一行行
val lines = ssc.socketTextStream("hadoop102", 9999)
// 3 切割_=》变换_
val wordToOneDStream = lines.flatMap(_.split(" "))
.map((_, 1))
// 4 获取窗口返回数据
val wordToOneByWindow: DStream[(String, Int)] = wordToOneDStream.window(Seconds(12), Seconds(6))
// 5 聚合窗口数据并打印
val wordToCountDStream: DStream[(String, Int)] = wordToOneByWindow.reduceByKey(_+_)
wordToCountDStream.print()
// 6 启动_=》阻塞_
ssc.start()
ssc.awaitTermination()
}
}
5)如果有多批数据进入窗口,最终也会通过window操作变成统一的RDD处理。
1)基本语法:
2)需求:统计WordCount:3秒一个批次,窗口12秒,滑步6秒。
3)代码编写:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming08_reduceByKeyAndWindow {
def main(args: Array[String]): Unit = {
// 1 初始化__SparkStreamingContext
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
val ssc = new StreamingContext(conf, Seconds(3))
// 保存数据到检查点
ssc.checkpoint("./ck")
// 2 通过监控端口创建__DStream,读进来的数据为一行行
val lines = ssc.socketTextStream("hadoop102", 9999)
// 3 切割_=》变换_
val wordToOne = lines.flatMap(_.split(" "))
.map((_, 1))
// 4 窗口参数说明: 算法逻辑,窗口12秒,滑步6秒
val wordCounts = wordToOne.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(12), Seconds(6))
// 5 打印
wordCounts.print()
// 6 启动_=》阻塞_
ssc.start()
ssc.awaitTermination()
}
}
1)基本语法:
2)需求:统计WordCount:3秒一个批次,窗口12秒,滑步6秒。
3)代码编写:
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming09_reduceByKeyAndWindow_reduce {
def main(args: Array[String]): Unit = {
// 1 初始化__SparkStreamingContext
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
val ssc = new StreamingContext(conf, Seconds(3))
// 保存数据到检查点
ssc.checkpoint("./ck")
// 2 通过监控端口创建__DStream,读进来的数据为一行行
val lines = ssc.socketTextStream("hadoop102", 9999)
// 3 切割_=》变换_
val wordToOne = lines.flatMap(_.split(" "))
.map((_, 1))
// 4 窗口参数说明: 算法逻辑,窗口12秒,滑步6秒
/*
val wordToSumDStream: DStream[(String, Int)]= wordToOne.reduceByKeyAndWindow(
(a: Int, b: Int) => (a + b),
(x: Int, y: Int) => (x - y),
Seconds(12),
Seconds(6)
)*/
// 处理单词统计次数为__0的问题
val wordToSumDStream: DStream[(String, Int)]= wordToOne.reduceByKeyAndWindow(
(a: Int, b: Int) => (a + b),
(x: Int, y: Int) => (x - y),
Seconds(12),
Seconds(6),
new HashPartitioner(2),
(x:(String, Int)) => x._2 > 0
)
// 5 打印
wordToSumDStream.print()
// 6 启动_=》阻塞_
ssc.start()
ssc.awaitTermination()
}
}
(1)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
(2)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
DStream通常将数据输出到,外部数据库或屏幕上。
DStream与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个Context就都不会启动。
1)输出操作API如下:
注意:以上操作都是每一批次写出一次,会产生大量小文件,在生产环境,很少使用。
在企业开发中通常采用foreachRDD(),它用来对DStream中的RDD进行任意计算。这和transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到如MySQL的外部数据库中。
3)注意
(1)连接不能写在Driver层面(序列化)
(2)如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;
(3)增加foreachPartition,在分区创建(获取)。
流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了。
关闭方式:使用外部文件系统来控制内部程序关闭。
2)测试
(1)发送数据
nc -lk 9999
hello
(2)启动Hadoop集群
sbin/start-dfs.sh
hadoop fs -mkdir /stopSpark
手机扫一扫
移动阅读更方便
你可能感兴趣的文章