Skip to content

Commit

Permalink
New - new module 'Flow control'
Browse files Browse the repository at this point in the history
  • Loading branch information
vinny-constantine authored Sep 3, 2023
1 parent 8a98ef4 commit 61a66bf
Showing 1 changed file with 157 additions and 84 deletions.
241 changes: 157 additions & 84 deletions reactive.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,90 +62,6 @@ System.out.println(sum); // 25
```
### 柔韧性(resilient)
### 伸缩性(scalable)
### 背压(backpressure):
用于解决发布者与订阅者之间处理效率不一致而导致的事件堆积,内存溢出等问题
- pull模式:即订阅方实现 backpressure
```java
@Test
public void testPullBackpressure(){
Flux.just(1, 2, 3, 4)
.subscribe(new Subscriber<Integer>() {
private Subscription s;
int onNextAmount;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(2);
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
```
- push模式:即发布方实现 backpressure
```java
// 通过delayElements
@Test
public void testPushBackpressure() throws InterruptedException {
Flux.range(1, 1000)
.delayElements(Duration.ofMillis(200))
.subscribe(e -> {
LOGGER.info("subscribe:{}",e);
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
});
Thread.sleep(100*1000);
}
// 通过buffer方法
@Test
public void testBufferBackpressure() throws InterruptedException {
Flux.range(1, 1000)
.buffer(Duration.ofMillis(800))
.subscribe(e -> {
LOGGER.info("subscribe:{}",e);
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
});
Thread.sleep(100*1000);
}
// 通过take方法
@Test
public void testTakeBackpressure() throws InterruptedException {
Flux.range(1, 1000)
.take(Duration.ofMillis(4000))
.subscribe(e -> {
LOGGER.info("subscribe:{}",e);
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
});
Thread.sleep(100*1000);
}
```



## Reactor
Expand Down Expand Up @@ -272,6 +188,163 @@ fibonacciGenerator.take(10).reduce((x, y) -> x + y).subscribe(t -> {
- concatWith:将其他流拼接在当前流之后合并
- startWith:将其他流拼接再当前流之前合并

### 控制流(Flow control)
控制事件生成以及消费的速率,解决发布者与订阅者之间处理效率不一致而导致的事件堆积,内存溢出等问题

#### 分组(groupBy):

按数据流中数据的某一属性将 `Flux<T>` 分组,但无法再保证数据流的顺序性
```java
@Test
public void testGrouping() {
Flux<Long> fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> {
if (state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
// 取斐波那契数列前20,按可被2、3、5、7整除来分组,并打印结果
fibonacciGenerator.take(20).groupBy(i -> {
List<Integer> divisors = Arrays.asList(2, 3, 5, 7);
Optional<Integer> divisor = divisors.stream().filter(d -> i % d == 0).findFirst();
return divisor.map(x -> "可被[" + x +"]整除").orElse("其他");
}).concatMap(x -> {
System.out.println("\n" + x.key());
return x;
}).subscribe(x -> System.out.print(" " + x));
}
//可被[2]整除
//0 2 8 34 144 610 2584

//其他
//1 1 13 89 233 377 1597 4181

//可被[3]整除
//3 21 987

//可被[5]整除
//5 55
```

#### 缓冲区(buffer):
`Flux<T>`生成的事件聚合为`List<T>`,一并发布,不会改变顺序性
```java
@Test
public void testBufferWithDefinateSize() {
Flux<Long> fibonacciGenerator = Flux.generate(() -> Tuples.<Long, Long>of(0L, 1L), (state, sink) -> {
if (state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
// 将斐波那契数列前100,每10个放入一个缓冲区
fibonacciGenerator.take(100).buffer(10).subscribe(x -> System.out.println(x));
}
//[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

//[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181]

//[6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229]

//[832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352, 24157817, 39088169, 63245986]

//[102334155, 165580141, 267914296, 433494437, 701408733, 1134903170, 1836311903, 2971215073, 4807526976, 7778742049]

//[12586269025, 20365011074, 32951280099, 53316291173, 86267571272, 139583862445, 225851433717, 365435296162, 591286729879, 956722026041]

//[1548008755920, 2504730781961, 4052739537881, 6557470319842, 10610209857723, 17167680177565, 27777890035288, 44945570212853, 72723460248141, 117669030460994]

//[190392490709135, 308061521170129, 498454011879264, 806515533049393, 1304969544928657, 2111485077978050, 3416454622906707, 5527939700884757, 8944394323791464, 14472334024676221]

//[23416728348467685, 37889062373143906, 61305790721611591, 99194853094755497, 160500643816367088, 259695496911122585, 420196140727489673, 679891637638612258, 1100087778366101931, 1779979416004714189]

//[2880067194370816120, 4660046610375530309, 7540113804746346429]
```


#### 背压(backpressure):

- pull模式:即订阅方实现 backpressure
```java
@Test
public void testPullBackpressure(){
Flux.just(1, 2, 3, 4)
.subscribe(new Subscriber<Integer>() {
private Subscription s;
int onNextAmount;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(2);
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
```
- push模式:即发布方实现 backpressure
```java
// 通过delayElements
@Test
public void testPushBackpressure() throws InterruptedException {
Flux.range(1, 1000)
.delayElements(Duration.ofMillis(200))
.subscribe(e -> {
LOGGER.info("subscribe:{}",e);
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
});
Thread.sleep(100*1000);
}
// 通过buffer方法
@Test
public void testBufferBackpressure() throws InterruptedException {
Flux.range(1, 1000)
.buffer(Duration.ofMillis(800))
.subscribe(e -> {
LOGGER.info("subscribe:{}",e);
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
});
Thread.sleep(100*1000);
}
// 通过take方法
@Test
public void testTakeBackpressure() throws InterruptedException {
Flux.range(1, 1000)
.take(Duration.ofMillis(4000))
.subscribe(e -> {
LOGGER.info("subscribe:{}",e);
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
});
Thread.sleep(100*1000);
}
```


## SpringWebFlux
SpringWebFlux支持两种构建 reactive 应用的模式,一是注解形式,二是函数式配置

Expand Down

0 comments on commit 61a66bf

Please sign in to comment.