Skip to content

Commit

Permalink
Propagate specific Schema error to client (apache#3345)
Browse files Browse the repository at this point in the history
* Propagate specific Schema error to client

* Handling new enums in C++

* Fixed formatting
  • Loading branch information
merlimat authored Jan 10, 2019
1 parent fa45573 commit e9a5e61
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
import org.apache.pulsar.common.api.proto.PulsarApi;

/**
Expand Down Expand Up @@ -163,6 +164,8 @@ public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
} else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException
|| t instanceof SubscriptionFencedException) {
return PulsarApi.ServerError.ServiceNotReady;
} else if (t instanceof IncompatibleSchemaException) {
return PulsarApi.ServerError.IncompatibleSchema;
} else {
return PulsarApi.ServerError.UnknownError;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoUtil;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion;
Expand Down Expand Up @@ -600,7 +599,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
readCompacted, initialPosition);
} else {
return FutureUtil.failedFuture(
new BrokerServiceException(
new IncompatibleSchemaException(
"Trying to subscribe with incompatible schema"
));
}
Expand Down Expand Up @@ -846,7 +845,9 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}

schemaVersionFuture.exceptionally(exception -> {
ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, exception.getMessage()));
ctx.writeAndFlush(Commands.newError(requestId,
BrokerServiceException.getClientErrorCode(exception.getCause()),
exception.getMessage()));
producers.remove(producerId, producerFuture);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -156,7 +157,7 @@ public void newProducerWithoutSchemaOnTopicWithSchema() throws Exception {
}
} catch (PulsarClientException e) {
if (schemaValidationEnforced) {
Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
Assert.assertTrue(e instanceof IncompatibleSchemaException);
} else {
Assert.fail("Shouldn't throw IncompatibleSchemaException"
+ " if SchemaValidationEnforced is disabled");
Expand Down Expand Up @@ -193,7 +194,7 @@ public void newConsumerWithSchemaOnExistingTopicWithoutSchema() throws Exception
.topic(topic).subscriptionName("sub1").subscribe()) {
Assert.fail("Shouldn't be able to consume with a schema from a topic which has no schema set");
} catch (PulsarClientException e) {
Assert.assertTrue(e.getMessage().contains("Trying to subscribe with incompatible schema"));
Assert.assertTrue(e instanceof IncompatibleSchemaException);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ public TimeoutException(String msg) {
}
}

public static class IncompatibleSchemaException extends PulsarClientException {
public IncompatibleSchemaException(String msg) {
super(msg);
}
}

public static class LookupException extends PulsarClientException {
public LookupException(String msg) {
super(msg);
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/include/pulsar/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ enum Result
ResultUnsupportedVersionError, /// Error when an older client/version doesn't support a required feature
ResultTopicTerminated, /// Topic was already terminated
ResultCryptoError, /// Error when crypto operation fails

ResultIncompatibleSchema, /// Specified schema is incompatible with the topic's schema
};

// Return string representation of result code
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ static Result getResult(ServerError serverError) {

case InvalidTopicName:
return ResultInvalidTopicName;

case IncompatibleSchema:
return ResultIncompatibleSchema;
}
// NOTE : Do not add default case in the switch above. In future if we get new cases for
// ServerError and miss them in the switch above we would like to get notified. Adding
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/Result.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ const char* pulsar::strResult(Result result) {

case ResultProducerBusy:
return "ProducerBusy";

case ResultIncompatibleSchema:
return "IncompatibleSchema";
};
// NOTE : Do not add default case in the switch above. In future if we get new cases for
// ServerError and miss them in the switch above we would like to get notified. Adding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public RequestTime(long creationTime, long requestId) {
this.requestId = requestId;
}
}

public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
}
Expand Down Expand Up @@ -831,6 +831,8 @@ private PulsarClientException getPulsarClientException(ServerError error, String
return new PulsarClientException.ProducerBlockedQuotaExceededException(errorMsg);
case TopicTerminatedError:
return new PulsarClientException.TopicTerminatedException(errorMsg);
case IncompatibleSchema:
return new PulsarClientException.IncompatibleSchemaException(errorMsg);
case UnknownError:
default:
return new PulsarClientException(errorMsg);
Expand Down Expand Up @@ -862,6 +864,6 @@ private void checkRequestTimeout() {
}
}
}

private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ enum ServerError {

ProducerBusy = 16; // Producer with same name is already connected
InvalidTopicName = 17; // The topic name is not valid

IncompatibleSchema = 18; // Specified schema was incompatible with topic schema
}

enum AuthMethod {
Expand Down

0 comments on commit e9a5e61

Please sign in to comment.