Skip to content

Commit

Permalink
Merge branch 'pr2949'
Browse files Browse the repository at this point in the history
  • Loading branch information
Brandon Devries committed Oct 2, 2018
2 parents 813cc1f + 4f538f1 commit 895323f
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
package org.apache.nifi.jms.processors;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import javax.jms.BytesMessage;
import javax.jms.Destination;
Expand Down Expand Up @@ -78,11 +81,14 @@ public Message createMessage(Session session) throws JMSException {

void setMessageHeaderAndProperties(final Message message, final Map<String, String> flowFileAttributes) throws JMSException {
if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
for (Entry<String, String> entry : flowFileAttributes.entrySet()) {

Map<String, String> flowFileAttributesToSend = flowFileAttributes.entrySet().stream()
.filter(entry -> !entry.getKey().contains("-") && !entry.getKey().contains(".")) // '-' and '.' are illegal chars in JMS property names
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));

for (Entry<String, String> entry : flowFileAttributesToSend.entrySet()) {
try {
if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && !entry.getKey().contains("-") && !entry.getKey().contains(".")) {// '-' and '.' are illegal char in JMS prop names
message.setStringProperty(entry.getKey(), entry.getValue());
} else if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
message.setJMSDeliveryMode(Integer.parseInt(entry.getValue()));
} else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) {
message.setJMSExpiration(Integer.parseInt(entry.getValue()));
Expand Down Expand Up @@ -110,6 +116,11 @@ void setMessageHeaderAndProperties(final Message message, final Map<String, Stri
} else {
logUnbuildableDestination(entry.getKey(), JmsHeaders.DESTINATION);
}
} else {
// not a special attribute handled above, so send it as a property using the specified property type
String type = flowFileAttributes.getOrDefault(entry.getKey().concat(".type"), "unknown").toLowerCase();
propertySetterMap.getOrDefault(type, JmsPropertySetterEnum.STRING)
.setProperty(message, entry.getKey(), entry.getValue());
}
} catch (NumberFormatException ne) {
this.processLog.warn("Incompatible value for attribute " + entry.getKey()
Expand Down Expand Up @@ -146,4 +157,55 @@ public Queue doInJms(Session session) throws JMSException {

return destination;
}

/**
* Implementations of this interface use {@link javax.jms.Message} methods to set strongly typed properties.
*/
public interface JmsPropertySetter {
void setProperty(final Message message, final String name, final String value) throws JMSException, NumberFormatException;
}

public enum JmsPropertySetterEnum implements JmsPropertySetter {
BOOLEAN( (message, name, value) -> {
message.setBooleanProperty(name, Boolean.parseBoolean(value));
} ),
BYTE( (message, name, value) -> {
message.setByteProperty(name, Byte.parseByte(value));
} ),
SHORT( (message, name, value) -> {
message.setShortProperty(name, Short.parseShort(value));
} ),
INTEGER( (message, name, value) -> {
message.setIntProperty(name, Integer.parseInt(value));
} ),
LONG( (message, name, value) -> {
message.setLongProperty(name, Long.parseLong(value));
} ),
FLOAT( (message, name, value) -> {
message.setFloatProperty(name, Float.parseFloat(value));
} ),
DOUBLE( (message, name, value) -> {
message.setDoubleProperty(name, Double.parseDouble(value));
} ),
STRING( (message, name, value) -> {
message.setStringProperty(name, value);
} );

private final JmsPropertySetter setter;
JmsPropertySetterEnum(JmsPropertySetter setter) {
this.setter = setter;
}

public void setProperty(Message message, String name, String value) throws JMSException, NumberFormatException {
setter.setProperty(message, name, value);
}
}

/**
* This map helps us avoid using JmsPropertySetterEnum.valueOf and dealing with IllegalArgumentException on failed lookup.
*/
public static Map<String, JmsPropertySetterEnum> propertySetterMap = new HashMap<>();
static {
Arrays.stream(JmsPropertySetterEnum.values()).forEach(e -> propertySetterMap.put(e.name().toLowerCase(), e));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@
@ReadsAttribute(attribute = JmsHeaders.TYPE, description = "This attribute becomes the JMSType message header. Must be an integer."),
@ReadsAttribute(attribute = JmsHeaders.REPLY_TO, description = "This attribute becomes the JMSReplyTo message header. Must be an integer."),
@ReadsAttribute(attribute = JmsHeaders.DESTINATION, description = "This attribute becomes the JMSDestination message header. Must be an integer."),
@ReadsAttribute(attribute = "other attributes", description = "All other attributes that do not start with " + JmsHeaders.PREFIX + " are added as message properties.")
@ReadsAttribute(attribute = "other attributes", description = "All other attributes that do not start with " + JmsHeaders.PREFIX + " are added as message properties."),
@ReadsAttribute(attribute = "other attributes .type", description = "When an attribute will be added as a message property, a second attribute of the same name but with an extra"
+ " `.type` at the end will cause the message property to be sent using that strong type. For example, attribute `delay` with value `12000` and another attribute"
+ " `delay.type` with value `integer` will cause a JMS message property `delay` to be sent as an Integer rather than a String. Supported types are boolean, byte,"
+ " short, integer, long, float, double, and string (which is the default).")
})
@SeeAlso(value = { ConsumeJMS.class, JMSConnectionFactoryProvider.class })
@SystemResourceConsideration(resource = SystemResource.MEMORY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -180,4 +181,76 @@ public void validatePublishTextMessage() throws Exception {

runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
}

@Test(timeout = 10000)
public void validatePublishPropertyTypes() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

final String destinationName = "validatePublishPropertyTypes";
PublishJMS pubProc = new PublishJMS();
TestRunner runner = TestRunners.newTestRunner(pubProc);
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(cf);

runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);

runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(PublishJMS.DESTINATION, destinationName);

Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "foo");
attributes.put("myboolean", "true");
attributes.put("myboolean.type", "boolean");
attributes.put("mybyte", "127");
attributes.put("mybyte.type", "byte");
attributes.put("myshort", "16384");
attributes.put("myshort.type", "short");
attributes.put("myinteger", "1544000");
attributes.put("myinteger.type", "INTEGER"); // test upper case
attributes.put("mylong", "9876543210");
attributes.put("mylong.type", "long");
attributes.put("myfloat", "3.14");
attributes.put("myfloat.type", "float");
attributes.put("mydouble", "3.14159265359");
attributes.put("mydouble.type", "double");
attributes.put("badtype", "3.14");
attributes.put("badtype.type", "pi"); // pi not recognized as a type, so send as String
attributes.put("badint", "3.14"); // value is not an integer
attributes.put("badint.type", "integer");

runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.

final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
assertNotNull(successFF);

JmsTemplate jmst = new JmsTemplate(cf);
BytesMessage message = (BytesMessage) jmst.receive(destinationName);

byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
assertEquals("Hey dude!", new String(messageBytes));
assertEquals(true, message.getObjectProperty("foo") instanceof String);
assertEquals("foo", message.getStringProperty("foo"));
assertEquals(true, message.getObjectProperty("myboolean") instanceof Boolean);
assertEquals(true, message.getBooleanProperty("myboolean"));
assertEquals(true, message.getObjectProperty("mybyte") instanceof Byte);
assertEquals(127, message.getByteProperty("mybyte"));
assertEquals(true, message.getObjectProperty("myshort") instanceof Short);
assertEquals(16384, message.getShortProperty("myshort"));
assertEquals(true, message.getObjectProperty("myinteger") instanceof Integer);
assertEquals(1544000, message.getIntProperty("myinteger"));
assertEquals(true, message.getObjectProperty("mylong") instanceof Long);
assertEquals(9876543210L, message.getLongProperty("mylong"));
assertEquals(true, message.getObjectProperty("myfloat") instanceof Float);
assertEquals(3.14F, message.getFloatProperty("myfloat"), 0.001F);
assertEquals(true, message.getObjectProperty("mydouble") instanceof Double);
assertEquals(3.14159265359D, message.getDoubleProperty("mydouble"), 0.00000000001D);
assertEquals(true, message.getObjectProperty("badtype") instanceof String);
assertEquals("3.14", message.getStringProperty("badtype"));
assertFalse(message.propertyExists("badint"));

runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
}
}

0 comments on commit 895323f

Please sign in to comment.