通过canal实时监听mysql binlog日志文件的变化,并将数据解析出来
特别说明:在解析数据时,相当于程序时客户端,客户在连接canal服务端是时不需要用户名和密码
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalClient {
public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
// 获取连接
CanalConnector canalConnector=CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.140.131",11111),
"example","","");
while(true)
{
// 连接
canalConnector.connect();
// 订阅数据库
canalConnector.subscribe("CanalDb.\*");
// 获取数据
Message message = canalConnector.get(100);
// 获取Entry集合
List<CanalEntry.Entry> entries=message.getEntries();
// 判断集合是否为空,如果为空,则线程等待一分钟再拉取数据
if (entries.size()<=0)
{
System.out.println("档次抓取没有数据,休息一会儿。。。");
Thread.sleep(2000);
}
else
{
// 遍历entries,单条解析
for (CanalEntry.Entry entry:entries)
{
// 1,获取表名
String tableName=entry.getHeader().getTableName();
// 2,获取类型
CanalEntry.EntryType entryType=entry.getEntryType();
// 3,获取序列化后的数据
ByteString storeValue=entry.getStoreValue();
// 4.判断当前entryType类型是否为ROWDATA
if (CanalEntry.EntryType.ROWDATA.equals(entryType))
{
//5.反序列化数据
CanalEntry.RowChange rowChange=CanalEntry.RowChange.parseFrom(storeValue);
//6.获取当前事件的操作类型
CanalEntry.EventType eventType=rowChange.getEventType();
//7.获取数据集
List<CanalEntry.RowData> rowDataList=rowChange.getRowDatasList();
//8.遍历rowDataList并打印数据集
for(CanalEntry.RowData rowData:rowDataList)
{
JSONObject beforData=new JSONObject();
List<CanalEntry.Column> beforClountList=rowData.getBeforeColumnsList();
for (CanalEntry.Column column:beforClountList)
{
beforData.put(column.getName(),column.getValue());
}
JSONObject afterData=new JSONObject();
List<CanalEntry.Column> afterClountList=rowData.getAfterColumnsList();
for (CanalEntry.Column column:afterClountList)
{
afterData.put(column.getName(),column.getValue());
}
// 打印数据
System.out.println(""+tableName+
",EventType:"+eventType+
",Before:"+beforData+
",After:"+afterData);
}
}
else
{
System.out.println("当前操作类型为"+entryType);
}
}
}
}
}
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章