Skip to content

Commit

Permalink
Merge branch 'rebalancing' of git://github.com/afeinberg/voldemort in…
Browse files Browse the repository at this point in the history
…to rebalancing
  • Loading branch information
bbansal committed Nov 16, 2009
2 parents cffd8d7 + 46ab667 commit 3ae9933
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 33 deletions.
8 changes: 7 additions & 1 deletion src/java/voldemort/server/VoldemortServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import voldemort.server.protocol.AdminRequestHandlerFactory;
import voldemort.server.protocol.RequestHandlerFactory;
import voldemort.server.protocol.SocketRequestHandlerFactory;
import voldemort.server.protocol.admin.AsyncOperationRunner;
import voldemort.server.scheduler.SchedulerService;
import voldemort.server.socket.SocketService;
import voldemort.server.storage.StorageService;
Expand All @@ -60,11 +61,15 @@ public class VoldemortServer extends AbstractService {
private static final Logger logger = Logger.getLogger(VoldemortServer.class.getName());
public static final long DEFAULT_PUSHER_POLL_MS = 60 * 1000;

private final static int ASYNC_REQUEST_THREADS = 8;
private final static int ASYNC_REQUEST_CACHE_SIZE = 64;

private final Node identityNode;
private final List<VoldemortService> services;
private final StoreRepository storeRepository;
private final VoldemortConfig voldemortConfig;
private final MetadataStore metadata;
private final AsyncOperationRunner asyncRunner = new AsyncOperationRunner(ASYNC_REQUEST_THREADS, ASYNC_REQUEST_CACHE_SIZE);

public VoldemortServer(VoldemortConfig config) {
super(ServiceType.VOLDEMORT);
Expand Down Expand Up @@ -130,7 +135,8 @@ private List<VoldemortService> createServices() {
if(voldemortConfig.isAdminServerEnabled()) {
AdminRequestHandlerFactory adminRequestHandlerFactory = new AdminRequestHandlerFactory(this.storeRepository,
this.metadata,
this.voldemortConfig);
this.voldemortConfig,
this.asyncRunner);
services.add(new SocketService(adminRequestHandlerFactory,
identityNode.getAdminPort(),
voldemortConfig.getAdminCoreThreads(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import voldemort.client.protocol.RequestFormatType;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.admin.AsyncOperationRunner;
import voldemort.server.protocol.admin.ProtoBuffAdminServiceRequestHandler;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.metadata.MetadataStore;
Expand All @@ -23,13 +24,16 @@ public class AdminRequestHandlerFactory implements RequestHandlerFactory {
private final StoreRepository repository;
private final MetadataStore metadata;
private final VoldemortConfig voldemortConfig;
private final AsyncOperationRunner asyncRunner;

public AdminRequestHandlerFactory(StoreRepository repository,
MetadataStore metadata,
VoldemortConfig voldemortConfig) {
VoldemortConfig voldemortConfig,
AsyncOperationRunner asyncRunner) {
this.repository = repository;
this.metadata = metadata;
this.voldemortConfig = voldemortConfig;
this.asyncRunner = asyncRunner;
}

public RequestHandler getRequestHandler(RequestFormatType type) {
Expand All @@ -38,7 +42,8 @@ public RequestHandler getRequestHandler(RequestFormatType type) {
return new ProtoBuffAdminServiceRequestHandler(new ErrorCodeMapper(),
repository,
metadata,
voldemortConfig);
voldemortConfig,
asyncRunner);
default:
throw new VoldemortException("Unknown wire format " + type);
}
Expand Down
42 changes: 24 additions & 18 deletions src/java/voldemort/server/protocol/admin/AsyncOperationRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,61 +7,67 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author afeinberg
* Asynchronous job scheduler for admin service requests
* Asynchronous job scheduler for admin service operations
*
*/
public class AsyncOperationRunner {
private final Map<Integer, AsyncOperation> requests;
private final Map<Integer, AsyncOperation> operations;
private final ExecutorService executor;
private final Logger logger = Logger.getLogger(AsyncOperationRunner.class);
private final AtomicInteger lastOperationId = new AtomicInteger(0);

@SuppressWarnings("unchecked") // apache commons collections aren't updated for 1.5 yet
public AsyncOperationRunner(int poolSize, int cacheSize) {
requests = Collections.synchronizedMap(new AsyncOperationRepository(cacheSize));
operations = Collections.synchronizedMap(new AsyncOperationRepository(cacheSize));
executor = Executors.newFixedThreadPool(poolSize);
}

/**
* Submit a operation. Throw a run time exception if the operation is already submitted
* @param operation The asynchronous operation to submit
* Submit a operations. Throw a run time exception if the operations is already submitted
* @param operation The asynchronous operations to submit
* @param requestId Id of the request
*/
public void startRequest(int requestId, AsyncOperation operation) {
if (requests.containsKey(requestId)) {
public void submitOperation(int requestId, AsyncOperation operation) {
if (this.operations.containsKey(requestId)) {
throw new VoldemortException("Request " + requestId + " already submitted to the system");
}
requests.put(requestId, operation);
this.operations.put(requestId, operation);
executor.submit(operation);
logger.debug("Handling async operation " + requestId);
}

/**
* Is a request complete? If so, forget the requests
* Is a request complete? If so, forget the operations
* @param requestId Id of the request
* @return True if request is complete, false otherwise
*/
public boolean isComplete(int requestId) {
if (!requests.containsKey(requestId)) {
throw new VoldemortException("No request with id " + requestId + " found");
if (!operations.containsKey(requestId)) {
throw new VoldemortException("No operation with id " + requestId + " found");
}

if (requests.get(requestId).getStatus().isComplete()) {
logger.debug("Request complete " + requestId);
requests.remove(requestId);
if (operations.get(requestId).getStatus().isComplete()) {
logger.debug("Operation complete " + requestId);
operations.remove(requestId);

return true;
}
return false;
}

public String getRequestStatus(int requestId) {
if (!requests.containsKey(requestId)) {
throw new VoldemortException("No request with id " + requestId + " found");
public AsyncOperationStatus getOperationStatus(int requestId) {
if (!operations.containsKey(requestId)) {
throw new VoldemortException("No operation with id " + requestId + " found");
}

return requests.get(requestId).getStatus().getStatus();
return operations.get(requestId).getStatus();
}

public int getRequestId() {
return lastOperationId.getAndIncrement();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;

Expand Down Expand Up @@ -71,22 +70,20 @@ public class ProtoBuffAdminServiceRequestHandler implements RequestHandler {
private final VoldemortConfig voldemortConfig;
private final AsyncOperationRunner asyncRunner;

private final static int ASYNC_REQUEST_THREADS = 8;
private final static int ASYNC_REQUEST_CACHE_SIZE = 64;

private final AtomicInteger lastOperationId = new AtomicInteger(0);


public ProtoBuffAdminServiceRequestHandler(ErrorCodeMapper errorCodeMapper,
StoreRepository storeRepository,
MetadataStore metadataStore,
VoldemortConfig voldemortConfig) {
VoldemortConfig voldemortConfig,
AsyncOperationRunner asyncRunner) {
this.errorCodeMapper = errorCodeMapper;
this.metadataStore = metadataStore;
this.storeRepository = storeRepository;
this.voldemortConfig = voldemortConfig;
this.networkClassLoader = new NetworkClassLoader(Thread.currentThread()
.getContextClassLoader());
this.asyncRunner = new AsyncOperationRunner(ASYNC_REQUEST_THREADS, ASYNC_REQUEST_CACHE_SIZE);
this.asyncRunner = asyncRunner;
}

public void handleRequest(final DataInputStream inputStream, final DataOutputStream outputStream)
Expand Down Expand Up @@ -245,15 +242,15 @@ public VAdminProto.AsyncOperationStatusResponse handleFetchAndUpdate(VAdminProto
: new DefaultVoldemortFilter();
final String storeName = request.getStore();

int requestId = lastOperationId.getAndIncrement();
int requestId = asyncRunner.getRequestId();
VAdminProto.AsyncOperationStatusResponse.Builder response = VAdminProto.AsyncOperationStatusResponse.newBuilder()
.setRequestId(requestId)
.setComplete(false)
.setDescription("Fetch and update")
.setStatus("started");

try {
asyncRunner.startRequest(requestId, new AsyncOperation(requestId, "Fetch and Update") {
asyncRunner.submitOperation(requestId, new AsyncOperation(requestId, "Fetch and Update") {

@Override
public void operate() {
Expand Down Expand Up @@ -306,11 +303,11 @@ public VAdminProto.AsyncOperationStatusResponse handleAsyncStatus(VAdminProto.As
VAdminProto.AsyncOperationStatusResponse.Builder response = VAdminProto.AsyncOperationStatusResponse.newBuilder();
try {
int requestId = request.getRequestId();
String requestStatus = asyncRunner.getRequestStatus(requestId);
AsyncOperationStatus operationStatus = asyncRunner.getOperationStatus(requestId);
boolean requestComplete = asyncRunner.isComplete(requestId);
response.setDescription("description");
response.setDescription(operationStatus.getDescription());
response.setComplete(requestComplete);
response.setStatus(requestStatus);
response.setStatus(operationStatus.getStatus());
response.setRequestId(requestId);
} catch(VoldemortException e) {
response.setError(ProtoUtils.encodeError(errorCodeMapper, e));
Expand Down

0 comments on commit 3ae9933

Please sign in to comment.