Skip to content

Commit

Permalink
Pickup Data: Removing the Java var declarations.
Browse files Browse the repository at this point in the history
They're creating a JDK-version issue I don't need. :-D
  • Loading branch information
Kris Jenkins committed Nov 17, 2021
1 parent 8b946a7 commit 00b9a0e
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public AdventureWeb() throws IOException {
@OnOpen
public void onOpen(Session session, @PathParam("user-id") String userId) throws IOException {
logger.info("CONNECTED");
var onOpenEventevent = new Event<String, String>("WebSocket", userId, "HELLO");
Event<String, String> onOpenEventevent = new Event<String, String>("WebSocket", userId, "HELLO");
sendEvent(session, onOpenEventevent);

consumerThread = new Thread(() -> {
Expand Down Expand Up @@ -97,7 +97,8 @@ public void onMessage(Session session, @PathParam("user-id") String userId, Stri
});
logger.info("Got event: {}", event);

var record = new ProducerRecord<String, CommandValue>(COMMANDS_STREAM, userId, event.getValue());
ProducerRecord<String, CommandValue> record =
new ProducerRecord<String, CommandValue>(COMMANDS_STREAM, userId, event.getValue());
logger.info("Sending: {}", record);

producer.send(record, (receipt, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
Expand All @@ -42,7 +43,7 @@ public static void main(String[] args) {
throw new RuntimeException(e);
}

final var builder = new StreamsBuilder();
final StreamsBuilder builder = new StreamsBuilder();

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Expand All @@ -51,18 +52,19 @@ public static void main(String[] args) {
Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

// Serdes galore.
var commandValueSerde = new SpecificAvroSerde<CommandValue>();
SpecificAvroSerde<CommandValue> commandValueSerde = new SpecificAvroSerde<>();
commandValueSerde.configure(schemaRegistryProps, false);

var responseValueSerde = new SpecificAvroSerde<ResponseValue>();
SpecificAvroSerde<ResponseValue> responseValueSerde = new SpecificAvroSerde<>();
responseValueSerde.configure(schemaRegistryProps, false);

// Let's start some streams.
var commandStream = builder.stream(COMMANDS_STREAM, Consumed.with(Serdes.UUID(), commandValueSerde));
KStream<UUID, CommandValue> commandStream =
builder.stream(COMMANDS_STREAM, Consumed.with(Serdes.UUID(), commandValueSerde));

// Echo.
commandStream.mapValues(command -> {
var response = new ResponseValue();
ResponseValue response = new ResponseValue();
response.setSOURCE("Echo");
response.setRESPONSE(String.format("ECHO: %s", command.getCOMMAND()));
return response;
Expand All @@ -81,7 +83,7 @@ public static void main(String[] args) {
final Topology topology = builder.build();
logger.info("Topology: {}", topology.describe());

var streams = new KafkaStreams(topology, streamsConfiguration);
final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
streams.start();

logger.info("Installing shutdown hook.");
Expand All @@ -90,7 +92,7 @@ public static void main(String[] args) {

public static Properties loadProperties(String fileName) throws IOException {
try (FileInputStream input = new FileInputStream(fileName)) {
var props = new Properties();
final Properties props = new Properties();
props.load(input);
return props;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
Expand All @@ -20,17 +21,17 @@

public class InitialCommandProcessor {
public static void buildStreams(Map<String, String> schemaRegistryProps, StreamsBuilder builder) {
var commandValueSerde = new SpecificAvroSerde<CommandValue>();
SpecificAvroSerde<CommandValue> commandValueSerde = new SpecificAvroSerde<>();
commandValueSerde.configure(schemaRegistryProps, false);
var movementCommandValueSerde = new SpecificAvroSerde<MovementCommandValue>();
Serde<MovementCommandValue> movementCommandValueSerde = new SpecificAvroSerde<>();
movementCommandValueSerde.configure(schemaRegistryProps, false);
var statusCommandValueSerde = new SpecificAvroSerde<StatusCommandValue>();
Serde<StatusCommandValue> statusCommandValueSerde = new SpecificAvroSerde<>();
statusCommandValueSerde.configure(schemaRegistryProps, false);
var inventoryCommandValueSerde = new SpecificAvroSerde<InventoryCommandValue>();
Serde<InventoryCommandValue> inventoryCommandValueSerde = new SpecificAvroSerde<>();
inventoryCommandValueSerde.configure(schemaRegistryProps, false);
var responseValueSerde = new SpecificAvroSerde<ResponseValue>();
SpecificAvroSerde<ResponseValue> responseValueSerde = new SpecificAvroSerde<>();
responseValueSerde.configure(schemaRegistryProps, false);
var producedResponse = Produced.with(Serdes.UUID(), responseValueSerde);
Produced<UUID, ResponseValue> producedResponse = Produced.with(Serdes.UUID(), responseValueSerde);

BranchedKStream<UUID, CommandValue> commandBranches =
builder.stream(COMMANDS_STREAM, Consumed.with(Serdes.UUID(), commandValueSerde)).split();
Expand Down Expand Up @@ -59,7 +60,7 @@ public static void buildStreams(Map<String, String> schemaRegistryProps, Streams
commandBranches.branch((k, v) -> {
return "HELP".equals(v.getCOMMAND());
}, Branched.withConsumer(stream -> stream.mapValues(v -> {
var responseString = new StringBuilder();
StringBuilder responseString = new StringBuilder();
responseString.append("Available commands are:\n");
responseString.append("\tLOOK\n");
responseString.append("\tGO NORTH\n");
Expand All @@ -71,19 +72,19 @@ public static void buildStreams(Map<String, String> schemaRegistryProps, Streams
responseString.append("\tINVENTORY\n");
responseString.append("\tHELP\n");

var response = new ResponseValue();
ResponseValue response = new ResponseValue();
response.setSOURCE("Help");
response.setRESPONSE(responseString.toString());
return response;
}).to(RESPONSES_STREAM, producedResponse)));

// Fallback.
commandBranches.defaultBranch(Branched.withConsumer(stream -> stream.mapValues(msg -> {
var response = new ResponseValue();
ResponseValue response = new ResponseValue();
response.setSOURCE("Unknown Command");
response.setRESPONSE(String.format("Unknown command: %s\n\nTry asking for HELP", msg.getCOMMAND()));
return response;
}).to(RESPONSES_STREAM, producedResponse)));

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
Expand All @@ -34,27 +35,27 @@ public class InventoryProcessor {

public static void buildStreams(Map<String, String> schemaRegistryProps, StreamsBuilder builder,
KTable<UUID, LocationData> latestUserLocation) {
var inventoryCommandValueSerde = new SpecificAvroSerde<InventoryCommandValue>();
Serde<InventoryCommandValue> inventoryCommandValueSerde = new SpecificAvroSerde<>();
inventoryCommandValueSerde.configure(schemaRegistryProps, false);
Runtime.getRuntime().addShutdownHook(new Thread(inventoryCommandValueSerde::close));

var inventoryValueSerde = new SpecificAvroSerde<InventoryValue>();
Serde<InventoryValue> inventoryValueSerde = new SpecificAvroSerde<>();
inventoryValueSerde.configure(schemaRegistryProps, false);
Runtime.getRuntime().addShutdownHook(new Thread(inventoryValueSerde::close));

var responseValueSerde = new SpecificAvroSerde<ResponseValue>();
SpecificAvroSerde<ResponseValue> responseValueSerde = new SpecificAvroSerde<>();
responseValueSerde.configure(schemaRegistryProps, false);
Runtime.getRuntime().addShutdownHook(new Thread(responseValueSerde::close));

var knapsackSerde = new SpecificAvroSerde<Knapsack>();
Serde<Knapsack> knapsackSerde = new SpecificAvroSerde<>();
knapsackSerde.configure(schemaRegistryProps, false);
Runtime.getRuntime().addShutdownHook(new Thread(knapsackSerde::close));

var itemRulesValueSerde = new SpecificAvroSerde<ItemRulesValue>();
Serde<ItemRulesValue> itemRulesValueSerde = new SpecificAvroSerde<>();
itemRulesValueSerde.configure(schemaRegistryProps, false);
Runtime.getRuntime().addShutdownHook(new Thread(itemRulesValueSerde::close));

var statusCommandValueSerde = new SpecificAvroSerde<StatusCommandValue>();
Serde<StatusCommandValue> statusCommandValueSerde = new SpecificAvroSerde<>();
statusCommandValueSerde.configure(schemaRegistryProps, false);
Runtime.getRuntime().addShutdownHook(new Thread(statusCommandValueSerde::close));

Expand All @@ -77,13 +78,13 @@ public static void buildStreams(Map<String, String> schemaRegistryProps, Streams
newInventoryStream
.split()
.branch((k, pair) -> pair.getValue1(), Branched.withConsumer(stream -> stream.mapValues(pair -> {
var v = new InventoryValue();
InventoryValue v = new InventoryValue();
v.setITEM(pair.getValue0());
v.setHELD(true);
return v;
}).to(INVENTORY_STREAM, Produced.with(Serdes.UUID(), inventoryValueSerde))))
.defaultBranch(Branched.withConsumer(stream -> stream.mapValues(pair -> {
var response = new ResponseValue();
ResponseValue response = new ResponseValue();
response.setSOURCE("Inventory");
response.setRESPONSE(String.format("You cannot pick up: %s", pair.getValue0()));
return response;
Expand All @@ -93,7 +94,7 @@ public static void buildStreams(Map<String, String> schemaRegistryProps, Streams
.stream(INVENTORY_STREAM, Consumed.with(Serdes.UUID(), inventoryValueSerde))
.filter((k, inventoryValue) -> inventoryValue.getHELD())
.mapValues((k, inventoryValue) -> {
var response = new ResponseValue();
ResponseValue response = new ResponseValue();
response.setSOURCE("Inventory");
response.setRESPONSE(String.format("You pick up: %s.", inventoryValue.getITEM()));
return response;
Expand All @@ -105,7 +106,7 @@ public static void buildStreams(Map<String, String> schemaRegistryProps, Streams
.stream(INVENTORY_STREAM, Consumed.with(Serdes.UUID(), inventoryValueSerde))
.groupByKey(Grouped.with(Serdes.UUID(), inventoryValueSerde))
.aggregate(() -> {
var knapsack = new Knapsack();
Knapsack knapsack = new Knapsack();
knapsack.setOBJECTS(new ArrayList<String>());
return knapsack;
}, (k, item, knapsack) -> {
Expand All @@ -114,7 +115,7 @@ public static void buildStreams(Map<String, String> schemaRegistryProps, Streams
logger.info("Adding {} to {}", item, knapsack);
currentObjects.add(item.getITEM());
} else {
var filteredObjects = new ArrayList<String>(knapsack.getOBJECTS());
ArrayList<String> filteredObjects = new ArrayList<>(knapsack.getOBJECTS());
logger.info("Removing {} from {}", item, knapsack);
filteredObjects.removeIf(item.getITEM()::equals);
knapsack.setOBJECTS(filteredObjects);
Expand All @@ -138,7 +139,7 @@ public static void buildStreams(Map<String, String> schemaRegistryProps, Streams
boolean isAvailable = useItemJoin.knapsack().getOBJECTS().contains(useItemJoin.inventoryCommand().getITEM());
return !isAvailable;
}, Branched.withConsumer(stream -> stream.mapValues(useItemJoin -> {
var response = new ResponseValue();
ResponseValue response = new ResponseValue();
response.setSOURCE("Inventory Use");
response.setRESPONSE("You do not have that item to use.");
return response;
Expand All @@ -154,7 +155,7 @@ public static void buildStreams(Map<String, String> schemaRegistryProps, Streams
boolean isWrongLocation = !(matchingX && matchingY);
return isWrongLocation;
}, Branched.withConsumer(stream -> stream.mapValues(v -> {
var response = new ResponseValue();
ResponseValue response = new ResponseValue();
response.setSOURCE("Inventory Use");
response.setRESPONSE("You cannot use that item here.");
return response;
Expand All @@ -173,7 +174,7 @@ public static void buildStreams(Map<String, String> schemaRegistryProps, Streams

// Send the description to the user.
stream.mapValues(v -> {
var response = new ResponseValue();
ResponseValue response = new ResponseValue();
response.setSOURCE("Inventory Use");
response.setRESPONSE(v.itemRule().getDESCRIPTION());
return response;
Expand All @@ -186,7 +187,7 @@ public static void buildStreams(Map<String, String> schemaRegistryProps, Streams
statusCommandBranches.branch((k, v) -> {
return "INVENTORY".equals(v.getCOMMAND());
}, Branched.withConsumer(stream -> stream.leftJoin(inventoryTable, (command, items) -> items).mapValues(items -> {
var response = new ResponseValue();
ResponseValue response = new ResponseValue();
response.setSOURCE("Inventory Check");

if (items == null || items.getOBJECTS().isEmpty()) {
Expand All @@ -199,4 +200,4 @@ public static void buildStreams(Map<String, String> schemaRegistryProps, Streams
}).to(RESPONSES_STREAM, producedResponse)));

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import static io.confluent.developer.adventure.Constants.*;
import static io.confluent.developer.adventure.Constants.MOVEMENT_COMMAND_STREAM;
import static io.confluent.developer.adventure.Constants.RESPONSES_STREAM;
import static java.util.Collections.emptyList;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
Expand All @@ -28,20 +30,20 @@ public class MovementProcessor {

public static KTable<UUID, LocationData> buildStreams(Map<String, String> schemaRegistryProps,
StreamsBuilder builder) {
var locationDataValueSerde = new SpecificAvroSerde<LocationDataValue>();
Serde<LocationDataValue> locationDataValueSerde = new SpecificAvroSerde<>();
locationDataValueSerde.configure(schemaRegistryProps, false);
Runtime.getRuntime().addShutdownHook(new Thread(locationDataValueSerde::close));

GlobalKTable<Position, LocationDataValue> locationDataTable =
builder.globalTable(LOCATION_DATA_STREAM, Materialized.with(PositionSerde.Serde, locationDataValueSerde));

var movementCommandValueSerde = new SpecificAvroSerde<MovementCommandValue>();
Serde<MovementCommandValue> movementCommandValueSerde = new SpecificAvroSerde<>();
movementCommandValueSerde.configure(schemaRegistryProps, false);

var locationDataSerde = new SpecificAvroSerde<LocationData>();
Serde<LocationData> locationDataSerde = new SpecificAvroSerde<>();
locationDataSerde.configure(schemaRegistryProps, false);

var responseValueSerde = new SpecificAvroSerde<ResponseValue>();
SpecificAvroSerde<ResponseValue> responseValueSerde = new SpecificAvroSerde<>();
responseValueSerde.configure(schemaRegistryProps, false);
Produced<UUID, ResponseValue> producedResponse = Produced.with(Serdes.UUID(), responseValueSerde);

Expand Down Expand Up @@ -72,7 +74,7 @@ public static KTable<UUID, LocationData> buildStreams(Map<String, String> schema
// Location description stream.
logger.info("Defining the user location change response.");
userPositionTable.toStream().mapValues(position -> {
var response = new ResponseValue();
ResponseValue response = new ResponseValue();
response.setSOURCE("Position");
response.setRESPONSE(String.format("You moved to %s", position));
return response;
Expand All @@ -89,13 +91,13 @@ public static KTable<UUID, LocationData> buildStreams(Map<String, String> schema
locationData == null ? null : locationData.getOBJECTS()));

userLocationChanges.mapValues(locationData -> {
var response = new ResponseValue();
ResponseValue response = new ResponseValue();
response.setSOURCE("Location Description");

if (locationData.getDESCRIPTION() == null) {
response.setRESPONSE("You have fallen off the edge of the map. Best head back the way you came!");
} else {
var responseString = new StringBuilder();
StringBuilder responseString = new StringBuilder();
responseString.append(locationData.getDESCRIPTION());

List<String> items = locationData.getOBJECTS();
Expand Down

0 comments on commit 00b9a0e

Please sign in to comment.