[DB] Flink 读 MySQL
阅读原文时间:2023年07月08日阅读:1

思路

在 Flink 中创建一张表有两种方法:

  • 从一个文件中导入表结构(Structure)(常用于批计算)(静态)
  • 从 DataStream 或者 DataSet 转换成 Table (动态)

package com.kaikeba.mysql.demo

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row

object Flink2Mysql {
def main(args: Array[String]): Unit = {
//设定执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)

//通过创建JDBCInputFormat读取JDBC数据源  
val jdbcDataSet: DataSet\[Row\] =  
  env.createInput(JDBCInputFormat.buildJDBCInputFormat()  
    .setDrivername("com.mysql.cj.jdbc.Driver")  
    .setDBUrl("jdbc:mysql://127.0.0.1:3306/flink-mysql?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useSSL=false")  
    .setUsername("root")  
    .setPassword("Chen1227+")  
    .setQuery("select \* from filter")  
    .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT\_TYPE\_INFO, BasicTypeInfo.INT\_TYPE\_INFO))  
    .finish()  
  )

//将DataSet注册为表  
tEnv.registerDataSet("tb", jdbcDataSet)  
//执行查询操作  
val table = tEnv.sqlQuery("select \* from tb")  
//把table转为DataSet  
tEnv.toDataSet\[Row\](table).print()  

}
}  

 

参考

Flink 读写 Mysql

https://blog.csdn.net/Android_xue/article/details/102705711

https://blog.csdn.net/ranyizhang/article/details/103759251

https://www.cnblogs.com/Gxiaobai/p/12645497.html

Flink流处理访问MySQL

https://blog.csdn.net/u012447842/article/details/89175772

Flink实例

https://blog.csdn.net/xianpanjia4616/article/details/98318750

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器