Flink编程练习
阅读原文时间:2023年07月11日阅读:3

目录

利用socket作为数据源,对输入的每行数据进行单词计数。按processing time每10秒计算一次,结果输出到terminal。

/**
 * Streaming word count over a socket text source.
 *
 * Reads newline-delimited text from `localhost:<port>`, splits every line into words and
 * prints the per-word counts of each 10-second tumbling processing-time window.
 *
 * Usage: `xx.jar --port <port>`
 */
object SocketWindowWordCount {
    def main(args: Array[String]) : Unit = {

        // Reading --port fails when the argument is missing or not an integer;
        // in that case print the usage hint and leave main without starting a job.
        val port: Int = try {
            ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case _: Exception => {
                System.err.println("No port specified. Please run 'xx.jar --port <port>'")
                return
            }
        }

        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        val text = env.socketTextStream("localhost", port, '\n')

        val windowCounts = text
            // Split on runs of whitespace; a blank line or leading whitespace still yields an
            // empty token, so those are dropped instead of being counted as the word "".
            .flatMap(_.split("\\s+"))
            .filter(_.nonEmpty)
            .map(WordWithCount(_, 1))
            .keyBy(_.word)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
            .sum("count")

        windowCounts.print()

        env.execute("Socket Window WordCount")
    }

    /** A word together with the number of times it was seen. */
    case class WordWithCount(word: String, count: Long)
}

数据格式

/**
 * One temperature measurement reported by a sensor.
 *
 * @param id          identifier of the reporting sensor
 * @param timestamp   time of the measurement (presumably epoch milliseconds; confirm against the source)
 * @param temperature measured temperature
 */
case class SensorReading(
  id: String,
  timestamp: Long,
  temperature: Double
)

/** Two-valued smoke level enumeration: `High` (id 0) and `Low` (id 1). */
object SmokeLevel extends Enumeration {
  // Alias so the value type can be written as `SmokeLevel` once the members are imported.
  type SmokeLevel = Value

  val High: Value = Value
  val Low: Value = Value
}

/**
 * An alert raised by a job.
 *
 * @param message   human-readable alert text
 * @param timestamp timestamp attached to the alert
 */
case class Alert(
  message: String,
  timestamp: Long
)

时间特征为event time,每1s更新一次watermark,watermark由SensorReading内部的timestamp推进,允许5s的延迟(过滤掉迟到数据)。数据源SensorReading并发处理,数据源SmokeLevel并发度为1,但能够被每个并发的SensorReading流访问。假设两个流的数据是源源不断的。当SensorReading的temperature大于100且SmokeLevel为High时触发警报。警报包含当时SensorReading的timestamp。

注意:在下面的例子中,迟到数据(即在WM之后到达、时间戳小于当前WM的数据)依然会被处理。

注意:如果只有其中一个流在connect前调用了assignTimestampsAndWatermarks,connect后的流是不会更新WM的,因为双输入算子的WM取两个输入流WM的最小值,而另一个流始终没有WM。

/**
 * Job entry point: raises an [[Alert]] for sensor readings above 100 degrees while the
 * smoke level is high.
 *
 * Runs in event time with a 1 s watermark interval. Sensor timestamps come from
 * `SensorReading.timestamp`, with 5 s of allowed out-of-orderness.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Set the time characteristic first: switching to event time resets the watermark
  // interval to a default, so the explicit interval has to be applied afterwards.
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getConfig.setAutoWatermarkInterval(1000L)

  val sensorInput = env
    .addSource(new SensorSource)
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(5)) {
        override def extractTimestamp(element: SensorReading): Long = {
          element.timestamp
        }
      })
  val smokeLevelInput = env
    .addSource(new SmokeLevelSource)
    .setParallelism(1)

  val res = sensorInput
    .process(new MyKeyedProcessFunction) // implementation omitted; it is essentially an `if` plus `collect`
    // Broadcast the single-parallelism smoke level stream so that every parallel instance
    // of the co-flatMap sees each smoke level update, not just one of them.
    .connect(smokeLevelInput.broadcast)
    .flatMap(new MyFlatMapFunc)

  res.print()

  env.execute("multiple streamss")

}

/**
 * Joins sensor readings with the most recently seen smoke level.
 *
 * Input 1 carries sensor readings, input 2 carries smoke level updates. An [[Alert]]
 * carrying the reading's timestamp is emitted for every reading above 100 degrees that
 * arrives while the remembered smoke level is `High`.
 */
class MyFlatMapFunc extends CoFlatMapFunction[SensorReading, SmokeLevel, Alert] {
  // Last smoke level received on input 2; starts as Low so nothing fires before the first update.
  // This is a plain field, not Flink managed state.
  private var curSmokeLevel: SmokeLevel = SmokeLevel.Low

  /** Emits an alert when the reading is above 100 and the current smoke level is High. */
  override def flatMap1(value: SensorReading, out: Collector[Alert]): Unit = {
    if (curSmokeLevel == SmokeLevel.High && value.temperature > 100) {
      out.collect(Alert("Alert! ", value.timestamp))
    }
  }

  /** Remembers the latest smoke level; produces no output. */
  override def flatMap2(value: SmokeLevel, out: Collector[Alert]): Unit = {
    curSmokeLevel = value
  }
}

对每个key的数据量进行累加计数,如果1分钟没有新数据,就输出key-count对。对每一个数据进行处理时,sideoutput当前所处理的key的state数据(更新后的)

/** Side-output tag with id "real-time_info" carrying `String` records. */
val realTimeInfo: OutputTag[String] = new OutputTag[String]("real-time_info")

