diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 0072dc5162e8b..548c52269d73a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -24,14 +24,19 @@ import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.Optional; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.schema.BytesSchemaVersion; +import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +65,11 @@ public class LookupProxyHandler { .create() .register(); + private static final Counter getSchemaRequests = Counter + .build("pulsar_proxy_get_schema_requests", "Counter of schema requests") + .create() + .register(); + static final Counter rejectedLookupRequests = Counter.build("pulsar_proxy_rejected_lookup_requests", "Counter of topic lookup requests rejected due to throttling").create().register(); @@ -280,26 +290,12 @@ public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTop } } - private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace, long clientRequestId) { - String serviceUrl; - if (isBlank(brokerServiceURL)) { - ServiceLookupData availableBroker; - try { - availableBroker = service.getDiscoveryProvider().nextBroker(); - } catch (Exception e) { - log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e); - proxyConnection.ctx().writeAndFlush(Commands.newError( - clientRequestId, ServerError.ServiceNotReady, e.getMessage() - )); - return; - } - serviceUrl = this.connectWithTLS ? - availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl(); - } else { - serviceUrl = this.connectWithTLS ? - service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL(); + String serviceUrl = getServiceUrl(clientRequestId); + + if(!StringUtils.isNotBlank(serviceUrl)) { + return; } performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10, commandGetTopicsOfNamespace.getMode()); @@ -316,16 +312,12 @@ private void performGetTopicsOfNamespace(long clientRequestId, return; } - URI brokerURI; - try { - brokerURI = new URI(brokerServiceUrl); - } catch (URISyntaxException e) { - proxyConnection.ctx().writeAndFlush( - Commands.newError(clientRequestId, ServerError.MetadataError, e.getMessage())); + InetSocketAddress addr = getAddr(brokerServiceUrl, clientRequestId); + + if(addr == null){ return; } - InetSocketAddress addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort()); if (log.isDebugEnabled()) { log.debug("Getting connections to '{}' for getting TopicsOfNamespace '{}' with clientReq Id '{}'", addr, namespaceName, clientRequestId); @@ -352,5 +344,88 @@ private void performGetTopicsOfNamespace(long clientRequestId, }); } + public void handleGetSchema(CommandGetSchema commandGetSchema) { + getSchemaRequests.inc(); + if (log.isDebugEnabled()) { + log.debug("[{}] Received GetSchema", clientAddress); + } + + final long clientRequestId = commandGetSchema.getRequestId(); + String serviceUrl = getServiceUrl(clientRequestId); + + if(!StringUtils.isNotBlank(serviceUrl)) { + return; + } + InetSocketAddress addr = getAddr(serviceUrl, clientRequestId); + + if(addr == null){ + return; + } + if (log.isDebugEnabled()) { + log.debug("Getting connections to '{}' for getting schema of topic '{}' with clientReq Id '{}'", + addr, commandGetSchema.getTopic(), clientRequestId); + } + + proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> { + // Connected to backend broker + long requestId = proxyConnection.newRequestId(); + ByteBuf command; + byte[] schemaVersion = commandGetSchema.getSchemaVersion().toByteArray(); + command = Commands.newGetSchema(requestId, commandGetSchema.getTopic(), + Optional.ofNullable(BytesSchemaVersion.of(schemaVersion))); + clientCnx.sendGetSchema(command, requestId).thenAccept(optionalSchemaInfo -> { + SchemaInfo schemaInfo = optionalSchemaInfo.get(); + proxyConnection.ctx().writeAndFlush( + Commands.newGetSchemaResponse(clientRequestId, + schemaInfo, + BytesSchemaVersion.of(schemaVersion))); + }).exceptionally(ex -> { + log.warn("[{}] Failed to get schema {}: {}", clientAddress, commandGetSchema.getTopic(), ex.getMessage()); + proxyConnection.ctx().writeAndFlush( + Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage())); + return null; + }); + }).exceptionally(ex -> { + // Failed to connect to backend broker + proxyConnection.ctx().writeAndFlush( + Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage())); + return null; + }); + + } + + private String getServiceUrl(long clientRequestId) { + if (isBlank(brokerServiceURL)) { + ServiceLookupData availableBroker; + try { + availableBroker = service.getDiscoveryProvider().nextBroker(); + } catch (Exception e) { + log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e); + proxyConnection.ctx().writeAndFlush(Commands.newError( + clientRequestId, ServerError.ServiceNotReady, e.getMessage() + )); + return null; + } + return this.connectWithTLS ? + availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl(); + } else { + return this.connectWithTLS ? + service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL(); + } + + } + + private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) { + URI brokerURI; + try { + brokerURI = new URI(brokerServiceUrl); + } catch (URISyntaxException e) { + proxyConnection.ctx().writeAndFlush( + Commands.newError(clientRequestId, ServerError.MetadataError, e.getMessage())); + return null; + } + return InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort()); + } + private static final Logger log = LoggerFactory.getLogger(LookupProxyHandler.class); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index a1147f6c59fc1..18e4e5d750822 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -47,6 +47,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema; import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; import org.slf4j.Logger; @@ -371,6 +372,12 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace); } + @Override + protected void handleGetSchema(CommandGetSchema commandGetSchema) { + checkArgument(state == State.ProxyLookupRequests); + + lookupProxyHandler.handleGetSchema(commandGetSchema); + } /** * handles discovery request from client ands sends next active broker address diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index be47d9ea09817..94cb83ece07ac 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -24,12 +24,17 @@ import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.avro.reflect.Nullable; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -47,11 +52,14 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -65,6 +73,18 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + + @Data + @ToString + @EqualsAndHashCode + public static class Foo { + @Nullable + private String field1; + @Nullable + private String field2; + private int field3; + } + @Override @BeforeClass protected void setup() throws Exception { @@ -205,6 +225,29 @@ public void testRegexSubscription() throws Exception { } } + @Test + private void testGetSchema() throws Exception { + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get()) + .build(); + Producer producer; + Schema schema = Schema.AVRO(Foo.class); + try { + producer = client.newProducer(schema).topic("persistent://sample/test/local/get-schema") + .create(); + } catch (Exception ex) { + Assert.fail("Should not have failed since can acquire LookupRequestSemaphore"); + } + byte[] schemaVersion = new byte[8]; + byte b = new Long(0l).byteValue(); + for (int i = 0; i<8; i++){ + schemaVersion[i] = b; + } + SchemaInfo schemaInfo = ((PulsarClientImpl)client).getLookup() + .getSchema(TopicName.get("persistent://sample/test/local/get-schema"), schemaVersion).get().orElse(null); + Assert.assertEquals(new String(schemaInfo.getSchema()), new String(schema.getSchemaInfo().getSchema())); + client.close(); + } + @Test private void testProtocolVersionAdvertisement() throws Exception { final String url = "pulsar://localhost:" + proxyConfig.getServicePort().get(); @@ -234,6 +277,7 @@ private void testProtocolVersionAdvertisement() throws Exception { client.close(); } + private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf) throws Exception { ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());