Skip to content

Commit

Permalink
Optimize MySQL binlog sync to Pulsar using Canal (apache#3268)
Browse files Browse the repository at this point in the history
### Motivation

Optimize syncing of the MySQL binlog to Pulsar using Canal.

### Modifications

Add MessageUtils for parsing columns
Support querying the synced data with Presto SQL

### Result

#### INSERT
mysql:
```
MariaDB [aaa]> insert into users321(name, extra) values('xxxddxxxmm', 'ddd');
```
python consumer:
```
{u'timestamp': u'2018-12-31 00:31:24', u'message': u'[{"data":null,"database":"","es":1546234283000,"id":58,"isDdl":false,"mysqlType":null,"old":null,"sql":"insert into users321(name, extra) values(\'xxxddxxxmm\', \'ddd\')","sqlType":null,"table":"","ts":1546234284669,"type":"QUERY"},{"data":[{"isKey":"1","isNull":"0","index":"0","mysqlType":"int(11)","columnName":"id","columnValue":"34","updated":"1"},{"isKey":"0","isNull":"0","index":"1","mysqlType":"varchar(50)","columnName":"name","columnValue":"xxxddxxxmm","updated":"1"},{"isKey":"0","isNull":"0","index":"2","mysqlType":"varchar(50)","columnName":"extra","columnValue":"ddd","updated":"1"}],"database":"aaa","es":1546234283000,"id":58,"isDdl":false,"mysqlType":null,"old":null,"sql":"","sqlType":null,"table":"users321","ts":1546234284670,"type":"INSERT"}]', u'id': 58}
```
presto sql:
```
presto> select id, timestamp, message from pulsar."public/default".my_topic_test where id=58;
 id |      timestamp      |
----+---------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 58 | 2018-12-31 00:31:24 | [{"data":null,"database":"","es":1546234283000,"id":58,"isDdl":false,"mysqlType":null,"old":null,"sql":"insert into users321(name, extra) values('xxxddxxxmm', 'ddd')","sqlType"
```
  • Loading branch information
tuteng authored and sijie committed Jan 2, 2019
1 parent adc6fc0 commit bbd736d
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,16 @@
*/
package org.apache.pulsar.io.canal;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.MDC;

import java.net.InetSocketAddress;
Expand All @@ -40,16 +36,12 @@
import java.util.Objects;
import java.util.Optional;


/**
* A Simple class for mysql binlog sync to pulsar.
* A Simple abstract class for mysql binlog sync to pulsar.
*/
@Connector(
name = "canal",
type = IOType.SOURCE,
help = "The CanalSource is used for syncing mysql binlog to Pulsar.",
configClass = CanalSourceConfig.class)
@Slf4j
public class CanalSource extends PushSource<byte[]> {
public abstract class CanalAbstractSource<V> extends PushSource<V> {

protected Thread thread = null;

Expand Down Expand Up @@ -131,10 +123,9 @@ protected void process() {
connector.subscribe();
while (running) {
Message message = connector.getWithoutAck(canalSourceConfig.getBatchSize());
// delete the setRaw in new version of canal-client
message.setRaw(false);
List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
long batchId = message.getId();
List<FlatMessage> flatMessages = MessageUtils.messageConverter(message);
long batchId = getMessageId(message);
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Expand All @@ -143,10 +134,9 @@ protected void process() {
}
} else {
if (flatMessages != null) {
CanalRecord canalRecord = new CanalRecord(connector);
String m = JSON.toJSONString(flatMessages, SerializerFeature.WriteMapNullValue);
CanalRecord<V> canalRecord = new CanalRecord<>(connector);
canalRecord.setId(batchId);
canalRecord.setRecord(m.getBytes());
canalRecord.setRecord(extractValue(flatMessages));
consume(canalRecord);
}
}
Expand All @@ -160,11 +150,15 @@ protected void process() {
}
}

public abstract Long getMessageId(Message message);

public abstract V extractValue(List<FlatMessage> flatMessages);

@Getter
@Setter
static private class CanalRecord implements Record<byte[]> {
static private class CanalRecord<V> implements Record<V> {

private byte[] record;
private V record;
private Long id;
private CanalConnector connector;

Expand All @@ -173,17 +167,19 @@ public CanalRecord(CanalConnector connector) {
}

@Override
public Optional<String> getKey() {
return Optional.of(Long.toString(id));
public Optional<String> getKey() {
return Optional.of(Long.toString(id));
}

@Override
public byte[] getValue() {
public V getValue() {
return record;
}

@Override
public Optional<Long> getRecordSequence() {return Optional.of(id);}
public Optional<Long> getRecordSequence() {
return Optional.of(id);
}

@Override
public void ack() {
Expand All @@ -192,5 +188,4 @@ public void ack() {
}

}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.canal;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.FlatMessage;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;

import java.util.List;

/**
 * Canal source whose record value is a byte array.
 *
 * <p>Each batch of {@link FlatMessage}s is serialized to a JSON string (null-valued
 * map entries are kept) and that string is encoded as UTF-8 bytes.
 */
@Connector(
    name = "canal",
    type = IOType.SOURCE,
    help = "The CanalByteSource is used for syncing mysql binlog to Pulsar.",
    configClass = CanalSourceConfig.class)
public class CanalByteSource extends CanalAbstractSource<byte[]> {

    /**
     * Returns the id carried by the given Canal message.
     *
     * @param message the message fetched from the Canal connector
     * @return {@code message.getId()}
     */
    @Override
    public Long getMessageId(Message message) {
        return message.getId();
    }

    /**
     * Serializes the flat messages to JSON and encodes the result as UTF-8.
     *
     * @param flatMessages the flat messages converted from one Canal batch
     * @return the UTF-8 bytes of the JSON document
     */
    @Override
    public byte[] extractValue(List<FlatMessage> flatMessages) {
        String messages = JSON.toJSONString(flatMessages, SerializerFeature.WriteMapNullValue);
        // Encode with an explicit charset: the no-arg getBytes() uses the platform default,
        // which would make the published payload depend on the host's configuration.
        return messages.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.canal;

import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import com.alibaba.fastjson.JSON;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;

/**
 * Canal source whose record value is a {@link CanalMessage}.
 *
 * <p>Each value carries the batch id, the JSON serialization of the batch's
 * {@link FlatMessage}s, and a formatted timestamp taken when the value is built.
 */
@Connector(
    name = "canal",
    type = IOType.SOURCE,
    help = "The CanalStringSource is used for syncing mysql binlog to Pulsar, easy to use presto sql search.",
    configClass = CanalSourceConfig.class)
public class CanalStringSource extends CanalAbstractSource<CanalMessage> {

    /**
     * Formatter for the {@code timestamp} field. DateTimeFormatter is immutable and
     * thread-safe, so a single shared instance replaces a per-batch SimpleDateFormat.
     */
    private static final java.time.format.DateTimeFormatter TIMESTAMP_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Id of the message most recently passed to {@link #getMessageId(Message)}. */
    private Long messageId;

    /**
     * Remembers and returns the id carried by the given Canal message.
     *
     * <p>The remembered id is later stamped onto the value built by
     * {@link #extractValue(List)}.
     *
     * @param message the message fetched from the Canal connector
     * @return {@code message.getId()}
     */
    @Override
    public Long getMessageId(Message message) {
        this.messageId = message.getId();
        return this.messageId;
    }

    /**
     * Builds a {@link CanalMessage} from the flat messages of one batch.
     *
     * @param flatMessages the flat messages converted from one Canal batch
     * @return a message holding the last remembered id, the JSON serialization of
     *         {@code flatMessages} (null-valued map entries kept), and the current local
     *         time formatted as {@code yyyy-MM-dd HH:mm:ss}
     */
    @Override
    public CanalMessage extractValue(List<FlatMessage> flatMessages) {
        String messages = JSON.toJSONString(flatMessages, SerializerFeature.WriteMapNullValue);
        CanalMessage canalMessage = new CanalMessage();
        // Local wall-clock time in the system default zone, as the previous
        // SimpleDateFormat/Date pair produced.
        canalMessage.setTimestamp(java.time.LocalDateTime.now().format(TIMESTAMP_FORMAT));
        canalMessage.setId(this.messageId);
        canalMessage.setMessage(messages);
        return canalMessage;
    }

}


/**
 * Value type emitted by CanalStringSource: one instance per extracted batch.
 * Accessors and constructors are generated by the Lombok annotations below.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
class CanalMessage {
// Id of the Canal message the batch came from (set from the source's remembered message id).
private Long id;
// JSON serialization of the batch's flat messages.
private String message;
// Time the value was built, formatted as "yyyy-MM-dd HH:mm:ss".
private String timestamp;
}
Loading

0 comments on commit bbd736d

Please sign in to comment.