Skip to content

Commit

Permalink
Merge pull request apache#998 from drcrallen/zkConfigJackson
Browse files Browse the repository at this point in the history
Change zk configs to use Jackson injection instead of Skife
  • Loading branch information
fjy committed Jan 5, 2015
2 parents c0469c9 + 65286a2 commit ab0ddc0
Show file tree
Hide file tree
Showing 26 changed files with 740 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.inject.Binder;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.server.initialization.IndexerZkConfig;

/**
*/
Expand All @@ -31,5 +32,6 @@ public static void configureTaskRunnerConfigs(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.indexer.runner", ForkingTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.runner", RemoteTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, "druid.zk.paths.indexer", IndexerZkConfig.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.tasklogs.TaskLogStreamer;
import org.apache.commons.lang.mutable.MutableInt;
Expand Down Expand Up @@ -102,7 +103,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer

private final ObjectMapper jsonMapper;
private final RemoteTaskRunnerConfig config;
private final ZkPathsConfig zkPaths;
private final IndexerZkConfig indexerZkConfig;
private final CuratorFramework cf;
private final PathChildrenCacheFactory pathChildrenCacheFactory;
private final PathChildrenCache workerPathCache;
Expand All @@ -129,7 +130,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
public RemoteTaskRunner(
ObjectMapper jsonMapper,
RemoteTaskRunnerConfig config,
ZkPathsConfig zkPaths,
IndexerZkConfig indexerZkConfig,
CuratorFramework cf,
PathChildrenCacheFactory pathChildrenCacheFactory,
HttpClient httpClient,
Expand All @@ -138,10 +139,10 @@ public RemoteTaskRunner(
{
this.jsonMapper = jsonMapper;
this.config = config;
this.zkPaths = zkPaths;
this.indexerZkConfig = indexerZkConfig;
this.cf = cf;
this.pathChildrenCacheFactory = pathChildrenCacheFactory;
this.workerPathCache = pathChildrenCacheFactory.make(cf, zkPaths.getIndexerAnnouncementPath());
this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath());
this.httpClient = httpClient;
this.strategy = strategy;
}
Expand Down Expand Up @@ -496,7 +497,7 @@ private void cleanup(final String taskId)
} else {
final String workerId = worker.getHost();
log.info("Cleaning up task[%s] on worker[%s]", taskId, workerId);
final String statusPath = JOINER.join(zkPaths.getIndexerStatusPath(), workerId, taskId);
final String statusPath = JOINER.join(indexerZkConfig.getStatus(), workerId, taskId);
try {
cf.delete().guaranteed().forPath(statusPath);
}
Expand Down Expand Up @@ -582,7 +583,7 @@ private void announceTask(
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes());
}

String taskPath = JOINER.join(zkPaths.getIndexerTaskPath(), theWorker.getHost(), task.getId());
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), theWorker.getHost(), task.getId());

