Skip to content

Commit

Permalink
[hotfix] Setup BufferDataOverWindowOperatorTest a bit more properly
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Jun 12, 2024
1 parent 23fa2ae commit 503593a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig;
Expand Down Expand Up @@ -55,7 +52,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

import static org.apache.flink.table.runtime.operators.over.NonBufferOverWindowOperatorTest.comparator;
import static org.apache.flink.table.runtime.operators.over.NonBufferOverWindowOperatorTest.function;
Expand All @@ -70,14 +66,10 @@
/** Test for {@link BufferDataOverWindowOperator}. */
public class BufferDataOverWindowOperatorTest {

private static final int MEMORY_SIZE = 50 * 1024 * 32;
private RowType valueType =
new RowType(Collections.singletonList(new RowType.RowField("f0", new BigIntType())));

private List<GenericRowData> collect;
private MemoryManager memoryManager =
MemoryManagerBuilder.newBuilder().setMemorySize(MEMORY_SIZE).build();
private IOManager ioManager;
private BufferDataOverWindowOperator operator;
private GeneratedRecordComparator boundComparator =
new GeneratedRecordComparator("", "", new Object[0]) {
Expand All @@ -89,7 +81,6 @@ public RecordComparator newInstance(ClassLoader classLoader) {

@Before
public void before() throws Exception {
ioManager = new IOManagerAsync();
collect = new ArrayList<>();
}

Expand Down Expand Up @@ -195,66 +186,45 @@ public void testSliding() throws Exception {
}

private void test(OverWindowFrame[] frames, GenericRowData[] expect) throws Exception {
MockEnvironment env =
new MockEnvironmentBuilder()
.setIOManager(ioManager)
.setMemoryManager(memoryManager)
.build();
MockEnvironment env = new MockEnvironmentBuilder().build();
StreamTask<Object, StreamOperator<Object>> task =
new StreamTask<Object, StreamOperator<Object>>(env) {
@Override
protected void init() {}
};
StreamConfig streamConfig = mock(StreamConfig.class);
when(streamConfig.<RowData>getTypeSerializerIn1(
Thread.currentThread().getContextClassLoader()))
.thenReturn(inputSer);
when(streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
eq(ManagedMemoryUseCase.OPERATOR),
any(Configuration.class),
any(Configuration.class),
any(ClassLoader.class)))
.thenReturn(0.99);
when(streamConfig.getOperatorID()).thenReturn(new OperatorID());
operator =
new BufferDataOverWindowOperator(frames, comparator, true) {
{
output =
new NonBufferOverWindowOperatorTest.ConsumerOutput(
new Consumer<RowData>() {
@Override
public void accept(RowData r) {
collect.add(
GenericRowData.of(
r.getInt(0),
r.getLong(1),
r.getLong(2),
r.getLong(3),
r.getLong(4)));
}
});
}

@Override
public ClassLoader getUserCodeClassloader() {
return Thread.currentThread().getContextClassLoader();
}

@Override
public StreamConfig getOperatorConfig() {
StreamConfig conf = mock(StreamConfig.class);
when(conf.<RowData>getTypeSerializerIn1(getUserCodeClassloader()))
.thenReturn(inputSer);
when(conf.getManagedMemoryFractionOperatorUseCaseOfSlot(
eq(ManagedMemoryUseCase.OPERATOR),
any(Configuration.class),
any(Configuration.class),
any(ClassLoader.class)))
.thenReturn(0.99);
return conf;
}

@Override
public StreamTask<?, ?> getContainingTask() {
return task;
}

@Override
public StreamingRuntimeContext getRuntimeContext() {
return mock(StreamingRuntimeContext.class);
}
};
operator.setProcessingTimeService(new TestProcessingTimeService());
operator.setup(
task,
streamConfig,
new NonBufferOverWindowOperatorTest.ConsumerOutput(
r ->
collect.add(
GenericRowData.of(
r.getInt(0),
r.getLong(1),
r.getLong(2),
r.getLong(3),
r.getLong(4)))));
operator.open();

addRow(0, 1L, 4L); /* 1 **/
addRow(0, 1L, 1L); /* 2 **/
addRow(0, 1L, 1L); /* 3 **/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@

package org.apache.flink.table.runtime.operators.over;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.table.data.GenericRowData;
Expand All @@ -48,6 +55,8 @@
import java.util.function.Consumer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -119,44 +128,42 @@ public void testResetAccumulators() throws Exception {
}

private void test(boolean[] resetAccumulators, GenericRowData[] expect) throws Exception {
MockEnvironment env = new MockEnvironmentBuilder().build();
StreamTask<Object, StreamOperator<Object>> task =
new StreamTask<Object, StreamOperator<Object>>(env) {
@Override
protected void init() {}
};
StreamConfig streamConfig = mock(StreamConfig.class);
when(streamConfig.<RowData>getTypeSerializerIn1(
Thread.currentThread().getContextClassLoader()))
.thenReturn(inputSer);
when(streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
eq(ManagedMemoryUseCase.OPERATOR),
any(Configuration.class),
any(Configuration.class),
any(ClassLoader.class)))
.thenReturn(0.99);
when(streamConfig.getOperatorID()).thenReturn(new OperatorID());
operator =
new NonBufferOverWindowOperator(functions, comparator, resetAccumulators) {
{
output =
new ConsumerOutput(
new Consumer<RowData>() {
@Override
public void accept(RowData r) {
collect.add(
GenericRowData.of(
r.getInt(0),
r.getLong(1),
r.getLong(2),
r.getLong(3),
r.getLong(4)));
}
});
}

@Override
public ClassLoader getUserCodeClassloader() {
return Thread.currentThread().getContextClassLoader();
}

@Override
public StreamConfig getOperatorConfig() {
StreamConfig conf = mock(StreamConfig.class);
when(conf.<RowData>getTypeSerializerIn1(getUserCodeClassloader()))
.thenReturn(inputSer);
return conf;
}

@Override
public StreamingRuntimeContext getRuntimeContext() {
return mock(StreamingRuntimeContext.class);
}
};
operator.setProcessingTimeService(new TestProcessingTimeService());
operator.setup(
task,
streamConfig,
new ConsumerOutput(
r ->
collect.add(
GenericRowData.of(
r.getInt(0),
r.getLong(1),
r.getLong(2),
r.getLong(3),
r.getLong(4)))));
operator.open();
addRow(0, 1L, 4L);
addRow(0, 1L, 1L);
Expand Down

0 comments on commit 503593a

Please sign in to comment.