Skip to content

Commit

Permalink
Uncommitted mapping updates should not efect existing indices (elasti…
Browse files Browse the repository at this point in the history
…c#21306)

When processing a mapping updates, the master current creates an `IndexService` and uses its mapper service to do the hard work. However, if the master is also a data node and it already has an instance of `IndexService`, we currently reuse the the `MapperService` of that instance. Sadly, since mapping updates are change the in memory objects, this means that a mapping change that can rejected later on during cluster state publishing will leave a side effect on the index in question, bypassing the cluster state safety mechanism.

This commit removes this optimization and replaces the `IndexService` creation with a direct creation of a `MapperService`. 

Also, this fixes an issue multiple from multiple shards for the same field caused unneeded cluster state publishing as the current code always created a new cluster state.

This were discovered while researching elastic#21189
  • Loading branch information
bleskes authored Nov 15, 2016
1 parent ad94bea commit 6d9af2f
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class PutMappingClusterStateUpdateRequest extends IndicesClusterStateUpda

private boolean updateAllTypes = false;

PutMappingClusterStateUpdateRequest() {
public PutMappingClusterStateUpdateRequest() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package org.elasticsearch.cluster.metadata;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateTaskListener;
Expand All @@ -34,7 +34,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -51,10 +50,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Service responsible for submitting mapping changes
*/
Expand Down Expand Up @@ -215,62 +212,58 @@ class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterSt
@Override
public BatchResult<PutMappingClusterStateUpdateRequest> execute(ClusterState currentState,
List<PutMappingClusterStateUpdateRequest> tasks) throws Exception {
Set<Index> indicesToClose = new HashSet<>();
Map<Index, MapperService> indexMapperServices = new HashMap<>();
BatchResult.Builder<PutMappingClusterStateUpdateRequest> builder = BatchResult.builder();
try {
// precreate incoming indices;
for (PutMappingClusterStateUpdateRequest request : tasks) {
try {
for (Index index : request.indices()) {
final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index);
if (indicesService.hasIndex(indexMetaData.getIndex()) == false) {
// if the index does not exists we create it once, add all types to the mapper service and
// close it later once we are done with mapping update
indicesToClose.add(indexMetaData.getIndex());
IndexService indexService = indicesService.createIndex(indexMetaData, Collections.emptyList());
if (indexMapperServices.containsKey(indexMetaData.getIndex()) == false) {
MapperService mapperService = indicesService.createIndexMapperService(indexMetaData);
indexMapperServices.put(index, mapperService);
// add mappings for all types, we need them for cross-type validation
for (ObjectCursor<MappingMetaData> mapping : indexMetaData.getMappings().values()) {
indexService.mapperService().merge(mapping.value.type(), mapping.value.source(),
mapperService.merge(mapping.value.type(), mapping.value.source(),
MapperService.MergeReason.MAPPING_RECOVERY, request.updateAllTypes());
}
}
}
currentState = applyRequest(currentState, request);
currentState = applyRequest(currentState, request, indexMapperServices);
builder.success(request);
} catch (Exception e) {
builder.failure(request, e);
}
}
return builder.build(currentState);
} finally {
for (Index index : indicesToClose) {
indicesService.removeIndex(index, "created for mapping processing");
}
IOUtils.close(indexMapperServices.values());
}
}

private ClusterState applyRequest(ClusterState currentState, PutMappingClusterStateUpdateRequest request) throws IOException {
private ClusterState applyRequest(ClusterState currentState, PutMappingClusterStateUpdateRequest request,
Map<Index, MapperService> indexMapperServices) throws IOException {
String mappingType = request.type();
CompressedXContent mappingUpdateSource = new CompressedXContent(request.source());
final MetaData metaData = currentState.metaData();
final List<Tuple<IndexService, IndexMetaData>> updateList = new ArrayList<>();
final List<IndexMetaData> updateList = new ArrayList<>();
for (Index index : request.indices()) {
IndexService indexService = indicesService.indexServiceSafe(index);
MapperService mapperService = indexMapperServices.get(index);
// IMPORTANT: always get the metadata from the state since it get's batched
// and if we pull it from the indexService we might miss an update etc.
final IndexMetaData indexMetaData = currentState.getMetaData().getIndexSafe(index);

// this is paranoia... just to be sure we use the exact same indexService and metadata tuple on the update that
// this is paranoia... just to be sure we use the exact same metadata tuple on the update that
// we used for the validation, it makes this mechanism little less scary (a little)
updateList.add(new Tuple<>(indexService, indexMetaData));
updateList.add(indexMetaData);
// try and parse it (no need to add it here) so we can bail early in case of parsing exception
DocumentMapper newMapper;
DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type());
DocumentMapper existingMapper = mapperService.documentMapper(request.type());
if (MapperService.DEFAULT_MAPPING.equals(request.type())) {
// _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default
newMapper = indexService.mapperService().parse(request.type(), mappingUpdateSource, false);
newMapper = mapperService.parse(request.type(), mappingUpdateSource, false);
} else {
newMapper = indexService.mapperService().parse(request.type(), mappingUpdateSource, existingMapper == null);
newMapper = mapperService.parse(request.type(), mappingUpdateSource, existingMapper == null);
if (existingMapper != null) {
// first, simulate: just call merge and ignore the result
existingMapper.merge(newMapper.mapping(), request.updateAllTypes());
Expand All @@ -286,9 +279,9 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
for (ObjectCursor<MappingMetaData> mapping : indexMetaData.getMappings().values()) {
String parentType = newMapper.parentFieldMapper().type();
if (parentType.equals(mapping.value.type()) &&
indexService.mapperService().getParentTypes().contains(parentType) == false) {
mapperService.getParentTypes().contains(parentType) == false) {
throw new IllegalArgumentException("can't add a _parent field that points to an " +
"already existing type, that isn't already a parent");
"already existing type, that isn't already a parent");
}
}
}
Expand All @@ -306,24 +299,25 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
throw new InvalidTypeNameException("Document mapping type name can't start with '_', found: [" + mappingType + "]");
}
MetaData.Builder builder = MetaData.builder(metaData);
for (Tuple<IndexService, IndexMetaData> toUpdate : updateList) {
boolean updated = false;
for (IndexMetaData indexMetaData : updateList) {
// do the actual merge here on the master, and update the mapping source
// we use the exact same indexService and metadata we used to validate above here to actually apply the update
final IndexService indexService = toUpdate.v1();
final IndexMetaData indexMetaData = toUpdate.v2();
final Index index = indexMetaData.getIndex();
final MapperService mapperService = indexMapperServices.get(index);
CompressedXContent existingSource = null;
DocumentMapper existingMapper = indexService.mapperService().documentMapper(mappingType);
DocumentMapper existingMapper = mapperService.documentMapper(mappingType);
if (existingMapper != null) {
existingSource = existingMapper.mappingSource();
}
DocumentMapper mergedMapper = indexService.mapperService().merge(mappingType, mappingUpdateSource, MapperService.MergeReason.MAPPING_UPDATE, request.updateAllTypes());
DocumentMapper mergedMapper = mapperService.merge(mappingType, mappingUpdateSource, MapperService.MergeReason.MAPPING_UPDATE, request.updateAllTypes());
CompressedXContent updatedSource = mergedMapper.mappingSource();

if (existingSource != null) {
if (existingSource.equals(updatedSource)) {
// same source, no changes, ignore it
} else {
updated = true;
// use the merged mapping source
if (logger.isDebugEnabled()) {
logger.debug("{} update_mapping [{}] with source [{}]", index, mergedMapper.type(), updatedSource);
Expand All @@ -333,6 +327,7 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt

}
} else {
updated = true;
if (logger.isDebugEnabled()) {
logger.debug("{} create_mapping [{}] with source [{}]", index, mappingType, updatedSource);
} else if (logger.isInfoEnabled()) {
Expand All @@ -343,13 +338,16 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(indexMetaData);
// Mapping updates on a single type may have side-effects on other types so we need to
// update mapping metadata on all types
for (DocumentMapper mapper : indexService.mapperService().docMappers(true)) {
for (DocumentMapper mapper : mapperService.docMappers(true)) {
indexMetaDataBuilder.putMapping(new MappingMetaData(mapper.mappingSource()));
}
builder.put(indexMetaDataBuilder);
}

return ClusterState.builder(currentState).metaData(builder).build();
if (updated) {
return ClusterState.builder(currentState).metaData(builder).build();
} else {
return currentState;
}
}

@Override
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.index.cache.query.IndexQueryCache;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
Expand Down Expand Up @@ -359,6 +360,16 @@ public IndexService newIndexService(NodeEnvironment environment, IndexService.Sh
searchOperationListeners, indexOperationListeners);
}

/**
* creates a new mapper service to do administrative work like mapping updates. This *should not* be used for document parsing.
* doing so will result in an exception.
*/
public MapperService newIndexMapperService(MapperRegistry mapperRegistry) throws IOException {
return new MapperService(indexSettings, analysisRegistry.build(indexSettings),
new SimilarityService(indexSettings, similarities), mapperRegistry,
() -> { throw new UnsupportedOperationException("no index query shard context available"); });
}

/**
* Forces a certain query cache to use instead of the default one. If this is set
* and query caching is not disabled with {@code index.queries.cache.enabled}, then
Expand Down
10 changes: 4 additions & 6 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
public class IndexService extends AbstractIndexComponent implements IndicesClusterStateService.AllocatedIndex<IndexShard> {

private final IndexEventListener eventListener;
private final IndexAnalyzers indexAnalyzers;
private final IndexFieldDataService indexFieldData;
private final BitsetFilterCache bitsetFilterCache;
private final NodeEnvironment nodeEnv;
Expand Down Expand Up @@ -142,12 +141,11 @@ public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
List<IndexingOperationListener> indexingOperationListeners) throws IOException {
super(indexSettings);
this.indexSettings = indexSettings;
this.indexAnalyzers = registry.build(indexSettings);
this.similarityService = similarityService;
this.mapperService = new MapperService(indexSettings, indexAnalyzers, similarityService, mapperRegistry,
this.mapperService = new MapperService(indexSettings, registry.build(indexSettings), similarityService, mapperRegistry,
// we parse all percolator queries as they would be parsed on shard 0
() -> newQueryShardContext(0, null, () -> {
throw new IllegalArgumentException("Percolator queries are not allowed to use the curent timestamp");
throw new IllegalArgumentException("Percolator queries are not allowed to use the current timestamp");
}));
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService);
this.shardStoreDeleter = shardStoreDeleter;
Expand Down Expand Up @@ -225,7 +223,7 @@ public IndexFieldDataService fieldData() {
}

public IndexAnalyzers getIndexAnalyzers() {
return this.indexAnalyzers;
return this.mapperService.getIndexAnalyzers();
}

public MapperService mapperService() {
Expand All @@ -249,7 +247,7 @@ public synchronized void close(final String reason, boolean delete) throws IOExc
}
}
} finally {
IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, indexAnalyzers, refreshTask, fsyncTask);
IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, mapperService, refreshTask, fsyncTask);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.indices.TypeMissingException;
import org.elasticsearch.indices.mapper.MapperRegistry;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -62,7 +63,7 @@
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;

public class MapperService extends AbstractIndexComponent {
public class MapperService extends AbstractIndexComponent implements Closeable {

/**
* The reason why a mapping is being merged.
Expand Down Expand Up @@ -624,6 +625,11 @@ public Set<String> getParentTypes() {
return parentTypes;
}

@Override
public void close() throws IOException {
indexAnalyzers.close();
}

/**
* @return Whether a field is a metadata field.
*/
Expand Down
16 changes: 15 additions & 1 deletion core/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.indices;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
Expand Down Expand Up @@ -430,6 +429,21 @@ private synchronized IndexService createIndexService(final String reason, IndexM
indicesQueriesRegistry, clusterService, client, indicesQueryCache, mapperRegistry, indicesFieldDataCache);
}

/**
* creates a new mapper service for the given index, in order to do administrative work like mapping updates.
* This *should not* be used for document parsing. Doing so will result in an exception.
*
* Note: the returned {@link MapperService} should be closed when unneeded.
*/
public synchronized MapperService createIndexMapperService(IndexMetaData indexMetaData) throws IOException {
final Index index = indexMetaData.getIndex();
final Predicate<String> indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
pluginsService.onIndexModule(indexModule);
return indexModule.newIndexMapperService(mapperRegistry);
}

/**
* This method verifies that the given {@code metaData} holds sane values to create an {@link IndexService}.
* This method tries to update the meta data of the created {@link IndexService} if the given {@code metaDataUpdate} is different from the given {@code metaData}.
Expand Down
Loading

0 comments on commit 6d9af2f

Please sign in to comment.