Skip to content

Commit

Permalink
[scala] Reactivate DeltaIterationSanityCheckTest
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Sep 22, 2014
1 parent 31ed0c4 commit fd28098
Showing 1 changed file with 169 additions and 192 deletions.
Original file line number Diff line number Diff line change
@@ -1,192 +1,169 @@
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//package org.apache.flink.api.scala
//
//import org.junit.Test
//import org.apache.flink.api.common.InvalidProgramException
//
//import org.apache.flink.api.scala._
//import org.apache.flink.api.scala.operators._
//import org.scalatest.junit.AssertionsForJUnit
//
//// Verify that the sanity checking in delta iterations works. We just
//// have a dummy job that is not meant to be executed. Only verify that
//// the join/coGroup inside the iteration is checked.
//class DeltaIterationSanityCheckTest extends Serializable {
//
// @Test
// def testCorrectJoinWithSolution1 {
// val solutionInput = CollectionDataSource(Array((1, "1")))
// val worksetInput = CollectionDataSource(Array((2, "2")))
//
// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
// val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
// (result, ws)
// }
// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
//
// val output = iteration.write("/dummy", CsvOutputFormat())
//
// val plan = new ScalaPlan(Seq(output))
// }
//
// @Test
// def testCorrectJoinWithSolution2 {
// val solutionInput = CollectionDataSource(Array((1, "1")))
// val worksetInput = CollectionDataSource(Array((2, "2")))
//
// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
// val result = ws join s where {_._1} isEqualTo {_._1} map { (l, r) => l }
// (result, ws)
// }
// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
//
// val output = iteration.write("/dummy", CsvOutputFormat())
//
// val plan = new ScalaPlan(Seq(output))
// }
//
// @Test(expected = classOf[InvalidProgramException])
// def testIncorrectJoinWithSolution1 {
// val solutionInput = CollectionDataSource(Array((1, "1")))
// val worksetInput = CollectionDataSource(Array((2, "2")))
//
// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
// val result = s join ws where {_._2} isEqualTo {_._2} map { (l, r) => l }
// (result, ws)
// }
// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
//
// val output = iteration.write("/dummy", CsvOutputFormat())
//
// val plan = new ScalaPlan(Seq(output))
// }
//
// @Test(expected = classOf[InvalidProgramException])
// def testIncorrectJoinWithSolution2 {
// val solutionInput = CollectionDataSource(Array((1, "1")))
// val worksetInput = CollectionDataSource(Array((2, "2")))
//
// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
// val result = ws join s where {_._2} isEqualTo {_._2} map { (l, r) => l }
// (result, ws)
// }
// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
//
// val output = iteration.write("/dummy", CsvOutputFormat())
//
// val plan = new ScalaPlan(Seq(output))
// }
//
// @Test(expected = classOf[InvalidProgramException])
// def testIncorrectJoinWithSolution3 {
// val solutionInput = CollectionDataSource(Array((1, "1")))
// val worksetInput = CollectionDataSource(Array((2, "2")))
//
// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
// val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
// (result, ws)
// }
// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
//
// val output = iteration.write("/dummy", CsvOutputFormat())
//
// val plan = new ScalaPlan(Seq(output))
// }
//
// @Test
// def testCorrectCoGroupWithSolution1 {
// val solutionInput = CollectionDataSource(Array((1, "1")))
// val worksetInput = CollectionDataSource(Array((2, "2")))
//
// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
// val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
// (result, ws)
// }
// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
//
// val output = iteration.write("/dummy", CsvOutputFormat())
//
// val plan = new ScalaPlan(Seq(output))
// }
//
// @Test
// def testCorrectCoGroupWithSolution2 {
// val solutionInput = CollectionDataSource(Array((1, "1")))
// val worksetInput = CollectionDataSource(Array((2, "2")))
//
// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
// val result = ws cogroup s where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
// (result, ws)
// }
// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
//
// val output = iteration.write("/dummy", CsvOutputFormat())
//
// val plan = new ScalaPlan(Seq(output))
// }
//
// @Test(expected = classOf[InvalidProgramException])
// def testIncorrectCoGroupWithSolution1 {
// val solutionInput = CollectionDataSource(Array((1, "1")))
// val worksetInput = CollectionDataSource(Array((2, "2")))
//
// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
// val result = s cogroup ws where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
// (result, ws)
// }
// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
//
// val output = iteration.write("/dummy", CsvOutputFormat())
//
// val plan = new ScalaPlan(Seq(output))
// }
//
// @Test(expected = classOf[InvalidProgramException])
// def testIncorrectCoGroupWithSolution2 {
// val solutionInput = CollectionDataSource(Array((1, "1")))
// val worksetInput = CollectionDataSource(Array((2, "2")))
//
// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
// val result = ws cogroup s where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
// (result, ws)
// }
// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
//
// val output = iteration.write("/dummy", CsvOutputFormat())
//
// val plan = new ScalaPlan(Seq(output))
// }
//
// @Test(expected = classOf[InvalidProgramException])
// def testIncorrectCoGroupWithSolution3 {
// val solutionInput = CollectionDataSource(Array((1, "1")))
// val worksetInput = CollectionDataSource(Array((2, "2")))
//
// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
// val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
// (result, ws)
// }
// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
//
// val output = iteration.write("/dummy", CsvOutputFormat())
//
// val plan = new ScalaPlan(Seq(output))
// }
//}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.scala

