Flink WorkCount代码
阅读原文时间:2023年07月09日阅读:1

Flink-scala所需依赖

<properties>
    <flink.version>1.7.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

流式处理WorkCount代码

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
object WordCount {
  //创建WordWithCount样例类,用来存储数据最终统计结果
  case class WordWithCount(word: String, count: Int)
  def main(args: Array[String]): Unit = {
    //获取上下文对象(初始化环境)
    val streamExecutionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //获取netcat服务的数据
    val dataStream: DataStream[String] = streamExecutionEnvironment.socketTextStream("slave4", 9000)
    //必须要引入这个包,包含了计算用到的方法
    import org.apache.flink.api.scala._
    //对获取到的数据进行处理
    val dataStream1: DataStream[WordWithCount] = dataStream.flatMap(_.toLowerCase.split(" ") filter (_.nonEmpty))
      .map(WordWithCount(_, 1))
      .keyBy("word")
      .timeWindow(Time.seconds(2), Time.seconds(2))
      .reduce((a, b) => WordWithCount(a.word, a.count + b.count))
    //打印结果,设置并行度为1
    dataStream1.print.setParallelism(1)
    //启动执行
    streamExecutionEnvironment.execute("WordCount")
  }
}

批式处理WordCount代码

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
object WordCount {
  def main(args: Array[String]): Unit = {
    //获取上下文对象(初始化环境)
    val executionEnvironment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //获取文件中的数据
    val dataSet: DataSet[String] = executionEnvironment.readTextFile("./src/main/data/wordCount.txt")
    //必须要引入这个包,包含了计算用到的方法
    import org.apache.flink.api.scala._
    //对获取到的数据进行处理
    val aggregateDataSet: AggregateDataSet[(String, Int)] = dataSet.flatMap(_.toLowerCase.split(" ") filter (_.nonEmpty))
      .map((_, 1))
      .groupBy(0)0000000
      .sum(1)
    //打印结果
    aggregateDataSet.print
  }
}


流式处理与批式处理的区别
流式处理:Streaming
    初始化对象:StreamExecutionEnvironment
    返回值类型:DataStream
批式处理:Batch
    初始化对象:ExecutionEnvironment
    返回值类型:DataSet