[hotfix][kafka] Undo DataGenerators changes (use inline kafka producer again)
rmetzger committed Oct 12, 2016
1 parent 15df71b commit 744f8eb
Showing 4 changed files with 72 additions and 347 deletions.
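
For context, the first hunk below restores the inline producer inside produceIntoKafka, replacing the previously commented-out FlinkKafkaProducer010.writeToKafkaWithTimestamps(...) route. The following is a minimal sketch of that restored pattern, not the actual test environment class: the parameter types (KeyedSerializationSchema, KafkaPartitioner) and import paths are assumed from the Kafka connector API of that Flink version, since the hunk only shows a truncated signature.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Sketch only: method shape assumed from the diff, not the full test environment class.
class InlineKafkaProducerSketch {

    <T> DataStreamSink<T> produceIntoKafka(
            DataStream<T> stream,
            String topic,
            KeyedSerializationSchema<T> serSchema,
            Properties props,
            KafkaPartitioner<T> partitioner) {

        // Build the Kafka 0.10 producer directly and attach it as an ordinary sink,
        // instead of going through FlinkKafkaProducer010.writeToKafkaWithTimestamps(...).
        FlinkKafkaProducer010<T> prod =
                new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);

        // Flush pending records on checkpoints so the consuming side of the tests
        // sees every produced element once the job checkpoints.
        prod.setFlushOnCheckpoint(true);

        return stream.addSink(prod);
    }
}

Callers keep invoking produceIntoKafka exactly as before; only the internal wiring of the sink changes.
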
@@ -115,9 +115,6 @@ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
return stream.addSink(prod);
/* FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<T> sink = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props, partitioner);
sink.setFlushOnCheckpoint(true);
return sink; */
}

@Override
@@ -35,15 +35,6 @@ public void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}

// @Test(timeout = 60000)
// public void testPunctuatedExplicitWMConsumer() throws Exception {
// runExplicitPunctuatedWMgeneratingConsumerTest(false);
// }

// @Test(timeout = 60000)
// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
// runExplicitPunctuatedWMgeneratingConsumerTest(true);
// }

@Test(timeout = 60000)
public void testKeyValueSupport() throws Exception {
@@ -18,8 +18,6 @@

package org.apache.flink.streaming.connectors.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
@@ -31,21 +29,17 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
@@ -68,7 +62,6 @@
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -92,7 +85,6 @@
import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Assert;
@@ -116,7 +108,6 @@
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -517,7 +508,7 @@ public void runCancelingOnFullInputTest() throws Exception {

// launch a producer thread
DataGenerators.InfiniteStringsGenerator generator =
new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic, flinkPort);
new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
generator.start();

// launch a consumer asynchronously
@@ -571,7 +562,6 @@ public void run() {
assertTrue(failueCause.getMessage().contains("Job was cancelled"));

if (generator.isAlive()) {
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "String generator");
generator.shutdown();
generator.join();
}
@@ -1723,234 +1713,4 @@ public void restoreState(Integer state) {
this.numElementsTotal = state;
}
}

///////////// Testing the Kafka consumer with embedded watermark generation functionality ///////////////

// @RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class)
// public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception {
//
// final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString();
// final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString();
//
// final Map<String, Boolean> topics = new HashMap<>();
// topics.put(topic1, false);
// topics.put(topic2, emptyPartition);
//
// final int noOfTopcis = topics.size();
// final int partitionsPerTopic = 1;
// final int elementsPerPartition = 100 + 1;
//
// final int totalElements = emptyPartition ?
// partitionsPerTopic * elementsPerPartition :
// noOfTopcis * partitionsPerTopic * elementsPerPartition;
//
// createTestTopic(topic1, partitionsPerTopic, 1);
// createTestTopic(topic2, partitionsPerTopic, 1);
//
// final StreamExecutionEnvironment env =
// StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// env.setParallelism(partitionsPerTopic);
// env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
// env.getConfig().disableSysoutLogging();
//
// TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>");
//
// Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
// producerProperties.setProperty("retries", "0");
//
// putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType);
//
// List<String> topicTitles = new ArrayList<>(topics.keySet());
// runPunctuatedComsumer(env, topicTitles, totalElements, longIntType);
//
// executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest");
//
// for(String topic: topicTitles) {
// deleteTestTopic(topic);
// }
// }
//
// private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception {
// try {
// tryExecutePropagateExceptions(env, execName);
// }
// catch (ProgramInvocationException | JobExecutionException e) {
// // look for NotLeaderForPartitionException
// Throwable cause = e.getCause();
//
// // search for nested SuccessExceptions
// int depth = 0;
// while (cause != null && depth++ < 20) {
// if (cause instanceof kafka.common.NotLeaderForPartitionException) {
// throw (Exception) cause;
// }
// cause = cause.getCause();
// }
// throw e;
// }
// }
//
// private void putDataInTopics(StreamExecutionEnvironment env,
// Properties producerProperties,
// final int elementsPerPartition,
// Map<String, Boolean> topics,
// TypeInformation<Tuple2<Long, Integer>> outputTypeInfo) {
// if(topics.size() != 2) {
// throw new RuntimeException("This method accepts two topics as arguments.");
// }
//
// TypeInformationSerializationSchema<Tuple2<Long, Integer>> sinkSchema =
// new TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig());
//
// DataStream<Tuple2<Long, Integer>> stream = env
// .addSource(new RichParallelSourceFunction<Tuple2<Long, Integer>>() {
// private boolean running = true;
//
// @Override
// public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws InterruptedException {
// int topic = 0;
// int currentTs = 1;
//
// while (running && currentTs < elementsPerPartition) {
// long timestamp = (currentTs % 10 == 0) ? -1L : currentTs;
// ctx.collect(new Tuple2<Long, Integer>(timestamp, topic));
// currentTs++;
// }
//
// Tuple2<Long, Integer> toWrite2 = new Tuple2<Long, Integer>(-1L, topic);
// ctx.collect(toWrite2);
// }
//
// @Override
// public void cancel() {
// running = false;
// }
// }).setParallelism(1);
//
// List<Map.Entry<String, Boolean>> topicsL = new ArrayList<>(topics.entrySet());
//
// stream = stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
//
// @Override
// public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
// return value;
// }
// }).setParallelism(1);
// kafkaServer.produceIntoKafka(stream, topicsL.get(0).getKey(),
// new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null).setParallelism(1);
//
// if(!topicsL.get(1).getValue()) {
// stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
//
// @Override
// public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
// long timestamp = (value.f0 == -1) ? -1L : 1000 + value.f0;
// return new Tuple2<>(timestamp, 1);
// }
// }).setParallelism(1).addSink(kafkaServer.produceIntoKafka(topicsL.get(1).getKey(),
// new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
// }
// }

private DataStreamSink<Tuple2<Long, Integer>> runPunctuatedComsumer(StreamExecutionEnvironment env,
List<String> topics,
final int totalElementsToExpect,
TypeInformation<Tuple2<Long, Integer>> inputTypeInfo) {

TypeInformationSerializationSchema<Tuple2<Long, Integer>> sourceSchema =
new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig());

Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer
.getConsumer(topics, sourceSchema, props)
.assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor());

DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);

return consuming
.transform("testingWatermarkOperator", inputTypeInfo, new WMTestingOperator())
.addSink(new RichSinkFunction<Tuple2<Long, Integer>>() {

private int elementCount = 0;

@Override
public void invoke(Tuple2<Long, Integer> value) throws Exception {
elementCount++;
if (elementCount == totalElementsToExpect) {
throw new SuccessException();
}
}

@Override
public void close() throws Exception {
super.close();
}
});
}

/** An extractor that emits a Watermark whenever the timestamp <b>in the record</b> is equal to {@code -1}. */
private static class TestPunctuatedTSExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<Long, Integer>> {

@Override
public Watermark checkAndGetNextWatermark(Tuple2<Long, Integer> lastElement, long extractedTimestamp) {
return (lastElement.f0 == -1) ? new Watermark(extractedTimestamp) : null;
}

@Override
public long extractTimestamp(Tuple2<Long, Integer> element, long previousElementTimestamp) {
return element.f0;
}
}

private static class WMTestingOperator extends AbstractStreamOperator<Tuple2<Long, Integer>> implements OneInputStreamOperator<Tuple2<Long, Integer>, Tuple2<Long, Integer>> {

private long lastReceivedWatermark = Long.MIN_VALUE;

private Map<Integer, Boolean> isEligible = new HashMap<>();
private Map<Integer, Long> perPartitionMaxTs = new HashMap<>();

WMTestingOperator() {
isEligible = new HashMap<>();
perPartitionMaxTs = new HashMap<>();
}

@Override
public void processElement(StreamRecord<Tuple2<Long, Integer>> element) throws Exception {
int partition = element.getValue().f1;
Long maxTs = perPartitionMaxTs.get(partition);
if(maxTs == null || maxTs < element.getValue().f0) {
perPartitionMaxTs.put(partition, element.getValue().f0);
isEligible.put(partition, element.getValue().f0 > lastReceivedWatermark);
}
output.collect(element);
}

@Override
public void processWatermark(Watermark mark) throws Exception {
int partition = -1;
long minTS = Long.MAX_VALUE;
for (Integer part : perPartitionMaxTs.keySet()) {
Long ts = perPartitionMaxTs.get(part);
if (ts < minTS && isEligible.get(part)) {
partition = part;
minTS = ts;
lastReceivedWatermark = ts;
}
}
isEligible.put(partition, false);

assertEquals(minTS, mark.getTimestamp());
output.emitWatermark(mark);
}

@Override
public void close() throws Exception {
super.close();
perPartitionMaxTs.clear();
isEligible.clear();
}
}
}