Skip to content

Commit

Permalink
changes for 3P comments chapter six Manning TE
Browse files Browse the repository at this point in the history
  • Loading branch information
bbejeck committed Feb 1, 2018
1 parent c09dc0f commit 33203a0
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions src/main/java/bbejeck/chapter_6/ZMartProcessorApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,13 @@ public static void main(String[] args) throws Exception {

Topology topology = new Topology();

MapValueProcessor<String, Purchase, Purchase> maskingProcessor = new MapValueProcessor<>(p -> Purchase.builder(p).maskCreditCard().build());
MapValueProcessor<String, Purchase, RewardAccumulator> rewardProcessor = new MapValueProcessor<>(purchase -> RewardAccumulator.builder(purchase).build());
MapValueProcessor<String, Purchase, PurchasePattern> patternProcessor = new MapValueProcessor<>(purchase -> PurchasePattern.builder(purchase).build());

topology.addSource("txn-source", stringDeserializer, purchaseDeserializer, "transactions")
.addProcessor("masking-processor", () -> maskingProcessor, "txn-source")
.addProcessor("rewards-processor", () -> rewardProcessor, "txn-source")
.addProcessor("patterns-processor", () -> patternProcessor, "txn-source")
.addProcessor("masking-processor",
() -> new MapValueProcessor<String, Purchase, Purchase>(p -> Purchase.builder(p).maskCreditCard().build()), "txn-source")
.addProcessor("rewards-processor",
() -> new MapValueProcessor<String, Purchase, RewardAccumulator>(purchase -> RewardAccumulator.builder(purchase).build()), "txn-source")
.addProcessor("patterns-processor",
() -> new MapValueProcessor<String, Purchase, PurchasePattern>(purchase -> PurchasePattern.builder(purchase).build()), "txn-source")
.addSink("purchase-sink", "purchases", stringSerializer, purchaseSerializer, "masking-processor")
.addSink("rewards-sink", "rewards", stringSerializer, rewardsSerializer, "rewards-processor")
.addSink("patterns-sink", "patterns", stringSerializer, patternSerializer, "patterns-processor");
Expand Down

0 comments on commit 33203a0

Please sign in to comment.