Skip to content

Commit

Permalink
added test and enum findby id
Browse files Browse the repository at this point in the history
  • Loading branch information
robertroeser committed May 20, 2016
1 parent 3bf3126 commit 206d2a2
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 6 deletions.
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Wed Dec 02 15:47:21 PST 2015
#Thu May 19 16:56:49 PDT 2016
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-all.zip
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ repositories {
maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' }
}

sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8

dependencies {
compile project(':hystrix-core')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@ public Observable<Payload> get() {

public static EventStreamEnum findByTypeId(int typeId) {
return Arrays
.asList(EventStreamEnum.findByTypeId(typeId))
.asList(EventStreamEnum.values())
.stream()
.filter(t -> t.typeId == typeId)
.findAny()
.orElseThrow(() -> new IllegalStateException("no type id found for id => " + typeId));
}

public int getTypeId() {
return typeId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ public Publisher<Payload> handleSubscription(Payload payload) {
logger.error(t.getMessage(), t);
return Observable.error(t);
}
});
})
.onBackpressureDrop();

return RxReactiveStreams.toPublisher(defer);
return RxReactiveStreams
.toPublisher(defer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public ByteBuffer getMetadata() {

subject.onNext(p);
})
);
)
.retry()
.subscribe();
}

public boolean filter(T t) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.netflix.hystrix.contrib.reactivesocket;


public class EventStreamEnumTest {
public void test() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package com.netflix.hystrix.contrib.reactivesocket;


import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import io.reactivesocket.Frame;
import io.reactivesocket.Payload;
import org.agrona.BitUtil;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import rx.schedulers.Schedulers;

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class EventStreamRequestHandlerTest {
@Test
public void testEventStream() throws Exception {
Payload payload = new Payload() {
@Override
public ByteBuffer getData() {
return ByteBuffer
.allocate(BitUtil.SIZE_OF_INT)
.putInt(EventStreamEnum.METRICS_STREAM.getTypeId());
}

@Override
public ByteBuffer getMetadata() {
return Frame.NULL_BYTEBUFFER;
}
};

Schedulers
.io()
.createWorker()
.schedulePeriodically(() -> {
TestCommand testCommand = new TestCommand();
testCommand.execute();
}, 0, 1, TimeUnit.MILLISECONDS);

CountDownLatch latch = new CountDownLatch(1);
CountDownLatch latch1 = new CountDownLatch(5);
CountDownLatch latch2 = new CountDownLatch(15);

AtomicReference<Subscription> subscriptionAtomicReference = new AtomicReference<>();

EventStreamRequestHandler handler = new EventStreamRequestHandler();
Publisher<Payload> payloadPublisher = handler.handleSubscription(payload);

payloadPublisher
.subscribe(new Subscriber<Payload>() {
@Override
public void onSubscribe(Subscription s) {
subscriptionAtomicReference.set(s);
latch.countDown();
}

@Override
public void onNext(Payload payload) {
ByteBuffer data = payload.getData();
String s = new String(data.array());

System.out.println(s);

latch1.countDown();
latch2.countDown();
}

@Override
public void onError(Throwable t) {

}

@Override
public void onComplete() {

}
});

latch.await();

Subscription subscription = subscriptionAtomicReference.get();
subscription.request(5);

latch1.await();

long count = latch2.getCount();
Assert.assertTrue(count < 15);

subscription.request(100);

latch2.await();

}

class TestCommand extends HystrixCommand<Boolean> {
protected TestCommand() {
super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest"));
}

@Override
protected Boolean run() throws Exception {
return true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.netflix.hystrix.contrib.reactivesocket.metrics;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;

public class HystrixCollasperMetricsStreamTest {

@Test
public void test() throws Exception {
CountDownLatch latch = new CountDownLatch(21);
HystrixCommandMetricsStream
.getInstance()
.get()
.subscribe(payload -> {
ByteBuffer data = payload.getData();
String s = new String(data.array());

System.out.println(s);
latch.countDown();
});


for (int i = 0; i < 20; i++) {
TestCommand test = new TestCommand();

test.execute();
latch.countDown();
}

latch.await();
}

class TestCommand extends HystrixCommand<Boolean> {
protected TestCommand() {
super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest"));
}

@Override
protected Boolean run() throws Exception {
return true;
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.netflix.hystrix.contrib.reactivesocket.metrics;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;

/**
* Created by rroeser on 5/19/16.
*/
public class HystrixCommandMetricsStreamTest {
@Test
public void test() throws Exception {
CountDownLatch latch = new CountDownLatch(23);
HystrixCommandMetricsStream
.getInstance()
.get()
.subscribe(payload -> {
ByteBuffer data = payload.getData();
String s = new String(data.array());

System.out.println(s);
latch.countDown();
});

for (int i = 0; i < 20; i++) {
TestCommand test = new TestCommand(latch);

test.execute();
}

latch.await();
}

class TestCommand extends HystrixCommand<Boolean> {
CountDownLatch latch;
protected TestCommand(CountDownLatch latch) {
super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest"));
this.latch = latch;
}

@Override
protected Boolean run() throws Exception {
latch.countDown();
return true;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.netflix.hystrix.contrib.reactivesocket.sample;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCommandMetricsStream;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;

/**
* Created by rroeser on 5/19/16.
*/
public class HystrixConfigStreamTest {
@Test
public void test() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
HystrixCommandMetricsStream
.getInstance()
.get()
.subscribe(payload -> {
ByteBuffer data = payload.getData();
String s = new String(data.array());

System.out.println(s);
latch.countDown();
});


for (int i = 0; i < 20; i++) {
TestCommand test = new TestCommand();

test.execute();
}

latch.await();
}

class TestCommand extends HystrixCommand<Boolean> {
protected TestCommand() {
super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest"));
}

@Override
protected Boolean run() throws Exception {
System.out.println("IM A HYSTRIX COMMAND!!!!!");
return true;
}
}
}

0 comments on commit 206d2a2

Please sign in to comment.