Skip to content

Commit

Permalink
[Issue 6024][pulsar_storm] PulsarSpout emit to multiple streams (apac…
Browse files Browse the repository at this point in the history
…he#6039)

Fixes apache#6024

### Motivation
This is all described in detail in apache#6024, but in short, an insurmountable obstacle to using Pulsar in our Storm topology is the fact that `PulsarSpout` only emits to the "default" stream. In our environment, we need to emit on different streams based on the content of each received message. This change extends `PulsarSpout` to recognize a `Values` extension that specifies an alternate output stream, and uses that stream when given.

### Modifications
A new `PulsarTuple` class is added. It extends `Values` and adds a method to return the output stream.

When emitting a tuple after calling `toValues(msg)`, `PulsarSpout` checks if the returned `Values` is a `PulsarTuple`. If so, it emits to the designated stream; otherwise, it emits as before.
  • Loading branch information
dawillcox authored Feb 3, 2020
1 parent 0b06dba commit af6b5f6
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ public interface MessageToValuesMapper extends Serializable {
* @param msg
* @return
*/
public Values toValues(Message<byte[]> msg);
Values toValues(Message<byte[]> msg);

/**
* Declare the output schema for the spout.
*
* @param declarer
*/
public void declareOutputFields(OutputFieldsDeclarer declarer);
void declareOutputFields(OutputFieldsDeclarer declarer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,11 @@ private boolean mapToValueAndEmit(Message<byte[]> msg) {
}
ack(msg);
} else {
collector.emit(values, msg);
if (values instanceof PulsarTuple) {
collector.emit(((PulsarTuple) values).getOutputStream(), values, msg);
} else {
collector.emit(values, msg);
}
++messagesEmitted;
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Emitted message {} to the collector", spoutId, msg.getMessageId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.storm;


import org.apache.storm.tuple.Values;

/**
 * A {@link Values} that additionally names the stream its tuple should be
 * emitted on.
 *
 * <p>A MessageToValuesMapper may return an instance of this class instead of
 * a plain {@code Values} to direct the resulting tuple to a specific stream.
 */
public class PulsarTuple extends Values {

    /** Name of the stream this tuple is destined for, exactly as given at construction. */
    protected final String outputStream;

    /**
     * Builds a tuple bound to a particular output stream.
     *
     * @param outStream name of the stream the tuple should be emitted on
     * @param values the tuple's field values, handed unchanged to {@link Values}
     */
    public PulsarTuple(String outStream, Object... values) {
        super(values);
        this.outputStream = outStream;
    }

    /**
     * Returns the stream the tuple should be emitted on.
     *
     * @return the stream name supplied to the constructor
     */
    public String getOutputStream() {
        return this.outputStream;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import java.lang.reflect.Field;
Expand All @@ -47,6 +50,7 @@
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Values;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;

import com.google.common.collect.Maps;
Expand Down Expand Up @@ -92,8 +96,17 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
verify(consumer, atLeast(1)).receive(anyInt(), any());
}

/**
 * Runs the shared spout scenario with a message whose mapped result is a
 * {@code PulsarTuple}, so the emit is expected on the named stream.
 */
@Test
public void testPulsarTuple() throws Exception {
    final boolean emitAsPulsarTuple = true;
    testPulsarSpout(emitAsPulsarTuple);
}

/**
 * Runs the shared spout scenario with a message whose mapped result is a
 * plain {@code Values}, so the emit is expected without an explicit stream.
 */
@Test
public void testPulsarSpout() throws Exception {
    final boolean emitAsPulsarTuple = false;
    testPulsarSpout(emitAsPulsarTuple);
}

public void testPulsarSpout(boolean pulsarTuple) throws Exception {
PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
conf.setServiceUrl("http://localhost:8080");
conf.setSubscriptionName("sub1");
Expand All @@ -105,7 +118,15 @@ public void testPulsarSpout() throws Exception {
@Override
public Values toValues(Message<byte[]> msg) {
called.set(true);
return new Values("test");
if ("message to be dropped".equals(new String(msg.getData()))) {
return null;
}
String val = new String(msg.getData());
if (val.startsWith("stream:")) {
String stream = val.split(":")[1];
return new PulsarTuple(stream, val);
}
return new Values(val);
}

@Override
Expand All @@ -114,6 +135,8 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {

});

String msgContent = pulsarTuple ? "stream:pstream" : "test";

ClientBuilder builder = spy(new ClientBuilderImpl());
PulsarSpout spout = spy(new PulsarSpout(conf, builder));
TopologyContext context = mock(TopologyContext.class);
Expand All @@ -131,14 +154,22 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
when(client.getSharedConsumer(any())).thenReturn(consumer);
instances.put(componentId, client);

Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), "test".getBytes(), Schema.BYTES);
Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), msgContent.getBytes(), Schema.BYTES);
when(consumer.receive(anyInt(), any())).thenReturn(msg);

spout.open(config, context, collector);
spout.emitNextAvailableTuple();

assertTrue(called.get());
verify(consumer, atLeast(1)).receive(anyInt(), any());
ArgumentCaptor<Values> capt = ArgumentCaptor.forClass(Values.class);
if (pulsarTuple) {
verify(collector, times(1)).emit(eq("pstream"), capt.capture(), eq(msg));
} else {
verify(collector, times(1)).emit(capt.capture(), eq(msg));
}
Values vals = capt.getValue();
assertEquals(msgContent, vals.get(0));
}

}

0 comments on commit af6b5f6

Please sign in to comment.