Home Page >  News List >> Tech >> Tech

Preliminary Exploration of Data Heterogeneity in Canal (Tips)

Tech 2023-05-16 11:07:49 Source: Network
AD

Canal canal 1 Canal MySQL binlog MySQL Elasticsearch Canal JOIN (buy_id)(shop_id)joinCanal canal binlogIDID2 Canal canal Canal MySQL master(binary log ) binary log eventsshow binlog eventsslavemasterbinary log events(relay log)slave MySQL Canal canal mysql slave mysql slave mysql master dump mysql master dump binary log slave (canal)canal binary log (byte) Canale servercanaljvminstance 1server1.

Canal canal

1

Canal MySQL binlog MySQL Elasticsearch

Canal

JOIN

(buy_id)(shop_id)join

Canal canal binlogIDID

2

Canal canal Canal

MySQL

  • master(binary log ) binary log eventsshow binlog events
  • slavemasterbinary log events(relay log)
  • slave

MySQL Canal

  • canal mysql slave mysql slave mysql master dump
  • mysql master dump binary log slave (canal)
  • canal binary log (byte)

Canale

  • servercanaljvm
  • instance 1server1..ninstance)

instance

  • eventParser (slavemaster)
  • eventSink (ParserStore)
  • eventStore ()
  • metaManager (&)

3 IntelliJ IDEA Canal Demo

Linux canal Canal Demo Debug

canal Canal

canal Demo example

canal client (server)

IDEA Canal Demo

1Canal Server Demo

package com.alibaba.otter.canal.server;import com.alibaba.otter.canal.instance.core.CanalInstance;import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;import com.alibaba.otter.canal.instance.manager.model.Canal;import com.alibaba.otter.canal.instance.manager.model.CanalParameter;import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.util.Arrays;public class CanalServerTestMain {    protected static final String ZK_CLUSTER_ADDRESS      = "127.0.0.1:2181";    protected static final String DESTINATION   = "example";    protected static final String DETECTING_SQL = "select 1";    protected static final String MYSQL_ADDRESS = "127.0.0.1";    protected static final String USERNAME      = "canal";    protected static final String PASSWORD      = "canal";    protected static final String FILTER        = ".*\\..*";    /**  500s  */    protected static final long RUN_TIME = 120 * 1000;    private final ByteBuffer header        = ByteBuffer.allocate(4);    private CanalServerWithNetty nettyServer;    public static void main(String[] args) {        CanalServerTestMain test = new CanalServerTestMain();        try {            test.setUp();            System.out.println("start");        } catch (Throwable e) {            e.printStackTrace();        } finally {            System.out.println("sleep");            try {                Thread.sleep(RUN_TIME);            } catch (Throwable ee) {            }            test.tearDown();            System.out.println("end");        }    }    public void setUp() {        CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();        embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {            public CanalInstance generate(String destination) {                Canal canal = buildCanal();                return new CanalInstanceWithManager(canal, FILTER);            }        });        nettyServer = CanalServerWithNetty.instance();        nettyServer.setEmbeddedServer(embeddedServer);        nettyServer.setPort(11111);        nettyServer.start();        //  instance        embeddedServer.start("example");    }    public void tearDown() {        nettyServer.stop();    }    private Canal buildCanal() {        Canal canal = new Canal();        canal.setId(1L);        canal.setName(DESTINATION);        canal.setDesc("test");        CanalParameter parameter = new CanalParameter();        //parameter.setZkClusters(Arrays.asList(ZK_CLUSTER_ADDRESS));        parameter.setMetaMode(CanalParameter.MetaMode.MEMORY);        parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT);        parameter.setIndexMode(CanalParameter.IndexMode.MEMORY);        parameter.setStorageMode(CanalParameter.StorageMode.MEMORY);        parameter.setMemoryStorageBufferSize(32 * 1024);        parameter.setSourcingType(CanalParameter.SourcingType.MYSQL);        parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306),                new InetSocketAddress(MYSQL_ADDRESS, 3306)));        parameter.setDbUsername(USERNAME);        parameter.setDbPassword(PASSWORD);        parameter.setSlaveId(1234L);        parameter.setDefaultConnectionTimeoutInSeconds(30);        parameter.setConnectionCharset("UTF-8");        parameter.setConnectionCharsetNumber((byte) 33);        parameter.setReceiveBufferSize(8 * 1024);        parameter.setSendBufferSize(8 * 1024);        parameter.setDetectingEnable(false);        parameter.setDetectingIntervalInSeconds(10);        parameter.setDetectingRetryTimes(3);        parameter.setDetectingSQL(DETECTING_SQL);        canal.setCanalParameter(parameter);        return canal;    }}

2Canal Client Demo

package com.alibaba.otter.canal.example;import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.common.utils.AddressUtils;import com.alibaba.otter.canal.protocol.CanalEntry;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;public class SimpleCanalClientExample {    public static void main(String[] args) {        //         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),                11111), "example", "", "");        int batchSize = 1000;        int emptyCount = 0;        try {            connector.connect();            connector.subscribe(".*..*");            connector.rollback();            int totalEmptyCount = 3000;            while (emptyCount < totalEmptyCount) {                Message message = connector.getWithoutAck(batchSize); //                 long batchId = message.getId();                int size = message.getEntries().size();                if (batchId == -1 || size == 0) {                    emptyCount++;                    System.out.println("empty count : " + emptyCount);                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e) {                    }                } else {                    emptyCount = 0;                    // System.out.printf("message[batchId=%s,size=%s] n", batchId, size);                    printEntry(message.getEntries());                }                connector.ack(batchId); //                 // connector.rollback(batchId); // ,             }            System.out.println("empty too many times, exit");        } finally {            connector.disconnect();        }    }    private static void printEntry(List<CanalEntry.Entry> entrys) {        for (CanalEntry.Entry entry : entrys) {            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {                continue;            }            CanalEntry.RowChange rowChage = null;            try {                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());            } catch (Exception e) {                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),                        e);            }            CanalEntry.EventType eventType = rowChage.getEventType();            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),                    eventType));            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {                if (eventType == CanalEntry.EventType.DELETE) {                    printColumn(rowData.getBeforeColumnsList());                } else if (eventType == EventType.INSERT) {                    printColumn(rowData.getAfterColumnsList());                } else {                    System.out.println("-------> before");                    printColumn(rowData.getBeforeColumnsList());                    System.out.println("-------> after");                    printColumn(rowData.getAfterColumnsList());                }            }        }    }    private static void printColumn(List<Column> columns) {        for (Column column : columns) {            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());        }    }}

client

binlog

IDEA Demo canal canal



Disclaimer: The content of this article is sourced from the internet. The copyright of the text, images, and other materials belongs to the original author. The platform reprints the materials for the purpose of conveying more information. The content of the article is for reference and learning only, and should not be used for commercial purposes. If it infringes on your legitimate rights and interests, please contact us promptly and we will handle it as soon as possible! We respect copyright and are committed to protecting it. Thank you for sharing.(Email:[email protected])

Mobile advertising space rental

Tag: Preliminary Exploration of Data Heterogeneity in Canal Tips

Unite directoryCopyright @ 2011-2025 All Rights Reserved. Copyright Webmaster Search Directory System