Skip to content

Commit

Permalink
Merge branch '5.0.4-post' into 5.1.0-post
Browse files Browse the repository at this point in the history
  • Loading branch information
mjsax committed Jun 4, 2020
2 parents 289c01e + be647a0 commit 2a72d6e
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ interface Emailer {
void sendEmail(EmailTuple details);
}

public class EmailTuple {

public static class EmailTuple {
public Order order;
public Payment payment;
public Customer customer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void getWithTimeout(@PathParam("id") final String id,
}
}

class FilteredResponse<K, V> {
static class FilteredResponse<K, V> {
private final AsyncResponse asyncResponse;
private final Predicate<K, V> predicate;

Expand Down Expand Up @@ -236,7 +236,7 @@ private HostStoreInfo getKeyLocationOrBlock(final String id, final AsyncResponse
}
try {
//Sleep a bit until metadata becomes available
Thread.sleep(Math.min(Long.valueOf(CALL_TIMEOUT), 200));
Thread.sleep(Math.min(Long.parseLong(CALL_TIMEOUT), 200));
} catch (final InterruptedException e) {
e.printStackTrace();
}
Expand Down Expand Up @@ -264,6 +264,7 @@ private void fetchFromOtherHost(final String path, final AsyncResponse asyncResp
});
asyncResponse.resume(bean);
} catch (final Exception swallowed) {
log.warn("GET failed.", swallowed);
}
}

Expand Down Expand Up @@ -410,7 +411,7 @@ public static void main(final String[] args) throws Exception {
final String restPort = args.length > 3 ? args[3] : null;

Schemas.configureSerdesWithSchemaRegistryUrl(schemaRegistryUrl);
final OrdersService service = new OrdersService(restHostname, restPort == null ? 0 : Integer.valueOf(restPort));
final OrdersService service = new OrdersService(restHostname, restPort == null ? 0 : Integer.parseInt(restPort));
service.start(bootstrapServers, "/tmp/kafka-streams");
addShutdownHookAndBlock(service);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public String toString() {

public static class Topics {

public static final Map<String, Topic<?, ?>> ALL = new HashMap<>();
public final static Map<String, Topic<?, ?>> ALL = new HashMap<>();
public static Topic<String, Order> ORDERS;
public static Topic<String, OrderEnriched> ORDERS_ENRICHED;
public static Topic<String, Payment> PAYMENTS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io.confluent.examples.streams.microservices.Service;
import io.confluent.examples.streams.microservices.domain.Schemas;
import io.confluent.examples.streams.utils.MonitoringInterceptorUtils;

import org.apache.commons.compress.utils.Charsets;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -24,6 +26,7 @@

import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -115,7 +118,7 @@ public void configure(final Map<String, ?> map, final boolean b) {

@Override
public byte[] serialize(final String topic, final Product pt) {
return pt.toString().getBytes();
return pt.toString().getBytes(Charsets.UTF_8);
}

@Override
Expand All @@ -133,7 +136,7 @@ public void configure(final Map<String, ?> map, final boolean b) {

@Override
public Product deserialize(final String topic, final byte[] bytes) {
return Product.valueOf(new String(bytes));
return Product.valueOf(new String(bytes, Charsets.UTF_8));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.util.Comparator;
import java.util.Map;
import java.util.PriorityQueue;
Expand Down Expand Up @@ -50,7 +51,9 @@ public PriorityQueue<T> deserialize(final String s, final byte[] bytes) {
final int records = dataInputStream.readInt();
for (int i = 0; i < records; i++) {
final byte[] valueBytes = new byte[dataInputStream.readInt()];
dataInputStream.read(valueBytes);
if (dataInputStream.read(valueBytes) != valueBytes.length) {
throw new BufferUnderflowException();
};
priorityQueue.add(valueDeserializer.deserialize(s, valueBytes));
}
} catch (final IOException e) {
Expand Down

0 comments on commit 2a72d6e

Please sign in to comment.