Skip to content

Commit

Permalink
MINOR: fixes lgtm.com warnings (apache#4582)
Browse files Browse the repository at this point in the history
fixes lgmt.com warnings
cleanup PrintForeachAction and Printed

Author: Matthias J. Sax <[email protected]>

Reviewers: Sebastian Bauersfeld <[email protected]>, Damian Guy <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
mjsax authored Feb 25, 2018
1 parent 99d650c commit 5df535e
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public LRUCache(final int maxSize) {
cache = new LinkedHashMap<K, V>(16, .75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return size() > maxSize;
return this.size() > maxSize; // require this. prefix to make lgtm.com happy
}
};
}
Expand Down
30 changes: 13 additions & 17 deletions core/src/main/scala/kafka/tools/StreamsResetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ private void maybeReset(final String groupId,
shiftOffsetsBy(client, inputTopicPartitions, options.valueOf(shiftByOption));
} else if (options.has(toDatetimeOption)) {
final String ts = options.valueOf(toDatetimeOption);
final Long timestamp = getDateTime(ts);
final long timestamp = getDateTime(ts);
resetToDatetime(client, inputTopicPartitions, timestamp);
} else if (options.has(byDurationOption)) {
final String duration = options.valueOf(byDurationOption);
Expand All @@ -401,8 +401,7 @@ private void maybeReset(final String groupId,
}

for (final TopicPartition p : inputTopicPartitions) {
final Long position = client.position(p);
System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + position);
System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + client.position(p));
}
}
}
Expand All @@ -416,8 +415,7 @@ public void resetOffsetsFromResetPlan(Consumer<byte[], byte[]> client, Set<Topic
checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);

for (final TopicPartition topicPartition : inputTopicPartitions) {
final Long offset = validatedTopicPartitionsAndOffset.get(topicPartition);
client.seek(topicPartition, offset);
client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
}
}

Expand All @@ -429,7 +427,7 @@ private Map<TopicPartition, Long> getTopicPartitionOffsetFromResetPlan(String re
private void resetByDuration(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Duration duration) throws DatatypeConfigurationException {
final Date now = new Date();
duration.negate().addTo(now);
final Long timestamp = now.getTime();
final long timestamp = now.getTime();

final Map<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size());
for (final TopicPartition topicPartition : inputTopicPartitions) {
Expand All @@ -439,8 +437,7 @@ private void resetByDuration(Consumer<byte[], byte[]> client, Set<TopicPartition
final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);

for (final TopicPartition topicPartition : inputTopicPartitions) {
final Long offset = topicPartitionsAndOffset.get(topicPartition).offset();
client.seek(topicPartition, offset);
client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
}
}

Expand All @@ -453,20 +450,19 @@ private void resetToDatetime(Consumer<byte[], byte[]> client, Set<TopicPartition
final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);

for (final TopicPartition topicPartition : inputTopicPartitions) {
final Long offset = topicPartitionsAndOffset.get(topicPartition).offset();
client.seek(topicPartition, offset);
client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
}
}

// visible for testing
public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long shiftBy) {
public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, long shiftBy) {
final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);

final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>(inputTopicPartitions.size());
for (final TopicPartition topicPartition : inputTopicPartitions) {
final Long position = client.position(topicPartition);
final Long offset = position + shiftBy;
final long position = client.position(topicPartition);
final long offset = position + shiftBy;
topicPartitionsAndOffset.put(topicPartition, offset);
}

Expand Down Expand Up @@ -497,7 +493,7 @@ public void resetOffsetsTo(Consumer<byte[], byte[]> client, Set<TopicPartition>
}

