Skip to content

Commit

Permalink
KAFKA-5368: Add test for skipped-records metric (apache#4365)
Browse files Browse the repository at this point in the history
* KAFKA-5368: Add test for skipped-records metric

Reviewers: Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
mjsax authored and guozhangwang committed Jan 2, 2018
1 parent 47db063 commit 39f2d45
Showing 1 changed file with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
Expand All @@ -37,6 +39,7 @@
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
Expand Down Expand Up @@ -864,6 +867,51 @@ public boolean conditionMet() {
}
}

@Test
public void shouldReportSkippedRecordsForInvalidTimestamps() throws Exception {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);

final Properties config = configProps(false);
config.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class.getName());
final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config), false);

thread.setState(StreamThread.State.RUNNING);
thread.setState(StreamThread.State.PARTITIONS_REVOKED);

final Set<TopicPartition> assignedPartitions = Collections.singleton(new TopicPartition(t1p1.topic(), t1p1.partition()));
thread.taskManager().setAssignmentMetadata(
Collections.singletonMap(
new TaskId(0, t1p1.partition()),
assignedPartitions),
Collections.<TaskId, Set<TopicPartition>>emptyMap());
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);

final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(Collections.singleton(t1p1));
mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));

final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());

long offset = -1;
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
thread.runOnce(-1);
assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());

mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
thread.runOnce(-1);
assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());

mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
thread.runOnce(-1);
assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
}

private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata metadata, StreamThread.State state) {
assertEquals(state.name(), metadata.threadState());
assertTrue(metadata.activeTasks().isEmpty());
Expand Down

0 comments on commit 39f2d45

Please sign in to comment.