Skip to content

Commit

Permalink
Make admin operations on Statestore non blocking (apache#9348)
Browse files Browse the repository at this point in the history
Co-authored-by: Prashant <[email protected]>
  • Loading branch information
pkumar-singh and Prashant authored Feb 5, 2021
1 parent 4b65df2 commit 13faf63
Showing 1 changed file with 23 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
Expand Down Expand Up @@ -344,6 +344,26 @@ PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaDat
return packageLocationMetaDataBuilder;
}

private void deleteStatestoreTableAsync(String namespace, String table) {
StorageAdminClient adminClient = worker().getStateStoreAdminClient();
if (adminClient != null) {
adminClient.deleteStream(namespace, table).whenComplete((res, throwable) -> {
if ((throwable == null && res.booleanValue())
|| (throwable != null &&
(throwable instanceof NamespaceNotFoundException
|| throwable instanceof StreamNotFoundException) )) {
log.info("{}/{} table deleted successfully", namespace, table);
} else {
if (throwable != null) {
log.error("{}/{} table deletion failed {} but moving on", namespace, table, throwable);
} else {
log.error("{}/{} table deletion failed but moving on", namespace, table);
}
}
});
}
}

@Override
public void deregisterFunction(final String tenant,
final String namespace,
Expand All @@ -365,19 +385,6 @@ public void deregisterFunction(final String tenant,
log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
// delete state table
if (null != worker().getStateStoreAdminClient()) {
final String tableNs = getStateNamespace(tenant, namespace);
final String tableName = componentName;
try {
FutureUtils.result(worker().getStateStoreAdminClient().deleteStream(tableNs, tableName));
} catch (NamespaceNotFoundException | StreamNotFoundException e) {
// ignored if the state table doesn't exist
} catch (Exception e) {
log.error("{}/{}/{} Failed to delete state table: {}", tenant, namespace, componentName, e.getMessage());
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}

// validate parameters
try {
Expand Down Expand Up @@ -419,6 +426,8 @@ public void deregisterFunction(final String tenant,
functionMetaData.getPackageLocation().getPackagePath(), e);
}
}

deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName);
}

@Override
Expand Down

0 comments on commit 13faf63

Please sign in to comment.