Skip to content

Commit

Permalink
New - new demo 'reactive-demo'
Browse files Browse the repository at this point in the history
  • Loading branch information
vinny-constantine authored Oct 18, 2023
1 parent 1ec6b49 commit 23fa4ac
Show file tree
Hide file tree
Showing 26 changed files with 1,535 additions and 0 deletions.
34 changes: 34 additions & 0 deletions reactive/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
plugins {
id 'java'
// id 'org.springframework.boot' version '2.7.14'
id 'io.spring.dependency-management' version '1.0.15.RELEASE'
}

group = 'com.dover'
version = '0.0.1-SNAPSHOT'

java {
sourceCompatibility = 'VERSION_1_8'
}

apply plugin: 'java'
apply plugin: 'io.spring.dependency-management'

repositories {
mavenCentral()
maven { url 'https://repo.spring.io/libs-snapshot' }
}

dependencyManagement { imports { mavenBom "io.projectreactor:reactor-bom:Bismuth-RELEASE" } }

dependencies {

compile 'io.projectreactor:reactor-core'
compile 'junit:junit:4.13.2'

}


//tasks.named('test') {
// useJUnitPlatform()
//}
1 change: 1 addition & 0 deletions reactive/settings.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
rootProject.name = 'reactive-demo'
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.dover.reactivedemo.publisher;

import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* @author dover
* @since 2023/8/22
*/
public class FluxDemo {


public static void main(String[] args) {
Flux<Long> fibonacciGenerator = buildFibonacciGenerator();
fibonacciGenerator.subscribe(System.out::println);
}


public static Flux<Long> buildFibonacciGenerator() {
return Flux.create(e -> {
long current = 1, prev = 0;
AtomicBoolean stop = new AtomicBoolean(false);
e.onDispose(() -> {
stop.set(true);
System.out.println("******* Stop Received ****** ");
});
while (current > 0) {
e.next(current);
System.out.println("generated " + current);
current = current + prev;
prev = current - prev;
}
e.complete();
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package com.dover.reactivedemo.flowcontrol;

import org.junit.Test;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertTrue;

public class BackpressureDemo {

@Test
public void testBackPressure() throws Exception {
// 背压,当生产者发布的事件超出订阅者所需时,有多种溢出策略可选择
// BUFFER:将未交付的事件缓存,当订阅者再次请求时再交付,这是默认策略
// IGNORE:忽略背压,持续向订阅者交付事件
// DROP:丢弃未交付的事件
// LATEST:新的事件会缓存在旧事件前面,订阅者总是先消费最新的事件
// ERROR:直接抛出异常
Flux<Integer> numberGenerator = Flux.create(x -> {
System.out.println("Requested Events :" + x.requestedFromDownstream());
int number = 1;
// 订阅者仅需要一个事件,但 publisher 发布100个事件
while (number < 100) {
x.next(number);
number++;
}
System.out.println("=======complete");
x.complete();
});
CountDownLatch latch = new CountDownLatch(1);
numberGenerator.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 仅需要一个事件
request(1);
}

@Override
protected void hookOnNext(Integer value) {
System.out.println(value);
}

@Override
protected void hookOnError(Throwable throwable) {
throwable.printStackTrace();
System.out.println("=============countDown");
latch.countDown();
}

@Override
protected void hookOnComplete() {
// 发布者发布的事件超出订阅者所需,多余事件被Reactor框架缓存在队列中,因此 complete 事件被阻塞,无法消费到
System.out.println("=============countDown");
latch.countDown();
}
});
// 因为始终消费不到“ERROR事件”和“COMPLETE事件”,因此无法 countDown,此处测试失败
assertTrue(latch.await(1L, TimeUnit.SECONDS));
}

@Test
public void testErrorBackPressure() throws Exception {
// 使用Error策略
Flux<Integer> numberGenerator = Flux.create(x -> {
System.out.println("Requested Events :" + x.requestedFromDownstream());
int number = 1;
while (number < 100) {
x.next(number);
number++;
}
x.complete();
}, FluxSink.OverflowStrategy.ERROR);
// 由于生成者发布的事件数量超出订阅者所需,在 ERROR 策略下,程序直接抛出异常
numberGenerator.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
});
}


