Skip to content

Commit

Permalink
Merge branch 'master' into fix/578-accessed-date
Browse files Browse the repository at this point in the history
# Conflicts:
#	docs/source/admin/fs/elasticsearch.rst
  • Loading branch information
dadoonet committed Jul 30, 2018
2 parents aa86456 + 775cf15 commit 447b186
Show file tree
Hide file tree
Showing 18 changed files with 88 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fr.pilato.elasticsearch.crawler.fs.cli;/*
/*
* Licensed to David Pilato (the "Author") under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -17,6 +17,8 @@
* under the License.
*/

package fr.pilato.elasticsearch.crawler.fs.cli;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import fr.pilato.elasticsearch.crawler.fs.FsCrawlerImpl;
Expand Down
2 changes: 1 addition & 1 deletion cli/src/test/resources/legacy/2_0/david.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@
"bulk_size" : 100,
"flush_interval" : "5s"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ public class FsCrawlerImpl {

@Deprecated
public static final String INDEX_TYPE_FOLDER = "folder";
@Deprecated
public static final String INDEX_TYPE_DOC = "doc";

private static final Logger logger = LogManager.getLogger(FsCrawlerImpl.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ public abstract class FsParser implements Runnable {
private final Integer loop;
private final MessageDigest messageDigest;

/**
* This is a temporary value we need to support both v5 and newer versions.
* V5 does not allow a type named _doc but V6 recommends using it.
*/
private final String typeName;

private ScanStatistic stats;
private final AtomicInteger runNumber = new AtomicInteger(0);
private static final Object semaphore = new Object();
Expand All @@ -105,6 +111,8 @@ public FsParser(FsSettings fsSettings, Path config, ElasticsearchClientManager e
} else {
messageDigest = null;
}

typeName = esClientManager.client().getDefaultTypeName();
}

protected abstract FileAbstractor buildFileAbstractor();
Expand Down Expand Up @@ -562,25 +570,25 @@ private void removeEsDirectoryRecursively(final String path) throws Exception {
* Add to bulk an IndexRequest in JSON format
*/
void esIndex(BulkProcessor bulkProcessor, String index, String id, String json, String pipeline) {
logger.debug("Indexing {}/doc/{}?pipeline={}", index, id, pipeline);
logger.debug("Indexing {}/{}/{}?pipeline={}", index, typeName, id, pipeline);
logger.trace("JSon indexed : {}", json);

if (!closed) {
bulkProcessor.add(new IndexRequest(index, "doc", id).source(json, XContentType.JSON).setPipeline(pipeline));
bulkProcessor.add(new IndexRequest(index, typeName, id).source(json, XContentType.JSON).setPipeline(pipeline));
} else {
logger.warn("trying to add new file while closing crawler. Document [{}]/[doc]/[{}] has been ignored", index, id);
logger.warn("trying to add new file while closing crawler. Document [{}]/[{}]/[{}] has been ignored", index, typeName, id);
}
}

/**
* Add to bulk a DeleteRequest
*/
void esDelete(String index, String id) {
logger.debug("Deleting {}/doc/{}", index, id);
logger.debug("Deleting {}/{}/{}", index, typeName, id);
if (!closed) {
esClientManager.bulkProcessorDoc().add(new DeleteRequest(index, "doc", id));
esClientManager.bulkProcessorDoc().add(new DeleteRequest(index, typeName, id));
} else {
logger.warn("trying to remove a file while closing crawler. Document [{}]/[doc]/[{}] has been ignored", index, id);
logger.warn("trying to remove a file while closing crawler. Document [{}]/[{}]/[{}] has been ignored", index, typeName, id);
}
}

Expand Down
7 changes: 6 additions & 1 deletion docs/source/admin/fs/elasticsearch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ Or fall back to the command line:
- ``6/_settings.json``: for elasticsearch 6.x series document index settings
- ``6/_settings_folder.json``: for elasticsearch 6.x series folder index settings

.. note::

For elasticsearch versions before the 6.x series, the type of the document is ``doc``.
From the 6.x series onwards, the type of the document is ``_doc``.

Creating your own mapping (analyzers)
"""""""""""""""""""""""""""""""""""""

Expand Down Expand Up @@ -138,7 +143,7 @@ The following example uses a ``french`` analyzer to index the
}
},
"mappings": {
"doc": {
"_doc": {
"dynamic_templates": [
{
"raw_as_text": {
Expand Down
2 changes: 1 addition & 1 deletion docs/source/admin/fs/local-fs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ JSon document. This field is not indexed. Default mapping for
.. code:: json
{
"doc" : {
"_doc" : {
"properties" : {
"attachment" : {
"type" : "binary",
Expand Down
2 changes: 1 addition & 1 deletion docs/source/admin/fs/rest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ You will get back your document as it has been stored by elasticsearch:
{
"_index" : "fscrawler-rest-tests_doc",
"_type" : "doc",
"_type" : "_doc",
"_id" : "dd18bf3a8ea2a3e53e2661c7fb53534",
"_version" : 1,
"found" : true,
Expand Down
7 changes: 5 additions & 2 deletions docs/source/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ to upgrade elasticsearch.
This procedure only applies if you did not set previously
``elasticsearch.type`` setting (default value was ``doc``). If you did,
then you also need to reindex the existing documents to the default
``doc`` type as per elasticsearch 6.0:
``_doc`` type as per elasticsearch 6.x (or ``doc`` for 5.x series):

::

Expand All @@ -161,7 +161,7 @@ then you also need to reindex the existing documents to the default
},
"dest": {
"index": "job_name",
"type": "doc"
"type": "_doc"
}
}
# Remove old type data from job_name index
Expand Down Expand Up @@ -252,3 +252,6 @@ Then restore old data:
The default mapping changed for FSCrawler for ``meta.raw.*`` fields.
Might be better to reindex your data.

- For new indices, FSCrawler now uses ``_doc`` as the default type name for clusters
running elasticsearch 6.x or later.

Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,27 @@ public class ElasticsearchClient extends RestHighLevelClient {
private static final Logger logger = LogManager.getLogger(ElasticsearchClient.class);

private boolean INGEST_SUPPORT = true;
/**
* Type name for Elasticsearch versions < 6.0
* @deprecated Will be removed with Elasticsearch V8
*/
@Deprecated
private static final String INDEX_TYPE_DOC_V5 = "doc";
/**
* Type name for Elasticsearch versions >= 6.0
* @deprecated Will be removed with Elasticsearch V8
*/
@Deprecated
private static final String INDEX_TYPE_DOC = "_doc";
/**
* Type name to use. It depends on elasticsearch version.
* @deprecated Will be removed with Elasticsearch V8
*/
@Deprecated
private String defaultTypeName = INDEX_TYPE_DOC;
private Version VERSION = null;

public ElasticsearchClient(RestClientBuilder client) throws IOException {
public ElasticsearchClient(RestClientBuilder client) {
super(client);
}

Expand Down Expand Up @@ -280,6 +298,14 @@ public void setElasticsearchBehavior() throws IOException {
INGEST_SUPPORT = false;
logger.debug("Using elasticsearch < 5, so we can't use ingest node feature");
}

// With elasticsearch 6.x, we can use _doc as the default type name
if (VERSION.onOrAfter(Version.V_6_0_0)) {
logger.debug("Using elasticsearch >= 6, so we can use {} as the default type name", defaultTypeName);
} else {
defaultTypeName = INDEX_TYPE_DOC_V5;
logger.debug("Using elasticsearch < 6, so we use {} as the default type name", defaultTypeName);
}
}
}

Expand All @@ -291,6 +317,10 @@ public Version getVersion() {
return VERSION;
}

/**
 * Returns the type name this client currently uses for documents.
 * The value starts as {@code _doc} and is switched to {@code doc} by
 * {@code setElasticsearchBehavior()} when the cluster version is older than 6.0.
 *
 * @return the default type name for the connected elasticsearch version
 */
public String getDefaultTypeName() {
    return this.defaultTypeName;
}

public static Node decodeCloudId(String cloudId) {
// 1. Ignore anything before `:`.
String id = cloudId.substring(cloudId.indexOf(':')+1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public abstract class AbstractITCase extends AbstractFSCrawlerTestCase {

private static ElasticsearchContainer container;
private static RestClient esRestClient;
static String typeName;

@BeforeClass
public static void createFsCrawlerJobDir() throws IOException {
Expand Down Expand Up @@ -279,6 +280,8 @@ public static void startElasticsearchRestClient() throws IOException {

// We set what will be elasticsearch behavior as it depends on the cluster version
elasticsearchClient.setElasticsearchBehavior();

typeName = elasticsearchClient.getDefaultTypeName();
}

@BeforeClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void test_filename_as_id() throws Exception {

assertThat("Document should exists with [roottxtfile.txt] id...", awaitBusy(() -> {
try {
return elasticsearchClient.exists(new GetRequest(getCrawlerName(), "doc", "roottxtfile.txt"));
return elasticsearchClient.exists(new GetRequest(getCrawlerName(), typeName, "roottxtfile.txt"));
} catch (IOException e) {
return false;
}
Expand All @@ -70,14 +70,14 @@ public void test_remove_deleted_with_filename_as_id() throws Exception {

assertThat("Document should exists with [id1.txt] id...", awaitBusy(() -> {
try {
return elasticsearchClient.exists(new GetRequest(getCrawlerName(), "doc", "id1.txt"));
return elasticsearchClient.exists(new GetRequest(getCrawlerName(), typeName, "id1.txt"));
} catch (IOException e) {
return false;
}
}), equalTo(true));
assertThat("Document should exists with [id2.txt] id...", awaitBusy(() -> {
try {
return elasticsearchClient.exists(new GetRequest(getCrawlerName(), "doc", "id2.txt"));
return elasticsearchClient.exists(new GetRequest(getCrawlerName(), typeName, "id2.txt"));
} catch (IOException e) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public void test_mapping() throws Exception {

// This will cause an Elasticsearch Exception as the String is not a Date
// If the mapping is incorrect
elasticsearchClient.index(new IndexRequest(getCrawlerName(), "doc", "1")
elasticsearchClient.index(new IndexRequest(getCrawlerName(), typeName, "1")
.source(json1, XContentType.JSON)
);
elasticsearchClient.index(new IndexRequest(getCrawlerName(), "doc", "2")
elasticsearchClient.index(new IndexRequest(getCrawlerName(), typeName, "2")
.source(json2, XContentType.JSON)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ private void checkDocVersions(SearchResponse response, long maxVersion) {
for (SearchHit hit : response.getHits().getHits()) {
// Read the document. This is needed since 5.0 as search does not return the _version field
try {
GetResponse getHit = elasticsearchClient.get(new GetRequest(hit.getIndex(), "doc", hit.getId()));
GetResponse getHit = elasticsearchClient.get(new GetRequest(hit.getIndex(), typeName, hit.getId()));
assertThat(getHit.getVersion(), lessThanOrEqualTo(maxVersion));
} catch (IOException e) {
fail("We got an IOException: " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static void start(FsSettings settings, ElasticsearchClientManager elastic
final ResourceConfig rc = new ResourceConfig()
.registerInstances(
new ServerStatusApi(elasticsearchClientManager.client(), settings),
new UploadApi(settings, elasticsearchClientManager.bulkProcessorDoc()))
new UploadApi(settings, elasticsearchClientManager))
.register(MultiPartFeature.class)
.register(RestJsonProvider.class)
.register(JacksonFeature.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@

import fr.pilato.elasticsearch.crawler.fs.beans.Doc;
import fr.pilato.elasticsearch.crawler.fs.beans.DocParser;
import fr.pilato.elasticsearch.crawler.fs.client.ElasticsearchClientManager;
import fr.pilato.elasticsearch.crawler.fs.framework.SignTool;
import fr.pilato.elasticsearch.crawler.fs.settings.Elasticsearch;
import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings;
import fr.pilato.elasticsearch.crawler.fs.tika.TikaDocParser;
import org.apache.commons.io.FilenameUtils;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.xcontent.XContentType;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
Expand All @@ -51,14 +51,14 @@
@Path("/_upload")
public class UploadApi extends RestApi {

private final BulkProcessor bulkProcessor;
private final ElasticsearchClientManager elasticsearchClientManager;
private final FsSettings settings;
private final MessageDigest messageDigest;
private static final TimeBasedUUIDGenerator TIME_UUID_GENERATOR = new TimeBasedUUIDGenerator();

UploadApi(FsSettings settings, BulkProcessor bulkProcessor) {
UploadApi(FsSettings settings, ElasticsearchClientManager elasticsearchClientManager) {
this.settings = settings;
this.bulkProcessor = bulkProcessor;
this.elasticsearchClientManager = elasticsearchClientManager;
// Create MessageDigest instance
try {
messageDigest = settings.getFs().getChecksum() == null ?
Expand Down Expand Up @@ -110,9 +110,13 @@ public UploadResponse post(
logger.debug("Simulate mode is on, so we skip sending document [{}] to elasticsearch.", filename);
} else {
logger.debug("Sending document [{}] to elasticsearch.", filename);
bulkProcessor.add(new org.elasticsearch.action.index.IndexRequest(settings.getElasticsearch().getIndex(), "doc", id)
.setPipeline(settings.getElasticsearch().getPipeline())
.source(DocParser.toJson(doc), XContentType.JSON));
elasticsearchClientManager.bulkProcessorDoc().add(
new org.elasticsearch.action.index.IndexRequest(
settings.getElasticsearch().getIndex(),
elasticsearchClientManager.client().getDefaultTypeName(),
id)
.setPipeline(settings.getElasticsearch().getPipeline())
.source(DocParser.toJson(doc), XContentType.JSON));
// Elasticsearch entity coordinates (we use the first node address)
Elasticsearch.Node node = settings.getElasticsearch().getNodes().get(0);
if (node.getCloudId() != null) {
Expand All @@ -121,7 +125,7 @@ public UploadResponse post(
url = buildUrl(
node.getScheme().toLowerCase(), node.getHost(), node.getPort()) + "/" +
settings.getElasticsearch().getIndex() + "/" +
"doc" + "/" +
elasticsearchClientManager.client().getDefaultTypeName() + "/" +
id;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
}
},
"mappings": {
"doc": {
"_doc": {
"dynamic_templates": [
{
"raw_as_text": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
}
},
"mappings": {
"doc": {
"_doc": {
"properties" : {
"real" : {
"type" : "keyword",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ public void fsSettingsForDocVersion6() throws Exception {
" }\n" +
" },\n" +
" \"mappings\": {\n" +
" \"doc\": {\n" +
" \"_doc\": {\n" +
" \"dynamic_templates\": [\n" +
" {\n" +
" \"raw_as_text\": {\n" +
Expand Down Expand Up @@ -830,7 +830,7 @@ public void fsSettingsForFolderVersion6() throws Exception {
" }\n" +
" },\n" +
" \"mappings\": {\n" +
" \"doc\": {\n" +
" \"_doc\": {\n" +
" \"properties\" : {\n" +
" \"real\" : {\n" +
" \"type\" : \"keyword\",\n" +
Expand Down

0 comments on commit 447b186

Please sign in to comment.