Canal实时解析mysql binlog数据实战
阅读原文时间:2023年07月10日阅读:1

一、说明

通过canal实时监听mysql binlog日志文件的变化,并将数据解析出来

二、环境准备

1、创建maven项目并修改pom.xml配置文件

com.alibaba.otter canal.client 1.1.4

2、嗦代码

特别说明:在解析数据时,相当于程序时客户端,客户在连接canal服务端是时不需要用户名和密码

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {

// 获取连接  
CanalConnector canalConnector=CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.140.131",11111),  
        "example","","");

while(true)  
{  
   // 连接  
    canalConnector.connect();  
    // 订阅数据库  
    canalConnector.subscribe("CanalDb.\*");  
    // 获取数据  
    Message message = canalConnector.get(100);  
    // 获取Entry集合  
    List<CanalEntry.Entry> entries=message.getEntries();  
    // 判断集合是否为空,如果为空,则线程等待一分钟再拉取数据  
    if (entries.size()<=0)  
    {  
        System.out.println("档次抓取没有数据,休息一会儿。。。");  
        Thread.sleep(2000);  
    }  
    else  
    {  
        // 遍历entries,单条解析  
        for (CanalEntry.Entry entry:entries)  
        {  
           // 1,获取表名  
            String tableName=entry.getHeader().getTableName();  
            // 2,获取类型  
            CanalEntry.EntryType entryType=entry.getEntryType();  
            // 3,获取序列化后的数据  
            ByteString storeValue=entry.getStoreValue();  
            // 4.判断当前entryType类型是否为ROWDATA  
            if (CanalEntry.EntryType.ROWDATA.equals(entryType))  
            {  
                //5.反序列化数据  
                CanalEntry.RowChange rowChange=CanalEntry.RowChange.parseFrom(storeValue);  
                //6.获取当前事件的操作类型  
                CanalEntry.EventType eventType=rowChange.getEventType();  
                //7.获取数据集  
                List<CanalEntry.RowData> rowDataList=rowChange.getRowDatasList();  
                //8.遍历rowDataList并打印数据集  
                for(CanalEntry.RowData rowData:rowDataList)  
                {  
                    JSONObject beforData=new JSONObject();  
                    List<CanalEntry.Column> beforClountList=rowData.getBeforeColumnsList();  
                    for (CanalEntry.Column column:beforClountList)  
                    {  
                        beforData.put(column.getName(),column.getValue());  
                    }  
                    JSONObject afterData=new JSONObject();  
                    List<CanalEntry.Column> afterClountList=rowData.getAfterColumnsList();  
                    for (CanalEntry.Column column:afterClountList)  
                    {  
                        afterData.put(column.getName(),column.getValue());  
                    }  
                    // 打印数据  
                    System.out.println(""+tableName+  
                            ",EventType:"+eventType+  
                            ",Before:"+beforData+  
                            ",After:"+afterData);  
                }

            }  
            else  
            {  
                System.out.println("当前操作类型为"+entryType);  
            }  
        }  
    }  
}  

}
}

三、项目效果

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章