[doriswriter] merge office doriswriter codes (wgzhao#448)
wgzhao authored Nov 18, 2021
1 parent 77e33a9 commit 1e4a1f9
Showing 10 changed files with 657 additions and 108 deletions.
5 changes: 4 additions & 1 deletion docs/assets/jobs/doriswriter.json
@@ -18,7 +18,10 @@
"database": "example_db",
"endpoint": "http://127.0.0.1:8030/"
}
]
],
"loadProps": {},
"lineDelimiter": "\n",
"format": "csv"
}
},
"reader": {
35 changes: 24 additions & 11 deletions docs/writer/doriswriter.md
@@ -1,25 +1,25 @@
# Doris Writer

The DorisWriter plugin writes data to a [Doris](http://doris.incubator.apache.org/master/zh-CN/) database in a streaming fashion.
It works by connecting to Doris over HTTP (port 8030) and then using [stream load](http://doris.incubator.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)
The DorisWriter plugin writes data to a [Doris](http://doris.incubator.apache.org/master/zh-CN/) database in a streaming fashion. It works by connecting to Doris over HTTP (port 8030)
and then using [stream load](http://doris.incubator.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)
to load data into the database. This is considerably more efficient than `insert into` and is the officially recommended way to load data in production.

Doris is a database backend compatible with the MySQL protocol, so data can be read from Doris with [MySQLReader](../../reader/mysqlreader).
Doris is a database backend compatible with the MySQL protocol, so data can be read from Doris with [MySQLReader](../../reader/mysqlreader).

## Example

Assume the table to be written was created with the following statements:

```sql
CREATE DATABASE example_db;
CREATE
DATABASE example_db;
CREATE TABLE example_db.table1
(
siteid INT DEFAULT '10',
siteid INT DEFAULT '10',
citycode SMALLINT,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
pv BIGINT SUM DEFAULT '0'
) AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");
```
@@ -54,14 +54,27 @@ bin/addax.sh job/stream2doris.json
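| table || string || Name of the table to synchronize |
| column || list || Names of the columns in the configured table to synchronize; see [rdbmswriter](../rdbmswriter) for details |
| batchSize || int | 1024 | Number of records exchanged between the plugin and the database server per batch; raising this value may cause Addax to run out of memory (OOM), or the target database's transaction commit to fail and hang |
| lineDelimiter || string | `\n` | Delimiter between rows; multi-byte delimiters are supported, for example `\x02\x03` |
| format || string | `csv` | Format of the imported data, either json or csv |
| loadProps || map | `csv` | Request parameters for Stream Load; see the [Stream Load introduction page][1] |
| connectTimeout || int | -1 | Timeout of a single Stream Load request, in milliseconds (ms) |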

[1]: https://doris.apache.org/master/zh-CN/administrator-guide/load-data/load-json-format.html#stream-load
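
The `lineDelimiter` row gives `\x02\x03` as an example of a multi-byte delimiter. How the plugin decodes such a value is not shown in this diff. The following is only a minimal sketch, assuming each `\xHH` sequence stands for one character with that hex code; the class and method names are hypothetical.

```java
// Hypothetical helper, not part of the plugin: decodes \xHH escapes in a delimiter string.
final class DelimiterSketch {
    private DelimiterSketch() {}

    static String decode(String raw) {
        StringBuilder out = new StringBuilder();
        int i = 0;
        while (i < raw.length()) {
            // An escape is a backslash, 'x', and two hex digits (four characters in total).
            if (raw.startsWith("\\x", i) && i + 4 <= raw.length()) {
                out.append((char) Integer.parseInt(raw.substring(i + 2, i + 4), 16));
                i += 4;
            } else {
                // Anything else is copied through unchanged.
                out.append(raw.charAt(i));
                i++;
            }
        }
        return out.toString();
    }
}
```

In this sketch a malformed escape such as `\xZZ` raises `NumberFormatException`, so a bad delimiter fails early instead of being passed to Doris.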

## endpoint

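`endpoint` is the hostname and `webserver_port` of any BE node. The official documentation says the hostname and `http_port` of an FE node can also be used, but in actual testing that connection was always refused.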
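
For reference, Doris documents the Stream Load URL as `/api/{db}/{table}/_stream_load` under the chosen host and port. The code that builds this URL in the plugin is not part of the visible diff, so the following is a sketch under that assumption; `StreamLoadUrlSketch` and `build` are hypothetical names.

```java
// Hypothetical helper: joins an endpoint with the documented Stream Load path.
final class StreamLoadUrlSketch {
    private StreamLoadUrlSketch() {}

    static String build(String endpoint, String database, String table) {
        // The sample job uses "http://127.0.0.1:8030/", so drop one trailing slash if present.
        String base = endpoint.endsWith("/")
                ? endpoint.substring(0, endpoint.length() - 1)
                : endpoint;
        return base + "/api/" + database + "/" + table + "/_stream_load";
    }
}
```

With the values from the sample job, `build("http://127.0.0.1:8030/", "example_db", "table1")` yields `http://127.0.0.1:8030/api/example_db/table1/_stream_load`.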

### column

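In this plugin `column` is optional. If it is not configured, or is configured as `["*"]`, the field values obtained from the reader plugin are assembled in order.
Otherwise, the fields to insert can be specified as follows:
In this plugin `column` is optional. If it is not configured, or is configured as `["*"]`, the field values obtained from the reader plugin are assembled in order. Otherwise, the fields to insert can be specified as follows: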

```json
{
"column": ["siteid","citycode","username"]
"column": [
"siteid",
"citycode",
"username"
]
}
```
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.wgzhao.addax.plugin.writer.doriswriter;

import com.wgzhao.addax.common.element.Column;
import com.wgzhao.addax.common.element.DateColumn;
import com.wgzhao.addax.common.element.Record;
import org.apache.commons.lang3.time.DateFormatUtils;

import java.util.List;
import java.util.TimeZone;

public abstract class DorisCodec
{
    protected static String timeZone = "GMT+8";
    protected static TimeZone timeZoner = TimeZone.getTimeZone(timeZone);
    protected final List<String> fieldNames;

    public DorisCodec(final List<String> fieldNames) {
        this.fieldNames = fieldNames;
    }

    public abstract String serialize(Record row);

    /**
     * Convert an Addax column into a plain Java value ready for serialization.
     *
     * @param col the column to convert
     * @return a Long, Double, or String, or null for null data and unsupported types
     */
    protected Object convertColumn(final Column col) {
        if (null == col.getRawData()) {
            return null;
        }
        Column.Type type = col.getType();
        switch (type) {
            // BOOL and INT fall through and are emitted as a long
            case BOOL:
            case INT:
            case LONG:
                return col.asLong();
            case DOUBLE:
                return col.asDouble();
            case STRING:
                return col.asString();
            case DATE: {
                // Dates are formatted in the fixed GMT+8 time zone
                final DateColumn.DateType dateType = ((DateColumn) col).getSubType();
                switch (dateType) {
                    case DATE:
                        return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZoner);
                    case DATETIME:
                        return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZoner);
                    default:
                        return col.asString();
                }
            }
            default:
                // BAD, NULL, BYTES
                return null;
        }
    }
}
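
`convertColumn` formats dates in a hard-coded `GMT+8` zone, so the same instant is written as a different wall-clock string than it would be in UTC. A minimal sketch of that effect, using the JDK's `SimpleDateFormat` in place of the commons-lang3 `DateFormatUtils` call so that it runs without dependencies; `DateFormatSketch` is a hypothetical name.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper: formats an instant in an explicitly chosen time zone.
final class DateFormatSketch {
    private DateFormatSketch() {}

    static String format(Date date, String pattern, String zoneId) {
        // Locale.ROOT keeps digits and calendar independent of the host locale.
        SimpleDateFormat fmt = new SimpleDateFormat(pattern, Locale.ROOT);
        fmt.setTimeZone(TimeZone.getTimeZone(zoneId));
        return fmt.format(date);
    }
}
```

The Unix epoch formats as `1970-01-01 08:00:00` under `GMT+8` and as `1970-01-01 00:00:00` under `UTC`, which matters when the source data or the Doris cluster is not in the GMT+8 zone.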
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.wgzhao.addax.plugin.writer.doriswriter;

import com.wgzhao.addax.common.element.Record;

import java.util.ArrayList;
import java.util.List;

public class DorisCsvCodec
        extends DorisCodec
{
    private final String columnSeparator;

    public DorisCsvCodec(final List<String> fieldNames, String columnSeparator)
    {
        super(fieldNames);
        this.columnSeparator = columnSeparator;
    }

    @Override
    public String serialize(final Record row)
    {
        if (null == this.fieldNames) {
            return "";
        }
        List<String> list = new ArrayList<>();

        // Columns are taken by position; fieldNames only determines how many are read
        for (int i = 0; i < this.fieldNames.size(); i++) {
            Object value = this.convertColumn(row.getColumn(i));
            // A null value is written as the literal \N
            list.add(value != null ? value.toString() : "\\N");
        }

        return String.join(columnSeparator, list);
    }
}
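
The row encoding in `serialize` can be tried without the Addax classes. The sketch below mirrors the same logic on a plain list of values; `CsvRowSketch` is a hypothetical stand-in, not plugin code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for DorisCsvCodec.serialize, operating on plain values.
final class CsvRowSketch {
    private CsvRowSketch() {}

    static String serialize(List<?> values, String columnSeparator) {
        List<String> cells = new ArrayList<>();
        for (Object value : values) {
            // Same rule as the codec: null becomes the two-character literal \N.
            cells.add(value != null ? value.toString() : "\\N");
        }
        return String.join(columnSeparator, cells);
    }
}
```

As in the codec above, values are neither quoted nor escaped, so a value that contains the column separator would shift the columns of that row.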
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.wgzhao.addax.plugin.writer.doriswriter;

public class DorisFlushBatch
{
    private final String lineDelimiter;
    private String label;
    private long rows = 0;
    private final StringBuilder data = new StringBuilder();

    public DorisFlushBatch(String lineDelimiter) {
        this.lineDelimiter = lineDelimiter;
    }

    public void setLabel(String label) {
        this.label = label;
    }

    public String getLabel() {
        return label;
    }

    public long getRows() {
        return rows;
    }

    public void putData(String row) {
        if (data.length() > 0) {
            data.append(lineDelimiter);
        }
        data.append(row);
        rows++;
    }

    public StringBuilder getData() {
        return data;
    }

    public long getSize() {
        return data.length();
    }
}
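
`putData` inserts the delimiter between rows, never after the last one. The sketch below copies that buffering logic into a dependency-free class to show the resulting payload; `BatchSketch` and its method names are hypothetical.

```java
// Hypothetical copy of the DorisFlushBatch buffering logic, for illustration only.
final class BatchSketch {
    private final String lineDelimiter;
    private final StringBuilder data = new StringBuilder();
    private long rows = 0;

    BatchSketch(String lineDelimiter) {
        this.lineDelimiter = lineDelimiter;
    }

    void putData(String row) {
        // The delimiter is added only when the buffer already holds some text.
        if (data.length() > 0) {
            data.append(lineDelimiter);
        }
        data.append(row);
        rows++;
    }

    String payload() {
        return data.toString();
    }

    long rows() {
        return rows;
    }
}
```

Two details follow from this logic. The check is on buffer length, not row count, so an empty first row adds no delimiter before the next row even though it is counted. The size in `getSize` is a count of `char` values, which can differ from the number of bytes sent for non-ASCII data.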
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.wgzhao.addax.plugin.writer.doriswriter;

import com.alibaba.fastjson.JSON;
import com.wgzhao.addax.common.element.Record;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DorisJsonCodec
        extends DorisCodec
{
    public DorisJsonCodec(final List<String> fieldNames)
    {
        super(fieldNames);
    }

    @Override
    public String serialize(final Record row)
    {
        if (null == this.fieldNames) {
            return "";
        }
        final Map<String, Object> rowMap = new HashMap<>(this.fieldNames.size());
        int idx = 0;
        for (final String fieldName : this.fieldNames) {
            rowMap.put(fieldName, this.convertColumn(row.getColumn(idx)));
            ++idx;
        }
        return JSON.toJSONString(rowMap);
    }
}
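
The JSON codec pairs each configured field name with the column at the same position before handing the map to fastjson. A minimal sketch of that pairing step on plain values, with the fastjson call left out; `RowMapSketch` is a hypothetical name, and `LinkedHashMap` is swapped in for the codec's `HashMap` only so that the iteration order is predictable when printed.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the name-to-value pairing in DorisJsonCodec.serialize.
final class RowMapSketch {
    private RowMapSketch() {}

    static Map<String, Object> toRowMap(List<String> fieldNames, List<?> values) {
        Map<String, Object> rowMap = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            // The i-th field name labels the i-th value; a null value is kept as null.
            rowMap.put(fieldNames.get(i), values.get(i));
        }
        return rowMap;
    }
}
```

Because the pairing is purely positional, the order of `column` in the job configuration has to match the order of the fields produced by the reader.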
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.wgzhao.addax.plugin.writer.doriswriter;

import com.wgzhao.addax.common.base.Key;

public class DorisKey extends Key
{
    public static final String LINE_DELIMITER = "lineDelimiter";
    public static final String CONNECT_TIMEOUT = "connectTimeout";
    public static final String LOAD_PROPS = "loadProps";
}
@@ -36,6 +36,8 @@ public static class Job

private Configuration originalConfig = null;

private DorisWriterEmitter dorisWriterEmitter;

@Override
public void init()
{
@@ -45,6 +47,7 @@ public void init()
conn.getNecessaryValue(Key.TABLE, DorisWriterErrorCode.REQUIRED_VALUE);
conn.getNecessaryValue(Key.ENDPOINT, DorisWriterErrorCode.REQUIRED_VALUE);
conn.getNecessaryValue(Key.DATABASE, DorisWriterErrorCode.REQUIRED_VALUE);
this.dorisWriterEmitter = new DorisWriterEmitter(originalConfig);
}

@Override