Skip to content

Commit

Permalink
Move tribe to a module (elastic#25778)
Browse files Browse the repository at this point in the history
This commit moves tribe to a module, stripping core from the tribe functionality.
  • Loading branch information
ywelsch authored Jul 28, 2017
1 parent 5cf56a8 commit 1a01514
Show file tree
Hide file tree
Showing 26 changed files with 452 additions and 267 deletions.
2 changes: 1 addition & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]snapshots[/\\]SnapshotShardsService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]snapshots[/\\]SnapshotsService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]threadpool[/\\]ThreadPool.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]tribe[/\\]TribeService.java" checks="LineLength" />
<suppress files="modules[/\\]tribe[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]tribe[/\\]TribeService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]apache[/\\]lucene[/\\]queries[/\\]BlendedTermQueryTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]VersionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]RejectionActionIT.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class ClusterModule extends AbstractModule {
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final AllocationDeciders allocationDeciders;
private final AllocationService allocationService;
private final Runnable onStarted;
// pkg private for tests
final Collection<AllocationDecider> deciderList;
final ShardsAllocator shardsAllocator;
Expand All @@ -106,6 +107,7 @@ public ClusterModule(Settings settings, ClusterService clusterService, List<Clus
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
this.allocationService = new AllocationService(settings, allocationDeciders, shardsAllocator, clusterInfoService);
this.onStarted = () -> clusterPlugins.forEach(plugin -> plugin.onNodeStarted());
}


Expand Down Expand Up @@ -241,4 +243,8 @@ protected void configure() {
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}

public Runnable onStarted() {
return onStarted;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster;

import org.elasticsearch.cluster.metadata.MetaData;

/**
* Interface to allow merging {@link org.elasticsearch.cluster.metadata.MetaData.Custom}.
* When multiple Mergable Custom metadata of the same type are found (from underlying clusters), the
* Custom metadata can be merged using {@link #merge(MetaData.Custom)}.
*
* @param <T> type of custom meta data
*/
public interface MergableCustomMetaData<T extends MetaData.Custom> {

/**
* Merges this custom metadata with other, returning either this or <code>other</code> custom metadata.
* This method should not mutate either <code>this</code> or the <code>other</code> custom metadata.
*
* @param other custom meta data
* @return the same instance or <code>other</code> custom metadata based on implementation
* if both the instances are considered equal, implementations should return this
* instance to avoid redundant cluster state changes.
*/
T merge(T other);
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.fs.FsService;
import org.elasticsearch.monitor.jvm.JvmGcMonitorService;
import org.elasticsearch.monitor.jvm.JvmService;
Expand All @@ -91,7 +90,6 @@
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.tribe.TribeService;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.util.Arrays;
Expand Down Expand Up @@ -369,13 +367,6 @@ public void apply(Settings value, Settings current, Settings previous) {
ThreadContext.DEFAULT_HEADERS_SETTING,
ESLoggerFactory.LOG_DEFAULT_LEVEL_SETTING,
ESLoggerFactory.LOG_LEVEL_SETTING,
TribeService.BLOCKS_METADATA_SETTING,
TribeService.BLOCKS_WRITE_SETTING,
TribeService.BLOCKS_WRITE_INDICES_SETTING,
TribeService.BLOCKS_READ_INDICES_SETTING,
TribeService.BLOCKS_METADATA_INDICES_SETTING,
TribeService.ON_CONFLICT_SETTING,
TribeService.TRIBE_NAME_SETTING,
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING,
OsService.REFRESH_INTERVAL_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tribe.TribeService;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -49,8 +48,6 @@ public class SettingsModule implements Module {
private final Set<String> settingsFilterPattern = new HashSet<>();
private final Map<String, Setting<?>> nodeSettings = new HashMap<>();
private final Map<String, Setting<?>> indexSettings = new HashMap<>();
private static final Predicate<String> TRIBE_CLIENT_NODE_SETTINGS_PREDICATE = (s) -> s.startsWith("tribe.")
&& TribeService.TRIBE_SETTING_KEYS.contains(s) == false;
private final Logger logger;
private final IndexScopedSettings indexScopedSettings;
private final ClusterSettings clusterSettings;
Expand Down Expand Up @@ -135,9 +132,7 @@ public SettingsModule(Settings settings, List<Setting<?>> additionalSettings, Li
}
}
// by now we are fully configured, lets check node level settings for unregistered index settings
final Predicate<String> acceptOnlyClusterSettings = TRIBE_CLIENT_NODE_SETTINGS_PREDICATE.negate();
clusterSettings.validate(settings.filter(acceptOnlyClusterSettings));
validateTribeSettings(settings, clusterSettings);
clusterSettings.validate(settings);
this.settingsFilter = new SettingsFilter(settings, settingsFilterPattern);
}

Expand Down Expand Up @@ -195,20 +190,6 @@ private void registerSettingsFilter(String filter) {
settingsFilterPattern.add(filter);
}

private void validateTribeSettings(Settings settings, ClusterSettings clusterSettings) {
Map<String, Settings> groups = settings.filter(TRIBE_CLIENT_NODE_SETTINGS_PREDICATE).getGroups("tribe.", true);
for (Map.Entry<String, Settings> tribeSettings : groups.entrySet()) {
Settings thisTribesSettings = tribeSettings.getValue();
for (Map.Entry<String, String> entry : thisTribesSettings.getAsMap().entrySet()) {
try {
clusterSettings.validate(entry.getKey(), thisTribesSettings);
} catch (IllegalArgumentException ex) {
throw new IllegalArgumentException("tribe." + tribeSettings.getKey() +" validation failed: "+ ex.getMessage(), ex);
}
}
}
}

public Settings getSettings() {
return settings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, hostsProvider, allocationService));
discoveryTypes.put("tribe", () -> new TribeDiscovery(settings, transportService, masterService, clusterApplier));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
for (DiscoveryPlugin plugin : plugins) {
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
Expand Down
31 changes: 4 additions & 27 deletions core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.tribe.TribeService;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.watcher.ResourceWatcherService;

Expand Down Expand Up @@ -256,8 +255,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
Settings tmpSettings = Settings.builder().put(environment.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

tmpSettings = TribeService.processSettings(tmpSettings);

// create the node environment as soon as possible, to recover the node id and enable logging
try {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
Expand Down Expand Up @@ -385,15 +382,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
.flatMap(p -> p.getNamedXContent().stream()),
ClusterModule.getNamedXWriteables().stream())
.flatMap(Function.identity()).collect(toList()));
final TribeService tribeService =
new TribeService(
settings,
environment.configFile(),
clusterService,
nodeId,
namedWriteableRegistry,
(s, p) -> newTribeClientNode(s, classpathPlugins, p));
resourcesToClose.add(tribeService);
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
Expand All @@ -404,7 +392,8 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>

Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
scriptModule.getScriptService(), xContentRegistry).stream())
scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,
namedWriteableRegistry).stream())
.collect(Collectors.toList());
final RestController restController = actionModule.getRestController();
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
Expand Down Expand Up @@ -458,7 +447,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
b.bind(Environment.class).toInstance(this.environment);
b.bind(ThreadPool.class).toInstance(threadPool);
b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
b.bind(TribeService.class).toInstance(tribeService);
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(BigArrays.class).toInstance(bigArrays);
Expand Down Expand Up @@ -612,10 +600,6 @@ public Node start() throws NodeValidationException {
Discovery discovery = injector.getInstance(Discovery.class);
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

// start before the cluster service since it adds/removes initial Cluster state blocks
final TribeService tribeService = injector.getInstance(TribeService.class);
tribeService.start();

// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
Expand Down Expand Up @@ -682,10 +666,10 @@ public void onTimeout(TimeValue timeout) {
writePortsFile("transport", transport.boundAddress());
}

// start nodes now, after the http server, because it may take some time
tribeService.startNodes();
logger.info("started");

pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

return this;
}

