diff --git a/docs/assets/jobs/starrockswriter.json b/docs/assets/jobs/starrockswriter.json
new file mode 100644
index 000000000..fe4de4daa
--- /dev/null
+++ b/docs/assets/jobs/starrockswriter.json
@@ -0,0 +1,54 @@
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 2
+ }
+ },
+ "content": {
+ "writer": {
+ "name": "starrockswriter",
+ "parameter": {
+ "username": "test",
+ "password": "123456",
+ "batchSize": 1024,
+ "connection": [
+ {
+ "table": "table1",
+ "database": "example_db",
+ "jdbcUrl": "jdbc:mysql://172.28.17.100:9030/",
+ "loadUrl": ["172.28.17.100:8030", "172.28.17.100:8030"]
+ }
+ ],
+ "loadProps": {},
+ "lineDelimiter": "\n",
+ "format": "csv"
+ }
+ },
+ "reader": {
+ "name": "streamreader",
+ "parameter": {
+ "column": [
+ {
+ "random": "1,500",
+ "type": "long"
+ },
+ {
+ "random": "1,127",
+ "type": "long"
+ },
+ {
+ "value": "this is a text",
+ "type": "string"
+ },
+ {
+ "random": "5,200",
+ "type": "long"
+ }
+ ],
+ "sliceRecordCount": 100
+ }
+ }
+ }
+ }
+}
diff --git a/docs/images/supported_databases.png b/docs/images/supported_databases.png
index 8e4864f53..15f582fb0 100644
Binary files a/docs/images/supported_databases.png and b/docs/images/supported_databases.png differ
diff --git a/docs/writer/starrockswriter.md b/docs/writer/starrockswriter.md
new file mode 100644
index 000000000..3d89c9156
--- /dev/null
+++ b/docs/writer/starrockswriter.md
@@ -0,0 +1,77 @@
+# StarRocksWriter
+
+The StarRocksWriter plugin writes data to a [StarRocks](https://www.starrocks.com/zh-CN/index) database in a streaming fashion.
+Internally it connects to the HTTP port (8030) of the StarRocks FE and loads the data via
+[Stream Load](https://docs.starrocks.com/zh-cn/main/loading/StreamLoad), which is considerably more efficient than
+`insert into` and is the officially recommended way to load data in a production environment.
+
+StarRocks is compatible with the MySQL protocol, so for reading from StarRocks you can use [MySQLReader](../../reader/mysqlreader).
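+
+Under the hood, a Stream Load request is an HTTP PUT sent to an FE node. As a rough illustration (the host,
+credentials and label below are assumptions borrowed from the example job that follows, not something the
+plugin emits verbatim), an equivalent manual load of a CSV file would look like:
+
+```shell
+curl --location-trusted -u test:123456 \
+    -H "label:manual-load-demo" \
+    -T data.csv \
+    http://172.28.17.100:8030/api/example_db/table1/_stream_load
+```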
+
+## Example
+
+Assume the target table is created with the following statements:
+
+```sql
+CREATE DATABASE example_db;
+CREATE TABLE example_db.table1
+(
+ siteid INT DEFAULT '10',
+ citycode SMALLINT,
+ username VARCHAR(32) DEFAULT '',
+ pv BIGINT SUM DEFAULT '0'
+) AGGREGATE KEY(siteid, citycode, username)
+DISTRIBUTED BY HASH(siteid) BUCKETS 10
+PROPERTIES("replication_num" = "1");
+```
+
+The following job file reads data generated in memory and writes it into the StarRocks table:
+
+```json
+--8<-- "jobs/starrockswriter.json"
+```
+
+Save the above configuration as `job/stream2starrocks.json`
+
+Then execute the following command:
+
+```shell
+bin/addax.sh job/stream2starrocks.json
+```
+
+## Parameters
+
+| option        | required | type   | default   | description                                                               |
+| :------------ | :------: | ------ | --------- | ------------------------------------------------------------------------- |
+| jdbcUrl       | no       | string | none      | JDBC connection string of the target database, used to execute `preSql` and `postSql` |
+| loadUrl       | yes      | string | none      | StarRocks FE address(es) for Stream Load [1]; several FEs may be given, each as `fe_ip:fe_http_port` |
+| username      | yes      | string | none      | username for HTTP Basic authentication                                     |
+| password      | no       | string | none      | password for HTTP Basic authentication                                     |
+| database      | yes      | string | none      | database of the StarRocks table                                            |
+| table         | yes      | string | none      | name of the StarRocks table                                                |
+| column        | no       | list   | none      | columns of the target table to sync; see [rdbmswriter](../rdbmswriter) for details |
+| maxBatchRows  | no       | int    | 500000    | maximum number of rows per Stream Load request                             |
+| maxBatchSize  | no       | int    | 104857600 | maximum number of bytes per Stream Load request                            |
+| flushInterval | no       | int    | 300000    | interval (ms) between the end of one Stream Load request and the start of the next |
+| loadProps     | no       | map    | `csv`     | request properties for Stream Load; see the [Stream Load documentation][1] |
+
+[1]: https://docs.starrocks.com/zh-cn/main/loading/StreamLoad
+
+
+## Type conversion
+
+By default, every incoming value is converted to a string; values are joined with `\t` as the column separator and
+`\n` as the row separator to form a `csv` payload that is submitted via Stream Load.
+To use a different column separator, simply configure `loadProps` accordingly:
+```json
+"loadProps": {
+ "column_separator": "\\x01",
+ "row_delimiter": "\\x02"
+}
+```
+
+To switch the load format to `json`, simply configure `loadProps` like this:
+```json
+"loadProps": {
+ "format": "json",
+ "strip_outer_array": true
+}
+```
diff --git a/package.xml b/package.xml
index 2212545a4..e9dbf3cbe 100644
--- a/package.xml
+++ b/package.xml
@@ -489,6 +489,14 @@
         <fileMode>0644</fileMode>
         <outputDirectory>addax-${project.version}</outputDirectory>
     </fileSet>
+    <fileSet>
+        <directory>plugin/writer/starrockswriter/target/starrockswriter-${project.version}/</directory>
+        <includes>
+            <include>**/*.*</include>
+        </includes>
+        <fileMode>0644</fileMode>
+        <outputDirectory>addax-${project.version}</outputDirectory>
+    </fileSet>
     <fileSet>
         <directory>plugin/writer/streamwriter/target/streamwriter-${project.version}/</directory>
diff --git a/plugin/writer/starrockswriter/package.xml b/plugin/writer/starrockswriter/package.xml
new file mode 100755
index 000000000..2d610250e
--- /dev/null
+++ b/plugin/writer/starrockswriter/package.xml
@@ -0,0 +1,37 @@
+<assembly>
+    <id>release</id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>*.json</include>
+            </includes>
+            <outputDirectory>plugin/writer/${project.artifactId}</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>${project.artifactId}-${project.version}.jar</include>
+            </includes>
+            <outputDirectory>plugin/writer/${project.artifactId}</outputDirectory>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/writer/${project.artifactId}/libs</outputDirectory>
+            <scope>runtime</scope>
+            <excludes>
+                <exclude>com.wgzhao.addax:*</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/plugin/writer/starrockswriter/pom.xml b/plugin/writer/starrockswriter/pom.xml
new file mode 100755
index 000000000..4dbcfcf27
--- /dev/null
+++ b/plugin/writer/starrockswriter/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.wgzhao.addax</groupId>
+        <artifactId>addax-all</artifactId>
+        <version>4.0.9-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>starrockswriter</artifactId>
+    <name>starrocks-writer</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.wgzhao.addax</groupId>
+            <artifactId>addax-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.wgzhao.addax</groupId>
+            <artifactId>addax-rdbms</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>${commons.codec.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons.lang3.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+            <version>${commons.logging.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>${httpcore.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpclient.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>${fastjson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.jdbc.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>package.xml</descriptor>
+                    </descriptors>
+                    <finalName>${project.artifactId}-${project.version}</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>release</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/StarRocksWriter.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/StarRocksWriter.java
new file mode 100755
index 000000000..9d6425b94
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/StarRocksWriter.java
@@ -0,0 +1,163 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter;
+
+import com.wgzhao.addax.common.element.Record;
+import com.wgzhao.addax.common.exception.AddaxException;
+import com.wgzhao.addax.common.plugin.RecordReceiver;
+import com.wgzhao.addax.common.spi.Writer;
+import com.wgzhao.addax.common.util.Configuration;
+import com.wgzhao.addax.plugin.writer.starrockswriter.manager.StarRocksWriterManager;
+import com.wgzhao.addax.plugin.writer.starrockswriter.row.StarRocksISerializer;
+import com.wgzhao.addax.plugin.writer.starrockswriter.row.StarRocksSerializerFactory;
+import com.wgzhao.addax.plugin.writer.starrockswriter.util.StarRocksWriterUtil;
+import com.wgzhao.addax.rdbms.util.DBUtil;
+import com.wgzhao.addax.rdbms.util.DBUtilErrorCode;
+import com.wgzhao.addax.rdbms.util.DataBaseType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+
+public class StarRocksWriter
+ extends Writer
+{
+
+ public static class Job
+ extends Writer.Job
+ {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+ private Configuration originalConfig = null;
+ private StarRocksWriterOptions options;
+
+ @Override
+ public void init()
+ {
+ this.originalConfig = super.getPluginJobConf();
+ options = new StarRocksWriterOptions(super.getPluginJobConf());
+ options.doPretreatment();
+ }
+
+ @Override
+ public void preCheck()
+ {
+ this.init();
+ StarRocksWriterUtil.preCheckPrepareSQL(options);
+ StarRocksWriterUtil.preCheckPostSQL(options);
+ }
+
+ @Override
+ public void prepare()
+ {
+ String username = options.getUsername();
+ String password = options.getPassword();
+ String jdbcUrl = options.getJdbcUrl();
+ List<String> renderedPreSqls = StarRocksWriterUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable());
+ if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
+ Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
+ LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl);
+ StarRocksWriterUtil.executeSqls(conn, renderedPreSqls);
+ DBUtil.closeDBResources(null, null, conn);
+ }
+ }
+
+ @Override
+ public List<Configuration> split(int mandatoryNumber)
+ {
+ List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
+ for (int i = 0; i < mandatoryNumber; i++) {
+ configurations.add(originalConfig);
+ }
+ return configurations;
+ }
+
+ @Override
+ public void post()
+ {
+ String username = options.getUsername();
+ String password = options.getPassword();
+ String jdbcUrl = options.getJdbcUrl();
+ List<String> renderedPostSqls = StarRocksWriterUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
+ if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
+ Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
+ LOG.info("Begin to execute postSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
+ StarRocksWriterUtil.executeSqls(conn, renderedPostSqls);
+ DBUtil.closeDBResources(null, null, conn);
+ }
+ }
+
+ @Override
+ public void destroy()
+ {
+ }
+ }
+
+ public static class Task
+ extends Writer.Task
+ {
+ private StarRocksWriterManager writerManager;
+ private StarRocksWriterOptions options;
+ private StarRocksISerializer rowSerializer;
+
+ @Override
+ public void init()
+ {
+ options = new StarRocksWriterOptions(super.getPluginJobConf());
+ if (options.isWildcardColumn()) {
+ Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword());
+ List<String> columns = StarRocksWriterUtil.getStarRocksColumns(conn, options.getDatabase(), options.getTable());
+ options.setInfoSchemaColumns(columns);
+ }
+ writerManager = new StarRocksWriterManager(options);
+ rowSerializer = StarRocksSerializerFactory.createSerializer(options);
+ }
+
+ @Override
+ public void prepare()
+ {
+ }
+
+ public void startWrite(RecordReceiver recordReceiver)
+ {
+ try {
+ Record record;
+ while ((record = recordReceiver.getFromReader()) != null) {
+ if (record.getColumnNumber() != options.getColumns().size()) {
+ throw AddaxException
+ .asAddaxException(
+ DBUtilErrorCode.CONF_ERROR,
+ String.format(
+ "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
+ record.getColumnNumber(),
+ options.getColumns().size()));
+ }
+ writerManager.writeRecord(rowSerializer.serialize(record));
+ }
+ }
+ catch (Exception e) {
+ throw AddaxException.asAddaxException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+ }
+ }
+
+ @Override
+ public void post()
+ {
+ try {
+ writerManager.close();
+ }
+ catch (Exception e) {
+ throw AddaxException.asAddaxException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+ }
+ }
+
+ @Override
+ public void destroy() {}
+
+ @Override
+ public boolean supportFailOver()
+ {
+ return false;
+ }
+ }
+}
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/StarRocksWriterOptions.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/StarRocksWriterOptions.java
new file mode 100644
index 000000000..952428bd4
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/StarRocksWriterOptions.java
@@ -0,0 +1,192 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter;
+
+import com.wgzhao.addax.common.exception.AddaxException;
+import com.wgzhao.addax.common.util.Configuration;
+import com.wgzhao.addax.rdbms.util.DBUtilErrorCode;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class StarRocksWriterOptions
+ implements Serializable
+{
+
+ private static final long serialVersionUID = 1L;
+ private static final long KILO_BYTES_SCALE = 1024L;
+ private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE;
+ private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE;
+ private static final int MAX_RETRIES = 3;
+ private static final int BATCH_ROWS = 500000;
+ private static final long FLUSH_INTERVAL = 300000;
+
+ private static final String KEY_LOAD_PROPS_FORMAT = "format";
+ private static final String KEY_USERNAME = "username";
+ private static final String KEY_PASSWORD = "password";
+ private static final String KEY_DATABASE = "database";
+ private static final String KEY_TABLE = "table";
+ private static final String KEY_COLUMN = "column";
+ private static final String KEY_PRE_SQL = "preSql";
+ private static final String KEY_POST_SQL = "postSql";
+ private static final String KEY_JDBC_URL = "jdbcUrl";
+ private static final String KEY_MAX_BATCH_ROWS = "maxBatchRows";
+ private static final String KEY_MAX_BATCH_SIZE = "maxBatchSize";
+ private static final String KEY_FLUSH_INTERVAL = "flushInterval";
+ private static final String KEY_LOAD_URL = "loadUrl";
+ private static final String KEY_FLUSH_QUEUE_LENGTH = "flushQueueLength";
+ private static final String KEY_LOAD_PROPS = "loadProps";
+ private final Configuration options;
+ private List<String> infoSchemaColumns;
+ private final List<String> userSetColumns;
+ private boolean isWildcardColumn;
+ public StarRocksWriterOptions(Configuration options)
+ {
+ this.options = options;
+ this.userSetColumns = options.getList(KEY_COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
+ if (1 == options.getList(KEY_COLUMN, String.class).size() && "*".equals(options.getList(KEY_COLUMN, String.class).get(0))) {
+ this.isWildcardColumn = true;
+ }
+ }
+
+ public void doPretreatment()
+ {
+ validateRequired();
+ validateStreamLoadUrl();
+ }
+
+ public String getJdbcUrl()
+ {
+ return options.getString(KEY_JDBC_URL);
+ }
+
+ public String getDatabase()
+ {
+ return options.getString(KEY_DATABASE);
+ }
+
+ public String getTable()
+ {
+ return options.getString(KEY_TABLE);
+ }
+
+ public String getUsername()
+ {
+ return options.getString(KEY_USERNAME);
+ }
+
+ public String getPassword()
+ {
+ return options.getString(KEY_PASSWORD);
+ }
+
+ public List<String> getLoadUrlList()
+ {
+ return options.getList(KEY_LOAD_URL, String.class);
+ }
+
+ public List<String> getColumns()
+ {
+ if (isWildcardColumn) {
+ return this.infoSchemaColumns;
+ }
+ return this.userSetColumns;
+ }
+
+ public boolean isWildcardColumn()
+ {
+ return this.isWildcardColumn;
+ }
+
+ public void setInfoSchemaColumns(List<String> cols)
+ {
+ this.infoSchemaColumns = cols;
+ }
+
+ public List<String> getPreSqlList()
+ {
+ return options.getList(KEY_PRE_SQL, String.class);
+ }
+
+ public List<String> getPostSqlList()
+ {
+ return options.getList(KEY_POST_SQL, String.class);
+ }
+
+ public Map<String, Object> getLoadProps()
+ {
+ return options.getMap(KEY_LOAD_PROPS);
+ }
+
+ public int getMaxRetries()
+ {
+ return MAX_RETRIES;
+ }
+
+ public int getBatchRows()
+ {
+ Integer rows = options.getInt(KEY_MAX_BATCH_ROWS);
+ return null == rows ? BATCH_ROWS : rows;
+ }
+
+ public long getBatchSize()
+ {
+ Long size = options.getLong(KEY_MAX_BATCH_SIZE);
+ return null == size ? BATCH_BYTES : size;
+ }
+
+ public long getFlushInterval()
+ {
+ Long interval = options.getLong(KEY_FLUSH_INTERVAL);
+ return null == interval ? FLUSH_INTERVAL : interval;
+ }
+
+ public int getFlushQueueLength()
+ {
+ Integer len = options.getInt(KEY_FLUSH_QUEUE_LENGTH);
+ return null == len ? 1 : len;
+ }
+
+ public StreamLoadFormat getStreamLoadFormat()
+ {
+ Map<String, Object> loadProps = getLoadProps();
+ if (null == loadProps) {
+ return StreamLoadFormat.CSV;
+ }
+ if (loadProps.containsKey(KEY_LOAD_PROPS_FORMAT)
+ && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(KEY_LOAD_PROPS_FORMAT)))) {
+ return StreamLoadFormat.JSON;
+ }
+ return StreamLoadFormat.CSV;
+ }
+
+ private void validateStreamLoadUrl()
+ {
+ List<String> urlList = getLoadUrlList();
+ for (String host : urlList) {
+ if (host.split(":").length < 2) {
+ throw AddaxException.asAddaxException(DBUtilErrorCode.CONF_ERROR,
+ "loadUrl的格式不正确,请输入 `fe_ip:fe_http_ip;fe_ip:fe_http_ip`。");
+ }
+ }
+ }
+
+ private void validateRequired()
+ {
+ final String[] requiredOptionKeys = new String[] {
+ KEY_USERNAME,
+ KEY_DATABASE,
+ KEY_TABLE,
+ KEY_COLUMN,
+ KEY_LOAD_URL
+ };
+ for (String optionKey : requiredOptionKeys) {
+ options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
+ }
+ }
+
+ public enum StreamLoadFormat
+ {
+ CSV, JSON
+ }
+}
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksFlushTuple.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksFlushTuple.java
new file mode 100644
index 000000000..7781f8f69
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksFlushTuple.java
@@ -0,0 +1,26 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter.manager;
+
+import java.util.List;
+
+public class StarRocksFlushTuple
+{
+
+ private String label;
+ private final Long bytes;
+ private final List<byte[]> rows;
+
+ public StarRocksFlushTuple(String label, Long bytes, List<byte[]> rows)
+ {
+ this.label = label;
+ this.bytes = bytes;
+ this.rows = rows;
+ }
+
+ public String getLabel() {return label;}
+
+ public void setLabel(String label) {this.label = label;}
+
+ public Long getBytes() {return bytes;}
+
+ public List<byte[]> getRows() {return rows;}
+}
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadFailedException.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadFailedException.java
new file mode 100644
index 000000000..3c638394c
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadFailedException.java
@@ -0,0 +1,37 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter.manager;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class StarRocksStreamLoadFailedException
+ extends IOException
+{
+
+ static final long serialVersionUID = 1L;
+
+ private final Map<String, Object> response;
+ private boolean reCreateLabel;
+
+ public StarRocksStreamLoadFailedException(String message, Map<String, Object> response)
+ {
+ super(message);
+ this.response = response;
+ }
+
+ public StarRocksStreamLoadFailedException(String message, Map<String, Object> response, boolean reCreateLabel)
+ {
+ super(message);
+ this.response = response;
+ this.reCreateLabel = reCreateLabel;
+ }
+
+ public Map<String, Object> getFailedResponse()
+ {
+ return response;
+ }
+
+ public boolean needReCreateLabel()
+ {
+ return reCreateLabel;
+ }
+}
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java
new file mode 100644
index 000000000..94118a32d
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java
@@ -0,0 +1,255 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter.manager;
+
+import com.alibaba.fastjson.JSON;
+import com.wgzhao.addax.plugin.writer.starrockswriter.StarRocksWriterOptions;
+import com.wgzhao.addax.plugin.writer.starrockswriter.row.StarRocksDelimiterParser;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class StarRocksStreamLoadVisitor
+{
+
+ private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
+ private static final String RESULT_FAILED = "Fail";
+ private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
+ private static final String LABEL_STATE_VISIBLE = "VISIBLE";
+ private static final String LABEL_STATE_COMMITTED = "COMMITTED";
+ private static final String RESULT_LABEL_PREPARE = "PREPARE";
+ private static final String RESULT_LABEL_ABORTED = "ABORTED";
+ private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
+ private final StarRocksWriterOptions writerOptions;
+ private long pos;
+
+ public StarRocksStreamLoadVisitor(StarRocksWriterOptions writerOptions)
+ {
+ this.writerOptions = writerOptions;
+ }
+
+ public void doStreamLoad(StarRocksFlushTuple flushData)
+ throws IOException
+ {
+ String host = getAvailableHost();
+ if (null == host) {
+ throw new IOException("None of the host in `load_url` could be connected.");
+ }
+ String loadUrl = new StringBuilder(host)
+ .append("/api/")
+ .append(writerOptions.getDatabase())
+ .append("/")
+ .append(writerOptions.getTable())
+ .append("/_stream_load")
+ .toString();
+ LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
+ Map<String, Object> loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows(), flushData.getBytes().intValue()));
+ final String keyStatus = "Status";
+ if (null == loadResult || !loadResult.containsKey(keyStatus)) {
+ throw new IOException("Unable to flush data to StarRocks: unknown result status.");
+ }
+ LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
+ if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
+ throw new IOException(
+ new StringBuilder("Failed to flush data to StarRocks.\n").append(JSON.toJSONString(loadResult)).toString()
+ );
+ }
+ else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
+ LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
+ // has to block-checking the state to get the final result
+ checkLabelState(host, flushData.getLabel());
+ }
+ }
+
+ private String getAvailableHost()
+ {
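+ // Probe the configured FE addresses round-robin, starting where the previous call left off,
+ // and return the first one that accepts an HTTP connection.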
+ List<String> hostList = writerOptions.getLoadUrlList();
+ long tmp = pos + hostList.size();
+ for (; pos < tmp; pos++) {
+ String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString();
+ if (tryHttpConnection(host)) {
+ return host;
+ }
+ }
+ return null;
+ }
+
+ private boolean tryHttpConnection(String host)
+ {
+ try {
+ URL url = new URL(host);
+ HttpURLConnection co = (HttpURLConnection) url.openConnection();
+ co.setConnectTimeout(1000);
+ co.connect();
+ co.disconnect();
+ return true;
+ }
+ catch (Exception e1) {
+ LOG.warn("Failed to connect to address:{}", host, e1);
+ return false;
+ }
+ }
+
+ private byte[] joinRows(List<byte[]> rows, int totalBytes)
+ {
+ if (StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
+ Map<String, Object> props = writerOptions.getLoadProps();
+ byte[] lineDelimiter = StarRocksDelimiterParser.parse((String) props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
+ ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
+ for (byte[] row : rows) {
+ bos.put(row);
+ bos.put(lineDelimiter);
+ }
+ return bos.array();
+ }
+
+ if (StarRocksWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
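+ // Capacity: the payload bytes plus one comma between every pair of rows and the two
+ // enclosing brackets (rows.size() + 1 covers rows.size() - 1 commas plus "[" and "]").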
+ ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
+ bos.put("[".getBytes(StandardCharsets.UTF_8));
+ byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
+ boolean isFirstElement = true;
+ for (byte[] row : rows) {
+ if (!isFirstElement) {
+ bos.put(jsonDelimiter);
+ }
+ bos.put(row);
+ isFirstElement = false;
+ }
+ bos.put("]".getBytes(StandardCharsets.UTF_8));
+ return bos.array();
+ }
+ throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
+ }
+
+ @SuppressWarnings("unchecked")
+ private void checkLabelState(String host, String label)
+ throws IOException
+ {
+ int idx = 0;
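+ // Poll the label state with a linearly growing delay (capped at 5 seconds) until a terminal state is reached.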
+ while (true) {
+ try {
+ TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
+ }
+ catch (InterruptedException ex) {
+ break;
+ }
+ try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+ HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(writerOptions.getDatabase()).append("/get_load_state?label=").append(label).toString());
+ httpGet.setHeader("Authorization", getBasicAuthHeader(writerOptions.getUsername(), writerOptions.getPassword()));
+ httpGet.setHeader("Connection", "close");
+
+ try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
+ HttpEntity respEntity = getHttpEntity(resp);
+ if (respEntity == null) {
+ throw new IOException(String.format("Failed to flush data to StarRocks, Error " +
+ "could not get the final state of label[%s].\n", label), null);
+ }
+ Map<String, Object> result = (Map<String, Object>) JSON.parse(EntityUtils.toString(respEntity));
+ String labelState = (String) result.get("state");
+ if (null == labelState) {
+ throw new IOException(String.format("Failed to flush data to StarRocks, Error " +
+ "could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
+ }
+ LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
+ switch (labelState) {
+ case LABEL_STATE_VISIBLE:
+ case LABEL_STATE_COMMITTED:
+ return;
+ case RESULT_LABEL_PREPARE:
+ continue;
+ case RESULT_LABEL_ABORTED:
+ throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error " +
+ "label[%s] state[%s]\n", label, labelState), null, true);
+ case RESULT_LABEL_UNKNOWN:
+ default:
+ throw new IOException(String.format("Failed to flush data to StarRocks, Error " +
+ "label[%s] state[%s]\n", label, labelState), null);
+ }
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<String, Object> doHttpPut(String loadUrl, String label, byte[] data)
+ throws IOException
+ {
+ LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
+ final HttpClientBuilder httpClientBuilder = HttpClients.custom()
+ .setRedirectStrategy(new DefaultRedirectStrategy()
+ {
+ @Override
+ protected boolean isRedirectable(String method)
+ {
+ return true;
+ }
+ });
+ try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
+ HttpPut httpPut = new HttpPut(loadUrl);
+ List<String> cols = writerOptions.getColumns();
+ if (null != cols && !cols.isEmpty() && StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
+ httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList())));
+ }
+ if (null != writerOptions.getLoadProps()) {
+ for (Map.Entry<String, Object> entry : writerOptions.getLoadProps().entrySet()) {
+ httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
+ }
+ }
+ httpPut.setHeader("Expect", "100-continue");
+ httpPut.setHeader("label", label);
+ httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
+ httpPut.setHeader("Authorization", getBasicAuthHeader(writerOptions.getUsername(), writerOptions.getPassword()));
+ httpPut.setEntity(new ByteArrayEntity(data));
+ httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
+ try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
+ HttpEntity respEntity = getHttpEntity(resp);
+ if (respEntity == null) {
+ return null;
+ }
+ return (Map<String, Object>) JSON.parse(EntityUtils.toString(respEntity));
+ }
+ }
+ }
+
+ private String getBasicAuthHeader(String username, String password)
+ {
+ String auth = username + ":" + password;
+ byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
+ return "Basic " + new String(encodedAuth, Charset.defaultCharset());
+ }
+
+ private HttpEntity getHttpEntity(CloseableHttpResponse resp)
+ {
+ int code = resp.getStatusLine().getStatusCode();
+ if (200 != code) {
+ LOG.warn("Request failed with code:{}", code);
+ return null;
+ }
+ HttpEntity respEntity = resp.getEntity();
+ if (null == respEntity) {
+ LOG.warn("Request failed with empty response.");
+ return null;
+ }
+ return respEntity;
+ }
+}
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java
new file mode 100644
index 000000000..36b18af54
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java
@@ -0,0 +1,215 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter.manager;
+
+import com.google.common.base.Strings;
+import com.wgzhao.addax.plugin.writer.starrockswriter.StarRocksWriterOptions;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class StarRocksWriterManager
+{
+
+ private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriterManager.class);
+
+ private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
+ private final StarRocksWriterOptions writerOptions;
+
+ private final List<byte[]> buffer = new ArrayList<>();
+ private final LinkedBlockingDeque<StarRocksFlushTuple> flushQueue;
+ private int batchCount = 0;
+ private long batchSize = 0;
+ private volatile boolean closed = false;
+ private volatile Exception flushException;
+ private ScheduledExecutorService scheduler;
+ private ScheduledFuture<?> scheduledFuture;
+
+ public StarRocksWriterManager(StarRocksWriterOptions writerOptions)
+ {
+ this.writerOptions = writerOptions;
+ this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(writerOptions);
+ flushQueue = new LinkedBlockingDeque<>(writerOptions.getFlushQueueLength());
+ this.startScheduler();
+ this.startAsyncFlushing();
+ }
+
+ public void startScheduler()
+ {
+ stopScheduler();
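+ // Arm a fresh one-shot flush timer: when flushInterval elapses, whatever is buffered is flushed.
+ // The timer is re-armed after every successful stream load (or immediately when the buffer is empty).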
+ this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("starrocks-interval-flush").daemon(true).build());
+ this.scheduledFuture = this.scheduler.schedule(() -> {
+ synchronized (StarRocksWriterManager.this) {
+ if (!closed) {
+ try {
+ String label = createBatchLabel();
+ LOG.info(String.format("StarRocks interval Sinking triggered: label[%s].", label));
+ if (batchCount == 0) {
+ startScheduler();
+ }
+ flush(label, false);
+ }
+ catch (Exception e) {
+ flushException = e;
+ }
+ }
+ }
+ }, writerOptions.getFlushInterval(), TimeUnit.MILLISECONDS);
+ }
+
+ public void stopScheduler()
+ {
+ if (this.scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ this.scheduler.shutdown();
+ }
+ }
+
+ public final synchronized void writeRecord(String record)
+ throws IOException
+ {
+ checkFlushException();
+ try {
+ byte[] bts = record.getBytes(StandardCharsets.UTF_8);
+ buffer.add(bts);
+ batchCount++;
+ batchSize += bts.length;
+ if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) {
+ String label = createBatchLabel();
+ LOG.debug(String.format("StarRocks buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));
+ flush(label, false);
+ }
+ }
+ catch (Exception e) {
+ throw new IOException("Writing records to StarRocks failed.", e);
+ }
+ }
+
+ public synchronized void flush(String label, boolean waitUntilDone)
+ throws Exception
+ {
+ checkFlushException();
+ if (batchCount == 0) {
+ if (waitUntilDone) {
+ waitAsyncFlushingDone();
+ }
+ return;
+ }
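+ // put() blocks while the flush queue is full, applying back-pressure to writeRecord().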
+ flushQueue.put(new StarRocksFlushTuple(label, batchSize, new ArrayList<>(buffer)));
+ if (waitUntilDone) {
+ // wait until this flush (and everything queued before it) has completed
+ waitAsyncFlushingDone();
+ }
+ buffer.clear();
+ batchCount = 0;
+ batchSize = 0;
+ }
+
+ public synchronized void close()
+ {
+ if (!closed) {
+ closed = true;
+ try {
+ String label = createBatchLabel();
+ if (batchCount > 0) {
+ LOG.debug(String.format("StarRocks Sink is about to close: label[%s].", label));
+ }
+ flush(label, true);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Writing records to StarRocks failed.", e);
+ }
+ }
+ checkFlushException();
+ }
+
+ public String createBatchLabel()
+ {
+ return UUID.randomUUID().toString();
+ }
+
+ private void startAsyncFlushing()
+ {
+ // start flush thread
+ Thread flushThread = new Thread(() -> {
+ while (true) {
+ try {
+ asyncFlush();
+ }
+ catch (Exception e) {
+ flushException = e;
+ }
+ }
+ });
+ flushThread.setDaemon(true);
+ flushThread.start();
+ }
+
+ private void waitAsyncFlushingDone()
+ throws InterruptedException
+ {
+ // Enqueue one empty-label tuple per queue slot; once they have all been accepted,
+ // every real flush submitted earlier must have been taken off the queue.
+ for (int i = 0; i <= writerOptions.getFlushQueueLength(); i++) {
+ flushQueue.put(new StarRocksFlushTuple("", 0L, null));
+ }
+ checkFlushException();
+ }
+
+ private void asyncFlush()
+ throws Exception
+ {
+ StarRocksFlushTuple flushData = flushQueue.take();
+ if (Strings.isNullOrEmpty(flushData.getLabel())) {
+ return;
+ }
+ stopScheduler();
+ LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
+ for (int i = 0; i <= writerOptions.getMaxRetries(); i++) {
+ try {
+ // flush to StarRocks with stream load
+ starrocksStreamLoadVisitor.doStreamLoad(flushData);
+ LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
+ startScheduler();
+ break;
+ }
+ catch (Exception e) {
+ LOG.warn("Failed to flush batch data to StarRocks, retry times = {}", i, e);
+ if (i >= writerOptions.getMaxRetries()) {
+ throw new IOException(e);
+ }
+ if (e instanceof StarRocksStreamLoadFailedException && ((StarRocksStreamLoadFailedException) e).needReCreateLabel()) {
+ String newLabel = createBatchLabel();
+ LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
+ flushData.setLabel(newLabel);
+ }
+ try {
+ Thread.sleep(1000L * Math.min(i + 1, 10));
+ }
+ catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Unable to flush, interrupted while doing another attempt", e);
+ }
+ }
+ }
+ }
+
+ private void checkFlushException()
+ {
+ if (flushException != null) {
+ throw new RuntimeException("Writing records to StarRocks failed.", flushException);
+ }
+ }
+}
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksBaseSerializer.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksBaseSerializer.java
new file mode 100644
index 000000000..c89d0ca86
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksBaseSerializer.java
@@ -0,0 +1,26 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter.row;
+
+import com.wgzhao.addax.common.element.Column;
+
+public class StarRocksBaseSerializer
+{
+
+ protected String fieldConversion(Column col)
+ {
+ if (null == col.getRawData() || Column.Type.NULL == col.getType()) {
+ return null;
+ }
+ if (Column.Type.BOOL == col.getType()) {
+ return String.valueOf(col.asLong());
+ }
+ if (Column.Type.BYTES == col.getType()) {
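+ // Interpret the raw bytes as a big-endian unsigned integer and emit its decimal representation.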
+ byte[] bts = (byte[]) col.getRawData();
+ long value = 0;
+ for (int i = 0; i < bts.length; i++) {
+ value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
+ }
+ return String.valueOf(value);
+ }
+ return col.asString();
+ }
+}
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksCsvSerializer.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksCsvSerializer.java
new file mode 100644
index 000000000..58d83fb59
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksCsvSerializer.java
@@ -0,0 +1,32 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter.row;
+
+import com.wgzhao.addax.common.element.Record;
+
+public class StarRocksCsvSerializer
+ extends StarRocksBaseSerializer
+ implements StarRocksISerializer
+{
+
+ private static final long serialVersionUID = 1L;
+
+ private final String columnSeparator;
+
+ public StarRocksCsvSerializer(String sp)
+ {
+ this.columnSeparator = StarRocksDelimiterParser.parse(sp, "\t");
+ }
+
+ @Override
+ public String serialize(Record row)
+ {
+ StringBuilder sb = new StringBuilder();
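+ // A bare \N marks a NULL value in MySQL-style CSV load input.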
+ for (int i = 0; i < row.getColumnNumber(); i++) {
+ String value = fieldConversion(row.getColumn(i));
+ sb.append(null == value ? "\\N" : value);
+ if (i < row.getColumnNumber() - 1) {
+ sb.append(columnSeparator);
+ }
+ }
+ return sb.toString();
+ }
+}
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksDelimiterParser.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksDelimiterParser.java
new file mode 100644
index 000000000..21f3e794f
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksDelimiterParser.java
@@ -0,0 +1,59 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter.row;
+
+import com.google.common.base.Strings;
+
+import java.io.StringWriter;
+
+public class StarRocksDelimiterParser
+{
+
+ private static final String HEX_STRING = "0123456789ABCDEF";
+
+ public static String parse(String sp, String dSp)
+ throws RuntimeException
+ {
+ if (Strings.isNullOrEmpty(sp)) {
+ return dSp;
+ }
+ if (!sp.toUpperCase().startsWith("\\X")) {
+ return sp;
+ }
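+ // Delimiters given as \x-prefixed hex (e.g. "\\x01") are decoded into the corresponding raw bytes.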
+ String hexStr = sp.substring(2);
+ // check hex str
+ if (hexStr.isEmpty()) {
+ throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`");
+ }
+ if (hexStr.length() % 2 != 0) {
+ throw new RuntimeException("Failed to parse delimiter: `Hex str length error`");
+ }
+ for (char hexChar : hexStr.toUpperCase().toCharArray()) {
+ if (HEX_STRING.indexOf(hexChar) == -1) {
+ throw new RuntimeException("Failed to parse delimiter: `Hex str format error`");
+ }
+ }
+ // transform to separator
+ StringWriter writer = new StringWriter();
+ for (byte b : hexStrToBytes(hexStr)) {
+ writer.append((char) b);
+ }
+ return writer.toString();
+ }
+
+ private static byte[] hexStrToBytes(String hexStr)
+ {
+ String upperHexStr = hexStr.toUpperCase();
+ int length = upperHexStr.length() / 2;
+ char[] hexChars = upperHexStr.toCharArray();
+ byte[] bytes = new byte[length];
+ for (int i = 0; i < length; i++) {
+ int pos = i * 2;
+ bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
+ }
+ return bytes;
+ }
+
+ private static byte charToByte(char c)
+ {
+ return (byte) HEX_STRING.indexOf(c);
+ }
+}
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksISerializer.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksISerializer.java
new file mode 100644
index 000000000..f17807870
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksISerializer.java
@@ -0,0 +1,12 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter.row;
+
+import com.wgzhao.addax.common.element.Record;
+
+import java.io.Serializable;
+
+public interface StarRocksISerializer
+ extends Serializable
+{
+
+ String serialize(Record row);
+}
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksJsonSerializer.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksJsonSerializer.java
new file mode 100644
index 000000000..abdaadd08
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksJsonSerializer.java
@@ -0,0 +1,38 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter.row;
+
+import com.alibaba.fastjson.JSON;
+import com.wgzhao.addax.common.element.Record;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class StarRocksJsonSerializer
+ extends StarRocksBaseSerializer
+ implements StarRocksISerializer
+{
+
+ private static final long serialVersionUID = 1L;
+
+ private final List<String> fieldNames;
+
+ public StarRocksJsonSerializer(List<String> fieldNames)
+ {
+ this.fieldNames = fieldNames;
+ }
+
+ @Override
+ public String serialize(Record row)
+ {
+ if (null == fieldNames) {
+ return "";
+ }
+ Map<String, Object> rowMap = new HashMap<>(fieldNames.size());
+ int idx = 0;
+ for (String fieldName : fieldNames) {
+ rowMap.put(fieldName, fieldConversion(row.getColumn(idx)));
+ idx++;
+ }
+ return JSON.toJSONString(rowMap);
+ }
+}
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksSerializerFactory.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksSerializerFactory.java
new file mode 100644
index 000000000..ae5c6b0ef
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/row/StarRocksSerializerFactory.java
@@ -0,0 +1,24 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter.row;
+
+import com.wgzhao.addax.plugin.writer.starrockswriter.StarRocksWriterOptions;
+
+import java.util.Map;
+
+public class StarRocksSerializerFactory
+{
+
+ private StarRocksSerializerFactory() {}
+
+ public static StarRocksISerializer createSerializer(StarRocksWriterOptions writerOptions)
+ {
+ if (StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
+ Map<String, Object> props = writerOptions.getLoadProps();
+ return new StarRocksCsvSerializer(null == props || !props.containsKey("column_separator") ?
+ null : String.valueOf(props.get("column_separator")));
+ }
+ if (StarRocksWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
+ return new StarRocksJsonSerializer(writerOptions.getColumns());
+ }
+ throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
+ }
+}
diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/util/StarRocksWriterUtil.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/util/StarRocksWriterUtil.java
new file mode 100755
index 000000000..8f6a4e3d8
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/util/StarRocksWriterUtil.java
@@ -0,0 +1,117 @@
+package com.wgzhao.addax.plugin.writer.starrockswriter.util;
+
+import com.alibaba.druid.sql.parser.ParserException;
+import com.google.common.base.Strings;
+import com.wgzhao.addax.common.base.Constant;
+import com.wgzhao.addax.plugin.writer.starrockswriter.StarRocksWriterOptions;
+import com.wgzhao.addax.rdbms.util.DBUtil;
+import com.wgzhao.addax.rdbms.util.DataBaseType;
+import com.wgzhao.addax.rdbms.util.RdbmsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+public final class StarRocksWriterUtil
+{
+ private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriterUtil.class);
+
+ private StarRocksWriterUtil() {}
+
+ public static List<String> getStarRocksColumns(Connection conn, String databaseName, String tableName)
+ {
+ String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' " +
+ "AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName);
+ List<String> columns = new ArrayList<>();
+ ResultSet rs = null;
+ try {
+ rs = DBUtil.query(conn, currentSql);
+ while (DBUtil.asyncResultSetNext(rs)) {
+ String colName = rs.getString("COLUMN_NAME");
+ columns.add(colName);
+ }
+ return columns;
+ }
+ catch (Exception e) {
+ throw RdbmsException.asQueryException(e, currentSql);
+ }
+ finally {
+ DBUtil.closeDBResources(rs, null, null);
+ }
+ }
+
+ public static List<String> renderPreOrPostSqls(List<String> preOrPostSqls, String tableName)
+ {
+ List<String> renderedSqls = new ArrayList<>();
+ if (null == preOrPostSqls) {
+ return renderedSqls;
+ }
+ for (String sql : preOrPostSqls) {
+ if (!Strings.isNullOrEmpty(sql)) {
+ renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
+ }
+ }
+ return renderedSqls;
+ }
+
+ public static void executeSqls(Connection conn, List<String> sqls)
+ {
+ Statement stmt = null;
+ String currentSql = null;
+ try {
+ stmt = conn.createStatement();
+ for (String sql : sqls) {
+ currentSql = sql;
+ stmt.execute(sql);
+ }
+ }
+ catch (Exception e) {
+ throw RdbmsException.asQueryException(e, currentSql);
+ }
+ finally {
+ DBUtil.closeDBResources(null, stmt, null);
+ }
+ }
+
+ public static void preCheckPrepareSQL(StarRocksWriterOptions options)
+ {
+ String table = options.getTable();
+ List<String> preSqls = options.getPreSqlList();
+ List<String> renderedPreSqls = StarRocksWriterUtil.renderPreOrPostSqls(preSqls, table);
+ if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
+ LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls));
+ for (String sql : renderedPreSqls) {
+ try {
+ DBUtil.sqlValid(sql, DataBaseType.MySql);
+ }
+ catch (ParserException e) {
+ throw RdbmsException.asPreSQLParserException(e, sql);
+ }
+ }
+ }
+ }
+
+ public static void preCheckPostSQL(StarRocksWriterOptions options)
+ {
+ String table = options.getTable();
+ List<String> postSqls = options.getPostSqlList();
+ List<String> renderedPostSqls = StarRocksWriterUtil.renderPreOrPostSqls(postSqls, table);
+ if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
+ LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
+ for (String sql : renderedPostSqls) {
+ try {
+ DBUtil.sqlValid(sql, DataBaseType.MySql);
+ }
+ catch (ParserException e) {
+ throw RdbmsException.asPostSQLParserException(e, sql);
+ }
+ }
+ }
+ }
+}
diff --git a/plugin/writer/starrockswriter/src/main/resources/plugin.json b/plugin/writer/starrockswriter/src/main/resources/plugin.json
new file mode 100755
index 000000000..a566926d6
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "starrockswriter",
+ "class": "com.wgzhao.addax.plugin.writer.starrockswriter.StarRocksWriter",
+ "description": "useScene: prod. mechanism: StarRocksStreamLoad. warn: The more you know about the database, the less problems you encounter.",
+ "developer": "starrocks"
+}
diff --git a/plugin/writer/starrockswriter/src/main/resources/plugin_job_template.json b/plugin/writer/starrockswriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 000000000..ca5c99d08
--- /dev/null
+++ b/plugin/writer/starrockswriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,14 @@
+{
+ "name": "starrockswriter",
+ "parameter": {
+ "username": "",
+ "password": "",
+ "database": "",
+ "table": "",
+ "column": [],
+ "preSql": [],
+ "postSql": [],
+ "jdbcUrl": "",
+ "loadUrl": []
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index fc95acf68..9779aef47 100644
--- a/pom.xml
+++ b/pom.xml
@@ -209,6 +209,7 @@
         <module>plugin/writer/rediswriter</module>
         <module>plugin/writer/sqlitewriter</module>
         <module>plugin/writer/sqlserverwriter</module>
+        <module>plugin/writer/starrockswriter</module>
         <module>plugin/writer/streamwriter</module>
         <module>plugin/writer/tdenginewriter</module>
         <module>plugin/writer/txtfilewriter</module>