Skip to content

Commit

Permalink
[websocket] WebSocket proxy should return status code depending on ty…
Browse files Browse the repository at this point in the history
…pe of PulsarClientException (apache#9031)

### Motivation

If the WebSocket proxy fails to create a producer or consumer, it should return the suitable HTTP status code to the client depending on the type of `PulsarClientException` that occurred. However, there are currently only a few exceptions where the WebSocket proxy returns a status code other than 500.
https://github.com/apache/pulsar/blob/7be1b8dfcfaa4298c541c4eb134f1e752fa6a7d8/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java#L119-L129

### Modifications

Moved the `getErrorCode()` method from `ProducerHandler` and `ConsumerHandler` to `AbstractWebSocketHandler` and increased the types of `PulsarClientException` to handle.
  • Loading branch information
Masahiro Sakamoto authored Dec 24, 2020
1 parent b001283 commit 8c45b82
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.service.ProxyServer;
Expand Down Expand Up @@ -384,6 +388,120 @@ public void producerBacklogQuotaExceededTest() throws Exception {
}
}

@Test(timeOut = 10000)
public void topicDoesNotExistTest() throws Exception {
final String namespace = "my-property/ns-topic-creation-not-allowed";
admin.namespaces().createNamespace(namespace);
admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
admin.namespaces().setAutoTopicCreation(namespace,
new AutoTopicCreationOverride(false, TopicType.NON_PARTITIONED.toString(), null));

final String topic = namespace + "/my-topic";
final String subscription = "my-sub";
final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/producer/persistent/" + topic;
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/consumer/persistent/" + topic + "/" + subscription;

URI produceUri = URI.create(producerUri);
URI consumeUri = URI.create(consumerUri);

WebSocketClient produceClient = new WebSocketClient();
WebSocketClient consumeClient = new WebSocketClient();

SimpleProducerSocket produceSocket = new SimpleProducerSocket();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();

try {
produceClient.start();
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
producerFuture.get();
fail("should fail: topic does not exist");
} catch (Exception e) {
// Expected
assertTrue(e.getCause() instanceof UpgradeException);
assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), HttpServletResponse.SC_NOT_FOUND);
} finally {
stopWebSocketClient(produceClient);
}

try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
consumerFuture.get();
fail("should fail: topic does not exist");
} catch (Exception e) {
// Expected
assertTrue(e.getCause() instanceof UpgradeException);
assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), HttpServletResponse.SC_NOT_FOUND);
} finally {
stopWebSocketClient(consumeClient);
}

admin.namespaces().deleteNamespace(namespace);
}

@Test(timeOut = 10000)
public void producerFencedTest() throws Exception {
final String topic = "my-property/my-ns/producer-fenced-test";
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + topic)
.accessMode(ProducerAccessMode.Exclusive).create();

final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/producer/persistent/" + topic;
URI produceUri = URI.create(producerUri);

WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();

try {
produceClient.start();
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
producerFuture.get();
fail("should fail: producer fenced");
} catch (Exception e) {
// Expected
assertTrue(e.getCause() instanceof UpgradeException);
assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), HttpServletResponse.SC_CONFLICT);
} finally {
stopWebSocketClient(produceClient);
producer.close();
}
}

@Test(timeOut = 10000)
public void topicTerminatedTest() throws Exception {
final String topic = "my-property/my-ns/topic-terminated-test";
admin.topics().createNonPartitionedTopic("persistent://" + topic);
admin.topics().terminateTopic("persistent://" + topic);

final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/producer/persistent/" + topic;
URI produceUri = URI.create(producerUri);

WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();

try {
produceClient.start();
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
producerFuture.get();
fail("should fail: topic terminated");
} catch (Exception e) {
// Expected
assertTrue(e.getCause() instanceof UpgradeException);
assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(),
HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} finally {
stopWebSocketClient(produceClient);
admin.topics().delete("persistent://" + topic);
}
}

