Skip to content

Commit

Permalink
improve dataflow and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Neil Avery committed Jan 30, 2019
1 parent a30c2dc commit d635656
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public AccountProcessor(String paymentsInflightTopic, String paymentsCompleteTop
Materialized<String, AccountBalance, KeyValueStore<Bytes, byte[]>> accountStore = account.withKeySerde(new StringSerde()).withValueSerde(new AccountBalance.Serde());



// debit & credit processing topology
/**
* Debit & credit processing
*/
accountBalanceKTable = inflight.groupByKey()
.aggregate(
AccountBalance::new,
Expand All @@ -49,7 +50,9 @@ public AccountProcessor(String paymentsInflightTopic, String paymentsCompleteTop
Predicate<String, Payment> isCreditRecord = (key, value) -> value.getState() == Payment.State.credit;
Predicate<String, Payment> isCompleteRecord = (key, value) -> value.getState() == Payment.State.complete;

// handle payment state
/**
* Data flow and state processing
*/
KStream<String, Payment>[] branch = inflight
.map((KeyValueMapper<String, Payment, KeyValue<String, Payment>>) (key, value) -> {
if (value.getState() == Payment.State.debit) {
Expand All @@ -68,8 +71,6 @@ public AccountProcessor(String paymentsInflightTopic, String paymentsCompleteTop
branch[0].to(paymentsInflightTopic);
branch[1].to(paymentsCompleteTopic);



topology = builder.build();
}
public Topology getTopology() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,17 @@ public PaymentsConfirmed(String paymentsCompleteTopic, String paymentsConfirmedT
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Payment> complete = builder.stream(paymentsCompleteTopic);

// emit the payments as Confirmed once they have been processed
/**
* Data flow; emit the payments as Confirmed once they have been processed
*/
complete.transform(new CompleteTransformer()).to(paymentsConfirmedTopic);

Materialized<String, ConfirmedStats, WindowStore<Bytes, byte[]>> completeStore = Materialized.as("complete");
Materialized<String, ConfirmedStats, WindowStore<Bytes, byte[]>> completeWindowStore = completeStore.withKeySerde(new StringSerde()).withValueSerde(new ConfirmedStats.Serde());

/**
* Confirmation processing
*/
paymentStatsKTable = complete
.groupBy((key, value) -> "all-payments") // will force a repartition-topic
.windowedBy(TimeWindows.of(ONE_DAY))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
Expand Down Expand Up @@ -37,11 +38,12 @@ public PaymentsInFlight(String paymentsIncomingTopic, String paymentsInflightTop
KStream<String, Payment> inflight = builder.stream(Arrays.asList(paymentsIncomingTopic, paymentsCompleteTopic));

// emit the payments as Debits on the 'inflight' stream
inflight.transform(new Payment.InflightTransformer()).to(paymentsInflightTopic);

Materialized<String, PaymentStats, WindowStore<Bytes, byte[]>> inflightFirst = Materialized.as("inflight");
Materialized<String, PaymentStats, WindowStore<Bytes, byte[]>> inflightWindowStore = inflightFirst.withKeySerde(new StringSerde()).withValueSerde(new PaymentStats.Serde());

/**
* Inflight processing
*/
paymentStatsKTable = inflight
.groupBy((key, value) -> "all-payments") // will force a repartition-topic :(
.windowedBy(TimeWindows.of(ONE_DAY))
Expand All @@ -51,6 +53,16 @@ public PaymentsInFlight(String paymentsIncomingTopic, String paymentsInflightTop
inflightWindowStore
);

/**
* Data flow processing; flip incoming --> debit and filter complete events
*/
inflight.map((KeyValueMapper<String, Payment, KeyValue<String, Payment>>) (key, value) -> {
if (value.getState() == Payment.State.incoming) {
value.setState(Payment.State.debit);
}
return new KeyValue<>(key, value);
}).filter((key, value) -> value.getState() == Payment.State.debit).to(paymentsInflightTopic);

topology = builder.build();
}
public Topology getTopology() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void init(ProcessorContext context) {
@Override
public KeyValue<String, Payment> transform(String key, Payment payment) {

log.info("transform 'incoming' to 'debit': {}", payment);
log.debug("transform 'incoming' to 'debit': {}", payment);

if (payment.getState() == State.incoming) {
payment.setState(State.debit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,9 @@ public void setAmount(double amount) {
}

public PaymentStats update(Payment value) {
log.info(" InflightStats. update, processing:{} current:{} state:{}", value, this.amount, value.getState());

/**
* Note: the transformer will intercept the message and convert it to from 'incoming' -> 'debit' OR 'complete'
* The 'debit
* We need to process the debit
*/
if (value.getState() == Payment.State.debit) {
log.info(" InflightStats. update, processing:{} current:{} state:{}", value, this.amount, value.getState());

if (value.getState() == Payment.State.incoming) {
// accumulate on 'incoming' payment
this.amount += value.amount;
this.count++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,6 @@ public void incomingPaymentEmitsDebitPayment() throws Exception {
Assert.assertNotNull(inflightRecord);

System.out.println("Inflight Records:" + inflightRecord.value());


Map<String, StateStore> allStateStores = testDriver.getAllStateStores();
for (Map.Entry<String, StateStore> stringStateStoreEntry : allStateStores.entrySet()) {
System.out.println(stringStateStoreEntry.getKey() + ":" + stringStateStoreEntry.getValue());
}
}

private Properties getProperties(String broker) {
Expand Down

0 comments on commit d635656

Please sign in to comment.