diff --git a/README.md b/README.md index 7ceca7ecc..942b624f2 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,7 @@ | ClickHouse | 支持 | 支持 | clickhousereader/clickhousewriter | | | DB2 | 支持 | 支持 | rbdmsreader/rdbmswriter | 理论上支持,但未实际测试 | | DBF | 支持 | 支持 | dbffilereader/dbffilewriter | | -| ElasticSearch | 不支持 | 支持 | elasticsearchwriter | | +| ElasticSearch | 支持 | 支持 | elasticsearchreader/elasticsearchwriter | 原始代码来自[@Kestrong](https://github.com/Kestrong/datax-elasticsearch) | | FTP | 支持 | 支持 | ftpreader/ftpwriter | | | HBase 1.x | 支持 | 支持 | hbase11xreader/hbase11xwriter | 直接操作HBase | | HBase 1.x | 支持 | 支持 | hbase11xsqlreader/hbase11xsqlwriter | 通过[Phoenix](https://phoenix.apache.org)操作HBase | diff --git a/docs/src/main/sphinx/reader.rst b/docs/src/main/sphinx/reader.rst index a621d76d7..fa2834838 100644 --- a/docs/src/main/sphinx/reader.rst +++ b/docs/src/main/sphinx/reader.rst @@ -10,6 +10,7 @@ Cassandra Reader ClickHouse Reader DBF reader + ElasticSearch Reader FTP reader HBase 1.x Reader HBase 2.x Reader diff --git a/docs/src/main/sphinx/reader/elasticsearchreader.md b/docs/src/main/sphinx/reader/elasticsearchreader.md new file mode 100644 index 000000000..4b0e5dafe --- /dev/null +++ b/docs/src/main/sphinx/reader/elasticsearchreader.md @@ -0,0 +1,266 @@ +# ElasticSearchReader + +ElasticSearchReader 插件实现了从 [Elasticsearch](https://www.elastic.co/cn/elasticsearch/) 读取索引的功能, 它通过 Elasticsearch 提供的 Rest API (默认端口9200),执行指定的查询语句批量获取数据 + +## 示例 + +假定要获取的索引内容如下 + +``` +{ +"took": 14, +"timed_out": false, +"_shards": { +"total": 1, +"successful": 1, +"skipped": 0, +"failed": 0 +}, +"hits": { +"total": 2, +"max_score": 1, +"hits": [ +{ +"_index": "test-1", +"_type": "default", +"_id": "38", +"_score": 1, +"_source": { +"col_date": "2017-05-25T11:22:33.000+08:00", +"col_integer": 19890604, +"col_keyword": "hello world", +"col_ip": "1.1.1.1", +"col_text": "long text", +"col_double": 19890604, +"col_long": 19890604, +"col_geo_point": "41.12,-71.34" +} +}, +{ +"_index": 
"test-1", +"_type": "default", +"_id": "103", +"_score": 1, +"_source": { +"col_date": "2017-05-25T11:22:33.000+08:00", +"col_integer": 19890604, +"col_keyword": "hello world", +"col_ip": "1.1.1.1", +"col_text": "long text", +"col_double": 19890604, +"col_long": 19890604, +"col_geo_point": "41.12,-71.34" +} +} +] +} +} +``` + +配置一个从 Elasticsearch 读取数据并打印到终端的任务 + +``` +{ + "job": { + "setting": { + "speed": { + "byte": -1, + "channel": 1 + } + }, + "content": [ + { + "reader": { + "name": "elasticsearchreader", + "parameter": { + "endpoint": "http://127.0.0.1:9200", + "accessId":"", + "accessKey":"", + "index": "test-1", + "type": "default", + "searchType": "dfs_query_then_fetch", + "headers": {}, + "scroll": "3m", + "search": [ + { + "query": { + "match": { + "col_ip": "1.1.1.1" + } + }, + "aggregations": { + "top_10_states": { + "terms": { + "field": "col_date", + "size": 10 + } + } + } + } + ], + "column": ["col_ip", "col_double", "col_long","col_integer", + "col_keyword", "col_text","col_geo_point","col_date" + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "print": true, + "encoding": "UTF-8" + } + } + } + ] + } +} +``` + +将上述内容保存为 `job/es2stream.json` + +执行下面的命令进行采集 + +```shell +bin/datax.py job/es2stream.json +``` + +其输出结果类似如下(输出记录数有删减) + +``` +2021-02-19 13:38:15.860 [main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl +2021-02-19 13:38:15.895 [main] INFO Engine - +{ + "content":[ + { + "reader":{ + "parameter":{ + "accessId":"", + "headers":{}, + "endpoint":"http://127.0.0.1:9200", + "search":[ + { + "query": { + "match": { + "col_ip": "1.1.1.1" + } + }, + "aggregations": { + "top_10_states": { + "terms": { + "field": "col_date", + "size": 10 + } + } + } + } + ], + "accessKey":"*****", + "searchType":"dfs_query_then_fetch", + "scroll":"3m", + "column":[ + "col_ip", + "col_double", + "col_long", + "col_integer", + "col_keyword", + "col_text", + "col_geo_point", + "col_date" + ], + 
"index":"test-1", + "type":"default" + }, + "name":"elasticsearchreader" + }, + "writer":{ + "parameter":{ + "print":true, + "encoding":"UTF-8" + }, + "name":"streamwriter" + } + } + ], + "setting":{ + "errorLimit":{ + "record":0, + "percentage":0.02 + }, + "speed":{ + "byte":-1, + "channel":1 + } + } +} + +2021-02-19 13:38:15.934 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0 +2021-02-19 13:38:15.934 [main] INFO JobContainer - DataX jobContainer starts job. +2021-02-19 13:38:15.937 [main] INFO JobContainer - Set jobId = 0 + +2017-05-25T11:22:33.000+08:00 19890604 hello world 1.1.1.1 long text 19890604 19890604 41.12,-71.34 +2017-05-25T11:22:33.000+08:00 19890604 hello world 1.1.1.1 long text 19890604 19890604 41.12,-71.34 + +2021-02-19 13:38:19.845 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks. +2021-02-19 13:38:19.848 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do post work. +2021-02-19 13:38:19.849 [job-0] INFO JobContainer - DataX Reader.Job [elasticsearchreader] do post work. +2021-02-19 13:38:19.855 [job-0] INFO JobContainer - PerfTrace not enable! 
+2021-02-19 13:38:19.858 [job-0] INFO StandAloneJobContainerCommunicator - Total 95 records, 8740 bytes | Speed 2.84KB/s, 31 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.103s | Percentage 100.00% +2021-02-19 13:38:19.861 [job-0] INFO JobContainer - +任务启动时刻 : 2021-02-19 13:38:15 +任务结束时刻 : 2021-02-19 13:38:19 +任务总计耗时 : 3s +任务平均流量 : 2.84KB/s +记录写入速度 : 31rec/s +读出记录总数 : 2 +读写失败总数 : 0 +``` + +## 参数说明 + +| 配置项 | 是否必须 | 类型 | 默认值 | 描述 | +| :---------- | :------: | ------- | ---------------------- | -------------------------------------------------- | +| endpoint | 是 | string | 无 | ElasticSearch的连接地址 | +| accessId | 否 | string | `""` | http auth中的user | +| accessKey | 否 | string | `""` | http auth中的password | +| index | 是 | string | 无 | elasticsearch中的index名 | +| type | 否 | string | index名 | elasticsearch中index的type名 | +| search | 是 | list | `[]` | json格式api搜索数据体 | +| column | 是 | list | 无 | 需要读取的字段 | +| timeout | 否 | int | 60 | 客户端超时时间(单位:秒) | +| discovery | 否 | boolean | false | 启用节点发现将(轮询)并定期更新客户机中的服务器列表 | +| compression | 否 | boolean | true | http请求,开启压缩 | +| multiThread | 否 | boolean | true | http请求,是否有多线程 | +| searchType | 否 | string | `dfs_query_then_fetch` | 搜索类型 | +| headers | 否 | map | `{}` | http请求头 | +| scroll | 否 | string | `""` | 滚动分页配置 | + +### search + +search 配置项允许配置为满足 Elasticsearch API 查询要求的内容,比如这样: + +```json +{ + "query": { + "match": { + "message": "myProduct" + } + }, + "aggregations": { + "top_10_states": { + "terms": { + "field": "state", + "size": 10 + } + } + } +} +``` + +### searchType + +searchType 目前支持以下几种: + +- dfs_query_then_fetch +- query_then_fetch +- count +- scan diff --git a/docs/src/main/sphinx/writer/elasticsearchwriter.md b/docs/src/main/sphinx/writer/elasticsearchwriter.md index 2aae214fd..04f3a48ea 100644 --- a/docs/src/main/sphinx/writer/elasticsearchwriter.md +++ b/docs/src/main/sphinx/writer/elasticsearchwriter.md @@ -158,7 +158,6 @@ ] } } - ``` #### 3.2 参数说明 diff --git 
a/plugin/reader/elasticsearchreader/package.xml b/plugin/reader/elasticsearchreader/package.xml new file mode 100644 index 000000000..a74ebedd9 --- /dev/null +++ b/plugin/reader/elasticsearchreader/package.xml @@ -0,0 +1,38 @@ + + release + + dir + zip + + false + + + src/main/resources + + *.json + + plugin/reader/${project.artifactId} + + + target/ + + ${project.artifactId}-${project.version}.jar + + plugin/reader/${project.artifactId} + + + + + + false + plugin/reader/${project.artifactId}/libs + runtime + + com.wgzhao.datax:datax-common + + + + \ No newline at end of file diff --git a/plugin/reader/elasticsearchreader/pom.xml b/plugin/reader/elasticsearchreader/pom.xml new file mode 100644 index 000000000..8ac820b8c --- /dev/null +++ b/plugin/reader/elasticsearchreader/pom.xml @@ -0,0 +1,77 @@ + + + 4.0.0 + + com.wgzhao.datax + datax-all + 3.2.2-SNAPSHOT + ../../../pom.xml + + elasticsearchreader + elasticsearchreader + Retrieve data from ElasticSearch + jar + + + + com.wgzhao.datax + datax-common + ${project.version} + + + slf4j-log4j12 + org.slf4j + + + + + + io.searchbox + jest-common + 6.3.1 + + + + io.searchbox + jest + 6.3.1 + + + + joda-time + joda-time + 2.9.7 + + + + + ognl + ognl + 3.2.18 + + + + + + + + maven-assembly-plugin + + + package.xml + + ${project.artifactId}-${project.version} + + + + release + package + + single + + + + + + + \ No newline at end of file diff --git a/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/DefaultMemberAccess.java b/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/DefaultMemberAccess.java new file mode 100644 index 000000000..b05141461 --- /dev/null +++ b/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/DefaultMemberAccess.java @@ -0,0 +1,134 @@ +package com.wgzhao.datax.plugin.reader.elasticsearchreader; + +import ognl.MemberAccess; + +import 
java.lang.reflect.AccessibleObject; +import java.lang.reflect.Member; +import java.lang.reflect.Modifier; +import java.util.Map; + +/** + * This class provides methods for setting up and restoring + * access in a Field. Java 2 provides access utilities for setting + * and getting fields that are non-public. This object provides + * coarse-grained access controls to allow access to private, protected + * and package protected members. This will apply to all classes + * and members. + * + * @author Luke Blanshard (blanshlu@netscape.net) + * @author Drew Davidson (drew@ognl.org) + * @version 15 October 1999 + */ +public class DefaultMemberAccess + implements MemberAccess +{ + + public boolean allowPrivateAccess = false; + public boolean allowProtectedAccess = false; + public boolean allowPackageProtectedAccess = false; + + /*=================================================================== + Constructors + ===================================================================*/ + public DefaultMemberAccess(boolean allowAllAccess) + { + this(allowAllAccess, allowAllAccess, allowAllAccess); + } + + public DefaultMemberAccess(boolean allowPrivateAccess, boolean allowProtectedAccess, boolean allowPackageProtectedAccess) + { + super(); + this.allowPrivateAccess = allowPrivateAccess; + this.allowProtectedAccess = allowProtectedAccess; + this.allowPackageProtectedAccess = allowPackageProtectedAccess; + } + + /*=================================================================== + Public methods + ===================================================================*/ + public boolean getAllowPrivateAccess() + { + return allowPrivateAccess; + } + + public void setAllowPrivateAccess(boolean value) + { + allowPrivateAccess = value; + } + + public boolean getAllowProtectedAccess() + { + return allowProtectedAccess; + } + + public void setAllowProtectedAccess(boolean value) + { + allowProtectedAccess = value; + } + + public boolean getAllowPackageProtectedAccess() + { + return 
allowPackageProtectedAccess; + } + + public void setAllowPackageProtectedAccess(boolean value) + { + allowPackageProtectedAccess = value; + } + + /*=================================================================== + MemberAccess interface + ===================================================================*/ + public Object setup(Map context, Object target, Member member, String propertyName) + { + Object result = null; + + if (isAccessible(context, target, member, propertyName)) { + AccessibleObject accessible = (AccessibleObject) member; + + if (!accessible.isAccessible()) { + result = Boolean.FALSE; + accessible.setAccessible(true); + } + } + return result; + } + + public void restore(Map context, Object target, Member member, String propertyName, Object state) + { + if (state != null) { + ((AccessibleObject) member).setAccessible(((Boolean) state).booleanValue()); + } + } + + /** + * Returns true if the given member is accessible or can be made accessible + * by this object. + * + * @param context the current execution context (not used). + * @param target the Object to test accessibility for (not used). + * @param member the Member to test accessibility for. + * @param propertyName the property to test accessibility for (not used). + * @return true if the member is accessible in the context, false otherwise. 
+ */ + public boolean isAccessible(Map context, Object target, Member member, String propertyName) + { + int modifiers = member.getModifiers(); + boolean result = Modifier.isPublic(modifiers); + + if (!result) { + if (Modifier.isPrivate(modifiers)) { + result = getAllowPrivateAccess(); + } + else { + if (Modifier.isProtected(modifiers)) { + result = getAllowProtectedAccess(); + } + else { + result = getAllowPackageProtectedAccess(); + } + } + } + return result; + } +} diff --git a/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/ESClient.java b/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/ESClient.java new file mode 100644 index 000000000..f2a4dd6c6 --- /dev/null +++ b/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/ESClient.java @@ -0,0 +1,210 @@ +package com.wgzhao.datax.plugin.reader.elasticsearchreader; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.searchbox.action.Action; +import io.searchbox.client.JestClient; +import io.searchbox.client.JestClientFactory; +import io.searchbox.client.JestResult; +import io.searchbox.client.config.HttpClientConfig; +import io.searchbox.core.ClearScroll; +import io.searchbox.core.Search; +import io.searchbox.core.SearchResult; +import io.searchbox.core.SearchScroll; +import io.searchbox.indices.IndicesExists; +import io.searchbox.indices.aliases.AddAliasMapping; +import io.searchbox.indices.aliases.AliasMapping; +import io.searchbox.indices.aliases.GetAliases; +import io.searchbox.indices.aliases.ModifyAliases; +import io.searchbox.indices.aliases.RemoveAliasMapping; +import io.searchbox.params.SearchType; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * @author kesc + * @since 2020-04-14 10:32 + */ +public class ESClient +{ + private static final Logger log = LoggerFactory.getLogger(ESClient.class); + + private JestClient jestClient; + + public JestClient getClient() + { + return jestClient; + } + + /** + * Builds the Jest client used by every other method of this class. + * Credentials are applied only when both user and passwd are non-empty. + * + * @param endpoint URL of the Elasticsearch REST endpoint, e.g. http://127.0.0.1:9200 + * @param user HTTP auth user, may be empty + * @param passwd HTTP auth password, may be empty + * @param multiThread passed to HttpClientConfig.Builder#multiThreaded + * @param readTimeout passed to HttpClientConfig.Builder#readTimeout + * @param compression passed to HttpClientConfig.Builder#requestCompressionEnabled + * @param discovery passed to HttpClientConfig.Builder#discoveryEnabled + */ + public void createClient(String endpoint, + String user, + String passwd, + boolean multiThread, + int readTimeout, + boolean compression, + boolean discovery) + { + + JestClientFactory factory = new JestClientFactory(); + HttpClientConfig.Builder httpClientConfig = new HttpClientConfig.Builder(endpoint) + .multiThreaded(multiThread) + .connTimeout(30000) + .readTimeout(readTimeout) + .maxTotalConnection(200) + .requestCompressionEnabled(compression) + .discoveryEnabled(discovery) + .discoveryFrequency(5L, TimeUnit.MINUTES); + + if (!user.isEmpty() && !passwd.isEmpty()) { + httpClientConfig.defaultCredentials(user, passwd); + // HttpHost(String) expects a bare hostname; passing the whole URL yields a host that never matches the request target, + // so split the endpoint into host, port and scheme first + URI endpointUri = URI.create(endpoint); + httpClientConfig.setPreemptiveAuth(new HttpHost(endpointUri.getHost(), endpointUri.getPort(), endpointUri.getScheme())); + } + + factory.setHttpClientConfig(httpClientConfig.build()); + + jestClient = factory.getObject(); + } + + public boolean indicesExists(String indexName) + throws Exception + { + boolean isIndicesExists = false; + JestResult rst = jestClient.execute(new IndicesExists.Builder(indexName).build()); + if (rst.isSucceeded()) { + isIndicesExists = true; + } + else { + switch (rst.getResponseCode()) { + case 404: + isIndicesExists = false; + break; + case 401: + // unauthorized: falls through, is logged and reported as "not exists" + default: + log.warn(rst.getErrorMessage()); + break; + } + } + return isIndicesExists; + } + + public SearchResult search(String query, + SearchType searchType, + String index, + String type, + String scroll, + Map headers) + throws IOException + { + Search.Builder searchBuilder = new Search.Builder(query) + .setSearchType(searchType) + .addIndex(index).addType(type).setHeader(headers); + if (StringUtils.isNotBlank(scroll)) { + 
searchBuilder.setParameter("scroll", scroll); + } + return jestClient.execute(searchBuilder.build()); + } + + public JestResult scroll(String scrollId, String scroll) + throws Exception + { + SearchScroll.Builder builder = new SearchScroll.Builder(scrollId, scroll); + return execute(builder.build()); + } + + public void clearScroll(String scrollId) + { + ClearScroll.Builder builder = new ClearScroll.Builder().addScrollId(scrollId); + try { + execute(builder.build()); + } + catch (Exception e) { + log.error(e.getMessage(), e); + } + } + + public JestResult execute(Action clientRequest) + throws Exception + { + JestResult rst; + rst = jestClient.execute(clientRequest); + if (!rst.isSucceeded()) { + log.warn(rst.getErrorMessage()); + } + return rst; + } + + public Integer getStatus(JestResult rst) + { + JsonObject jsonObject = rst.getJsonObject(); + if (jsonObject.has("status")) { + return jsonObject.get("status").getAsInt(); + } + return 600; + } + + public boolean isBulkResult(JestResult rst) + { + JsonObject jsonObject = rst.getJsonObject(); + return jsonObject.has("items"); + } + + public boolean alias(String indexname, String aliasname, boolean needClean) + throws IOException + { + GetAliases getAliases = new GetAliases.Builder().addIndex(aliasname).build(); + AliasMapping addAliasMapping = new AddAliasMapping.Builder(indexname, aliasname).build(); + JestResult rst = jestClient.execute(getAliases); + log.info(rst.getJsonString()); + List list = new ArrayList<>(); + if (rst.isSucceeded()) { + JsonParser jp = new JsonParser(); + JsonObject jo = (JsonObject) jp.parse(rst.getJsonString()); + for (Map.Entry entry : jo.entrySet()) { + String tindex = entry.getKey(); + if (indexname.equals(tindex)) { + continue; + } + AliasMapping m = new RemoveAliasMapping.Builder(tindex, aliasname).build(); + String s = new Gson().toJson(m.getData()); + log.info(s); + if (needClean) { + list.add(m); + } + } + } + + ModifyAliases modifyAliases = new 
ModifyAliases.Builder(addAliasMapping).addAlias(list).setParameter("master_timeout", "5m").build(); + rst = jestClient.execute(modifyAliases); + if (!rst.isSucceeded()) { + log.error(rst.getErrorMessage()); + return false; + } + return true; + } + + /** + * 关闭JestClient客户端 + */ + public void closeJestClient() + { + if (jestClient != null) { + try { + jestClient.close(); + } + catch (IOException e) { + log.warn("Failed to close es client:", e); + } + } + } +} diff --git a/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/ESReaderErrorCode.java b/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/ESReaderErrorCode.java new file mode 100644 index 000000000..0c9dd82fa --- /dev/null +++ b/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/ESReaderErrorCode.java @@ -0,0 +1,42 @@ +package com.wgzhao.datax.plugin.reader.elasticsearchreader; + +import com.wgzhao.datax.common.spi.ErrorCode; + +public enum ESReaderErrorCode + implements ErrorCode +{ + BAD_CONFIG_VALUE("ESReader-00", "您配置的值不合法."), + ES_SEARCH_ERROR("ESReader-01", "search出错."), + ES_INDEX_NOT_EXISTS("ESReader-02", "index不存在."), + UNKNOWN_DATA_TYPE("ESReader-03", "无法识别的数据类型."), + COLUMN_CANT_BE_EMPTY("ESReader-04", "column不能为空."), + ; + + private final String code; + private final String description; + + ESReaderErrorCode(String code, String description) + { + this.code = code; + this.description = description; + } + + @Override + public String getCode() + { + return this.code; + } + + @Override + public String getDescription() + { + return this.description; + } + + @Override + public String toString() + { + return String.format("Code:[%s], Description:[%s]. 
", this.code, + this.description); + } +} \ No newline at end of file diff --git a/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/EsReader.java b/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/EsReader.java new file mode 100644 index 000000000..d1ea4e065 --- /dev/null +++ b/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/EsReader.java @@ -0,0 +1,372 @@ +package com.wgzhao.datax.plugin.reader.elasticsearchreader; + +import com.alibaba.fastjson.JSON; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.wgzhao.datax.common.element.BoolColumn; +import com.wgzhao.datax.common.element.BytesColumn; +import com.wgzhao.datax.common.element.Column; +import com.wgzhao.datax.common.element.DateColumn; +import com.wgzhao.datax.common.element.DoubleColumn; +import com.wgzhao.datax.common.element.LongColumn; +import com.wgzhao.datax.common.element.Record; +import com.wgzhao.datax.common.element.StringColumn; +import com.wgzhao.datax.common.exception.DataXException; +import com.wgzhao.datax.common.plugin.RecordSender; +import com.wgzhao.datax.common.spi.Reader; +import com.wgzhao.datax.common.statistics.PerfRecord; +import com.wgzhao.datax.common.statistics.PerfTrace; +import com.wgzhao.datax.common.util.Configuration; +import com.wgzhao.datax.plugin.reader.elasticsearchreader.gson.MapTypeAdapter; +import io.searchbox.client.JestResult; +import io.searchbox.core.SearchResult; +import io.searchbox.params.SearchType; +import ognl.Ognl; +import ognl.OgnlContext; +import ognl.OgnlException; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import 
java.util.Map; + +/** + * @author kesc mail:492167585@qq.com + * @since 2020-04-14 10:32 + */ + +public class EsReader + extends Reader +{ + + public static class Job + extends Reader.Job + { + private static final Logger log = LoggerFactory.getLogger(Job.class); + private Configuration conf = null; + + /** + * Verifies that the configured index exists, using a short-lived client that is always closed afterwards. + * + * @throws DataXException with ES_INDEX_NOT_EXISTS when the index is missing or the check itself fails + */ + @Override + public void prepare() + { + /* + * Note: this method is executed only once. + * Best practice: work that a Job must do before data synchronisation starts can be done here; if there is none, the method can simply be removed. + */ + ESClient esClient = new ESClient(); + esClient.createClient(Key.getEndpoint(conf), + Key.getAccessID(conf), + Key.getAccessKey(conf), + false, + 300000, + false, + false); + + String indexName = Key.getIndexName(conf); + String typeName = Key.getTypeName(conf); + log.info("index:[{}], type:[{}]", indexName, typeName); + try { + boolean isIndicesExists = esClient.indicesExists(indexName); + if (!isIndicesExists) { + throw new IOException(String.format("index[%s] not exist", indexName)); + } + } + catch (Exception ex) { + throw DataXException.asDataXException(ESReaderErrorCode.ES_INDEX_NOT_EXISTS, ex.toString()); + } + finally { + // release the probe client on the failure path too, not only on success + esClient.closeJestClient(); + } + } + + @Override + public void init() + { + this.conf = getPluginJobConf(); + } + + /** + * Creates one task configuration per entry of the "search" list; adviceNumber is not used. + * + * @throws DataXException with BAD_CONFIG_VALUE when "search" is missing or empty + */ + @Override + public List split(int adviceNumber) + { + List configurations = new ArrayList<>(); + List search = conf.getList(Key.SEARCH_KEY, Object.class); + if (search == null || search.isEmpty()) { + // without this guard a missing list fails with a NullPointerException and an empty one silently yields no tasks + throw DataXException.asDataXException(ESReaderErrorCode.BAD_CONFIG_VALUE, "search必须配置且不能为空"); + } + for (Object query : search) { + Configuration clone = conf.clone(); + clone.set(Key.SEARCH_KEY, query); + configurations.add(clone); + } + return configurations; + } + + @Override + public void post() + { + // + } + + @Override + public void destroy() + { + log.info("============elasticsearch reader job destroy================="); + } + } + + public static class Task + extends Reader.Task + { + private static final Logger log = LoggerFactory.getLogger(Task.class); + private final OgnlContext ognlContext = new OgnlContext(null, null, new DefaultMemberAccess(true)); + ESClient esClient = null; + Gson gson = null; + private Configuration conf; + private String index; + 
private String type; + private SearchType searchType; + private Map headers; + private String query; + private String scroll; + private List column; + private String filter; + + @Override + public void prepare() + { + esClient.createClient(Key.getEndpoint(conf), + Key.getAccessID(conf), + Key.getAccessKey(conf), + Key.isMultiThread(conf), + Key.getTimeout(conf), + Key.isCompression(conf), + Key.isDiscovery(conf)); + } + + @Override + public void init() + { + this.conf = getPluginJobConf(); + this.esClient = new ESClient(); + this.gson = new GsonBuilder().registerTypeAdapterFactory(MapTypeAdapter.FACTORY).create(); + this.index = Key.getIndexName(conf); + this.type = Key.getTypeName(conf); + this.searchType = Key.getSearchType(conf); + this.headers = Key.getHeaders(conf); + this.query = Key.getQuery(conf); + this.scroll = Key.getScroll(conf); + this.filter = Key.getFilter(conf); + this.column = Key.getColumn(conf); + if (column == null || column.isEmpty()) { + throw DataXException.asDataXException(ESReaderErrorCode.COLUMN_CANT_BE_EMPTY, "column必须配置"); + } + if (column.size() == 1 && "*".equals(column.get(0))) { + // TODO get column from record + throw DataXException.asDataXException(ESReaderErrorCode.COLUMN_CANT_BE_EMPTY, "column暂不支持*配置"); + } + } + + @Override + public void startRead(RecordSender recordSender) + { + PerfTrace.getInstance().addTaskDetails(getTaskId(), index); + //search + PerfRecord queryPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.SQL_QUERY); + queryPerfRecord.start(); + SearchResult searchResult; + try { + searchResult = esClient.search(query, searchType, index, type, scroll, headers); + } + catch (Exception e) { + throw DataXException.asDataXException(ESReaderErrorCode.ES_SEARCH_ERROR, e); + } + if (!searchResult.isSucceeded()) { + throw DataXException.asDataXException(ESReaderErrorCode.ES_SEARCH_ERROR, searchResult.getResponseCode() + ":" + searchResult.getErrorMessage()); + } + queryPerfRecord.end(); + //transport 
records + PerfRecord allResultPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.RESULT_NEXT_ALL); + allResultPerfRecord.start(); + this.transportRecords(recordSender, searchResult); + allResultPerfRecord.end(); + //do scroll + JsonElement scrollIdElement = searchResult.getJsonObject().get("_scroll_id"); + if (scrollIdElement == null) { + return; + } + String scrollId = scrollIdElement.getAsString(); + log.debug("scroll id:{}", scrollId); + try { + boolean hasElement = true; + while (hasElement) { + queryPerfRecord.start(); + JestResult currScroll = esClient.scroll(scrollId, this.scroll); + queryPerfRecord.end(); + if (!currScroll.isSucceeded()) { + throw DataXException.asDataXException(ESReaderErrorCode.ES_SEARCH_ERROR, + String.format("scroll[id=%s] search error,code:%s,msg:%s", scrollId, currScroll.getResponseCode(), currScroll.getErrorMessage())); + } + allResultPerfRecord.start(); + hasElement = this.transportRecords(recordSender, parseSearchResult(currScroll)); + allResultPerfRecord.end(); + } + } + catch (DataXException dxe) { + throw dxe; + } + catch (Exception e) { + throw DataXException.asDataXException(ESReaderErrorCode.ES_SEARCH_ERROR, e); + } + finally { + esClient.clearScroll(scrollId); + } + } + + private SearchResult parseSearchResult(JestResult jestResult) + { + if (jestResult == null) { + return null; + } + SearchResult searchResult = new SearchResult(gson); + searchResult.setSucceeded(jestResult.isSucceeded()); + searchResult.setResponseCode(jestResult.getResponseCode()); + searchResult.setPathToResult(jestResult.getPathToResult()); + searchResult.setJsonString(jestResult.getJsonString()); + searchResult.setJsonObject(jestResult.getJsonObject()); + searchResult.setErrorMessage(jestResult.getErrorMessage()); + return searchResult; + } + + private Object getOgnlValue(Object expression, Map root, Object defaultValue) + { + try { + if (!(expression instanceof String)) { + return defaultValue; + } + Object value = 
Ognl.getValue(expression.toString(), ognlContext, root); + if (value == null) { + return defaultValue; + } + return value; + } + catch (OgnlException e) { + return defaultValue; + } + } + + private boolean filter(String filter, String deleteFilterKey, Map record) + { + if (StringUtils.isNotBlank(deleteFilterKey)) { + record.remove(deleteFilterKey); + } + if (StringUtils.isBlank(filter)) { + return true; + } + return (Boolean) getOgnlValue(filter, record, Boolean.TRUE); + } + + private boolean transportRecords(RecordSender recordSender, SearchResult result) + { + if (result == null) { + return false; + } + List sources = result.getSourceAsStringList(); + if (sources == null || sources.isEmpty()) { + return false; + } + for (String source : sources) { + this.transportOneRecord(recordSender, gson.fromJson(source, Map.class)); + } + return true; + } + + private void transportOneRecord(RecordSender recordSender, Map recordMap) + { + boolean allow = filter(this.filter, null, recordMap); + if (allow && recordMap.entrySet().stream().anyMatch(x -> x.getValue() != null)) { + Record record = recordSender.createRecord(); + boolean hasDirty = false; + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry : recordMap.entrySet()) { + try { + Object o = recordMap.get(entry.getKey()); + record.addColumn(getColumn(o)); + } + catch (Exception e) { + hasDirty = true; + sb.append(e); + } + } + if (hasDirty) { + getTaskPluginCollector().collectDirtyRecord(record, sb.toString()); + } + recordSender.sendToWriter(record); + } + } + + private Column getColumn(Object value) + { + Column col; + if (value == null) { + col = new StringColumn(); + } + else if (value instanceof String) { + col = new StringColumn((String) value); + } + else if (value instanceof Integer) { + col = new LongColumn(((Integer) value).longValue()); + } + else if (value instanceof Long) { + col = new LongColumn((Long) value); + } + else if (value instanceof Byte) { + col = new LongColumn(((Byte) 
value).longValue()); + } + else if (value instanceof Short) { + col = new LongColumn(((Short) value).longValue()); + } + else if (value instanceof Double) { + col = new DoubleColumn(BigDecimal.valueOf((Double) value)); + } + else if (value instanceof Float) { + col = new DoubleColumn(BigDecimal.valueOf(((Float) value).doubleValue())); + } + else if (value instanceof Date) { + col = new DateColumn((Date) value); + } + else if (value instanceof Boolean) { + col = new BoolColumn((Boolean) value); + } + else if (value instanceof byte[]) { + col = new BytesColumn((byte[]) value); + } + else if (value instanceof List) { + col = new StringColumn(JSON.toJSONString(value)); + } + else if (value instanceof Map) { + col = new StringColumn(JSON.toJSONString(value)); + } + else if (value.getClass().isArray()) { + // java.lang.reflect.Array is a static helper that is never instantiated, so "instanceof Array" could not match; + // isArray() is what actually detects array values (byte[] is already handled above) + col = new StringColumn(JSON.toJSONString(value)); + } + else { + throw DataXException.asDataXException(ESReaderErrorCode.UNKNOWN_DATA_TYPE, "type:" + value.getClass().getName()); + } + return col; + } + + @Override + public void post() + { + // + } + + @Override + public void destroy() + { + log.debug("============elasticsearch reader taskGroup[{}] taskId[{}] destroy=================", getTaskGroupId(), getTaskId()); + esClient.closeJestClient(); + } + } +} diff --git a/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/Key.java b/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/Key.java new file mode 100644 index 000000000..37037e3fc --- /dev/null +++ b/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/Key.java @@ -0,0 +1,115 @@ +package com.wgzhao.datax.plugin.reader.elasticsearchreader; + +import com.wgzhao.datax.common.util.Configuration; +import io.searchbox.params.SearchType; +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public final class Key 
+{ + // ---------------------------------------- + // 类型定义 主键字段定义 + // ---------------------------------------- + + public static final String SEARCH_KEY = "search"; + + private Key() {} + + public static SearchType getSearchType(Configuration conf) + { + String searchType = conf.getString("searchType", SearchType.DFS_QUERY_THEN_FETCH.toString()); + return SearchType.valueOf(searchType.toUpperCase()); + } + + public static String getEndpoint(Configuration conf) + { + return conf.getNecessaryValue("endpoint", ESReaderErrorCode.BAD_CONFIG_VALUE); + } + + public static String getAccessID(Configuration conf) + { + return conf.getString("accessId", ""); + } + + public static String getAccessKey(Configuration conf) + { + return conf.getString("accessKey", ""); + } + + public static int getBatchSize(Configuration conf) + { + return conf.getInt("batchSize", 1000); + } + + public static int getTrySize(Configuration conf) + { + return conf.getInt("trySize", 30); + } + + public static int getTimeout(Configuration conf) + { + return conf.getInt("timeout", 60) * 1000; + } + + public static boolean isCleanup(Configuration conf) + { + return conf.getBool("cleanup", false); + } + + public static boolean isDiscovery(Configuration conf) + { + return conf.getBool("discovery", false); + } + + public static boolean isCompression(Configuration conf) + { + return conf.getBool("compression", true); + } + + public static boolean isMultiThread(Configuration conf) + { + return conf.getBool("multiThread", true); + } + + public static String getIndexName(Configuration conf) + { + return conf.getNecessaryValue("index", ESReaderErrorCode.BAD_CONFIG_VALUE); + } + + public static String getTypeName(Configuration conf) + { + String indexType = conf.getString("indexType"); + if (StringUtils.isBlank(indexType)) { + indexType = conf.getString("type", getIndexName(conf)); + } + return indexType; + } + + public static Map getHeaders(Configuration conf) + { + return conf.getMap("headers", new 
HashMap<>()); + } + + public static String getQuery(Configuration conf) + { + return conf.getConfiguration(Key.SEARCH_KEY).toString(); + } + + public static String getScroll(Configuration conf) + { + return conf.getString("scroll"); + } + + public static List getColumn(Configuration conf) + { + return conf.getList("column", String.class); + } + + public static String getFilter(Configuration conf) + { + return conf.getString("filter", null); + } +} diff --git a/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/gson/MapTypeAdapter.java b/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/gson/MapTypeAdapter.java new file mode 100644 index 000000000..42c578316 --- /dev/null +++ b/plugin/reader/elasticsearchreader/src/main/java/com/wgzhao/datax/plugin/reader/elasticsearchreader/gson/MapTypeAdapter.java @@ -0,0 +1,115 @@ +package com.wgzhao.datax.plugin.reader.elasticsearchreader.gson; + +import com.google.gson.Gson; +import com.google.gson.TypeAdapter; +import com.google.gson.TypeAdapterFactory; +import com.google.gson.internal.LinkedTreeMap; +import com.google.gson.internal.bind.ObjectTypeAdapter; +import com.google.gson.reflect.TypeToken; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonToken; +import com.google.gson.stream.JsonWriter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * @author kesc + * @since 2020-10-13 16:09 + */ +public class MapTypeAdapter + extends TypeAdapter +{ + public static final TypeAdapterFactory FACTORY = new TypeAdapterFactory() + { + @SuppressWarnings("unchecked") + @Override + public TypeAdapter create(Gson gson, TypeToken type) + { + if (type.getRawType() == Map.class) { + return (TypeAdapter) new MapTypeAdapter(gson); + } + return null; + } + }; + + private final Gson gson; + + MapTypeAdapter(Gson gson) + { + this.gson = gson; + } + + @Override 
+ public Object read(JsonReader in) + throws IOException + { + JsonToken token = in.peek(); + switch (token) { + case BEGIN_ARRAY: + List list = new ArrayList<>(); + in.beginArray(); + while (in.hasNext()) { + list.add(read(in)); + } + in.endArray(); + return list; + + case BEGIN_OBJECT: + Map map = new LinkedTreeMap<>(); + in.beginObject(); + while (in.hasNext()) { + map.put(in.nextName(), read(in)); + } + in.endObject(); + return map; + + case STRING: + return in.nextString(); + + case NUMBER: + //改写数字的处理逻辑,将数字值分为整型与浮点型 + String numberStr = in.nextString(); + if (numberStr.contains(".") || numberStr.contains("e") + || numberStr.contains("E")) { + return Double.parseDouble(numberStr); + } + long value = Long.parseLong(numberStr); + if (value <= Integer.MAX_VALUE) { + return (int) value; + } + return value; + + case BOOLEAN: + return in.nextBoolean(); + + case NULL: + in.nextNull(); + return null; + + default: + throw new IllegalStateException(); + } + } + + @Override + public void write(JsonWriter out, Object value) + throws IOException + { + if (value == null) { + out.nullValue(); + return; + } + + TypeAdapter typeAdapter = gson.getAdapter((Class) value.getClass()); + if (typeAdapter instanceof ObjectTypeAdapter) { + out.beginObject(); + out.endObject(); + return; + } + + typeAdapter.write(out, value); + } +} \ No newline at end of file diff --git a/plugin/reader/elasticsearchreader/src/main/resources/plugin.json b/plugin/reader/elasticsearchreader/src/main/resources/plugin.json new file mode 100644 index 000000000..73d4274eb --- /dev/null +++ b/plugin/reader/elasticsearchreader/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "elasticsearchreader", + "class": "com.wgzhao.datax.plugin.reader.elasticsearchreader.EsReader", + "description": "Retrieve data from ElasticSearch", + "developer": "kesc" +} \ No newline at end of file diff --git a/plugin/reader/elasticsearchreader/src/main/resources/plugin_job_template.json 
b/plugin/reader/elasticsearchreader/src/main/resources/plugin_job_template.json new file mode 100644 index 000000000..977aa4e0e --- /dev/null +++ b/plugin/reader/elasticsearchreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,17 @@ +{ + "name": "elasticsearchreader", + "parameter": { + "endpoint": "", + "index": "", + "type": "", + "searchType": "dfs_query_then_fetch", + "headers": {}, + "scroll": "3m", + "search": [ + { + "size": 100 + } + ], + "column": [] + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index f31551766..3d4ce934b 100644 --- a/pom.xml +++ b/pom.xml @@ -92,6 +92,7 @@ plugin/reader/cassandrareader plugin/reader/clickhousereader plugin/reader/dbffilereader + plugin/reader/elasticsearchreader plugin/reader/ftpreader plugin/reader/hbase11xreader plugin/reader/hbase11xsqlreader