Skip to content

Commit

Permalink
refactor to start supporting non-monolith runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
Neil Avery committed Apr 11, 2019
1 parent 07fe8dd commit 0b80e62
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 94 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package io.confluent.kpay;

import io.confluent.common.utils.TestUtils;
import io.confluent.kpay.control.ControlProperties;
import io.confluent.kpay.control.Controllable;
import io.confluent.kpay.control.PauseControllable;
import io.confluent.kpay.control.StartStopController;
import io.confluent.kpay.control.model.Status;
import io.confluent.kpay.metrics.PaymentsThroughput;
import io.confluent.kpay.metrics.model.ThroughputStats;
import io.confluent.kpay.payments.AccountProcessor;
import io.confluent.kpay.payments.PaymentProperties;
import io.confluent.kpay.payments.PaymentsConfirmed;
import io.confluent.kpay.payments.PaymentsInFlight;
import io.confluent.kpay.payments.model.AccountBalance;
Expand All @@ -19,46 +20,35 @@
import io.confluent.kpay.util.KafkaTopicClient;
import io.confluent.kpay.util.KafkaTopicClientImpl;
import io.confluent.kpay.util.Pair;
import java.math.BigDecimal;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Collections.sort;

public class KPayAllInOneImpl implements KPay {
private static final Logger log = LoggerFactory.getLogger(KPayAllInOneImpl.class);

private String paymentsIncomingTopic = "kpay.payments.incoming";
private String paymentsInflightTopic = "kpay.payments.inflight";
private String paymentsCompleteTopic = "kpay.payments.complete";
private String paymentsConfirmedTopic = "kpay.payments.confirmed";
private String controlStatusTopic = "kpay.control.status";

private String bootstrapServers;

private KafkaTopicClient topicClient;

private long instanceId = System.currentTimeMillis();


private PaymentsInFlight paymentsInFlight;
private AccountProcessor paymentAccountProcessor;
private PaymentsConfirmed paymentsConfirmed;
Expand All @@ -69,17 +59,15 @@ public class KPayAllInOneImpl implements KPay {
private StartStopController controllerStartStop;
private ScheduledFuture scheduledPaymentFuture;
private PaymentRunnable paymentRunnable;
private String thisIpAddress;

public KPayAllInOneImpl(String bootstrapServers) {

this.bootstrapServers = bootstrapServers;
}

/**
* Fire up all of the stream processors
*/
public void start() throws UnknownHostException {
public void startProcessors() throws UnknownHostException {

Controllable pauseController = new PauseControllable();

Expand All @@ -98,25 +86,27 @@ public String getMetaStores() {
}

private void startController(Controllable pauseController) {
controllerStartStop = new StartStopController(controlStatusTopic, getControlProperties(bootstrapServers), pauseController);
controllerStartStop = new StartStopController(ControlProperties.controlStatusTopic, ControlProperties.get(bootstrapServers), pauseController);
controllerStartStop.start();
}

private int instrumentationPortOffset = 21000;
private void startInstrumentation() throws UnknownHostException {
instrumentationThroughput = new PaymentsThroughput(paymentsCompleteTopic, getPaymentsProperties(bootstrapServers, instrumentationPortOffset++));

private void startInstrumentation() {
instrumentationThroughput = new PaymentsThroughput(PaymentProperties.paymentsCompleteTopic, PaymentProperties.get(bootstrapServers, instrumentationPortOffset++));
instrumentationThroughput.start();
}

private int paymentsPortOffset = 20000;
private void startPaymentPipeline(Controllable pauseController) throws UnknownHostException {
paymentsInFlight = new PaymentsInFlight(paymentsIncomingTopic, paymentsInflightTopic, paymentsCompleteTopic, getPaymentsProperties(bootstrapServers, paymentsPortOffset++), pauseController);

private void startPaymentPipeline(Controllable pauseController) {
paymentsInFlight = new PaymentsInFlight(PaymentProperties.paymentsIncomingTopic, PaymentProperties.paymentsInflightTopic, PaymentProperties.paymentsCompleteTopic, PaymentProperties.get(bootstrapServers, paymentsPortOffset++), pauseController);
paymentsInFlight.start();

paymentAccountProcessor = new AccountProcessor(paymentsInflightTopic, paymentsCompleteTopic, getPaymentsProperties(bootstrapServers, paymentsPortOffset++));
paymentAccountProcessor = new AccountProcessor(PaymentProperties.paymentsInflightTopic, PaymentProperties.paymentsCompleteTopic, PaymentProperties.get(bootstrapServers, paymentsPortOffset++));
paymentAccountProcessor.start();

paymentsConfirmed = new PaymentsConfirmed(paymentsCompleteTopic, paymentsConfirmedTopic, getPaymentsProperties(bootstrapServers, paymentsPortOffset++));
paymentsConfirmed = new PaymentsConfirmed(PaymentProperties.paymentsCompleteTopic, PaymentProperties.paymentsConfirmedTopic, PaymentProperties.get(bootstrapServers, paymentsPortOffset++));
paymentsConfirmed.start();
}

Expand Down Expand Up @@ -166,7 +156,7 @@ public void run() {
try {
Payment payment = new Payment("pay-" + System.currentTimeMillis(), System.currentTimeMillis() + "", from[position % from.length], to[position % from.length], new BigDecimal(Math.round((Math.random() * 100.0)*100.0)/100.0), Payment.State.incoming, System.currentTimeMillis() );
log.info("Send:" + payment);
producer.send(buildRecord(paymentsIncomingTopic, System.currentTimeMillis(), payment.getId(), payment));
producer.send(buildRecord(PaymentProperties.paymentsIncomingTopic, System.currentTimeMillis(), payment.getId(), payment));
position++;
producer.flush();
} catch (Throwable t) {
Expand Down Expand Up @@ -238,34 +228,10 @@ public String showAccountDetails(String accountName) {
return null;
}

private Properties getPaymentsProperties(String broker, int portOffset) throws UnknownHostException {
Properties properties = getProperties(broker, Serdes.String().getClass().getName(), Payment.Serde.class.getName());
// payment processors start from 20000
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, getIpAddress() + ":" + portOffset);
System.out.println(" APP PORT:" + properties.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
return properties;
}

private Properties getControlProperties(String broker) {
return getProperties(broker, Serdes.String().getClass().getName(), Status.Serde.class.getName());
}
private Properties getProperties(String broker, String keySerdesClass, String valueSerdesClass) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "TEST-APP-ID-" + instanceId++);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdesClass);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdesClass);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
return props;
}

public void initializeEnvironment() {
getTopicClient().createTopic(paymentsInflightTopic, 5, (short)1);
getTopicClient().createTopic(paymentsIncomingTopic, 5, (short) 1);
getTopicClient().createTopic(paymentsCompleteTopic, 5, (short) 1);
getTopicClient().createTopic(paymentsConfirmedTopic, 5, (short) 1);
getTopicClient().createTopic(controlStatusTopic, 5, (short) 1);
PaymentProperties.initializeEnvironment(getTopicClient());
ControlProperties.initializeEnvironment(getTopicClient());
}

private KafkaTopicClient getTopicClient() {
Expand All @@ -292,15 +258,6 @@ private Properties properties() {
return producerConfig;
}

private String getIpAddress() throws UnknownHostException {
if (thisIpAddress == null) {
InetAddress thisIp = InetAddress.getLocalHost();
thisIpAddress = thisIp.getHostAddress().toString();
}
return thisIpAddress;
}


private interface PaymentRunnable extends Runnable {
void stop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
**/
package io.confluent.kpay;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.UnknownHostException;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KPayInstance {

Expand All @@ -37,7 +36,7 @@ public KPay getInstance() {


/**
* Note: dont care about double locking because it is always created on startup in the Servlet Lifecycle.start()
* Note: dont care about double locking because it is always created on startup in the Servlet Lifecycle.startProcessors()
*/
private static volatile KPayInstance singleton = null;

Expand All @@ -55,10 +54,10 @@ public static KPayInstance getInstance(Properties propertes) {
kPay.initializeEnvironment();

try {
kPay.start();
kPay.startProcessors();
} catch (UnknownHostException e) {
e.printStackTrace();
log.error("Cannot start due to network config", e);
log.error("Cannot startProcessors due to network config", e);
}

singleton = new KPayInstance(kPay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
Expand Down Expand Up @@ -62,8 +61,8 @@ public String info() {
* @return
*/
@POST
@Path("/payments/start")
@Operation(summary = "start processing some payments",
@Path("/payments/startProcessors")
@Operation(summary = "startProcessors processing some payments",
tags = {"payment generation"},
responses = {
@ApiResponse(content = @Content(schema = @Schema(implementation = String.class))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
**/
package io.confluent.kpay;

import java.io.File;
import java.util.Properties;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
Expand All @@ -26,10 +28,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.MalformedURLException;
import java.util.Properties;

/**
* Fire up Rest endpoint and Swagger UI
* Swagger will run on http://ipaddresss:port/swagger
Expand Down Expand Up @@ -142,7 +140,7 @@ public static void start() {
try {
server.start();
} catch (Exception ex) {
log.error("Failed to start", ex);
log.error("Failed to startProcessors", ex);
ex.printStackTrace();
System.exit(1);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.confluent.kpay.control;

import io.confluent.kpay.control.model.Status;
import io.confluent.kpay.util.KafkaTopicClient;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;

import static io.confluent.kpay.util.PropertiesGetter.getProperties;

public class ControlProperties {
final public static String controlStatusTopic = "kpay.control.status";
final static int partitionCount = 5;
final static short replicaCount = 1;


public static Properties get(String broker) {
return getProperties(broker, Serdes.String().getClass().getName(), Status.Serde.class.getName());
}

public static void initializeEnvironment(KafkaTopicClient topicClient) {
topicClient.createTopic(controlStatusTopic, partitionCount, replicaCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
public interface Controllable {

/**
* Whatever needs to stop and start will block on this call
* Whatever needs to stop and startProcessors will block on this call
* @return
*/
boolean pauseMaybe();

/**
* Event hooks to control start and stop
* Event hooks to control startProcessors and stop
*/
void startProcessing();
void pauseProcessing();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.confluent.kpay.payments;

import io.confluent.kpay.payments.model.Payment;
import io.confluent.kpay.util.KafkaTopicClient;
import io.confluent.kpay.util.PropertiesGetter;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class PaymentProperties {
final static int partitionCount = 5;
final static short replicaCount = 1;

final static public String paymentsIncomingTopic = "kpay.payments.incoming";
final static public String paymentsInflightTopic = "kpay.payments.inflight";
final static public String paymentsCompleteTopic = "kpay.payments.complete";
final static public String paymentsConfirmedTopic = "kpay.payments.confirmed";


public static Properties get(String broker, int portOffset) {
Properties properties = PropertiesGetter.getProperties(broker, Serdes.String().getClass().getName(), Payment.Serde.class.getName());
// payment processors startProcessors from 20000
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, PropertiesGetter.getIpAddress() + ":" + portOffset);
System.out.println(" APP PORT:" + properties.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
return properties;
}

public static void initializeEnvironment(KafkaTopicClient topicClient) {
topicClient.createTopic(PaymentProperties.paymentsInflightTopic, partitionCount, replicaCount);
topicClient.createTopic(PaymentProperties.paymentsIncomingTopic, partitionCount, replicaCount);
topicClient.createTopic(PaymentProperties.paymentsCompleteTopic, partitionCount, replicaCount);
topicClient.createTopic(PaymentProperties.paymentsConfirmedTopic, partitionCount, replicaCount);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

import io.confluent.kpay.util.GenericClassUtil;
import io.confluent.kpay.util.Pair;
import java.util.*;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class KTableResourceEndpoint<K,V> implements KTableResource<K, V> {

private static final Logger log = LoggerFactory.getLogger(KTableResourceEndpoint.class);
Expand Down Expand Up @@ -64,7 +63,7 @@ public void start(Properties streamsConfig) {

if (property == null) {
RuntimeException runtimeException = new RuntimeException("Not starting RestService for:" + streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG));
log.warn("Cannot start, missing config", runtimeException);
log.warn("Cannot startProcessors, missing config", runtimeException);
return;
}
microRestService = new MicroRestService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class MicroRestService {
/**
* Start an embedded Jetty Server on the given port
* @param hostAndPortString
* @throws Exception if jetty can't start
* @throws Exception if jetty can't startProcessors
*/
public void start(Object instance, final String hostAndPortString) {
log.info("Starting RestEndpoint on:" + hostAndPortString + " Instance:" + instance);
Expand Down Expand Up @@ -53,7 +53,7 @@ public void start(Object instance, final String hostAndPortString) {
} catch (final Exception exception) {
log.error("Unavailable: " + hostAndPortString + " Instance:" + instance);
exception.printStackTrace();
throw new RuntimeException("Failed to start Jetty", exception);
throw new RuntimeException("Failed to startProcessors Jetty", exception);
}
}

Expand Down
Loading

0 comments on commit 0b80e62

Please sign in to comment.