Skip to content

Commit

Permalink
SAMZA-197: Use JSON rather than custom encoding for container SSPs.
Browse files Browse the repository at this point in the history
  • Loading branch information
jghoman committed Mar 24, 2014
1 parent c96ecc6 commit 429c1ed
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ object SamzaContainer extends Logging {
val config = JsonConfigSerializer.fromJson(configStr)
val encodedStreamsAndPartitions = System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS)

val partitions = Util.createStreamPartitionsFromString(encodedStreamsAndPartitions)
val partitions = Util.deserializeSSPSetFromJSON(encodedStreamsAndPartitions)

if (partitions.isEmpty) {
throw new SamzaException("No partitions for this task. Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ShellCommandBuilder extends CommandBuilder {
def buildCommand() = config.getCommand

def buildEnvironment(): java.util.Map[String, String] = {
val streamsAndPartsString = Util.createStreamPartitionString(systemStreamPartitions.toSet) // Java to Scala set conversion
val streamsAndPartsString = Util.serializeSSPSetToJSON(systemStreamPartitions.toSet) // Java to Scala set conversion

Map(
ShellCommandConfig.ENV_CONTAINER_NAME -> name,
Expand Down
59 changes: 20 additions & 39 deletions samza-core/src/main/scala/org/apache/samza/util/Util.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import org.apache.samza.config.Config
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
import scala.collection.JavaConversions._
import org.apache.samza.system.{ SystemStreamPartition, SystemAdmin, SystemFactory, SystemStream }
import org.apache.samza.system.{ SystemStreamPartition, SystemFactory, SystemStream }
import org.codehaus.jackson.map.ObjectMapper
import org.codehaus.jackson.`type`.TypeReference
import java.util


object Util extends Logging {
val random = new Random
Expand Down Expand Up @@ -140,51 +144,28 @@ object Util extends Logging {
ssp.filter(_.getPartition.getPartitionId % containerCount == containerId)
}

// Delimiters of the custom string encoding produced by createStreamPartitionString.
// Placed between a partition id and the list of streams grouped under it.
val partitionSeparator = ";"
// Placed between the individual "system.stream" entries of one partition group.
val topicSeparator = ","
// Placed between successive partition groups.
val topicStreamGrouper = "#"
/**
* Serialize a collection of stream-partitions to a string suitable for passing between processes.
* The streams will be grouped by partition. The partition id will be separated from its streams by
* a semicolon (";"), the streams (each written as system.stream) separated by commas (",") and the
* partition groups by a hash ("#"). Ordering of the grouping is not specified.
*
* For example: (sys.A,0),(sys.A,4),(sys.B,0),(sys.B,4),(sys.C,0) could be transformed to: 4;sys.A,sys.B#0;sys.A,sys.B,sys.C
*
* @param sp Stream topics to group into a string
* @return Serialized string of the topics and streams grouped and delimited
* Jackson really hates Scala's classes, so we need to wrap up the SSP in a form Jackson will take
*/
def createStreamPartitionString(sp: Set[SystemStreamPartition]): String = {
for (
ch <- List(partitionSeparator, topicSeparator, topicStreamGrouper);
s <- sp
) {
if (s.getStream.contains(ch)) throw new IllegalArgumentException(s + " contains illegal character " + ch)
}

sp.groupBy(_.getPartition).map(z => z._1.getPartitionId + partitionSeparator + z._2.map(y => y.getSystem + "." + y.getStream).mkString(topicSeparator)).mkString(topicStreamGrouper)

/**
 * Mutable bean-style holder for the three components of a SystemStreamPartition:
 * partition id, stream name and system name. Each field is a `var` annotated with
 * `@BeanProperty`, so getter/setter pairs (getPartition/setPartition, getStream/setStream,
 * getSystem/setSystem) are generated for it.
 *
 * NOTE(review): the explicit no-arg constructor is presumably there so Jackson can
 * instantiate the bean reflectively before calling the setters; default parameter
 * values alone do not produce a no-arg constructor in bytecode. Confirm before removing.
 */
private class SSPWrapper(@scala.beans.BeanProperty var partition:java.lang.Integer = null,
@scala.beans.BeanProperty var Stream:java.lang.String = null,
@scala.beans.BeanProperty var System:java.lang.String = null) {
// No-arg constructor: leaves all three fields null.
def this() { this(null, null, null) }
// Copies the partition id, stream name and system name out of an existing SystemStreamPartition.
def this(ssp:SystemStreamPartition) { this(ssp.getPartition.getPartitionId, ssp.getSystemStream.getStream, ssp.getSystemStream.getSystem)}
}

/**
* Invert [[createStreamPartitionString]], building a set of streams and their partitions
* from the string that function produced.
*
* @param sp Streams and partitions encoded as a string by createStreamPartitionString
* @return Set of stream-partitions extracted from the string. Order is not necessarily preserved.
*/
def createStreamPartitionsFromString(sp: String): Set[SystemStreamPartition] = {
if (sp == null || sp.isEmpty) return Set.empty
def serializeSSPSetToJSON(ssps: Set[SystemStreamPartition]): String = {
val al = new util.ArrayList[SSPWrapper](ssps.size)
for(ssp <- ssps) { al.add(new SSPWrapper(ssp)) }

def splitPartitionGroup(pg: String) = {
val split = pg.split(partitionSeparator) // Seems like there should be a more scalar way of doing this
val part = split(0).toInt
val streams = split(1).split(topicSeparator).toList
new ObjectMapper().writeValueAsString(al)
}

streams.map(s => new SystemStreamPartition(getSystemStreamFromNames(s), new Partition(part))).toSet
}
def deserializeSSPSetFromJSON(ssp: String) = {
val om = new ObjectMapper()

sp.split(topicStreamGrouper).map(splitPartitionGroup(_)).toSet.flatten
val asWrapper = om.readValue(ssp, new TypeReference[util.ArrayList[SSPWrapper]]() { }).asInstanceOf[util.ArrayList[SSPWrapper]]
asWrapper.map(w => new SystemStreamPartition(w.getSystem, w.getStream(), new Partition(w.getPartition()))).toSet
}

/**
Expand Down
30 changes: 5 additions & 25 deletions samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,8 @@ class TestUtil {
}

@Test
def testCreateStreamPartitionStringBlocksDelimeters() {
  // For every reserved delimiter, a stream name containing it must be rejected
  // with an IllegalArgumentException naming the offending stream and character.
  val partOne = new Partition(1)
  val reserved = List(Util.topicSeparator, Util.topicStreamGrouper, Util.partitionSeparator)

  // Build every candidate set up front, one per delimiter, each with a single bad stream.
  val cases = for (delimiter <- reserved) yield {
    val ssps = Set(
      new SystemStreamPartition("kafka", "good1", partOne),
      new SystemStreamPartition("kafka", "bad" + delimiter, partOne),
      new SystemStreamPartition("notkafka", "alsogood", partOne))
    (delimiter, ssps)
  }

  for ((delimiter, ssps) <- cases) {
    try {
      createStreamPartitionString(ssps)
      fail("Should have thrown an exception")
    } catch {
      case rejected: IllegalArgumentException =>
        val expected = "SystemStreamPartition [partition=Partition [partition=1], system" +
          "=kafka, stream=bad" + delimiter + "] contains illegal character " + delimiter
        assertEquals(expected, rejected.getMessage)
    }
  }
}

@Test
def testCreateStreamPartitionStringRoundTrip() {
val getPartitions = {
def testJsonCreateStreamPartitionStringRoundTrip() {
val getPartitions: Set[SystemStreamPartition] = {
// Build a heavily skewed set of partitions.
def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet
val system = "all-same-system."
Expand All @@ -117,14 +99,12 @@ class TestUtil {
part <- streamsMap.getOrElse(s, Set.empty)) yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet
}

val streamsAndParts = getStreamsAndPartitionsForContainer(0, 4, getPartitions)
val streamsAndParts: Set[SystemStreamPartition] = getStreamsAndPartitionsForContainer(0, 4, getPartitions).toSet
println(streamsAndParts)
val asString = createStreamPartitionString(streamsAndParts)
println(asString)
val backToStreamsAndParts = createStreamPartitionsFromString(asString)
val asString = serializeSSPSetToJSON(streamsAndParts)

val backToStreamsAndParts = deserializeSSPSetFromJSON(asString)
assertEquals(streamsAndParts, backToStreamsAndParts)

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.junit.Assert._
import org.junit.Test
import org.apache.samza.config.Config
import org.apache.samza.config.MapConfig
import org.apache.samza.{Partition, SamzaException}
import org.apache.samza.Partition
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.util.ConverterUtils
import scala.collection.JavaConversions._
Expand All @@ -35,9 +35,8 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.records.NodeReport
import TestSamzaAppMasterTaskManager._
import org.apache.samza.system.{SystemStreamPartition, SystemAdmin, SystemFactory}
import org.apache.samza.system.{SystemStreamPartition, SystemFactory}
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.util.Util._
import org.apache.samza.util.Util
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin

Expand Down Expand Up @@ -130,7 +129,8 @@ object TestSamzaAppMasterTaskManager {
}

class TestSamzaAppMasterTaskManager {

import org.junit.Assert._

val config = new MapConfig(Map[String, String](
"yarn.container.count" -> "1",
"systems.test-system.samza.factory" -> "org.apache.samza.job.yarn.MockSystemFactory",
Expand Down Expand Up @@ -378,33 +378,33 @@ class TestSamzaAppMasterTaskManager {
@Test
def testPartitionsShouldWorkWithMoreTasksThanPartitions {
  // With one partition and two containers, container 0 gets the partition and container 1 gets nothing.
  def ssp(p: Int) = new SystemStreamPartition("system", "stream", new Partition(p))
  val onePartition = Set(ssp(0))

  // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
  assertEquals(Set(ssp(0)), Util.getStreamsAndPartitionsForContainer(0, 2, onePartition))
  assertEquals(Set.empty[SystemStreamPartition], Util.getStreamsAndPartitionsForContainer(1, 2, onePartition))
}

@Test
def testPartitionsShouldWorkWithMorePartitionsThanTasks {
  // Five partitions over two containers: partition ids are assigned by id modulo container count.
  def ssp(p: Int) = new SystemStreamPartition("system", "stream", new Partition(p))
  val fivePartitions = (0 until 5).map(ssp).toSet

  // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
  assertEquals(Set(ssp(0), ssp(2), ssp(4)), Util.getStreamsAndPartitionsForContainer(0, 2, fivePartitions))
  assertEquals(Set(ssp(1), ssp(3)), Util.getStreamsAndPartitionsForContainer(1, 2, fivePartitions))
}

@Test
def testPartitionsShouldWorkWithTwelvePartitionsAndFiveContainers {
  // Twelve partitions over five containers: containers 0 and 1 receive three partitions, the rest two.
  def ssp(p: Int) = new SystemStreamPartition("system", "stream", new Partition(p))
  val twelvePartitions = (0 until 12).map(ssp).toSet

  // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
  assertEquals(Set(ssp(0), ssp(5), ssp(10)), Util.getStreamsAndPartitionsForContainer(0, 5, twelvePartitions))
  assertEquals(Set(ssp(1), ssp(6), ssp(11)), Util.getStreamsAndPartitionsForContainer(1, 5, twelvePartitions))
  assertEquals(Set(ssp(2), ssp(7)), Util.getStreamsAndPartitionsForContainer(2, 5, twelvePartitions))
  assertEquals(Set(ssp(3), ssp(8)), Util.getStreamsAndPartitionsForContainer(3, 5, twelvePartitions))
  assertEquals(Set(ssp(4), ssp(9)), Util.getStreamsAndPartitionsForContainer(4, 5, twelvePartitions))
}

@Test
def testPartitionsShouldWorkWithEqualPartitionsAndTasks {
  // When partition count equals container count, each container gets exactly one partition.
  def ssp(p: Int) = new SystemStreamPartition("system", "stream", new Partition(p))
  val twoPartitions = (0 until 2).map(ssp).toSet

  // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
  assertEquals(Set(ssp(0)), Util.getStreamsAndPartitionsForContainer(0, 2, twoPartitions))
  assertEquals(Set(ssp(1)), Util.getStreamsAndPartitionsForContainer(1, 2, twoPartitions))
  // Degenerate case: a single container with a single partition.
  assertEquals(Set(ssp(0)), Util.getStreamsAndPartitionsForContainer(0, 1, Set(ssp(0))))
}

val clock = () => System.currentTimeMillis
Expand Down

0 comments on commit 429c1ed

Please sign in to comment.