if (cf.checkExists().forPath(taskPath) == null) {
cf.create()
Expand Down Expand Up @@ -642,7 +643,7 @@ private ListenableFuture<ZkWorker> addWorker(final Worker worker)
log.info("Worker[%s] reportin' for duty!", worker.getHost());

try {
final String workerStatusPath = JOINER.join(zkPaths.getIndexerStatusPath(), worker.getHost());
final String workerStatusPath = JOINER.join(indexerZkConfig.getStatus(), worker.getHost());
final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
final SettableFuture<ZkWorker> retVal = SettableFuture.create();
final ZkWorker zkWorker = new ZkWorker(
Expand Down Expand Up @@ -787,7 +788,7 @@ private void removeWorker(final Worker worker)
if (zkWorker != null) {
try {
List<String> tasksToFail = Lists.newArrayList(
cf.getChildren().forPath(JOINER.join(zkPaths.getIndexerTaskPath(), worker.getHost()))
cf.getChildren().forPath(JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost()))
);
log.info("[%s]: Found %d tasks assigned", worker.getHost(), tasksToFail.size());

Expand All @@ -805,7 +806,7 @@ private void removeWorker(final Worker worker)
for (String assignedTask : tasksToFail) {
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
String taskPath = JOINER.join(zkPaths.getIndexerTaskPath(), worker.getHost(), assignedTask);
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost(), assignedTask);
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;

Expand All @@ -38,7 +39,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
{
private final CuratorFramework curator;
private final RemoteTaskRunnerConfig remoteTaskRunnerConfig;
private final ZkPathsConfig zkPaths;
private final IndexerZkConfig zkPaths;
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final WorkerSelectStrategy strategy;
Expand All @@ -47,7 +48,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
public RemoteTaskRunnerFactory(
final CuratorFramework curator,
final RemoteTaskRunnerConfig remoteTaskRunnerConfig,
final ZkPathsConfig zkPaths,
final IndexerZkConfig zkPaths,
final ObjectMapper jsonMapper,
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerBehaviourConfigSupplier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory;
import io.druid.server.DruidNode;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
Expand Down Expand Up @@ -74,7 +75,7 @@ public TaskMaster(
final TaskStorage taskStorage,
final TaskActionClientFactory taskActionClientFactory,
@Self final DruidNode node,
final ZkPathsConfig zkPaths,
final IndexerZkConfig zkPaths,
final TaskRunnerFactory runnerFactory,
final ResourceManagementSchedulerFactory managementSchedulerFactory,
final CuratorFramework curator,
Expand All @@ -85,7 +86,7 @@ public TaskMaster(
this.taskActionClientFactory = taskActionClientFactory;
this.leaderSelector = new LeaderSelector(
curator,
zkPaths.getIndexerLeaderLatchPath(),
zkPaths.getLeaderLatchPath(),
new LeaderSelectorListener()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import com.metamx.common.logger.Logger;
import io.druid.curator.announcement.Announcer;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.initialization.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -63,7 +63,7 @@ public class WorkerCuratorCoordinator
@Inject
public WorkerCuratorCoordinator(
ObjectMapper jsonMapper,
ZkPathsConfig zkPaths,
IndexerZkConfig indexerZkConfig,
RemoteTaskRunnerConfig config,
CuratorFramework curatorFramework,
Worker worker
Expand All @@ -76,9 +76,9 @@ public WorkerCuratorCoordinator(

this.announcer = new Announcer(curatorFramework, MoreExecutors.sameThreadExecutor());

this.baseAnnouncementsPath = getPath(Arrays.asList(zkPaths.getIndexerAnnouncementPath(), worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(zkPaths.getIndexerTaskPath(), worker.getHost()));
this.baseStatusPath = getPath(Arrays.asList(zkPaths.getIndexerStatusPath(), worker.getHost()));
this.baseAnnouncementsPath = getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(), worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(), worker.getHost()));
this.baseStatusPath = getPath(Arrays.asList(indexerZkConfig.getStatus(), worker.getHost()));
}

@LifecycleStart
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.server.initialization;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.inject.Inject;
import org.apache.curator.utils.ZKPaths;

/**
*
*/
public class IndexerZkConfig
{
@Inject
@JsonIgnore
private ZkPathsConfig zkPathsConfig = new ZkPathsConfig();
@JsonProperty
private String base;
@JsonProperty
private String announcementsPath;
@JsonProperty
private String tasksPath;
@JsonProperty
private String status;
@JsonProperty
private String leaderLatchPath;

private String defaultIndexerPath(final String subPath)
{
return getZkPathsConfig().defaultPath(ZKPaths.makePath(getBase(), subPath));
}

public String getBase()
{
return base == null ? "indexer" : base;
}

public String getAnnouncementsPath()
{
return announcementsPath == null ? defaultIndexerPath("announcements") : announcementsPath;
}

public String getTasksPath()
{
return tasksPath == null ? defaultIndexerPath("tasks") : tasksPath;
}

public String getStatus()
{
return status == null ? defaultIndexerPath("status") : status;
}

public String getLeaderLatchPath()
{
return leaderLatchPath == null ? defaultIndexerPath("leaderLatchPath") : leaderLatchPath;
}

public ZkPathsConfig getZkPathsConfig()
{
return zkPathsConfig;
}

// Setter required for easy debugging
public IndexerZkConfig setZkPathsConfig(ZkPathsConfig zkPathsConfig)
{
this.zkPathsConfig = zkPathsConfig;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
Expand All @@ -57,6 +58,7 @@

import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class RemoteTaskRunnerTest
{
Expand All @@ -66,6 +68,7 @@ public class RemoteTaskRunnerTest
private static final String announcementsPath = String.format("%s/indexer/announcements/worker", basePath);
private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath);
private static final String statusPath = String.format("%s/indexer/status/worker", basePath);
private static final int TIMEOUT_SECONDS = 5;

private TestingCluster testingCluster;
private CuratorFramework cf;
Expand Down Expand Up @@ -282,7 +285,7 @@ public void testStatusRemoved() throws Exception

cf.delete().forPath(joiner.join(statusPath, task.getId()));

TaskStatus status = future.get();
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

Assert.assertEquals(status.getStatusCode(), TaskStatus.Status.FAILED);
}
Expand Down Expand Up @@ -335,7 +338,7 @@ public void testRunWithTaskComplete() throws Exception

ListenableFuture<TaskStatus> future = remoteTaskRunner.run(task);

TaskStatus status = future.get();
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode());
}
Expand All @@ -353,7 +356,7 @@ public void testWorkerRemoved() throws Exception

cf.delete().forPath(announcementsPath);

TaskStatus status = future.get();
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
}
Expand Down Expand Up @@ -393,14 +396,14 @@ private void makeRemoteTaskRunner() throws Exception
remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
config,
new ZkPathsConfig()
new IndexerZkConfig().setZkPathsConfig(new ZkPathsConfig()
{
@Override
public String getZkBasePath()
public String getBase()
{
return basePath;
}
},
}),
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -139,14 +140,15 @@ public void setUp() throws Exception

workerCuratorCoordinator = new WorkerCuratorCoordinator(
jsonMapper,
new IndexerZkConfig().setZkPathsConfig(
new ZkPathsConfig()
{
@Override
public String getZkBasePath()
public String getBase()
{
return basePath;
}
},
}),
new TestRemoteTaskRunnerConfig(),
cf,
worker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.druid.indexing.worker.Worker;
import io.druid.indexing.worker.WorkerCuratorCoordinator;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -76,14 +77,14 @@ public void setUp() throws Exception

curatorCoordinator = new WorkerCuratorCoordinator(
jsonMapper,
new ZkPathsConfig()
new IndexerZkConfig().setZkPathsConfig(new ZkPathsConfig()
{
@Override
public String getZkBasePath()
public String getBase()
{
return basePath;
}
},
}),
new RemoteTaskRunnerConfig(),
cf,
worker
Expand Down
Loading

0 comments on commit ab0ddc0

Please sign in to comment.