Spark详解(07) - SparkStreaming
阅读原文时间:2023年07月14日阅读:1

**Spark详解(07) - SparkStreaming
**

SparkStreaming概述

Spark Streaming用于流式数据的处理。

Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、HDFS等。

数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。

而结果也能保存在很多地方,如HDFS、数据库等。

什么是Dstream

SparkCore => RDD

SparkSQL => DataFrame、DataSet

Spark Streaming使用离散化流(Discretized Stream)作为抽象表示,叫作Dstream

DStream是随时间推移而收到的数据的序列。

在DStream内部,每个时间区间收到的数据都作为RDD存在,而DStream是由这些RDD所组成的序列(因此得名"离散化")。

所以简单来讲,DStream就是对RDD在实时数据处理场景的一种封装。

架构图

整体`架构图

SparkStreaming架构图

背压机制

Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数"spark.streaming.receiver.maxRate"的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。

为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure):根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。

通过属性"spark.streaming.backpressure.enabled"来控制是否启用背压机制,默认值false,即不启用。

**易用
**

**容错
**

**易整合到Spark体系
**

DStream入门

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

1)添加依赖

org.apache.spark

spark-streaming_2.12

3.0.0

org.apache.spark

spark-core_2.12

3.0.0

2)编写代码

  1. import org.apache.spark.SparkConf

  2. import org.apache.spark.streaming.{Seconds, StreamingContext}

  3. object SparkStreaming01_WordCount {

  4.     def main(args: Array[String]): Unit = {

  5.         //1._初始化_Spark配置信息

  6.         val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")

  7.         //2._初始化_SparkStreamingContext

  8.         val ssc = new StreamingContext(sparkConf, Seconds(3))

  9.         //3._通过监控端口创建_DStream,读进来的数据为一行行

  10.         val lineDStream = ssc.socketTextStream("hadoop102", 9999)

  11.         //3.1 将每一行数据做切分,形成一个个单词

  12.         val wordDStream = lineDStream.flatMap(_.split(" "))

  13.         //3.2 将单词映射成元组(__word,1)

  14.         val wordToOneDStream = wordDStream.map((_, 1))

  15.         //3.3 将相同的单词次数做统计

  16.         val wordToSumDStream = wordToOneDStream.reduceByKey(_+_)

  17.         //3.4 打印

  18.         wordToSumDStream.print()

  19.         //4 启动__SparkStreamingContext

  20.         ssc.start()

  21.         // 将主线程阻塞,主线程不退出

  22.         ssc.awaitTermination()

  23.     }

  24. }

3)更改日志打印级别

将log4j.properties文件添加到resources里面,就能更改打印日志的级别为error

log4j.rootLogger=error, stdout,R

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n

log4j.appender.R=org.apache.log4j.RollingFileAppender

log4j.appender.R.File=../log/agent.log

log4j.appender.R.MaxFileSize=1024KB

log4j.appender.R.MaxBackupIndex=1

log4j.appender.R.layout=org.apache.log4j.PatternLayout

log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n

4)启动程序并通过netcat发送数据:

[hadoop@hadoop102 ~]$ nc -lk 9999

hello spark

5)在Idea控制台输出如下内容:

-------------------------------------------

Time: 1602731772000 ms

-------------------------------------------

(hello,1)

(spark,1)

注意:目前用的算子,只能处理本批次数据的累加,不能统计所有批次总的单词个数。

DStream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。

在内部实现上,每一批次的数据封装成一个RDD,一系列连续的RDD组成了DStream。对这些RDD的转换是由Spark引擎来计算。

说明:DStream中批次与批次之间计算相互独立。如果批次设置时间小于计算时间会出现计算任务叠加情况,需要多分配资源。通常情况,批次设置时间要大于计算时间。

DStream创建

  • 用法及说明

测试方法:

使用ssc.queueStream(queueOfRDDs)来创建DStream

将每一个推送到这个队列中的RDD,都会作为一个DStream处理。

  • 案例实操

需求:循环创建几个RDD,将RDD放入队列。通过SparkStreaming创建DStream,计算WordCount

编写代码

  1. import org.apache.spark.SparkConf

  2. import org.apache.spark.rdd.RDD

  3. import org.apache.spark.streaming.{Seconds, StreamingContext}

  4. import scala.collection.mutable

  5. object SparkStreaming02_RDDStream {

  6. def main(args: Array[String]): Unit = {

  7. //1._初始化_Spark配置信息

  8. val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")

  9. //2._初始化_SparkStreamingContext

  10. val ssc = new StreamingContext(conf, Seconds(4))

  11. //3._创建_RDD队列

  12. val rddQueue = new mutable.Queue[RDD[Int]]()

  13. //4._创建_QueueInputDStream

  14. // oneAtATime = true 默认,一次读取队列里面的一个数据

  15. // oneAtATime = false_,_ 按照设定的时间,读取队列里面数据

  16. val inputDStream = ssc.queueStream(rddQueue, oneAtATime = false)

  17. //5._处理队列中的_RDD数据

  18. val sumDStream = inputDStream.reduce(_+_)

  19. //6._打印结果_

  20. sumDStream.print()

  21. //7._启动任务_

  22. ssc.start()

  23. //8._循环创建并向_RDD队列中放入__RDD

  24. for (i <- 1 to 5) {

  25. rddQueue += ssc.sparkContext.makeRDD(1 to 5)

  26. Thread.sleep(2000)

  27. }

  28. ssc.awaitTermination()

  29. }

  30. }

结果展示(oneAtATime = false)

-------------------------------------------

Time: 1603347444000 ms

-------------------------------------------

15

-------------------------------------------

Time: 1603347448000 ms

-------------------------------------------

30

-------------------------------------------

Time: 1603347452000 ms

-------------------------------------------

30

说明:如果一个批次中有多个RDD进入队列,最终计算前都会合并到一个RDD计算

  • 用法及说明

需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。

3.2.2 案例实操

需求:自定义数据源,实现监控某个端口号,获取该端口号内容。

  1. 使用自定义的数据源采集数据

  2. import org.apache.spark.SparkConf

  3. import org.apache.spark.streaming.{Seconds, StreamingContext}

  4. object SparkStreaming03_CustomerReceiver {

  5. def main(args: Array[String]): Unit = {

  6. //1._初始化_Spark配置信息

  7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")

  8. //2._初始化_SparkStreamingContext

  9. val ssc = new StreamingContext(sparkConf, Seconds(5))

  10. //3._创建自定义_receiver的__Streaming

  11. val lineDStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))

  12. //4._将每一行数据做切分,形成一个个单词_

  13. val wordDStream = lineDStream.flatMap(_.split(" "))

  14. //5._将单词映射成元组(_word,1)

  15. val wordToOneDStream = wordDStream.map((_, 1))

  16. //6._将相同的单词次数做统计_

  17. val wordToSumDStream = wordToOneDStream.reduceByKey(_ + _)

  18. //7._打印_

  19. wordToSumDStream.print()

  20. //8._启动_SparkStreamingContext

  21. ssc.start()

  22. ssc.awaitTermination()

  23. }

  24. }

2)自定义数据源

  1. import org.apache.spark.storage.StorageLevel

  2. import org.apache.spark.streaming.receiver.Receiver

  3. import java.io.{BufferedReader, InputStreamReader}

  4. import java.net.Socket

  5. import java.nio.charset.StandardCharsets

  6. class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  7. // 最初启动的时候,调用该方法,作用为:读数据并将数据发送给__Spark

  8. override def onStart(): Unit = {

  9. new Thread("Socket Receiver") {

  10. override def run() {

  11. receive()

  12. }

  13. }.start()

  14. }

  15. // 读数据并将数据发送给__Spark

  16. def receive(): Unit = {

  17. // 创建一个__Socket

  18. val socket: Socket = new Socket(host, port)

  19. // 创建一个__BufferedReader用于读取端口传来的数据

  20. val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))

  21. // 读取数据

  22. var input: String = reader.readLine()

  23. //__receiver没有关闭并且输入数据不为空,则循环发送数据给__Spark

  24. while (!isStopped() && input != null) {

  25. store(input)

  26. input = reader.readLine()

  27. }

  28. // 如果循环结束,则关闭资源

  29. reader.close()

  30. socket.close()

  31. //_重启接收任务_

  32. restart("restart")

  33. }

  34. override def onStop(): Unit = {}

  35. }

3)测试

先启动nc服务,再启动SparkStreaming程序

[hadoop@hadoop102 ~]$ nc -lk 9999

hello spark

  • 版本选型

ReceiverAPI需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题:接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用。

DirectAPI是由计算的Executor来主动消费Kafka的数据,速度由自身控制。

注意:目前spark3.0.0以上版本只有Direct模式。

http://spark.apache.org/docs/2.4.7/streaming-kafka-integration.html

http://spark.apache.org/docs/3.0.0/streaming-kafka-0-10-integration.html

总结:不同版本的offset存储位置

0-8 ReceiverAPI offset默认存储在:Zookeeper中

0-8 DirectAPI offset默认存储在:CheckPoint

手动维护:MySQL等有事务的存储系统

0-10 DirectAPI offset默认存储在:_consumer_offsets系统主题

手动维护:MySQL等有事务的存储系统

  • Kafka 0-10 Direct模式

1)需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

2)导入依赖

org.apache.spark

spark-streaming-kafka-0-10_2.12

3.0.0

3)编写代码

  1. import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}

  2. import org.apache.kafka.common.serialization.StringDeserializer

  3. import org.apache.spark.SparkConf

  4. import org.apache.spark.streaming.dstream.{DStream, InputDStream}

  5. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

  6. import org.apache.spark.streaming.{Seconds, StreamingContext}

  7. object SparkStreaming04_DirectAuto {

  8.     def main(args: Array[String]): Unit = {

  9.         //1._创建_SparkConf

  10.         val sparkConf: SparkConf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")

  11.         //2._创建_StreamingContext

  12.         val ssc = new StreamingContext(sparkConf, Seconds(3))

  13.         //3._定义_Kafka参数:kafka集群地址、消费者组名称、key序列化、value序列化

  14.         val kafkaPara: Map[String, Object] = Map[String, Object](

  15.             ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",

  16.             ConsumerConfig.GROUP_ID_CONFIG -> "group_1",

  17.             ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",

  18.             ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]

  19.         )

  20.         //4._读取_Kafka数据创建__DStream

  21.         val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](

  22.             ssc,

  23.             LocationStrategies.PreferConsistent, //_优先位置_

  24.             ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara)// 消费策略:(订阅多个主题,配置参数)

  25.         )

  26.         //5._将每条消息的_KV取出

  27.         val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())

  28.         //6._计算_WordCount

  29.         valueDStream.flatMap(_.split(" "))

  30.             .map((_, 1))

  31.             .reduceByKey(_ + _)

  32.             .print()

  33.         //7._开启任务_

  34.         ssc.start()

  35.         ssc.awaitTermination()

  36.     }

  37. }

4)测试

(1)分别启动Zookeeper和Kafka集群

zk.sh start

kf.sh start

(2)创建一个Kafka的Topic主题testTopic,两个分区

bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --create --replication-factor 1 --partitions 2 --topic testTopic

(3)查看Topic列表

bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka -list

(4)查看Topic详情

bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --describe --topic testTopic

(5)创建Kafka生产者

bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic testTopic

Hello spark

Hello spark

(6)创建Kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic testTopic

5)查看_consumer_offsets主题中存储的offset

bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --describe --group group_1

GROUP TOPIC PARTITION CURRENT-OFFSET
LOG-END-OFFSET

Group_1 testTopic 0 13 13

在生产者中生产数据,再次观察offset变化

DStream转换

DStream上的操作与RDD的类似,分为转换和输出两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

无状态转化操作:就是把RDD转化操作应用到DStream每个批次上,每个批次相互独立,自己算自己的。

常规无状态转化操作

DStream的部分无状态转化操作列在了下表中,都是DStream自己的API。

注意,针对键值对的DStream转化操作,要添加import StreamingContext._才能在Scala中使用,比如reduceByKey()。

函数名称

目的

Scala示例

函数签名

map()

对DStream中的每个元素应用给定函数,返回由各元素输出的元素组成的DStream。

ds.map(x=>x + 1)

f: (T) -> U

flatMap()

对DStream中的每个元素应用给定函数,返回由各元素输出的迭代器组成的DStream。

ds.flatMap(x => x.split(" "))

f: T -> Iterable[U]

filter()

返回由给定DStream中通过筛选的元素组成的DStream

ds.filter(x => x != 1)

f: T -> Boolean

repartition()

改变DStream的分区数

ds.repartition(10)

N / A

reduceByKey()

将每个批次中键相同的记录规约。

ds.reduceByKey( (x, y) => x + y)

f: T, T -> T

groupByKey()

将每个批次中的记录根据键分组。

ds.groupByKey()

N / A

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD批次组成,且无状态转化操作是分别应用到每个RDD批次上的。

Transform

需求:通过Transform可以将DStream每一批次的数据直接转换为RDD的算子操作。

  1. 代码编写

有状态转化操作:计算当前批次RDD时,需要用到历史RDD的数据。

UpdateStateByKey

updateStateByKey()用于键值对形式的DStream,可以记录历史批次状态。例如可以实现累加WordCount。

updateStateByKey()参数中需要传递一个函数,在函数内部可以根据需求对新数据和历史状态进行整合处理,返回一个新的DStream。

注意:使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。

checkpoint小文件过多

checkpoint记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次

0)需求:更新版的WordCount

1)编写代码

  1. import org.apache.spark.SparkConf

  2. import org.apache.spark.streaming.dstream.ReceiverInputDStream

  3. import org.apache.spark.streaming.{Seconds, StreamingContext}

  4. object sparkStreaming06_updateStateByKey {

  5.     // 定义更新状态方法,参数__seq为当前批次单词次数,state为以往批次单词次数

  6.     val updateFunc = (seq: Seq[Int], state: Option[Int]) => {

  7.         // 当前批次数据累加

  8.         val currentCount = seq.sum

  9.         // 历史批次数据累加结果

  10.         val previousCount = state.getOrElse(0)

  11.         // 总的数据累加

  12.         Some(currentCount + previousCount)

  13.     }

  14.     def createSCC(): StreamingContext = {

  15.         //1 创建__SparkConf

  16.         val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")

  17.         //2 创建__StreamingContext

  18.         val ssc = new StreamingContext(conf, Seconds(3))

  19.         ssc.checkpoint("./ck")

  20.         //3 获取一行数据

  21.         val lines = ssc.socketTextStream("hadoop102", 9999)

  22.         //4 切割

  23.         val words = lines.flatMap(_.split(" "))

  24.         //5 统计单词

  25.         val wordToOne = words.map(word => (word, 1))

  26.         //6 使用__updateStateByKey来更新状态,统计从运行开始以来单词总的次数

  27.         val stateDstream = wordToOne.updateStateByKey[Int](updateFunc)

  28.         stateDstream.print()

  29.         ssc

  30.     }

  31.     def main(args: Array[String]): Unit = {

  32.         val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck",()=>createSCC())

  33.         //7 开启任务

  34.         ssc.start()

  35.         ssc.awaitTermination()

  36.     }

  37. }

2)启动程序并向9999端口发送数据

nc -lk 9999

hello hadoop

hello hadoop

3)结果展示

-------------------------------------------

Time: 1603441344000 ms

-------------------------------------------

(hello,1)

(hadoop,1)

-------------------------------------------

Time: 1603441347000 ms

-------------------------------------------

(hello,2)

(hadoop,2)

4)原理说明

WindowOperations(窗口函数)

Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

窗口时长:计算内容的时间范围;

滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集批次大小的整数倍。

如下图所示WordCount案例:窗口大小为批次的2倍,滑动步等于批次大小。

Window

1)基本语法:window(windowLength, slideInterval): 基于对源DStream窗口的批次进行计算返回一个新的DStream。

2)需求:统计WordCount:3秒一个批次,窗口12秒,滑步6秒。

3)代码编写:

  1. import org.apache.spark.SparkConf

  2. import org.apache.spark.streaming.dstream.DStream

  3. import org.apache.spark.streaming.{Seconds, StreamingContext}

  4. object SparkStreaming07_window {

  5.     def main(args: Array[String]): Unit = {

  6.         // 1 初始化__SparkStreamingContext

  7.         val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")

  8.         val ssc = new StreamingContext(conf, Seconds(3))

  9.         // 2 通过监控端口创建__DStream,读进来的数据为一行行

  10.         val lines = ssc.socketTextStream("hadoop102", 9999)

  11.         // 3 切割_=》变换_

  12.         val wordToOneDStream = lines.flatMap(_.split(" "))

  13.             .map((_, 1))

  14.         // 4 获取窗口返回数据

  15.         val wordToOneByWindow: DStream[(String, Int)] = wordToOneDStream.window(Seconds(12), Seconds(6))

  16.         // 5 聚合窗口数据并打印

  17.         val wordToCountDStream: DStream[(String, Int)] = wordToOneByWindow.reduceByKey(_+_)

  18.         wordToCountDStream.print()

  19.         // 6 启动_=》阻塞_

  20.         ssc.start()

  21.         ssc.awaitTermination()

  22.     }

  23. }

5)如果有多批数据进入窗口,最终也会通过window操作变成统一的RDD处理。

reduceByKeyAndWindow

1)基本语法:

  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。

2)需求:统计WordCount:3秒一个批次,窗口12秒,滑步6秒。

3)代码编写:

  1. import org.apache.spark.SparkConf

  2. import org.apache.spark.streaming.{Seconds, StreamingContext}

  3. object SparkStreaming08_reduceByKeyAndWindow {

  4.     def main(args: Array[String]): Unit = {

  5.         // 1 初始化__SparkStreamingContext

  6.         val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")

  7.         val ssc = new StreamingContext(conf, Seconds(3))

  8.         // 保存数据到检查点

  9.         ssc.checkpoint("./ck")

  10.         // 2 通过监控端口创建__DStream,读进来的数据为一行行

  11.         val lines = ssc.socketTextStream("hadoop102", 9999)

  12.         // 3 切割_=》变换_

  13.         val wordToOne = lines.flatMap(_.split(" "))

  14.                          .map((_, 1))

  15.         // 4 窗口参数说明: 算法逻辑,窗口12秒,滑步6秒

  16.         val wordCounts = wordToOne.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(12), Seconds(6))

  17.         // 5 打印

  18.         wordCounts.print()

  19.         // 6 启动_=》阻塞_

  20.         ssc.start()

  21.         ssc.awaitTermination()

  22.     }

  23. }

reduceByKeyAndWindow(反向Reduce)

1)基本语法:

  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并"反向reduce"离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的"加""减"计数。通过前边介绍可以想到,这个函数只适用于"可逆的reduce函数",也就是这些reduce函数有相应的"反reduce"函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。

2)需求:统计WordCount:3秒一个批次,窗口12秒,滑步6秒。

3)代码编写:

  1. import org.apache.spark.{HashPartitioner, SparkConf}

  2. import org.apache.spark.streaming.{Seconds, StreamingContext}

  3. object SparkStreaming09_reduceByKeyAndWindow_reduce {

  4.     def main(args: Array[String]): Unit = {

  5.         // 1 初始化__SparkStreamingContext

  6.         val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")

  7.         val ssc = new StreamingContext(conf, Seconds(3))

  8.         // 保存数据到检查点

  9.         ssc.checkpoint("./ck")

  10.         // 2 通过监控端口创建__DStream,读进来的数据为一行行

  11.         val lines = ssc.socketTextStream("hadoop102", 9999)

  12.         // 3 切割_=》变换_

  13.         val wordToOne = lines.flatMap(_.split(" "))

  14.             .map((_, 1))

  15.         // 4 窗口参数说明: 算法逻辑,窗口12秒,滑步6秒

  16.         /*

  17.         val wordToSumDStream: DStream[(String, Int)]= wordToOne.reduceByKeyAndWindow(

  18.             (a: Int, b: Int) => (a + b),

  19.             (x: Int, y: Int) => (x - y),

  20.             Seconds(12),

  21.             Seconds(6)

  22.         )*/

  23.         // 处理单词统计次数为__0的问题

  24.         val wordToSumDStream: DStream[(String, Int)]= wordToOne.reduceByKeyAndWindow(

  25.             (a: Int, b: Int) => (a + b),

  26.             (x: Int, y: Int) => (x - y),

  27.             Seconds(12),

  28.             Seconds(6),

  29.             new HashPartitioner(2),

  30.             (x:(String, Int)) => x._2 > 0

  31.         )

  32.         // 5 打印

  33.         wordToSumDStream.print()

  34.         // 6 启动_=》阻塞_

  35.         ssc.start()

  36.         ssc.awaitTermination()

  37.     }

  38. }

Window的其他操作

(1)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;

(2)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;

DStream输出

DStream通常将数据输出到,外部数据库或屏幕上。

DStream与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个Context就都不会启动。

1)输出操作API如下:

  1. saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。"prefix-Time_IN_MS[.suffix]"。
  2. saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将DStream中的数据保存为 SequenceFiles 。每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。
  3. saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files。每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。

注意:以上操作都是每一批次写出一次,会产生大量小文件,在生产环境,很少使用。

  1. print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。
  2. foreachRDD(func):这是最通用的输出操作,即将函数func用于产生DStream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者写入数据库。

在企业开发中通常采用foreachRDD(),它用来对DStream中的RDD进行任意计算。这和transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到如MySQL的外部数据库中。

  1. foreachRDD代码实操

3)注意

(1)连接不能写在Driver层面(序列化)

(2)如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;

(3)增加foreachPartition,在分区创建(获取)。

优雅关闭

流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了。

关闭方式:使用外部文件系统来控制内部程序关闭。

  1. 主程序

2)测试

(1)发送数据

nc -lk 9999

hello

(2)启动Hadoop集群

sbin/start-dfs.sh

hadoop fs -mkdir /stopSpark

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器