Skip to content

Commit

Permalink
[sqlitereader] Introduce new sqlitereader plugin (wgzhao#305)
Browse files Browse the repository at this point in the history
  • Loading branch information
wgzhao authored Aug 20, 2021
1 parent f177cee commit 600dcf8
Show file tree
Hide file tree
Showing 14 changed files with 410 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .github/ISSUE_TEMPLATE/release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ assignees: wgzhao

### redis reader

### sqlite reader

### sqlserver reader

### stream reader
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ The project, originally from Ali's [DataX]((https://github.com/alibaba/datax)),
| PostgreSQL | YES | YES | postgresqlreader/postgresqlwriter | |
| Trino | YES | YES | rdbmsreader/rdbmswriter | [trino( formerly PrestoSQL)](https://trino.io) |
| Redis | YES | YES | redisreader/rediswriter | |
| SQLite | YES | NO | sqlitereader | |
| SQL Server | YES | YES | sqlserverreader/sqlserverwriter | |
| TDengine | YES | YES | tdenginereader/tdenginewriter | [TDengine](https://www.taosdata.com/cn/) |
| TDH Inceptor2 | YES | YES | rdbmsreader/rdbmswriter | [Transwarp TDH](http://transwarp.cn/transwarp/product-TDH.html?categoryId=18) 5.1 or later |
Expand Down
1 change: 1 addition & 0 deletions README_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
| PostgreSQL | 支持 | 支持 | postgresqlreader/postgresqlwriter | |
| PrestoSQL | 支持 | 支持 | rdbmsreader/rdbmswriter | [trino(原PrestoSQL)](https://trino.io) 310以上 |
| Redis | 支持 | 支持 | redisreader/rediswriter | |
| SQLite | 支持 | 不支持 | sqlitereader | |
| SQL Server | 支持 | 支持 | sqlserverreader/sqlserverwriter | |
| TDengine | 支持 | 支持 | tdenginereader/tdenginewriter | 支持 [TDengine](https://www.taosdata.com/cn/) 数据库读写 |
| TDH Inceptor2 | 支持 | 支持 | rdbmsreader/rdbmswriter | [星环 TDH](http://transwarp.cn/transwarp/product-TDH.html?categoryId=18) 5.1以上版本 |
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/sphinx/reader.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
PostgreSQL Reader <reader/postgresqlreader>
RDBMS Reader <reader/rdbmsreader>
Redis Reader <reader/redisreader>
SQLite Reader <reader/sqlitereader>
SQLServer Reader <reader/sqlserverreader>
Stream Reader <reader/streamreader>
TDengine Reader <reader/tdenginereader>
Expand Down
102 changes: 102 additions & 0 deletions docs/src/main/sphinx/reader/sqlitereader.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# SQLite Reader

SQLiteReader 插件用于读取指定目录下的 sqlite 文件, 他继承于 [rdbmsreader](rdbmsreader)

## 示例

我们创建示例文件:

```shell
$ sqlite3 /tmp/test.sqlite3
SQLite version 3.7.17 2013-05-20 00:56:22
Enter ".help" for instructions
Enter SQL statements terminated with a ";"
sqlite> create table test(id int, name varchar(10), salary double);
sqlite> insert into test values(1,'foo', 12.13),(2,'bar',202.22);
sqlite> .q
```

下面的配置是读取该表到终端的作业:

```json
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "sqlitereader",
"parameter": {
"username": "fakeuser",
"password": "",
"column": [
"*"
],
"connection": [
{
"jdbcUrl": [
"jdbc:sqlite:/tmp/test.sqlite3"
],
"table": [
"test"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
```

将上述配置文件保存为 `job/sqlite2stream.json`

### 执行采集命令

执行以下命令进行数据采集

```shell
bin/addax.sh job/sqlite2stream.json
```

## 参数说明

| 配置项 | 是否必须 | 类型 | 默认值 | 描述 |
| :-------------- | :------: | ------ |------------- |--------------|
| jdbcUrl || list || 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接[附件控制信息](http://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html) |
| driver || string || 自定义驱动类名,解决兼容性问题,详见下面描述 |
| username || string || 数据源的用户名, 可随意配置,但不能缺失 |
| password || string || 数据源指定用户名的密码,可不配置 |
| table || list || 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
| column || list || 所配置的表中需要同步的列名集合,详细描述 [rdbmreader](rdbmsreader) |
| splitPk || string || 使用splitPk代表的字段进行数据分片,详细描述见 [rdbmreader](rdbmsreader)|
| autoPk || bool | false | 是否自动猜测分片主键,`3.2.6` 版本引入 |
| where || string || 针对表的筛选条件 |
| querySql || list || 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 `table``column`这些配置项 |


## 类型转换

| Addax 内部类型| MySQL 数据类型 |
| -------- | ----- |
| Long |int, tinyint, smallint, mediumint, int, bigint|
| Double |float, double, decimal|
| String |varchar, char, tinytext, text, mediumtext, longtext, year |
| Date |date, datetime, timestamp, time |
| Boolean |bit, bool |
| Bytes |tinyblob, mediumblob, blob, longblob, varbinary |
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public enum DataBaseType
Oracle("oracle", "oracle.jdbc.OracleDriver"),
Presto("presto", "io.prestosql.jdbc.PrestoDriver"),
ClickHouse("clickhouse", "ru.yandex.clickhouse.ClickHouseDriver"),
SQLite("sqlite", "org.sqlite.JDBC"),
SQLServer("sqlserver", "com.microsoft.sqlserver.jdbc.SQLServerDriver"),
PostgreSQL("postgresql", "org.postgresql.Driver"),
RDBMS("rdbms", "com.wgzhao.addax.plugin.rdbms.util.DataBaseType"),
Expand Down
8 changes: 8 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@
<fileMode>0644</fileMode>
<outputDirectory>addax-${project.version}</outputDirectory>
</fileSet>
<fileSet>
<directory>plugin/reader/sqlitereader/target/sqlitereader-${project.version}/</directory>
<includes>
<include>**/*.*</include>
</includes>
<fileMode>0644</fileMode>
<outputDirectory>addax-${project.version}</outputDirectory>
</fileSet>
<fileSet>
<directory>plugin/reader/sqlserverreader/target/sqlserverreader-${project.version}/</directory>
<includes>
Expand Down
37 changes: 37 additions & 0 deletions plugin/reader/sqlitereader/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<assembly
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-component-1.1.2.xsd">
<id>release</id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>*.json</include>
</includes>
<outputDirectory>plugin/reader/${project.artifactId}</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>${project.artifactId}-${project.version}.jar</include>
</includes>
<outputDirectory>plugin/reader/${project.artifactId}</outputDirectory>
</fileSet>
</fileSets>

<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/${project.artifactId}/libs</outputDirectory>
<scope>runtime</scope>
<excludes>
<exclude>com.wgzhao.addax:*</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>
62 changes: 62 additions & 0 deletions plugin/reader/sqlitereader/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.wgzhao.addax</groupId>
<artifactId>addax-all</artifactId>
<version>4.0.3-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<artifactId>sqlitereader</artifactId>
<name>sqlite-reader</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.wgzhao.addax</groupId>
<artifactId>addax-common</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.wgzhao.addax</groupId>
<artifactId>addax-rdbms</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.36.0.1</version>
</dependency>

</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>package.xml</descriptor>
</descriptors>
<finalName>${project.artifactId}-${project.version}</finalName>
</configuration>
<executions>
<execution>
<id>release</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.wgzhao.addax.plugin.reader.sqlitereader;

import com.wgzhao.addax.common.base.Key;
import com.wgzhao.addax.common.plugin.RecordSender;
import com.wgzhao.addax.common.spi.Reader;
import com.wgzhao.addax.common.util.Configuration;
import com.wgzhao.addax.rdbms.reader.CommonRdbmsReader;
import com.wgzhao.addax.rdbms.util.DataBaseType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static com.wgzhao.addax.common.base.Constant.DEFAULT_FETCH_SIZE;

public class SqliteReader
extends Reader
{

private static final DataBaseType DATABASE_TYPE = DataBaseType.SQLite;

public static class Job
extends Reader.Job
{
private static final Logger LOG = LoggerFactory.getLogger(Job.class);

private Configuration originalConfig = null;
private CommonRdbmsReader.Job commonRdbmsReaderJob;

@Override
public void init()
{
this.originalConfig = getPluginJobConf();
this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
this.originalConfig = this.commonRdbmsReaderJob.init(this.originalConfig);
}


@Override
public void preCheck()
{
init();
this.commonRdbmsReaderJob.preCheck(this.originalConfig, DATABASE_TYPE);
}

@Override
public List<Configuration> split(int adviceNumber)
{
return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
}

@Override
public void post()
{
this.commonRdbmsReaderJob.post(this.originalConfig);
}

@Override
public void destroy()
{
this.commonRdbmsReaderJob.destroy(this.originalConfig);
}
}

public static class Task
extends Reader.Task
{

private Configuration readerSliceConfig;
private CommonRdbmsReader.Task commonRdbmsReaderTask;

@Override
public void init()
{
this.readerSliceConfig = getPluginJobConf();
this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId());
this.commonRdbmsReaderTask.init(this.readerSliceConfig);
}

@Override
public void startRead(RecordSender recordSender)
{
int fetchSize = this.readerSliceConfig.getInt(Key.FETCH_SIZE, DEFAULT_FETCH_SIZE);

this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender, getTaskPluginCollector(), fetchSize);
}

@Override
public void post()
{
this.commonRdbmsReaderTask.post(this.readerSliceConfig);
}

@Override
public void destroy()
{
this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
}
}
}
Loading

0 comments on commit 600dcf8

Please sign in to comment.