Skip to content

Commit

Permalink
[FLINK-3226] implement getUniqueName method in TranslationContext
Browse files Browse the repository at this point in the history
This closes apache#1600 and apache#1567
  • Loading branch information
vasia committed Mar 18, 2016
1 parent 7972426 commit 670ec6a
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object RexNodeTranslator {

exp match {
case agg: Aggregation =>
val name = "TMP_" + agg.hashCode().toHexString.toUpperCase
val name = TranslationContext.getUniqueName
val aggCall = toAggCall(agg, name, relBuilder)
val fieldExp = new UnresolvedFieldReference(name)
(fieldExp, List(aggCall))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ object TranslationContext {

}

def getUniqueName: String = {
"TMP_" + nameCntr.getAndIncrement()
}

def getRelBuilder: RelBuilder = {
relBuilder
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class DataSetGroupReduce(
config: TableConfig,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {

val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config)
val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)

// get the output types
val fieldsNames = rowType.getFieldNames
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public AggregationsITCase(TestExecutionMode mode){
super(mode);
}

@Ignore //DataSetMap needs to be implemented
@Test
public void testAggregationTypes() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Expand Down Expand Up @@ -116,8 +115,7 @@ public void testWorkingAggregationDataTypes() throws Exception {
compareResultAsText(results, expected);
}

@Ignore // it seems like the arithmetic expression is added to the field position
@Test(expected = NotImplementedError.class)
@Test
public void testAggregationWithArithmetic() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {

@Ignore //DataSetMap needs to be implemented
@Test
def testAggregationTypes(): Unit = {

Expand Down Expand Up @@ -71,8 +70,21 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Ignore // it seems like the arithmetic expression is added to the field position
@Test(expected = classOf[NotImplementedError])
@Test
def testProjection(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val t = env.fromElements(
(1: Byte, 1: Short),
(2: Byte, 2: Short)).toTable
.select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)

val expected = "1,3,2,1,3"
val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testAggregationWithArithmetic(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ class ExpressionsITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Ignore
@Test
def testCaseInsensitiveForAs(): Unit = {

Expand Down

0 comments on commit 670ec6a

Please sign in to comment.