Skip to content

Commit

Permalink
[pulsar-proxy]Add the LookupProxyHandler handle getSchema request and…
Browse files Browse the repository at this point in the history
… test (apache#4083)

### Motivation
In order to support apache#3742 apache#3876.
Now, proxy handle ProxyLookupRequests don't support GetSchema.

### Modifications
Add the getSchema method implementation in ProxyConnection 

### Verifying this change
Add new a test in ProxyTest testGetSchema()

### Dependencies (does it add or upgrade a dependency): (yes / no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (yes)
The admin cli options: (no)
Anything that affects deployment: (no)
### Documentation
Does this pull request introduce a new feature? (no)
If yes, how is the feature documented? (no)
If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
  • Loading branch information
congbobo184 authored and sijie committed Apr 21, 2019
1 parent 7b2bf95 commit 1e097cb
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -60,6 +65,11 @@ public class LookupProxyHandler {
.create()
.register();

private static final Counter getSchemaRequests = Counter
.build("pulsar_proxy_get_schema_requests", "Counter of schema requests")
.create()
.register();

static final Counter rejectedLookupRequests = Counter.build("pulsar_proxy_rejected_lookup_requests",
"Counter of topic lookup requests rejected due to throttling").create().register();

Expand Down Expand Up @@ -280,26 +290,12 @@ public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTop
}
}


private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace,
long clientRequestId) {
String serviceUrl;
if (isBlank(brokerServiceURL)) {
ServiceLookupData availableBroker;
try {
availableBroker = service.getDiscoveryProvider().nextBroker();
} catch (Exception e) {
log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
proxyConnection.ctx().writeAndFlush(Commands.newError(
clientRequestId, ServerError.ServiceNotReady, e.getMessage()
));
return;
}
serviceUrl = this.connectWithTLS ?
availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
} else {
serviceUrl = this.connectWithTLS ?
service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL();
String serviceUrl = getServiceUrl(clientRequestId);

if(!StringUtils.isNotBlank(serviceUrl)) {
return;
}
performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10,
commandGetTopicsOfNamespace.getMode());
Expand All @@ -316,16 +312,12 @@ private void performGetTopicsOfNamespace(long clientRequestId,
return;
}

URI brokerURI;
try {
brokerURI = new URI(brokerServiceUrl);
} catch (URISyntaxException e) {
proxyConnection.ctx().writeAndFlush(
Commands.newError(clientRequestId, ServerError.MetadataError, e.getMessage()));
InetSocketAddress addr = getAddr(brokerServiceUrl, clientRequestId);

if(addr == null){
return;
}

InetSocketAddress addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
if (log.isDebugEnabled()) {
log.debug("Getting connections to '{}' for getting TopicsOfNamespace '{}' with clientReq Id '{}'",
addr, namespaceName, clientRequestId);
Expand All @@ -352,5 +344,88 @@ private void performGetTopicsOfNamespace(long clientRequestId,
});
}

public void handleGetSchema(CommandGetSchema commandGetSchema) {
getSchemaRequests.inc();
if (log.isDebugEnabled()) {
log.debug("[{}] Received GetSchema", clientAddress);
}

final long clientRequestId = commandGetSchema.getRequestId();
String serviceUrl = getServiceUrl(clientRequestId);

if(!StringUtils.isNotBlank(serviceUrl)) {
return;
}
InetSocketAddress addr = getAddr(serviceUrl, clientRequestId);

if(addr == null){
return;
}
if (log.isDebugEnabled()) {
log.debug("Getting connections to '{}' for getting schema of topic '{}' with clientReq Id '{}'",
addr, commandGetSchema.getTopic(), clientRequestId);
}

proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
// Connected to backend broker
long requestId = proxyConnection.newRequestId();
ByteBuf command;
byte[] schemaVersion = commandGetSchema.getSchemaVersion().toByteArray();
command = Commands.newGetSchema(requestId, commandGetSchema.getTopic(),
Optional.ofNullable(BytesSchemaVersion.of(schemaVersion)));
clientCnx.sendGetSchema(command, requestId).thenAccept(optionalSchemaInfo -> {
SchemaInfo schemaInfo = optionalSchemaInfo.get();
proxyConnection.ctx().writeAndFlush(
Commands.newGetSchemaResponse(clientRequestId,
schemaInfo,
BytesSchemaVersion.of(schemaVersion)));
}).exceptionally(ex -> {
log.warn("[{}] Failed to get schema {}: {}", clientAddress, commandGetSchema.getTopic(), ex.getMessage());
proxyConnection.ctx().writeAndFlush(
Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage()));
return null;
});
}).exceptionally(ex -> {
// Failed to connect to backend broker
proxyConnection.ctx().writeAndFlush(
Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage()));
return null;
});

}

