Add support for Elasticsearch reader (wgzhao#121)
* Add elasticsearch reader plugin
wgzhao authored Feb 19, 2021
1 parent 474af42 commit 505ed5a
Showing 15 changed files with 1,395 additions and 2 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -82,7 +82,7 @@
| ClickHouse | Supported | Supported | clickhousereader/clickhousewriter | |
| DB2 | Supported | Supported | rdbmsreader/rdbmswriter | Supported in theory, but not actually tested |
| DBF | Supported | Supported | dbffilereader/dbffilewriter | |
| ElasticSearch | Not supported | Supported | elasticsearchwriter | |
| ElasticSearch | Supported | Supported | elasticsearchreader/elasticsearchwriter | Original code from [@Kestrong](https://github.com/Kestrong/datax-elasticsearch) |
| FTP | Supported | Supported | ftpreader/ftpwriter | |
| HBase 1.x | Supported | Supported | hbase11xreader/hbase11xwriter | Operates on HBase directly |
| HBase 1.x | Supported | Supported | hbase11xsqlreader/hbase11xsqlwriter | Operates on HBase via [Phoenix](https://phoenix.apache.org) |
1 change: 1 addition & 0 deletions docs/src/main/sphinx/reader.rst
@@ -10,6 +10,7 @@
Cassandra Reader <reader/cassandrareader>
ClickHouse Reader <reader/clickhousereader>
DBF reader <reader/dbffilereader>
ElasticSearch Reader <reader/elasticsearchreader>
FTP reader <reader/ftpreader>
HBase 1.x Reader <reader/hbase11xreader>
HBase 2.x Reader <reader/hbase20xreader>
266 changes: 266 additions & 0 deletions docs/src/main/sphinx/reader/elasticsearchreader.md
@@ -0,0 +1,266 @@
# ElasticSearchReader

The ElasticSearchReader plugin reads data from an [Elasticsearch](https://www.elastic.co/cn/elasticsearch/) index. It executes the configured query through the REST API that Elasticsearch exposes (default port 9200) and fetches the result set in batches.

## Example

Assume the index to be read contains the following:

```json
{
  "took": 14,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "test-1",
        "_type": "default",
        "_id": "38",
        "_score": 1,
        "_source": {
          "col_date": "2017-05-25T11:22:33.000+08:00",
          "col_integer": 19890604,
          "col_keyword": "hello world",
          "col_ip": "1.1.1.1",
          "col_text": "long text",
          "col_double": 19890604,
          "col_long": 19890604,
          "col_geo_point": "41.12,-71.34"
        }
      },
      {
        "_index": "test-1",
        "_type": "default",
        "_id": "103",
        "_score": 1,
        "_source": {
          "col_date": "2017-05-25T11:22:33.000+08:00",
          "col_integer": 19890604,
          "col_keyword": "hello world",
          "col_ip": "1.1.1.1",
          "col_text": "long text",
          "col_double": 19890604,
          "col_long": 19890604,
          "col_geo_point": "41.12,-71.34"
        }
      }
    ]
  }
}
```
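Given the `column` list from the job configuration, each hit's `_source` is flattened into an ordered record. A small illustrative sketch of that mapping (not the plugin's code), applied to the first hit above:

```python
# One hit's _source, copied from the sample index content above.
hit_source = {
    "col_date": "2017-05-25T11:22:33.000+08:00",
    "col_integer": 19890604,
    "col_keyword": "hello world",
    "col_ip": "1.1.1.1",
    "col_text": "long text",
    "col_double": 19890604,
    "col_long": 19890604,
    "col_geo_point": "41.12,-71.34",
}

# The "column" list fixes the output order of the record.
columns = ["col_ip", "col_double", "col_long", "col_integer",
           "col_keyword", "col_text", "col_geo_point", "col_date"]

# A missing field becomes None instead of raising, mirroring a lenient reader.
row = [hit_source.get(c) for c in columns]
print(row[0], row[-1])
# 1.1.1.1 2017-05-25T11:22:33.000+08:00
```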

Configure a job that reads from Elasticsearch and prints the records to the terminal:

```json
{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "elasticsearchreader",
          "parameter": {
            "endpoint": "http://127.0.0.1:9200",
            "accessId": "",
            "accesskey": "",
            "index": "test-1",
            "type": "default",
            "searchType": "dfs_query_then_fetch",
            "headers": {},
            "scroll": "3m",
            "search": [
              {
                "query": {
                  "match": {
                    "col_ip": "1.1.1.1"
                  }
                },
                "aggregations": {
                  "top_10_states": {
                    "terms": {
                      "field": "col_date",
                      "size": 10
                    }
                  }
                }
              }
            ],
            "column": ["col_ip", "col_double", "col_long", "col_integer",
                       "col_keyword", "col_text", "col_geo_point", "col_date"]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true,
            "encoding": "UTF-8"
          }
        }
      }
    ]
  }
}
```

Save the above as `job/es2stream.json`.

Run the following command to start the collection:

```shell
bin/datax.py job/es2stream.json
```

The output looks similar to the following (record output truncated):

```
2021-02-19 13:38:15.860 [main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-02-19 13:38:15.895 [main] INFO Engine -
{
  "content": [
    {
      "reader": {
        "parameter": {
          "accessId": "",
          "headers": {},
          "endpoint": "http://127.0.0.1:9200",
          "search": [
            {
              "query": {
                "match": {
                  "col_ip": "1.1.1.1"
                }
              },
              "aggregations": {
                "top_10_states": {
                  "terms": {
                    "field": "col_date",
                    "size": 10
                  }
                }
              }
            }
          ],
          "accesskey": "*****",
          "searchType": "dfs_query_then_fetch",
          "scroll": "3m",
          "column": [
            "col_ip",
            "col_double",
            "col_long",
            "col_integer",
            "col_keyword",
            "col_text",
            "col_geo_point",
            "col_date"
          ],
          "index": "test-1",
          "type": "default"
        },
        "name": "elasticsearchreader"
      },
      "writer": {
        "parameter": {
          "print": true,
          "encoding": "UTF-8"
        },
        "name": "streamwriter"
      }
    }
  ],
  "setting": {
    "errorLimit": {
      "record": 0,
      "percentage": 0.02
    },
    "speed": {
      "byte": -1,
      "channel": 1
    }
  }
}
2021-02-19 13:38:15.934 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-19 13:38:15.934 [main] INFO JobContainer - DataX jobContainer starts job.
2021-02-19 13:38:15.937 [main] INFO JobContainer - Set jobId = 0
2017-05-25T11:22:33.000+08:00 19890604 hello world 1.1.1.1 long text 19890604 19890604 41.12,-71.34
2017-05-25T11:22:33.000+08:00 19890604 hello world 1.1.1.1 long text 19890604 19890604 41.12,-71.34
2021-02-19 13:38:19.845 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-02-19 13:38:19.848 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do post work.
2021-02-19 13:38:19.849 [job-0] INFO JobContainer - DataX Reader.Job [elasticsearchreader] do post work.
2021-02-19 13:38:19.855 [job-0] INFO JobContainer - PerfTrace not enable!
2021-02-19 13:38:19.858 [job-0] INFO StandAloneJobContainerCommunicator - Total 95 records, 8740 bytes | Speed 2.84KB/s, 31 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.103s | Percentage 100.00%
2021-02-19 13:38:19.861 [job-0] INFO JobContainer -
Job start time           : 2021-02-19 13:38:15
Job end time             : 2021-02-19 13:38:19
Total elapsed time       : 3s
Average throughput       : 2.84KB/s
Record write speed       : 31rec/s
Total records read       : 2
Total read/write failures: 0
```

## Parameters

| Option | Required | Type | Default | Description |
| :---------- | :------: | ------- | ---------------------- | -------------------------------------------------- |
| endpoint || string || Connection address of the Elasticsearch cluster |
| accessId || string | `""` | Username for HTTP auth |
| accessKey || string | `""` | Password for HTTP auth |
| index || string || Name of the Elasticsearch index |
| type || string | index name | Type name within the index |
| search || list | `[]` | Search request body in JSON, as accepted by the search API |
| column || list || Fields to read |
| timeout || int | 60 | Client timeout, in seconds |
| discovery || boolean | false | Enable node discovery: poll and periodically update the server list in the client |
| compression || boolean | true | Enable compression for HTTP requests |
| multiThread || boolean | true | Whether HTTP requests use multiple threads |
| searchType || string | `dfs_query_then_fetch` | Search type |
| headers || map | `{}` | HTTP request headers |
| scroll || string | `""` | Scroll pagination setting |

### search

The search option accepts any content that satisfies the Elasticsearch search API, for example:

```json
{
  "query": {
    "match": {
      "message": "myProduct"
    }
  },
  "aggregations": {
    "top_10_states": {
      "terms": {
        "field": "state",
        "size": 10
      }
    }
  }
}
```
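Because the body is ordinary Query DSL, it can be built or validated with any JSON tooling before being pasted into the `search` list of the job file. An illustrative round-trip:

```python
import json

# The same query/aggregation body, built programmatically.
search_body = {
    "query": {"match": {"message": "myProduct"}},
    "aggregations": {
        "top_10_states": {"terms": {"field": "state", "size": 10}}
    },
}

# Round-trip through the serializer: what goes into the job file is plain JSON,
# so a failed json.loads here would catch a malformed body before submission.
encoded = json.dumps(search_body, indent=4)
assert json.loads(encoded) == search_body
print(encoded)
```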

### searchType

searchType currently supports the following values:

- dfs_query_then_fetch
- query_then_fetch
- count
- scan
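A job file can be sanity-checked for this field before submission. A small hypothetical validator (the function and constant names are illustrative, not part of the plugin):

```python
# Values accepted by the searchType option, per the list above.
ALLOWED_SEARCH_TYPES = {"dfs_query_then_fetch", "query_then_fetch", "count", "scan"}

def check_search_type(parameter: dict) -> str:
    """Return the job's searchType, falling back to the documented default."""
    search_type = parameter.get("searchType", "dfs_query_then_fetch")
    if search_type not in ALLOWED_SEARCH_TYPES:
        raise ValueError(f"unsupported searchType: {search_type!r}")
    return search_type

print(check_search_type({"searchType": "query_then_fetch"}))
# query_then_fetch
print(check_search_type({}))
# dfs_query_then_fetch
```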
1 change: 0 additions & 1 deletion docs/src/main/sphinx/writer/elasticsearchwriter.md
@@ -158,7 +158,6 @@
]
}
}

```

#### 3.2 参数说明
38 changes: 38 additions & 0 deletions plugin/reader/elasticsearchreader/package.xml
@@ -0,0 +1,38 @@
<assembly
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-component-1.1.2.xsd">
<id>release</id>
<formats>
<format>dir</format>
<format>zip</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>*.json</include>
</includes>
<outputDirectory>plugin/reader/${project.artifactId}</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>${project.artifactId}-${project.version}.jar</include>
</includes>
<outputDirectory>plugin/reader/${project.artifactId}</outputDirectory>
</fileSet>
</fileSets>

<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/${project.artifactId}/libs</outputDirectory>
<scope>runtime</scope>
<excludes>
<exclude>com.wgzhao.datax:datax-common</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>