Canal的安装与使用
阅读原文时间:2023年07月11日阅读:1

一、Canal介绍

  Canal的原理就是它自己伪装成slave, 向mysql发送dump协议,MySQL master接收到dump请求之后推送binlog文件给slave, 也就是canal。  

二、Canal安装

  1. 下载Canal

   wget https://github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz

  2. 解压到/opt/softwares/canal目录, 解压完之后如下图所示:

  3. 配置instance

  4. 修改canal.properties

三、Mysql 安装

  1、mysql 安装

    yum install mysql

    yum install mysql-server

  2、启动mysql

    /etc/init.d/mysqld start 或者sevice mysqld start

  3、设置root用户密码

    mysqladmin -u root password '123456'

  4、登录msyql

    mysql -uroot -p123456

  5、检查并开启binlog复制功能及binlog模式是否为ROW模式

    参考: binlog详解

四、Canal抽取binlog

  Canal只是伪装成slave抽取binlog,Canal拿到binlog之后还需要交给业务方去做响应的处理,那么怎么去交给业务方呢?一般都是Canal获取到binlog之后写到kafka里,业务方订阅kafka topic消费binlog,完成业务逻辑处理。

  但是Canal不能直接写Kafka, 所以还需要有个client连接Canal,Canal获取binlog之后交给Client, Client在往Kafka里写binlog消息,Client代码如下:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClientExample {

public static void main(String\[\] args) {  
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.41.254", 11111), "example", "canal", "canal");  
    try {  
        int batchSize = 1000;

        connector.connect();  
        connector.subscribe("zhengxinv6\\\\..\*");  
        connector.rollback();

        while (true) {  
            // 获取指定数量的数据  
            Message message = connector.getWithoutAck(batchSize);

            long batchId = message.getId();  
            int size = message.getEntries().size();  
            if (batchId == -1 || size == 0) {  
                try {  
                    Thread.sleep(1000);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }

                continue;  
            }

            System.out.println("batchId = \[" + batchId + "\]");  
            printEntry(message.getEntries());

            connector.ack(batchId); //提交确认  
            //connector.rollback(batchId);  
        }

    } finally {  
        connector.disconnect();  
    }  
}

private static void printEntry(List<CanalEntry.Entry> entrys) {  
    for (CanalEntry.Entry entry : entrys) {  
        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN  
                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {  
            continue;  
        }

        CanalEntry.RowChange rowChange = null;  
        try {  
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());  
        } catch (Exception e) {  
            throw new RuntimeException(  
                    "ERROR ## parser of eromanga-event has an error,data:"  
                            + entry.toString(), e);  
        }

        CanalEntry.EventType eventType = rowChange.getEventType();  
        System.out.println(String.format("================> binlog\[%s:%s\] ,name\[%s,%s\] , eventType : %s",  
                entry.getHeader().getLogfileName(),  
                entry.getHeader().getLogfileOffset()+"",  
                entry.getHeader().getSchemaName(),  
                entry.getHeader().getTableName(),  
                eventType));

        for (CanalEntry.RowData rowData: rowChange.getRowDatasList()) {  
            if (eventType == CanalEntry.EventType.DELETE) {  
                printColumn(rowData.getBeforeColumnsList());  
            } else if (eventType == CanalEntry.EventType.INSERT) {  
                printColumn(rowData.getAfterColumnsList());  
            } else {  
                System.out.println("-------> before");  
                printColumn(rowData.getBeforeColumnsList());  
                System.out.println("-------> after");  
                printColumn(rowData.getAfterColumnsList());  
            }  
        }  
    }  
}

private static void printColumn(List<CanalEntry.Column> columns) {  
    for (CanalEntry.Column column : columns) {  
        System.out.println(column.getName() + " : " + column.getValue()  
                + "    update=" + column.getUpdated());  
    }  
}  

}

五、Canal使用过程出现的问题及解决方法

  参考:canal报错解决方法

参考:https://www.jianshu.com/p/6299048fad66