From 627e6f8c2d7a64911517247f15db4e48e3ad89ae Mon Sep 17 00:00:00 2001 From: Anonymitaet <50226895+Anonymitaet@users.noreply.github.com> Date: Tue, 17 Sep 2019 21:58:47 +0800 Subject: [PATCH] [Doc] Update Canal source connector guide (#5173) 1. I've updated this guide based on `CanalSourceConfig.java` and `canal-mysql-source-config.yaml`. 2. Fix https://github.com/apache/pulsar/issues/5015 --- site2/docs/io-cdc-canal.md | 360 ++++++++++++++++++++----------------- 1 file changed, 195 insertions(+), 165 deletions(-) diff --git a/site2/docs/io-cdc-canal.md b/site2/docs/io-cdc-canal.md index e5d53aa4da16e..9d371ae0c7390 100644 --- a/site2/docs/io-cdc-canal.md +++ b/site2/docs/io-cdc-canal.md @@ -1,174 +1,204 @@ --- id: io-cdc-canal -title: CDC Canal Connector -sidebar_label: CDC Canal Connector +title: Canal source connector +sidebar_label: Canal source connector --- -### Source Configuration Options +The Canal source connector pulls messages from MySQL to Pulsar topics. -The Configuration is mostly related to Canal task config. +This guide explains how to congifure and use Canal source connector. + +## Configuration + +The configuration of Canal source connector has the following parameters. + +### Parameter | Name | Required | Default | Description | |------|----------|---------|-------------| -| `zkServers` | `false` | `127.0.0.1:2181` | `The address and port of the zookeeper . if canal server configured to cluster mode` | -| `batchSize` | `true` | `5120` | `Take 5120 records from the canal server in batches` | -| `username` | `false` | `` | `Canal server account, not MySQL` | -| `password` | `false` | `` | `Canal server password, not MySQL` | -| `cluster` | `false` | `false` | `Decide whether to open cluster mode based on canal server configuration, true: cluster mode, false: standalone mode` | -| `singleHostname` | `false` | `127.0.0.1` | `The address of canal server` | -| `singlePort` | `false` | `11111` | `The port of canal server` | - - -### Configuration Example - -Here is a configuration Json example: - -```$json -{ - "zkServers": "127.0.0.1:2181", - "batchSize": "5120", - "destination": "example", - "username": "", - "password": "", - "cluster": false, - "singleHostname": "127.0.0.1", - "singlePort": "11111", -} -``` -You could also find the yaml example in this [file](https://github.com/apache/pulsar/blob/master/pulsar-io/canal/src/main/resources/canal-mysql-source-config.yaml), which has similar content below: - -```$yaml -configs: - zkServers: "127.0.0.1:2181" - batchSize: "5120" - destination: "example" - username: "" - password: "" - cluster: false - singleHostname: "127.0.0.1" - singlePort: "11111" -``` - -### Usage example - -Here is a simple example to store MySQL change data using above example config. - -- Start a MySQL server - -```$bash -docker pull mysql:5.7 -docker run -d -it --rm --name pulsar-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=canal -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw mysql:5.7 -``` -- Modify configuration files mysqld.cnf - -``` -[mysqld] -pid-file = /var/run/mysqld/mysqld.pid -socket = /var/run/mysqld/mysqld.sock -datadir = /var/lib/mysql -#log-error = /var/log/mysql/error.log -# By default we only accept connections from localhost -#bind-address = 127.0.0.1 -# Disabling symbolic-links is recommended to prevent assorted security risks -symbolic-links=0 -log-bin=mysql-bin -binlog-format=ROW -server_id=1 -``` - -- Copy file to mysql server from local and restart mysql server -```$bash -docker cp mysqld.cnf pulsar-mysql:/etc/mysql/mysql.conf.d/ -docker restart pulsar-mysql -``` - -- Create test database in mysql server -```$bash -docker exec -it pulsar-mysql /bin/bash -mysql -h 127.0.0.1 -uroot -pcanal -e 'create database test;' -``` - -- Start canal server and connect mysql server - -``` -docker pull canal/canal-server:v1.1.2 -docker run -d -it --link pulsar-mysql -e canal.auto.scan=false -e canal.destinations=test -e canal.instance.master.address=pulsar-mysql:3306 -e canal.instance.dbUsername=root -e canal.instance.dbPassword=canal -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false --name=pulsar-canal-server -p 8000:8000 -p 2222:2222 -p 11111:11111 -p 11112:11112 -m 4096m canal/canal-server:v1.1.2 -``` - -- Start pulsar standalone - -```$bash -docker pull apachepulsar/pulsar:2.3.0 -docker run -d -it --link pulsar-canal-server -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone apachepulsar/pulsar:2.3.0 bin/pulsar standalone -``` - -- Start pulsar-io in standalone - -- Config file canal-mysql-source-config.yaml - -```$yaml -configs: - zkServers: "" - batchSize: "5120" - destination: "test" - username: "" - password: "" - cluster: false - singleHostname: "pulsar-canal-server" - singlePort: "11111" -``` -- Consumer file pulsar-client.py for test -``` -import pulsar - -client = pulsar.Client('pulsar://localhost:6650') -consumer = client.subscribe('my-topic', - subscription_name='my-sub') - -while True: - msg = consumer.receive() - print("Received message: '%s'" % msg.data()) - consumer.acknowledge(msg) - -client.close() -``` - -- Copy config file and test file to pulsar server - -```$bash -docker cp canal-mysql-source-config.yaml pulsar-standalone:/pulsar/conf/ -docker cp pulsar-client.py pulsar-standalone:/pulsar/ -``` - -- Download canal connector and start canal connector -```$bash -docker exec -it pulsar-standalone /bin/bash -wget http://apache.01link.hk/pulsar/pulsar-2.3.0/connectors/pulsar-io-canal-2.3.0.nar -P connectors -./bin/pulsar-admin sources localrun --archive ./connectors/pulsar-io-canal-2.3.0.nar --classname org.apache.pulsar.io.canal.CanalStringSource --tenant public --namespace default --name canal --destination-topic-name my-topic --source-config-file /pulsar/conf/canal-mysql-source-config.yaml --parallelism 1 -``` - -- Consumption data - -```$bash -docker exec -it pulsar-standalone /bin/bash -python pulsar-client.py -``` - -- Open another window for login mysql server - -```$bash -docker exec -it pulsar-mysql /bin/bash -mysql -h 127.0.0.1 -uroot -pcanal -``` -- Create table and insert, delete, update data in mysql server -``` -mysql> use test; -mysql> show tables; -mysql> CREATE TABLE IF NOT EXISTS `test_table`(`test_id` INT UNSIGNED AUTO_INCREMENT,`test_title` VARCHAR(100) NOT NULL, -`test_author` VARCHAR(40) NOT NULL, -`test_date` DATE,PRIMARY KEY ( `test_id` ))ENGINE=InnoDB DEFAULT CHARSET=utf8; -mysql> INSERT INTO test_table (test_title, test_author, test_date) VALUES("a", "b", NOW()); -mysql> UPDATE test_table SET test_title='c' WHERE test_title='a'; -mysql> DELETE FROM test_table WHERE test_title='c'; -``` +| `username` | true | None | Canal server account (not MySQL).| +| `password` | true | None | Canal server password (not MySQL). | +|`destination`|true|None|Source destination that Canal source connector connects to. +| `singleHostname` | false | None | Canal server address.| +| `singlePort` | false | None | Canal server port.| +| `cluster` | true | false | Whether to enable cluster mode based on Canal server configuration or not.

  • true: **cluster** mode.
    If set to true, it talks to `zkServers` to figure out the actual database host.

  • false: **standalone** mode.
    If set to false, it connects to the database specified by `singleHostname` and `singlePort`. | +| `zkServers` | true | None | Address and port of the Zookeeper that Canal source connector talks to figure out the actual database host.| +| `batchSize` | false | 1000 | Batch size to fetch from Canal. | + +### Example + +Before using the Canal connector, you can create a configuration file through one of the following methods. + +* JSON + + ```json + { + "zkServers": "127.0.0.1:2181", + "batchSize": "5120", + "destination": "example", + "username": "", + "password": "", + "cluster": false, + "singleHostname": "127.0.0.1", + "singlePort": "11111", + } + ``` + +* YAML + + You can create a YAML file and copy the [contents](https://github.com/apache/pulsar/blob/master/pulsar-io/canal/src/main/resources/canal-mysql-source-config.yaml) below to your YAML file. + + ```yaml + configs: + zkServers: "127.0.0.1:2181" + batchSize: "5120" + destination: "example" + username: "" + password: "" + cluster: false + singleHostname: "127.0.0.1" + singlePort: "11111" + ``` + +## Usage + +Here is an example of storing MySQL data using the configuration file as above. + +1. Start a MySQL server. + + ```bash + $ docker pull mysql:5.7 + $ docker run -d -it --rm --name pulsar-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=canal -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw mysql:5.7 + ``` + +2. Create a configuration file `mysqld.cnf`. + + ```bash + [mysqld] + pid-file = /var/run/mysqld/mysqld.pid + socket = /var/run/mysqld/mysqld.sock + datadir = /var/lib/mysql + #log-error = /var/log/mysql/error.log + # By default we only accept connections from localhost + #bind-address = 127.0.0.1 + # Disabling symbolic-links is recommended to prevent assorted security risks + symbolic-links=0 + log-bin=mysql-bin + binlog-format=ROW + server_id=1 + ``` + +3. Copy the configuration file `mysqld.cnf` to MySQL server. + + ```bash + $ docker cp mysqld.cnf pulsar-mysql:/etc/mysql/mysql.conf.d/ + ``` + +4. Restart the MySQL server. + + ```bash + $ docker restart pulsar-mysql + ``` + +5. Create a test database in MySQL server. + + ```bash + $ docker exec -it pulsar-mysql /bin/bash + $ mysql -h 127.0.0.1 -uroot -pcanal -e 'create database test;' + ``` + +6. Start a Canal server and connect to MySQL server. + + ``` + $ docker pull canal/canal-server:v1.1.2 + $ docker run -d -it --link pulsar-mysql -e canal.auto.scan=false -e canal.destinations=test -e canal.instance.master.address=pulsar-mysql:3306 -e canal.instance.dbUsername=root -e canal.instance.dbPassword=canal -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false --name=pulsar-canal-server -p 8000:8000 -p 2222:2222 -p 11111:11111 -p 11112:11112 -m 4096m canal/canal-server:v1.1.2 + ``` + +7. Start Pulsar standalone. + + ```bash + $ docker pull apachepulsar/pulsar:2.3.0 + $ docker run -d -it --link pulsar-canal-server -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone apachepulsar/pulsar:2.3.0 bin/pulsar standalone + ``` + +8. Modify the configuration file `canal-mysql-source-config.yaml`. + + ```yaml + configs: + zkServers: "" + batchSize: "5120" + destination: "test" + username: "" + password: "" + cluster: false + singleHostname: "pulsar-canal-server" + singlePort: "11111" + ``` + +9. Create a consumer file `pulsar-client.py`. + + ```python + import pulsar + + client = pulsar.Client('pulsar://localhost:6650') + consumer = client.subscribe('my-topic', + subscription_name='my-sub') + + while True: + msg = consumer.receive() + print("Received message: '%s'" % msg.data()) + consumer.acknowledge(msg) + + client.close() + ``` + +10. Copy the configuration file `canal-mysql-source-config.yaml` and the consumer file `pulsar-client.py` to Pulsar server. + + ```bash + $ docker cp canal-mysql-source-config.yaml pulsar-standalone:/pulsar/conf/ + $ docker cp pulsar-client.py pulsar-standalone:/pulsar/ + ``` + +11. Download a Canal connector and start it. + + ```bash + $ docker exec -it pulsar-standalone /bin/bash + $ wget http://apache.01link.hk/pulsar/pulsar-2.3.0/connectors/pulsar-io-canal-2.3.0.nar -P connectors + $ ./bin/pulsar-admin sources localrun \ + --archive ./connectors/pulsar-io-canal-2.3.0.nar \ + --classname org.apache.pulsar.io.canal.CanalStringSource \ + --tenant public \ + --namespace default \ + --name canal \ + --destination-topic-name my-topic \ + --source-config-file /pulsar/conf/canal-mysql-source-config.yaml \ + --parallelism 1 + ``` + +12. Consume data from MySQL. + + ```bash + $ docker exec -it pulsar-standalone /bin/bash + $ python pulsar-client.py + ``` + +13. Open another window to log in MySQL server. + + ```bash + $ docker exec -it pulsar-mysql /bin/bash + $ mysql -h 127.0.0.1 -uroot -pcanal + ``` + +14. Create a table, and insert, delete, and update data in MySQL server. + + ```bash + mysql> use test; + mysql> show tables; + mysql> CREATE TABLE IF NOT EXISTS `test_table`(`test_id` INT UNSIGNED AUTO_INCREMENT,`test_title` VARCHAR(100) NOT NULL, + `test_author` VARCHAR(40) NOT NULL, + `test_date` DATE,PRIMARY KEY ( `test_id` ))ENGINE=InnoDB DEFAULT CHARSET=utf8; + mysql> INSERT INTO test_table (test_title, test_author, test_date) VALUES("a", "b", NOW()); + mysql> UPDATE test_table SET test_title='c' WHERE test_title='a'; + mysql> DELETE FROM test_table WHERE test_title='c'; + ```