Skip to content

Commit

Permalink
event
Browse files Browse the repository at this point in the history
  • Loading branch information
tanghanzheng committed Aug 12, 2022
1 parent 9af2d22 commit 84f8c31
Show file tree
Hide file tree
Showing 32 changed files with 262 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public KafkaEventEndpointFactory kafkaEventEndpointFactory() {
}

/**
* 创建事件引擎和事件端点
* 创建 Kafka 事件引擎和 Kafka 事件端点
*/
@Bean
public KafkaEventEngine kafkaEventEngine(ConfigurableBeanFactory beanFactory,
Expand All @@ -136,7 +136,7 @@ public KafkaEventEngine kafkaEventEngine(ConfigurableBeanFactory beanFactory,
}

/**
* 注册事件端点
* 注册 Kafka 事件端点
*/
private void registerEndpoint(String name, KafkaEventEndpoint endpoint, ConfigurableBeanFactory beanFactory) {
register(name + "KafkaProducerFactory", endpoint.getProducerFactory(), beanFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.springframework.kafka.transaction.KafkaTransactionManager;

/**
* 基于 Kafka 的事件端点
* Kafka 事件端点
*/
@Getter
@Setter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* <p>
* 只复制了属性配置
* <p>
* 其他的 Bean 配置可以用 {@link KafkaEventEndpointConfigurer}
* 其他的 Bean 配置可以用 {@link KafkaEventEndpointConfigurer} 扩展
* <p>
* Jaas 应该是全部配置,所以可以直接使用 spring.kafka 进行配置
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class KafkaEventProperties extends AbstractPropertiesConfig implements EngineConfig {

/**
* 是否启用
* 是否启用引擎
*/
private boolean enabled;

Expand All @@ -42,7 +42,7 @@ public class KafkaEventProperties extends AbstractPropertiesConfig implements En
public static class ExtendedKafkaProperties extends KafkaProperties implements EndpointConfig {

/**
* 是否启用
* 是否启用端点
*/
private boolean enabled = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ public abstract class AbstractKafkaEventSubscriber extends KafkaEventSubscriber
@Override
public Subscription doSubscribe(EventListener listener, KafkaEventEndpoint endpoint, EventContext context) {
MessageListenerContainer container = createMessageListenerContainer(endpoint, context);
container.getContainerProperties().setMessageListener(createMessageListener(listener, endpoint, context));
Object messageListener = container.getContainerProperties().getMessageListener();
//如果没有设置监听器则生成一个监听器并设置
if (messageListener == null) {
container.getContainerProperties()
.setMessageListener(createMessageListener(listener, endpoint, context));
}
container.start();
return new KafkaSubscription(container);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@

import java.util.List;

/**
* 需要集成 amqp 模块并启用 @EnableEventConcept
* <p>
* 替换 {@link RabbitAutoConfiguration} 中配置的 Bean
* <p>
* 这些替换的 Bean 无法使用仅仅用于不让默认的配置生效
*/
@Configuration
@ConditionalOnProperty(name = "concept.event.rabbitmq.enabled", havingValue = "true")
@ConditionalOnBean(name = "com.github.linyuzai.event.autoconfigure.EventEnabled")
Expand All @@ -43,43 +50,67 @@
public class RabbitEventAutoConfiguration extends EngineEndpointConfiguration<RabbitEventProperties,
RabbitEventProperties.ExtendedRabbitProperties, RabbitEventEngine, RabbitEventEndpoint> {

@Bean
public SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer() {
return new SimpleRabbitListenerContainerFactoryConfigurer(new RabbitProperties());
}

/**
* 覆盖默认配置
*/
@Bean(name = "rabbitListenerContainerFactory")
public RabbitListenerContainerFactory<? extends MessageListenerContainer> rabbitListenerContainerFactory() {
return new SimpleRabbitListenerContainerFactory();
}

/**
* 覆盖默认配置
*/
@Bean
public SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer() {
return new SimpleRabbitListenerContainerFactoryConfigurer(new RabbitProperties());
}

/**
* 覆盖默认配置
*/
@Bean
public DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurer() {
return new DirectRabbitListenerContainerFactoryConfigurer(new RabbitProperties());
}

/**
* 覆盖默认配置
*/
@Bean
public RabbitConnectionFactoryBeanConfigurer rabbitConnectionFactoryBeanConfigurer() {
return new RabbitConnectionFactoryBeanConfigurer(null, null);
}

/**
* 覆盖默认配置
*/
@Bean
public CachingConnectionFactoryConfigurer rabbitConnectionFactoryConfigurer() {
return new CachingConnectionFactoryConfigurer(new RabbitProperties());
}

/**
* 覆盖默认配置
*/
@Bean
public ConnectionFactory rabbitConnectionFactory() {
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
factory.setAutomaticRecoveryEnabled(false);
return new CachingConnectionFactory(factory);
}

/**
* 覆盖默认配置
*/
@Bean
public RabbitTemplateConfigurer rabbitTemplateConfigurer() {
return new RabbitTemplateConfigurer(new RabbitProperties());
}

/**
* 覆盖默认配置
*/
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(rabbitConnectionFactory()) {
Expand All @@ -93,29 +124,44 @@ public <T> T execute(@NonNull ChannelCallback<T> action) {
};
}

/**
* 覆盖默认配置
*/
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(rabbitConnectionFactory());
}

/**
* RabbitMQ 配置继承处理器
*/
@Bean
@ConditionalOnMissingBean
public RabbitInheritHandler rabbitInheritHandler(Environment environment) {
return new ReflectionRabbitInheritHandler(environment);
}

/**
* RabbitMQ 事件引擎工厂
*/
@Bean
@ConditionalOnMissingBean
public RabbitEventEngineFactory rabbitEventEngineFactory() {
return new RabbitEventEngineFactoryImpl();
}

/**
* RabbitMQ 事件端点工厂
*/
@Bean
@ConditionalOnMissingBean
public RabbitEventEndpointFactory rabbitEventEndpointFactory(ResourceLoader resourceLoader) {
return new RabbitEventEndpointFactoryImpl(resourceLoader);
}

/**
* 创建 RabbitMQ 事件引擎和 RabbitMQ 事件端点
*/
@Bean
public RabbitEventEngine rabbitEventEngine(ConfigurableBeanFactory beanFactory,
RabbitEventProperties properties,
Expand All @@ -129,6 +175,9 @@ public RabbitEventEngine rabbitEventEngine(ConfigurableBeanFactory beanFactory,
registerEndpoint(name, endpoint, beanFactory));
}

/**
* 注册 RabbitMQ 事件端点
*/
private void registerEndpoint(String name, RabbitEventEndpoint endpoint, ConfigurableBeanFactory beanFactory) {
register(name + "RabbitConnectionFactory", endpoint.getConnectionFactory(), beanFactory);
register(name + "RabbitListenerContainerFactory", endpoint.getListenerContainerFactory(), beanFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
import java.util.HashMap;
import java.util.Map;

/**
* 用于创建 交换机/队列/绑定关系
* <p>
* 复制 {@link BindingBuilder} 的代码
* <p>
* 用法一样,直接创建
*/
@Getter
@AllArgsConstructor
public class RabbitBinding {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;

/**
* RabbitMQ 事件端点
*/
@Getter
@Setter
public class RabbitEventEndpoint extends AbstractEventEndpoint {

/**
* RabbitMQ 扩展配置
*/
private RabbitEventProperties.ExtendedRabbitProperties properties;

private ConnectionFactory connectionFactory;
Expand All @@ -35,11 +41,17 @@ public RabbitEventEndpoint(@NonNull String name, @NonNull EventEngine engine) {
super(name, engine);
}

/**
* 默认发布
*/
@Override
public void defaultPublish(Object event, EventContext context) {
new DefaultRabbitEventPublisher().publish(event, this, context);
}

/**
* 默认订阅
*/
@Override
public Subscription defaultSubscribe(EventListener listener, EventContext context) {
throw new RabbitEventException("EventSubscriber is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@

import com.github.linyuzai.event.core.endpoint.EventEndpointConfigurer;

/**
* RabbitMQ 事件端点扩展配置
*/
public interface RabbitEventEndpointConfigurer extends EventEndpointConfigurer<RabbitEventEndpoint> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import com.github.linyuzai.event.rabbitmq.engine.RabbitEventEngine;
import com.github.linyuzai.event.rabbitmq.properties.RabbitEventProperties;

/**
* RabbitMQ 事件端点工厂
*/
public interface RabbitEventEndpointFactory extends
EventEndpointFactory<RabbitEventProperties.ExtendedRabbitProperties, RabbitEventEngine, RabbitEventEndpoint> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@
import java.util.List;
import java.util.Objects;

/**
* RabbitMQ 事件端点工厂实现
* <p>
* 都是复制过来的配置
* <p>
* 只复制了属性配置
* <p>
* 其他的 Bean 配置可以用 {@link RabbitEventEndpointConfigurer} 扩展
*/
@Getter
@Setter
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import com.github.linyuzai.event.core.concept.EventConcept;
import com.github.linyuzai.event.core.engine.AbstractEventEngine;

/**
* RabbitMQ 事件引擎
*/
public class RabbitEventEngine extends AbstractEventEngine {

public static final String NAME = "rabbitmq";
Expand All @@ -11,6 +14,9 @@ public RabbitEventEngine() {
super(NAME);
}

/**
* 获得 RabbitMQ 事件引擎
*/
public static RabbitEventEngine get(EventConcept concept) {
return (RabbitEventEngine) concept.getEngine(NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@

import com.github.linyuzai.event.core.engine.EventEngineConfigurer;

/**
* RabbitMQ 事件引擎扩展配置
*/
public interface RabbitEventEngineConfigurer extends EventEngineConfigurer<RabbitEventEngine> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@
import com.github.linyuzai.event.core.engine.EventEngineFactory;
import com.github.linyuzai.event.rabbitmq.properties.RabbitEventProperties;

/**
* RabbitMQ 事件引擎工厂
*/
public interface RabbitEventEngineFactory extends EventEngineFactory<RabbitEventProperties, RabbitEventEngine> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.github.linyuzai.event.rabbitmq.properties.RabbitEventProperties;

/**
* RabbitMQ 事件引擎工厂实现
*/
public class RabbitEventEngineFactoryImpl implements RabbitEventEngineFactory {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.github.linyuzai.event.core.exception.EventException;

/**
* RabbitMQ 事件异常
*/
public class RabbitEventException extends EventException {

public RabbitEventException(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

import java.util.Collection;

/**
* RabbitMQ 端点交换机
* <p>
* 定位 RabbitMQ 引擎下的一个或多个端点
*/
public class RabbitEndpointExchange extends EndpointExchange {

public RabbitEndpointExchange(String... endpoints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
import com.github.linyuzai.event.core.exchange.EngineExchange;
import com.github.linyuzai.event.rabbitmq.engine.RabbitEventEngine;

/**
* RabbitMQ 引擎交换机
* <p>
* 定位 RabbitMQ 引擎下的所有端点
*/
public class RabbitEngineExchange extends EngineExchange {

public RabbitEngineExchange() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import com.github.linyuzai.event.core.config.InheritHandler;
import com.github.linyuzai.event.rabbitmq.properties.RabbitEventProperties;

/**
* RabbitMQ 配置继承处理器
*/
public interface RabbitInheritHandler extends InheritHandler<RabbitEventProperties> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@

import java.util.Map;

/**
* 手动罗列所有的配置项
* <p>
* 不同版本会存在配置不存在的异常问题
*/
@Deprecated
@Getter
@AllArgsConstructor
Expand Down
Loading

0 comments on commit 84f8c31

Please sign in to comment.