思路
在 Flink 中创建一张表有两种方法,下面的示例采用其中一种:先通过 JDBCInputFormat 把 MySQL 中的数据读成 DataSet,再将该 DataSet 注册为表并用 SQL 查询。
package com.kaikeba.mysql.demo
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row
/**
 * Batch example: reads rows from a MySQL table through JDBC, registers the
 * resulting DataSet as a table, queries it with SQL and prints the rows.
 */
object Flink2Mysql {

  /**
   * Entry point. Builds the JDBC-backed DataSet, registers it as table `tb`,
   * runs a `select *` over it and prints the result.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Set up the batch execution environment and the matching table environment.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = BatchTableEnvironment.create(env)

    // Read the JDBC source by building a JDBCInputFormat.
    // NOTE(review): the connection URL and credentials are hard-coded here;
    // consider loading them from configuration instead of source code.
    // The row type declares two INT columns; this assumes the `filter` table
    // has exactly two integer columns — TODO confirm against the schema.
    val jdbcDataSet: DataSet[Row] =
      env.createInput(
        JDBCInputFormat.buildJDBCInputFormat()
          .setDrivername("com.mysql.cj.jdbc.Driver")
          .setDBUrl("jdbc:mysql://127.0.0.1:3306/flink-mysql?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useSSL=false")
          .setUsername("root")
          .setPassword("Chen1227+")
          .setQuery("select * from filter")
          .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
          .finish()
      )

    // Register the DataSet as a table named "tb".
    tEnv.registerDataSet("tb", jdbcDataSet)

    // Run the SQL query against the registered table.
    val table = tEnv.sqlQuery("select * from tb")

    // Convert the Table back into a DataSet of rows and print it.
    tEnv.toDataSet[Row](table).print()
  }
}
参考
Flink 读写 Mysql
https://blog.csdn.net/Android_xue/article/details/102705711
https://blog.csdn.net/ranyizhang/article/details/103759251
https://www.cnblogs.com/Gxiaobai/p/12645497.html
Flink流处理访问MySQL
https://blog.csdn.net/u012447842/article/details/89175772
Flink实例
https://blog.csdn.net/xianpanjia4616/article/details/98318750
手机扫一扫
移动阅读更方便
你可能感兴趣的文章