Skip to content

Commit

Permalink
Pulsar Admin: reduce code duplication - part 3 (apache#13027)
Browse files Browse the repository at this point in the history
* Pulsar Admin: reduce code duplication - part 3

* fix build

* fix build

* fix build
  • Loading branch information
eolivelli authored Nov 29, 2021
1 parent 3d54beb commit ba339fb
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 695 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.admin.Lookup;
Expand Down Expand Up @@ -81,16 +79,7 @@ public void failed(Throwable throwable) {

@Override
public Map<String, String> lookupPartitionedTopic(String topic) throws PulsarAdminException {
try {
return lookupPartitionedTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> lookupPartitionedTopicAsync(topic));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
Expand All @@ -50,16 +47,7 @@ public NonPersistentTopicsImpl(WebTarget web, Authentication auth, long readTime

@Override
public void createPartitionedTopic(String topic, int numPartitions) throws PulsarAdminException {
try {
createPartitionedTopicAsync(topic, numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> createPartitionedTopicAsync(topic, numPartitions));
}

@Override
Expand All @@ -72,16 +60,7 @@ public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int num

@Override
public PartitionedTopicMetadata getPartitionedTopicMetadata(String topic) throws PulsarAdminException {
try {
return getPartitionedTopicMetadataAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getPartitionedTopicMetadataAsync(topic));
}

@Override
Expand All @@ -107,16 +86,7 @@ public void failed(Throwable throwable) {

@Override
public NonPersistentTopicStats getStats(String topic) throws PulsarAdminException {
try {
return getStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getStatsAsync(topic));
}

@Override
Expand All @@ -142,16 +112,7 @@ public void failed(Throwable throwable) {

@Override
public PersistentTopicInternalStats getInternalStats(String topic) throws PulsarAdminException {
try {
return getInternalStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getInternalStatsAsync(topic));
}

@Override
Expand All @@ -177,16 +138,7 @@ public void failed(Throwable throwable) {

@Override
public void unload(String topic) throws PulsarAdminException {
try {
unloadAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> unloadAsync(topic));
}

@Override
Expand All @@ -198,16 +150,7 @@ public CompletableFuture<Void> unloadAsync(String topic) {

@Override
public List<String> getListInBundle(String namespace, String bundleRange) throws PulsarAdminException {
try {
return getListInBundleAsync(namespace, bundleRange).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getListInBundleAsync(namespace, bundleRange));
}

@Override
Expand All @@ -231,16 +174,7 @@ public void failed(Throwable throwable) {

@Override
public List<String> getList(String namespace) throws PulsarAdminException {
try {
return getListAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getListAsync(namespace));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
Expand Down Expand Up @@ -63,16 +60,7 @@ public PackagesImpl(WebTarget webTarget, Authentication auth, AsyncHttpClient cl

@Override
public PackageMetadata getMetadata(String packageName) throws PulsarAdminException {
try {
return getMetadataAsync(packageName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getMetadataAsync(packageName));
}

@Override
Expand All @@ -94,14 +82,7 @@ public void failed(Throwable throwable) {

@Override
public void updateMetadata(String packageName, PackageMetadata metadata) throws PulsarAdminException {
try {
updateMetadataAsync(packageName, metadata).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
}
sync(() -> updateMetadataAsync(packageName, metadata));
}

@Override
Expand All @@ -112,14 +93,7 @@ public CompletableFuture<Void> updateMetadataAsync(String packageName, PackageMe

@Override
public void upload(PackageMetadata metadata, String packageName, String path) throws PulsarAdminException {
try {
uploadAsync(metadata, packageName, path).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
}
sync(() -> uploadAsync(metadata, packageName, path));
}

@Override
Expand Down Expand Up @@ -154,19 +128,7 @@ public CompletableFuture<Void> uploadAsync(PackageMetadata metadata, String pack

@Override
public void download(String packageName, String path) throws PulsarAdminException {
try {
downloadAsync(packageName, path).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof PulsarAdminException) {
throw (PulsarAdminException) cause;
} else {
throw new PulsarAdminException(cause);
}
}
sync(() -> downloadAsync(packageName, path));
}

@Override
Expand Down Expand Up @@ -202,14 +164,7 @@ public void failed(Throwable throwable) {

@Override
public void delete(String packageName) throws PulsarAdminException {
try {
deleteAsync(packageName).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
}
sync(() -> deleteAsync(packageName));
}

@Override
Expand All @@ -221,14 +176,7 @@ public CompletableFuture<Void> deleteAsync(String packageName) {

@Override
public List<String> listPackageVersions(String packageName) throws PulsarAdminException {
try {
return listPackageVersionsAsync(packageName).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
}
return sync(() -> listPackageVersionsAsync(packageName));
}


Expand All @@ -254,14 +202,7 @@ public void failed(Throwable throwable) {

@Override
public List<String> listPackages(String type, String namespace) throws PulsarAdminException {
try {
return listPackagesAsync(type, namespace).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
}
return sync(() -> listPackagesAsync(type, namespace));
}

@Override
Expand Down
Loading

0 comments on commit ba339fb

Please sign in to comment.