From 329e2310069b61e25ce3f87f2828fab78f97187a Mon Sep 17 00:00:00 2001 From: Addison Higham Date: Fri, 6 Mar 2020 16:06:11 -0700 Subject: [PATCH] [proxy] Fix proxy routing to functions worker (#6486) ### Motivation Currently, the proxy only works to proxy v1/v2 functions routes to the function worker. ### Modifications This changes this code to proxy all routes for the function worker when those routes match. At the moment this is still a static list of prefixes, but in the future it may be possible to have this list of prefixes be dynamically fetched from the REST routes. ### Verifying this change - added some tests to ensure the routing works as expected --- .../proxy/server/AdminProxyHandler.java | 26 +++++++- .../server/FunctionWorkerRoutingTest.java | 66 +++++++++++++++++++ 2 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index ca44c8fe0c08f..56a933bb5f5dd 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -26,9 +26,12 @@ import java.net.URI; import java.nio.ByteBuffer; import java.security.cert.X509Certificate; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.Objects; +import java.util.Set; import java.util.concurrent.Executor; import javax.net.ssl.SSLContext; @@ -60,6 +63,21 @@ class AdminProxyHandler extends ProxyServlet { private static final Logger LOG = LoggerFactory.getLogger(AdminProxyHandler.class); + private static final Set functionRoutes = new HashSet<>(Arrays.asList( + "/admin/v3/function", + "/admin/v2/function", + "/admin/function", + "/admin/v3/source", + "/admin/v2/source", + "/admin/source", + "/admin/v3/sink", + "/admin/v2/sink", + "/admin/sink", + "/admin/v2/worker", + "/admin/v2/worker-stats", + "/admin/worker", + "/admin/worker-stats" + )); private final ProxyConfiguration config; private final BrokerDiscoveryProvider discoveryProvider; @@ -260,9 +278,11 @@ protected String rewriteTarget(HttpServletRequest request) { boolean isFunctionsRestRequest = false; String requestUri = request.getRequestURI(); - if (requestUri.startsWith("/admin/v2/functions") - || requestUri.startsWith("/admin/functions")) { - isFunctionsRestRequest = true; + for (String routePrefix: functionRoutes) { + if (requestUri.startsWith(routePrefix)) { + isFunctionsRestRequest = true; + break; + } } if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java new file mode 100644 index 0000000000000..b5d89cce6647a --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import javax.servlet.http.HttpServletRequest; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class FunctionWorkerRoutingTest { + + @Test + public void testFunctionWorkerRedirect() throws Exception { + String functionWorkerUrl = "http://function"; + String brokerUrl = "http://broker"; + + ProxyConfiguration proxyConfig = new ProxyConfiguration(); + proxyConfig.setBrokerWebServiceURL(brokerUrl); + proxyConfig.setFunctionWorkerWebServiceURL(functionWorkerUrl); + + BrokerDiscoveryProvider discoveryProvider = mock(BrokerDiscoveryProvider.class); + AdminProxyHandler handler = new AdminProxyHandler(proxyConfig, discoveryProvider); + + String funcUrl = handler.rewriteTarget(buildRequest("/admin/v3/functions/test/test")); + Assert.assertEquals(funcUrl, String.format("%s/admin/v3/functions/%s/%s", + functionWorkerUrl, "test", "test")); + + String sourceUrl = handler.rewriteTarget(buildRequest("/admin/v3/sources/test/test")); + Assert.assertEquals(sourceUrl, String.format("%s/admin/v3/sources/%s/%s", + functionWorkerUrl, "test", "test")); + + String sinkUrl = handler.rewriteTarget(buildRequest("/admin/v3/sinks/test/test")); + Assert.assertEquals(sinkUrl, String.format("%s/admin/v3/sinks/%s/%s", + functionWorkerUrl, "test", "test")); + + String tenantUrl = handler.rewriteTarget(buildRequest("/admin/v2/tenants/test")); + Assert.assertEquals(tenantUrl, String.format("%s/admin/v2/tenants/%s", + brokerUrl, "test")); + } + + static HttpServletRequest buildRequest(String url) { + HttpServletRequest mockReq = mock(HttpServletRequest.class); + when(mockReq.getRequestURI()).thenReturn(url); + return mockReq; + } + +}