import org.junit.Test
import org.apache.flink.api.common.InvalidProgramException

import org.apache.flink.api.scala._
import org.scalatest.junit.AssertionsForJUnit

// Verify that the sanity checking in delta iterations works. We just
// have a dummy job that is not meant to be executed. Only verify that
// the join/coGroup inside the iteration is checked.
class DeltaIterationSanityCheckTest extends Serializable {

@Test
def testCorrectJoinWithSolution1(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
val result = s.join(ws).where("_1").equalTo("_1") { (l, r) => l }
(result, ws)
}

val output = iteration.print()
}

@Test
def testCorrectJoinWithSolution2(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
(result, ws)
}

val output = iteration.print()
}

@Test(expected = classOf[InvalidProgramException])
def testIncorrectJoinWithSolution1(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
val result = s.join(ws).where("_2").equalTo("_2") { (l, r) => l }
(result, ws)
}

val output = iteration.print()
}

@Test(expected = classOf[InvalidProgramException])
def testIncorrectJoinWithSolution2(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
val result = ws.join(s).where("_2").equalTo("_2") { (l, r) => l }
(result, ws)
}

val output = iteration.print() }

@Test(expected = classOf[InvalidProgramException])
def testIncorrectJoinWithSolution3(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
(result, ws)
}

val output = iteration.print()
}

@Test
def testCorrectCoGroupWithSolution1(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
val result = s.coGroup(ws).where("_1").equalTo("_1") { (l, r) => l.min }
(result, ws)
}

val output = iteration.print()
}

@Test
def testCorrectCoGroupWithSolution2(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
(result, ws)
}

val output = iteration.print()
}

@Test(expected = classOf[InvalidProgramException])
def testIncorrectCoGroupWithSolution1(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
val result = s.coGroup(ws).where("_2").equalTo("_2") { (l, r) => l.min }
(result, ws)
}

val output = iteration.print()
}

@Test(expected = classOf[InvalidProgramException])
def testIncorrectCoGroupWithSolution2(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
val result = ws.coGroup(s).where("_2").equalTo("_2") { (l, r) => l.min }
(result, ws)
}

val output = iteration.print() }

@Test(expected = classOf[InvalidProgramException])
def testIncorrectCoGroupWithSolution3(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val solutionInput = env.fromElements((1, "1"))
val worksetInput = env.fromElements((2, "2"))

val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
(result, ws)
}

val output = iteration.print()
}
}

0 comments on commit fd28098

Please sign in to comment.