Skip to content

Commit

Permalink
🍱 deal with actor module
Browse files Browse the repository at this point in the history
  • Loading branch information
sanshengshui committed Sep 11, 2021
1 parent 581b440 commit 3d5cf49
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 5 deletions.
5 changes: 5 additions & 0 deletions IOT-Guide-Actor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package iot.technology.actor;

import iot.technology.actor.exception.ActorException;
import lombok.Getter;

/**
* @author mushuwei
*/
public abstract class AbstractActor implements Actor {

@Getter
protected ActorCtx ctx;

@Override
public void init() throws ActorException {
public void init(ActorCtx ctx) throws ActorException {
this.ctx = ctx;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package iot.technology.actor;

import iot.technology.actor.exception.ActorException;
import iot.technology.actor.message.ActorMsg;

/**
Expand All @@ -11,7 +12,7 @@ public interface Actor {

ActorRef getActorRef();

default void init() throws ActorException {
default void init(ActorCtx ctx) throws ActorException {
}

default void destroy() throws ActorException {
Expand Down
120 changes: 120 additions & 0 deletions IOT-Guide-Actor/src/main/java/iot/technology/actor/ActorMailbox.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package iot.technology.actor;

import iot.technology.actor.message.ActorMsg;
import iot.technology.actor.message.ActorStopReason;
import iot.technology.actor.message.MsgType;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand All @@ -23,17 +26,134 @@ public final class ActorMailbox implements ActorCtx {
public static final boolean NOT_READY = false;
public static final boolean READY = true;

private final ActorSystem system;
private final ActorSystemSettings settings;
private final ActorId selfId;
private final Actor actor;
private final Dispatcher dispatcher;
private final ConcurrentLinkedQueue<ActorMsg> highPriorityMsgs = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<ActorMsg> normalPriorityMsgs = new ConcurrentLinkedQueue<>();

private final AtomicBoolean busy = new AtomicBoolean(FREE);
private final AtomicBoolean ready = new AtomicBoolean(NOT_READY);
private final AtomicBoolean destroyInProgress = new AtomicBoolean();
private volatile ActorStopReason stopReason;

public void initActor() {
dispatcher.getExecutor().execute(() -> tryInit(1));
}

private void tryInit(int attempt) {
try {
log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt);
if (!destroyInProgress.get()) {
actor.init(this);
if (!destroyInProgress.get()) {
ready.set(READY);
tryProcessQueue(false);
}
}
} catch (Throwable t) {
log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t);
int attemptIdx = attempt + 1;
InitFailureStrategy strategy = actor.onInitFailure(attempt, t);
if (strategy.isStop() ||
(settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) {
log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t);
stopReason = ActorStopReason.INIT_FAILED;
destroy();
} else if (strategy.getRetryDelay() > 0) {
log.info("[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms", selfId, attempt,
strategy.getRetryDelay());
log.debug("[{}] Error", selfId, t);
system.getScheduler().schedule(() -> dispatcher.getExecutor().execute(() -> tryInit(attemptIdx)), strategy.getRetryDelay(),
TimeUnit.MICROSECONDS);
} else {
log.info("[{}] Failed to init actor, attempt {}, going to retry immediately", selfId, attempt);
log.debug("[{}] Error", selfId, t);
dispatcher.getExecutor().execute(() -> tryInit(attemptIdx));
}
}
}

public void destroy() {
if (stopReason == null) {
stopReason = ActorStopReason.STOPPED;
}
destroyInProgress.set(true);
dispatcher.getExecutor().execute(() -> {
try {
ready.set(NOT_READY);
actor.destroy();
highPriorityMsgs.forEach(msg -> msg.onActorStopped(stopReason));
normalPriorityMsgs.forEach(msg -> msg.onActorStopped(stopReason));
} catch (Throwable t) {
log.warn("[{}] Failed to destroy actor: {}", selfId, t);
}
});
}

private void enqueue(ActorMsg msg, boolean highPriority) {
if (!destroyInProgress.get()) {
if (highPriority) {
highPriorityMsgs.add(msg);
} else {
normalPriorityMsgs.add(msg);
}
tryProcessQueue(true);
} else {
if (highPriority && msg.getMsgType().equals(MsgType.UPDATED_MSG)) {
synchronized (this) {
if (stopReason == ActorStopReason.INIT_FAILED) {
destroyInProgress.set(false);
stopReason = null;
initActor();
} else {
msg.onActorStopped(stopReason);
}
}
} else {
msg.onActorStopped(stopReason);
}
}
}

private void tryProcessQueue(boolean newMsg) {
if (ready.get() == READY) {
if (newMsg || !highPriorityMsgs.isEmpty() || !normalPriorityMsgs.isEmpty()) {
if (busy.compareAndSet(FREE, BUSY)) {
dispatcher.getExecutor().execute(this::processMailbox);
} else {
log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg);
}
} else {
log.trace("[{}] MessageBox is empty, new msg: {}", selfId, newMsg);
}
} else {
log.trace("[{}] MessageBox is not ready, new msg: {}", selfId, newMsg);
}
}

private void processMailbox() {
boolean noMoreElements = false;
for (int i = 0; i < settings.getActorThroughput(); i++) {
ActorMsg msg = highPriorityMsgs.poll();
if (msg == null) {
msg = normalPriorityMsgs.poll();
}
if (msg != null) {
log.debug("[{}] Going to process message: {}", selfId, msg);
actor.process(msg);
} else {
noMoreElements = true;
break;
}
}
if (noMoreElements) {
busy.set(FREE);
dispatcher.getExecutor().execute(() -> tryProcessQueue(false));
} else {
dispatcher.getExecutor().execute(this::processMailbox);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package iot.technology.actor;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Copy of Executors.DefaultThreadFactory but with ability to set name of the pool
*
* @author mushuwei
*/
public class ActorThreadFactory implements ThreadFactory {
public static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private static final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

public static ActorThreadFactory forName(String name) {
return new ActorThreadFactory(name);
}

public ActorThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package iot.technology.actor;

import iot.technology.actor.message.ActorMsg;
import lombok.Getter;

import java.util.List;
import java.util.concurrent.*;
import java.util.function.Predicate;

/**
* @author mushuwei
*/
public class DefaultActorSystem implements ActorSystem {

private final ConcurrentMap<String, Dispatcher> dispatchers = new ConcurrentHashMap<>();

@Getter
private final ActorSystemSettings settings;
@Getter
private final ScheduledExecutorService scheduler;

public DefaultActorSystem(ActorSystemSettings settings) {
this.settings = settings;
this.scheduler =
Executors.newScheduledThreadPool(settings.getSchedulerPoolSize(), ActorThreadFactory.forName("actor-system-scheduler"));
}


@Override
public void createDispatcher(String dispatcherId, ExecutorService executor) {

}

@Override
public void destroyDispatcher(String dispatcherId) {

}

@Override
public ActorRef getActor(ActorId actorId) {
return null;
}

@Override
public ActorRef createRootActor(String dispatcherId, ActorCreator creator) {
return null;
}

@Override
public ActorRef createChildActor(String dispatcherId, ActorCreator creator, ActorId actorId) {
return null;
}

@Override
public void tell(ActorId target, ActorMsg actorMsg) {

}

@Override
public void tellWithHighPriority(ActorId target, ActorMsg actorMsg) {

}

@Override
public void stop(ActorRef actorRef) {

}

@Override
public void stop(ActorId actorId) {

}

@Override
public void stop() {

}

@Override
public void broadcastToChildren(ActorId parent, ActorMsg msg) {

}

@Override
public void broadcastToChildren(ActorId parent, Predicate<ActorId> childFilter, ActorMsg msg) {

}

@Override
public List<ActorId> filterChildren(ActorId parent, Predicate<ActorId> childFilter) {
return null;
}
}
15 changes: 15 additions & 0 deletions IOT-Guide-Actor/src/main/java/iot/technology/actor/Dispatcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package iot.technology.actor;

import lombok.Data;

import java.util.concurrent.ExecutorService;

/**
* @author mushuwei
*/
@Data
public class Dispatcher {

private final String dispatcherId;
private final ExecutorService executor;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package iot.technology.actor;
package iot.technology.actor.exception;

/**
* @author mushuwei
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@
* @author mushuwei
*/
public enum MsgType {

/**
* Special message to indicate update request
*/
UPDATED_MSG,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package iot.technology.actor;

import lombok.extern.slf4j.Slf4j;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

/**
* @author mushuwei
*/
@Slf4j
@RunWith(MockitoJUnitRunner.class)
public class ActorSystemTest {

private volatile ActorSystem actorSystem;
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
<lombok.version>1.16.18</lombok.version>
<slf4j.version>1.7.7</slf4j.version>
<logback.version>1.2.3</logback.version>
<junit.version>4.13.1</junit.version>
<junit.version>4.12</junit.version>
<mockito.version>3.3.3</mockito.version>
<gson.version>2.6.2</gson.version>
<guava.version>30.1-jre</guava.version>
Expand Down

0 comments on commit 3d5cf49

Please sign in to comment.