Flink (7): Flink CDC in Practice


1. Introduction

CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows or tables), record those changes completely and in the order they happened, and write them to message middleware so that other services can subscribe to and consume them.
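As a point of reference for the rest of the article, a captured change record typically carries the before and after row images plus some metadata. Below is a minimal sketch, not from the original article, of that shape; the field names follow the Debezium-style envelope used later, and the row fields are hypothetical.

import java.util.Map;

// Sketch of the information a single change record carries.
public class ChangeEvent {
    public String database;            // source database, e.g. "gmall-flink-200821"
    public String table;               // source table, e.g. "z_user_info"
    public String op;                  // "c" insert, "u" update, "d" delete, "r" snapshot read
    public Map<String, Object> before; // row image before the change (null for inserts)
    public Map<String, Object> after;  // row image after the change (null for deletes)
    public long tsMs;                  // when the change happened, epoch millis
}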

CDC approaches

CDC implementations mainly fall into two camps, query-based and binlog-based. The key differences are:

|                            | Query-based CDC          | Binlog-based CDC         |
| Open-source tools          | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode             | Batch                    | Streaming                |
| Captures every data change | No                       | Yes                      |
| Latency                    | High                     | Low                      |
| Adds load on the database  | Yes                      | No                       |
| Supported databases        |                          |                          |

The Flink community has developed the flink-cdc-connectors component, a source connector that reads both the full snapshot and the incremental change data directly from databases such as MySQL and PostgreSQL. It is open source: https://github.com/ververica/flink-cdc-connectors
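To make the comparison above concrete, here is a minimal sketch, not from the original article, of what query-based CDC boils down to: a JDBC loop that periodically polls for rows whose update_time has moved forward. The update_time column and the polling interval are hypothetical; the sketch shows why this approach is batch-like, adds query load to the database, and never observes deletes or overwritten intermediate versions.

import java.sql.*;

public class PollingCdcSketch {
    public static void main(String[] args) throws Exception {
        Timestamp lastSeen = new Timestamp(0L); // high-water mark of what has been read
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://hadoop102:3306/gmall-flink-200821", "root", "root")) {
            while (true) {
                // Only rows whose update_time advanced are visible; deleted rows and
                // overwritten intermediate versions never show up in the result set.
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, name, age, update_time FROM z_user_info " +
                        "WHERE update_time > ? ORDER BY update_time")) {
                    ps.setTimestamp(1, lastSeen);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            lastSeen = rs.getTimestamp("update_time");
                            System.out.println(rs.getInt("id") + "," + rs.getString("name") + "," + rs.getInt("age"));
                        }
                    }
                }
                Thread.sleep(60_000L); // poll interval == lower bound on latency
            }
        }
    }
}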

2. Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

3. DataStream API Example

package com.flink.day07_flink_cdc;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;

/**
 * @description: Flink CDC with the DataStream API
 * @author: HaoWu
 * @create: 2021-05-21
 */
public class Flink01_Flink_CDC_Streaming {
    public static void main(String[] args) throws Exception {
        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Flink CDC stores the binlog position as state in checkpoints; to resume from where it
        //   left off, the job has to be started from a checkpoint or savepoint.
        //2.1 Enable checkpointing every 5 seconds
        env.enableCheckpointing(5000L);
        //2.2 Set the checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 Restart strategy: at most 3 restarts, 2 s apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //2.5 Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/flinkCDC"));
        //2.6 Set the user name for accessing HDFS
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //3. Create the Flink MySQL CDC source
        Properties properties = new Properties();
        // Startup modes (exposed by the SQL connector as 'scan.startup.mode'):
        //   initial (default): take an initial snapshot of the monitored tables on first startup, then read the latest binlog.
        //   latest-offset: skip the snapshot and read from the end of the binlog, i.e. only changes since the connector started.
        //   timestamp: skip the snapshot and read the binlog from the specified timestamp (earlier change events are skipped).
        //   specific-offset: skip the snapshot and read the binlog from the specified offset.
        // When passing properties through debeziumProperties(), use Debezium's own key "snapshot.mode";
        // "scan.startup.mode" is only understood by the SQL connector and would be ignored here.
        properties.setProperty("snapshot.mode", "initial");
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("root")
                .databaseList("gmall-flink-200821")
                .tableList("gmall-flink-200821.z_user_info") // Optional; if omitted, all tables in the databases above are captured. Note: use the "db.table" format.
                .debeziumProperties(properties)
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        //4. Read from MySQL with the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        //5. Print the data
        mysqlDS.print();

        //6. Execute the job
        env.execute();
    }
}
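Section 1 described CDC as feeding captured changes into message middleware for other services to consume. As a follow-up to the job above, here is a minimal sketch, not part of the original code, of replacing step 5 (the print) with a Kafka sink. It assumes an extra dependency, org.apache.flink:flink-connector-kafka_2.12:1.12.0 (not in the pom above), and a hypothetical topic name.

        // Extra imports needed:
        //   org.apache.flink.api.common.serialization.SimpleStringSchema
        //   org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "hadoop102:9092");

        // The CDC records are already plain strings, so a simple string schema is enough.
        mysqlDS.addSink(new FlinkKafkaProducer<String>(
                "ods_z_user_info",          // hypothetical target topic
                new SimpleStringSchema(),
                kafkaProps));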

4. Flink SQL Example

package com.flink.day07_flink_cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @description: Flink CDC with Flink SQL
 * @author: HaoWu
 * @create: 2021-05-24
 */
public class Flink02_Flink_CDC_Sql {
    public static void main(String[] args) throws Exception {
        //1. Create the Flink SQL execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. Create the Flink MySQL CDC source table
        String sourceSql = "create table user_info(\n" +
                "    id int,\n" +
                "    name string,\n" +
                "    age int\n" +
                ")\n" +
                "with(\n" +
                "  'connector'='mysql-cdc',\n" +
                "  'hostname'='hadoop102',\n" +
                "  'port'='3306',\n" +
                "  'username'='root',\n" +
                "  'password'='root',\n" +
                "  'database-name'='gmall-flink-200821',\n" +
                "  'table-name'='z_user_info'\n" +
                ")";
        tableEnv.executeSql(sourceSql);
        tableEnv.executeSql("select * from user_info").print();

        //3. No env.execute() is needed here: executeSql(...) above already submits and runs the SQL job.
    }
}
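If the changelog needs to feed a DataStream pipeline instead of being printed, the table can be bridged back to the DataStream API. A minimal sketch, not part of the original code, that could replace the print() call above (extra imports: org.apache.flink.api.java.tuple.Tuple2, org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.api.Table, org.apache.flink.types.Row):

        // Convert the CDC table into a retract stream: the Boolean flag is
        // true for additions and false for retractions (e.g. deletes).
        Table userInfo = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream =
                tableEnv.toRetractStream(userInfo, Row.class);
        retractStream.print();
        env.execute();   // needed here because a DataStream sink (print) was added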

5. Custom Deserializer

Reference: https://blog.csdn.net/qq_31866793/article/details/109207663

package com.flink.day07_flink_cdc;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
import java.util.Properties;

/**
 * @description: Flink CDC with a custom deserializer
 * @author: HaoWu
 * @create: 2021-05-24
 */
public class Flink03_Flink_CDC_CustomerSchema {
    public static void main(String[] args) throws Exception {
        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Create the Flink MySQL CDC source
        Properties properties = new Properties();
        // Startup modes (exposed by the SQL connector as 'scan.startup.mode'):
        //   initial (default): take an initial snapshot of the monitored tables on first startup, then read the latest binlog.
        //   latest-offset: skip the snapshot and read from the end of the binlog, i.e. only changes since the connector started.
        //   timestamp: skip the snapshot and read the binlog from the specified timestamp (earlier change events are skipped).
        //   specific-offset: skip the snapshot and read the binlog from the specified offset.
        // Debezium's own key is "snapshot.mode"; the "debezium." prefix is only stripped by the SQL connector.
        properties.setProperty("snapshot.mode", "initial");
        DebeziumSourceFunction<JSONObject> mysqlSource = MySQLSource
                .<JSONObject>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("root")
                .databaseList("gmall-flink-200821")
                .tableList("gmall-flink-200821.z_user_info") // Optional; if omitted, all tables in the databases above are captured. Note: use the "db.table" format.
                .debeziumProperties(properties)
                .deserializer(new CdcDwdDeserializationSchema()) // Custom deserializer that parses the CDC records into JSON
                .build();

        //3. Read from MySQL with the CDC source
        DataStreamSource<JSONObject> mysqlDS = env.addSource(mysqlSource);

        //4. Print the data
        mysqlDS.print();

        //5. Execute the job
        env.execute();

    }
}

/**
 * Custom deserializer that parses the CDC record into a simplified JSON string.
 * Test version: it does not handle DELETE (the "after" struct is null for deletes).
 */
class MyCustomSchema implements DebeziumDeserializationSchema<String>{ // custom record parser
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //Get the topic, which carries the database and table name, e.g. mysql_binlog_source.gmall-flink-200821.z_user_info
        String topic = sourceRecord.topic();
        String[] arr = topic.split("\\.");
        String db = arr[1];
        String tableName = arr[2];

        //Get the operation type: READ, DELETE, UPDATE or CREATE (Envelope comes from the io.debezium.data package)
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        //Get the record value and cast it to Struct (org.apache.kafka.connect.data package)
        Struct value = (Struct) sourceRecord.value();
        //Get the row image after the change (org.apache.kafka.connect.data package)
        Struct after = value.getStruct("after");
        //JSON object that holds the row data
        JSONObject data = new JSONObject();
        for (Field field : after.schema().fields()) {
            Object o = after.get(field);
            data.put(field.name(), o);
        }
        //JSON object that wraps the final result
        JSONObject result = new JSONObject();
        result.put("operation", operation.toString().toLowerCase());
        result.put("data", data);
        result.put("database", db);
        result.put("table", tableName);

        //Emit downstream
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);

    }
}
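For an insert into z_user_info, the deserializer above would emit a string of roughly this shape (row values are hypothetical, the key order may differ, and Envelope.Operation.CREATE becomes "create"):

{"operation":"create","data":{"id":1001,"name":"zhangsan","age":18},"database":"gmall-flink-200821","table":"z_user_info"}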
/**
 * Custom deserializer that parses the CDC record format and handles all operations (insert, update, delete).
 */
class CdcDwdDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {
    private static final long serialVersionUID = -3168848963265670603L;

    public CdcDwdDeserializationSchema() {
    }

    @Override
    public void deserialize(SourceRecord record, Collector<JSONObject> out) throws Exception {
        Struct dataRecord  =  (Struct)record.value();

        Struct afterStruct = dataRecord.getStruct("after");
        Struct beforeStruct = dataRecord.getStruct("before");
        /*
          If both beforeStruct and afterStruct are present, the record is an update;
          only beforeStruct present -> delete;
          only afterStruct present  -> insert.
         */

        JSONObject logJson = new JSONObject();

        String canal_type = "";
        List<Field> fieldsList = null;
        if(afterStruct !=null && beforeStruct !=null){
            System.out.println("这是修改数据");
            canal_type = "update";
            fieldsList = afterStruct.schema().fields();
            // field name -> value
            for (Field field : fieldsList) {
                String fieldName = field.name();
                Object fieldValue = afterStruct.get(fieldName);
//            System.out.println("*****fieldName=" + fieldName+",fieldValue="+fieldValue);
                logJson.put(fieldName,fieldValue);
            }
        }else if (afterStruct !=null){
            System.out.println( "这是新增数据");

            canal_type = "insert";
            fieldsList = afterStruct.schema().fields();
            //todo 字段与值
            for (Field field : fieldsList) {
                String fieldName = field.name();
                Object fieldValue = afterStruct.get(fieldName);
//            System.out.println("*****fieldName=" + fieldName+",fieldValue="+fieldValue);
                logJson.put(fieldName,fieldValue);
            }
        }else if (beforeStruct !=null){
            System.out.println( "这是删除数据");
            canal_type = "detele";
            fieldsList = beforeStruct.schema().fields();
            //todo 字段与值
            for (Field field : fieldsList) {
                String fieldName = field.name();
                Object fieldValue = beforeStruct.get(fieldName);
//            System.out.println("*****fieldName=" + fieldName+",fieldValue="+fieldValue);
                logJson.put(fieldName,fieldValue);
            }
        }else {
            System.out.println("一脸蒙蔽了");
        }
        // Database, table and timestamp from the "source" struct
        Struct source = dataRecord.getStruct("source");
        Object db = source.get("db");
        Object table = source.get("table");
        Object ts_ms = source.get("ts_ms");

        logJson.put("canal_database",db);
        logJson.put("canal_database",table);
        logJson.put("canal_ts",ts_ms);
        logJson.put("canal_type",canal_type);

        // Get the topic
        String topic = record.topic();
        System.out.println("topic = " + topic);

        // Primary-key fields: hash them into a stable partition number
        Struct pk = (Struct)record.key();
        List<Field> pkFieldList = pk.schema().fields();
        int partitionerNum = 0 ;
        for (Field field : pkFieldList) {
            Object pkValue= pk.get(field.name());
            partitionerNum += pkValue.hashCode();

        }
        int hash = Math.abs(partitionerNum) % 3;   // assumes 3 downstream partitions
        logJson.put("pk_hashcode",hash);
        out.collect(logJson);
    }
    @Override
    public TypeInformation<JSONObject> getProducedType() {
        return TypeInformation.of(JSONObject.class);
    }
}
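The pk_hashcode field only pays off if downstream writes are partitioned by it, so that all changes to the same primary key stay in order. Building on the Kafka sketch in section 3, here is a minimal sketch, not part of the original code, that routes each record to the Kafka partition given by pk_hashcode. It assumes the flink-connector-kafka_2.12 dependency and a hypothetical 3-partition topic ods_z_user_info (matching the "% 3" above), and it would go in place of mysqlDS.print() in Flink03_Flink_CDC_CustomerSchema.

        // Extra imports needed:
        //   org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
        //   org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
        //   org.apache.kafka.clients.producer.ProducerRecord
        //   java.nio.charset.StandardCharsets
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "hadoop102:9092");

        KafkaSerializationSchema<JSONObject> schema = new KafkaSerializationSchema<JSONObject>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject json, Long timestamp) {
                return new ProducerRecord<>(
                        "ods_z_user_info",                  // hypothetical topic with 3 partitions
                        json.getIntValue("pk_hashcode"),    // partition 0..2 computed above
                        null,                               // no Kafka key needed
                        json.toJSONString().getBytes(StandardCharsets.UTF_8));
            }
        };

        mysqlDS.addSink(new FlinkKafkaProducer<>(
                "ods_z_user_info", schema, kafkaProps,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));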

6. Packaging and Testing

1) Build the jar and upload it to the Linux server

2) Enable the MySQL binlog for the source database and restart MySQL (see the my.cnf sketch after these steps)

3) Start the Flink cluster

bin/start-cluster.sh

4) Start the HDFS cluster

start-dfs.sh

5) Submit the job

bin/flink run -c com.flink.day07_flink_cdc.Flink01_Flink_CDC_Streaming flink-200821-1.0-SNAPSHOT-jar-with-dependencies.jar

6) Insert, update, or delete rows in the gmall-flink-200821.z_user_info table in MySQL and watch the job output

7) Create a savepoint for the running Flink job

bin/flink savepoint JobId hdfs://hadoop102:8020/flink/save

8) After stopping the job, restart it from the savepoint

bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c com.flink.day07_flink_cdc.Flink01_Flink_CDC_Streaming flink-200821-1.0-SNAPSHOT-jar-with-dependencies.jar
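For step 2 above, here is a minimal sketch of the MySQL settings that binlog-based CDC requires; the server-id value and the binlog-do-db entry are examples for this article's database, so adjust them to your environment. Add them under [mysqld] in the MySQL configuration file (e.g. /etc/my.cnf) and restart MySQL:

[mysqld]
server-id = 1                       # any unique, non-zero server id
log-bin = mysql-bin                 # enable the binlog
binlog_format = ROW                 # Flink CDC / Debezium requires ROW format
binlog-do-db = gmall-flink-200821   # optional: only log this database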