Flink SQL任务自动生成与提交
阅读原文时间:2023年07月11日阅读:2

目录

起因

事情的起因,是看到一篇公众号文章Apache Flink 在汽车之家的应用与实践,里面提到了“基于 SQL 的开发流程”。在平台提供以上功能的基础上,用户可以快速的实现 SQL 作业的开发:

创建一个 SQL 任务;

1.编写 DDL 声明 Source 和 Sink;

2.编写 DML,完成主要业务逻辑的实现;

3.在线查看结果,若数据符合预期,添加 INSERT INTO 语句,写入到指定 Sink 中即可。

即这种功能:

之前也写过一个spark自动提交任务的小项目,但始终无法做到简单交互自动生成任务,因而最终没有用到生产环境当中。Flink的SQL是Flink生态里的一等公民,可以做到直接一条SQL语句把数据源source加载到一张表,也可以一条SQL语句把数据写入到sink终端。这让交互式自动生成任务能够简单实现。

本文前提是,简单实现。如果不考虑性价比,当然也可以做到一个WEB系统,全交互式,点点点,需要的什么参数,什么功能提交到后台,解析生成任务也是可以做的,但这样性价比不高。

思路

1.首先是一个简单的WEB系统,后台springboot可以快速实现,前端thymeleaf可以快速做一个简单的页面,不考虑CSS,JS。

2.然后提供一个页面,传入flink集群参数,mainclass,parallelism等,以及业务参数,主要是sourceSql,sinkSql,transformationSql。然后后台提供一个Flink任务的模板,将sourceSql,sinkSql,transformationSql这三个参数替换掉。再编译成class并生成jar包。

3.然后将jar包提交到flink集群,再将任务提交到集群。

这一点可以通过Flink提供的restapi实现。

https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/ops/rest_api/

关于第2点,也可以在模板当中以提供参数的形式传入sourceSql,sinkSql,transformationSql,flink 在提交任务的接口里也提供了这样的参数program-args,但只适合简单的参数,像sql语句这种带大量的引号,空格,换行符不建议这样传参。

实现

功能本身实现起来倒没太大问题。代码就不提供了,光占篇幅。有兴趣的同学可以看github仓库,这里

1.配置

需要修改application的以下4个配置

分别是

flink集群的访问地址,

布置WEB服务的根目录地址,

编译flink任务所依赖的jar包目录,

flink任务main class,也就是flink任务模板的路径(不要写成包名的格式)

flinkurl=http://localhost:8081

rootpath=D://ideaproject//test//flink_remotesubmit

dependmentpath=D://ideaproject//test//flink_remotesubmit//lib

mainpath=//src//main//java//com//yp//flink//model//TableApiModel.java

2.界面如下

提交成功后,请确保浏览器没有阻止弹窗,跳转到flink任务界面

3.环境

基于

flink 1.13.2

spring boot 2.5

问题

遇到的问题,来自于汽车之家的那张图。就文章里表述的需求来说,需要每天按小时统计PV,UV。

1.flink 的kafka connector 目前只支持批模式(有界数据),不支持流模式(无界)。

如图中红色标记处。

如果按inStreamingMode,那就是实时统计,没法设定batch频次;如果按inBatchMode批次,上图可见,目前不支持。而且kafka connector里参数也只支持kafka的起始位置,不支持结束位置。

所以这里怎么实现的?目前不得而知。

2.还是如上图所见,在sink的时候是只支持append模式,而在append模式下,不支持group by,因为使用了group by 会改行结果行。

如果强行使用group by 将会抛出异常:

目前sink只支持append模式,如果使用了group by 等会改变结果行,会报错:AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate

所以对kafka数据源进行sink的怎样进行分组聚合?

update:

以上两个问题的解决方案,通过改写flink源码已经解决,代码如下:

https://github.com/nyingping/flink/releases/tag/v14.0-kafkaconnector-batchmode

Flink SQL还是非常强大的。前几天发布的flink 14版本,批执行模式现在支持在同一应用中混合使用 DataStream API 和 SQL/Table API(此前仅支持单独使用 DataStream API 或 SQL/Table API)。也引入了更多的connetor。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章