Skip to content

Commit

Permalink
Cleaned up the code to not return Response objects (apache#3520)
Browse files Browse the repository at this point in the history
  • Loading branch information
srkukarni authored and merlimat committed Feb 7, 2019
1 parent a5b17f5 commit b833bce
Showing 1 changed file with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,25 +309,27 @@ public synchronized void removeAssignments(Collection<Assignment> assignments) {
}
}

public Response restartFunctionInstance(String tenant, String namespace, String functionName, int instanceId,
public void restartFunctionInstance(String tenant, String namespace, String functionName, int instanceId,
URI uri) throws Exception {
if (runtimeFactory.externallyManaged()) {
return Response.status(Status.NOT_IMPLEMENTED).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData("Externally managed schedulers can't do per instance stop")).build();
throw new WebApplicationException(Response.serverError().status(Status.NOT_IMPLEMENTED)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData("Externally managed schedulers can't do per instance stop")).build());
}
Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName, instanceId);
if (assignment == null) {
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " doesn't exist")).build();
throw new WebApplicationException(Response.serverError().status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " doesn't exist")).build());
}

final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();

if (assignedWorkerId.equals(workerId)) {
stopFunction(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), true);
return Response.status(Status.OK).build();
return;
} else {
// query other worker
List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
Expand All @@ -338,8 +340,9 @@ public Response restartFunctionInstance(String tenant, String namespace, String
}
}
if (workerInfo == null) {
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build();
throw new WebApplicationException(Response.serverError().status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build());
}

if (uri == null) {
Expand All @@ -351,14 +354,15 @@ public Response restartFunctionInstance(String tenant, String namespace, String
}
}

public Response restartFunctionInstances(String tenant, String namespace, String functionName)
public void restartFunctionInstances(String tenant, String namespace, String functionName)
throws Exception {
final String fullFunctionName = String.format("%s/%s/%s", tenant, namespace, functionName);
Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);

if (assignments.isEmpty()) {
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build();
throw new WebApplicationException(Response.serverError().status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build());
}
if (runtimeFactory.externallyManaged()) {
Assignment assignment = assignments.iterator().next();
Expand All @@ -379,8 +383,9 @@ public Response restartFunctionInstances(String tenant, String namespace, String
if (log.isDebugEnabled()) {
log.debug("[{}] has not been assigned yet", fullyQualifiedInstanceId);
}
return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build();
throw new WebApplicationException(Response.serverError().status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build());
}
this.functionAdmin.functions().restartFunction(tenant, namespace, functionName);
}
Expand Down Expand Up @@ -410,7 +415,7 @@ public Response restartFunctionInstances(String tenant, String namespace, String
}
}
}
return Response.status(Status.OK).build();
return;
}

/**
Expand Down

0 comments on commit b833bce

Please sign in to comment.