[Issue 6111][Doc] Add documentation for Debezium Mongodb connector (apache#6112)


Fixes apache#6111 


### Motivation

Add Debezium MongoDB to the io-connectors documentation.

### Modifications
huangdx0726 authored Feb 13, 2020
1 parent d3ec620 commit 33232ca
Showing 2 changed files with 139 additions and 3 deletions.
8 changes: 8 additions & 0 deletions site2/docs/io-connectors.md
@@ -37,6 +37,14 @@ Pulsar has various source connectors, which are sorted alphabetically as below.

* [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java)

### Debezium MongoDB

* [Configuration](io-debezium-source.md#configuration)

* [Example](io-debezium-source.md#example-of-mongodb)

* [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java)


### File

134 changes: 131 additions & 3 deletions site2/docs/io-debezium-source.md
@@ -4,11 +4,12 @@ title: Debezium source connector
sidebar_label: Debezium source connector
---

The Debezium source connector pulls messages from MySQL or PostgreSQL to Pulsar topics.
The Debezium source connector pulls messages from MySQL, PostgreSQL, or MongoDB
and persists the messages to Pulsar topics.

## Configuration

The configuration of the Debezium source connector has the following properties.
The configuration of Debezium source connector has the following properties.

| Name | Required | Default | Description |
|------|----------|---------|-------------|
@@ -27,6 +28,16 @@ The configuration of the Debezium source connector has the following properties.
| `database.history.pulsar.service.url` | true | null | Pulsar cluster service URL for history topic. |
| `pulsar.service.url` | true | null | Pulsar cluster service URL. |
| `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. |

### MongoDB Configuration

| Name | Required | Default | Description |
|------|----------|---------|-------------|
| `mongodb.hosts` | true | null | The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list can contain a single hostname and port pair. If `mongodb.members.auto.discover` is set to false, the host and port pair must be prefixed with the replica set name (for example, `rs0/localhost:27017`). |
| `mongodb.name` | true | null | A unique name that identifies the connector and/or the MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted topics emanating from the MongoDB replica set or cluster. |
| `mongodb.user` | true | null | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
| `mongodb.password` | true | null | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
| `mongodb.task.id` | true | null | The task ID of the MongoDB connector. The connector attempts to use a separate task for each replica set. |
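
The `mongodb.hosts` format described in the table can be illustrated with a small parser. This is only a sketch for readers who want to sanity-check a value before starting the connector; it is not the connector's own parsing code. The function name `parse_mongodb_hosts` and the fallback to port 27017 (MongoDB's standard port) are assumptions of this example, and IPv6 literals are not handled.

```python
from typing import List, Optional, Tuple

# Assumption of this sketch: fall back to MongoDB's standard port
# when an entry is given as 'host' without a port.
DEFAULT_MONGODB_PORT = 27017


def parse_mongodb_hosts(value: str) -> Tuple[Optional[str], List[Tuple[str, int]]]:
    """Split a mongodb.hosts value into (replica set name, [(host, port), ...]).

    Hypothetical helper for illustration only.
    """
    replica_set = None
    hosts_part = value.strip()
    # An optional 'rs0/' prefix names the replica set.
    if "/" in hosts_part:
        replica_set, hosts_part = hosts_part.split("/", 1)

    pairs = []
    for item in hosts_part.split(","):
        item = item.strip()
        if not item:
            continue
        host, sep, port = item.partition(":")
        pairs.append((host, int(port) if sep else DEFAULT_MONGODB_PORT))
    return replica_set, pairs


print(parse_mongodb_hosts("rs0/localhost:27017"))
```

With the value from the table, the replica set name is `rs0` and the only host and port pair is `localhost` and `27017`.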



## Example of MySQL

@@ -270,7 +281,7 @@ This example shows how to change the data of a PostgreSQL table using the Pulsar
$ docker exec -it pulsar-postgresql /bin/bash
```

6. A MySQL client pops out.
6. A PostgreSQL client pops out.

Use the following commands to change the data of the table _products_.

@@ -304,7 +315,124 @@ This example shows how to change the data of a PostgreSQL table using the Pulsar
----- got message -----
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.products.Key"},"payload":{"id":107}}�{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"db"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"string","optional":true,"field":"schema"},{"type":"string","optional":true,"field":"table"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3},"after":{"id":107,"name":"1111111111","description":"box of assorted 
rocks","weight":5.3},"source":{"version":"0.9.2.Final","connector":"postgresql","name":"dbserver1","db":"postgres","ts_usec":1559208957661080,"txId":577,"lsn":23862872,"schema":"inventory","table":"products","snapshot":false,"last_snapshot_record":null},"op":"u","ts_ms":1559208957692}}
```
## Example of MongoDB

You need to create a configuration file before using the Pulsar Debezium connector.

* JSON

```json
{
    "mongodb.hosts": "rs0/mongodb:27017",
    "mongodb.name": "dbserver1",
    "mongodb.user": "debezium",
    "mongodb.password": "dbz",
    "mongodb.task.id": "1",
    "database.whitelist": "inventory",
    "pulsar.service.url": "pulsar://127.0.0.1:6650"
}
```

* YAML

Create a `debezium-mongodb-source-config.yaml` file and copy the following [contents](https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml) into it.

```yaml
tenant: "public"
namespace: "default"
name: "debezium-mongodb-source"
topicName: "debezium-mongodb-topic"
archive: "connectors/pulsar-io-debezium-mongodb-{{pulsar:version}}.nar"
parallelism: 1

configs:

    ## config for MongoDB, docker image: debezium/example-mongodb:0.10
    mongodb.hosts: "rs0/mongodb:27017"
    mongodb.name: "dbserver1"
    mongodb.user: "debezium"
    mongodb.password: "dbz"
    mongodb.task.id: "1"
    database.whitelist: "inventory"

    ## PULSAR_SERVICE_URL_CONFIG
    pulsar.service.url: "pulsar://127.0.0.1:6650"
```
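
Before submitting either configuration, it can help to check that the required properties are present. The following is a minimal sketch, not part of Pulsar: the helper names `missing_required` and `to_source_config_arg` are hypothetical, and the `REQUIRED_KEYS` list is copied by hand from the properties that the MongoDB table marks as required, plus `pulsar.service.url`. The second helper renders the configuration as the single-line JSON string that `--source-config` expects.

```python
import json
import shlex

# Assumption: copied by hand from the configuration tables in this document.
REQUIRED_KEYS = [
    "mongodb.hosts",
    "mongodb.name",
    "mongodb.user",
    "mongodb.password",
    "mongodb.task.id",
    "pulsar.service.url",
]


def missing_required(config):
    """Return the required keys that are absent or empty in config."""
    return [key for key in REQUIRED_KEYS if not config.get(key)]


def to_source_config_arg(config):
    """Render config as compact JSON, shell-quoted for use on a command line."""
    return shlex.quote(json.dumps(config, separators=(",", ":")))


# The same values as the JSON example in this section.
config = {
    "mongodb.hosts": "rs0/mongodb:27017",
    "mongodb.name": "dbserver1",
    "mongodb.user": "debezium",
    "mongodb.password": "dbz",
    "mongodb.task.id": "1",
    "database.whitelist": "inventory",
    "pulsar.service.url": "pulsar://127.0.0.1:6650",
}

missing = missing_required(config)
if missing:
    raise SystemExit("missing required properties: " + ", ".join(missing))
print("--source-config " + to_source_config_arg(config))
```

Quoting with `shlex.quote` keeps the double quotes inside the JSON intact when the string is pasted into a POSIX shell.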

### Usage

This example shows how to change the data of a MongoDB collection using the Pulsar Debezium connector.


1. Start a MongoDB server with a database from which Debezium can capture changes.

```bash
$ docker pull debezium/example-mongodb:0.10
$ docker run -d -it --rm --name pulsar-mongodb -e MONGODB_USER=mongodb -e MONGODB_PASSWORD=mongodb -p 27017:27017 debezium/example-mongodb:0.10
```
Use the following command inside the container to initialize the data.

```bash
/usr/local/bin/init-inventory.sh
```
If the local host cannot access the container network, update the `/etc/hosts` file and add a rule of the form `127.0.0.1 <container-id>`. To get the container ID, run `docker ps -a`.


2. Start a Pulsar service locally in standalone mode.

```bash
$ bin/pulsar standalone
```

3. Start the Pulsar Debezium connector in local run mode using one of the following methods.

* Use the **JSON** configuration file as shown previously.

Make sure the nar file is available at `connectors/pulsar-io-debezium-mongodb-{{pulsar:version}}.nar`.

```bash
$ bin/pulsar-admin source localrun \
--archive connectors/pulsar-io-debezium-mongodb-{{pulsar:version}}.nar \
--name debezium-mongodb-source \
--destination-topic-name debezium-mongodb-topic \
--tenant public \
--namespace default \
--source-config '{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.whitelist": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'
```

* Use the **YAML** configuration file as shown previously.

```bash
$ bin/pulsar-admin source localrun \
--source-config-file debezium-mongodb-source-config.yaml
```

4. Subscribe to the topic for the _inventory.products_ collection, using the subscription name _sub-products_.

```
$ bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0
```

5. Start a MongoDB client in docker.

```bash
$ docker exec -it pulsar-mongodb /bin/bash
```

6. In the container, open the MongoDB shell and change the data of the _products_ collection.

```bash
mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory
db.products.update({"_id":NumberLong(104)},{$set:{weight:1.25}})
```

In the terminal window where you subscribed to the topic, you receive messages like the following.

```bash
----- got message -----
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.products.Key"},"payload":{"id":"104"}}, value = {"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"int64","optional":true,"field":"h"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"after":"{\"_id\": {\"$numberLong\": \"104\"},\"name\": \"hammer\",\"description\": \"12oz carpenter's hammer\",\"weight\": 1.25,\"quantity\": 4}","patch":null,"source":{"version":"0.10.0.Final","connector":"mongodb","name":"dbserver1","ts_ms":1573541905000,"snapshot":"true","db":"inventory","rs":"rs0","collection":"products","ord":1,"h":4983083486544392763},"op":"r","ts_ms":1573541909761}}.
```
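
In the message above, the `after` field is itself a JSON string, so a consumer has to decode it a second time. The sketch below shows one way to do that under stated assumptions: `summarize_change_event` is a hypothetical helper, `SAMPLE_VALUE` is a trimmed copy of the value shown above (the `schema` part and some `source` fields are left out for brevity), and the `OP_NAMES` mapping follows Debezium's usual operation codes.

```python
import json

# Debezium operation codes; "r" marks a record read during the initial snapshot.
OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}

# Trimmed copy of the value printed above (schema omitted for brevity).
SAMPLE_VALUE = r'''{"payload":{"after":"{\"_id\": {\"$numberLong\": \"104\"},\"name\": \"hammer\",\"description\": \"12oz carpenter's hammer\",\"weight\": 1.25,\"quantity\": 4}","patch":null,"source":{"connector":"mongodb","name":"dbserver1","db":"inventory","rs":"rs0","collection":"products"},"op":"r","ts_ms":1573541909761}}'''


def summarize_change_event(value_json):
    """Decode a change event value and return its topic, operation, and document.

    Hypothetical helper for illustration only.
    """
    payload = json.loads(value_json)["payload"]
    source = payload["source"]
    # 'after' holds the document as a JSON string and may be null.
    after_raw = payload.get("after")
    return {
        # Topic name pattern seen in this example: <mongodb.name>.<db>.<collection>
        "topic": "{}.{}.{}".format(source["name"], source["db"], source["collection"]),
        "op": OP_NAMES.get(payload["op"], payload["op"]),
        "after": json.loads(after_raw) if after_raw is not None else None,
    }


summary = summarize_change_event(SAMPLE_VALUE)
print(summary["topic"], summary["op"], summary["after"]["weight"])
```

The derived topic name matches the topic used in the `pulsar-client consume` command in step 4.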

## FAQ

### Debezium PostgreSQL connector hangs when creating a snapshot
