Skip to content

Commit

Permalink
Add starrocks writer plugin (wgzhao#578)
Browse files Browse the repository at this point in the history
* Add support for Starrocks writer wgzhao#472 wgzhao#473 wgzhao#557 wgzhao#575
  • Loading branch information
wgzhao authored Jun 1, 2022
1 parent 78850e5 commit a03fa66
Show file tree
Hide file tree
Showing 22 changed files with 1,489 additions and 0 deletions.
54 changes: 54 additions & 0 deletions docs/assets/jobs/starrockswriter.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": {
"writer": {
"name": "starrockswriter",
"parameter": {
"username": "test",
"password": "123456",
"batchSize": 1024,
"connection": [
{
"table": "table1",
"database": "example_db",
"jdbcUrl": "jdbc:mysql://172.28.17.100:9030/",
"loadUrl": ["172.28.17.100:8030", "172.28.17.100:8030"]
}
],
"loadProps": {},
"lineDelimiter": "\n",
"format": "csv"
}
},
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"random": "1,500",
"type": "long"
},
{
"random": "1,127",
"type": "long"
},
{
"value": "this is a text",
"type": "string"
},
{
"random": "5,200",
"type": "long"
}
],
"sliceRecordCount": 100
}
}
}
}
}
Binary file modified docs/images/supported_databases.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
77 changes: 77 additions & 0 deletions docs/writer/starrockswriter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# StarRocksWriter

StarRocksWriter 插件用于向 [Starrocks](https://www.starrocks.com/zh-CN/index) 数据库以流式方式写入数据。 其实现上是通过访问 Doris http 连接(8030)
,然后通过 [stream load](https://docs.starrocks.com/zh-cn/main/loading/StreamLoad)
加载数据到数据中,相比 `insert into` 方式效率要高不少,也是官方推荐的生产环境下的数据加载方式。

Doris 是一个兼容 MySQL 协议的数据库后端,因此 Doris 读取可以使用 [MySQLReader](../../reader/mysqlreader) 进行访问。

## 示例

假定要写入的表的建表语句如下:

```sql
CREATE
DATABASE example_db;
CREATE TABLE example_db.table1
(
siteid INT DEFAULT '10',
citycode SMALLINT,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
) AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");
```

下面配置一个从内存读取数据,然后写入到 doris 表的配置文件

```json
--8<-- "jobs/starrockswriter.json"
```

将上述配置文件保存为 `job/stream2starrocks.json`

执行下面的命令

```shell
bin/addax.sh job/stream2starrocks.json
```

## 参数说明

| 配置项 | 是否必须 | 类型 | 默认值 | 描述 |
| :-------------- | :------: | ------ |------------- |-------|
| jdbcUrl || string || 目的数据库的 JDBC 连接信息,用于执行`preSql``postSql` |
| loadUrl | 是 | string | 无 | StarRocks FE的地址用于StreamLoad[1],可以为多个fe地址,`fe_ip:fe_http_port`
| username || string || HTTP 签名验证帐号 |
| password || string || HTTP 签名验证密码 |
| database || string || StarRocks表的数据库名称|
| table || string || StarRocks表的表名称|
| column || list || 所配置的表中需要同步的列名集合,详细描述见 [rdbmswriter](../rdbmswriter) |
| maxBatchRows || int | 500000 | 单次StreamLoad导入的最大行数 |
| maxBatchSize || int | 104857600 | 单次StreamLoad导入的最大字节数 |
| flushInterval || int | 300000 | 上一次StreamLoad结束至下一次开始的时间间隔(单位:ms) |
| loadProps || map | `csv` | streamLoad 的请求参数,详情参照[StreamLoad介绍页面][1] |

[1]: https://docs.starrocks.com/zh-cn/main/loading/StreamLoad


## 类型转换

默认传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行StreamLoad导入操作。
如需更改列分隔符, 则正确配置 `loadProps` 即可:
```json
"loadProps": {
"column_separator": "\\x01",
"row_delimiter": "\\x02"
}
```

如需更改导入格式为`json`, 则正确配置 `loadProps` 即可:
```json
"loadProps": {
"format": "json",
"strip_outer_array": true
}
```
8 changes: 8 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,14 @@
<fileMode>0644</fileMode>
<outputDirectory>addax-${project.version}</outputDirectory>
</fileSet>
<fileSet>
<directory>plugin/writer/starrockswriter/target/starrockswriter-${project.version}/</directory>
<includes>
<include>**/*.*</include>
</includes>
<fileMode>0644</fileMode>
<outputDirectory>addax-${project.version}</outputDirectory>
</fileSet>
<fileSet>
<directory>plugin/writer/streamwriter/target/streamwriter-${project.version}/</directory>
<includes>
Expand Down
37 changes: 37 additions & 0 deletions plugin/writer/starrockswriter/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<assembly
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-component-1.1.2.xsd">
<id>release</id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>*.json</include>
</includes>
<outputDirectory>plugin/writer/${project.artifactId}</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>${project.artifactId}-${project.version}.jar</include>
</includes>
<outputDirectory>plugin/writer/${project.artifactId}</outputDirectory>
</fileSet>
</fileSets>

<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/${project.artifactId}/libs</outputDirectory>
<scope>runtime</scope>
<excludes>
<exclude>com.wgzhao.addax:*</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>
96 changes: 96 additions & 0 deletions plugin/writer/starrockswriter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.wgzhao.addax</groupId>
<artifactId>addax-all</artifactId>
<version>4.0.9-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

<artifactId>starrockswriter</artifactId>
<name>starrocks-writer</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.wgzhao.addax</groupId>
<artifactId>addax-common</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>com.wgzhao.addax</groupId>
<artifactId>addax-rdbms</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>${commons.codec.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons.lang3.version}</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>${commons.logging.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.jdbc.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>package.xml</descriptor>
</descriptors>
<finalName>${project.artifactId}-${project.version}</finalName>
</configuration>
<executions>
<execution>
<id>release</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Loading

0 comments on commit a03fa66

Please sign in to comment.