/**
 * Job entry point: counts readings per sensor id with [[MyProcessFunc]] and prints the
 * `realTimeInfo` side output.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Set the time characteristic first: switching to event time resets the watermark
  // interval to a default, so the explicit 1 s interval has to be applied afterwards.
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getConfig.setAutoWatermarkInterval(1000L)

  val countStream = env.addSource(new SensorSource)
    .keyBy(_.id)
    .process(new MyProcessFunc)

  // Only the side output is printed; the main (key, count) output is not consumed here.
  countStream.getSideOutput(realTimeInfo)
    .print()

  env.execute()
}

/**
 * Per-key running count together with the timestamp of the last update.
 *
 * @param key   the key being counted
 * @param count number of elements seen so far for the key
 * @param ts    timestamp of the most recent element for the key
 */
case class CountWithTimeStamp(
  key: String,
  count: Int,
  ts: Long
)

/**
 * Counts the elements seen per key and emits `(key, count)` once the key has received no
 * new element for one minute of event time.
 *
 * For every processed element the updated per-key state is also written, rendered as a
 * string, to the `realTimeInfo` side output.
 */
class MyProcessFunc extends KeyedProcessFunction[String, SensorReading, (String, Int)] {

  // Idle period after which the count of a key is emitted: one minute, in milliseconds.
  private val idleTimeoutMs: Long = 60 * 1000L

  // Keyed state holding the running count and the timestamp of the last element.
  lazy val state = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimeStamp]("muState", classOf[CountWithTimeStamp]))

  /**
   * Updates the count for the current key, re-arms the idle timer and publishes the
   * updated state to the side output.
   *
   * @param value the incoming reading
   * @param ctx   context giving access to the element timestamp, timers and side outputs
   * @param out   main output collector (not used here; results are emitted from `onTimer`)
   */
  override def processElement(value: SensorReading,
                              ctx: KeyedProcessFunction[String, SensorReading, (String, Int)]#Context,
                              out: Collector[(String, Int)]): Unit = {
    // state.value() is null for a key that has not been seen yet.
    val current = Option(state.value()) match {
      case None =>
        CountWithTimeStamp(value.id, 1, ctx.timestamp())
      case Some(CountWithTimeStamp(key, count, lastModified)) =>
        // Remove the timer registered for the previous element of this key.
        ctx.timerService().deleteEventTimeTimer(lastModified + idleTimeoutMs)
        CountWithTimeStamp(key, count + 1, ctx.timestamp())
    }

    state.update(current)

    ctx.timerService().registerEventTimeTimer(current.ts + idleTimeoutMs)

    // The side-output tag is typed as String, so emit the textual form of the state.
    ctx.output(realTimeInfo, current.toString)
  }

  /**
   * Emits `(key, count)` when the firing timer is the one armed by the latest element,
   * i.e. no newer element arrived during the idle period.
   *
   * @param timestamp the timestamp the timer was registered for
   * @param ctx       timer context
   * @param out       collector receiving the `(key, count)` result
   */
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, SensorReading, (String, Int)]#OnTimerContext,
                       out: Collector[(String, Int)]): Unit = {
    Option(state.value()).foreach { latest =>
      // Timers are registered at `ts + idleTimeoutMs`, so compare against that value.
      if (timestamp == latest.ts + idleTimeoutMs) {
        out.collect((latest.key, latest.count))
      }
    }
  }
}

利用tumbling window计算各个sensor在15s内的最大最小值,返回结果包含窗口的结束时间。另外,window只存储极值,不保留原数据。

checkpoint间隔为10s,watermark刷新间隔1s

/**
 * Job entry point: computes the minimum and maximum temperature per sensor over 15-second
 * tumbling event-time windows and prints one [[MaxMinTemperature]] per key and window.
 *
 * Checkpoints every 10 s; the watermark interval is 1 s. The window keeps only the running
 * (id, min, max) triple, not the raw readings, because the reduce is applied incrementally.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Set the time characteristic first: switching to event time resets the watermark
  // interval to a default, so the explicit 1 s interval has to be applied afterwards.
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getConfig.setAutoWatermarkInterval(1000)

  env.enableCheckpointing(10 * 1000L)

  env.addSource(new SensorSource)
    .assignTimestampsAndWatermarks(new MyPeriodicAssigner)
    // Carry the temperature twice: once as the running minimum, once as the running maximum.
    .map(r => (r.id, r.temperature, r.temperature))
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(Time.seconds(15)))
    .reduce(
      (item1: (String, Double, Double), item2: (String, Double, Double)) => {
        (item1._1, item1._2.min(item2._2), item1._3.max(item2._3))
      },
      new MyWindowEndProcessFunction()
    )
    .print()

  // Without this call the pipeline above is only declared and never runs.
  env.execute()
}

/**
 * Minimum and maximum temperature of one key, tagged with a timestamp.
 *
 * @param key the key the extremes belong to
 * @param min lowest temperature
 * @param max highest temperature
 * @param ts  timestamp attached to the result
 */
case class MaxMinTemperature(
  key: String,
  min: Double,
  max: Double,
  ts: Long
)

/**
 * Wraps the first `(id, min, max)` element of a window into a [[MaxMinTemperature]]
 * stamped with the window's end time.
 */
class MyWindowEndProcessFunction
  extends ProcessWindowFunction[(String, Double, Double), MaxMinTemperature, String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[(String, Double, Double)],
                       out: Collector[MaxMinTemperature]): Unit = {
    // Only the first element is read; its id component is ignored in favour of `key`.
    val (_, lowest, highest) = elements.head
    val windowEnd = context.window.getEnd
    out.collect(MaxMinTemperature(key, lowest, highest, windowEnd))
  }
}