Skip to content

Commit

Permalink
[hotfix][test] Fix some code styles in StreamSourceOperatorWatermarks…
Browse files Browse the repository at this point in the history
…Test and StreamSourceOperatorLatencyMetricsTest
  • Loading branch information
sunhaibotb authored and aljoscha committed Nov 14, 2019
1 parent 3171edf commit f9ba614
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ public class StreamSourceOperatorLatencyMetricsTest extends TestLogger {
*/
@Test
public void testLatencyMarkEmissionDisabled() throws Exception {
testLatencyMarkEmission(0, (operator, timeProvider) -> {
setupSourceOperator(operator, new ExecutionConfig(), MockEnvironment.builder().build(), timeProvider);
});
testLatencyMarkEmission(0,
(operator, timeProvider) -> setupSourceOperator(operator, new ExecutionConfig(), MockEnvironment.builder().build(), timeProvider));
}

/**
Expand Down Expand Up @@ -170,7 +169,7 @@ private void testLatencyMarkEmission(int numberLatencyMarkers, OperatorSetupOper
operator.getContainingTask(),
StreamTask.createRecordWriters(operator.getOperatorConfig(), new MockEnvironmentBuilder().build()));
try {
operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<Long>(output), operatorChain);
operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<>(output), operatorChain);
operator.close();
} finally {
operatorChain.releaseOutputs();
Expand Down Expand Up @@ -207,6 +206,7 @@ private void testLatencyMarkEmission(int numberLatencyMarkers, OperatorSetupOper

// ------------------------------------------------------------------------

@SuppressWarnings("unchecked")
private static <T> void setupSourceOperator(
StreamSource<T, ?> operator,
ExecutionConfig executionConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ public void testAutomaticWatermarkContext() throws Exception {
processingTimeService.setCurrentTime(i);
}

assertTrue(output.size() == 9);
assertEquals(9, output.size());

long nextWatermark = 0;
for (StreamElement el : output) {
nextWatermark += watermarkInterval;
Watermark wm = (Watermark) el;
assertTrue(wm.getTimestamp() == nextWatermark);
assertEquals(wm.getTimestamp(), nextWatermark);
}
}

Expand Down

0 comments on commit f9ba614

Please sign in to comment.