Skip to content

Commit

Permalink
Fix Kinesis integration test (apache#14948)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet authored Apr 5, 2022
1 parent 7cc5cf1 commit 05b1685
Showing 1 changed file with 34 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,34 @@
*/
package org.apache.pulsar.tests.integration.io.sinks;

import java.util.LinkedHashMap;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;

@Slf4j
public class KinesisSinkTester extends SinkTester<LocalStackContainer> {

private static final String NAME = "kinesis";
private static final int LOCALSTACK_SERVICE_PORT = 4566;
public static final String STREAM_NAME = "my-stream-1";
private KinesisAsyncClient client;

Expand All @@ -64,16 +63,11 @@ public void prepareSink() throws Exception {
final LocalStackContainer localStackContainer = getServiceContainer();
final URI endpointOverride = localStackContainer.getEndpointOverride(LocalStackContainer.Service.KINESIS);
sinkConfig.put("awsEndpoint", NAME);
sinkConfig.put("awsEndpointPort", endpointOverride.getPort());
sinkConfig.put("awsEndpointPort", LOCALSTACK_SERVICE_PORT);
sinkConfig.put("skipCertificateValidation", true);
client = KinesisAsyncClient.builder().credentialsProvider(new AwsCredentialsProvider() {
@Override
public AwsCredentials resolveCredentials() {
return AwsBasicCredentials.create(
"access",
"secret");
}
})
client = KinesisAsyncClient.builder().credentialsProvider(() -> AwsBasicCredentials.create(
"access",
"secret"))
.region(Region.US_EAST_1)
.endpointOverride(endpointOverride)
.build();
Expand All @@ -84,8 +78,14 @@ public AwsCredentials resolveCredentials() {
.build())
.get();
log.info("prepareSink for kinesis: created stream {}", STREAM_NAME);
}


@Override
public void stopServiceContainer(PulsarCluster cluster) {
if (client != null) {
client.close();
}
super.stopServiceContainer(cluster);
}

@Override
Expand All @@ -95,13 +95,12 @@ protected LocalStackContainer createSinkService(PulsarCluster cluster) {
}

@Override
@SneakyThrows
public void validateSinkResult(Map<String, String> kvs) {
Awaitility.await().untilAsserted(() -> validateSinkResult());
Awaitility.await().untilAsserted(() -> internalValidateSinkResult(kvs));
}

@SneakyThrows
private void validateSinkResult() {
private void internalValidateSinkResult(Map<String, String> kvs) {
final String shardId = client.listShards(
ListShardsRequest.builder()
.streamName(STREAM_NAME)
Expand All @@ -118,15 +117,30 @@ private void validateSinkResult() {
.build())
.get()
.shardIterator();

Map<String, String> records = new LinkedHashMap<>();

// millisBehindLatest equals zero when record processing is caught up,
// and there are no new records to process at this moment.
// See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
Awaitility.await().until(() -> addMoreRecordsAndGetMillisBehindLatest(records, iterator) == 0);

assertEquals(kvs, records);
}

@SneakyThrows
private Long addMoreRecordsAndGetMillisBehindLatest(Map<String, String> records, String iterator) {
final GetRecordsResponse response = client.getRecords(
GetRecordsRequest
.builder()
.shardIterator(iterator)
.build())
.get();
assertTrue(response.hasRecords());
for (Record record : response.records()) {
assertTrue(record.data().asString(StandardCharsets.UTF_8).startsWith("value-"));
if(response.hasRecords()) {
response.records().forEach(
record -> records.put(record.partitionKey(), record.data().asString(StandardCharsets.UTF_8))
);
}
return response.millisBehindLatest();
}
}

0 comments on commit 05b1685

Please sign in to comment.