Skip to content

Commit

Permalink
event
Browse files Browse the repository at this point in the history
  • Loading branch information
tanghanzheng committed Jul 22, 2022
1 parent 09ad21b commit 8d7e36c
Showing 1 changed file with 179 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,47 +73,174 @@ public static class ExtendedKafkaProperties extends KafkaProperties implements E
private Class<? extends EventSubscriber> subscriber;

public void inherit(String name, ExtendedKafkaProperties parent, Environment environment) {
String prefix = "concept.event.kafka.endpoints." + name;
inheritProperties(getProperties(), parent.getProperties());
inheritConsumer(getConsumer(), parent.getConsumer());
inheritProducer(getProducer(), parent.getProducer());
inheritListener(getListener(), parent.getListener());
inheritAdmin(getAdmin(), parent.getAdmin());
inheritStreams(getStreams(), parent.getStreams());
inheritConsumer(getConsumer(), parent.getConsumer(), environment, prefix + ".consumer");
inheritProducer(getProducer(), parent.getProducer(), environment, prefix + ".producer");
inheritListener(getListener(), parent.getListener(), environment, prefix + ".listener");
inheritAdmin(getAdmin(), parent.getAdmin(), environment, prefix + ".admin");
inheritStreams(getStreams(), parent.getStreams(), environment, prefix + ".streams");
inheritSsl(getSsl(), parent.getSsl());
//jaas好像是全局的,可以配置在spring.kafka中
inheritTemplate(getTemplate(), parent.getTemplate());
inheritSecurity(getSecurity(), parent.getSecurity());
inheritExtended(this, parent);
}

