Skip to content

Commit

Permalink
Created an operation status object, refactored request id to be an in…
Browse files Browse the repository at this point in the history
…teger rather than a string. Changed protocol buffers messages to match the new schema for requests and request ids.
  • Loading branch information
afeinberg committed Nov 12, 2009
1 parent 91f3948 commit 0b55319
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 594 deletions.
2 changes: 1 addition & 1 deletion src/java/voldemort/client/protocol/admin/AdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public abstract void fetchAndUpdateStreams(int donorNodeId,
* @param requestId Previously returned request Id
* @return A Pair of String (request status) and Boolean (is request complete?)
*/
public abstract Pair<String,Boolean> getAsyncRequestStatus(int nodeId, String requestId);
public abstract Pair<String,Boolean> getAsyncRequestStatus(int nodeId, int requestId);

/**
* update remote metadata on a particular node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,8 @@ public void fetchAndUpdateStreams(int donorNodeId, int stealerNodeId, String sto
.setInitiateFetchAndUpdate(initiateFetchAndUpdateRequest)
.setType(VAdminProto.AdminRequestType.INITIATE_FETCH_AND_UPDATE)
.build();
VAdminProto.InitiateFetchAndUpdateResponse.Builder response = sendAndReceive(stealerNodeId, adminRequest,
VAdminProto.InitiateFetchAndUpdateResponse.newBuilder());
VAdminProto.AsyncOperationStatusResponse.Builder response = sendAndReceive(stealerNodeId, adminRequest,
VAdminProto.AsyncOperationStatusResponse.newBuilder());

if (response.hasError()) {
throwException(response.getError());
Expand All @@ -390,7 +390,7 @@ public void fetchAndUpdateStreams(int donorNodeId, int stealerNodeId, String sto
*/
long delay = 250;
long maxDelay = 1000*60;
String requestId = response.getRequestId();
int requestId = response.getRequestId();
while (true) {
Pair<String,Boolean> status = getAsyncRequestStatus(stealerNodeId, requestId);
if (status.getSecond())
Expand All @@ -406,21 +406,21 @@ public void fetchAndUpdateStreams(int donorNodeId, int stealerNodeId, String sto
}

@Override
public Pair<String, Boolean> getAsyncRequestStatus(int nodeId, String requestId) {
VAdminProto.AsyncStatusRequest asyncRequest = VAdminProto.AsyncStatusRequest.newBuilder()
public Pair<String, Boolean> getAsyncRequestStatus(int nodeId, int requestId) {
VAdminProto.AsyncOperationStatusRequest asyncRequest = VAdminProto.AsyncOperationStatusRequest.newBuilder()
.setRequestId(requestId)
.build();
VAdminProto.VoldemortAdminRequest adminRequest = VAdminProto.VoldemortAdminRequest.newBuilder()
.setType(VAdminProto.AdminRequestType.ASYNC_STATUS)
.setAsyncStatus(asyncRequest)
.setType(VAdminProto.AdminRequestType.ASYNC_OPERATION_STATUS)
.setAsyncOperationStatus(asyncRequest)
.build();
VAdminProto.AsyncStatusResponse.Builder response = sendAndReceive(nodeId, adminRequest,
VAdminProto.AsyncStatusResponse.newBuilder());
VAdminProto.AsyncOperationStatusResponse.Builder response = sendAndReceive(nodeId, adminRequest,
VAdminProto.AsyncOperationStatusResponse.newBuilder());

if (response.hasError())
throwException(response.getError());

return new Pair<String,Boolean>(response.getStatus(), response.getIsComplete());
return new Pair<String,Boolean>(response.getStatus(), response.getComplete());
}

private VAdminProto.VoldemortFilter encodeFilter(VoldemortFilter filter) throws IOException {
Expand Down
Loading

0 comments on commit 0b55319

Please sign in to comment.