Skip to content

Commit

Permalink
[FLINK-10583][table] Add State Retention support to TemporalProcessTi…
Browse files Browse the repository at this point in the history
…meJoin.
  • Loading branch information
kl0u committed Jan 9, 2019
1 parent f828031 commit 120c4fc
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, TimestampedCollector, TwoInputStreamOperator}
import org.apache.flink.runtime.state.VoidNamespace
import org.apache.flink.streaming.api.operators.{InternalTimer, TimestampedCollector}
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.Compiler
Expand All @@ -38,8 +39,7 @@ class TemporalProcessTimeJoin(
genJoinFuncName: String,
genJoinFuncCode: String,
queryConfig: StreamQueryConfig)
extends AbstractStreamOperator[CRow]
with TwoInputStreamOperator[CRow, CRow, CRow]
extends BaseTwoInputStreamOperatorWithStateRetention(queryConfig)
with Compiler[FlatJoinFunction[Row, Row, Row]]
with Logging {

Expand Down Expand Up @@ -70,6 +70,8 @@ class TemporalProcessTimeJoin(
collector = new TimestampedCollector[CRow](output)
cRowWrapper = new CRowWrappingCollector()
cRowWrapper.out = collector

super.open()
}

override def processElement1(element: StreamRecord[CRow]): Unit = {
Expand All @@ -82,18 +84,36 @@ class TemporalProcessTimeJoin(
cRowWrapper.setChange(element.getValue.change)

joinFunction.join(element.getValue.row, rightSideRow, cRowWrapper)

registerProcessingCleanUpTimer()
}

/**
  * Handles a record arriving on the second input.
  *
  * If the record's change flag is set, its row replaces the content of `rightState` and
  * `registerProcessingCleanUpTimer()` is called. Otherwise `rightState` is cleared and
  * `cleanUpLastTimer()` is called.
  *
  * @param element The incoming record of the second input.
  */
override def processElement2(element: StreamRecord[CRow]): Unit = {
  val incoming = element.getValue

  if (!incoming.change) {
    // Change flag not set: drop the stored row and the timer that guards it.
    rightState.clear()
    cleanUpLastTimer()
  } else {
    // Change flag set: remember the new row and (re-)register the clean-up timer.
    rightState.update(incoming.row)
    registerProcessingCleanUpTimer()
  }
}

/**
  * Closes the generated join function via `FunctionUtils.closeFunction`.
  *
  * NOTE(review): unlike `open()`, which calls `super.open()`, this method does not call
  * `super.close()` -- confirm that this is intentional.
  */
override def close(): Unit = FunctionUtils.closeFunction(joinFunction)

/**
  * Callback invoked when a clean-up timer fires; clears `rightState`.
  *
  * @param time The timestamp of the fired timer (not used by this implementation).
  */
override def cleanUpState(time: Long): Unit = rightState.clear()

/**
  * Invoked when an event-time timer fires.
  *
  * The parts of this operator that are visible here only request clean-up timers through
  * `registerProcessingCleanUpTimer()`, so an event-time timer is not expected to reach
  * this method. Instead of the `???` placeholder (which fails with a bare
  * `scala.NotImplementedError` and no explanation), fail with a descriptive message so
  * that an unexpected invocation can be diagnosed.
  *
  * @param timer The event-time timer that fired.
  * @throws IllegalStateException always, since event-time timers are not handled here.
  */
override def onEventTime(timer: InternalTimer[Any, VoidNamespace]): Unit = {
  throw new IllegalStateException(
    "TemporalProcessTimeJoin does not handle event-time timers, " +
      s"but an event-time timer fired: $timer")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.flink.table.runtime.CRowKeySelector
import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TestStreamQueryConfig}
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
import org.hamcrest.{CoreMatchers, Matcher}
import org.hamcrest.CoreMatchers
import org.hamcrest.Matchers.{endsWith, startsWith}
import org.junit.Assert.assertTrue
import org.junit.Test
Expand Down Expand Up @@ -658,6 +658,82 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
testHarness.close()
}

// ---------------------- Processing time TTL tests ----------------------

@Test
def testProcessingTimeJoinCleanupTimerUpdatedFromProbeSide(): Unit = {
  // state retention: min=2ms max=4ms
  val harness = createTestHarness(new OrdersRatesProctimeTemporalJoinInfo())
  harness.open()

  val expected = new ConcurrentLinkedQueue[Object]()

  // t=1: store a rate on the second input, then join an order from the first input
  harness.setProcessingTime(1L)
  harness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
  harness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
  expected.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))

  // t=4: a record on the first input should postpone the clean-up of the state
  harness.setProcessingTime(4L)
  harness.processElement1(new StreamRecord(CRow(2L, "Euro", 6L)))
  expected.add(new StreamRecord(CRow(2L, "Euro", 6L, "Euro", 114L, 0L)))

  // t=5: nothing should be cleaned up yet, so the join still finds the rate
  harness.setProcessingTime(5L)
  harness.processElement1(new StreamRecord(CRow(2L, "Euro", 8L)))
  expected.add(new StreamRecord(CRow(2L, "Euro", 8L, "Euro", 114L, 0L)))

  // t=8: the state should be cleaned up; no output is expected for the next record
  harness.setProcessingTime(8L)
  harness.processElement1(new StreamRecord(CRow(2L, "Euro", 10L)))

  verify(expected, harness.getOutput)

  harness.close()
}

@Test
def testProcessingTimeJoinCleanupTimerUpdatedFromBuildSide(): Unit = {
  // state retention: min=2ms max=4ms
  val harness = createTestHarness(new OrdersRatesProctimeTemporalJoinInfo())
  harness.open()

  val expected = new ConcurrentLinkedQueue[Object]()

  // t=1: store a rate on the second input, then join an order from the first input
  harness.setProcessingTime(1L)
  harness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
  harness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
  expected.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))

  // t=4: a new rate on the second input should postpone the clean-up of the state
  harness.setProcessingTime(4L)
  harness.processElement2(new StreamRecord(CRow("Euro", 116L, 1L)))

  // t=5: nothing should be cleaned up yet, so the join sees the updated rate
  harness.setProcessingTime(5L)
  harness.processElement1(new StreamRecord(CRow(2L, "Euro", 6L)))
  expected.add(new StreamRecord(CRow(2L, "Euro", 6L, "Euro", 116L, 1L)))

  // t=8: the state should be cleaned up ...
  harness.setProcessingTime(8L)

  // ... so this record should find no match and produce no output
  harness.processElement1(new StreamRecord(CRow(2L, "Euro", 10L)))

  verify(expected, harness.getOutput)

  harness.close()
}

def translateJoin(joinInfo: TemporalJoinInfo, joinRelType: JoinRelType = JoinRelType.INNER)
: (CRowKeySelector, CRowKeySelector, TwoInputStreamOperator[CRow, CRow, CRow]) = {

Expand Down

0 comments on commit 120c4fc

Please sign in to comment.