public void inheritProperties(Map<String, String> child, Map<String, String> parent) {
for (Map.Entry<String, String> entry : parent.entrySet()) {
String key = entry.getKey();
public <K, V> void inheritProperties(Map<K, V> child, Map<K, V> parent) {
for (Map.Entry<K, V> entry : parent.entrySet()) {
K key = entry.getKey();
if (child.containsKey(key)) {
continue;
}
child.put(key, entry.getValue());
}
}

public void inheritConsumer(Consumer child, Consumer parent) {
public void inheritConsumer(Consumer child, Consumer parent, Environment environment, String prefix) {
inheritSsl(child.getSsl(), parent.getSsl());
inheritSecurity(child.getSecurity(), parent.getSecurity());
if (child.getAutoCommitInterval() == null) {
child.setAutoCommitInterval(parent.getAutoCommitInterval());
}
if (child.getAutoOffsetReset() == null) {
child.setAutoOffsetReset(parent.getAutoOffsetReset());
}
if (child.getEnableAutoCommit() == null) {
child.setEnableAutoCommit(parent.getEnableAutoCommit());
}
if (child.getFetchMaxWait() == null) {
child.setFetchMaxWait(parent.getFetchMaxWait());
}
if (child.getFetchMinSize() == null) {
child.setFetchMinSize(parent.getFetchMinSize());
}
if (child.getGroupId() == null) {
child.setGroupId(parent.getGroupId());
}
if (child.getHeartbeatInterval() == null) {
child.setHeartbeatInterval(parent.getHeartbeatInterval());
}
String isolationLevel = environment.getProperty(prefix + ".isolation-level");
if (isolationLevel == null) {
child.setIsolationLevel(parent.getIsolationLevel());
}
String keyDeserializer = environment.getProperty(prefix + ".key-deserializer");
if (keyDeserializer == null) {
child.setKeyDeserializer(parent.getKeyDeserializer());
}
String valueDeserializer = environment.getProperty(prefix + ".value-deserializer");
if (valueDeserializer == null) {
child.setValueDeserializer(parent.getValueDeserializer());
}
if (child.getMaxPollRecords() == null) {
child.setMaxPollRecords(parent.getMaxPollRecords());
}
inheritProperties(child.getProperties(), parent.getProperties());
}

public void inheritProducer(Producer child, Producer parent) {

public void inheritProducer(Producer child, Producer parent, Environment environment, String prefix) {
inheritSsl(child.getSsl(), parent.getSsl());
inheritSecurity(child.getSecurity(), parent.getSecurity());
if (child.getAcks() == null) {
child.setAcks(parent.getAcks());
}
if (child.getBatchSize() == null) {
child.setBatchSize(parent.getBatchSize());
}
if (child.getBufferMemory() == null) {
child.setBufferMemory(parent.getBufferMemory());
}
if (child.getCompressionType() == null) {
child.setCompressionType(parent.getCompressionType());
}
String keySerializer = environment.getProperty(prefix + ".key-serializer");
if (keySerializer == null) {
child.setKeySerializer(parent.getKeySerializer());
}
String valueSerializer = environment.getProperty(prefix + ".value-serializer");
if (valueSerializer == null) {
child.setValueSerializer(parent.getValueSerializer());
}
if (child.getRetries() == null) {
child.setRetries(parent.getRetries());
}
inheritProperties(child.getProperties(), parent.getProperties());
}

public void inheritListener(Listener child, Listener parent) {

public void inheritListener(Listener child, Listener parent, Environment environment, String prefix) {
String type = environment.getProperty(prefix + ".type");
if (type == null) {
child.setType(parent.getType());
}
if (child.getAckMode() == null) {
child.setAckMode(parent.getAckMode());
}
if (child.getConcurrency() == null) {
child.setConcurrency(parent.getConcurrency());
}
if (child.getPollTimeout() == null) {
child.setPollTimeout(parent.getPollTimeout());
}
if (child.getNoPollThreshold() == null) {
child.setNoPollThreshold(parent.getNoPollThreshold());
}
if (child.getAckCount() == null) {
child.setAckCount(parent.getAckCount());
}
if (child.getAckTime() == null) {
child.setAckTime(parent.getAckTime());
}
String idleBetweenPolls = environment.getProperty(prefix + ".idle-between-polls");
if (idleBetweenPolls == null) {
child.setIdleBetweenPolls(parent.getIdleBetweenPolls());
}
if (child.getIdleEventInterval() == null) {
child.setIdleEventInterval(parent.getIdleEventInterval());
}
if (child.getMonitorInterval() == null) {
child.setMonitorInterval(parent.getMonitorInterval());
}
if (child.getLogContainerConfig() == null) {
child.setLogContainerConfig(parent.getLogContainerConfig());
}
String onlyLogRecordMetadata = environment.getProperty(prefix + ".only-log-record-metadata");
if (onlyLogRecordMetadata == null) {
child.setOnlyLogRecordMetadata(parent.isOnlyLogRecordMetadata());
}
}

public void inheritAdmin(Admin child, Admin parent) {

public void inheritAdmin(Admin child, Admin parent, Environment environment, String prefix) {
inheritSsl(child.getSsl(), parent.getSsl());
inheritSecurity(child.getSecurity(), parent.getSecurity());
inheritProperties(child.getProperties(), parent.getProperties());
String failFast = environment.getProperty(prefix + ".fail-fast");
if (failFast == null) {
child.setFailFast(parent.isFailFast());
}
}

public void inheritStreams(Streams child, Streams parent) {

public void inheritStreams(Streams child, Streams parent, Environment environment, String prefix) {
inheritSsl(child.getSsl(), parent.getSsl());
inheritSecurity(child.getSecurity(), parent.getSecurity());
inheritCleanup(child.getCleanup(), parent.getCleanup(), environment, prefix + ".cleanup");
if (child.getApplicationId() == null) {
child.setApplicationId(parent.getApplicationId());
}
String autoStartup = environment.getProperty(prefix + ".auto-startup");
if (autoStartup == null) {
child.setAutoStartup(parent.isAutoStartup());
}
if (child.getCacheMaxSizeBuffering() == null) {
child.setCacheMaxSizeBuffering(parent.getCacheMaxSizeBuffering());
}
if (child.getReplicationFactor() == null) {
child.setReplicationFactor(parent.getReplicationFactor());
}
if (child.getStateDir() == null) {
child.setStateDir(parent.getStateDir());
}
inheritProperties(child.getProperties(), parent.getProperties());
}

public void inheritSsl(Ssl child, Ssl parent) {
Expand Down Expand Up @@ -153,11 +280,45 @@ public void inheritSsl(Ssl child, Ssl parent) {
}

public void inheritTemplate(Template child, Template parent) {

if (child.getDefaultTopic() == null) {
child.setDefaultTopic(parent.getDefaultTopic());
}
}

public void inheritSecurity(Security child, Security parent) {
if (child.getProtocol() == null) {
child.setProtocol(parent.getProtocol());
}
}

public void inheritCleanup(Cleanup child, Cleanup parent, Environment environment, String prefix) {
String onStartup = environment.getProperty(prefix + ".on-startup");
if (onStartup == null) {
child.setOnStartup(parent.isOnStartup());
}
String onShutdown = environment.getProperty(prefix + ".on-shutdown");
if (onShutdown == null) {
child.setOnShutdown(parent.isOnShutdown());
}
}

public void inheritExtended(EventOperator.PropertyConfig child, EventOperator.PropertyConfig parent) {
inheritProperties(child.getMetadata(), parent.getMetadata());
if (child.getEncoder() == null) {
child.setEncoder(parent.getEncoder());
}
if (child.getDecoder() == null) {
child.setDecoder(parent.getDecoder());
}
if (child.getErrorHandler() == null) {
child.setErrorHandler(parent.getErrorHandler());
}
if (child.getPublisher() == null) {
child.setPublisher(parent.getPublisher());
}
if (child.getSubscriber() == null) {
child.setSubscriber(parent.getSubscriber());
}
}
}
}

0 comments on commit 8d7e36c

Please sign in to comment.