Skip to content

Commit

Permalink
Issue 9130: Pulsar-admin sinks create: bad error message "java.lang.N…
Browse files Browse the repository at this point in the history
…ullPointerException: path is 'null'." in case of missing "--name" parameter (apache#9131)

Fixes apache#9130

### Motivation
Return a better error to the user

### Modifications
Add an explicit validation and do not let WebTarget#path throw an internal error.
The validation covers tenant,namespace and sinkname in all of the functions regarding sinks.
We must do the validation here because we have to prevent code to pass bad values to JAX-RS client 

### Verifying this change
This change added unit tests
  • Loading branch information
eolivelli authored Jan 12, 2021
1 parent d4c1677 commit 7b48dea
Show file tree
Hide file tree
Showing 2 changed files with 298 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Sink;
import org.apache.pulsar.client.admin.Sinks;
Expand Down Expand Up @@ -78,8 +79,11 @@ public List<String> listSinks(String tenant, String namespace) throws PulsarAdmi

@Override
public CompletableFuture<List<String>> listSinksAsync(String tenant, String namespace) {
WebTarget path = sink.path(tenant).path(namespace);
final CompletableFuture<List<String>> future = new CompletableFuture<>();
if (!validateNamespace(tenant, namespace, future)) {
return future;
}
WebTarget path = sink.path(tenant).path(namespace);
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
Expand Down Expand Up @@ -115,8 +119,11 @@ public SinkConfig getSink(String tenant, String namespace, String sinkName) thro

@Override
public CompletableFuture<SinkConfig> getSinkAsync(String tenant, String namespace, String sinkName) {
WebTarget path = sink.path(tenant).path(namespace).path(sinkName);
final CompletableFuture<SinkConfig> future = new CompletableFuture<>();
if (!validateSinkName(tenant, namespace, sinkName, future)) {
return future;
}
WebTarget path = sink.path(tenant).path(namespace).path(sinkName);
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
Expand Down Expand Up @@ -153,8 +160,11 @@ public SinkStatus getSinkStatus(

@Override
public CompletableFuture<SinkStatus> getSinkStatusAsync(String tenant, String namespace, String sinkName) {
WebTarget path = sink.path(tenant).path(namespace).path(sinkName).path("status");
final CompletableFuture<SinkStatus> future = new CompletableFuture<>();
if (!validateSinkName(tenant, namespace, sinkName, future)) {
return future;
}
WebTarget path = sink.path(tenant).path(namespace).path(sinkName).path("status");
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
Expand Down Expand Up @@ -192,9 +202,12 @@ public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(
@Override
public CompletableFuture<SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> getSinkStatusAsync(
String tenant, String namespace, String sinkName, int id) {
WebTarget path = sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(id)).path("status");
final CompletableFuture<SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> future =
new CompletableFuture<>();
if (!validateSinkName(tenant, namespace, sinkName, future)) {
return future;
}
WebTarget path = sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(id)).path("status");
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
Expand Down Expand Up @@ -232,6 +245,9 @@ public void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdmi
@Override
public CompletableFuture<Void> createSinkAsync(SinkConfig sinkConfig, String fileName) {
final CompletableFuture<Void> future = new CompletableFuture<>();
if (!validateSinkName(sinkConfig.getTenant(), sinkConfig.getNamespace(), sinkConfig.getName(), future)) {
return future;
}
try {
RequestBuilder builder =
post(sink.path(sinkConfig.getTenant())
Expand Down Expand Up @@ -286,6 +302,11 @@ public CompletableFuture<Void> createSinkWithUrlAsync(SinkConfig sinkConfig, Str
mp.bodyPart(new FormDataBodyPart("sinkConfig",
new Gson().toJson(sinkConfig),
MediaType.APPLICATION_JSON_TYPE));
CompletableFuture<Void> validationFuture = new CompletableFuture<>();
if (!validateSinkName(sinkConfig.getTenant(), sinkConfig.getNamespace(),
sinkConfig.getName(), validationFuture)) {
return validationFuture;
}
WebTarget path = sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName());
return asyncPostRequest(path, Entity.entity(mp, MediaType.MULTIPART_FORM_DATA));
}
Expand All @@ -306,6 +327,10 @@ public void deleteSink(String cluster, String namespace, String function) throws

@Override
public CompletableFuture<Void> deleteSinkAsync(String tenant, String namespace, String function) {
CompletableFuture<Void> validationFuture = new CompletableFuture<>();
if (!validateSinkName(tenant, namespace, function, validationFuture)) {
return validationFuture;
}
WebTarget path = sink.path(tenant).path(namespace).path(function);
return asyncDeleteRequest(path);
}
Expand All @@ -329,6 +354,9 @@ public void updateSink(SinkConfig sinkConfig, String fileName, UpdateOptions upd
public CompletableFuture<Void> updateSinkAsync(
SinkConfig sinkConfig, String fileName, UpdateOptions updateOptions) {
final CompletableFuture<Void> future = new CompletableFuture<>();
if (!validateSinkName(sinkConfig.getTenant(), sinkConfig.getNamespace(), sinkConfig.getName(), future)) {
return future;
}
try {
RequestBuilder builder =
put(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace())
Expand Down Expand Up @@ -397,6 +425,9 @@ public void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl, UpdateOption
public CompletableFuture<Void> updateSinkWithUrlAsync(
SinkConfig sinkConfig, String pkgUrl, UpdateOptions updateOptions) {
final CompletableFuture<Void> future = new CompletableFuture<>();
if (!validateSinkName(sinkConfig.getTenant(), sinkConfig.getNamespace(), sinkConfig.getName(), future)) {
return future;
}
try {
final FormDataMultiPart mp = new FormDataMultiPart();
mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
Expand Down Expand Up @@ -448,6 +479,10 @@ public void restartSink(String tenant, String namespace, String functionName, in
@Override
public CompletableFuture<Void> restartSinkAsync(
String tenant, String namespace, String functionName, int instanceId) {
CompletableFuture<Void> validationFuture = new CompletableFuture<>();
if (!validateSinkName(tenant, namespace, functionName, validationFuture)) {
return validationFuture;
}
WebTarget path = sink.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
.path("restart");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
Expand All @@ -469,6 +504,10 @@ public void restartSink(String tenant, String namespace, String functionName) th

@Override
public CompletableFuture<Void> restartSinkAsync(String tenant, String namespace, String functionName) {
CompletableFuture<Void> validationFuture = new CompletableFuture<>();
if (!validateSinkName(tenant, namespace, functionName, validationFuture)) {
return validationFuture;
}
WebTarget path = sink.path(tenant).path(namespace).path(functionName).path("restart");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
Expand All @@ -490,6 +529,10 @@ public void stopSink(String tenant, String namespace, String sinkName, int insta

@Override
public CompletableFuture<Void> stopSinkAsync(String tenant, String namespace, String sinkName, int instanceId) {
CompletableFuture<Void> validationFuture = new CompletableFuture<>();
if (!validateSinkName(tenant, namespace, sinkName, validationFuture)) {
return validationFuture;
}
WebTarget path = sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(instanceId))
.path("stop");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
Expand All @@ -511,6 +554,10 @@ public void stopSink(String tenant, String namespace, String sinkName) throws Pu

@Override
public CompletableFuture<Void> stopSinkAsync(String tenant, String namespace, String sinkName) {
CompletableFuture<Void> validationFuture = new CompletableFuture<>();
if (!validateSinkName(tenant, namespace, sinkName, validationFuture)) {
return validationFuture;
}
WebTarget path = sink.path(tenant).path(namespace).path(sinkName).path("stop");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
Expand All @@ -532,6 +579,10 @@ public void startSink(String tenant, String namespace, String sinkName, int inst

@Override
public CompletableFuture<Void> startSinkAsync(String tenant, String namespace, String sinkName, int instanceId) {
CompletableFuture<Void> validationFuture = new CompletableFuture<>();
if (!validateSinkName(tenant, namespace, sinkName, validationFuture)) {
return validationFuture;
}
WebTarget path = sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(instanceId))
.path("start");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
Expand All @@ -553,6 +604,10 @@ public void startSink(String tenant, String namespace, String sinkName) throws P

@Override
public CompletableFuture<Void> startSinkAsync(String tenant, String namespace, String sinkName) {
CompletableFuture<Void> validationFuture = new CompletableFuture<>();
if (!validateSinkName(tenant, namespace, sinkName, validationFuture)) {
return validationFuture;
}
WebTarget path = sink.path(tenant).path(namespace).path(sinkName).path("start");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
Expand Down Expand Up @@ -614,4 +669,29 @@ public CompletableFuture<Void> reloadBuiltInSinksAsync() {
WebTarget path = sink.path("reloadBuiltInSinks");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

private static boolean validateNamespace(String tenant, String namespace, CompletableFuture<?> future) {
if (StringUtils.isBlank(tenant)) {
future.completeExceptionally(new PulsarAdminException("tenant is required"));
return false;
}
if (StringUtils.isBlank(namespace)) {
future.completeExceptionally(new PulsarAdminException("namespace is required"));
return false;
}
return true;
}

private static boolean validateSinkName(String tenant, String namespace,
String sinkName, CompletableFuture<?> future) {
if (!validateNamespace(tenant, namespace, future)) {
return false;
}
if (StringUtils.isBlank(sinkName)) {
future.completeExceptionally(new PulsarAdminException("sink name is required"));
return false;
}
return true;
}

}
Loading

0 comments on commit 7b48dea

Please sign in to comment.