目录
安装nc:https://eternallybored.org/misc/netcat/ ,下载解压,配置到环境变量path,cmd使用nc -lp port
即可。linux使用nc -lk port
批处理WordCount:获取环境、获取批数据、操作数据
流处理WordCount:获取环境、获取流数据、操作数据、提交执行
设置并行度
ParameterTool.fromArgs(args)
从args获取参数
设置共享组,默认default,同一个共享组的使用同一个slot,后一步没有设置则使用与前一个相同的共享组。
任务需要的slot是所有不同共享组的最大并行度的和
解压缩flink-1.10.1-bin-scala_2.12 .tgz
修改flink-conf.yaml文件(jobmanager地址)
修改slaves为其他节点(master节点默认为localhost:8081),注意所有节点配置要一致
./start-cluster.sh
启动集群,./stop-cluster.sh
停止集群
提交任务:配置优先级代码 > 提交 > flink-conf.yaml
前端提交,默认master的配置localhost:8081端口
命令行提交flink run -c MainClass -p 2 xxx.jar --host localhost --port 7777
flink list
查看运行的job
flink list -a
列出所有job,包括取消的
flink cancel JobId
取消job
缺点:当有任务的并行度大于现有可用的slot时,job提交会一直卡在create,直到有足够的slot
要求flink是有hadoop支持的版本,Hadoop环境在2.2以上,并且集群中有HDFS服务。flink1.7之前自带Hadoop的jar包,之后需要自行下载,放入lib目录下
-- session-Cluster模式
需要先启动flink集群,然后再提交作业,接着会向yarn申请一块空间,资源保持不变。如果资源满了下一个作业就无法提交,只能等到yarn中的某些作业执行完,释放了资源,下个作业才会正常提交。所有作业共享Dispatcher和ResourceManager,共享资源,适合规模小执行时间短的作业。
在yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都向这里提交,这个flink集群会常驻在yarn集群,除非手动停止。
启动Hadoop集群
启动yarn-sessionyarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
-n(--container) taskmanager的数量(在现在的版本上不起作用)
-s(--slots)每个taskmanager的slot的数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,优势可以多一些taskmanager,做冗余
-jm JobManager的内存,MB
-tm 每个taskmanager的内存,MB
-nm yarn的appName,yarn的ui上的名字
-d 后台执行
提交任务与Standalone模式相同,启动yarn-session后默认向yarn-session提交,之后在yarn8088上即可查看
取消yarn-sessionyarn application --kill application_xxxx
-- Per-Job-Cluster模式
一个Job会对应一个集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。
每次提交都会创建一个新的flink 集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
fink run -m yarn-cluster ...
flink在最近的版本中支持了k8s部署模式。在k8s上构建flink session cluster,需要将flink集群的组件对应的docker镜像分别在k8s上启动,包括jobmanager、taskmanager、jobmanagerservice三个镜像服务,每个镜像服务都可以从中央镜像仓库获取。
搭建k8s集群
配置各组件的yaml文件
启动Flink Session Cluster
kubectl create -f jobmanager-service.yaml
启动jobmanager-service服务
kubectl create -f jobmanager-deployment.yaml
启动jobmanager-deployment服务
kubectl create -f taskmanager-deployment.yaml
启动taskmanager-deployment服务
访问Flink UI界面
集群启动后,就可以通过JobManagerServicers 中配置的 WebUI 端口,用浏览器输入以下 url 来访问 Flink UI 页面了http://{JobManager Host:Port}:Port}/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy
-- JobManager
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager 所控制执行。 JobManager 会先接收到要执行的应用程序, 这个应用程序会包括:作业图( JobGraph )、逻辑数据流图 logical dataflow graph )和打包了所有的类、库和其它资源的 JAR 包。 JobManager 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫做“执行图 ExecutionGraph ),包含了所有可以并发执行的任务。 JobManager 会向资源管理器( ResourceManager )请求执行任务必要的资源,也就是任务管理器 TaskManager )上的插槽( slot )。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager 上。而在运行过程中, JobManager 会负责所有需要中央协调的操作,比如说检查点( checkpoints )的协调。
-- TaskManager
Flink中的工作进程。通常在 Flink 中会有多个 TaskManager 运行,每一个 TaskManager都包含了一定数量的插槽(slots )。插槽的数量限制了 TaskManager 能够执行的任务数量。启动之后, TaskManager 会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager 就会将一个或者多个插槽提供给 JobManager 调用。 JobManager 就可以向插槽分配任务( tasks )来执行了。在执行过程中,一个 TaskManager 可以跟其它运行同一应 用程序的 TaskManager 交换数据。
-- ResourceManager
主要负责管理任务管理器(TaskManager )的插槽 slot TaskManger 插槽是 Flink 中定义的处理资源单元。 Fli nk 为不同的环境和资源管理工具提供了不同资源管理器,比如YARN 、 Mesos 、 K8s ,以及 standalone 部署。当 JobManager 申请插槽资源时, ResourceManager会将有空闲插槽的 TaskManager 分配给 JobManager 。如果 ResourceManager 没有足够的插槽来满足 JobManager 的请求,它还可以向资源提供平台发起会话,以提供启动 TaskManager进程的容器。另外, ResourceManager 还负责终止空闲的 TaskManager ,释放计算资源。
-- Dispacher
可以跨作业运行,它为应用提交提供了REST 接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个 JobManager 。由于是 REST 接口,所以 Dispatcher 可以作为集群的一个 HTTP 接入点,这样就能够不受防火墙阻挡。 Dispatcher 也会启动一个 Web UI ,用来方便地展示和监控作业执行的信息。 Dispatcher 在架构中可能并不是必需的,这取决于应用提交运行的方式。
-- taskmanager与slot
一个taskManager是一个jvm进程,slot表示这个taskManager能接收多少task,是静态的概念,表示taskManager具有的并发执行能力
-- 程序与数据流(DataFlow)
-- 执行图(ExecutionGraph)
-- 并行度(Parallelism)
-- 任务链(Operator Chain)
批处理与流处理类似,执行环境获取不同
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
批处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
流处理执行环境
也可以自己创建本地环境或使用远程环境:
StreamExecutionEnvironment.createLocalEnvironment(1);
StreamExecutionEnvironment.createRemoteEnvironment("jobManager-host", 6123, "path/to/your/jar");
集合
文件
kafka
自定义
基本转换:map/flatMap/filter
分组聚合:keyBy/sum/min/max/minBy/maxBy
规约:reduce
多流:split-select/connect-CoMap-CoFlatMap/union,侧输出流
Java和Scala的基本数据类型,Java和Scala元组,Scala样例类,Java简单对象(必须有空参构造),其他(Arrays, Lists, Maps, Enum等等)
函数类(Function Classes):Flink暴露了所有的udf的接口(实现方式为接口或抽象类,富函数的为抽象类)
匿名函数(Lambda Function)
富函数(Rich Functions):富函数是DataStreamAPI提供的一个函数类的接口,所有的Flink函数类都有Rich版本,与常规函数的不同在于,可以获取运行时的上下文,并拥有一些声明周期方法,可以实现更复杂的功能。(比如map前的open、执行后的close,简单的map只能一条数据调用一次,而声明周期方法是只执行一次,还可以获取上下文信息,比如并行度,任务名字,state状态等待)
富函数的使用与基本函数类似,只是是继承相应的接口
flink流数据写入kafka/redis/es/jdbc,以及其他官方的连接器,也可以自定义sink
window是无限数据流处理的核心,window将一个无限的stream拆分成有限大小的bucket,我们可以在这些bucket上做计算操作。
时间窗口(Time Window):滚动时间窗口(Tumbling)、滑动时间窗口(Sliding)、会话窗口
计数窗口(Count Window):滚动时间窗口、滑动时间窗口
滚动窗口(按时间或数据个数滚动)、滑动窗口(按时间或数据个数,根据步长滑动)、会话窗口(只有时间类型的,多久未收到数据后结束当前窗口)
Window使用前必须先keyBy,再进行Window操作(窗口分配器),后面必须再跟上类似聚合的窗口操作(窗口函数)
window()方法接收的输入参数是一个windowAssigner,它负责把每条数据分发到正确的window中,flink提供了通用的windowAssigner:
滚动窗口
滑动窗口
会话窗口
全局窗口
增量聚合函数:来一条数据计算一次,保持一个简单的状态,ReduceFunction,AggregateFunction
全窗口函数:先把窗口所有数据收集起来,计算时遍历所有数据(比如求中位数),ProcessWindowFunction,WindowFunction
Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都有生成时间,Flink通过时间戳分配器访问事件时间戳。如果使用kafka,自动使用kafka的时间
Ingestion Time:是数据进入 Flink 的时间。
Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time
水印有两种类型:
AssignerWithPeriodicWatermarks,周期性生成watermark,默认200ms,可以在env设置时间语义的源码上看到(适合数据量大)
AssignerWithPunctuatedWatermarks,每条数据后都插入一个水印(适合数据量小)
以上两个接口都继承自TimestampAssigner
水印的传递
当前分区的水印只取上游所有分区的最小值,只有当前分区的水印更新后,会向下游所有分区发送当前水印。
算子状态(Operator State)
键控状态(Keyed State)
状态后端(State Backends):状态的存储、访问、维护,由一个可插入的组件决定,这个组件叫状态后端,负责两件事,本地状态的管理,以及将检查点状态写入远程存储
可以访问时间戳、watermark、注册定时事件、输出特定事件。ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑(之前的算子和window无法实现)
端到端的一致性需要source可以使用偏移量查询历史数据、sink可以回滚。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章