@Test
public void testBackPressureOps() throws Exception {
Flux<Integer> numberGenerator = Flux.create(x -> {
System.out.println("Requested Events :" + x.requestedFromDownstream());
int number = 1;
while (number < 100) {
x.next(number);
number++;
}
x.complete();
});
CountDownLatch latch = new CountDownLatch(1);
// 订阅前变更背压策略为 DROP,超出订阅者所需的事件均被丢弃
numberGenerator.onBackpressureDrop(x -> System.out.println("Dropped :" + x))
// .onBackpressureLatest()
// .onBackpressureError()
// .onBackpressureBuffer(100)
// .onBackpressureBuffer(100, BufferOverflowStrategy.DROP_LATEST) // 当缓冲区满了之后,丢弃最新的事件
// .onBackpressureBuffer(100, BufferOverflowStrategy.DROP_OLDEST) // 当缓冲区满了之后,丢弃最旧的事件
// .onBackpressureBuffer(100, BufferOverflowStrategy.ERROR) // 当缓冲区满了之后,抛出异常
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
});
assertTrue(latch.await(1L, TimeUnit.SECONDS));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.dover.reactivedemo.flowcontrol;

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

import java.time.Duration;

public class BufferDemo {

@Test
public void testBufferWithInfiniteSize() {
Flux<Long> fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> {
if (state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator.take(100).buffer(10).subscribe(x -> System.out.println(x));
}

@Test
public void testBufferSizes() {
Flux<Long> fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> {
if (state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
// maxSize:每个缓冲区的最大容量,skipSize:构建新的缓冲区前必须跳过的事件数量
// 当 maxSize > skipSize 时,缓冲区可能会重叠,意味着其中的事件可能会跨缓冲区重复
// 当 maxSize < skipSize 时,缓冲区则不会相交, 意味着会丢失事件
// 当 maxSize = skipSize 时,就相当于 buffer()
fibonacciGenerator.take(100).buffer(6, 7).subscribe(x -> System.out.println(x));
}

@Test
public void testBufferTimePeriod() {
Flux<Long> fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> {
if (state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator.buffer(Duration.ofNanos(10)).subscribe(x -> System.out.println(x));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.dover.reactivedemo.flowcontrol;

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class GroupByDemo {

@Test
public void testGrouping() {
Flux<Long> fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> {
if (state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
// 取斐波那契数列前20,按可被2、3、5、7整除来分组,并打印结果
fibonacciGenerator.take(20).groupBy(i -> {
List<Integer> divisors = Arrays.asList(2, 3, 5, 7);
Optional<Integer> divisor = divisors.stream().filter(d -> i % d == 0).findFirst();
return divisor.map(x -> "可被[" + x +"]整除").orElse("其他");
}).concatMap(x -> {
System.out.println("\n" + x.key());
return x;
}).subscribe(x -> System.out.print(" " + x));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.dover.reactivedemo.flowcontrol;

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

public class WindowDemo {

@Test
public void testWindowsFixedSize() {
// 与缓冲区类似,能够将`Flux<T>`生成的事件分割,但聚合的结果为Processor,每个分割得到的Processor都能重新发布订阅事件
Flux<Long> fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> {
if (state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
// 将 Flux<T> 流每10个事件为一组分割为window,每个window都是一个 UnicastProcessor,所以需要用 concatMap 或 flatMap 将其组合起来,再订阅消费
fibonacciGenerator.window(10).concatMap(x -> x).subscribe(x -> System.out.print(x + " "));
}


@Test
public void testWindowsPredicate() {
Flux<Long> fibonacciGenerator = Flux.generate(() -> Tuples.<Long, Long>of(0L, 1L), (state, sink) -> {
if (state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator.windowWhile(x -> x < 500).concatMap(x -> x).subscribe(x -> System.out.println(x));
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.dover.reactivedemo.operator;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

/**
* @author dover
* @since 2023/8/18
*/
public class FactorialDemo {


Flux<Double> generateFactorial(long number) {
Flux<Double> factorialStream = Flux.generate(() -> Tuples.of(0L, 1.0d), (state, sink) -> {
Long factNumber = state.getT1();
Double factValue = state.getT2();
if (factNumber <= number) sink.next(factValue);
else sink.complete();
return Tuples.of(factNumber + 1, (factNumber + 1) * factValue);
});
return factorialStream;
}
}
Loading

0 comments on commit 23fa4ac

Please sign in to comment.