1. Original demand

The existing full data and the incremental data of specific tables in specific MySQL databases must be synchronized in real time, and the corresponding updates and deletes must be synchronized as well.

Data synchronization must be non-intrusive: business programs must not be changed, and the synchronization must not put significant performance pressure on the business side.

Application scenarios: data ETL synchronization and reducing the pressure on business servers.

2. Solution

3. Canal introduction and installation

Canal is an open source project from Alibaba, developed in pure Java. It parses the database's incremental log to provide incremental data subscription and consumption, and currently mainly supports MySQL (MariaDB is also supported).

Working principle: MySQL master-slave replication implementation

From a high-level perspective, replication is divided into three steps:
1. The master records changes in its binary log (these records are called binary log events).
2. The slave copies the master's binary log events to its relay log.
3. The slave replays the events in the relay log and applies the changes to its own data.
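How canal works

The principle is relatively simple:
1. Canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master.
2. The MySQL master receives the dump request and starts pushing its binary log to the slave (that is, to canal).
3. Canal parses the binary log objects (originally a byte stream).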
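Architecture

Notes:
- A server represents one running canal process and corresponds to one JVM.
- An instance corresponds to one data queue (one server hosts 1..n instances).

instance module:
- eventParser: data source access; emulates the slave protocol to interact with the master and parses the protocol.
- eventSink: the connector between parser and store; filters, processes, and distributes the data.
- eventStore: data storage.
- metaManager: manager of incremental subscription and consumption information.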
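To make the flow through an instance more concrete, here is a toy model in plain Java. It is not canal code: `RowEvent`, `sink`, the in-memory `store` queue, and the `test` schema name are all hypothetical and exist only to illustrate how parsed events might be filtered by a `schema.table` pattern and then buffered for consumers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.regex.Pattern;

// Toy model of one canal instance; every name here is illustrative, not canal's API.
public class InstancePipelineSketch {

    /** A simplified row change, standing in for a parsed binlog event. */
    static final class RowEvent {
        final String schema;
        final String table;
        final String type; // INSERT, UPDATE or DELETE

        RowEvent(String schema, String table, String type) {
            this.schema = schema;
            this.table = table;
            this.type = type;
        }

        String qualifiedName() {
            return schema + "." + table;
        }
    }

    /** Sink step: keep only events whose schema.table matches the filter. */
    static List<RowEvent> sink(List<RowEvent> parsed, Pattern tableFilter) {
        List<RowEvent> kept = new ArrayList<>();
        for (RowEvent e : parsed) {
            if (tableFilter.matcher(e.qualifiedName()).matches()) {
                kept.add(e);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Parser step (assumed output): events already decoded from the binlog.
        List<RowEvent> parsed = Arrays.asList(
                new RowEvent("test", "bak1", "INSERT"),
                new RowEvent("test", "other", "UPDATE"),
                new RowEvent("test", "bak1", "DELETE"));

        // Store step: an in-memory queue that consumers would read from.
        Queue<RowEvent> store =
                new ArrayDeque<>(sink(parsed, Pattern.compile("test\\.bak1")));

        for (RowEvent e : store) {
            System.out.println(e.type + " " + e.qualifiedName());
        }
    }
}
```

Running `main` prints `INSERT test.bak1` followed by `DELETE test.bak1`; the event on `test.other` is dropped by the filter. Keeping the filter in the sink step means that only the tables of interest ever reach the store, which is the property the original requirement (specific tables in specific databases) depends on.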
Install

1. Prepare the MySQL and Kafka environments.
2. Download canal:

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3. Unpack it:

    tar -zxvf canal.deployer-1.1.3.tar.gz

4. Configure the parameters in the conf directory.
   Configure canal.properties:
   Enter conf/example and configure instance.properties:
5. Start: bin/startup.sh
6. Log viewing:

4. Verification

1. Develop the corresponding kafka consumer

    package org.kafka;

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    /**
     * Title: KafkaConsumerTest
     * Description: kafka consumer demo
     * Version: 1.0.0
     * @author pancm
     * @date January 26, 2018
     */
    public class KafkaConsumerTest implements Runnable {

        private final KafkaConsumer<String, String> consumer;
        private final String topic;
        private static final String GROUPID = "groupA";

        public KafkaConsumerTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.7.193:9092");
            props.put("group.id", GROUPID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("auto.offset.reset", "latest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            this.consumer = new KafkaConsumer<String, String>(props);
            this.topic = topicName;
            this.consumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {
            int messageNo = 1;
            System.out.println("---------Start consumption---------");
            try {
                consuming:
                for (;;) {
                    // poll(long) is deprecated since Kafka 2.0 in favor of poll(Duration)
                    ConsumerRecords<String, String> msgList = consumer.poll(1000);
                    if (msgList != null && msgList.count() > 0) {
                        for (ConsumerRecord<String, String> record : msgList) {
                            // Print every consumed record with a running counter
                            System.out.println(messageNo + "=======receive: key = " + record.key()
                                    + ", value = " + record.value() + " offset===" + record.offset());
                            // String v = decodeUnicode(record.value());
                            // System.out.println(v);
                            // Exit once 1000 messages have been consumed; the label is
                            // needed because a plain break only leaves the inner loop
                            if (messageNo % 1000 == 0) {
                                break consuming;
                            }
                            messageNo++;
                        }
                    } else {
                        Thread.sleep(11);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }

        public static void main(String[] args) {
            KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");
            Thread thread1 = new Thread(test1);
            thread1.start();
        }

        /*
         * Convert Chinese to unicode
         */
        public static String gbEncoding(final String gbString) {
            char[] utfBytes = gbString.toCharArray();
            StringBuilder unicodeBytes = new StringBuilder();
            for (int i = 0; i < utfBytes.length; i++) {
                // Always pad to four hex digits so that every escape has the same width
                unicodeBytes.append("\\u").append(String.format("%04x", (int) utfBytes[i]));
            }
            return unicodeBytes.toString();
        }

        /*
         * unicode encoding to Chinese.
         * Assumes the input consists only of escapes as produced by gbEncoding.
         */
        public static String decodeUnicode(final String dataStr) {
            int start = 0;
            int end = 0;
            final StringBuilder buffer = new StringBuilder();
            while (start > -1) {
                end = dataStr.indexOf("\\u", start + 2);
                String charStr;
                if (end == -1) {
                    charStr = dataStr.substring(start + 2);
                } else {
                    charStr = dataStr.substring(start + 2, end);
                }
                // Parse the hexadecimal string into a character
                char letter = (char) Integer.parseInt(charStr, 16);
                buffer.append(letter);
                start = end;
            }
            return buffer.toString();
        }
    }

2. Add data to table bak1

    CREATE TABLE `bak1` (
      `vin` varchar(20) NOT NULL,
      `p1` double DEFAULT NULL,
      `p2` double DEFAULT NULL,
      `p3` double DEFAULT NULL,
      `p4` double DEFAULT NULL,
      `p5` double DEFAULT NULL,
      `p6` double DEFAULT NULL,
      `p7` double DEFAULT NULL,
      `p8` double DEFAULT NULL,
      `p9` double DEFAULT NULL,
      `p0` double DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

    show create table bak1;

    insert into bak1 select '李雷abcv', `p1`, `p2`, `p3`, `p4`, `p5`, `p6`, `p7`, `p8`, `p9`, `p0` from moci limit 10;

3. View the output results:
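The consumer keeps a commented-out call to `decodeUnicode`, which suggests that message values may arrive with backslash-u escapes for non-ASCII text such as the `vin` value inserted in step 2. That helper only handles strings made up entirely of escapes, so the following is a minimal sketch of a decoder for mixed content. The class name, the `decodeMixed` method, and the sample JSON-like value are hypothetical; the article does not show the actual message body.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper, not part of canal or the Kafka client.
public class UnicodeEscapeSketch {

    // A literal backslash, the letter u, then exactly four hex digits.
    private static final Pattern ESCAPE = Pattern.compile("\\\\u([0-9a-fA-F]{4})");

    /** Replaces each four-digit escape with its character and leaves other text untouched. */
    static String decodeMixed(String text) {
        Matcher m = ESCAPE.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            char c = (char) Integer.parseInt(m.group(1), 16);
            // quoteReplacement guards against '$' or '\' in the decoded character
            m.appendReplacement(out, Matcher.quoteReplacement(String.valueOf(c)));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Hypothetical message value: escaped Chinese characters followed by ASCII.
        String value = "{\"vin\":\"\\u674e\\u96f7abcv\"}";
        System.out.println(decodeMixed(value));
    }
}
```

With the sample value above, `main` prints `{"vin":"李雷abcv"}`. In the consumer, a call such as `decodeMixed(record.value())` could stand in for the commented-out `decodeUnicode` line if the values do turn out to be escaped.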