/**
* It verifies proxy topic-stats and proxy-metrics api
*
Expand Down Expand Up @@ -648,4 +766,4 @@ private void stopWebSocketClient(WebSocketClient... clients) {
}

private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.PulsarClientException.AuthenticationException;
import org.apache.pulsar.client.api.PulsarClientException.AuthorizationException;
import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
import org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException;
import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
import org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Codec;
Expand All @@ -31,7 +44,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.Closeable;
Expand Down Expand Up @@ -70,7 +82,7 @@ protected boolean checkAuth(ServletUpgradeResponse response) {
log.info("[{}:{}] Authenticated WebSocket client {} on topic {}", request.getRemoteAddr(),
request.getRemotePort(), authRole, topic);

} catch (AuthenticationException e) {
} catch (javax.naming.AuthenticationException e) {
log.warn("[{}:{}] Failed to authenticated WebSocket client {} on topic {}: {}", request.getRemoteAddr(),
request.getRemotePort(), authRole, topic, e.getMessage());
try {
Expand Down Expand Up @@ -107,6 +119,38 @@ protected boolean checkAuth(ServletUpgradeResponse response) {
return true;
}

protected static int getErrorCode(Exception e) {
if (e instanceof IllegalArgumentException) {
return HttpServletResponse.SC_BAD_REQUEST;
} else if (e instanceof AuthenticationException) {
return HttpServletResponse.SC_UNAUTHORIZED;
} else if (e instanceof AuthorizationException) {
return HttpServletResponse.SC_FORBIDDEN;
} else if (e instanceof NotFoundException || e instanceof TopicDoesNotExistException) {
return HttpServletResponse.SC_NOT_FOUND;
} else if (e instanceof ProducerBusyException || e instanceof ConsumerBusyException
|| e instanceof ProducerFencedException || e instanceof IncompatibleSchemaException) {
return HttpServletResponse.SC_CONFLICT;
} else if (e instanceof TooManyRequestsException) {
return 429; // Too Many Requests
} else if (e instanceof ProducerBlockedQuotaExceededError || e instanceof ProducerBlockedQuotaExceededException
|| e instanceof TopicTerminatedException) {
return HttpServletResponse.SC_SERVICE_UNAVAILABLE;
} else if (e instanceof TimeoutException) {
return HttpServletResponse.SC_GATEWAY_TIMEOUT;
} else {
return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
}
}

protected static String getErrorMessage(Exception e) {
if (e instanceof IllegalArgumentException) {
return "Invalid query params: " + e.getMessage();
} else {
return "Failed to create producer/consumer: " + e.getMessage();
}
}

@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.atomic.LongAdder;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -43,7 +42,6 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
Expand Down Expand Up @@ -131,24 +129,6 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser
}
}

private static int getErrorCode(Exception e) {
if (e instanceof IllegalArgumentException) {
return HttpServletResponse.SC_BAD_REQUEST;
} else if (e instanceof ConsumerBusyException) {
return HttpServletResponse.SC_CONFLICT;
} else {
return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
}
}

private static String getErrorMessage(Exception e) {
if (e instanceof IllegalArgumentException) {
return "Invalid query params: " + e.getMessage();
} else {
return "Failed to subscribe: " + e.getMessage();
}
}

private void receiveMessage() {
if (log.isDebugEnabled()) {
log.debug("[{}:{}] [{}] [{}] Receive next message", request.getRemoteAddr(), request.getRemotePort(), topic, subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.atomic.LongAdder;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.CompressionType;
Expand All @@ -47,9 +46,6 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.util.DateFormatter;
Expand Down Expand Up @@ -116,26 +112,6 @@ public ProducerHandler(WebSocketService service, HttpServletRequest request, Ser
}
}

private static int getErrorCode(Exception e) {
if (e instanceof IllegalArgumentException) {
return HttpServletResponse.SC_BAD_REQUEST;
} else if (e instanceof ProducerBusyException) {
return HttpServletResponse.SC_CONFLICT;
} else if (e instanceof ProducerBlockedQuotaExceededError || e instanceof ProducerBlockedQuotaExceededException) {
return HttpServletResponse.SC_SERVICE_UNAVAILABLE;
} else {
return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
}
}

private static String getErrorMessage(Exception e) {
if (e instanceof IllegalArgumentException) {
return "Invalid query params: " + e.getMessage();
} else {
return "Failed to create producer: " + e.getMessage();
}
}

@Override
public void close() throws IOException {
if (producer != null) {
Expand Down

0 comments on commit 8c45b82

Please sign in to comment.