Preliminary Exploration of Data Heterogeneity in Canal (Tips)
AD |
Canal canal 1 Canal MySQL binlog MySQL Elasticsearch Canal JOIN (buy_id)(shop_id)joinCanal canal binlogIDID2 Canal canal Canal MySQL master(binary log ) binary log eventsshow binlog eventsslavemasterbinary log events(relay log)slave MySQL Canal canal mysql slave mysql slave mysql master dump mysql master dump binary log slave (canal)canal binary log (byte) Canale servercanaljvminstance 1server1.
Canal canal
1
Canal MySQL binlog MySQL Elasticsearch
Canal
JOIN
(buy_id)(shop_id)join
Canal canal binlogIDID
2
Canal canal Canal
MySQL
- master(binary log ) binary log eventsshow binlog events
- slavemasterbinary log events(relay log)
- slave
MySQL Canal
- canal mysql slave mysql slave mysql master dump
- mysql master dump binary log slave (canal)
- canal binary log (byte)
Canale
- servercanaljvm
- instance 1server1..ninstance)
instance
- eventParser (slavemaster)
- eventSink (ParserStore)
- eventStore ()
- metaManager (&)
3 IntelliJ IDEA Canal Demo
Linux canal Canal Demo Debug
canal Canal
canal Demo example
canal client (server)
IDEA Canal Demo
1Canal Server Demo
package com.alibaba.otter.canal.server;import com.alibaba.otter.canal.instance.core.CanalInstance;import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;import com.alibaba.otter.canal.instance.manager.model.Canal;import com.alibaba.otter.canal.instance.manager.model.CanalParameter;import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.util.Arrays;public class CanalServerTestMain { protected static final String ZK_CLUSTER_ADDRESS = "127.0.0.1:2181"; protected static final String DESTINATION = "example"; protected static final String DETECTING_SQL = "select 1"; protected static final String MYSQL_ADDRESS = "127.0.0.1"; protected static final String USERNAME = "canal"; protected static final String PASSWORD = "canal"; protected static final String FILTER = ".*\\..*"; /** 500s */ protected static final long RUN_TIME = 120 * 1000; private final ByteBuffer header = ByteBuffer.allocate(4); private CanalServerWithNetty nettyServer; public static void main(String[] args) { CanalServerTestMain test = new CanalServerTestMain(); try { test.setUp(); System.out.println("start"); } catch (Throwable e) { e.printStackTrace(); } finally { System.out.println("sleep"); try { Thread.sleep(RUN_TIME); } catch (Throwable ee) { } test.tearDown(); System.out.println("end"); } } public void setUp() { CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded(); embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() { public CanalInstance generate(String destination) { Canal canal = buildCanal(); return new CanalInstanceWithManager(canal, FILTER); } }); nettyServer = CanalServerWithNetty.instance(); nettyServer.setEmbeddedServer(embeddedServer); nettyServer.setPort(11111); nettyServer.start(); // instance embeddedServer.start("example"); } public void tearDown() { nettyServer.stop(); } private Canal buildCanal() { Canal canal = new Canal(); canal.setId(1L); canal.setName(DESTINATION); canal.setDesc("test"); CanalParameter parameter = new CanalParameter(); //parameter.setZkClusters(Arrays.asList(ZK_CLUSTER_ADDRESS)); parameter.setMetaMode(CanalParameter.MetaMode.MEMORY); parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT); parameter.setIndexMode(CanalParameter.IndexMode.MEMORY); parameter.setStorageMode(CanalParameter.StorageMode.MEMORY); parameter.setMemoryStorageBufferSize(32 * 1024); parameter.setSourcingType(CanalParameter.SourcingType.MYSQL); parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306), new InetSocketAddress(MYSQL_ADDRESS, 3306))); parameter.setDbUsername(USERNAME); parameter.setDbPassword(PASSWORD); parameter.setSlaveId(1234L); parameter.setDefaultConnectionTimeoutInSeconds(30); parameter.setConnectionCharset("UTF-8"); parameter.setConnectionCharsetNumber((byte) 33); parameter.setReceiveBufferSize(8 * 1024); parameter.setSendBufferSize(8 * 1024); parameter.setDetectingEnable(false); parameter.setDetectingIntervalInSeconds(10); parameter.setDetectingRetryTimes(3); parameter.setDetectingSQL(DETECTING_SQL); canal.setCanalParameter(parameter); return canal; }}
2Canal Client Demo
package com.alibaba.otter.canal.example;import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.common.utils.AddressUtils;import com.alibaba.otter.canal.protocol.CanalEntry;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;public class SimpleCanalClientExample { public static void main(String[] args) { // CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*..*"); connector.rollback(); int totalEmptyCount = 3000; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // // connector.rollback(batchId); // , } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<CanalEntry.Entry> entrys) { for (CanalEntry.Entry entry : entrys) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChage = null; try { rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } CanalEntry.EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } }}
client
binlog
IDEA Demo canal canal
Disclaimer: The content of this article is sourced from the internet. The copyright of the text, images, and other materials belongs to the original author. The platform reprints the materials for the purpose of conveying more information. The content of the article is for reference and learning only, and should not be used for commercial purposes. If it infringes on your legitimate rights and interests, please contact us promptly and we will handle it as soon as possible! We respect copyright and are committed to protecting it. Thank you for sharing.(Email:[email protected])
Mobile advertising space rental |
Tag: Preliminary Exploration of Data Heterogeneity in Canal Tips
Founder of ofo Xiaohuangche, Dai Wei, went to the United States to start a business: registered users drink coffee for free, with an estimated value of $200 million
Next21 rules for dividing databases and tables, hold!
Guess you like
-
China Leads in Developing IEC 63206 International Standard, Driving Global Innovation in Industrial Process Control System RecordersDetail
2025-01-18 11:06:14 1
-
The 2024 Micro-Short Series Industry Ecological Insight Report: 647,000 Job Opportunities, Rise of Diversified Business Models, and High-Quality Content as the Future TrendDetail
2025-01-17 17:33:01 1
-
Global PC Market Shows Moderate Recovery in 2024: High AIPC Prices a Bottleneck, Huge Growth Potential in 2025Detail
2025-01-17 11:02:09 1
-
Bosch's Smart Cockpit Platform Surpasses 2 Million Units Shipped, Showcasing Strength in Intelligent Driving TechnologyDetail
2025-01-17 10:55:29 1
-
YY Guangzhou Awarded "2024 Network Information Security Support Unit" for Outstanding ContributionsDetail
2025-01-17 10:43:28 1
-
TikTok CEO Invited to Trump's Inauguration, Biden Administration May Delay BanDetail
2025-01-16 20:06:11 1
-
Douyin Denies Opening International Registration: Overseas IPs Don't Equate to Overseas Registration; Platform Actively Combats Account ImpersonationDetail
2025-01-16 14:26:12 1
-
Lei Jun, Xiaomi's founder, chairman, and CEO, has set a new goal: learning to drive a forklift!Detail
2025-01-15 10:22:30 11
-
ByteDance Scholarship 2024: Fifteen Outstanding Doctoral Students Awarded RMB 100,000 Each to Advance Frontier Technology ExplorationDetail
2025-01-14 15:56:39 1
-
Fliggy Launches "Peace of Mind for the New Year" Service Initiative to Ensure Smooth Travel During the Year of the Snake Spring Festival RushDetail
2025-01-14 15:24:53 1
-
Arm's Massive Fee Hike and Potential In-House Chip Development: A Precursor to a Seismic Shift in the Chip Industry?Detail
2025-01-14 11:02:36 1
-
Adobe Firefly Launches: Generative AI Suite Revolutionizes Image and Video Processing EfficiencyDetail
2025-01-14 10:46:39 1
-
Chinese New Year Elements Sell Like Hotcakes Overseas: Cross-border E-commerce "Spring Festival Economy" Booms, Cainiao Overseas Warehouses Help Merchants Capture Market ShareDetail
2025-01-13 14:17:50 1
-
China Railway's 12306 System Successfully Navigates Spring Festival Travel RushDetail
2025-01-13 12:56:54 1
-
Handan, Hebei Province Successfully Tests First Low-Altitude Drone Delivery Route, Ushering in a New Era of Smart LogisticsDetail
2025-01-13 12:50:13 1
-
Kuaishou Leads in Developing Anti-Fraud Industry Standards, Contributing to a Secure and Reliable Short-Video CommunityDetail
2025-01-13 09:47:32 11
-
Microsoft Offers Top Salaries to Retain AI Talent: AI Software Engineers Earn Over $400,000 AnnuallyDetail
2025-01-12 17:28:34 11
- Detail
-
Chang'e-5 Mission Unveils Secrets: New Discoveries Regarding Lunar Magnetic Field Strength and Deep Dynamics 2 Billion Years AgoDetail
2025-01-10 11:42:44 11
-
SenseTime's "Day Day New" Multimodal Large Model: Native Fusion Enables Diverse ApplicationsDetail
2025-01-10 11:40:40 21