Skip to content

Commit

Permalink
event
Browse files Browse the repository at this point in the history
  • Loading branch information
tanghanzheng committed Jul 21, 2022
1 parent 6b3118f commit 9c5da58
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.github.linyuzai.event.core.context.EventContextFactory;
import com.github.linyuzai.event.core.endpoint.EventEndpoint;
import com.github.linyuzai.event.core.error.EventErrorHandler;
import com.github.linyuzai.event.core.lifecycle.EventConceptLifecycleListener;
import com.github.linyuzai.event.core.publisher.EventPublisher;
import com.github.linyuzai.event.core.engine.EventEngine;
import com.github.linyuzai.event.core.exchange.EventExchange;
Expand All @@ -18,15 +19,19 @@
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

@Getter
@Setter
public class DefaultEventConcept implements EventConcept {

protected final Map<String, EventEngine> engineMap = new ConcurrentHashMap<>();

protected final List<EventConceptLifecycleListener> lifecycleListeners = new CopyOnWriteArrayList<>();

private EventContextFactory contextFactory;

private EventExchange exchange;
Expand All @@ -37,6 +42,20 @@ public class DefaultEventConcept implements EventConcept {

private EventErrorHandler errorHandler;

@Override
public void initialize() {
for (EventConceptLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onInitialize(this);
}
}

@Override
public void destroy() {
for (EventConceptLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onDestroy(this);
}
}

@Override
public EventOperator event() {
return new EventOperatorImpl();
Expand Down Expand Up @@ -99,12 +118,29 @@ public Collection<EventEngine> getEngines() {
}

@Override
public void add(Collection<? extends EventEngine> engines) {
public void addEngines(Collection<? extends EventEngine> engines) {
for (EventEngine engine : engines) {
this.engineMap.put(engine.getName(), engine);
}
}

@Override
public void removeEngines(Collection<String> engines) {
for (String engine : engines) {
this.engineMap.remove(engine);
}
}

@Override
public void addLifecycleListeners(Collection<? extends EventConceptLifecycleListener> lifecycleListeners) {
this.lifecycleListeners.addAll(lifecycleListeners);
}

@Override
public void removeLifecycleListeners(Collection<? extends EventConceptLifecycleListener> lifecycleListeners) {
this.lifecycleListeners.removeAll(lifecycleListeners);
}

protected EventExchange useExchange(EventExchange exchange) {
if (exchange != null) {
return exchange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@
import com.github.linyuzai.event.core.error.EventErrorHandler;
import com.github.linyuzai.event.core.engine.EventEngine;
import com.github.linyuzai.event.core.exchange.EventExchange;
import com.github.linyuzai.event.core.lifecycle.EventConceptLifecycleListener;

import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;

public interface EventConcept {

void initialize();

void destroy();

EventOperator event();

EventOperator event(Type type);
Expand All @@ -36,9 +41,29 @@ public interface EventConcept {

Collection<EventEngine> getEngines();

default void add(EventEngine... engines) {
add(Arrays.asList(engines));
default void addEngines(EventEngine... engines) {
addEngines(Arrays.asList(engines));
}

void addEngines(Collection<? extends EventEngine> engines);

default void removeEngines(String... engines) {
removeEngines(Arrays.asList(engines));
}

void removeEngines(Collection<String> engines);

Collection<EventConceptLifecycleListener> getLifecycleListeners();

default void addLifecycleListeners(EventConceptLifecycleListener... lifecycleListeners) {
addLifecycleListeners(Arrays.asList(lifecycleListeners));
}

void addLifecycleListeners(Collection<? extends EventConceptLifecycleListener> lifecycleListeners);

default void removeLifecycleListeners(EventConceptLifecycleListener... lifecycleListeners) {
removeLifecycleListeners(Arrays.asList(lifecycleListeners));
}

void add(Collection<? extends EventEngine> engines);
void removeLifecycleListeners(Collection<? extends EventConceptLifecycleListener> lifecycleListeners);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.github.linyuzai.event.core.lifecycle;

import com.github.linyuzai.event.core.concept.EventConcept;

public interface EventConceptLifecycleListener {

void onInitialize(EventConcept concept);

void onDestroy(EventConcept concept);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.github.linyuzai.event.kafka;

import com.github.linyuzai.event.core.concept.EventConcept;
import com.github.linyuzai.event.core.context.EventContext;
import com.github.linyuzai.event.core.lifecycle.EventConceptLifecycleListener;
import lombok.AllArgsConstructor;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;

Expand All @@ -13,9 +16,29 @@ public void subscribe(Type type, KafkaEventEndpoint endpoint, EventContext conte
MessageListenerContainer container = createContainer(type, endpoint, context);
container.getContainerProperties().setMessageListener(createMessageListener(type, endpoint, context));
container.start();
EventConcept concept = context.get(EventConcept.class);
concept.addLifecycleListeners(new MessageListenerContainerStopper(container));
}

public abstract MessageListenerContainer createContainer(Type type, KafkaEventEndpoint endpoint, EventContext context);

public abstract MessageListener<?, ?> createMessageListener(Type type, KafkaEventEndpoint endpoint, EventContext context);

@AllArgsConstructor
public static class MessageListenerContainerStopper implements EventConceptLifecycleListener {

private MessageListenerContainer container;

@Override
public void onInitialize(EventConcept concept) {

}

@Override
public void onDestroy(EventConcept concept) {
if (container.isRunning()) {
container.stop();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public ProducerListener<Object, Object> kafkaProducerListener() {
return new DefaultKafkaConsumerFactory<>(Collections.emptyMap());
}


@Bean
public KafkaAdmin kafkaAdmin() {
return new KafkaAdmin(Collections.emptyMap());
Expand Down Expand Up @@ -145,12 +144,6 @@ public KafkaEventEngine kafkaEventEngine(ConfigurableBeanFactory beanFactory,
beanFactory.registerSingleton(key + "KafkaAdmin", endpoint.getAdmin());
}

//registry.registerBeanDefinition();

//ApplicationContext context;

//context.getBean()

engine.add(endpoint);
beanFactory.registerSingleton(key + "KafkaEventEndpoint", endpoint);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.github.linyuzai.event.core.error.EventErrorHandler;
import com.github.linyuzai.event.core.error.LoggerEventErrorHandler;
import com.github.linyuzai.event.core.exchange.EventExchange;
import com.github.linyuzai.event.core.lifecycle.EventConceptLifecycleListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.ObjectProvider;
Expand All @@ -35,21 +36,23 @@ public EventErrorHandler eventErrorHandler() {
return new LoggerEventErrorHandler(log::error);
}

@Bean
@Bean(initMethod = "initialize", destroyMethod = "destroy")
@ConditionalOnMissingBean
public EventConcept eventConcept(EventContextFactory contextFactory,
ObjectProvider<EventExchange> exchangeProvider,
ObjectProvider<EventEncoder> encoderProvider,
ObjectProvider<EventDecoder> decoderProvider,
EventErrorHandler errorHandler,
List<EventEngine> engines) {
List<EventEngine> engines,
List<EventConceptLifecycleListener> lifecycleListeners) {
DefaultEventConcept concept = new DefaultEventConcept();
concept.setContextFactory(contextFactory);
concept.setExchange(exchangeProvider.getIfUnique());
concept.setEncoder(encoderProvider.getIfUnique());
concept.setDecoder(decoderProvider.getIfUnique());
concept.setErrorHandler(errorHandler);
concept.add(engines);
concept.addEngines(engines);
concept.addLifecycleListeners(lifecycleListeners);
return concept;
}
}

0 comments on commit 9c5da58

Please sign in to comment.