Skip to content

Commit

Permalink
[FLINK-7003] [table] Fix 'SELECT *' for tables with nested schema.
Browse files Browse the repository at this point in the history
This closes apache#4989.
  • Loading branch information
Shuyi Chen authored and fhueske committed Nov 16, 2017
1 parent 101fef7 commit a63d2be
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ class CompositeRelDataType(
val compositeType: CompositeType[_],
val nullable: Boolean,
typeFactory: FlinkTypeFactory)
extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) {
extends RelRecordType(
StructKind.PEEK_FIELDS_NO_EXPAND,
createFieldList(compositeType, typeFactory)) {

override def toString = s"COMPOSITE($compositeType)"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,25 @@ class CalcITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testSelectStarFromNestedTable(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery = "SELECT * FROM MyTable"

val ds = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv).as('a, 'b)
tEnv.registerTable("MyTable", ds)

val result = tEnv.sqlQuery(sqlQuery)

val expected = "(1,1),one\n" + "(2,2),two\n" + "(3,3),three\n"

val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testSelectStarFromDataSet(): Unit = {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,10 @@ class CalcITCase(
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*)
val t = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b).select('*)

val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
"15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
"19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
val expected =
"(1,1),one\n" + "(2,2),two\n" + "(3,3),three\n"
val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,27 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}

/** test select star **/
@Test
def testSelectStar(): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.clear

val sqlQuery = "SELECT * FROM MyTable"

val t = StreamTestData.getSmallNestedTupleDataStream(env).toTable(tEnv).as('a, 'b)
tEnv.registerTable("MyTable", t)

val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()

val expected = List("(1,1),one", "(2,2),two", "(3,3),three")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}

/** test selection **/
@Test
def testSelectExpressionFromTable(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}

@Test
def testSelectStar(): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
val ds = StreamTestData.getSmallNestedTupleDataStream(env).toTable(tEnv).select('*)

val results = ds.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink[Row])
env.execute()

val expected = mutable.MutableList("(1,1),one", "(2,2),two", "(3,3),three")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}

@Test
def testSelectFirst(): Unit = {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,13 @@ object StreamTestData {
data.+=((5, 15L, 14, "KLM", 2L))
env.fromCollection(data)
}

def getSmallNestedTupleDataStream(env: StreamExecutionEnvironment):
DataStream[((Int, Int), String)] = {
val data = new mutable.MutableList[((Int, Int), String)]
data.+=(((1, 1), "one"))
data.+=(((2, 2), "two"))
data.+=(((3, 3), "three"))
env.fromCollection(data)
}
}

0 comments on commit a63d2be

Please sign in to comment.