Expand All @@ -696,7 +680,6 @@ private Node stop() {
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
logger.info("stopping ...");

injector.getInstance(TribeService.class).stop();
injector.getInstance(ResourceWatcherService.class).stop();
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServerTransport.class).stop();
Expand Down Expand Up @@ -744,7 +727,6 @@ public synchronized void close() throws IOException {
List<Closeable> toClose = new ArrayList<>();
StopWatch stopWatch = new StopWatch("node_close");
toClose.add(() -> stopWatch.start("tribe"));
toClose.add(injector.getInstance(TribeService.class));
toClose.add(() -> stopWatch.stop().start("node_service"));
toClose.add(nodeService);
toClose.add(() -> stopWatch.stop().start("http"));
Expand Down Expand Up @@ -920,11 +902,6 @@ private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<Disc
return customNameResolvers;
}

/** Constructs an internal node used as a client into a cluster fronted by this tribe node. */
protected Node newTribeClientNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins, Path configPath) {
return new Node(new Environment(settings, configPath), classpathPlugins);
}

/** Constructs a ClusterInfoService which may be mocked for tests. */
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
ThreadPool threadPool, NodeClient client, Consumer<ClusterInfo> listeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,11 @@ default Collection<AllocationDecider> createAllocationDeciders(Settings settings
default Map<String, Supplier<ShardsAllocator>> getShardsAllocators(Settings settings, ClusterSettings clusterSettings) {
return Collections.emptyMap();
}

/**
* Called when the node is started
*/
default void onNodeStarted() {

}
}
9 changes: 8 additions & 1 deletion core/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.repositories.RepositoriesModule;
Expand Down Expand Up @@ -104,10 +106,15 @@ public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses()
* @param threadPool A service to allow retrieving an executor to run an async action
* @param resourceWatcherService A service to watch for changes to node local files
* @param scriptService A service to allow running scripts on the local node
* @param xContentRegistry the registry for extensible xContent parsing
* @param environment the environment for path and setting configurations
* @param nodeEnvironment the node environment used coordinate access to the data paths
* @param namedWriteableRegistry the registry for {@link NamedWriteable} object parsing
*/
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry) {
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
return Collections.emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESIntegTestCase;
Expand Down Expand Up @@ -67,11 +70,13 @@ public TestPlugin(Settings settings) {
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry) {
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(UPDATE_TEMPLATE_DUMMY_SETTING, integer -> {
logger.debug("the template dummy setting was updated to {}", integer);
});
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry);
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry,
environment, nodeEnvironment, namedWriteableRegistry);
}

@Override
Expand Down
Loading

0 comments on commit 1a01514

Please sign in to comment.