Skip to content

Commit

Permalink
[FLINK-22972][datastream] Remove StreamOperator#dispose in favour of …
Browse files Browse the repository at this point in the history
…close and finish

This commit cleans up StreamOperator API in regards to the termination phase and introduces a clean finish() method for flushing all records without releasing resources.

The StreamOperator#close  method which is supposed to flush all records, but at the same time, currently, it closes all resources, including connections to external systems. We need separate methods for flushing and closing resources because we might need the connections when performing the final checkpoint, once all records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose  method.

This closes apache#16351
  • Loading branch information
dawidwys committed Jul 6, 2021
1 parent 63183d8 commit 541f430
Show file tree
Hide file tree
Showing 56 changed files with 314 additions and 627 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
mainOperator.processElement(streamRecord);
} else {
mainOperator.endInput();
mainOperator.finish();
controller.suspendDefaultAction();
mailboxProcessor.suspend();
}
Expand All @@ -117,7 +118,6 @@ protected void cancelTask() {}
@Override
protected void cleanup() throws Exception {
mainOperator.close();
mainOperator.dispose();
}

private static class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ public void open() throws Exception {
}

@Override
public void close() throws Exception {
operator.close();
public void finish() throws Exception {
operator.finish();
}

@Override
public void dispose() throws Exception {
operator.dispose();
public void close() throws Exception {
operator.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ public void open() throws Exception {
}

@Override
public void close() throws Exception {
ACTUAL_ORDER_TRACKING.add("close");
public void finish() throws Exception {
ACTUAL_ORDER_TRACKING.add("finish");
}

@Override
public void dispose() throws Exception {
ACTUAL_ORDER_TRACKING.add("dispose");
public void close() throws Exception {
ACTUAL_ORDER_TRACKING.add("close");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,16 @@ public void open() throws Exception {
}

@Override
public void close() throws Exception {
public void finish() throws Exception {
try {
invokeFinishBundle();
} finally {
super.close();

try {
cleanUpLeakingClasses(this.getClass().getClassLoader());
} catch (Throwable t) {
LOG.warn("Failed to clean up the leaking objects.", t);
}
super.finish();
}
}

@Override
public void dispose() throws Exception {
public void close() throws Exception {
try {
if (checkFinishBundleTimer != null) {
checkFinishBundleTimer.cancel(true);
Expand All @@ -165,7 +159,13 @@ public void dispose() throws Exception {
pythonFunctionRunner = null;
}
} finally {
super.dispose();
super.close();

try {
cleanUpLeakingClasses(this.getClass().getClassLoader());
} catch (Throwable t) {
LOG.warn("Failed to clean up the leaking objects.", t);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark
}

@Override
public void close() throws Exception {
super.close();
public void finish() throws Exception {
super.finish();
watermarkGenerator.onPeriodicEmit(watermarkOutput);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public void open() throws Exception {
}

@Override
public void dispose() throws Exception {
super.dispose();
public void close() throws Exception {
super.close();
if (arrowSerializer != null) {
arrowSerializer.close();
arrowSerializer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ public void endInput() throws Exception {
}

@Override
public void close() throws Exception {
public void finish() throws Exception {
invokeCurrentBatch();
super.close();
super.finish();
}

protected abstract void invokeCurrentBatch() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,18 @@ public void endInput() throws Exception {
}

@Override
public void dispose() throws Exception {
super.dispose();
if (arrowSerializer != null) {
arrowSerializer.close();
arrowSerializer = null;
}
public void finish() throws Exception {
invokeCurrentBatch();
super.finish();
}

@Override
public void close() throws Exception {
invokeCurrentBatch();
super.close();
if (arrowSerializer != null) {
arrowSerializer.close();
arrowSerializer = null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@
*
* <ol>
* <li>if {@link ReaderState#IDLE IDLE} then close immediately
* <li>otherwise switch to {@link ReaderState#CLOSING CLOSING}, call {@link
* MailboxExecutor#yield() yield} in a loop until state is {@link ReaderState#CLOSED CLOSED}
* <li>otherwise switch to {@link ReaderState#FINISHING CLOSING}, call {@link
* MailboxExecutor#yield() yield} in a loop until state is {@link ReaderState#FINISHED CLOSED}
* <li>{@link MailboxExecutor#yield() yield()} causes remaining records (and splits) to be
* processed in the same way as above
* </ol>
Expand Down Expand Up @@ -139,7 +139,7 @@ public void onNoMoreData(ContinuousFileReaderOperator<?, ?> op) {
}
},
/**
* No further processing can be done; only state disposal transition to {@link #CLOSED}
* No further processing can be done; only state disposal transition to {@link #FINISHED}
* allowed.
*/
FAILED {
Expand All @@ -153,7 +153,7 @@ public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(
* {@link #close()} was called but unprocessed data (records and splits) remains and needs
* to be processed. {@link #close()} caller is blocked.
*/
CLOSING {
FINISHING {
@Override
public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(
ContinuousFileReaderOperator<?, T> op) throws IOException {
Expand All @@ -168,10 +168,10 @@ public void onNoMoreData(ContinuousFileReaderOperator<?, ?> op) {
// need one more mail to unblock possible yield() in close() method (todo: wait with
// timeout in yield)
op.enqueueProcessRecord();
op.switchState(CLOSED);
op.switchState(FINISHED);
}
},
CLOSED {
FINISHED {
@Override
public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(
ContinuousFileReaderOperator<?, T> op) {
Expand All @@ -186,12 +186,12 @@ public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(

static {
Map<ReaderState, Set<ReaderState>> tmpTransitions = new HashMap<>();
tmpTransitions.put(IDLE, EnumSet.of(OPENING, CLOSED, FAILED));
tmpTransitions.put(OPENING, EnumSet.of(READING, CLOSING, FAILED));
tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING, CLOSING, FAILED));
tmpTransitions.put(CLOSING, EnumSet.of(CLOSED, FAILED));
tmpTransitions.put(FAILED, EnumSet.of(CLOSED));
tmpTransitions.put(CLOSED, EnumSet.noneOf(ReaderState.class));
tmpTransitions.put(IDLE, EnumSet.of(OPENING, FINISHED, FAILED));
tmpTransitions.put(OPENING, EnumSet.of(READING, FINISHING, FAILED));
tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING, FINISHING, FAILED));
tmpTransitions.put(FINISHING, EnumSet.of(FINISHED, FAILED));
tmpTransitions.put(FAILED, EnumSet.of(FINISHED));
tmpTransitions.put(FINISHED, EnumSet.noneOf(ReaderState.class));
VALID_TRANSITIONS = new EnumMap<>(tmpTransitions);
}

Expand All @@ -200,7 +200,7 @@ public boolean isAcceptingSplits() {
}

public final boolean isTerminal() {
return this == CLOSED;
return this == FINISHED;
}

public boolean canSwitchTo(ReaderState next) {
Expand Down Expand Up @@ -302,7 +302,7 @@ public void open() throws Exception {

this.state = ReaderState.IDLE;
if (this.format instanceof RichInputFormat) {
((RichInputFormat) this.format).setRuntimeContext(getRuntimeContext());
((RichInputFormat<?, ?>) this.format).setRuntimeContext(getRuntimeContext());
}
this.format.configure(new Configuration());

Expand Down Expand Up @@ -380,7 +380,7 @@ private void onSplitProcessed() throws IOException {

private void readAndCollectRecord() throws IOException {
Preconditions.checkState(
state == ReaderState.READING || state == ReaderState.CLOSING,
state == ReaderState.READING || state == ReaderState.FINISHING,
"can't process record in state %s",
state);
if (format.reachedEnd()) {
Expand All @@ -394,14 +394,14 @@ private void readAndCollectRecord() throws IOException {

private void loadSplit(T split) throws IOException {
Preconditions.checkState(
state != ReaderState.READING && state != ReaderState.CLOSED,
state != ReaderState.READING && state != ReaderState.FINISHED,
"can't load split in state %s",
state);
Preconditions.checkNotNull(split, "split is null");
LOG.debug("load split: {}", split);
currentSplit = split;
if (this.format instanceof RichInputFormat) {
((RichInputFormat) this.format).openInputFormat();
((RichInputFormat<?, ?>) this.format).openInputFormat();
}
if (format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) {
// recovering after a node failure with an input
Expand Down Expand Up @@ -436,14 +436,38 @@ public void processWatermark(Watermark mark) throws Exception {
}

@Override
public void dispose() throws Exception {
public void finish() throws Exception {
LOG.debug("finishing");
super.finish();

switch (state) {
case IDLE:
switchState(ReaderState.FINISHED);
break;
case FINISHED:
LOG.warn("operator is already closed, doing nothing");
return;
default:
switchState(ReaderState.FINISHING);
while (!state.isTerminal()) {
executor.yield();
}
}

try {
sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
} catch (Exception e) {
LOG.warn("unable to emit watermark while closing", e);
}
}

@Override
public void close() throws Exception {
Exception e = null;
if (state != ReaderState.CLOSED) {
try {
cleanUp();
} catch (Exception ex) {
e = ex;
}
try {
cleanUp();
} catch (Exception ex) {
e = ex;
}
{
checkpointedState = null;
Expand All @@ -457,7 +481,7 @@ public void dispose() throws Exception {
splits = null;
}
try {
super.dispose();
super.close();
} catch (Exception ex) {
e = ExceptionUtils.firstOrSuppressed(ex, e);
}
Expand All @@ -466,34 +490,6 @@ public void dispose() throws Exception {
}
}

@Override
public void close() throws Exception {
LOG.debug("closing");
super.close();

switch (state) {
case IDLE:
switchState(ReaderState.CLOSED);
break;
case CLOSED:
LOG.warn("operator is already closed, doing nothing");
return;
default:
switchState(ReaderState.CLOSING);
while (!state.isTerminal()) {
executor.yield();
}
}

try {
sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
} catch (Exception e) {
LOG.warn("unable to emit watermark while closing", e);
}

cleanUp();
}

private void cleanUp() throws Exception {
LOG.debug("cleanup, state={}", state);

Expand All @@ -502,7 +498,7 @@ private void cleanUp() throws Exception {
() -> format.close(),
() -> {
if (this.format instanceof RichInputFormat) {
((RichInputFormat) this.format).closeInputFormat();
((RichInputFormat<?, ?>) this.format).closeInputFormat();
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,30 +322,11 @@ protected boolean isUsingCustomRawKeyedState() {
@Override
public void open() throws Exception {}

/**
* This method is called after all records have been added to the operators via the methods
* {@link OneInputStreamOperator#processElement(StreamRecord)}, or {@link
* TwoInputStreamOperator#processElement1(StreamRecord)} and {@link
* TwoInputStreamOperator#processElement2(StreamRecord)}.
*
* <p>The method is expected to flush all remaining buffered data. Exceptions during this
* flushing of buffered should be propagated, in order to cause the operation to be recognized
* asa failed, because the last data items are not processed properly.
*
* @throws Exception An exception in this method causes the operator to fail.
*/
@Override
public void close() throws Exception {}
public void finish() throws Exception {}

/**
* This method is called at the very end of the operator's life, both in the case of a
* successful completion of the operation, and in the case of a failure and canceling.
*
* <p>This method is expected to make a thorough effort to release all resources that the
* operator has acquired.
*/
@Override
public void dispose() throws Exception {
public void close() throws Exception {
if (stateHandler != null) {
stateHandler.dispose();
}
Expand Down
Loading

0 comments on commit 541f430

Please sign in to comment.