Skip to content

Commit

Permalink
event
Browse files Browse the repository at this point in the history
  • Loading branch information
tanghanzheng committed Aug 1, 2022
1 parent 400442c commit 8e3676b
Show file tree
Hide file tree
Showing 36 changed files with 260 additions and 278 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.github.linyuzai.event.core.bus;

import com.github.linyuzai.event.core.concept.EventConcept;
import com.github.linyuzai.event.core.exchange.EventExchange;

public class AbstractEventBus implements EventBus {

private EventConcept concept;

private EventExchange exchange;

public void publish(Object event) {
concept.template().publish(event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.github.linyuzai.event.core.bus;

public interface EventBus {


}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.linyuzai.event.core.codec;

import java.lang.reflect.Type;
import com.github.linyuzai.event.core.context.EventContext;

/**
* 事件解码器
Expand All @@ -11,8 +11,7 @@ public interface EventDecoder {
* 解码
*
* @param event 事件
* @param type 解码类型
* @return 解码后的事件
*/
Object decode(Object event, Type type);
Object decode(Object event, EventContext context);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.github.linyuzai.event.core.codec;

import com.github.linyuzai.event.core.context.EventContext;

/**
* 事件编码器
*/
Expand All @@ -11,5 +13,5 @@ public interface EventEncoder {
* @param event 事件
* @return 编码后的事件
*/
Object encode(Object event);
Object encode(Object event, EventContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.linyuzai.event.core.context.EventContext;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -15,7 +16,7 @@
@Getter
@Setter
@AllArgsConstructor
public class JacksonEventDecoder extends AbstractEventDecoder {
public class JacksonEventDecoder implements EventDecoder {

private ObjectMapper objectMapper;

Expand All @@ -25,9 +26,10 @@ public JacksonEventDecoder() {

@SneakyThrows
@Override
public Object doDecode(Object event, Type type) {
public Object decode(Object event, EventContext context) {
if (event instanceof String) {
if (type == String.class) {
Type type = context.get(Type.class);
if (type == null || type == String.class) {
return event;
}
return objectMapper.readValue((String) event, new TypeReference<Object>() {
Expand All @@ -37,6 +39,6 @@ public Type getType() {
}
});
}
throw new EventDecodeException("String type required but " + event.getClass());
throw new IllegalArgumentException("String required but " + event.getClass());
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.linyuzai.event.core.codec;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.linyuzai.event.core.context.EventContext;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -22,7 +23,7 @@ public JacksonEventEncoder() {

@SneakyThrows
@Override
public Object encode(Object event) {
public Object encode(Object event, EventContext context) {
if (event instanceof String) {
return event;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.github.linyuzai.event.core.codec;

import com.github.linyuzai.event.core.context.EventContext;
import lombok.SneakyThrows;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

public class SerializationEventDecoder implements EventDecoder {
@SneakyThrows
@Override
public Object decode(Object event, EventContext context) {
if (event instanceof byte[]) {
try (ByteArrayInputStream is = new ByteArrayInputStream((byte[]) event);
ObjectInputStream ois = new ObjectInputStream(is)) {
return ois.readObject();
}
}
throw new IllegalArgumentException("Byte array required but " + event.getClass());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.github.linyuzai.event.core.codec;

import com.github.linyuzai.event.core.context.EventContext;
import lombok.SneakyThrows;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public class SerializationEventEncoder implements EventEncoder {
@SneakyThrows
@Override
public Object encode(Object event, EventContext context) {
try (ByteArrayOutputStream os = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(os)) {
oos.writeObject(event);
return os.toByteArray();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,9 @@ public interface EventConcept {
void destroy();

/**
* 用于订阅事件
* <p>
* 不会进行解码处理
*/
EventOperator event();

/**
* 用于订阅事件
*
* @param type 事件类型
*/
EventOperator event(Type type);

/**
* 用于发布事件
*
* @param event 事件
* 创建事件模版
*/
EventOperator event(Object event);
EventTemplate template();

/**
* 获得默认的事件交换机
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -80,18 +79,8 @@ public void destroy() {
}

@Override
public EventOperator event() {
return new EventOperatorImpl();
}

@Override
public EventOperator event(Type type) {
return new EventOperatorImpl(type);
}

@Override
public EventOperator event(Object event) {
return new EventOperatorImpl(event);
public EventTemplate template() {
return new EventTemplateImpl();
}

/**
Expand All @@ -114,17 +103,17 @@ protected void publishWithContext(Object event, EventContext context) {
/**
* 订阅事件
*
* @param type 事件类型
* @param context 事件上下文
* @param consumer 事件消费者
* @param context 事件上下文
*/
protected void subscribeWithContext(Type type, EventContext context) {
protected void subscribeWithContext(Consumer<Object> consumer, EventContext context) {
EventExchange exchange = applyExchange(context);
EventSubscriber subscriber = context.get(EventSubscriber.class);
Collection<EventEndpoint> endpoints = exchange.exchange(this);
for (EventEndpoint endpoint : endpoints) {
EventContext prepare = prepareContext(context, endpoint);
prepare.put(EventSubscriber.class, useSubscriber(endpoint, subscriber));
endpoint.subscribe(type, prepare);
endpoint.subscribe(consumer, prepare);
}
}

Expand Down Expand Up @@ -332,105 +321,74 @@ protected EventSubscriber useSubscriber(EventEndpoint endpoint, EventSubscriber
* 事件操作者的实现
*/
@NoArgsConstructor(access = AccessLevel.PROTECTED)
protected class EventOperatorImpl implements EventOperator {

/**
* 事件
*/
protected Object event;

/**
* 事件类型
*/
protected Type type;
protected class EventTemplateImpl implements EventTemplate {

/**
* 上下文缓存
*/
protected Map<Object, Object> context = new LinkedHashMap<>();

protected EventOperatorImpl(Type type) {
this.type = type;
}

protected EventOperatorImpl(Object event) {
this.event = event;
}
protected EventContext context = contextFactory.create();

@Override
public EventOperator exchange(EventExchange exchange) {
public EventTemplate exchange(EventExchange exchange) {
context.put(EventExchange.class, exchange);
return this;
}

@Override
public EventOperator encoder(EventEncoder encoder) {
public EventTemplate encoder(EventEncoder encoder) {
context.put(EventEncoder.class, encoder);
return this;
}

@Override
public EventOperator decoder(EventDecoder decoder) {
public EventTemplate decoder(EventDecoder decoder) {
context.put(EventDecoder.class, decoder);
return this;
}

@Override
public EventOperator error(Consumer<Throwable> errorHandler) {
return error((e, endpoint, context) -> errorHandler.accept(e));
public EventTemplate publisher(EventPublisher publisher) {
context.put(EventPublisher.class, publisher);
return this;
}

@Override
public EventOperator error(BiConsumer<Throwable, EventEndpoint> errorHandler) {
return error((e, endpoint, context) -> errorHandler.accept(e, endpoint));
public EventTemplate subscriber(EventSubscriber subscriber) {
context.put(EventSubscriber.class, subscriber);
return this;
}

@Override
public EventOperator error(EventErrorHandler errorHandler) {
context.put(EventErrorHandler.class, errorHandler);
return this;
public EventTemplate error(Consumer<Throwable> errorHandler) {
return error((e, endpoint, context) -> errorHandler.accept(e));
}

@Override
public EventOperator context(Object key, Object value) {
context.put(key, value);
return this;
public EventTemplate error(BiConsumer<Throwable, EventEndpoint> errorHandler) {
return error((e, endpoint, context) -> errorHandler.accept(e, endpoint));
}

@Override
public <K, V> EventOperator context(Map<K, V> context) {
this.context.putAll(context);
public EventTemplate error(EventErrorHandler errorHandler) {
context.put(EventErrorHandler.class, errorHandler);
return this;
}

@Override
public void publish() {
publish(null);
public EventTemplate context(Object key, Object value) {
context.put(key, value);
return this;
}

@Override
public void publish(EventPublisher publisher) {
EventContext context = buildContext();
context.put(EventPublisher.class, publisher);
public void publish(Object event) {
publishWithContext(event, context);
}

@SuppressWarnings("unchecked")
@Override
public void subscribe() {
subscribe(null);
}

@Override
public void subscribe(EventSubscriber subscriber) {
EventContext context = buildContext();
context.put(EventSubscriber.class, subscriber);
subscribeWithContext(type, context);
}

protected EventContext buildContext() {
EventContext context = contextFactory.create();
this.context.forEach(context::put);
return context;
public void subscribe(Consumer<?> consumer) {
subscribeWithContext((Consumer<Object>) consumer, context);
}
}
}
Loading

0 comments on commit 8e3676b

Please sign in to comment.