
Commit

[FLINK-1794] [test-utils] Adds test base for scala tests and adapts existing flink-ml tests

[FLINK-1794] [test-utils] Adds scala docs to FlinkTestBase

This closes apache#540.
tillrohrmann committed Mar 31, 2015
1 parent c0b2975 commit b8aa49c
Showing 7 changed files with 130 additions and 57 deletions.
@@ -70,7 +70,7 @@ public class Client {

private final Optimizer compiler; // the compiler to compile the jobs

private boolean printStatusDuringExecution = false;
private boolean printStatusDuringExecution = true;

/**
* If != -1, this field specifies the total number of available slots on the cluster
25 changes: 20 additions & 5 deletions flink-staging/flink-ml/pom.xml
@@ -40,11 +40,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>com.github.fommil.netlib</groupId>
<artifactId>core</artifactId>
@@ -65,6 +60,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -105,6 +107,19 @@
<goals>
<goal>test</goal>
</goals>
<configuration>
<suffixes>(?&lt;!(IT|Integration))(Test|Suite|Case)</suffixes>
</configuration>
</execution>
<execution>
<id>integration-test</id>
<phase>integration-test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<suffixes>(IT|Integration)(Test|Suite|Case)</suffixes>
</configuration>
</execution>
</executions>
</plugin>
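The two executions configured above split the scalatest run by suite name: the "test" execution picks up suites whose names do not end in an IT/Integration suffix, while the "integration-test" execution picks up those that do (such as the *ITSuite classes added in this commit). Below is a minimal sketch, not part of the commit, of roughly how the two suffix patterns classify suite names; the non-IT class names are invented for illustration.

object SuffixPatternDemo {
  // Pattern of the "test" execution: suffix must not be preceded by IT or Integration.
  private val unitPattern = ".*(?<!(IT|Integration))(Test|Suite|Case)"
  // Pattern of the "integration-test" execution: suffix must be preceded by IT or Integration.
  private val integrationPattern = ".*(IT|Integration)(Test|Suite|Case)"

  def main(args: Array[String]): Unit = {
    // ALSITSuite and PolynomialBaseITSuite come from this commit; the other names are made up.
    val names = Seq("ALSITSuite", "PolynomialBaseITSuite", "SomeUnitSuite", "VectorTest")
    for (name <- names) {
      println(s"$name: unit=${name.matches(unitPattern)}, integration=${name.matches(integrationPattern)}")
    }
  }
}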
@@ -19,18 +19,21 @@
package org.apache.flink.ml.feature

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.client.CliFrontendTestUtils
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.junit.{BeforeClass, Test}
import org.scalatest.ShouldMatchers
import org.scalatest.{Matchers, FlatSpec}

import org.apache.flink.api.scala._
import org.apache.flink.test.util.FlinkTestBase

class PolynomialBaseITCase extends ShouldMatchers {
class PolynomialBaseITSuite
extends FlatSpec
with Matchers
with FlinkTestBase {

@Test
def testMapElementToPolynomialVectorSpace (): Unit = {
behavior of "The polynomial base implementation"

it should "map single element vectors to the polynomial vector space" in {
val env = ExecutionEnvironment.getExecutionEnvironment

env.setParallelism (2)
@@ -60,8 +63,7 @@ class PolynomialBaseITCase extends ShouldMatchers {
}
}

@Test
def testMapVectorToPolynomialVectorSpace(): Unit = {
it should "map vectors to the polynomial vector space" in {
val env = ExecutionEnvironment.getExecutionEnvironment

env.setParallelism(2)
@@ -92,8 +94,7 @@ class PolynomialBaseITCase extends ShouldMatchers {
}
}

@Test
def testReturnEmptyVectorIfDegreeIsZero(): Unit = {
it should "return an empty vector if the max degree is zero" in {
val env = ExecutionEnvironment.getExecutionEnvironment

env.setParallelism(2)
@@ -123,10 +124,3 @@
}
}
}

object PolynomialBaseITCase {
@BeforeClass
def setup(): Unit = {
CliFrontendTestUtils.pipeSystemOutToNull()
}
}
@@ -18,24 +18,28 @@

package org.apache.flink.ml.recommendation

import org.apache.flink.api.common.ExecutionMode
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.client.CliFrontendTestUtils
import org.junit.{BeforeClass, Test}
import org.scalatest.ShouldMatchers
import scala.language.postfixOps

import org.scalatest._

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.test.util.FlinkTestBase

class ALSITSuite
extends FlatSpec
with Matchers
with FlinkTestBase {

override val parallelism = 2

class ALSITCase extends ShouldMatchers {
behavior of "The alternating least squares (ALS) implementation"

@Test
def testMatrixFactorization(): Unit = {
it should "properly factorize a matrix" in {
import ALSData._

val env = ExecutionEnvironment.getExecutionEnvironment

env.setParallelism(2)

val als = ALS()
.setIterations(iterations)
.setLambda(lambda)
@@ -72,14 +76,6 @@ class ALSITCase extends ShouldMatchers {
}
}

object ALSITCase {

@BeforeClass
def setup(): Unit = {
CliFrontendTestUtils.pipeSystemOutToNull()
}
}

object ALSData {

val iterations = 9
@@ -19,18 +19,21 @@
package org.apache.flink.ml.regression

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.client.CliFrontendTestUtils
import org.apache.flink.ml.common.ParameterMap
import org.apache.flink.ml.feature.PolynomialBase
import org.junit.{BeforeClass, Test}
import org.scalatest.ShouldMatchers
import org.scalatest.{Matchers, FlatSpec}

import org.apache.flink.api.scala._
import org.apache.flink.test.util.FlinkTestBase

class MultipleLinearRegressionITCase extends ShouldMatchers {
class MultipleLinearRegressionITSuite
extends FlatSpec
with Matchers
with FlinkTestBase {

@Test
def testEstimationOfLinearFunction(): Unit = {
behavior of "The multipe linear regression implementation"

it should "estimate a linear function" in {
val env = ExecutionEnvironment.getExecutionEnvironment

env.setParallelism(2)
@@ -65,8 +68,7 @@ class MultipleLinearRegressionITCase extends ShouldMatchers {
srs should be (expectedSquaredResidualSum +- 2)
}

@Test
def testEstimationOfCubicFunction(): Unit = {
it should "estimate a cubic function" in {
val env = ExecutionEnvironment.getExecutionEnvironment

env.setParallelism(2)
@@ -105,11 +107,3 @@
srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5)
}
}

object MultipleLinearRegressionITCase{

@BeforeClass
def setup(): Unit = {
CliFrontendTestUtils.pipeSystemOutToNull()
}
}
6 changes: 6 additions & 0 deletions flink-test-utils/pom.xml
@@ -77,6 +77,12 @@ under the License.
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.test.util

import org.scalatest.{Suite, BeforeAndAfter}

/** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala-based tests.
* Additionally, a TestEnvironment with the started cluster is created and set as the default
* [[org.apache.flink.api.java.ExecutionEnvironment]].
*
* This mixin starts a ForkableFlinkMiniCluster with one TaskManager and a number of slots given
* by parallelism. This value can be overridden in a subclass in order to start the cluster
* with a different number of slots.
*
* The cluster is started once before the tests run and is reused for the individual tests.
* After all tests have been executed, the cluster is shut down.
*
* The cluster is used by obtaining the default [[org.apache.flink.api.java.ExecutionEnvironment]].
*
* @example
* {{{
* def testSomething: Unit = {
* // Obtain TestEnvironment with started ForkableFlinkMiniCluster
* val env = ExecutionEnvironment.getExecutionEnvironment
*
* env.fromCollection(...)
*
* env.execute
* }
* }}}
*
*/
trait FlinkTestBase
extends BeforeAndAfter {
that: Suite =>

var cluster: Option[ForkableFlinkMiniCluster] = None
val parallelism = 4

before {
val cl = TestBaseUtils.startCluster(1, parallelism, false)
val clusterEnvironment = new TestEnvironment(cl, parallelism)
clusterEnvironment.setAsContext

cluster = Some(cl)
}

after {
cluster.map(c => TestBaseUtils.stopCluster(c, TestBaseUtils.DEFAULT_TIMEOUT))
}

}
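For illustration only (not part of this commit): a suite opts into the mini cluster by mixing FlinkTestBase into a FlatSpec with Matchers, exactly as the ML suites above do. The suite name and data below are hypothetical, and DataSet.collect() is assumed to be available in this Flink version.

import org.scalatest.{FlatSpec, Matchers}

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.test.util.FlinkTestBase

class DoublingITSuite extends FlatSpec with Matchers with FlinkTestBase {

  // Start the ForkableFlinkMiniCluster with two slots instead of the default four.
  override val parallelism = 2

  behavior of "A job executed against the test mini cluster"

  it should "run on the TestEnvironment set as context by FlinkTestBase" in {
    // Returns the TestEnvironment that the before hook registered as the default environment.
    val env = ExecutionEnvironment.getExecutionEnvironment

    val result = env.fromCollection(Seq(1, 2, 3)).map(_ * 2).collect()

    result.toSet should be (Set(2, 4, 6))
  }
}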
