Skip to content

Commit

Permalink
🚧 process actor module
Browse files Browse the repository at this point in the history
  • Loading branch information
sanshengshui committed Aug 17, 2021
1 parent 5f201b3 commit 4c99f31
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package iot.technology.actor;

/**
* @author mushuwei
*/
public abstract class AbstractActor implements Actor {

protected ActorCtx ctx;

@Override
public void init() throws ActorException {
this.ctx = ctx;
}

public ActorRef getActorRef() {
return ctx;
}
}
2 changes: 2 additions & 0 deletions IOT-Guide-Actor/src/main/java/iot/technology/actor/Actor.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package iot.technology.actor;

import iot.technology.actor.message.ActorMsg;

/**
* @author mushuwei
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package iot.technology.actor;

import iot.technology.actor.message.ActorMsg;

import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package iot.technology.actor;

import iot.technology.actor.message.ActorMsg;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* @author mushuwei
*/
@Slf4j
@Data
public final class ActorMailbox implements ActorCtx {

public static final boolean FREE = false;
public static final boolean BUSY = true;

public static final boolean NOT_READY = false;
public static final boolean READY = true;

private final ConcurrentLinkedQueue<ActorMsg> highPriorityMsgs = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<ActorMsg> normalPriorityMsgs = new ConcurrentLinkedQueue<>();
private final AtomicBoolean busy = new AtomicBoolean(FREE);
private final AtomicBoolean ready = new AtomicBoolean(NOT_READY);
private final AtomicBoolean destroyInProgress = new AtomicBoolean();

private void enqueue(ActorMsg msg, boolean highPriority) {
if (!destroyInProgress.get()) {
if (highPriority) {

}
}
}

@Override
public ActorId getSelf() {
return null;
}

@Override
public ActorRef getParentRef() {
return null;
}

@Override
public void tell(ActorId target, ActorMsg msg) {

}

@Override
public void stop(ActorId target) {

}

@Override
public ActorRef getOrCreateChildActor(ActorId actorId, Supplier<String> dispatcher, Supplier<ActorCreator> creator) {
return null;
}

@Override
public void broadcastToChildren(ActorMsg msg) {

}

@Override
public void broadcastToChildren(ActorMsg msg, Predicate<ActorId> childFilter) {

}

@Override
public List<ActorId> filterChildren(Predicate<ActorId> childFilter) {
return null;
}

@Override
public ActorId getActorId() {
return null;
}

@Override
public void tell(ActorMsg actorMsg) {

}

@Override
public void tellWithHighPriority(ActorMsg actorMsg) {

}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package iot.technology.actor;

import iot.technology.actor.message.ActorMsg;

/**
* @author mushuwei
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,42 @@
package iot.technology.actor;

import iot.technology.actor.message.ActorMsg;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Predicate;

/**
* @author mushuwei
*/
public interface ActorSystem {

ScheduledExecutorService getScheduler();

void createDispatcher(String dispatcherId, ExecutorService executor);

void destroyDispatcher(String dispatcherId);

ActorRef getActor(ActorId actorId);

ActorRef createRootActor(String dispatcherId, ActorCreator creator);

ActorRef createChildActor(String dispatcherId, ActorCreator creator, ActorId actorId);

void tell(ActorId target, ActorMsg actorMsg);

void tellWithHighPriority(ActorId target, ActorMsg actorMsg);

void stop(ActorRef actorRef);

void stop(ActorId actorId);

void stop();

void broadcastToChildren(ActorId parent, ActorMsg msg);

void broadcastToChildren(ActorId parent, Predicate<ActorId> childFilter, ActorMsg msg);

List<ActorId> filterChildren(ActorId parent, Predicate<ActorId> childFilter);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package iot.technology.actor;

import lombok.Data;

/**
* @author mushuwei
*/
@Data
public class ActorSystemSettings {

private final int actorThroughput;
private final int schedulerPoolSize;
private final int maxActorInitAttempts;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package iot.technology.actor.message;

/**
* @author mushuwei
*/
public interface ActorMsg {

MsgType getMsgType();

/**
* Executed when the target actor is stopped or destroyed.
* For Example, rule node failed to initialize or removed from rule chain
* Implementation should cleanup the resources.
*
* @param reason
*/
default void onActorStopped(ActorStopReason reason) {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package iot.technology.actor.message;

/**
* @author mushuwei
*/
public enum ActorStopReason {

INIT_FAILED, STOPPED

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package iot.technology.actor.message;

/**
* @author mushuwei
*/
public enum MsgType {

}

0 comments on commit 4c99f31

Please sign in to comment.