Skip to content

Commit

Permalink
Pulsar Admin: reduce code duplication - part 1 (apache#12732)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Nov 11, 2021
1 parent 19df0f0 commit 2e0d705
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.ServerErrorException;
import javax.ws.rs.ServiceUnavailableException;
Expand Down Expand Up @@ -282,4 +286,17 @@ public static String getReasonFromServer(WebApplicationException e) {
}
}
}

protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws PulsarAdminException {
try {
return executor.get().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
package org.apache.pulsar.client.admin.internal;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
Expand All @@ -43,16 +40,7 @@ public BookiesImpl(WebTarget web, Authentication auth, long readTimeoutMs) {

@Override
public BookiesRackConfiguration getBookiesRackInfo() throws PulsarAdminException {
try {
return getBookiesRackInfoAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getBookiesRackInfoAsync());
}

@Override
Expand All @@ -76,16 +64,7 @@ public void failed(Throwable throwable) {

@Override
public BookiesClusterInfo getBookies() throws PulsarAdminException {
try {
return getBookiesAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getBookiesAsync());
}

@Override
Expand All @@ -109,16 +88,7 @@ public void failed(Throwable throwable) {

@Override
public BookieInfo getBookieRackInfo(String bookieAddress) throws PulsarAdminException {
try {
return getBookieRackInfoAsync(bookieAddress).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getBookieRackInfoAsync(bookieAddress));
}

@Override
Expand All @@ -142,16 +112,7 @@ public void failed(Throwable throwable) {

@Override
public void deleteBookieRackInfo(String bookieAddress) throws PulsarAdminException {
try {
deleteBookieRackInfoAsync(bookieAddress).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> deleteBookieRackInfoAsync(bookieAddress));
}

@Override
Expand All @@ -163,16 +124,7 @@ public CompletableFuture<Void> deleteBookieRackInfoAsync(String bookieAddress) {
@Override
public void updateBookieRackInfo(String bookieAddress, String group, BookieInfo bookieInfo)
throws PulsarAdminException {
try {
updateBookieRackInfoAsync(bookieAddress, group, bookieInfo).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
sync(() -> updateBookieRackInfoAsync(bookieAddress, group, bookieInfo));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.admin.BrokerStats;
Expand All @@ -50,16 +47,7 @@ public BrokerStatsImpl(WebTarget target, Authentication auth, long readTimeoutMs

@Override
public String getMetrics() throws PulsarAdminException {
try {
return getMetricsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getMetricsAsync());
}

@Override
Expand All @@ -83,16 +71,7 @@ public void failed(Throwable throwable) {

@Override
public AllocatorStats getAllocatorStats(String allocatorName) throws PulsarAdminException {
try {
return getAllocatorStatsAsync(allocatorName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getAllocatorStatsAsync(allocatorName));
}

@Override
Expand All @@ -116,16 +95,7 @@ public void failed(Throwable throwable) {

@Override
public String getMBeans() throws PulsarAdminException {
try {
return getMBeansAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getMBeansAsync());
}

@Override
Expand All @@ -149,16 +119,7 @@ public void failed(Throwable throwable) {

@Override
public String getTopics() throws PulsarAdminException {
try {
return getTopicsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getTopicsAsync());
}

@Override
Expand All @@ -182,16 +143,7 @@ public void failed(Throwable throwable) {

@Override
public LoadManagerReport getLoadReport() throws PulsarAdminException {
try {
return getLoadReportAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getLoadReportAsync());
}

@Override
Expand All @@ -215,16 +167,7 @@ public void failed(Throwable throwable) {

@Override
public String getPendingBookieOpsStats() throws PulsarAdminException {
try {
return getPendingBookieOpsStatsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getPendingBookieOpsStatsAsync());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,7 @@ public LookupImpl(WebTarget web, Authentication auth, boolean useTls, long readT

@Override
public String lookupTopic(String topic) throws PulsarAdminException {
try {
return lookupTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> lookupTopicAsync(topic));
}

@Override
Expand Down Expand Up @@ -147,16 +138,7 @@ public CompletableFuture<Map<String, String>> lookupPartitionedTopicAsync(String

@Override
public String getBundleRange(String topic) throws PulsarAdminException {
try {
return getBundleRangeAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
return sync(() -> getBundleRangeAsync(topic));
}

@Override
Expand Down

0 comments on commit 2e0d705

Please sign in to comment.