Skip to content

Commit

Permalink
[FLINK-25578][core] Graduate and deprecate unified Sink API V1
Browse files Browse the repository at this point in the history
  • Loading branch information
fapaul committed Feb 4, 2022
1 parent b2131e4 commit 5f4659f
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ org.apache.flink.configuration.ConfigOptions.key(java.lang.String): Returned lea
org.apache.flink.configuration.DescribedEnum.getDescription(): Returned leaf type org.apache.flink.configuration.description.InlineElement does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.configuration.MemorySize.parse(java.lang.String, org.apache.flink.configuration.MemorySize$MemoryUnit): Argument leaf type org.apache.flink.configuration.MemorySize$MemoryUnit does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.base.DeliveryGuarantee.getDescription(): Returned leaf type org.apache.flink.configuration.description.InlineElement does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.base.sink.AsyncSinkBase.createCommitter(): Returned leaf type org.apache.flink.api.connector.sink.Committer does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.base.sink.AsyncSinkBase.createGlobalCommitter(): Returned leaf type org.apache.flink.api.connector.sink.GlobalCommitter does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(java.lang.Object, org.apache.flink.api.connector.sink.SinkWriter$Context): Argument leaf type org.apache.flink.api.connector.sink.SinkWriter$Context does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.base.sink.writer.ElementConverter.apply(java.lang.Object, org.apache.flink.api.connector.sink.SinkWriter$Context): Argument leaf type org.apache.flink.api.connector.sink.SinkWriter$Context does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.base.source.hybrid.HybridSource.builder(org.apache.flink.api.connector.source.Source): Returned leaf type org.apache.flink.connector.base.source.hybrid.HybridSource$HybridSourceBuilder does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.base.source.hybrid.HybridSource.createEnumerator(org.apache.flink.api.connector.source.SplitEnumeratorContext): Argument leaf type org.apache.flink.connector.base.source.hybrid.HybridSourceSplit does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.base.source.hybrid.HybridSource.createEnumerator(org.apache.flink.api.connector.source.SplitEnumeratorContext): Returned leaf type org.apache.flink.connector.base.source.hybrid.HybridSourceEnumeratorState does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
Expand All @@ -71,12 +67,7 @@ org.apache.flink.connector.base.source.hybrid.HybridSource.restoreEnumerator(org
org.apache.flink.connector.base.source.hybrid.HybridSource.restoreEnumerator(org.apache.flink.api.connector.source.SplitEnumeratorContext, org.apache.flink.connector.base.source.hybrid.HybridSourceEnumeratorState): Argument leaf type org.apache.flink.connector.base.source.hybrid.HybridSourceSplit does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.base.source.hybrid.HybridSource.restoreEnumerator(org.apache.flink.api.connector.source.SplitEnumeratorContext, org.apache.flink.connector.base.source.hybrid.HybridSourceEnumeratorState): Returned leaf type org.apache.flink.connector.base.source.hybrid.HybridSourceEnumeratorState does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.base.source.hybrid.HybridSource.restoreEnumerator(org.apache.flink.api.connector.source.SplitEnumeratorContext, org.apache.flink.connector.base.source.hybrid.HybridSourceEnumeratorState): Returned leaf type org.apache.flink.connector.base.source.hybrid.HybridSourceSplit does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter.emit(java.lang.Object, org.apache.flink.api.connector.sink.SinkWriter$Context, org.apache.flink.connector.elasticsearch.sink.RequestIndexer): Argument leaf type org.apache.flink.api.connector.sink.SinkWriter$Context does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter.emit(java.lang.Object, org.apache.flink.api.connector.sink.SinkWriter$Context, org.apache.flink.connector.elasticsearch.sink.RequestIndexer): Argument leaf type org.apache.flink.connector.elasticsearch.sink.RequestIndexer does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink.createCommitter(): Returned leaf type org.apache.flink.api.connector.sink.Committer does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink.createGlobalCommitter(): Returned leaf type org.apache.flink.api.connector.sink.GlobalCommitter does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink.createWriter(org.apache.flink.api.connector.sink.Sink$InitContext, java.util.List): Argument leaf type org.apache.flink.api.connector.sink.Sink$InitContext does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink.createWriter(org.apache.flink.api.connector.sink.Sink$InitContext, java.util.List): Returned leaf type org.apache.flink.api.connector.sink.SinkWriter does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.build(): Returned leaf type org.apache.flink.connector.file.sink.FileSink does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.withOutputFileConfig(org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig): Argument leaf type org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.file.src.AbstractFileSource.getAssignerFactory(): Returned leaf type org.apache.flink.connector.file.src.assigners.FileSplitAssigner$Provider does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
Expand Down Expand Up @@ -425,8 +416,5 @@ org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware.setProcessin
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(org.apache.kafka.clients.consumer.ConsumerRecord): Returned leaf type org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.getProducedType(): Returned leaf type org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.isEndOfStream(org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode): Argument leaf type org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.table.connector.sink.SinkProvider.createSink(): Returned leaf type org.apache.flink.api.connector.sink.Sink does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.table.connector.sink.SinkProvider.of(org.apache.flink.api.connector.sink.Sink): Argument leaf type org.apache.flink.api.connector.sink.Sink does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.table.connector.sink.SinkProvider.of(org.apache.flink.api.connector.sink.Sink, java.lang.Integer): Argument leaf type org.apache.flink.api.connector.sink.Sink does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.table.operations.QueryOperation.accept(org.apache.flink.table.operations.QueryOperationVisitor): Argument leaf type org.apache.flink.table.operations.QueryOperationVisitor does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.types.parser.FieldParser.getErrorState(): Returned leaf type org.apache.flink.types.parser.FieldParser$ParseErrorState does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@ org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer$Int
org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer$LongPrimitiveArraySerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer$ShortPrimitiveArraySerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.typeutils.base.array.StringArraySerializer$StringArraySerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.connector.sink.Sink$InitContext does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.connector.sink.Sink$ProcessingTimeService does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.connector.sink.Sink$ProcessingTimeService$ProcessingTimeCallback does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.connector.sink.SinkWriter$Context does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.connector.source.lib.NumberSequenceSource$NumberSequenceSplit does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.java.Utils$ChecksumHashCode does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.java.Utils$ChecksumHashCodeHelper does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.flink.api.connector.sink;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;

import java.io.IOException;
import java.util.List;
Expand All @@ -28,8 +28,10 @@
* The {@code Committer} is responsible for committing the data staged by the sink.
*
* @param <CommT> The type of information needed to commit the staged data
* @deprecated Please use {@link org.apache.flink.api.connector.sink2.Committer}.
*/
@Experimental
@Deprecated
@PublicEvolving
public interface Committer<CommT> extends AutoCloseable {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.flink.api.connector.sink;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;

import java.io.IOException;
import java.util.List;
Expand All @@ -32,8 +32,11 @@
*
* @param <CommT> The type of information needed to commit data staged by the sink
* @param <GlobalCommT> The type of the aggregated committable
* @deprecated Please use {@code WithPostCommitTopology} with {@code
* StandardSinkTopologies#addGlobalCommitter}.
*/
@Experimental
@Deprecated
@PublicEvolving
public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.flink.api.connector.sink;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
Expand Down Expand Up @@ -52,8 +52,10 @@
* @param <CommT> The type of information needed to commit data staged by the sink
* @param <WriterStateT> The type of the sink writer's state
* @param <GlobalCommT> The type of the aggregated committable
* @deprecated Please use {@link org.apache.flink.api.connector.sink2.Sink} or a derivative.
*/
@Experimental
@Deprecated
@PublicEvolving
public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializable {

/**
Expand Down Expand Up @@ -129,7 +131,14 @@ default Collection<String> getCompatibleStateNames() {
return Collections.emptyList();
}

/** The interface exposes some runtime info for creating a {@link SinkWriter}. */
/**
* The interface exposes some runtime info for creating a {@link SinkWriter}.
*
* @deprecated Please migrate to {@link org.apache.flink.api.connector.sink2.Sink} and use
* {@link org.apache.flink.api.connector.sink2.Sink.InitContext}.
*/
@Deprecated
@PublicEvolving
interface InitContext {
/**
* Gets the {@link UserCodeClassLoader} to load classes that are not in system's classpath,
Expand Down Expand Up @@ -175,7 +184,12 @@ interface InitContext {
/**
* A service that allows to get the current processing time and register timers that will
* execute the given {@link ProcessingTimeCallback} when firing.
*
* @deprecated Please migrate to {@link org.apache.flink.api.connector.sink2.Sink} and use
* {@link org.apache.flink.api.common.operators.ProcessingTimeService}.
*/
@Deprecated
@PublicEvolving
interface ProcessingTimeService {

/** Returns the current processing time. */
Expand All @@ -192,7 +206,13 @@ interface ProcessingTimeService {
/**
* A callback that can be registered via {@link #registerProcessingTimer(long,
* ProcessingTimeCallback)}.
*
* @deprecated Please migrate to {@link org.apache.flink.api.connector.sink2.Sink} and use
* {@link
* org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback}.
*/
@Deprecated
@PublicEvolving
interface ProcessingTimeCallback {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.flink.api.connector.sink;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.eventtime.Watermark;

import java.io.IOException;
Expand All @@ -35,8 +35,10 @@
* @param <InputT> The type of the sink writer's input
* @param <CommT> The type of information needed to commit data staged by the sink
* @param <WriterStateT> The type of the writer's state
* @deprecated Please use {@link org.apache.flink.api.connector.sink2.SinkWriter} or a derivative.
*/
@Experimental
@Deprecated
@PublicEvolving
public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

/**
Expand Down Expand Up @@ -89,7 +91,14 @@ default List<WriterStateT> snapshotState(long checkpointId) throws IOException {
return snapshotState();
}

/** Context that {@link #write} can use for getting additional data about an input record. */
/**
* Context that {@link #write} can use for getting additional data about an input record.
*
* @deprecated Please migrate to {@link org.apache.flink.api.connector.sink2.SinkWriter} and use
* {@link org.apache.flink.api.connector.sink2.SinkWriter.Context}.
*/
@Deprecated
@PublicEvolving
interface Context {

/** Returns the current event-time watermark. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
Expand Down Expand Up @@ -1249,7 +1248,7 @@ public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
* @param sink The user defined sink.
* @return The closed DataStream.
*/
@Experimental
@PublicEvolving
public DataStreamSink<T> sinkTo(org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sink) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
*
* <p>{@code DataStreamSinkProvider} in {@code flink-table-api-java-bridge} is available for
* advanced connector developers.
*
* @deprecated Please convert your sink to {@link org.apache.flink.api.connector.sink2.Sink} and use
* {@link SinkV2Provider}.
*/
@Deprecated
@PublicEvolving
public interface SinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {

Expand Down

0 comments on commit 5f4659f

Please sign in to comment.