[FLINK-18515][Kinesis] Adding FanOutRecordPublisher for Kinesis EFO support

This closes apache#13189.
dannycranmer authored and tzulitai committed Sep 21, 2020
1 parent 9565099 commit bbcd0c7
Showing 36 changed files with 2,537 additions and 234 deletions.
1 change: 1 addition & 0 deletions flink-connectors/flink-connector-kinesis/pom.xml
@@ -103,6 +103,7 @@ under the License.
<scope>test</scope>
</dependency>

<!-- Amazon AWS SDK v1.x dependencies -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -224,6 +224,8 @@ public enum EFORegistrationType {
/** The maximum number of records that will be buffered before suspending consumption of a shard. */
public static final String WATERMARK_SYNC_QUEUE_CAPACITY = "flink.watermark.sync.queue.capacity";

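/** The maximum concurrency of the asynchronous HTTP client used by EFO consumers. */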
public static final String EFO_HTTP_CLIENT_MAX_CONCURRENCY = "flink.stream.efo.http-client.max-concurrency";

// ------------------------------------------------------------------------
// Default values for consumer configuration
// ------------------------------------------------------------------------
@@ -272,7 +274,7 @@ public enum EFORegistrationType {

public static final double DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;

public static final int DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES = 5;
public static final int DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES = 10;

public static final long DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE = 1000L;

@@ -308,10 +310,21 @@ public enum EFORegistrationType {

public static final long DEFAULT_WATERMARK_SYNC_MILLIS = 30_000;

public static final int DEFAULT_EFO_HTTP_CLIENT_MAX_CONCURRENCY = 10_000;

/**
* To avoid shard iterator expiration in {@link ShardConsumer}s, the configured
* getRecords interval cannot exceed 5 minutes, which is the expiration time for retrieved iterators.
*/
public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L;

/**
* Builds the configuration key for the EFO consumer ARN of the given stream.
* @param streamName the stream name the key is built from.
* @return the configuration key for the stream's EFO consumer ARN.
*/
public static String efoConsumerArn(final String streamName) {
return EFO_CONSUMER_ARN_PREFIX + "." + streamName;
}

}
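For orientation, here is a minimal sketch (not part of this commit) of how the new EFO settings fit together with the record publisher selection introduced in KinesisDataFetcher below. The stream name and consumer ARN are made-up placeholders, and the wiring into FlinkKinesisConsumer follows the connector's existing constructor API.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class EfoConfigSketch {
    public static FlinkKinesisConsumer<String> createEfoConsumer() {
        Properties config = new Properties();
        // Switch from the default POLLING publisher to enhanced fan-out (EFO).
        config.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
                ConsumerConfigConstants.RecordPublisherType.EFO.name());
        // Cap the concurrency of the async HTTP client backing EFO subscriptions.
        config.setProperty(ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY, "10000");
        // efoConsumerArn(...) builds the per-stream property key for a
        // pre-registered consumer ARN; this ARN is a hypothetical placeholder.
        config.setProperty(ConsumerConfigConstants.efoConsumerArn("example-stream"),
                "arn:aws:kinesis:us-east-1:000000000000:stream/example-stream/consumer/example:1");
        return new FlinkKinesisConsumer<>("example-stream", new SimpleStringSchema(), config);
    }
}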
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
@@ -70,7 +70,8 @@ public DynamoDBStreamsDataFetcher(List<String> streams,
new ArrayList<>(),
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
// use DynamoDBStreamsProxy
DynamoDBStreamsProxy::create);
DynamoDBStreamsProxy::create,
null);
}

@Override
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -27,8 +27,10 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
@@ -41,8 +43,11 @@
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
@@ -56,6 +61,9 @@
import com.amazonaws.services.kinesis.model.Shard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
@@ -74,6 +82,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.POLLING;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
@@ -183,6 +193,9 @@ public class KinesisDataFetcher<T> {
/** The Kinesis proxy factory that will be used to create instances for discovery and shard consumers. */
private final FlinkKinesisProxyFactory kinesisProxyFactory;

/** The Kinesis proxy V2 factory that will be used to create instances for EFO shard consumers. */
private final FlinkKinesisProxyV2Factory kinesisProxyV2Factory;

/** The Kinesis proxy that the fetcher will be using to discover new shards. */
private final KinesisProxyInterface kinesis;

@@ -242,6 +255,13 @@ public interface FlinkKinesisProxyFactory {
KinesisProxyInterface create(Properties configProps);
}

/**
* Factory to create Kinesis proxy V2 instances used by a fetcher.
*/
public interface FlinkKinesisProxyV2Factory {
KinesisProxyV2Interface create(Properties configProps);
}

/**
* The wrapper that holds the watermark handling related parameters
* of a record produced by the shard consumer thread.
@@ -318,14 +338,15 @@ public RecordWrapper<T> peek() {
* @param configProps the consumer configuration properties
* @param deserializationSchema deserialization schema
*/
public KinesisDataFetcher(List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
RuntimeContext runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner shardAssigner,
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
WatermarkTracker watermarkTracker) {
public KinesisDataFetcher(
final List<String> streams,
final SourceFunction.SourceContext<T> sourceContext,
final RuntimeContext runtimeContext,
final Properties configProps,
final KinesisDeserializationSchema<T> deserializationSchema,
final KinesisShardAssigner shardAssigner,
final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
final WatermarkTracker watermarkTracker) {
this(streams,
sourceContext,
sourceContext.getCheckpointLock(),
@@ -338,23 +359,26 @@ public KinesisDataFetcher(List<String> streams,
new AtomicReference<>(),
new ArrayList<>(),
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
KinesisProxy::create);
KinesisProxy::create,
KinesisDataFetcher::createKinesisProxyV2);
}

@VisibleForTesting
protected KinesisDataFetcher(List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
Object checkpointLock,
RuntimeContext runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner shardAssigner,
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
WatermarkTracker watermarkTracker,
AtomicReference<Throwable> error,
List<KinesisStreamShardState> subscribedShardsState,
HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
FlinkKinesisProxyFactory kinesisProxyFactory) {
protected KinesisDataFetcher(
final List<String> streams,
final SourceFunction.SourceContext<T> sourceContext,
final Object checkpointLock,
final RuntimeContext runtimeContext,
final Properties configProps,
final KinesisDeserializationSchema<T> deserializationSchema,
final KinesisShardAssigner shardAssigner,
final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
final WatermarkTracker watermarkTracker,
final AtomicReference<Throwable> error,
final List<KinesisStreamShardState> subscribedShardsState,
final HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
final FlinkKinesisProxyFactory kinesisProxyFactory,
@Nullable final FlinkKinesisProxyV2Factory kinesisProxyV2Factory) {
this.streams = checkNotNull(streams);
this.configProps = checkNotNull(configProps);
this.sourceContext = checkNotNull(sourceContext);
@@ -367,6 +391,7 @@ protected KinesisDataFetcher(List<String> streams,
this.periodicWatermarkAssigner = periodicWatermarkAssigner;
this.watermarkTracker = watermarkTracker;
this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
this.kinesisProxyV2Factory = kinesisProxyV2Factory;
this.kinesis = kinesisProxyFactory.create(configProps);
this.recordPublisherFactory = createRecordPublisherFactory();

@@ -379,6 +404,7 @@ protected KinesisDataFetcher(List<String> streams,

this.shardConsumersExecutor =
createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());

this.recordEmitter = createRecordEmitter(configProps);
}

@@ -402,11 +428,11 @@ private RecordEmitter createRecordEmitter(Properties configProps) {
* @return shard consumer
*/
protected ShardConsumer<T> createShardConsumer(
Integer subscribedShardStateIndex,
StreamShardHandle subscribedShard,
SequenceNumber lastSequenceNum,
MetricGroup metricGroup,
KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException {
final Integer subscribedShardStateIndex,
final StreamShardHandle subscribedShard,
final SequenceNumber lastSequenceNum,
final MetricGroup metricGroup,
final KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException {

return new ShardConsumer<>(
this,
@@ -418,8 +444,17 @@ protected ShardConsumer<T> createShardConsumer(
shardDeserializer);
}

private RecordPublisherFactory createRecordPublisherFactory() {
return new PollingRecordPublisherFactory(kinesisProxyFactory);
protected RecordPublisherFactory createRecordPublisherFactory() {
RecordPublisherType recordPublisherType = RecordPublisherType.valueOf(
configProps.getProperty(RECORD_PUBLISHER_TYPE, POLLING.name()));

switch (recordPublisherType) {
case EFO:
return new FanOutRecordPublisherFactory(kinesisProxyV2Factory.create(configProps));
case POLLING:
default:
return new PollingRecordPublisherFactory(kinesisProxyFactory);
}
}

protected RecordPublisher createRecordPublisher(
@@ -432,6 +467,11 @@ protected RecordPublisher createRecordPublisher(
return recordPublisherFactory.create(startingPosition, configProps, metricGroup, subscribedShard);
}

private static KinesisProxyV2Interface createKinesisProxyV2(final Properties configProps) {
final KinesisAsyncClient client = AwsV2Util.createKinesisAsyncClient(configProps);
return new KinesisProxyV2(client);
}

/**
* Starts the fetcher. After starting the fetcher, it can only
* be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
@@ -672,6 +712,8 @@ public void shutdownFetcher() {
LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
}
shardConsumersExecutor.shutdownNow();

recordPublisherFactory.close();
}

/** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown. */
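Because FlinkKinesisProxyV2Factory is a single-method interface, the EFO transport can be swapped in tests with a lambda, mirroring the built-in default KinesisDataFetcher::createKinesisProxyV2 shown above. A minimal sketch (the import paths assume KinesisDataFetcher sits in the internals package, which is not shown in this excerpt):

import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;

public class ProxyV2FactorySketch {
    // Equivalent to the fetcher's default: build an SDK v2 async client from
    // the consumer properties, then wrap it in a KinesisProxyV2.
    static final KinesisDataFetcher.FlinkKinesisProxyV2Factory FACTORY =
            props -> new KinesisProxyV2(AwsV2Util.createKinesisAsyncClient(props));
}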
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -28,6 +28,8 @@
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
@@ -51,6 +53,9 @@
*/
@Internal
public class ShardConsumer<T> implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);

private final KinesisDeserializationSchema<T> deserializer;

private final int subscribedShardStateIndex;
@@ -102,6 +107,11 @@ public void run() {
try {
while (isRunning()) {
final RecordPublisherRunResult result = recordPublisher.run(batch -> {
if (!batch.getDeaggregatedRecords().isEmpty()) {
LOG.debug("stream: {}, shard: {}, millis behind latest: {}, batch size: {}",
subscribedShard.getStreamName(), subscribedShard.getShard().getShardId(),
batch.getMillisBehindLatest(), batch.getDeaggregatedRecordSize());
}
for (UserRecord userRecord : batch.getDeaggregatedRecords()) {
if (filterDeaggregatedRecord(userRecord)) {
deserializeRecordForCollectionAndUpdateState(userRecord);
@@ -118,7 +128,6 @@ public void run() {

if (result == COMPLETE) {
fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());

// we can close this consumer thread once we've reached the end of the subscribed shard
break;
}
@@ -188,7 +197,7 @@ private void deserializeRecordForCollectionAndUpdateState(final UserRecord record
* This method is to support restarting from a partially consumed aggregated sequence number.
*
* @param record the record to filter
* @return {@code true} if the record should be retained
* @return true if the record should be retained
*/
private boolean filterDeaggregatedRecord(final UserRecord record) {
if (!lastSequenceNum.isAggregated()) {
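The new batch-level line above is logged at DEBUG. One way to surface it for this single class, assuming a log4j2 backend and that ShardConsumer lives in the internals package (both assumptions, not confirmed by this excerpt):

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;

public class EnableShardConsumerDebug {
    public static void main(String[] args) {
        // Raise only ShardConsumer's logger to DEBUG so the per-batch
        // "millis behind latest" line becomes visible without global noise.
        Configurator.setLevel(
                "org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer",
                Level.DEBUG);
    }
}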
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
@@ -45,4 +45,11 @@ RecordPublisher create(
MetricGroup metricGroup,
StreamShardHandle streamShardHandle) throws InterruptedException;

/**
* Destroy any open resources used by the factory.
*/
default void close() {
// Do nothing by default
}

}
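The default no-op close() keeps the existing PollingRecordPublisherFactory unchanged, while the shutdownFetcher() change earlier in this commit now invokes recordPublisherFactory.close() on teardown. A hedged sketch of a factory that owns network resources, assuming it holds an SDK v2 KinesisAsyncClient (the actual FanOutRecordPublisherFactory implementation is not shown in this excerpt):

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

// Sketch only: create(...) and the full interface contract are elided
// because they are not fully visible in this excerpt.
public class FanOutStyleFactorySketch /* implements RecordPublisherFactory */ {

    private final KinesisAsyncClient kinesisClient;

    public FanOutStyleFactorySketch(final KinesisAsyncClient kinesisClient) {
        this.kinesisClient = kinesisClient;
    }

    // Overrides the new default method: called from
    // KinesisDataFetcher#shutdownFetcher() via recordPublisherFactory.close(),
    // releasing the async client and its HTTP resources.
    public void close() {
        kinesisClient.close();
    }
}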