This project is used to receive message from Rocketmq for Spark Streaming. Both push & pull consumer mode are provided. It provides simple parallelism, 1:1 correspondence between RocketMq's message queue id and Spark partitions.
For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact.
groupId = org.apache.rocketmq
artifactId = rocketmq-spark
version = 0.0.1-SNAPSHOT
In fact we may not find the artifact, So we should execute the following command in rocketmq-spark root directory firstly.
mvn clean install dependency:copy-dependencies
For Scala:
val dStream: InputDStream[MessageExt] = RocketMqUtils.createMQPullStream(streamingContext, groupId, topic, ConsumerStrategy.earliest, true, false, false)
dStream.map(message => (message.getBody)).print()
For Java:
JavaInputDStream<MessageExt> stream = RocketMqUtils.createJavaMQPullStream(javaStreamingContext, groupId,
topics, ConsumerStrategy.earliest(), true, false, false);
stream.foreachRDD(new VoidFunction<JavaRDD<MessageExt>>() {
@Override
public void call(JavaRDD<MessageExt> messageExtJavaRDD) throws Exception {
messageExtJavaRDD.foreach(new VoidFunction<MessageExt>() {
@Override
public void call(MessageExt messageExt) throws Exception {
System.out.println(messageExt.toString());
}
});
}
});
For Scala:
val offsetRanges = new util.HashMap[TopicQueueId, Array[OffsetRange]]
val topicQueueId1 = new TopicQueueId("topic", 1)
val ranges1 = Array(OffsetRange("groupId", 1, "broker-1", 0, 100), OffsetRange("groupId", 1, "broker-2", 0, 100))
offsetRanges.put(topicQueueId1, ranges1)
val topicQueueId2 = new TopicQueueId("topic", 2)
val ranges2 = Array(OffsetRange("groupId", 2, "broker-1", 0, 100), OffsetRange("groupId", 2, "broker-2", 0, 100))
offsetRanges.put(topicQueueId1, ranges2)
val optionParams = new util.HashMap[String, String]
val rdd: RDD[MessageExt] = RocketMqUtils.createRDD(sparkContext, groupId, offsetRanges, optionParams)
rdd.foreach(message => System.out.println(message.getBody))
For Java:
Map<TopicQueueId, OffsetRange[]> offsetRanges = new HashMap<>();
TopicQueueId topicQueueId1 = new TopicQueueId("topic", 1);
OffsetRange [] ranges1 = {OffsetRange.create("groupId", 1, "broker-1", 0, 100),
OffsetRange.create("groupId", 1, "broker-2", 0, 100)};
offsetRanges.put(topicQueueId1, ranges1);
TopicQueueId topicQueueId2 = new TopicQueueId("topic", 2);
OffsetRange [] ranges2 = {OffsetRange.create("groupId", 2, "broker-1", 0, 100),
OffsetRange.create("groupId", 2, "broker-2", 0, 100)};
offsetRanges.put(topicQueueId2, ranges2);
Map<String, String> optionParams= new HashMap();
LocationStrategy locationStrategy = LocationStrategies.PreferConsistent();
JavaRDD<MessageExt> rdd = RocketMqUtils.createJavaRDD(sparkContext, groupId, offsetRanges,
optionParams, locationStrategy);
rdd.foreach(new VoidFunction<MessageExt>() {
@Override
public void call(MessageExt messageExt) throws Exception {
System.out.println(messageExt.getBodyCRC());
}
});
The RocketMq consumer API will pre-fetch messages into buffers. Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.
In most cases, you should use LocationStrategies.PreferConsistent
as shown above. This will distribute partitions evenly across available executors. Finally, if you have a significant skew in load among partitions, use PreferFixed
. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).
The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) RocketMq partitions, you can change this setting via pull.consumer.cache.maxCapacity
The RocketMq consumer will start to consume from different offset based on different consumer strategy.
-
EarliestStrategy: Specify the earliest available offset for every message queue to start to consume. But if the Rocketmq server has checkpoint for a message queue, then the consumer will consume from the checkpoint.
-
LatestStrategy: Specify the lastest available offset for every message queue to start to consume. But if the Rocketmq server has checkpoint for a message queue, then the consumer will consume from the checkpoint.
-
SpecificOffsetStrategy: Specify the specific available offset for every message queue to start to consume. Generally if the Rocketmq server has checkpoint for a message queue, then the consumer will consume from the checkpoint. But if the forceSpecial is true, the consumer will start to consume from the specific available offset in any case. Of course, the consumer will start to consume from the min available offset if a message queue is not specified. If the specify offset is 'ConsumerStrategy.LATEST' for a message queue, it indicates resolution to the latest offset. And if the specify offset is 'ConsumerStrategy.EARLIEST', it indicates resolution to the earliest offset.
Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the result of createMQPullStream, not later down a chain of methods. You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. Be aware that the one-to-one mapping between RDD partition and RocketMq partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().
dStream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition { iter => val queueId = TaskContext.get.partitionId val offsets: Array[OffsetRange] = offsetRanges.get(new TopicQueueId(topic, queueId)) } }
RocketMq delivery semantics in the case of failure depend on how and when offsets are stored. Spark output operations are at-least-once. So if you want the equivalent of exactly-once semantics, you must either store offsets after an idempotent output, or store offsets in an atomic transaction alongside output. With this integration, you have 3 options, in order of increasing reliability (and code complexity), for how to store offsets.
If you enable Spark checkpointing, offsets will be stored in the checkpoint. This is easy to enable, but there are drawbacks. Your output operation must be idempotent, since you will get repeated outputs; transactions are not an option. Furthermore, you cannot recover from a checkpoint if your application code has changed. For planned upgrades, you can mitigate this by running the new code at the same time as the old code (since outputs need to be idempotent anyway, they should not clash). But for unplanned failures that require code changes, you will lose data unless you have another way to identify known good starting offsets.
RocketMq has an offset commit API that stores offsets in a special RocketMq topic. By default, the new consumer will auto-commit offsets by setting "autoCommit" true. This is almost certainly not what you want, because messages successfully polled by the consumer may not yet have resulted in a Spark output operation, resulting in undefined semantics. Then messages maybe lost. However, you can commit offsets to Rocket after you your output has been stored, using the commitAsync API.At the same time, you must make "autoCommit" be false. The benefit as compared to checkpoints is that RocketMq is a durable store regardless of changes to your application code. However, RocketMq is not transactional, so your outputs must still be idempotent.
For Scala:
//store commits
dStream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
dStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
For Java:
dStream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you're careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics. It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent.
// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new MessageQueue(resultSet.string("topic"), resultSet.string("broker"),
resultSet.int("queueId")) -> resultSet.long("offset")
}.toMap
val specificStrategy = ConsumerStrategy.specificOffset(fromOffsets)
val stream = RocketMqUtils
.createMQPullStream(streamingContext, groupId, topic, specificStrategy, false, true, true)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val results = yourCalculation(rdd)
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
}
The following configs are for Consumer Pull Mode
Property Name | Default | Meaning |
---|---|---|
pull.max.speed.per.partition | -1 | Maximum rate (number of records per second) at which data will be read from each RocketMq partition, and the default value is "-1", it means consumer can pull message from rocketmq as fast as the consumer can. Other that, you also enables or disables Spark Streaming's internal backpressure mechanism by the config "spark.streaming.backpressure.enabled". |
pull.max.batch.size | 32 | To pick up the consume speed, the consumer can pull a batch of messages at a time. |
pull.timeout.ms | 3000 | pull timeout for the RocketMq consumer |
pull.consumer.cache.initialCapacity | 16 | the configs for consumer cache |
pull.consumer.cache.maxCapacity | 64 | the configs for consumer cache |
pull.consumer.cache.loadFactor | 0.75 | the configs for consumer cache |
Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected.
- RocketMQReceiver - which is no fault-tolerance guarantees
- ReliableRocketMQReceiver - which is fault-tolerance guarantees
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
Properties properties = new Properties();
properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, NAMESERVER_ADDR);
properties.setProperty(RocketMQConfig.CONSUMER_GROUP, CONSUMER_GROUP);
properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, CONSUMER_TOPIC);
// no fault-tolerance guarantees
JavaInputDStream ds = RocketMQUtils.createInputDStream(jssc, properties, StorageLevel.MEMORY_ONLY());
// fault-tolerance guarantees
// JavaInputDStream ds = RocketMQUtils.createReliableInputDStream(jssc, properties, StorageLevel.MEMORY_ONLY());
ds.print();
jssc.start();
jssc.awaitTerminationOrTimeout(60000);