Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Experimental Spark adapter
Browse files Browse the repository at this point in the history
- batch processing (from Cassandra storage backends)
- stream processing (from any storage backend)
- closes #97
  • Loading branch information
krasserm committed Jul 7, 2016
1 parent 3eb2393 commit d42a3ce
Show file tree
Hide file tree
Showing 31 changed files with 1,592 additions and 49 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ script:
- .travis/test-core.sh ++$TRAVIS_SCALA_VERSION
- .travis/test-leveldb.sh ++$TRAVIS_SCALA_VERSION
- .travis/test-crdt.sh ++$TRAVIS_SCALA_VERSION
- .travis/test-spark.sh ++$TRAVIS_SCALA_VERSION
- find $HOME/.sbt -name "*.lock" | xargs rm
- find $HOME/.ivy2 -name "ivydata-*.properties" | xargs rm
after_success:
Expand Down
3 changes: 3 additions & 0 deletions .travis/test-spark.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/sh

sbt $1 "adapterSpark/it:testOnly com.rbmhtechnology.eventuate.adapter.spark.SparkStreamAdapterSpec"
2 changes: 0 additions & 2 deletions .travis/test-template.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#!/bin/bash

echo $2

sbt $1 "$2/test:testOnly"; exit_1=$?
sbt $1 "$2/it:testOnly"; exit_2=$?
sbt $1 "$2/multi-jvm:test"; exit_3=$?
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Eventuate is a toolkit for building applications composed of event-driven and ev
- supports the implementation of reliable business processes from collaborating services that are tolerant to inter-service network partitions
- supports the aggregation of events from distributed services for updating persistent and in-memory query databases
- provides implementations of operation-based CRDTs as specified in [A comprehensive study of Convergent and Commutative Replicated Data Types](http://hal.upmc.fr/file/index/docid/555588/filename/techreport.pdf)
- provides adapters to 3rd-party stream processing frameworks for analyzing generated events (planned)
- provides adapters to 3rd-party stream processing frameworks for analyzing generated events

Documentation
-------------
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
import sbt._import sbt.Keys._import sbtunidoc.Plugin.UnidocKeys._import MultiJvmKeys._import ProjectSettings._import ProjectDependencies._version in ThisBuild := "0.8-SNAPSHOT"organization in ThisBuild := "com.rbmhtechnology"scalaVersion in ThisBuild := "2.11.7"lazy val root = (project in file(".")) .aggregate(core, crdt, logCassandra, logLeveldb, examples) .dependsOn(core, logCassandra, logLeveldb) .settings(name := "eventuate") .settings(commonSettings: _*) .settings(documentationSettings: _*) .settings(unidocProjectFilter in (ScalaUnidoc, unidoc) := inAnyProject -- inProjects(examples)) .settings(libraryDependencies ++= Seq(AkkaRemote)) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val core = (project in file("eventuate-core")) .settings(name := "eventuate-core") .settings(commonSettings: _*) .settings(protocSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CommonsIo, Java8Compat, Scalaz)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Javaslang % "test", JunitInterface % "test", Scalatest % "test,it")) .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val logCassandra = (project in file("eventuate-log-cassandra")) .dependsOn(core % "compile->compile;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-log-cassandra") .settings(commonSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraDriver)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Log4jApi % "test,it", Log4jCore % "test,it", Log4jSlf4j % "test,it", Scalatest % "test,it", Sigar % "test,it")) .settings(libraryDependencies ++= Seq(CassandraUnit % "test,it" excludeAll ExclusionRule(organization = "ch.qos.logback"))) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4712") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val logLeveldb = (project in file("eventuate-log-leveldb")) .dependsOn(core % "compile->compile;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-log-leveldb") .settings(commonSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, Leveldb)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Scalatest % "test,it")) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4713") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val crdt = (project in file("eventuate-crdt")) .dependsOn(core % "compile->compile;it->it;multi-jvm->multi-jvm") .dependsOn(logLeveldb % "test;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-crdt") .settings(commonSettings: _*) .settings(protocSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Scalatest % "test,it")) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4715") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val examples = (project in file("eventuate-examples")) .dependsOn(core, logLeveldb) .settings(name := "eventuate-examples") .settings(commonSettings: _*) .settings(exampleSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraDriver, Javaslang, Log4jApi, Log4jCore, Log4jSlf4j)) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)
import sbt._import sbt.Keys._import sbtunidoc.Plugin.UnidocKeys._import MultiJvmKeys._import ProjectSettings._import ProjectDependencies._version in ThisBuild := "0.8-SNAPSHOT"organization in ThisBuild := "com.rbmhtechnology"scalaVersion in ThisBuild := "2.11.7"lazy val root = (project in file(".")) .aggregate(core, crdt, logCassandra, logLeveldb, adapterSpark, examples, exampleSpark) .dependsOn(core, logCassandra, logLeveldb) .settings(name := "eventuate") .settings(commonSettings: _*) .settings(documentationSettings: _*) .settings(unidocProjectFilter in (ScalaUnidoc, unidoc) := inAnyProject -- inProjects(examples)) .settings(libraryDependencies ++= Seq(AkkaRemote)) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val core = (project in file("eventuate-core")) .settings(name := "eventuate-core") .settings(commonSettings: _*) .settings(protocSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CommonsIo, Java8Compat, Scalaz)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Javaslang % "test", JunitInterface % "test", Scalatest % "test,it")) .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val logCassandra = (project in file("eventuate-log-cassandra")) .dependsOn(core % "compile->compile;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-log-cassandra") .settings(commonSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraDriver)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Log4jApi % "test,it", Log4jCore % "test,it", Log4jSlf4j % "test,it", Scalatest % "test,it", Sigar % "test,it")) .settings(libraryDependencies ++= Seq(CassandraUnit % "test,it" excludeAll ExclusionRule(organization = "ch.qos.logback"))) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4712") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val logLeveldb = (project in file("eventuate-log-leveldb")) .dependsOn(core % "compile->compile;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-log-leveldb") .settings(commonSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, Leveldb)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Scalatest % "test,it")) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4713") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val adapterSpark = (project in file("eventuate-adapter-spark")) .dependsOn(logCassandra % "compile->compile;it->it;multi-jvm->multi-jvm") .dependsOn(logLeveldb % "compile->compile;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-adapter-spark") .settings(commonSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraClientUtil, CassandraConnector, SparkCore % "provided" exclude("org.slf4j", "slf4j-log4j12"), SparkSql % "provided" exclude("org.slf4j", "slf4j-log4j12"), SparkStreaming % "provided" exclude("org.slf4j", "slf4j-log4j12"))) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Scalatest % "test,it", Sigar % "test,it")) .settings(libraryDependencies ++= Seq(CassandraUnit % "test,it" excludeAll ExclusionRule(organization = "ch.qos.logback"))) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4714") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val crdt = (project in file("eventuate-crdt")) .dependsOn(core % "compile->compile;it->it;multi-jvm->multi-jvm") .dependsOn(logLeveldb % "test;it->it;multi-jvm->multi-jvm") .settings(name := "eventuate-crdt") .settings(commonSettings: _*) .settings(protocSettings: _*) .settings(integrationTestSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote)) .settings(libraryDependencies ++= Seq(AkkaTestkit % "test,it", AkkaTestkitMultiNode % "test", Scalatest % "test,it")) .settings(jvmOptions in MultiJvm += "-Dmultinode.server-port=4715") .configs(IntegrationTest, MultiJvm) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val examples = (project in file("eventuate-examples")) .dependsOn(core, logLeveldb) .settings(name := "eventuate-examples") .settings(commonSettings: _*) .settings(exampleSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraDriver, Javaslang, Log4jApi, Log4jCore, Log4jSlf4j)) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)lazy val exampleSpark = (project in file("eventuate-example-spark")) .dependsOn(core, logCassandra, adapterSpark) .settings(name := "eventuate-example-spark") .settings(commonSettings: _*) .settings(exampleSettings: _*) .settings(libraryDependencies ++= Seq(AkkaRemote, CassandraDriver, Log4jApi, Log4jCore, Log4jSlf4j, SparkCore exclude("org.slf4j", "slf4j-log4j12"), SparkSql exclude("org.slf4j", "slf4j-log4j12"), SparkStreaming exclude("org.slf4j", "slf4j-log4j12"))) .enablePlugins(HeaderPlugin, AutomateHeaderPlugin)
Expand Down
8 changes: 8 additions & 0 deletions eventuate-adapter-spark/src/it/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
akka.loglevel = "ERROR"
akka.test.single-expect-default = 20s

eventuate.log.cassandra.default-port = 9152
eventuate.log.cassandra.partition-size = 3
eventuate.log.leveldb.dir = target/test-log
eventuate.log.write-batch-size = 2
eventuate.snapshot.filesystem.dir = target/test-snapshot
13 changes: 13 additions & 0 deletions eventuate-adapter-spark/src/it/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" shutdownHook="disable">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="error">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
Loading

0 comments on commit d42a3ce

Please sign in to comment.