diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..4a29f0c --- /dev/null +++ b/pom.xml @@ -0,0 +1,12 @@ + + + 4.0.0 + + JrjPersonal + EasyRaft + 1.0-SNAPSHOT + + + \ No newline at end of file diff --git a/src/main/java/ClusterMessage.java b/src/main/java/ClusterMessage.java new file mode 100644 index 0000000..29d5974 --- /dev/null +++ b/src/main/java/ClusterMessage.java @@ -0,0 +1,5 @@ +/** + * Created by jrj on 17-10-30. + */ +public class ClusterMessage { +} diff --git a/src/main/java/SimpleChat.java b/src/main/java/SimpleChat.java new file mode 100644 index 0000000..b1fbb45 --- /dev/null +++ b/src/main/java/SimpleChat.java @@ -0,0 +1,58 @@ +import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.ReceiverAdapter; +import org.jgroups.View; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.concurrent.atomic.AtomicInteger; + +public class SimpleChat extends ReceiverAdapter { + JChannel channel; + String user_name=System.getProperty("user.name", "n/a"); + private AtomicInteger atomicInteger; + private final static int clusterSize = 3; + private void start() throws Exception { + channel=new JChannel(); + channel.setReceiver(this); + channel.connect("ChatCluster"); + eventLoop(); + channel.close(); + atomicInteger = new AtomicInteger(0); + } + + public static void main(String[] args) throws Exception { + new SimpleChat().start(); + } + + private void eventLoop() { + BufferedReader in=new BufferedReader(new InputStreamReader(System.in)); + while(true) { + try { + System.out.print("> "); System.out.flush(); + String line=in.readLine().toLowerCase(); + if(line.startsWith("quit") || line.startsWith("exit")) + break; + line="[" + user_name + "] " + line; + Message msg=new Message(null, null, line); + channel.send(msg); + } catch(Exception e) { + e.printStackTrace(); + } + } + } + + // s + @Override + public void viewAccepted(View new_view) { + if (new_view.size()>=(clusterSize/2+1)){ + + } + System.out.println("** view: " + new_view); + } + + @Override + public void receive(Message msg) { + System.out.println(msg.getSrc() + ": " + msg.getObject()); + } +} diff --git a/src/main/java/Utils/HashedWheelTimer.java b/src/main/java/Utils/HashedWheelTimer.java new file mode 100644 index 0000000..ad3a223 --- /dev/null +++ b/src/main/java/Utils/HashedWheelTimer.java @@ -0,0 +1,492 @@ +package Utils; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +/** + * Created by jrj on 17-10-30. + */ +public class HashedWheelTimer implements Timer { + public static final int WORKER_STATE_INIT = 0; + public static final int WORKER_STATE_STARTED = 1; + public static final int WORKER_STATE_SHUTDOWN = 2; + @SuppressWarnings({ "unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) + private volatile int workerState = WORKER_STATE_INIT; // 0 - init, 1 - started, 2 - shut down + //在netty中,这里用的是一个叫mpscLinkedQueue的无锁队列 + Queue cancelledTimeouts = new LinkedBlockingQueue(); + Queue timeouts = new LinkedBlockingQueue(); + private HashedWheelBucket[] wheel; + private final int mask; + long tickDuration,maxPendingTimeouts; + Thread workerThread; + private static final AtomicIntegerFieldUpdater WORKER_STATE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState"); + + public HashedWheelTimer(){ + this(1000,TimeUnit.MILLISECONDS,512,-1); + } + public HashedWheelTimer( + long tickDuration, TimeUnit unit, int ticksPerWheel,long maxPendingTimeouts) { + + if (unit == null) { + throw new NullPointerException("unit"); + } + if (tickDuration <= 0) { + throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); + } + if (ticksPerWheel <= 0) { + throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); + } + + // Normalize ticksPerWheel to power of two and initialize the wheel. + wheel = createWheel(ticksPerWheel); + //这个数组的大小一定是2^n的,由createWheel保证.所以减了一之后一定是0x1111111...这样的形式 + mask = wheel.length - 1; + + // Convert tickDuration to nanos. + this.tickDuration = unit.toNanos(tickDuration); + + //这个worker是HashedWheelTimer的一个内部类,而且是一个Runnable + workerThread = new Thread(new Worker()); + + this.maxPendingTimeouts = maxPendingTimeouts; + + } + + private static HashedWheelBucket[] createWheel(int ticksPerWheel) { + if (ticksPerWheel <= 0) { + throw new IllegalArgumentException( + "ticksPerWheel must be greater than 0: " + ticksPerWheel); + } + if (ticksPerWheel > 1073741824) { + throw new IllegalArgumentException( + "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); + } + //这里把ticksPerWheel转化为最接近的2^n + ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); + HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; + for (int i = 0; i < wheel.length; i ++){ + wheel[i] = new HashedWheelBucket(); + } + return wheel; + } + + private static int normalizeTicksPerWheel(int ticksPerWheel) { + int normalizedTicksPerWheel = 1; + while (normalizedTicksPerWheel < ticksPerWheel) { + normalizedTicksPerWheel <<= 1; + } + return normalizedTicksPerWheel; + } + + public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { + if (task == null) { + throw new NullPointerException("task"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + + start(); + + // Add the timeout to the timeout queue which will be processed on the next tick. + // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. + long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; + HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); + timeouts.add(timeout); + return timeout; + } + + public void start() { + switch (WORKER_STATE_UPDATER.get(this)) { + case WORKER_STATE_INIT: + if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { + workerThread.start(); + } + break; + case WORKER_STATE_STARTED: + break; + case WORKER_STATE_SHUTDOWN: + throw new IllegalStateException("cannot be started once stopped"); + default: + throw new Error("Invalid WorkerState"); + } + + // Wait until the startTime is initialized by the worker. + while (startTime == 0) { + try { + startTimeInitialized.await(); + } catch (InterruptedException ignore) { + // Ignore - it will be ready very soon. + } + } + } + + public Set stop() { + return null; + } + + private static final class HashedWheelTimeout implements Timeout { + + private static final int ST_INIT = 0; + private static final int ST_CANCELLED = 1; + private static final int ST_EXPIRED = 2; + private static final AtomicIntegerFieldUpdater STATE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state"); + + private final HashedWheelTimer timer; + private final TimerTask task; + private final long deadline; + + @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) + private volatile int state = ST_INIT; + + // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the + // HashedWheelTimeout will be added to the correct HashedWheelBucket. + long remainingRounds; + + // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list. + // As only the workerThread will act on it there is no need for synchronization / volatile. + HashedWheelTimeout next; + HashedWheelTimeout prev; + + // The bucket to which the timeout was added + HashedWheelBucket bucket; + + HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) { + this.timer = timer; + this.task = task; + this.deadline = deadline; + } + + public Timer timer() { + return timer; + } + + public TimerTask task() { + return task; + } + + public boolean cancel() { + // only update the state it will be removed from HashedWheelBucket on next tick. + if (!compareAndSetState(ST_INIT, ST_CANCELLED)) { + return false; + } + timer.cancelledTimeouts.add(this); + return true; + } + + void remove() { + HashedWheelBucket bucket = this.bucket; + if (bucket != null) { + bucket.remove(this); + } + } + + public boolean compareAndSetState(int expected, int state) { + return STATE_UPDATER.compareAndSet(this, expected, state); + } + + public int state() { + return state; + } + + public boolean isCancelled() { + return state() == ST_CANCELLED; + } + + public boolean isExpired() { + return state() == ST_EXPIRED; + } + + public void expire() { + if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { + return; + } + + try { + task.run(this); + } catch (Throwable t) { + System.out.println("A error encountered during expire(){task.run()}"); + } + } + + @Override + public String toString() { + final long currentTime = System.nanoTime(); + long remaining = deadline - currentTime + timer.startTime; + + StringBuilder buf = new StringBuilder(192) + .append('(') + .append("deadline: "); + if (remaining > 0) { + buf.append(remaining) + .append(" ns later"); + } else if (remaining < 0) { + buf.append(-remaining) + .append(" ns ago"); + } else { + buf.append("now"); + } + + if (isCancelled()) { + buf.append(", cancelled"); + } + + return buf.append(", task: ") + .append(task()) + .append(')') + .toString(); + } + } + + private static final class HashedWheelBucket { + // Used for the linked-list datastructure + private HashedWheelTimeout head; + private HashedWheelTimeout tail; + + /** + * Add {@link HashedWheelTimeout} to this bucket. + */ + public void addTimeout(HashedWheelTimeout timeout) { + assert timeout.bucket == null; + timeout.bucket = this; + if (head == null) { + head = tail = timeout; + } else { + tail.next = timeout; + timeout.prev = tail; + tail = timeout; + } + } + + /** + * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}. + */ + public void expireTimeouts(long deadline) { + HashedWheelTimeout timeout = head; + + // process all timeouts + while (timeout != null) { + HashedWheelTimeout next = timeout.next; + if (timeout.remainingRounds <= 0) { + next = remove(timeout); + if (timeout.deadline <= deadline) { + timeout.expire(); + } else { + //这里因为remaingRounds小于0了,但是他的deadline却要大于现在的时间 + // The timeout was placed into a wrong slot. This should never happen. + throw new IllegalStateException(String.format( + "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); + } + } else if (timeout.isCancelled()) { + next = remove(timeout); + } else { + timeout.remainingRounds --; + } + timeout = next; + } + } + + public HashedWheelTimeout remove(HashedWheelTimeout timeout) { + HashedWheelTimeout next = timeout.next; + // remove timeout that was either processed or cancelled by updating the linked-list + if (timeout.prev != null) { + timeout.prev.next = next; + } + if (timeout.next != null) { + timeout.next.prev = timeout.prev; + } + + if (timeout == head) { + // if timeout is also the tail we need to adjust the entry too + if (timeout == tail) { + tail = null; + head = null; + } else { + head = next; + } + } else if (timeout == tail) { + // if the timeout is the tail modify the tail to be the prev node. + tail = timeout.prev; + } + // null out prev, next and bucket to allow for GC. + timeout.prev = null; + timeout.next = null; + timeout.bucket = null; + return next; + } + + /** + * Clear this bucket and return all not expired / cancelled {@link Timeout}s. + */ + public void clearTimeouts(Set set) { + for (;;) { + HashedWheelTimeout timeout = pollTimeout(); + if (timeout == null) { + return; + } + if (timeout.isExpired() || timeout.isCancelled()) { + continue; + } + set.add(timeout); + } + } + + private HashedWheelTimeout pollTimeout() { + HashedWheelTimeout head = this.head; + if (head == null) { + return null; + } + HashedWheelTimeout next = head.next; + if (next == null) { + tail = this.head = null; + } else { + this.head = next; + next.prev = null; + } + + // null out prev and next to allow for GC. + head.next = null; + head.prev = null; + head.bucket = null; + return head; + } + } + private long startTime; + CountDownLatch startTimeInitialized = new CountDownLatch(1); + private final class Worker implements Runnable { + private final Set unprocessedTimeouts = new HashSet(); + + private long tick; + //这个线程一开始不会run,直到有第一个任务进来,通过查看状态量才开始run + public void run() { + // Initialize the startTime. + startTime = System.nanoTime(); + if (startTime == 0) { + // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. + startTime = 1; + } + + // Notify the other threads waiting for the initialization at start(). + //这是一个countdownlatch + startTimeInitialized.countDown(); + + do { + //这里得到的deadline就是代表,timeout 0) { + int idx = (int) (tick & mask); + //有一个队列里面装得全部是timeout的task,这里就是把这些task全部给remove了 + processCancelledTasks(); + HashedWheelBucket bucket = + wheel[idx]; + transferTimeoutsToBuckets(); + bucket.expireTimeouts(deadline); + tick++; + } + } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); + + // Fill the unprocessedTimeouts so we can return them from stop() method. + for (HashedWheelBucket bucket: wheel) { + bucket.clearTimeouts(unprocessedTimeouts); + } + for (;;) { + HashedWheelTimeout timeout = timeouts.poll(); + if (timeout == null) { + break; + } + if (!timeout.isCancelled()) { + unprocessedTimeouts.add(timeout); + } + } + processCancelledTasks(); + } + + private void transferTimeoutsToBuckets() { + // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just + // adds new timeouts in a loop. + for (int i = 0; i < 100000; i++) { + HashedWheelTimeout timeout = timeouts.poll(); + if (timeout == null) { + // all processed + break; + } + if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { + // Was cancelled in the meantime. + continue; + } + + long calculated = timeout.deadline / tickDuration; + timeout.remainingRounds = (calculated - tick) / wheel.length; + //如果calculate比tick小代表,已经超时了,需要被马上执行 + final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. + int stopIndex = (int) (ticks & mask); + + HashedWheelBucket bucket = wheel[stopIndex]; + bucket.addTimeout(timeout); + } + } + + private void processCancelledTasks() { + for (;;) { + HashedWheelTimeout timeout = cancelledTimeouts.poll(); + if (timeout == null) { + // all processed + break; + } + try { + timeout.remove(); + } catch (Throwable t) { + System.out.println("An exception was thrown while process a cancellation task"); + } + } + } + + /** + * calculate goal nanoTime from startTime and current tick number, + * then wait until that goal has been reached. + * @return Long.MIN_VALUE if received a shutdown request, + * current time otherwise (with Long.MIN_VALUE changed by +1) + */ + private long waitForNextTick() { + long deadline = tickDuration * (tick + 1); + + for (;;) { + final long currentTime = System.nanoTime() - startTime; + //1ms = 10^6 ns 就是这么任性 + //这里+999999,其实是为了上取整 + //然后等到时间>=deadline了开始执行业务逻辑 + long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; + + if (sleepTimeMs <= 0) { + if (currentTime == Long.MIN_VALUE) { + return -Long.MAX_VALUE; + } else { + return currentTime; + } + } + + // Check if we run on windows, as if thats the case we will need + // to round the sleepTime as workaround for a bug that only affect + // the JVM if it runs on windows. + // + // See https://github.com/netty/netty/issues/356 + + try { + //惊了,就是要看到这一句.真的用的sleep啊 + Thread.sleep(sleepTimeMs); + } catch (InterruptedException ignored) { + if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { + return Long.MIN_VALUE; + } + } + } + } + + public Set unprocessedTimeouts() { + return Collections.unmodifiableSet(unprocessedTimeouts); + } + } +} diff --git a/src/main/java/Utils/TestTimeWheel.java b/src/main/java/Utils/TestTimeWheel.java new file mode 100644 index 0000000..9545390 --- /dev/null +++ b/src/main/java/Utils/TestTimeWheel.java @@ -0,0 +1,17 @@ +package Utils; + +import java.util.concurrent.TimeUnit; + +/** + * Created by jrj on 17-10-31. + */ +public class TestTimeWheel { + public static void main(String[] args){ + HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(); + hashedWheelTimer.newTimeout(new TimerTask() { + public void run(Timeout timeout) throws Exception { + System.out.println("fuck"); + } + },3000, TimeUnit.MILLISECONDS); + } +} diff --git a/src/main/java/Utils/Timeout.java b/src/main/java/Utils/Timeout.java new file mode 100644 index 0000000..860186d --- /dev/null +++ b/src/main/java/Utils/Timeout.java @@ -0,0 +1,37 @@ +package Utils; + +/** + * Created by jrj on 17-10-31. + */ +public interface Timeout { + /** + * Returns the {@link Timer} that created this handle. + */ + Timer timer(); + + /** + * Returns the {@link TimerTask} which is associated with this handle. + */ + TimerTask task(); + + /** + * Returns {@code true} if and only if the {@link TimerTask} associated + * with this handle has been expired. + */ + boolean isExpired(); + + /** + * Returns {@code true} if and only if the {@link TimerTask} associated + * with this handle has been cancelled. + */ + boolean isCancelled(); + + /** + * Attempts to cancel the {@link TimerTask} associated with this handle. + * If the task has been executed or cancelled already, it will return with + * no side effect. + * + * @return True if the cancellation completed successfully, otherwise false + */ + boolean cancel(); +} diff --git a/src/main/java/Utils/Timer.java b/src/main/java/Utils/Timer.java new file mode 100644 index 0000000..4c81f8b --- /dev/null +++ b/src/main/java/Utils/Timer.java @@ -0,0 +1,21 @@ +package Utils; + +import java.util.Set; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * Created by jrj on 17-10-31. + */ +public interface Timer { + Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); + + /** + * Releases all resources acquired by this {@link Timer} and cancels all + * tasks which were scheduled but not executed yet. + * + * @return the handles associated with the tasks which were canceled by + * this method + */ + Set stop(); +} diff --git a/src/main/java/Utils/TimerTask.java b/src/main/java/Utils/TimerTask.java new file mode 100644 index 0000000..45a6a56 --- /dev/null +++ b/src/main/java/Utils/TimerTask.java @@ -0,0 +1,8 @@ +package Utils; + +/** + * Created by jrj on 17-10-31. + */ +public interface TimerTask { + void run(Timeout timeout) throws Exception; +} diff --git a/src/main/java/kcp/KCP.java b/src/main/java/kcp/KCP.java new file mode 100644 index 0000000..856a9e3 --- /dev/null +++ b/src/main/java/kcp/KCP.java @@ -0,0 +1,983 @@ +//===================================================================== +// +// KCP - A Better ARQ Protocol Implementation +// skywind3000 (at) gmail.com, 2010-2011 +// +// Features: +// + Average RTT reduce 30% - 40% vs traditional ARQ like tcp. +// + Maximum RTT reduce three times vs tcp. +// + Lightweight, distributed as a single source file. +// +//===================================================================== +package kcp; + +import java.util.ArrayList; + +public abstract class KCP { + + //===================================================================== + // KCP BASIC + //===================================================================== + public final int IKCP_RTO_NDL = 30; // no delay min rto + public final int IKCP_RTO_MIN = 100; // normal min rto + public final int IKCP_RTO_DEF = 200; + public final int IKCP_RTO_MAX = 60000; + public final int IKCP_CMD_PUSH = 81; // cmd: push data + public final int IKCP_CMD_ACK = 82; // cmd: ack + public final int IKCP_CMD_WASK = 83; // cmd: window probe (ask) + public final int IKCP_CMD_WINS = 84; // cmd: window size (tell) + public final int IKCP_ASK_SEND = 1; // need to send IKCP_CMD_WASK + public final int IKCP_ASK_TELL = 2; // need to send IKCP_CMD_WINS + public final int IKCP_WND_SND = 32; + public final int IKCP_WND_RCV = 32; + public final int IKCP_MTU_DEF = 1400; + public final int IKCP_ACK_FAST = 3; + public final int IKCP_INTERVAL = 100; + public final int IKCP_OVERHEAD = 24; + public final int IKCP_DEADLINK = 10; + public final int IKCP_THRESH_INIT = 2; + public final int IKCP_THRESH_MIN = 2; + public final int IKCP_PROBE_INIT = 7000; // 7 secs to probe window size + public final int IKCP_PROBE_LIMIT = 120000; // up to 120 secs to probe window + + protected abstract void output(byte[] buffer, int size); // 需具体实现 + + // encode 8 bits unsigned int + public static void ikcp_encode8u(byte[] p, int offset, byte c) { + p[0 + offset] = c; + } + + // decode 8 bits unsigned int + public static byte ikcp_decode8u(byte[] p, int offset) { + return p[0 + offset]; + } + + /* encode 16 bits unsigned int (msb) */ + public static void ikcp_encode16u(byte[] p, int offset, int w) { + p[offset + 0] = (byte) (w >> 8); + p[offset + 1] = (byte) (w >> 0); + } + + /* decode 16 bits unsigned int (msb) */ + public static int ikcp_decode16u(byte[] p, int offset) { + int ret = (p[offset + 0] & 0xFF) << 8 + | (p[offset + 1] & 0xFF); + return ret; + } + + /* encode 32 bits unsigned int (msb) */ + public static void ikcp_encode32u(byte[] p, int offset, long l) { + p[offset + 0] = (byte) (l >> 24); + p[offset + 1] = (byte) (l >> 16); + p[offset + 2] = (byte) (l >> 8); + p[offset + 3] = (byte) (l >> 0); + } + + /* decode 32 bits unsigned int (msb) */ + public static long ikcp_decode32u(byte[] p, int offset) { + long ret = (p[offset + 0] & 0xFFL) << 24 + | (p[offset + 1] & 0xFFL) << 16 + | (p[offset + 2] & 0xFFL) << 8 + | p[offset + 3] & 0xFFL; + return ret; + } + + public static void slice(ArrayList list, int start, int stop) { + int size = list.size(); + for (int i = 0; i < size; ++i) { + if (i < stop - start) { + list.set(i, list.get(i + start)); + } else { + list.remove(stop - start); + } + } + } + + static long _imin_(long a, long b) { + return a <= b ? a : b; + } + + static long _imax_(long a, long b) { + return a >= b ? a : b; + } + + static long _ibound_(long lower, long middle, long upper) { + return _imin_(_imax_(lower, middle), upper); + } + + static int _itimediff(long later, long earlier) { + return ((int) (later - earlier)); + } + + private class Segment { + + protected long conv = 0; + protected long cmd = 0; + protected long frg = 0; + protected long wnd = 0; + protected long ts = 0; + protected long sn = 0; + protected long una = 0; + protected long resendts = 0; + protected long rto = 0; + protected long fastack = 0; + protected long xmit = 0; + protected byte[] data; + + protected Segment(int size) { + this.data = new byte[size]; + } + + //--------------------------------------------------------------------- + // ikcp_encode_seg + //--------------------------------------------------------------------- + // encode a segment into buffer + protected int encode(byte[] ptr, int offset) { + int offset_ = offset; + + ikcp_encode32u(ptr, offset, conv); + offset += 4; + ikcp_encode8u(ptr, offset, (byte) cmd); + offset += 1; + ikcp_encode8u(ptr, offset, (byte) frg); + offset += 1; + ikcp_encode16u(ptr, offset, (int) wnd); + offset += 2; + ikcp_encode32u(ptr, offset, ts); + offset += 4; + ikcp_encode32u(ptr, offset, sn); + offset += 4; + ikcp_encode32u(ptr, offset, una); + offset += 4; + ikcp_encode32u(ptr, offset, (long) data.length); + offset += 4; + + return offset - offset_; + } + } + + long conv = 0; + //long user = user; + long snd_una = 0; + long snd_nxt = 0; + long rcv_nxt = 0; + long ts_recent = 0; + long ts_lastack = 0; + long ts_probe = 0; + long probe_wait = 0; + long snd_wnd = IKCP_WND_SND; + long rcv_wnd = IKCP_WND_RCV; + long rmt_wnd = IKCP_WND_RCV; + long cwnd = 0; + long incr = 0; + long probe = 0; + long mtu = IKCP_MTU_DEF; + long mss = this.mtu - IKCP_OVERHEAD; + byte[] buffer = new byte[(int) (mtu + IKCP_OVERHEAD) * 3]; + ArrayList nrcv_buf = new ArrayList(128); + ArrayList nsnd_buf = new ArrayList(128); + ArrayList nrcv_que = new ArrayList(128); + ArrayList nsnd_que = new ArrayList(128); + long state = 0; + ArrayList acklist = new ArrayList(128); + //long ackblock = 0; + //long ackcount = 0; + long rx_srtt = 0; + long rx_rttval = 0; + long rx_rto = IKCP_RTO_DEF; + long rx_minrto = IKCP_RTO_MIN; + long current = 0; + long interval = IKCP_INTERVAL; + long ts_flush = IKCP_INTERVAL; + long nodelay = 0; + long updated = 0; + long logmask = 0; + long ssthresh = IKCP_THRESH_INIT; + long fastresend = 0; + long nocwnd = 0; + long xmit = 0; + long dead_link = IKCP_DEADLINK; + //long output = NULL; + //long writelog = NULL; + + public KCP(long conv_) { + conv = conv_; + } + + //--------------------------------------------------------------------- + // user/upper level recv: returns size, returns below zero for EAGAIN + //--------------------------------------------------------------------- + // 将接收队列中的数据传递给上层引用 + public int Recv(byte[] buffer) { + + if (0 == nrcv_que.size()) { + return -1; + } + + int peekSize = PeekSize(); + if (0 > peekSize) { + return -2; + } + + if (peekSize > buffer.length) { + return -3; + } + + boolean recover = false; + if (nrcv_que.size() >= rcv_wnd) { + recover = true; + } + + // merge fragment. + int count = 0; + int n = 0; + for (Segment seg : nrcv_que) { + System.arraycopy(seg.data, 0, buffer, n, seg.data.length); + n += seg.data.length; + count++; + if (0 == seg.frg) { + break; + } + } + + if (0 < count) { + slice(nrcv_que, count, nrcv_que.size()); + } + + // move available data from rcv_buf -> nrcv_que + count = 0; + for (Segment seg : nrcv_buf) { + if (seg.sn == rcv_nxt && nrcv_que.size() < rcv_wnd) { + nrcv_que.add(seg); + rcv_nxt++; + count++; + } else { + break; + } + } + + if (0 < count) { + slice(nrcv_buf, count, nrcv_buf.size()); + } + + // fast recover + if (nrcv_que.size() < rcv_wnd && recover) { + // ready to send back IKCP_CMD_WINS in ikcp_flush + // tell remote my window size + probe |= IKCP_ASK_TELL; + } + + return n; + } + + //--------------------------------------------------------------------- + // peek data size + //--------------------------------------------------------------------- + // check the size of next message in the recv queue + // 计算接收队列中有多少可用的数据 + public int PeekSize() { + if (0 == nrcv_que.size()) { + return -1; + } + + Segment seq = nrcv_que.get(0); + + if (0 == seq.frg) { + return seq.data.length; + } + + if (nrcv_que.size() < seq.frg + 1) { + return -1; + } + + int length = 0; + + for (Segment item : nrcv_que) { + length += item.data.length; + if (0 == item.frg) { + break; + } + } + + return length; + } + + //--------------------------------------------------------------------- + // user/upper level send, returns below zero for error + //--------------------------------------------------------------------- + // 上层要发送的数据丢给发送队列,发送队列会根据mtu大小分片 + public int Send(byte[] buffer) { + + if (0 == buffer.length) { + return -1; + } + + int count; + + // 根据mss大小分片 + if (buffer.length < mss) { + count = 1; + } else { + count = (int) (buffer.length + mss - 1) / (int) mss; + } + + if (255 < count) { + return -2; + } + + if (0 == count) { + count = 1; + } + + int offset = 0; + + // 分片后加入到发送队列 + int length = buffer.length; + for (int i = 0; i < count; i++) { + int size = (int) (length > mss ? mss : length); + Segment seg = new Segment(size); + System.arraycopy(buffer, offset, seg.data, 0, size); + offset += size; + seg.frg = count - i - 1; + nsnd_que.add(seg); + length -= size; + } + return 0; + } + + //--------------------------------------------------------------------- + // parse ack + //--------------------------------------------------------------------- + void update_ack(int rtt) { + if (0 == rx_srtt) { + rx_srtt = rtt; + rx_rttval = rtt / 2; + } else { + int delta = (int) (rtt - rx_srtt); + if (0 > delta) { + delta = -delta; + } + + rx_rttval = (3 * rx_rttval + delta) / 4; + rx_srtt = (7 * rx_srtt + rtt) / 8; + if (rx_srtt < 1) { + rx_srtt = 1; + } + } + + int rto = (int) (rx_srtt + _imax_(1, 4 * rx_rttval)); + rx_rto = _ibound_(rx_minrto, rto, IKCP_RTO_MAX); + } + + // 计算本地真实snd_una + void shrink_buf() { + if (nsnd_buf.size() > 0) { + snd_una = nsnd_buf.get(0).sn; + } else { + snd_una = snd_nxt; + } + } + + // 对端返回的ack, 确认发送成功时,对应包从发送缓存中移除 + void parse_ack(long sn) { + if (_itimediff(sn, snd_una) < 0 || _itimediff(sn, snd_nxt) >= 0) { + return; + } + + int index = 0; + for (Segment seg : nsnd_buf) { + if (_itimediff(sn, seg.sn) < 0) { + break; + } + + // 原版ikcp_parse_fastack&ikcp_parse_ack逻辑重复 + seg.fastack++; + + if (sn == seg.sn) { + nsnd_buf.remove(index); + break; + } + index++; + } + } + + // 通过对端传回的una将已经确认发送成功包从发送缓存中移除 + void parse_una(long una) { + int count = 0; + for (Segment seg : nsnd_buf) { + if (_itimediff(una, seg.sn) > 0) { + count++; + } else { + break; + } + } + + if (0 < count) { + slice(nsnd_buf, count, nsnd_buf.size()); + } + } + + //--------------------------------------------------------------------- + // ack append + //--------------------------------------------------------------------- + // 收数据包后需要给对端回ack,flush时发送出去 + void ack_push(long sn, long ts) { + // c原版实现中按*2扩大容量 + acklist.add(sn); + acklist.add(ts); + } + + //--------------------------------------------------------------------- + // parse data + //--------------------------------------------------------------------- + // 用户数据包解析 + void parse_data(Segment newseg) { + long sn = newseg.sn; + boolean repeat = false; + + if (_itimediff(sn, rcv_nxt + rcv_wnd) >= 0 || _itimediff(sn, rcv_nxt) < 0) { + return; + } + + int n = nrcv_buf.size() - 1; + int after_idx = -1; + + // 判断是否是重复包,并且计算插入位置 + for (int i = n; i >= 0; i--) { + Segment seg = nrcv_buf.get(i); + if (seg.sn == sn) { + repeat = true; + break; + } + + if (_itimediff(sn, seg.sn) > 0) { + after_idx = i; + break; + } + } + + // 如果不是重复包,则插入 + if (!repeat) { + if (after_idx == -1) { + nrcv_buf.add(0, newseg); + } else { + nrcv_buf.add(after_idx + 1, newseg); + } + } + + // move available data from nrcv_buf -> nrcv_que + // 将连续包加入到接收队列 + int count = 0; + for (Segment seg : nrcv_buf) { + if (seg.sn == rcv_nxt && nrcv_que.size() < rcv_wnd) { + nrcv_que.add(seg); + rcv_nxt++; + count++; + } else { + break; + } + } + + // 从接收缓存中移除 + if (0 < count) { + slice(nrcv_buf, count, nrcv_buf.size()); + } + } + + // when you received a low level packet (eg. UDP packet), call it + //--------------------------------------------------------------------- + // input data + //--------------------------------------------------------------------- + // 底层收包后调用,再由上层通过Recv获得处理后的数据 + public int Input(byte[] data) { + + long s_una = snd_una; + if (data.length < IKCP_OVERHEAD) { + return 0; + } + + int offset = 0; + + while (true) { + long ts, sn, length, una, conv_; + int wnd; + byte cmd, frg; + + if (data.length - offset < IKCP_OVERHEAD) { + break; + } + + conv_ = ikcp_decode32u(data, offset); + offset += 4; + if (conv != conv_) { + return -1; + } + + cmd = ikcp_decode8u(data, offset); + offset += 1; + frg = ikcp_decode8u(data, offset); + offset += 1; + wnd = ikcp_decode16u(data, offset); + offset += 2; + ts = ikcp_decode32u(data, offset); + offset += 4; + sn = ikcp_decode32u(data, offset); + offset += 4; + una = ikcp_decode32u(data, offset); + offset += 4; + length = ikcp_decode32u(data, offset); + offset += 4; + + if (data.length - offset < length) { + return -2; + } + + if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK && cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS) { + return -3; + } + + rmt_wnd = (long) wnd; + parse_una(una); + shrink_buf(); + + if (IKCP_CMD_ACK == cmd) { + if (_itimediff(current, ts) >= 0) { + update_ack(_itimediff(current, ts)); + } + parse_ack(sn); + shrink_buf(); + } else if (IKCP_CMD_PUSH == cmd) { + if (_itimediff(sn, rcv_nxt + rcv_wnd) < 0) { + ack_push(sn, ts); + if (_itimediff(sn, rcv_nxt) >= 0) { + Segment seg = new Segment((int) length); + seg.conv = conv_; + seg.cmd = cmd; + seg.frg = frg; + seg.wnd = wnd; + seg.ts = ts; + seg.sn = sn; + seg.una = una; + + if (length > 0) { + System.arraycopy(data, offset, seg.data, 0, (int) length); + } + + parse_data(seg); + } + } + } else if (IKCP_CMD_WASK == cmd) { + // ready to send back IKCP_CMD_WINS in Ikcp_flush + // tell remote my window size + probe |= IKCP_ASK_TELL; + } else if (IKCP_CMD_WINS == cmd) { + // do nothing + } else { + return -3; + } + + offset += (int) length; + } + + if (_itimediff(snd_una, s_una) > 0) { + if (cwnd < rmt_wnd) { + long mss_ = mss; + if (cwnd < ssthresh) { + cwnd++; + incr += mss_; + } else { + if (incr < mss_) { + incr = mss_; + } + incr += (mss_ * mss_) / incr + (mss_ / 16); + if ((cwnd + 1) * mss_ <= incr) { + cwnd++; + } + } + if (cwnd > rmt_wnd) { + cwnd = rmt_wnd; + incr = rmt_wnd * mss_; + } + } + } + + return 0; + } + + // 接收窗口可用大小 + int wnd_unused() { + if (nrcv_que.size() < rcv_wnd) { + return (int) (int) rcv_wnd - nrcv_que.size(); + } + return 0; + } + + //--------------------------------------------------------------------- + // ikcp_flush + //--------------------------------------------------------------------- + void flush() { + long current_ = current; + byte[] buffer_ = buffer; + int change = 0; + int lost = 0; + + // 'ikcp_update' haven't been called. + if (0 == updated) { + return; + } + + Segment seg = new Segment(0); + seg.conv = conv; + seg.cmd = IKCP_CMD_ACK; + seg.wnd = (long) wnd_unused(); + seg.una = rcv_nxt; + + // flush acknowledges + // 将acklist中的ack发送出去 + int count = acklist.size() / 2; + int offset = 0; + for (int i = 0; i < count; i++) { + if (offset + IKCP_OVERHEAD > mtu) { + output(buffer, offset); + offset = 0; + } + // ikcp_ack_get + seg.sn = acklist.get(i * 2 + 0); + seg.ts = acklist.get(i * 2 + 1); + offset += seg.encode(buffer, offset); + } + acklist.clear(); + + // probe window size (if remote window size equals zero) + // rmt_wnd=0时,判断是否需要请求对端接收窗口 + if (0 == rmt_wnd) { + if (0 == probe_wait) { + probe_wait = IKCP_PROBE_INIT; + ts_probe = current + probe_wait; + } else { + // 逐步扩大请求时间间隔 + if (_itimediff(current, ts_probe) >= 0) { + if (probe_wait < IKCP_PROBE_INIT) { + probe_wait = IKCP_PROBE_INIT; + } + probe_wait += probe_wait / 2; + if (probe_wait > IKCP_PROBE_LIMIT) { + probe_wait = IKCP_PROBE_LIMIT; + } + ts_probe = current + probe_wait; + probe |= IKCP_ASK_SEND; + } + } + } else { + ts_probe = 0; + probe_wait = 0; + } + + // flush window probing commands + // 请求对端接收窗口 + if ((probe & IKCP_ASK_SEND) != 0) { + seg.cmd = IKCP_CMD_WASK; + if (offset + IKCP_OVERHEAD > mtu) { + output(buffer, offset); + offset = 0; + } + offset += seg.encode(buffer, offset); + } + + // flush window probing commands(c#) + // 告诉对端自己的接收窗口 + if ((probe & IKCP_ASK_TELL) != 0) { + seg.cmd = IKCP_CMD_WINS; + if (offset + IKCP_OVERHEAD > mtu) { + output(buffer, offset); + offset = 0; + } + offset += seg.encode(buffer, offset); + } + + probe = 0; + + // calculate window size + long cwnd_ = _imin_(snd_wnd, rmt_wnd); + // 如果采用拥塞控制 + if (0 == nocwnd) { + cwnd_ = _imin_(cwnd, cwnd_); + } + + count = 0; + // move data from snd_queue to snd_buf + for (Segment nsnd_que1 : nsnd_que) { + if (_itimediff(snd_nxt, snd_una + cwnd_) >= 0) { + break; + } + Segment newseg = nsnd_que1; + newseg.conv = conv; + newseg.cmd = IKCP_CMD_PUSH; + newseg.wnd = seg.wnd; + newseg.ts = current_; + newseg.sn = snd_nxt; + newseg.una = rcv_nxt; + newseg.resendts = current_; + newseg.rto = rx_rto; + newseg.fastack = 0; + newseg.xmit = 0; + nsnd_buf.add(newseg); + snd_nxt++; + count++; + } + + if (0 < count) { + slice(nsnd_que, count, nsnd_que.size()); + } + + // calculate resent + long resent = (fastresend > 0) ? fastresend : 0xffffffff; + long rtomin = (nodelay == 0) ? (rx_rto >> 3) : 0; + + // flush data segments + for (Segment segment : nsnd_buf) { + boolean needsend = false; + if (0 == segment.xmit) { + // 第一次传输 + needsend = true; + segment.xmit++; + segment.rto = rx_rto; + segment.resendts = current_ + segment.rto + rtomin; + } else if (_itimediff(current_, segment.resendts) >= 0) { + // 丢包重传 + needsend = true; + segment.xmit++; + xmit++; + if (0 == nodelay) { + segment.rto += rx_rto; + } else { + segment.rto += rx_rto / 2; + } + segment.resendts = current_ + segment.rto; + lost = 1; + } else if (segment.fastack >= resent) { + // 快速重传 + needsend = true; + segment.xmit++; + segment.fastack = 0; + segment.resendts = current_ + segment.rto; + change++; + } + + if (needsend) { + segment.ts = current_; + segment.wnd = seg.wnd; + segment.una = rcv_nxt; + + int need = IKCP_OVERHEAD + segment.data.length; + if (offset + need >= mtu) { + output(buffer, offset); + offset = 0; + } + + offset += segment.encode(buffer, offset); + if (segment.data.length > 0) { + System.arraycopy(segment.data, 0, buffer, offset, segment.data.length); + offset += segment.data.length; + } + + if (segment.xmit >= dead_link) { + state = -1; // state = 0(c#) + } + } + } + + // flash remain segments + if (offset > 0) { + output(buffer, offset); + } + + // update ssthresh + // 拥塞避免 + if (change != 0) { + long inflight = snd_nxt - snd_una; + ssthresh = inflight / 2; + if (ssthresh < IKCP_THRESH_MIN) { + ssthresh = IKCP_THRESH_MIN; + } + cwnd = ssthresh + resent; + incr = cwnd * mss; + } + + if (lost != 0) { + ssthresh = cwnd / 2; + if (ssthresh < IKCP_THRESH_MIN) { + ssthresh = IKCP_THRESH_MIN; + } + cwnd = 1; + incr = mss; + } + + if (cwnd < 1) { + cwnd = 1; + incr = mss; + } + } + + //--------------------------------------------------------------------- + // update state (call it repeatedly, every 10ms-100ms), or you can ask + // ikcp_check when to call it again (without ikcp_input/_send calling). + // 'current' - current timestamp in millisec. + //--------------------------------------------------------------------- + public void Update(long current_) { + + current = current_; + + // 首次调用Update + if (0 == updated) { + updated = 1; + ts_flush = current; + } + + // 两次更新间隔 + int slap = _itimediff(current, ts_flush); + + // interval设置过大或者Update调用间隔太久 + if (slap >= 10000 || slap < -10000) { + ts_flush = current; + slap = 0; + } + + // flush同时设置下一次更新时间 + if (slap >= 0) { + ts_flush += interval; + if (_itimediff(current, ts_flush) >= 0) { + ts_flush = current + interval; + } + flush(); + } + } + + //--------------------------------------------------------------------- + // Determine when should you invoke ikcp_update: + // returns when you should invoke ikcp_update in millisec, if there + // is no ikcp_input/_send calling. you can call ikcp_update in that + // time, instead of call update repeatly. + // Important to reduce unnacessary ikcp_update invoking. use it to + // schedule ikcp_update (eg. implementing an epoll-like mechanism, + // or optimize ikcp_update when handling massive kcp connections) + //--------------------------------------------------------------------- + public long Check(long current_) { + + long ts_flush_ = ts_flush; + long tm_flush = 0x7fffffff; + long tm_packet = 0x7fffffff; + long minimal; + + if (0 == updated) { + return current_; + } + + if (_itimediff(current_, ts_flush_) >= 10000 || _itimediff(current_, ts_flush_) < -10000) { + ts_flush_ = current_; + } + + if (_itimediff(current_, ts_flush_) >= 0) { + return current_; + } + + tm_flush = _itimediff(ts_flush_, current_); + + for (Segment seg : nsnd_buf) { + int diff = _itimediff(seg.resendts, current_); + if (diff <= 0) { + return current_; + } + if (diff < tm_packet) { + tm_packet = diff; + } + } + + minimal = tm_packet < tm_flush ? tm_packet : tm_flush; + if (minimal >= interval) { + minimal = interval; + } + + return current_ + minimal; + } + + // change MTU size, default is 1400 + public int SetMtu(int mtu_) { + if (mtu_ < 50 || mtu_ < (int) IKCP_OVERHEAD) { + return -1; + } + + byte[] buffer_ = new byte[(mtu_ + IKCP_OVERHEAD) * 3]; + if (null == buffer_) { + return -2; + } + + mtu = (long) mtu_; + mss = mtu - IKCP_OVERHEAD; + buffer = buffer_; + return 0; + } + + public int Interval(int interval_) { + if (interval_ > 5000) { + interval_ = 5000; + } else if (interval_ < 10) { + interval_ = 10; + } + interval = (long) interval_; + return 0; + } + + // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1) + // nodelay: 0:disable(default), 1:enable + // interval: internal update timer interval in millisec, default is 100ms + // resend: 0:disable fast resend(default), 1:enable fast resend + // nc: 0:normal congestion control(default), 1:disable congestion control + public int NoDelay(int nodelay_, int interval_, int resend_, int nc_) { + + if (nodelay_ >= 0) { + nodelay = nodelay_; + if (nodelay_ != 0) { + rx_minrto = IKCP_RTO_NDL; + } else { + rx_minrto = IKCP_RTO_MIN; + } + } + + if (interval_ >= 0) { + if (interval_ > 5000) { + interval_ = 5000; + } else if (interval_ < 10) { + interval_ = 10; + } + interval = interval_; + } + + if (resend_ >= 0) { + fastresend = resend_; + } + + if (nc_ >= 0) { + nocwnd = nc_; + } + + return 0; + } + + // set maximum window size: sndwnd=32, rcvwnd=32 by default + public int WndSize(int sndwnd, int rcvwnd) { + if (sndwnd > 0) { + snd_wnd = (long) sndwnd; + } + + if (rcvwnd > 0) { + rcv_wnd = (long) rcvwnd; + } + return 0; + } + + // get how many packet is waiting to be sent + public int WaitSnd() { + return nsnd_buf.size() + nsnd_que.size(); + } +} \ No newline at end of file diff --git a/src/main/java/state/Candidate.java b/src/main/java/state/Candidate.java new file mode 100644 index 0000000..36685bc --- /dev/null +++ b/src/main/java/state/Candidate.java @@ -0,0 +1,23 @@ +package state; + +import org.jgroups.Message; +import org.jgroups.View; +import worker.MainWorker; + +/** + * Created by jrj on 17-10-30. + */ +public class Candidate extends State { + public Candidate(){ + curState = CANDIDATE; + } + + public void fireWhenViewAccepted(View new_view, MainWorker mainWorker) { + + } + + public void fireWhenMessageReceived(Message msg, MainWorker mainWorker) { + + } + +} diff --git a/src/main/java/state/Follower.java b/src/main/java/state/Follower.java new file mode 100644 index 0000000..0c342fa --- /dev/null +++ b/src/main/java/state/Follower.java @@ -0,0 +1,29 @@ +package state; + +import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.View; +import worker.MainWorker; + +/** + * Created by jrj on 17-10-30. + */ +public class Follower extends State { + public Follower(){ + curState = FOLLOWER; + Message message = new Message(null,jChannel.getAddress(),jChannel.getAddressAsString()+";Join in Group"); + try { + jChannel.send(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void fireWhenViewAccepted(View new_view, MainWorker mainWorker) { + + } + + public void fireWhenMessageReceived(Message msg, MainWorker mainWorker) { + + } +} diff --git a/src/main/java/state/Leader.java b/src/main/java/state/Leader.java new file mode 100644 index 0000000..6fb3686 --- /dev/null +++ b/src/main/java/state/Leader.java @@ -0,0 +1,28 @@ +package state; + +import org.jgroups.Message; +import org.jgroups.View; +import worker.MainWorker; + +import java.util.HashMap; +import java.util.WeakHashMap; + +/** + * Created by jrj on 17-10-30. + */ + +public class Leader extends State { + public Leader(){ + curState = LEADER; + WeakHashMap hashMap = new WeakHashMap(); + hashMap.clear(); + } + + public void fireWhenViewAccepted(View new_view, MainWorker mainWorker) { + + } + + public void fireWhenMessageReceived(Message msg, MainWorker mainWorker) { + + } +} diff --git a/src/main/java/state/State.java b/src/main/java/state/State.java new file mode 100644 index 0000000..f742c7e --- /dev/null +++ b/src/main/java/state/State.java @@ -0,0 +1,24 @@ +package state; + +import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.View; +import worker.MainWorker; +/** + * Created by jrj on 17-10-30. + */ +public abstract class State { + protected static JChannel jChannel; + protected static int FOLLOWER = 0; + protected static int CANDIDATE = 1; + protected static int LEADER = 2; + protected static long term = 0; + + protected int curState; + public static void setjChannel(JChannel jChannel){ + State.jChannel = jChannel; + } + + public abstract void fireWhenViewAccepted(View new_view,MainWorker mainWorker); + public abstract void fireWhenMessageReceived(Message msg,MainWorker mainWorker); +} diff --git a/src/main/java/worker/MainWorker.java b/src/main/java/worker/MainWorker.java new file mode 100644 index 0000000..72b6529 --- /dev/null +++ b/src/main/java/worker/MainWorker.java @@ -0,0 +1,39 @@ +package worker; + +import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.ReceiverAdapter; +import org.jgroups.View; +import state.Follower; +import state.State; + +/** + * Created by jrj on 17-10-30. + */ +public class MainWorker extends ReceiverAdapter { + private static final int clusterSize = 3; + private State current; + JChannel channel; + + public MainWorker() throws Exception{ + current = new Follower(); + channel = new JChannel(); + channel.setReceiver(this); + channel.connect("Cluster"); + State.setjChannel(channel); + } + + public void setState(State state){ + current = state; + } + + @Override + public void viewAccepted(View new_view) { + current.fireWhenViewAccepted(new_view,this); + } + + @Override + public void receive(Message msg) { + current.fireWhenMessageReceived(msg,this); + } +}