Flink-scala所需依赖
<properties>
<flink.version>1.7.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
流式处理WorkCount代码
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
object WordCount {
//创建WordWithCount样例类,用来存储数据最终统计结果
case class WordWithCount(word: String, count: Int)
def main(args: Array[String]): Unit = {
//获取上下文对象(初始化环境)
val streamExecutionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//获取netcat服务的数据
val dataStream: DataStream[String] = streamExecutionEnvironment.socketTextStream("slave4", 9000)
//必须要引入这个包,包含了计算用到的方法
import org.apache.flink.api.scala._
//对获取到的数据进行处理
val dataStream1: DataStream[WordWithCount] = dataStream.flatMap(_.toLowerCase.split(" ") filter (_.nonEmpty))
.map(WordWithCount(_, 1))
.keyBy("word")
.timeWindow(Time.seconds(2), Time.seconds(2))
.reduce((a, b) => WordWithCount(a.word, a.count + b.count))
//打印结果,设置并行度为1
dataStream1.print.setParallelism(1)
//启动执行
streamExecutionEnvironment.execute("WordCount")
}
}
批式处理WordCount代码
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
object WordCount {
def main(args: Array[String]): Unit = {
//获取上下文对象(初始化环境)
val executionEnvironment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//获取文件中的数据
val dataSet: DataSet[String] = executionEnvironment.readTextFile("./src/main/data/wordCount.txt")
//必须要引入这个包,包含了计算用到的方法
import org.apache.flink.api.scala._
//对获取到的数据进行处理
val aggregateDataSet: AggregateDataSet[(String, Int)] = dataSet.flatMap(_.toLowerCase.split(" ") filter (_.nonEmpty))
.map((_, 1))
.groupBy(0)0000000
.sum(1)
//打印结果
aggregateDataSet.print
}
}
流式处理与批式处理的区别
流式处理:Streaming
初始化对象:StreamExecutionEnvironment
返回值类型:DataStream
批式处理:Batch
初始化对象:ExecutionEnvironment
返回值类型:DataSet
手机扫一扫
移动阅读更方便
你可能感兴趣的文章