Skip to content

Commit

Permalink
Fix http produce msg redirect issue. (apache#15551)
Browse files Browse the repository at this point in the history
Master Issue: apache#15546

### Motivation
When lookup the topic ownership using REST produce, the redirect URI is incorrect, because :

```
uri.getPath(false); //Get the path of the current request relative to the base URI as a string.
```
So the redirect URI does not contain the base path:
```
URI redirectURI = new URI(String.format("%s%s", redirectAddresses.get(0), uri.getPath(false))) 
```
  • Loading branch information
Technoboy- authored May 13, 2022
1 parent ddd0316 commit 7f976da
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
Expand All @@ -41,6 +41,7 @@
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
Expand Down Expand Up @@ -378,10 +379,14 @@ private void processLookUpResult(List<String> redirectAddresses, AsyncResponse
log.debug("Redirect rest produce request for topic {} from {} to {}.",
topicName, pulsar().getWebServiceAddress(), redirectAddresses.get(0));
}
URI redirectURI = new URI(String.format("%s%s", redirectAddresses.get(0), uri.getPath(false)));
URL redirectAddress = new URL(redirectAddresses.get(0));
URI redirectURI = UriBuilder.fromUri(uri.getRequestUri())
.host(redirectAddress.getHost())
.port(redirectAddress.getPort())
.build();
asyncResponse.resume(Response.temporaryRedirect(redirectURI).build());
future.complete(true);
} catch (URISyntaxException | NullPointerException e) {
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.error("Error in preparing redirect url with rest produce message request for topic {}: {}",
topicName, e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -313,13 +314,13 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@Test
public void testLookUpWithRedirect() throws Exception {
String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
String requestPath = "/admin/v3/topics/my-tenant/my-namespace/my-topic";
URI requestPath = URI.create(pulsar.getWebServiceAddress() + "/topics/my-tenant/my-namespace/my-topic");
//create topic on one broker
admin.topics().createNonPartitionedTopic(topicName);
PulsarService pulsar2 = startBroker(getDefaultConf());
doReturn(false).when(topics).isRequestHttps();
UriInfo uriInfo = mock(UriInfo.class);
doReturn(requestPath).when(uriInfo).getPath(anyBoolean());
doReturn(requestPath).when(uriInfo).getRequestUri();
Whitebox.setInternalState(topics, "uri", uriInfo);
//do produce on another broker
topics.setPulsar(pulsar2);
Expand All @@ -336,8 +337,7 @@ public void testLookUpWithRedirect() throws Exception {
// Verify got redirect response
Assert.assertEquals(responseCaptor.getValue().getStatusInfo(), Response.Status.TEMPORARY_REDIRECT);
// Verify URI point to address of broker the topic was created on
Assert.assertEquals(responseCaptor.getValue().getLocation().toString(),
pulsar.getWebServiceAddress() + requestPath);
Assert.assertEquals(responseCaptor.getValue().getLocation().toString(), requestPath.toString());
}

@Test
Expand Down

0 comments on commit 7f976da

Please sign in to comment.