From 9c5da5801a174b1f112153257752f2423dd9d6bc Mon Sep 17 00:00:00 2001 From: tanghanzheng Date: Thu, 21 Jul 2022 11:54:00 +0800 Subject: [PATCH] event --- .../core/concept/DefaultEventConcept.java | 38 ++++++++++++++++++- .../event/core/concept/EventConcept.java | 31 +++++++++++++-- .../EventConceptLifecycleListener.java | 10 +++++ .../kafka/AbstractKafkaEventSubscriber.java | 23 +++++++++++ .../kafka/KafkaEventAutoConfiguration.java | 7 ---- .../autoconfigure/EventConfiguration.java | 9 +++-- 6 files changed, 104 insertions(+), 14 deletions(-) create mode 100644 concept-event/concept-event-core/src/main/java/com/github/linyuzai/event/core/lifecycle/EventConceptLifecycleListener.java diff --git a/concept-event/concept-event-core/src/main/java/com/github/linyuzai/event/core/concept/DefaultEventConcept.java b/concept-event/concept-event-core/src/main/java/com/github/linyuzai/event/core/concept/DefaultEventConcept.java index 45731a879..8493127c3 100644 --- a/concept-event/concept-event-core/src/main/java/com/github/linyuzai/event/core/concept/DefaultEventConcept.java +++ b/concept-event/concept-event-core/src/main/java/com/github/linyuzai/event/core/concept/DefaultEventConcept.java @@ -6,6 +6,7 @@ import com.github.linyuzai.event.core.context.EventContextFactory; import com.github.linyuzai.event.core.endpoint.EventEndpoint; import com.github.linyuzai.event.core.error.EventErrorHandler; +import com.github.linyuzai.event.core.lifecycle.EventConceptLifecycleListener; import com.github.linyuzai.event.core.publisher.EventPublisher; import com.github.linyuzai.event.core.engine.EventEngine; import com.github.linyuzai.event.core.exchange.EventExchange; @@ -18,8 +19,10 @@ import java.lang.reflect.Type; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; @Getter @Setter @@ -27,6 +30,8 @@ public class DefaultEventConcept implements EventConcept { protected final Map engineMap = new ConcurrentHashMap<>(); + protected final List lifecycleListeners = new CopyOnWriteArrayList<>(); + private EventContextFactory contextFactory; private EventExchange exchange; @@ -37,6 +42,20 @@ public class DefaultEventConcept implements EventConcept { private EventErrorHandler errorHandler; + @Override + public void initialize() { + for (EventConceptLifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onInitialize(this); + } + } + + @Override + public void destroy() { + for (EventConceptLifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onDestroy(this); + } + } + @Override public EventOperator event() { return new EventOperatorImpl(); @@ -99,12 +118,29 @@ public Collection getEngines() { } @Override - public void add(Collection engines) { + public void addEngines(Collection engines) { for (EventEngine engine : engines) { this.engineMap.put(engine.getName(), engine); } } + @Override + public void removeEngines(Collection engines) { + for (String engine : engines) { + this.engineMap.remove(engine); + } + } + + @Override + public void addLifecycleListeners(Collection lifecycleListeners) { + this.lifecycleListeners.addAll(lifecycleListeners); + } + + @Override + public void removeLifecycleListeners(Collection lifecycleListeners) { + this.lifecycleListeners.removeAll(lifecycleListeners); + } + protected EventExchange useExchange(EventExchange exchange) { if (exchange != null) { return exchange; diff --git a/concept-event/concept-event-core/src/main/java/com/github/linyuzai/event/core/concept/EventConcept.java b/concept-event/concept-event-core/src/main/java/com/github/linyuzai/event/core/concept/EventConcept.java index b7c42ca55..b1ad18a53 100644 --- a/concept-event/concept-event-core/src/main/java/com/github/linyuzai/event/core/concept/EventConcept.java +++ b/concept-event/concept-event-core/src/main/java/com/github/linyuzai/event/core/concept/EventConcept.java @@ -5,6 +5,7 @@ import com.github.linyuzai.event.core.error.EventErrorHandler; import com.github.linyuzai.event.core.engine.EventEngine; import com.github.linyuzai.event.core.exchange.EventExchange; +import com.github.linyuzai.event.core.lifecycle.EventConceptLifecycleListener; import java.lang.reflect.Type; import java.util.Arrays; @@ -12,6 +13,10 @@ public interface EventConcept { + void initialize(); + + void destroy(); + EventOperator event(); EventOperator event(Type type); @@ -36,9 +41,29 @@ public interface EventConcept { Collection getEngines(); - default void add(EventEngine... engines) { - add(Arrays.asList(engines)); + default void addEngines(EventEngine... engines) { + addEngines(Arrays.asList(engines)); + } + + void addEngines(Collection engines); + + default void removeEngines(String... engines) { + removeEngines(Arrays.asList(engines)); + } + + void removeEngines(Collection engines); + + Collection getLifecycleListeners(); + + default void addLifecycleListeners(EventConceptLifecycleListener... lifecycleListeners) { + addLifecycleListeners(Arrays.asList(lifecycleListeners)); + } + + void addLifecycleListeners(Collection lifecycleListeners); + + default void removeLifecycleListeners(EventConceptLifecycleListener... lifecycleListeners) { + removeLifecycleListeners(Arrays.asList(lifecycleListeners)); } - void add(Collection engines); + void removeLifecycleListeners(Collection lifecycleListeners); } diff --git a/concept-event/concept-event-core/src/main/java/com/github/linyuzai/event/core/lifecycle/EventConceptLifecycleListener.java b/concept-event/concept-event-core/src/main/java/com/github/linyuzai/event/core/lifecycle/EventConceptLifecycleListener.java new file mode 100644 index 000000000..1e1e51007 --- /dev/null +++ b/concept-event/concept-event-core/src/main/java/com/github/linyuzai/event/core/lifecycle/EventConceptLifecycleListener.java @@ -0,0 +1,10 @@ +package com.github.linyuzai.event.core.lifecycle; + +import com.github.linyuzai.event.core.concept.EventConcept; + +public interface EventConceptLifecycleListener { + + void onInitialize(EventConcept concept); + + void onDestroy(EventConcept concept); +} diff --git a/concept-event/concept-event-kafka/src/main/java/com/github/linyuzai/event/kafka/AbstractKafkaEventSubscriber.java b/concept-event/concept-event-kafka/src/main/java/com/github/linyuzai/event/kafka/AbstractKafkaEventSubscriber.java index e8f2a0681..546ff0578 100644 --- a/concept-event/concept-event-kafka/src/main/java/com/github/linyuzai/event/kafka/AbstractKafkaEventSubscriber.java +++ b/concept-event/concept-event-kafka/src/main/java/com/github/linyuzai/event/kafka/AbstractKafkaEventSubscriber.java @@ -1,6 +1,9 @@ package com.github.linyuzai.event.kafka; +import com.github.linyuzai.event.core.concept.EventConcept; import com.github.linyuzai.event.core.context.EventContext; +import com.github.linyuzai.event.core.lifecycle.EventConceptLifecycleListener; +import lombok.AllArgsConstructor; import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.MessageListenerContainer; @@ -13,9 +16,29 @@ public void subscribe(Type type, KafkaEventEndpoint endpoint, EventContext conte MessageListenerContainer container = createContainer(type, endpoint, context); container.getContainerProperties().setMessageListener(createMessageListener(type, endpoint, context)); container.start(); + EventConcept concept = context.get(EventConcept.class); + concept.addLifecycleListeners(new MessageListenerContainerStopper(container)); } public abstract MessageListenerContainer createContainer(Type type, KafkaEventEndpoint endpoint, EventContext context); public abstract MessageListener createMessageListener(Type type, KafkaEventEndpoint endpoint, EventContext context); + + @AllArgsConstructor + public static class MessageListenerContainerStopper implements EventConceptLifecycleListener { + + private MessageListenerContainer container; + + @Override + public void onInitialize(EventConcept concept) { + + } + + @Override + public void onDestroy(EventConcept concept) { + if (container.isRunning()) { + container.stop(); + } + } + } } diff --git a/concept-event/concept-event-kafka/src/main/java/com/github/linyuzai/event/kafka/KafkaEventAutoConfiguration.java b/concept-event/concept-event-kafka/src/main/java/com/github/linyuzai/event/kafka/KafkaEventAutoConfiguration.java index 02dfc64e5..2dd19ed47 100644 --- a/concept-event/concept-event-kafka/src/main/java/com/github/linyuzai/event/kafka/KafkaEventAutoConfiguration.java +++ b/concept-event/concept-event-kafka/src/main/java/com/github/linyuzai/event/kafka/KafkaEventAutoConfiguration.java @@ -69,7 +69,6 @@ public ProducerListener kafkaProducerListener() { return new DefaultKafkaConsumerFactory<>(Collections.emptyMap()); } - @Bean public KafkaAdmin kafkaAdmin() { return new KafkaAdmin(Collections.emptyMap()); @@ -145,12 +144,6 @@ public KafkaEventEngine kafkaEventEngine(ConfigurableBeanFactory beanFactory, beanFactory.registerSingleton(key + "KafkaAdmin", endpoint.getAdmin()); } - //registry.registerBeanDefinition(); - - //ApplicationContext context; - - //context.getBean() - engine.add(endpoint); beanFactory.registerSingleton(key + "KafkaEventEndpoint", endpoint); diff --git a/concept-event/concept-event-spring-boot-starter/src/main/java/com/github/linyuzai/event/autoconfigure/EventConfiguration.java b/concept-event/concept-event-spring-boot-starter/src/main/java/com/github/linyuzai/event/autoconfigure/EventConfiguration.java index e68d3967e..64967955a 100644 --- a/concept-event/concept-event-spring-boot-starter/src/main/java/com/github/linyuzai/event/autoconfigure/EventConfiguration.java +++ b/concept-event/concept-event-spring-boot-starter/src/main/java/com/github/linyuzai/event/autoconfigure/EventConfiguration.java @@ -10,6 +10,7 @@ import com.github.linyuzai.event.core.error.EventErrorHandler; import com.github.linyuzai.event.core.error.LoggerEventErrorHandler; import com.github.linyuzai.event.core.exchange.EventExchange; +import com.github.linyuzai.event.core.lifecycle.EventConceptLifecycleListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.ObjectProvider; @@ -35,21 +36,23 @@ public EventErrorHandler eventErrorHandler() { return new LoggerEventErrorHandler(log::error); } - @Bean + @Bean(initMethod = "initialize", destroyMethod = "destroy") @ConditionalOnMissingBean public EventConcept eventConcept(EventContextFactory contextFactory, ObjectProvider exchangeProvider, ObjectProvider encoderProvider, ObjectProvider decoderProvider, EventErrorHandler errorHandler, - List engines) { + List engines, + List lifecycleListeners) { DefaultEventConcept concept = new DefaultEventConcept(); concept.setContextFactory(contextFactory); concept.setExchange(exchangeProvider.getIfUnique()); concept.setEncoder(encoderProvider.getIfUnique()); concept.setDecoder(decoderProvider.getIfUnique()); concept.setErrorHandler(errorHandler); - concept.add(engines); + concept.addEngines(engines); + concept.addLifecycleListeners(lifecycleListeners); return concept; } }