Skip to content

Commit

Permalink
Pulsar Admin: reduce code duplication - part 4 (apache#13086)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Dec 2, 2021
1 parent 9eaf2b5 commit 8297c32
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 532 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
import java.io.File;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
Expand Down Expand Up @@ -66,16 +63,7 @@ public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpCl

@Override
public List<String> listSinks(String tenant, String namespace) throws PulsarAdminException {
try {
return listSinksAsync(tenant, namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> listSinksAsync(tenant, namespace));
}

@Override
Expand Down Expand Up @@ -106,16 +94,7 @@ public void failed(Throwable throwable) {

@Override
public SinkConfig getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
try {
return getSinkAsync(tenant, namespace, sinkName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getSinkAsync(tenant, namespace, sinkName));
}

@Override
Expand Down Expand Up @@ -147,16 +126,7 @@ public void failed(Throwable throwable) {
@Override
public SinkStatus getSinkStatus(
String tenant, String namespace, String sinkName) throws PulsarAdminException {
try {
return getSinkStatusAsync(tenant, namespace, sinkName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getSinkStatusAsync(tenant, namespace, sinkName));
}

@Override
Expand Down Expand Up @@ -188,16 +158,7 @@ public void failed(Throwable throwable) {
@Override
public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(
String tenant, String namespace, String sinkName, int id) throws PulsarAdminException {
try {
return getSinkStatusAsync(tenant, namespace, sinkName, id).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getSinkStatusAsync(tenant, namespace, sinkName, id));
}

@Override
Expand Down Expand Up @@ -231,16 +192,7 @@ public void failed(Throwable throwable) {

@Override
public void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException {
try {
createSinkAsync(sinkConfig, fileName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> createSinkAsync(sinkConfig, fileName));
}

@Override
Expand Down Expand Up @@ -284,16 +236,7 @@ public CompletableFuture<Void> createSinkAsync(SinkConfig sinkConfig, String fil

@Override
public void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException {
try {
createSinkWithUrlAsync(sinkConfig, pkgUrl).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> createSinkWithUrlAsync(sinkConfig, pkgUrl));
}

@Override
Expand All @@ -314,16 +257,7 @@ public CompletableFuture<Void> createSinkWithUrlAsync(SinkConfig sinkConfig, Str

@Override
public void deleteSink(String cluster, String namespace, String function) throws PulsarAdminException {
try {
deleteSinkAsync(cluster, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> deleteSinkAsync(cluster, namespace, function));
}

@Override
Expand All @@ -339,16 +273,7 @@ public CompletableFuture<Void> deleteSinkAsync(String tenant, String namespace,
@Override
public void updateSink(SinkConfig sinkConfig, String fileName, UpdateOptions updateOptions)
throws PulsarAdminException {
try {
updateSinkAsync(sinkConfig, fileName, updateOptions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> updateSinkAsync(sinkConfig, fileName, updateOptions));
}

@Override
Expand Down Expand Up @@ -411,16 +336,7 @@ public CompletableFuture<Void> updateSinkAsync(SinkConfig sinkConfig, String fil
@Override
public void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl, UpdateOptions updateOptions)
throws PulsarAdminException {
try {
updateSinkWithUrlAsync(sinkConfig, pkgUrl, updateOptions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> updateSinkWithUrlAsync(sinkConfig, pkgUrl, updateOptions));
}

@Override
Expand Down Expand Up @@ -466,17 +382,7 @@ public CompletableFuture<Void> updateSinkWithUrlAsync(SinkConfig sinkConfig, Str
@Override
public void restartSink(String tenant, String namespace, String functionName, int instanceId)
throws PulsarAdminException {
try {
restartSinkAsync(tenant, namespace, functionName, instanceId)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> restartSinkAsync(tenant, namespace, functionName, instanceId));
}

@Override
Expand All @@ -493,16 +399,7 @@ public CompletableFuture<Void> restartSinkAsync(

@Override
public void restartSink(String tenant, String namespace, String functionName) throws PulsarAdminException {
try {
restartSinkAsync(tenant, namespace, functionName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> restartSinkAsync(tenant, namespace, functionName));
}

@Override
Expand All @@ -518,16 +415,7 @@ public CompletableFuture<Void> restartSinkAsync(String tenant, String namespace,
@Override
public void stopSink(String tenant, String namespace, String sinkName, int instanceId)
throws PulsarAdminException {
try {
stopSinkAsync(tenant, namespace, sinkName, instanceId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> stopSinkAsync(tenant, namespace, sinkName, instanceId));
}

@Override
Expand All @@ -543,16 +431,7 @@ public CompletableFuture<Void> stopSinkAsync(String tenant, String namespace, St

@Override
public void stopSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
try {
stopSinkAsync(tenant, namespace, sinkName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> stopSinkAsync(tenant, namespace, sinkName));
}

@Override
Expand All @@ -568,16 +447,7 @@ public CompletableFuture<Void> stopSinkAsync(String tenant, String namespace, St
@Override
public void startSink(String tenant, String namespace, String sinkName, int instanceId)
throws PulsarAdminException {
try {
startSinkAsync(tenant, namespace, sinkName, instanceId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> startSinkAsync(tenant, namespace, sinkName, instanceId));
}

@Override
Expand All @@ -593,16 +463,7 @@ public CompletableFuture<Void> startSinkAsync(String tenant, String namespace, S

@Override
public void startSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
try {
startSinkAsync(tenant, namespace, sinkName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> startSinkAsync(tenant, namespace, sinkName));
}

@Override
Expand All @@ -617,16 +478,7 @@ public CompletableFuture<Void> startSinkAsync(String tenant, String namespace, S

@Override
public List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException {
try {
return getBuiltInSinksAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getBuiltInSinksAsync());
}

@Override
Expand Down Expand Up @@ -655,16 +507,7 @@ public void failed(Throwable throwable) {

@Override
public void reloadBuiltInSinks() throws PulsarAdminException {
try {
reloadBuiltInSinksAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> reloadBuiltInSinksAsync());
}

@Override
Expand Down
Loading

0 comments on commit 8297c32

Please sign in to comment.