[FLINK-2008] Fix broker failure test case
This closes apache#675
rmetzger committed May 15, 2015
1 parent bd7d867 commit 705ee8a
Showing 3 changed files with 152 additions and 78 deletions.
@@ -162,18 +162,18 @@ public void run(Collector<OUT> collector) throws Exception {
LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
continue;
}
OUT out = deserializationSchema.deserialize(message.message());
if (LOG.isTraceEnabled()) {
LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
}
lastOffsets[message.partition()] = message.offset();

OUT out = deserializationSchema.deserialize(message.message());
if (deserializationSchema.isEndOfStream(out)) {
LOG.info("DeserializationSchema signaled end of stream for this source");
break;
}

collector.collect(out);
if (LOG.isTraceEnabled()) {
LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
}
}
} catch(Exception ie) {
// this exception is coming from Scala code.
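Because the extracted diff drops the +/- markers, the duplicated statements above show the same lines at their old and new positions: the commit moves the deserialize call ahead of the offset bookkeeping and the trace log behind the collect. A sketch of the loop body as it reads after this commit (the ordering is inferred from the diff, not marked in it):

    // Reconstructed post-commit loop body (sketch, not authoritative):
    OUT out = deserializationSchema.deserialize(message.message());
    lastOffsets[message.partition()] = message.offset();

    if (deserializationSchema.isEndOfStream(out)) {
        LOG.info("DeserializationSchema signaled end of stream for this source");
        break;
    }

    collector.collect(out);
    if (LOG.isTraceEnabled()) {
        LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
    }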
@@ -27,16 +27,25 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;

import kafka.admin.AdminUtils;
import kafka.api.PartitionMetadata;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.network.SocketServer;
import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -46,25 +55,23 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
@@ -100,8 +107,6 @@ public class KafkaITCase {
private static List<KafkaServer> brokers;
private static String brokerConnectionStrings = "";

private static boolean shutdownKafkaBroker;

private static ConsumerConfig standardCC;

private static ZkClient zkClient;
@@ -218,6 +223,7 @@ public void testPersistentSourceWithOffsetUpdates() throws Exception {
// write a sequence from 0 to 99 to each of the three partitions.
writeSequence(env, topicName, 0, 99);


readSequence(env, standardCC, topicName, 0, 100, 300);

// check offsets
@@ -242,7 +248,7 @@ public void testPersistentSourceWithOffsetUpdates() throws Exception {
LOG.info("Finished testPersistentSourceWithOffsetUpdates()");
}

private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, final String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
LOG.info("Reading sequence for verification until final count {}", finalCount);
DataStream<Tuple2<Integer, Integer>> source = env.addSource(
new PersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), env.getConfig()), cc)
@@ -252,7 +258,7 @@ private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, Str
.map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
Thread.sleep(75);
Thread.sleep(100);
return value;
}
}).setParallelism(3);
@@ -275,6 +281,8 @@ public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) thro
for (int i = 0; i < values.length; i++) {
int v = values[i];
if (v != 3) {
LOG.warn("Test is going to fail");
printTopic(topicName, valuesCount, this.getRuntimeContext().getExecutionConfig());
throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
}
}
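Since writeSequence() writes the values 0 through 99 once to each of the three partitions, the verification sink must see every value exactly three times before the 300-element final count is reached. A hedged sketch of the tallying presumably surrounding the check above (the values array and count field are assumed from context, not verbatim test code):

    // Sketch of the assumed verification flatMap (illustrative only):
    new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
        private final int[] values = new int[valuesCount]; // one tally slot per expected value
        private int count = 0;

        @Override
        public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
            values[value.f1 - valuesStartFrom]++;
            if (++count == finalCount) {
                // each value must have arrived once per partition, i.e. exactly 3 times;
                // the loop shown above performs that check before signaling success
                throw new SuccessException();
            }
        }
    };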
@@ -340,27 +348,6 @@ public int partition(Object key, int numPartitions) {
}
}

public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
try {
see.execute(name);
} catch (JobExecutionException good) {
Throwable t = good.getCause();
int limit = 0;
while (!(t instanceof SuccessException)) {
if(t == null) {
LOG.warn("Test failed with exception", good);
Assert.fail("Test failed with: " + good.getMessage());
}

t = t.getCause();
if (limit++ == 20) {
LOG.warn("Test failed with exception", good);
Assert.fail("Test failed with: " + good.getMessage());
}
}
}
}
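For orientation: this helper (moved further down in this commit) implements the test-success convention used throughout the file. A sink throws SuccessException (defined near the end of this file) once it has verified everything; that surfaces as a JobExecutionException, and tryExecute() walks the cause chain, failing the test only if no SuccessException is found. A minimal, hypothetical usage sketch:

    // Hypothetical sink showing the SuccessException convention; names and counts are illustrative.
    stream.addSink(new SinkFunction<String>() {
        private int seen = 0;

        @Override
        public void invoke(String value) throws Exception {
            if (++seen == 200) {              // assumed target count
                throw new SuccessException(); // unwrapped by tryExecute() and treated as success
            }
        }
    });
    tryExecute(env, "example job");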


@Test
public void regularKafkaSourceTest() throws Exception {
@@ -800,16 +787,55 @@ public void cancel() {
}

private static boolean leaderHasShutDown = false;
private static boolean shutdownKafkaBroker;

@Test(timeout=60000)
public void brokerFailureTest() throws Exception {
String topic = "brokerFailureTestTopic";

createTestTopic(topic, 2, 2);

// KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
// final String leaderToShutDown = kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
// --------------------------- write data to topic ---------------------
LOG.info("Writing data to topic {}", topic);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

DataStream<String> stream = env.addSource(new SourceFunction<String>() {
boolean running = true;

@Override
public void run(Collector<String> collector) throws Exception {
LOG.info("Starting source.");
int cnt = 0;
while (running) {
String msg = "kafka-" + cnt++;
collector.collect(msg);
LOG.info("sending message = "+msg);

if ((cnt - 1) % 20 == 0) {
LOG.debug("Sending message #{}", cnt - 1);
}
if(cnt == 200) {
LOG.info("Stopping to produce after 200 msgs");
break;
}

}
}

@Override
public void cancel() {
LOG.info("Source got chancel()");
running = false;
}
});
stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
.setParallelism(1);

tryExecute(env, "broker failure test - writer");

// --------------------------- read and let broker fail ---------------------

LOG.info("Reading data from topic {} and let a broker fail", topic);
PartitionMetadata firstPart = null;
do {
if(firstPart != null) {
@@ -822,6 +848,7 @@ public void brokerFailureTest() throws Exception {
} while(firstPart.errorCode() != 0);

final String leaderToShutDown = firstPart.leader().get().connectionString();
LOG.info("Leader to shutdown {}", leaderToShutDown);

final Thread brokerShutdown = new Thread(new Runnable() {
@Override
@@ -836,11 +863,7 @@ public void run() {
}

for (KafkaServer kafkaServer : brokers) {
if (leaderToShutDown.equals(
kafkaServer.config().advertisedHostName()
+ ":"
+ kafkaServer.config().advertisedPort()
)) {
if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName() + ":" + kafkaServer.config().advertisedPort())) {
LOG.info("Killing Kafka Server {}", leaderToShutDown);
kafkaServer.shutdown();
leaderHasShutDown = true;
@@ -851,11 +874,8 @@ public void run() {
});
brokerShutdown.start();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

// add consuming topology:
DataStreamSource<String> consuming = env.addSource(
new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
DataStreamSource<String> consuming = env.addSource(new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
consuming.setParallelism(1);

consuming.addSink(new SinkFunction<String>() {
@@ -875,18 +895,24 @@ public void invoke(String value) throws Exception {
if (start == -1) {
start = v;
}
Assert.assertFalse("Received tuple twice", validator.get(v - start));
int offset = v - start;
Assert.assertFalse("Received tuple with value " + offset + " twice", validator.get(offset));
if (v - start < 0 && LOG.isWarnEnabled()) {
LOG.warn("Not in order: {}", value);
}

validator.set(v - start);
validator.set(offset);
elCnt++;
if (elCnt == 20) {
LOG.info("Asking leading broker to shut down");
// shut down a Kafka broker
shutdownKafkaBroker = true;
}

if (shutdownKafkaBroker) {
// we become a bit slower because the shutdown takes some time and we have
// only a fixed number of elements to read
Thread.sleep(20);
}
if (leaderHasShutDown) { // it only makes sense to check once the shutdown is completed
if (elCnt >= stopAfterMessages) {
// check if everything in the bitset is set to true
@@ -899,44 +925,31 @@ public void invoke(String value) throws Exception {
}
}
});
tryExecute(env, "broker failure test - reader");

// add producing topology
DataStream<String> stream = env.addSource(new SourceFunction<String>() {
    boolean running = true;

    @Override
    public void run(Collector<String> collector) throws Exception {
        LOG.info("Starting source.");
        int cnt = 0;
        while (running) {
            String msg = "kafka-" + cnt++;
            collector.collect(msg);
            LOG.info("sending message = "+msg);

            if ((cnt - 1) % 20 == 0) {
                LOG.debug("Sending message #{}", cnt - 1);
            }

            try {
                Thread.sleep(10);
            } catch (InterruptedException ignored) {
            }
        }
    }

    @Override
    public void cancel() {
        LOG.info("Source got cancel()");
        running = false;
    }
});
stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
        .setParallelism(1);

tryExecute(env, "broker failure test");
}

public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
    try {
        see.execute(name);
    } catch (JobExecutionException good) {
        Throwable t = good.getCause();
        int limit = 0;
        while (!(t instanceof SuccessException)) {
            if(t == null) {
                LOG.warn("Test failed with exception", good);
                Assert.fail("Test failed with: " + good.getMessage());
            }

            t = t.getCause();
            if (limit++ == 20) {
                LOG.warn("Test failed with exception", good);
                Assert.fail("Test failed with: " + good.getMessage());
            }
        }
    }
}


private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
// create topic
Properties topicConfig = new Properties();
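The rest of this method is collapsed in the diff; it plausibly hands the topic configuration to Kafka's admin API. A hedged sketch of the remainder (AdminUtils.createTopic is Kafka 0.8's API; whether the method uses exactly this call is an assumption):

    // Hedged sketch of the collapsed remainder (assumption, not the committed code):
    AdminUtils.createTopic(zkClient, topic, numberOfPartitions, replicationFactor, topicConfig);
    // AdminUtils only writes to ZooKeeper; brokers pick the topic up asynchronously,
    // so tests typically wait until metadata for the topic becomes available.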
@@ -975,4 +988,65 @@ public static class SuccessException extends Exception {
private static final long serialVersionUID = 1L;
}


// ----------------------- Debugging utilities --------------------

/**
 * Read a topic into a list, using only Kafka code.
 * @return the messages read from the topic, at most stopAfter of them
 */
private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
// we request only one stream per consumer instance. Kafka will make sure that each consumer group
// will see each message only once.
Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
if(streams.size() != 1) {
throw new RuntimeException("Expected only one message stream but got "+streams.size());
}
List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
if(kafkaStreams == null) {
throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
}
if(kafkaStreams.size() != 1) {
throw new RuntimeException("Requested 1 stream from Kafka, but got "+kafkaStreams.size()+" streams");
}
LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();

List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
int read = 0;
while(iteratorToRead.hasNext()) {
read++;
result.add(iteratorToRead.next());
if(read == stopAfter) {
LOG.info("Read "+read+" elements");
return result;
}
}
return result;
}
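One caveat worth noting: with Kafka's high-level consumer, ConsumerIterator.hasNext() blocks until another message arrives, so this loop only returns early via stopAfter unless the supplied ConsumerConfig sets a consumer timeout. A sketch of a config that bounds the wait (assuming standardCC does not already set it; with the timeout set, hasNext() throws ConsumerTimeoutException instead of blocking forever):

    // Sketch: bound the iterator's blocking wait via consumer.timeout.ms (values are illustrative).
    Properties props = new Properties();
    props.setProperty("zookeeper.connect", standardCC.zkConnect());
    props.setProperty("group.id", "topic-reader-" + UUID.randomUUID());
    props.setProperty("auto.offset.reset", "smallest");
    props.setProperty("consumer.timeout.ms", "5000"); // hasNext() throws ConsumerTimeoutException after 5s idle
    ConsumerConfig boundedConfig = new ConsumerConfig(props);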

private static void printTopic(String topicName, ConsumerConfig config, DeserializationSchema deserializationSchema, int stopAfter){
List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
for(MessageAndMetadata<byte[], byte[]> message: contents) {
Object out = deserializationSchema.deserialize(message.message());
LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
}
}

private static void printTopic(String topicName, int elements, ExecutionConfig ec) {
// write the sequence to log for debugging purposes
Properties stdProps = standardCC.props().props();
Properties newProps = new Properties(stdProps);
newProps.setProperty("group.id", "topic-printer"+UUID.randomUUID().toString());
newProps.setProperty("auto.offset.reset", "smallest");
newProps.setProperty("zookeeper.connect", standardCC.zkConnect());

ConsumerConfig printerConfig = new ConsumerConfig(newProps);
DeserializationSchema deserializer = new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), ec);
printTopic(topicName, printerConfig, deserializer, elements);
}

}