Skip to content

Commit

Permalink
[FLINK-19531] Add runtime writer operators for new Sink API
Browse files Browse the repository at this point in the history
This patch introduces two types of writer operators:
1. The `StatelessWriterOperator` is for the case where the writer state serializer is not provided.
2. The `StatefulWriterOperator` is for the case where the writer state serializer is provided.

The `AbstractWriterOperator` is the base class of the two operators.
  • Loading branch information
guoweiM authored and aljoscha committed Oct 15, 2020
1 parent d5e34a5 commit 8d52860
Show file tree
Hide file tree
Showing 7 changed files with 730 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.Writer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.List;

/**
* Abstract base class for operators that work with a {@link Writer}.
*
* <p>Sub-classes are responsible for creating the specific {@link Writer} by implementing {@link
* #createWriter()}.
*
* @param <InputT> The input type of the {@link Writer}.
* @param <CommT> The committable type of the {@link Writer}.
*/
@Internal
abstract class AbstractWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
implements OneInputStreamOperator<InputT, CommT>, BoundedOneInput {

private static final long serialVersionUID = 1L;

/** The runtime information of the input element. */
private final Context<InputT> context;

// ------------------------------- runtime fields ---------------------------------------

/** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
private Long currentWatermark;

/** The sink writer that does most of the work. */
protected Writer<InputT, CommT, ?> writer;

AbstractWriterOperator() {
this.context = new Context<>();
}

@Override
public void open() throws Exception {
super.open();

this.currentWatermark = Long.MIN_VALUE;

writer = createWriter();
}

@Override
public void processElement(StreamRecord<InputT> element) throws Exception {
context.element = element;
writer.write(element.getValue(), context);
}

@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
super.prepareSnapshotPreBarrier(checkpointId);
sendCommittables(writer.prepareCommit(false));
}

@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
this.currentWatermark = mark.getTimestamp();
}

@Override
public void endInput() throws Exception {
sendCommittables(writer.prepareCommit(true));
}

@Override
public void close() throws Exception {
super.close();
writer.close();
}

protected Sink.InitContext createInitContext() {
return new Sink.InitContext() {
@Override
public int getSubtaskId() {
return getRuntimeContext().getIndexOfThisSubtask();
}

@Override
public MetricGroup metricGroup() {
return getMetricGroup();
}
};
}

/**
* Creates and returns a {@link Writer}.
*
* @throws Exception If creating {@link Writer} fail
*/
abstract Writer<InputT, CommT, ?> createWriter() throws Exception;

private void sendCommittables(final List<CommT> committables) {
for (CommT committable : committables) {
output.collect(new StreamRecord<>(committable));
}
}

private class Context<IN> implements Writer.Context {

private StreamRecord<IN> element;

@Override
public long currentWatermark() {
return currentWatermark;
}

@Override
public Long timestamp() {
if (element.hasTimestamp()) {
return element.getTimestamp();
}
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.api.connector.sink.Writer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;

/**
* Base {@link OneInputStreamOperatorFactory} for subclasses of {@link AbstractWriterOperator}.
*
* @param <InputT> The input type of the {@link Writer}.
* @param <CommT> The committable type of the {@link Writer}.
*/
abstract class AbstractWriterOperatorFactory<InputT, CommT> extends AbstractStreamOperatorFactory<CommT>
implements OneInputStreamOperatorFactory<InputT, CommT> {

@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<CommT>> T createStreamOperator(StreamOperatorParameters<CommT> parameters) {
final AbstractWriterOperator<InputT, CommT> writerOperator = createWriterOperator();
writerOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
return (T) writerOperator;
}

abstract AbstractWriterOperator<InputT, CommT> createWriterOperator();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.Writer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.util.CollectionUtil;

import java.util.List;

/**
* Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link
* Writer Writers} that have state.
*
* @param <InputT> The input type of the {@link Writer}.
* @param <CommT> The committable type of the {@link Writer}.
* @param <WriterStateT> The type of the {@link Writer Writer's} state.
*/
@Internal
final class StatefulWriterOperator<InputT, CommT, WriterStateT> extends AbstractWriterOperator<InputT, CommT> {

/** The operator's state descriptor. */
private static final ListStateDescriptor<byte[]> WRITER_RAW_STATES_DESC =
new ListStateDescriptor<>("writer_raw_states", BytePrimitiveArraySerializer.INSTANCE);

/** Used to create the stateful {@link Writer}. */
private final Sink<InputT, CommT, WriterStateT, ?> sink;

/** The writer operator's state serializer. */
private final SimpleVersionedSerializer<WriterStateT> writerStateSimpleVersionedSerializer;

// ------------------------------- runtime fields ---------------------------------------

/** The operator's state. */
private ListState<WriterStateT> writerState;

StatefulWriterOperator(
final Sink<InputT, CommT, WriterStateT, ?> sink,
final SimpleVersionedSerializer<WriterStateT> writerStateSimpleVersionedSerializer) {
this.sink = sink;
this.writerStateSimpleVersionedSerializer = writerStateSimpleVersionedSerializer;
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);

final ListState<byte[]> rawState = context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC);
writerState = new SimpleVersionedListState<>(rawState, writerStateSimpleVersionedSerializer);
}

@SuppressWarnings("unchecked")
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
writerState.update((List<WriterStateT>) writer.snapshotState());
}

@Override
Writer<InputT, CommT, WriterStateT> createWriter() throws Exception {
final List<WriterStateT> committables = CollectionUtil.iterableToList(writerState.get());
return sink.createWriter(createInitContext(), committables);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.Writer;
import org.apache.flink.streaming.api.operators.StreamOperator;

/**
* A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link
* StatefulWriterOperator}.
*
* @param <InputT> The input type of the {@link Writer}.
* @param <CommT> The committable type of the {@link Writer}.
* @param <WriterStateT> The type of the {@link Writer Writer's} state.
*/
public final class StatefulWriterOperatorFactory<InputT, CommT, WriterStateT> extends AbstractWriterOperatorFactory<InputT, CommT> {

private final Sink<InputT, CommT, WriterStateT, ?> sink;

public StatefulWriterOperatorFactory(Sink<InputT, CommT, WriterStateT, ?> sink) {
this.sink = sink;
}

@Override
AbstractWriterOperator<InputT, CommT> createWriterOperator() {
return new StatefulWriterOperator<>(sink, sink.getWriterStateSerializer().get());
}

@Override
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return StatefulWriterOperator.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.Writer;

import java.util.Collections;

/**
* Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link
* Writer Writers} that don't have state.
*
* @param <InputT> The input type of the {@link Writer}.
* @param <CommT> The committable type of the {@link Writer}.
*/
@Internal
final class StatelessWriterOperator<InputT, CommT> extends AbstractWriterOperator<InputT, CommT>{

/** Used to create the stateless {@link Writer}. */
private final Sink<InputT, CommT, ?, ?> sink;

StatelessWriterOperator(final Sink<InputT, CommT, ?, ?> sink) {
this.sink = sink;
}

@Override
Writer<InputT, CommT, ?> createWriter() {
return sink.createWriter(createInitContext(), Collections.emptyList());
}
}
Loading

0 comments on commit 8d52860

Please sign in to comment.