MapR Academy Sandbox

Architecture examples

connected drive

MapR Streams


  • Partition Id
  • Hash of messageId
  • Round-Robin

sending messages via client library

sending by client consuming by broker

spreading messages between partitions, assigning a message to a partition

  • by partition number
  • by message key
  • round-robin ( when neither of the previous two is given )
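  • custom partitioner: properties.put("streams.partitioner.class", "my.package.MyClassName")
public class MyClassName implements Partitioner {
   public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return 0; } // placeholder: return the chosen partition
   public void configure(Map<String, ?> configs) {}
   public void close() {}
}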
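The three assignment rules listed above can be sketched in plain Java, without the MapR or Kafka client library. This only illustrates the selection order; the class `PartitionChooser` and its method are hypothetical, and using `String.hashCode` for the key is an assumption, not the hash the real client applies.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that mimics how a producer could pick a partition. */
final class PartitionChooser {
    private final int partitionCount;
    private final AtomicInteger nextRoundRobin = new AtomicInteger(0);

    PartitionChooser(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        this.partitionCount = partitionCount;
    }

    /**
     * Picks a partition: an explicit partition number wins, then the key hash,
     * and round-robin is used only when neither is given.
     */
    int choose(Integer explicitPartition, String key) {
        if (explicitPartition != null) {
            if (explicitPartition < 0 || explicitPartition >= partitionCount) {
                throw new IllegalArgumentException("no such partition: " + explicitPartition);
            }
            return explicitPartition;
        }
        if (key != null) {
            // floorMod keeps the result non-negative even for negative hash codes
            return Math.floorMod(key.hashCode(), partitionCount);
        }
        return Math.floorMod(nextRoundRobin.getAndIncrement(), partitionCount);
    }

    public static void main(String[] args) {
        PartitionChooser chooser = new PartitionChooser(3);
        System.out.println(chooser.choose(2, "ignored-key")); // explicit partition: 2
        System.out.println(chooser.choose(null, "order-42")); // same key always maps to the same partition
        System.out.println(chooser.choose(null, null));       // round-robin: 0
        System.out.println(chooser.choose(null, null));       // round-robin: 1
    }
}
```

Messages that share a key always land in the same partition, which is what keeps per-key ordering intact.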


replication back loop

reading messages via client library

lib request client reading

reading messages cursor types

  • Read cursor ( client requests it and broker sends )
  • Committed cursor ( client confirmed/committed reading )
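The difference between the two cursors can be shown with a toy model of a single partition. This is a sketch in plain Java, not the client library: `CursorDemo` and all of its methods are hypothetical names, and a "restart" here simply resets the read cursor to the committed one.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition with a read cursor and a committed cursor. */
final class CursorDemo {
    private final List<String> log = new ArrayList<>();
    private int readCursor = 0;       // next offset that will be handed to the client
    private int committedCursor = 0;  // next offset after the last confirmed read

    void append(String message) {
        log.add(message);
    }

    /** Returns up to maxMessages starting at the read cursor and advances it. */
    List<String> poll(int maxMessages) {
        int from = readCursor;
        int to = Math.min(log.size(), from + maxMessages);
        readCursor = to;
        return new ArrayList<>(log.subList(from, to));
    }

    /** Confirms everything read so far. */
    void commit() {
        committedCursor = readCursor;
    }

    /** Simulates a consumer restart: reading resumes at the committed cursor. */
    void restart() {
        readCursor = committedCursor;
    }

    int readCursor() { return readCursor; }
    int committedCursor() { return committedCursor; }

    public static void main(String[] args) {
        CursorDemo partition = new CursorDemo();
        for (String m : new String[] {"a", "b", "c", "d"}) {
            partition.append(m);
        }
        System.out.println(partition.poll(2)); // [a, b]
        partition.commit();
        System.out.println(partition.poll(2)); // [c, d], read but not committed
        partition.restart();
        System.out.println(partition.poll(2)); // [c, d] again
    }
}
```

Anything read but not committed is delivered again after a restart, so the gap between the two cursors is the amount of work that may be repeated.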

Replicating streams

  • Master->Slave
  • Many->One
  • MultiMaster: Master<-->Master
  • Stream replications: Node-->Node2-->Node3-->Node4 ... ( with loop prevention )

command line

find CLDB hosts ( Container Location Database )

maprcli node listcldbs

create stream

maprcli stream create -path <filepath & name>
maprcli stream create -path <filepath & name> -consumeperm u:<userId> -produceperm u:<userId> -topicperm u:<userId>

create topic

maprcli stream topic create -path <path and name of the stream> -topic <name of the topic>

API, java programming


java example

Properties properties = new Properties();
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// org.apache.kafka.common.serialization.ByteArraySerializer
// properties.put("client.id", <client id>)

import org.apache.kafka.clients.producer.KafkaProducer;
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

String streamTopic = "<streamname>:<topicname>"; // "/streams/my-stream:topic-name"
ProducerRecord<String, String> record = new ProducerRecord<String, String>(streamTopic, textOfMessage);
// ProducerRecord<String, String> record = new ProducerRecord<String, String>(streamTopic, messageTextKey, textOfMessage);
// ProducerRecord<String, String> record = new ProducerRecord<String, String>(streamTopic, partitionIntNumber, textOfMessage);

Callback callback = new Callback() {
  public void onCompletion(RecordMetadata meta, Exception ex) {
    // ex is null when the send succeeded; meta then describes the stored record
  }
};
producer.send(record, callback);
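The producer addresses a topic as `<stream path>:<topic>`, for example `/streams/my-stream:topic-name`. A small helper can build and split such names. This is a sketch only: `StreamTopicName` is a hypothetical class, and splitting at the last colon assumes the topic part itself never contains a colon.

```java
/** Hypothetical value class for "<stream path>:<topic>" names. */
final class StreamTopicName {
    final String streamPath;
    final String topic;

    StreamTopicName(String streamPath, String topic) {
        if (streamPath == null || streamPath.isEmpty() || topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("stream path and topic must be non-empty");
        }
        this.streamPath = streamPath;
        this.topic = topic;
    }

    /** Splits at the last colon, assuming the topic part contains no colon. */
    static StreamTopicName parse(String fullName) {
        int colon = fullName.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("expected <stream path>:<topic>, got " + fullName);
        }
        return new StreamTopicName(fullName.substring(0, colon), fullName.substring(colon + 1));
    }

    /** Joins both parts into the string passed as the topic of a record. */
    String fullName() {
        return streamPath + ":" + topic;
    }

    public static void main(String[] args) {
        StreamTopicName name = StreamTopicName.parse("/streams/my-stream:topic-name");
        System.out.println(name.streamPath); // /streams/my-stream
        System.out.println(name.topic);      // topic-name
        System.out.println(new StreamTopicName("/streams/my-stream", "other-topic").fullName());
    }
}
```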

sending conditions

flush client buffer

parallel sending

streams.parallel.flushers.per.partition default true:
  • does not wait for ACK before sending more messages
  • possible for messages to arrive out of order
streams.parallel.flushers.per.partition set to false: 
  • client library will wait for ACK from server
  • slower than the default setting
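Choosing between the two modes comes down to one producer property. The helper below is a minimal sketch: `ProducerOrderingConfig` and its `strictOrdering` flag are hypothetical names, while the property key and serializer classes are the ones used in these notes.

```java
import java.util.Properties;

/** Hypothetical helper that builds producer settings for either ordering mode. */
final class ProducerOrderingConfig {
    static final String PARALLEL_FLUSHERS = "streams.parallel.flushers.per.partition";

    /** strictOrdering=true turns parallel flushers off so every send waits for its ACK. */
    static Properties build(boolean strictOrdering) {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // "false": ordered but slower; "true" (default): faster, messages may arrive out of order
        properties.put(PARALLEL_FLUSHERS, String.valueOf(!strictOrdering));
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(build(true).getProperty(PARALLEL_FLUSHERS));  // false
        System.out.println(build(false).getProperty(PARALLEL_FLUSHERS)); // true
    }
}
```

The resulting `Properties` object would be passed to the `KafkaProducer` constructor in place of the one built by hand.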

retrieving metadata during connection with Kafka

How frequently to fetch metadata


java consumer

Properties properties = new Properties();
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// org.apache.kafka.common.serialization.ByteArrayDeserializer
// properties.put("auto.offset.reset", <earliest | latest | none>)
// properties.put("group.id", <group identifier>)
// properties.put("enable.auto.commit", <true - default | false>), use consumer.commitSync() if false
// properties.put("auto.commit.interval.ms", <default value 1000ms>)

import org.apache.kafka.clients.consumer.KafkaConsumer;
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

String streamTopic = "<streamname>:<topicname>"; // "/streams/my-stream:topic-name"
// consumer.subscribe(Arrays.asList(streamTopic), new RebalanceListener());
ConsumerRecords<String, String> messages = consumer.poll(1000L); // reading with timeout
messages.iterator().next().toString(); // "/streams/my-stream:topic-name, partition=1, offset=256, key=one, value=text"
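The optional consumer settings can be collected in one place when the application wants to commit by hand. A minimal sketch, assuming the standard Kafka consumer property names `group.id`, `enable.auto.commit` and `auto.offset.reset`; the class `ManualCommitConsumerConfig` and the group name `my-group` are hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper that builds consumer settings with auto-commit switched off. */
final class ManualCommitConsumerConfig {
    static Properties build(String groupId) {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", groupId);
        // the application confirms reads itself, e.g. with consumer.commitSync()
        properties.put("enable.auto.commit", "false");
        // start from the oldest available message when the group has no committed cursor
        properties.put("auto.offset.reset", "earliest");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("my-group");
        System.out.println(properties.getProperty("group.id"));           // my-group
        System.out.println(properties.getProperty("enable.auto.commit")); // false
    }
}
```

With auto-commit off, the committed cursor moves only when the application calls `consumer.commitSync()` after it has processed a batch.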

java rebalance listener

public class RebalanceListener implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
}

execute java app

maven repository


execute on cluster

mapr classpath
java -cp `mapr classpath`:my-own-app.jar mypackage.MainClass


login, print info, logout

maprlogin password -user {your cluster username}
maprlogin print
maprlogin logout