// visible for testing
public Long getDateTime(String timestamp) throws ParseException {
public long getDateTime(String timestamp) throws ParseException {
final String[] timestampParts = timestamp.split("T");
if (timestampParts.length < 2) {
throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length());
Expand Down Expand Up @@ -549,10 +545,10 @@ private Map<TopicPartition, Long> checkOffsetRange(final Map<TopicPartition, Lon
final Map<TopicPartition, Long> endOffsets) {
final Map<TopicPartition, Long> validatedTopicPartitionsOffsets = new HashMap<>();
for (final Map.Entry<TopicPartition, Long> topicPartitionAndOffset : inputTopicPartitionsAndOffset.entrySet()) {
final Long endOffset = endOffsets.get(topicPartitionAndOffset.getKey());
final Long offset = topicPartitionAndOffset.getValue();
final long endOffset = endOffsets.get(topicPartitionAndOffset.getKey());
final long offset = topicPartitionAndOffset.getValue();
if (offset < endOffset) {
final Long beginningOffset = beginningOffsets.get(topicPartitionAndOffset.getKey());
final long beginningOffset = beginningOffsets.get(topicPartitionAndOffset.getKey());
if (offset > beginningOffset) {
validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), offset);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
import org.apache.kafka.streams.errors.TopologyException;

import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Objects;

/**
Expand All @@ -32,7 +31,7 @@
* @see KStream#print(Printed)
*/
public class Printed<K, V> {
protected final PrintWriter printWriter;
protected final OutputStream outputStream;
protected String label;
protected KeyValueMapper<? super K, ? super V, String> mapper = new KeyValueMapper<K, V, String>() {
@Override
Expand All @@ -41,16 +40,16 @@ public String apply(final K key, final V value) {
}
};

private Printed(final PrintWriter printWriter) {
this.printWriter = printWriter;
private Printed(final OutputStream outputStream) {
this.outputStream = outputStream;
}

/**
* Copy constructor.
* @param printed instance of {@link Printed} to copy
*/
protected Printed(final Printed<K, V> printed) {
this.printWriter = printed.printWriter;
this.outputStream = printed.outputStream;
this.label = printed.label;
this.mapper = printed.mapper;
}
Expand All @@ -69,8 +68,8 @@ public static <K, V> Printed<K, V> toFile(final String filePath) {
throw new TopologyException("filePath can't be an empty string");
}
try {
return new Printed<>(new PrintWriter(filePath, StandardCharsets.UTF_8.name()));
} catch (final FileNotFoundException | UnsupportedEncodingException e) {
return new Printed<>(new FileOutputStream(filePath));
} catch (final FileNotFoundException e) {
throw new TopologyException("Unable to write stream to file at [" + filePath + "] " + e.getMessage());
}
}
Expand All @@ -83,7 +82,7 @@ public static <K, V> Printed<K, V> toFile(final String filePath) {
* @return a new Printed instance
*/
public static <K, V> Printed<K, V> toSysOut() {
return new Printed<>((PrintWriter) null);
return new Printed<>(System.out);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
import org.apache.kafka.streams.state.StoreBuilder;

import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.io.FileOutputStream;
import java.util.Objects;
import java.util.Set;

Expand Down Expand Up @@ -346,7 +344,10 @@ public void print(final Serde<K> keySerde,
final String label) {
Objects.requireNonNull(label, "label can't be null");
final String name = builder.newProcessorName(PRINTING_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label)), this.name);
builder.internalTopologyBuilder.addProcessor(
name,
new KStreamPrint<>(new PrintForeachAction<>(System.out, defaultKeyValueMapper, label)),
this.name);
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -384,11 +385,13 @@ public void writeAsText(final String filePath,
if (filePath.trim().isEmpty()) {
throw new TopologyException("filePath can't be an empty string");
}
String name = builder.newProcessorName(PRINTING_NAME);
final String name = builder.newProcessorName(PRINTING_NAME);
try {
PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label)), this.name);
} catch (final FileNotFoundException | UnsupportedEncodingException e) {
builder.internalTopologyBuilder.addProcessor(
name,
new KStreamPrint<>(new PrintForeachAction<>(new FileOutputStream(filePath), defaultKeyValueMapper, label)),
this.name);
} catch (final FileNotFoundException e) {
throw new TopologyException(String.format("Unable to write stream to file at [%s] %s", filePath, e.getMessage()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,45 @@
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KeyValueMapper;

import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;

public class PrintForeachAction<K, V> implements ForeachAction<K, V> {

private final String label;
private final PrintWriter printWriter;
private final boolean closable;
private final KeyValueMapper<? super K, ? super V, String> mapper;

/**
* Print customized output with given writer. The PrintWriter can be null in order to
* distinguish between {@code System.out} and the others. If the PrintWriter is {@code PrintWriter(System.out)},
* then it would close {@code System.out} output stream.
* <p>
* Afterall, not to pass in {@code PrintWriter(System.out)} but {@code null} instead.
* Print customized output with given writer. The {@link OutputStream} can be {@link System#out} or the others.
*
* @param printWriter Use {@code System.out.println} if {@code null}.
* @param outputStream The output stream to write to.
* @param mapper The mapper which can allow user to customize output will be printed.
* @param label The given name will be printed.
*/
public PrintForeachAction(final PrintWriter printWriter, final KeyValueMapper<? super K, ? super V, String> mapper, final String label) {
this.printWriter = printWriter;
PrintForeachAction(final OutputStream outputStream,
final KeyValueMapper<? super K, ? super V, String> mapper,
final String label) {
this.printWriter = new PrintWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
this.closable = outputStream != System.out && outputStream != System.err;
this.mapper = mapper;
this.label = label;
}

@Override
public void apply(final K key, final V value) {
final String data = String.format("[%s]: %s", label, mapper.apply(key, value));
if (printWriter == null) {
System.out.println(data);
} else {
printWriter.println(data);
}
printWriter.println(data);
}

public void close() {
if (printWriter == null) {
System.out.flush();
} else {
if (closable) {
printWriter.close();
} else {
printWriter.flush();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ public PrintedInternal(final Printed<K, V> printed) {
* @return the {@code ProcessorSupplier} to be used for printing
*/
public ProcessorSupplier<K, V> build(final String processorName) {
return new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, label != null ? label : processorName));
return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private Set<InternalTopicConfig> validateTopicPartitions(final Collection<Intern
final Map<String, Integer> existingTopicNamesPartitions) {
final Set<InternalTopicConfig> topicsToBeCreated = new HashSet<>();
for (final InternalTopicConfig topic : topicsPartitionsMap) {
final Integer numberOfPartitions = topic.numberOfPartitions();
final int numberOfPartitions = topic.numberOfPartitions();
if (existingTopicNamesPartitions.containsKey(topic.name())) {
if (!existingTopicNamesPartitions.get(topic.name()).equals(numberOfPartitions)) {
final String errorMsg = String.format("Existing internal topic %s has invalid partitions: " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription
final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
for (Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
final String topic = entry.getKey();
final Integer numPartitions = entry.getValue().numPartitions;
final int numPartitions = entry.getValue().numPartitions;

for (int partition = 0; partition < numPartitions; partition++) {
allRepartitionTopicPartitions.put(new TopicPartition(topic, partition),
Expand Down Expand Up @@ -638,7 +638,7 @@ private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitio

for (final InternalTopicMetadata metadata : topicPartitions.values()) {
final InternalTopicConfig topic = metadata.config;
final Integer numPartitions = metadata.numPartitions;
final int numPartitions = metadata.numPartitions;

if (numPartitions == NOT_AVAILABLE) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,15 +894,13 @@ private void addToResetList(final TopicPartition partition, final Set<TopicParti
* @param records Records, can be null
*/
private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
if (records != null && !records.isEmpty()) {
int numAddedRecords = 0;
int numAddedRecords = 0;

for (final TopicPartition partition : records.partitions()) {
final StreamTask task = taskManager.activeTask(partition);
numAddedRecords += task.addRecords(partition, records.records(partition));
}
streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
for (final TopicPartition partition : records.partitions()) {
final StreamTask task = taskManager.activeTask(partition);
numAddedRecords += task.addRecords(partition, records.records(partition));
}
streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ public class PrintedTest {

private final PrintStream originalSysOut = System.out;
private final ByteArrayOutputStream sysOut = new ByteArrayOutputStream();
private final Printed<String, Integer> sysOutPrinter = Printed.toSysOut();
private Printed<String, Integer> sysOutPrinter;

@Before
public void before() {
System.setOut(new PrintStream(sysOut));
sysOutPrinter = Printed.toSysOut();
}

@After
Expand All @@ -72,7 +73,10 @@ public void shouldCreateProcessorThatPrintsToFile() throws IOException {
@Test
public void shouldCreateProcessorThatPrintsToStdOut() throws UnsupportedEncodingException {
final ProcessorSupplier<String, Integer> supplier = new PrintedInternal<>(sysOutPrinter).build("processor");
supplier.get().process("good", 2);
final Processor<String, Integer> processor = supplier.get();

processor.process("good", 2);
processor.close();
assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: good, 2\n"));
}

Expand All @@ -83,6 +87,7 @@ public void shouldPrintWithLabel() throws UnsupportedEncodingException {
.get();

processor.process("hello", 3);
processor.close();
assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[label]: hello, 3\n"));
}

Expand All @@ -97,6 +102,7 @@ public String apply(final String key, final Integer value) {
})).build("processor")
.get();
processor.process("hello", 1);
processor.close();
assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: hello -> 1\n"));
}

Expand Down
Loading

0 comments on commit 5df535e

Please sign in to comment.