private String getServiceUrl(long clientRequestId) {
if (isBlank(brokerServiceURL)) {
ServiceLookupData availableBroker;
try {
availableBroker = service.getDiscoveryProvider().nextBroker();
} catch (Exception e) {
log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
proxyConnection.ctx().writeAndFlush(Commands.newError(
clientRequestId, ServerError.ServiceNotReady, e.getMessage()
));
return null;
}
return this.connectWithTLS ?
availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
} else {
return this.connectWithTLS ?
service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL();
}

}

private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) {
URI brokerURI;
try {
brokerURI = new URI(brokerServiceUrl);
} catch (URISyntaxException e) {
proxyConnection.ctx().writeAndFlush(
Commands.newError(clientRequestId, ServerError.MetadataError, e.getMessage()));
return null;
}
return InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
}

private static final Logger log = LoggerFactory.getLogger(LookupProxyHandler.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.slf4j.Logger;
Expand Down Expand Up @@ -371,6 +372,12 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet

lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
}
@Override
protected void handleGetSchema(CommandGetSchema commandGetSchema) {
checkArgument(state == State.ProxyLookupRequests);

lookupProxyHandler.handleGetSchema(commandGetSchema);
}

/**
* handles discovery request from client ands sends next active broker address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.avro.reflect.Nullable;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
Expand All @@ -47,11 +52,14 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -65,6 +73,18 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();


@Data
@ToString
@EqualsAndHashCode
public static class Foo {
@Nullable
private String field1;
@Nullable
private String field2;
private int field3;
}

@Override
@BeforeClass
protected void setup() throws Exception {
Expand Down Expand Up @@ -205,6 +225,29 @@ public void testRegexSubscription() throws Exception {
}
}

@Test
private void testGetSchema() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get())
.build();
Producer<Foo> producer;
Schema schema = Schema.AVRO(Foo.class);
try {
producer = client.newProducer(schema).topic("persistent://sample/test/local/get-schema")
.create();
} catch (Exception ex) {
Assert.fail("Should not have failed since can acquire LookupRequestSemaphore");
}
byte[] schemaVersion = new byte[8];
byte b = new Long(0l).byteValue();
for (int i = 0; i<8; i++){
schemaVersion[i] = b;
}
SchemaInfo schemaInfo = ((PulsarClientImpl)client).getLookup()
.getSchema(TopicName.get("persistent://sample/test/local/get-schema"), schemaVersion).get().orElse(null);
Assert.assertEquals(new String(schemaInfo.getSchema()), new String(schema.getSchemaInfo().getSchema()));
client.close();
}

@Test
private void testProtocolVersionAdvertisement() throws Exception {
final String url = "pulsar://localhost:" + proxyConfig.getServicePort().get();
Expand Down Expand Up @@ -234,6 +277,7 @@ private void testProtocolVersionAdvertisement() throws Exception {
client.close();
}


private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
throws Exception {
ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());
Expand Down

0 comments on commit 1e097cb

Please sign in to comment.