[SPARK-6514] [SPARK-5960] [SPARK-6656] [SPARK-7679] [STREAMING] [KINESIS] Updates to the Kinesis API

SPARK-6514 - Use correct region
SPARK-5960 - Allow AWS Credentials to be directly passed
SPARK-6656 - Specify kinesis application name explicitly
SPARK-7679 - Upgrade to latest KCL and AWS SDK.

Author: Tathagata Das <[email protected]>

Closes apache#6147 from tdas/kinesis-api-update and squashes the following commits:

f23ea77 [Tathagata Das] Updated versions and updated APIs
373b201 [Tathagata Das] Updated Kinesis API
tdas committed May 17, 2015
1 parent 2ca60ac commit ca4257a
Showing 6 changed files with 348 additions and 120 deletions.
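For orientation before the hunks: a sketch of how the updated API might be called from an application, assuming the new KinesisUtils.createStream overload this patch adds in KinesisUtils.scala (one of the six changed files, not shown below); the app name, stream name, endpoint, region, and keys are placeholders.

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("KinesisExample"), Seconds(2))

    // SPARK-6656: the Kinesis application name is passed explicitly (it also names
    // the KCL's backing DynamoDB table). SPARK-6514: the region is passed explicitly.
    // SPARK-5960: AWS credentials can be passed directly instead of relying on the
    // DefaultAWSCredentialsProviderChain.
    val stream = KinesisUtils.createStream(
      ssc, "myKinesisApp", "myKinesisStream",
      "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
      "myAccessKeyId", "mySecretKey")  // placeholder credentials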
@@ -48,7 +48,7 @@ private[kinesis] class KinesisCheckpointState(
   /**
    * Advance the checkpoint clock by the checkpoint interval.
    */
-  def advanceCheckpoint() = {
+  def advanceCheckpoint(): Unit = {
     checkpointClock.advance(checkpointInterval.milliseconds)
   }
 }
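The one-line change above replaces an inferred result type with an explicit : Unit annotation. A minimal illustration (not from this patch) of the inference pitfall the annotation guards against:

    class Tally {
      private var n = 0
      // Inferred result type: today Unit (an assignment), but if the body ever
      // ends in an expression, the public signature silently changes.
      def bump() = { n += 1 }
      // Explicit Unit pins the signature to "side effect only".
      def bumpExplicit(): Unit = { n += 1 }
    }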
@@ -16,39 +16,40 @@
  */
 package org.apache.spark.streaming.kinesis
 
 import java.net.InetAddress
 import java.util.UUID
 
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory}
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
+
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.util.Utils
 
-import com.amazonaws.auth.AWSCredentialsProvider
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+private[kinesis]
+case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
+  extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable
 
 /**
  * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
  * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
  * https://github.com/awslabs/amazon-kinesis-client
- * This is a custom receiver used with StreamingContext.receiverStream(Receiver)
- * as described here:
- * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * Instances of this class will get shipped to the Spark Streaming Workers
- * to run within a Spark Executor.
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
+ * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers to run within a
+ * Spark Executor.
  *
  * @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
  *                by the Kinesis Client Library. If you change the App name or Stream name,
  *                the KCL will throw errors. This usually requires deleting the backing
  *                DynamoDB table with the same name as this Kinesis application.
  * @param streamName Kinesis stream name
  * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Region name used by the Kinesis Client Library for
+ *                   DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
  * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
  *                           See the Kinesis Spark Streaming documentation for more
  *                           details on the different types of checkpoints.
@@ -59,92 +60,103 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
  *                                  (InitialPositionInStream.TRIM_HORIZON) or
  *                                  the tip of the stream (InitialPositionInStream.LATEST).
  * @param storageLevel Storage level to use for storing the received objects
- *
- * @return ReceiverInputDStream[Array[Byte]]
+ * @param awsCredentialsOption Optional AWS credentials, used when the user directly specifies
+ *                             the credentials
  */
 private[kinesis] class KinesisReceiver(
     appName: String,
     streamName: String,
     endpointUrl: String,
-    checkpointInterval: Duration,
+    regionName: String,
     initialPositionInStream: InitialPositionInStream,
-    storageLevel: StorageLevel)
-  extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
-
-  /*
-   * The following vars are built in the onStart() method which executes in the Spark Worker after
-   * this code is serialized and shipped remotely.
-   */
-
-  /*
-   * workerId should be based on the ip address of the actual Spark Worker where this code runs
-   * (not the Driver's ip address.)
-   */
-  var workerId: String = null
+    checkpointInterval: Duration,
+    storageLevel: StorageLevel,
+    awsCredentialsOption: Option[SerializableAWSCredentials]
+  ) extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>

   /*
-   * This impl uses the DefaultAWSCredentialsProviderChain and searches for credentials
-   * in the following order of precedence:
-   * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
-   * Java System Properties - aws.accessKeyId and aws.secretKey
-   * Credential profiles file at the default location (~/.aws/credentials) shared by all
-   * AWS SDKs and the AWS CLI
-   * Instance profile credentials delivered through the Amazon EC2 metadata service
+   * =================================================================================
+   * The following vars are initialized in the onStart() method which executes in the
+   * Spark worker after this Receiver is serialized and shipped to the worker.
+   * =================================================================================
    */
-  var credentialsProvider: AWSCredentialsProvider = null
 
-  /* KCL config instance. */
-  var kinesisClientLibConfiguration: KinesisClientLibConfiguration = null
-
-  /*
-   * RecordProcessorFactory creates impls of IRecordProcessor.
-   * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
-   * IRecordProcessor.processRecords() method.
-   * We're using our custom KinesisRecordProcessor in this case.
+  /**
+   * workerId is used by the KCL and should be based on the IP address of the actual
+   * Spark Worker where this code runs (not the driver's IP address).
    */
-  var recordProcessorFactory: IRecordProcessorFactory = null
+  private var workerId: String = null

-  /*
-   * Create a Kinesis Worker.
-   * This is the core client abstraction from the Kinesis Client Library (KCL).
-   * We pass the RecordProcessorFactory from above as well as the KCL config instance.
-   * A Kinesis Worker can process 1..* shards from the given stream - each with its
-   * own RecordProcessor.
+  /**
+   * Worker is the core client abstraction from the Kinesis Client Library (KCL).
+   * A worker can process more than one shard from the given stream.
+   * Each shard is assigned its own IRecordProcessor and the worker runs multiple such
+   * processors.
    */
-  var worker: Worker = null
+  private var worker: Worker = null

   /**
-   * This is called when the KinesisReceiver starts and must be non-blocking.
-   * The KCL creates and manages the receiving/processing thread pool through the Worker.run()
-   * method.
+   * This is called when the KinesisReceiver starts and must be non-blocking.
+   * The KCL creates and manages the receiving/processing thread pool through Worker.run().
    */
   override def onStart() {
     workerId = Utils.localHostName() + ":" + UUID.randomUUID()
-    credentialsProvider = new DefaultAWSCredentialsProviderChain()
-    kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName,
-      credentialsProvider, workerId).withKinesisEndpoint(endpointUrl)
-      .withInitialPositionInStream(initialPositionInStream).withTaskBackoffTimeMillis(500)
-    recordProcessorFactory = new IRecordProcessorFactory {
+
+    // KCL config instance
+    val awsCredProvider = resolveAWSCredentialsProvider()
+    val kinesisClientLibConfiguration =
+      new KinesisClientLibConfiguration(appName, streamName, awsCredProvider, workerId)
+      .withKinesisEndpoint(endpointUrl)
+      .withInitialPositionInStream(initialPositionInStream)
+      .withTaskBackoffTimeMillis(500)
+      .withRegionName(regionName)
+
+    /*
+     * RecordProcessorFactory creates impls of IRecordProcessor.
+     * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
+     * IRecordProcessor.processRecords() method.
+     * We're using our custom KinesisRecordProcessor in this case.
+     */
+    val recordProcessorFactory = new IRecordProcessorFactory {
       override def createProcessor: IRecordProcessor = new KinesisRecordProcessor(receiver,
         workerId, new KinesisCheckpointState(checkpointInterval))
     }
 
     worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
     worker.run()
+
+    logInfo(s"Started receiver with workerId $workerId")
   }

   /**
-   * This is called when the KinesisReceiver stops.
-   * The KCL worker.shutdown() method stops the receiving/processing threads.
-   * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
+   * This is called when the KinesisReceiver stops.
+   * The KCL worker.shutdown() method stops the receiving/processing threads.
+   * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
    */
   override def onStop() {
-    worker.shutdown()
-    logInfo(s"Shut down receiver with workerId $workerId")
+    if (worker != null) {
+      worker.shutdown()
+      logInfo(s"Stopped receiver for workerId $workerId")
+      worker = null
+    }
     workerId = null
-    credentialsProvider = null
-    kinesisClientLibConfiguration = null
-    recordProcessorFactory = null
-    worker = null
   }

+  /**
+   * If AWS credentials are provided, return an AWSCredentialsProvider returning
+   * those credentials. Otherwise, return the DefaultAWSCredentialsProviderChain.
+   */
+  private def resolveAWSCredentialsProvider(): AWSCredentialsProvider = {
+    awsCredentialsOption match {
+      case Some(awsCredentials) =>
+        logInfo("Using provided AWS credentials")
+        new AWSCredentialsProvider {
+          override def getCredentials: AWSCredentials = awsCredentials
+          override def refresh(): Unit = { }
+        }
+      case None =>
+        logInfo("Using DefaultAWSCredentialsProviderChain")
+        new DefaultAWSCredentialsProviderChain()
+    }
+  }
 }
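To make the new credentials plumbing concrete: a hedged sketch of wiring the receiver up by hand from inside the org.apache.spark.streaming.kinesis package (the class is private[kinesis], so applications would normally go through KinesisUtils instead); every literal below is a placeholder.

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Seconds

    // SPARK-5960: directly passed credentials must be serializable because the
    // receiver object is shipped from the driver to an executor.
    val credentials = Some(SerializableAWSCredentials("myAccessKeyId", "mySecretKey"))

    val receiver = new KinesisReceiver(
      "myKinesisApp",                             // appName (SPARK-6656)
      "myKinesisStream",                          // streamName
      "https://kinesis.us-east-1.amazonaws.com",  // endpointUrl
      "us-east-1",                                // regionName (SPARK-6514)
      InitialPositionInStream.LATEST,
      Seconds(2),                                 // checkpointInterval
      StorageLevel.MEMORY_AND_DISK_2,
      credentials)                                // None falls back to the default chain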
@@ -35,7 +35,10 @@ import com.amazonaws.services.kinesis.model.Record
 /**
  * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
  * This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor upon startup.
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
+ * shard in the Kinesis stream upon startup. This is normally done in separate threads,
+ * but the KCLs within the KinesisReceivers will balance themselves out if you create
+ * multiple Receivers.
  *
  * @param receiver Kinesis receiver
  * @param workerId for logging purposes
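The balancing remark above is why the usual deployment pattern is one receiver per shard, unioned into a single DStream. A sketch under the same assumed createStream overload as earlier (shard count and names are placeholders):

    // One receiver per shard; the KCL Workers lease shards among themselves,
    // so the union sees each shard exactly once.
    val numShards = 2  // placeholder; normally discovered via the Kinesis client
    val kinesisStreams = (0 until numShards).map { _ =>
      KinesisUtils.createStream(
        ssc, "myKinesisApp", "myKinesisStream",
        "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
        InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
    }
    val unionStream = ssc.union(kinesisStreams)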
@@ -47,17 +50,17 @@ private[kinesis] class KinesisRecordProcessor(
     workerId: String,
     checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging {
 
-  /* shardId to be populated during initialize() */
-  var shardId: String = _
+  // shardId to be populated during initialize()
+  private var shardId: String = _
 
   /**
    * The Kinesis Client Library calls this method during IRecordProcessor initialization.
    *
    * @param shardId assigned by the KCL to this particular RecordProcessor.
    */
   override def initialize(shardId: String) {
-    logInfo(s"Initialize: Initializing workerId $workerId with shardId $shardId")
     this.shardId = shardId
+    logInfo(s"Initialized workerId $workerId with shardId $shardId")
   }

   /**
@@ -73,12 +76,17 @@ private[kinesis] class KinesisRecordProcessor(
     if (!receiver.isStopped()) {
       try {
-        /*
-         * Note: If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
-         * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
-         * internally-configured Spark serializer (kryo, etc).
-         * This is not desirable, so we instead store a raw Array[Byte] and decouple
-         * ourselves from Spark's internal serialization strategy.
-         */
+        /*
+         * Notes:
+         * 1) If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
+         *    Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
+         *    internally-configured Spark serializer (kryo, etc).
+         * 2) This is not desirable, so we instead store a raw Array[Byte] and decouple
+         *    ourselves from Spark's internal serialization strategy.
+         * 3) For performance, the BlockGenerator is asynchronously queuing elements within its
+         *    memory before creating blocks. This prevents the small block scenario, but requires
+         *    that you register callbacks to know when a block has been generated and stored
+         *    (WAL is sufficient for storage) before you can checkpoint back to the source.
+         */
         batch.foreach(record => receiver.store(record.getData().array()))
 
         logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
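Because the receiver stores raw Array[Byte] (point 2 above), decoding is the consumer's job. A minimal sketch, reusing the unioned stream from the earlier example:

    import java.nio.charset.StandardCharsets

    // Each element is one Kinesis record's payload, byte-for-byte as produced.
    val lines = unionStream.map(bytes => new String(bytes, StandardCharsets.UTF_8))
    val words = lines.flatMap(_.split(" "))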
@@ -116,7 +124,7 @@ private[kinesis] class KinesisRecordProcessor(
       logError(s"Exception: WorkerId $workerId encountered an exception while storing " +
         " or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
 
-      /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor.*/
+      /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
       throw e
     }
   }
@@ -190,7 +198,7 @@ private[kinesis] object KinesisRecordProcessor extends Logging {
           logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
           retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
         }
-        /* Throw: Shutdown has been requested by the Kinesis Client Library.*/
+        /* Throw: Shutdown has been requested by the Kinesis Client Library. */
         case _: ShutdownException => {
           logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e)
           throw e
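For reference, a hedged sketch of how retryRandom is invoked: the first argument is passed by name, so the expression is re-evaluated on each retry. The signature is assumed from the recursive call above, and checkpointer stands for the IRecordProcessorCheckpointer the KCL hands to the processor.

    // Retry the KCL checkpoint up to 4 times with random backoff up to 100 ms:
    // retryable exceptions are retried, ShutdownExceptions rethrown (as above).
    KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)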