Skip to content

Commit

Permalink
Retry creation of assignment topic a few times before giving up (apache#3722)
Browse files Browse the repository at this point in the history

* Retry creation of assignment topic a few times before giving up

* Use Action based retry mechanism

* Fix build

* Catch interrupted exception

* Fix unittest

* Added header
  • Loading branch information
srkukarni authored Mar 7, 2019
1 parent 17b531b commit fd05219
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Utils;

Expand Down Expand Up @@ -360,7 +361,7 @@ private void submitService() throws Exception {

String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());

RuntimeUtils.Actions.Action createService = RuntimeUtils.Actions.Action.builder()
Actions.Action createService = Actions.Action.builder()
.actionName(String.format("Submitting service for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
Expand All @@ -372,25 +373,25 @@ private void submitService() throws Exception {
// already exists
if (e.getCode() == HTTP_CONFLICT) {
log.warn("Service already present for function {}", fqfn);
return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
return Actions.ActionResult.builder().success(true).build();
}

String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}

return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
return Actions.ActionResult.builder().success(true).build();
})
.build();


AtomicBoolean success = new AtomicBoolean(false);
RuntimeUtils.Actions.newBuilder()
Actions.newBuilder()
.addAction(createService.toBuilder()
.onSuccess(() -> success.set(true))
.onSuccess((ignored) -> success.set(true))
.build())
.run();

Expand Down Expand Up @@ -432,7 +433,7 @@ private void submitStatefulSet() throws Exception {

String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());

RuntimeUtils.Actions.Action createStatefulSet = RuntimeUtils.Actions.Action.builder()
Actions.Action createStatefulSet = Actions.Action.builder()
.actionName(String.format("Submitting statefulset for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
Expand All @@ -444,25 +445,25 @@ private void submitStatefulSet() throws Exception {
// already exists
if (e.getCode() == HTTP_CONFLICT) {
log.warn("Statefulset already present for function {}", fqfn);
return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
return Actions.ActionResult.builder().success(true).build();
}

String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}

return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
return Actions.ActionResult.builder().success(true).build();
})
.build();


AtomicBoolean success = new AtomicBoolean(false);
RuntimeUtils.Actions.newBuilder()
Actions.newBuilder()
.addAction(createStatefulSet.toBuilder()
.onSuccess(() -> success.set(true))
.onSuccess((ignored) -> success.set(true))
.build())
.run();

Expand All @@ -479,7 +480,7 @@ public void deleteStatefulSet() throws InterruptedException {
options.setPropagationPolicy("Foreground");

String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
RuntimeUtils.Actions.Action deleteStatefulSet = RuntimeUtils.Actions.Action.builder()
Actions.Action deleteStatefulSet = Actions.Action.builder()
.actionName(String.format("Deleting statefulset for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
Expand All @@ -498,16 +499,16 @@ public void deleteStatefulSet() throws InterruptedException {
// if already deleted
if (e.getCode() == HTTP_NOT_FOUND) {
log.warn("Statefulset for function {} does not exist", fqfn);
return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
return Actions.ActionResult.builder().success(true).build();
}

String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
} catch (IOException e) {
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(false)
.errorMsg(e.getMessage())
.build();
Expand All @@ -516,9 +517,9 @@ public void deleteStatefulSet() throws InterruptedException {
// if already deleted
if (response.code() == HTTP_NOT_FOUND) {
log.warn("Statefulset for function {} does not exist", fqfn);
return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
return Actions.ActionResult.builder().success(true).build();
} else {
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(response.isSuccessful())
.errorMsg(response.message())
.build();
Expand All @@ -527,7 +528,7 @@ public void deleteStatefulSet() throws InterruptedException {
.build();


RuntimeUtils.Actions.Action waitForStatefulSetDeletion = RuntimeUtils.Actions.Action.builder()
Actions.Action waitForStatefulSetDeletion = Actions.Action.builder()
.actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn))
// set retry period to be about 2x the graceful shutdown time
.numRetries(NUM_RETRIES * 2)
Expand All @@ -540,24 +541,24 @@ public void deleteStatefulSet() throws InterruptedException {
} catch (ApiException e) {
// statefulset is gone
if (e.getCode() == HTTP_NOT_FOUND) {
return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
return Actions.ActionResult.builder().success(true).build();
}

String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(false)
.errorMsg(response.getStatus().toString())
.build();
})
.build();

// Need to wait for all pods to die so we can cleanup subscriptions.
RuntimeUtils.Actions.Action waitForStatefulPodsToTerminate = RuntimeUtils.Actions.Action.builder()
Actions.Action waitForStatefulPodsToTerminate = Actions.Action.builder()
.actionName(String.format("Waiting for pods for function %s to terminate", fqfn))
.numRetries(NUM_RETRIES * 2)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS * 2)
Expand All @@ -575,19 +576,19 @@ public void deleteStatefulSet() throws InterruptedException {
} catch (ApiException e) {

String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}

if (response.getItems().size() > 0) {
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(false)
.errorMsg(response.getItems().size() + " pods still alive.")
.build();
} else {
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(true)
.build();
}
Expand All @@ -596,27 +597,27 @@ public void deleteStatefulSet() throws InterruptedException {


AtomicBoolean success = new AtomicBoolean(false);
RuntimeUtils.Actions.newBuilder()
Actions.newBuilder()
.addAction(deleteStatefulSet.toBuilder()
.continueOn(true)
.build())
.addAction(waitForStatefulSetDeletion.toBuilder()
.continueOn(false)
.onSuccess(() -> success.set(true))
.onSuccess((ignored) -> success.set(true))
.build())
.addAction(deleteStatefulSet.toBuilder()
.continueOn(true)
.build())
.addAction(waitForStatefulSetDeletion.toBuilder()
.onSuccess(() -> success.set(true))
.onSuccess((ignored) -> success.set(true))
.build())
.run();

if (!success.get()) {
throw new RuntimeException(String.format("Failed to delete statefulset for function %s", fqfn));
} else {
// wait for pods to terminate
RuntimeUtils.Actions.newBuilder()
Actions.newBuilder()
.addAction(waitForStatefulPodsToTerminate)
.run();
}
Expand All @@ -630,7 +631,7 @@ public void deleteService() throws InterruptedException {
String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
String serviceName = createJobName(instanceConfig.getFunctionDetails());

RuntimeUtils.Actions.Action deleteService = RuntimeUtils.Actions.Action.builder()
Actions.Action deleteService = Actions.Action.builder()
.actionName(String.format("Deleting service for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
Expand All @@ -648,16 +649,16 @@ public void deleteService() throws InterruptedException {
// if already deleted
if (e.getCode() == HTTP_NOT_FOUND) {
log.warn("Service for function {} does not exist", fqfn);
return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
return Actions.ActionResult.builder().success(true).build();
}

String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
} catch (IOException e) {
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(false)
.errorMsg(e.getMessage())
.build();
Expand All @@ -666,17 +667,17 @@ public void deleteService() throws InterruptedException {
// if already deleted
if (response.code() == HTTP_NOT_FOUND) {
log.warn("Service for function {} does not exist", fqfn);
return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
return Actions.ActionResult.builder().success(true).build();
} else {
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(response.isSuccessful())
.errorMsg(response.message())
.build();
}
})
.build();

RuntimeUtils.Actions.Action waitForServiceDeletion = RuntimeUtils.Actions.Action.builder()
Actions.Action waitForServiceDeletion = Actions.Action.builder()
.actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
Expand All @@ -689,35 +690,35 @@ public void deleteService() throws InterruptedException {
} catch (ApiException e) {
// service is gone
if (e.getCode() == HTTP_NOT_FOUND) {
return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
return Actions.ActionResult.builder().success(true).build();
}
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}
return RuntimeUtils.Actions.ActionResult.builder()
return Actions.ActionResult.builder()
.success(false)
.errorMsg(response.getStatus().toString())
.build();
})
.build();

AtomicBoolean success = new AtomicBoolean(false);
RuntimeUtils.Actions.newBuilder()
Actions.newBuilder()
.addAction(deleteService.toBuilder()
.continueOn(true)
.build())
.addAction(waitForServiceDeletion.toBuilder()
.continueOn(false)
.onSuccess(() -> success.set(true))
.onSuccess((ignored) -> success.set(true))
.build())
.addAction(deleteService.toBuilder()
.continueOn(true)
.build())
.addAction(waitForServiceDeletion.toBuilder()
.onSuccess(() -> success.set(true))
.onSuccess((ignored) -> success.set(true))
.build())
.run();

Expand Down
Loading

0 comments on commit fd05219

Please sign in to comment.