From 23fa4ac94311eec56e7f22a9dfd5aa51a03bfac6 Mon Sep 17 00:00:00 2001 From: vinny-constantine Date: Wed, 18 Oct 2023 15:02:56 +0800 Subject: [PATCH] New - new demo 'reactive-demo' --- reactive/build.gradle | 34 +++ reactive/settings.gradle | 1 + .../reactive/errorhandle/ErrorHandleDemo.java | 37 +++ .../flowcontrol/BackpressureDemo.java | 117 ++++++++++ .../reactive/flowcontrol/BufferDemo.java | 44 ++++ .../reactive/flowcontrol/GroupByDemo.java | 31 +++ .../reactive/flowcontrol/WindowDemo.java | 33 +++ .../reactive/operator/FactorialDemo.java | 23 ++ .../reactive/operator/FibonacciDemo.java | 167 ++++++++++++++ .../processor/DirectProcessorDemo.java | 44 ++++ .../processor/EmitterProcessorDemo.java | 42 ++++ .../processor/ReplayProcessorDemo.java | 30 +++ .../processor/TopicProcessorDemo.java | 55 +++++ .../processor/UnicastProcessorDemo.java | 25 +++ .../processor/WorkQueueProcessorDemo.java | 34 +++ .../publisher/ColdAndHotPublishDemo.java | 58 +++++ .../dover/reactive/publisher/FluxDemo.java | 37 +++ .../dover/reactive/publisher/MonoDemo.java | 33 +++ .../reactive/subscriber/SubscriberDemo.java | 70 ++++++ .../dover/reactive/util/Factorization.java | 25 +++ .../com/dover/reactive/util/RomanNumber.java | 25 +++ text-handle-demo/utils/AesUtils.java | 210 ++++++++++++++++++ text-handle-demo/utils/IdUtils.java | 141 ++++++++++++ text-handle-demo/utils/JwtUtil.java | 88 ++++++++ text-handle-demo/utils/RsaUtil.java | 107 +++++++++ text-handle-demo/utils/TxUtil.java | 24 ++ 26 files changed, 1535 insertions(+) create mode 100644 reactive/build.gradle create mode 100644 reactive/settings.gradle create mode 100644 reactive/src/main/java/com/dover/reactive/errorhandle/ErrorHandleDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/flowcontrol/BackpressureDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/flowcontrol/BufferDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/flowcontrol/GroupByDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/flowcontrol/WindowDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/operator/FactorialDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/operator/FibonacciDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/processor/DirectProcessorDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/processor/EmitterProcessorDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/processor/ReplayProcessorDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/processor/TopicProcessorDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/processor/UnicastProcessorDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/processor/WorkQueueProcessorDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/publisher/ColdAndHotPublishDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/publisher/FluxDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/publisher/MonoDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/subscriber/SubscriberDemo.java create mode 100644 reactive/src/main/java/com/dover/reactive/util/Factorization.java create mode 100644 reactive/src/main/java/com/dover/reactive/util/RomanNumber.java create mode 100644 text-handle-demo/utils/AesUtils.java create mode 100644 text-handle-demo/utils/IdUtils.java create mode 100644 text-handle-demo/utils/JwtUtil.java create mode 100644 text-handle-demo/utils/RsaUtil.java create mode 100644 text-handle-demo/utils/TxUtil.java diff --git a/reactive/build.gradle b/reactive/build.gradle new file mode 100644 index 0000000..9b8f58a --- /dev/null +++ b/reactive/build.gradle @@ -0,0 +1,34 @@ +plugins { + id 'java' +// id 'org.springframework.boot' version '2.7.14' + id 'io.spring.dependency-management' version '1.0.15.RELEASE' +} + +group = 'com.dover' +version = '0.0.1-SNAPSHOT' + +java { + sourceCompatibility = 'VERSION_1_8' +} + +apply plugin: 'java' +apply plugin: 'io.spring.dependency-management' + +repositories { + mavenCentral() + maven { url 'https://repo.spring.io/libs-snapshot' } +} + +dependencyManagement { imports { mavenBom "io.projectreactor:reactor-bom:Bismuth-RELEASE" } } + +dependencies { + + compile 'io.projectreactor:reactor-core' + compile 'junit:junit:4.13.2' + +} + + +//tasks.named('test') { +// useJUnitPlatform() +//} diff --git a/reactive/settings.gradle b/reactive/settings.gradle new file mode 100644 index 0000000..1648fb2 --- /dev/null +++ b/reactive/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'reactive-demo' diff --git a/reactive/src/main/java/com/dover/reactive/errorhandle/ErrorHandleDemo.java b/reactive/src/main/java/com/dover/reactive/errorhandle/ErrorHandleDemo.java new file mode 100644 index 0000000..b042153 --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/errorhandle/ErrorHandleDemo.java @@ -0,0 +1,37 @@ +package com.dover.reactivedemo.publisher; + +import reactor.core.publisher.Flux; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author dover + * @since 2023/8/22 + */ +public class FluxDemo { + + + public static void main(String[] args) { + Flux fibonacciGenerator = buildFibonacciGenerator(); + fibonacciGenerator.subscribe(System.out::println); + } + + + public static Flux buildFibonacciGenerator() { + return Flux.create(e -> { + long current = 1, prev = 0; + AtomicBoolean stop = new AtomicBoolean(false); + e.onDispose(() -> { + stop.set(true); + System.out.println("******* Stop Received ****** "); + }); + while (current > 0) { + e.next(current); + System.out.println("generated " + current); + current = current + prev; + prev = current - prev; + } + e.complete(); + }); + } +} diff --git a/reactive/src/main/java/com/dover/reactive/flowcontrol/BackpressureDemo.java b/reactive/src/main/java/com/dover/reactive/flowcontrol/BackpressureDemo.java new file mode 100644 index 0000000..9215cd8 --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/flowcontrol/BackpressureDemo.java @@ -0,0 +1,117 @@ +package com.dover.reactivedemo.flowcontrol; + +import org.junit.Test; +import org.reactivestreams.Subscription; +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +public class BackpressureDemo { + + @Test + public void testBackPressure() throws Exception { + // 背压,当生产者发布的事件超出订阅者所需时,有多种溢出策略可选择 + // BUFFER:将未交付的事件缓存,当订阅者再次请求时再交付,这是默认策略 + // IGNORE:忽略背压,持续向订阅者交付事件 + // DROP:丢弃未交付的事件 + // LATEST:新的事件会缓存在旧事件前面,订阅者总是先消费最新的事件 + // ERROR:直接抛出异常 + Flux numberGenerator = Flux.create(x -> { + System.out.println("Requested Events :" + x.requestedFromDownstream()); + int number = 1; + // 订阅者仅需要一个事件,但 publisher 发布100个事件 + while (number < 100) { + x.next(number); + number++; + } + System.out.println("=======complete"); + x.complete(); + }); + CountDownLatch latch = new CountDownLatch(1); + numberGenerator.subscribe(new BaseSubscriber() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + // 仅需要一个事件 + request(1); + } + + @Override + protected void hookOnNext(Integer value) { + System.out.println(value); + } + + @Override + protected void hookOnError(Throwable throwable) { + throwable.printStackTrace(); + System.out.println("=============countDown"); + latch.countDown(); + } + + @Override + protected void hookOnComplete() { + // 发布者发布的事件超出订阅者所需,多余事件被Reactor框架缓存在队列中,因此 complete 事件被阻塞,无法消费到 + System.out.println("=============countDown"); + latch.countDown(); + } + }); + // 因为始终消费不到“ERROR事件”和“COMPLETE事件”,因此无法 countDown,此处测试失败 + assertTrue(latch.await(1L, TimeUnit.SECONDS)); + } + + @Test + public void testErrorBackPressure() throws Exception { + // 使用Error策略 + Flux numberGenerator = Flux.create(x -> { + System.out.println("Requested Events :" + x.requestedFromDownstream()); + int number = 1; + while (number < 100) { + x.next(number); + number++; + } + x.complete(); + }, FluxSink.OverflowStrategy.ERROR); + // 由于生成者发布的事件数量超出订阅者所需,在 ERROR 策略下,程序直接抛出异常 + numberGenerator.subscribe(new BaseSubscriber() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + request(1); + } + }); + } + + + @Test + public void testBackPressureOps() throws Exception { + Flux numberGenerator = Flux.create(x -> { + System.out.println("Requested Events :" + x.requestedFromDownstream()); + int number = 1; + while (number < 100) { + x.next(number); + number++; + } + x.complete(); + }); + CountDownLatch latch = new CountDownLatch(1); + // 订阅前变更背压策略为 DROP,超出订阅者所需的事件均被丢弃 + numberGenerator.onBackpressureDrop(x -> System.out.println("Dropped :" + x)) +// .onBackpressureLatest() +// .onBackpressureError() +// .onBackpressureBuffer(100) +// .onBackpressureBuffer(100, BufferOverflowStrategy.DROP_LATEST) // 当缓冲区满了之后,丢弃最新的事件 +// .onBackpressureBuffer(100, BufferOverflowStrategy.DROP_OLDEST) // 当缓冲区满了之后,丢弃最旧的事件 +// .onBackpressureBuffer(100, BufferOverflowStrategy.ERROR) // 当缓冲区满了之后,抛出异常 + .subscribe(new BaseSubscriber() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + request(1); + } + }); + assertTrue(latch.await(1L, TimeUnit.SECONDS)); + } + +} diff --git a/reactive/src/main/java/com/dover/reactive/flowcontrol/BufferDemo.java b/reactive/src/main/java/com/dover/reactive/flowcontrol/BufferDemo.java new file mode 100644 index 0000000..dadb4dc --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/flowcontrol/BufferDemo.java @@ -0,0 +1,44 @@ +package com.dover.reactivedemo.flowcontrol; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.util.function.Tuples; + +import java.time.Duration; + +public class BufferDemo { + + @Test + public void testBufferWithInfiniteSize() { + Flux fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> { + if (state.getT1() < 0) sink.complete(); + else sink.next(state.getT1()); + return Tuples.of(state.getT2(), state.getT1() + state.getT2()); + }); + fibonacciGenerator.take(100).buffer(10).subscribe(x -> System.out.println(x)); + } + + @Test + public void testBufferSizes() { + Flux fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> { + if (state.getT1() < 0) sink.complete(); + else sink.next(state.getT1()); + return Tuples.of(state.getT2(), state.getT1() + state.getT2()); + }); + // maxSize:每个缓冲区的最大容量,skipSize:构建新的缓冲区前必须跳过的事件数量 + // 当 maxSize > skipSize 时,缓冲区可能会重叠,意味着其中的事件可能会跨缓冲区重复 + // 当 maxSize < skipSize 时,缓冲区则不会相交, 意味着会丢失事件 + // 当 maxSize = skipSize 时,就相当于 buffer() + fibonacciGenerator.take(100).buffer(6, 7).subscribe(x -> System.out.println(x)); + } + + @Test + public void testBufferTimePeriod() { + Flux fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> { + if (state.getT1() < 0) sink.complete(); + else sink.next(state.getT1()); + return Tuples.of(state.getT2(), state.getT1() + state.getT2()); + }); + fibonacciGenerator.buffer(Duration.ofNanos(10)).subscribe(x -> System.out.println(x)); + } +} diff --git a/reactive/src/main/java/com/dover/reactive/flowcontrol/GroupByDemo.java b/reactive/src/main/java/com/dover/reactive/flowcontrol/GroupByDemo.java new file mode 100644 index 0000000..368f035 --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/flowcontrol/GroupByDemo.java @@ -0,0 +1,31 @@ +package com.dover.reactivedemo.flowcontrol; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.util.function.Tuples; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +public class GroupByDemo { + + @Test + public void testGrouping() { + Flux fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> { + if (state.getT1() < 0) sink.complete(); + else sink.next(state.getT1()); + return Tuples.of(state.getT2(), state.getT1() + state.getT2()); + }); + // 取斐波那契数列前20,按可被2、3、5、7整除来分组,并打印结果 + fibonacciGenerator.take(20).groupBy(i -> { + List divisors = Arrays.asList(2, 3, 5, 7); + Optional divisor = divisors.stream().filter(d -> i % d == 0).findFirst(); + return divisor.map(x -> "可被[" + x +"]整除").orElse("其他"); + }).concatMap(x -> { + System.out.println("\n" + x.key()); + return x; + }).subscribe(x -> System.out.print(" " + x)); + } + +} diff --git a/reactive/src/main/java/com/dover/reactive/flowcontrol/WindowDemo.java b/reactive/src/main/java/com/dover/reactive/flowcontrol/WindowDemo.java new file mode 100644 index 0000000..ef10ed2 --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/flowcontrol/WindowDemo.java @@ -0,0 +1,33 @@ +package com.dover.reactivedemo.flowcontrol; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.util.function.Tuples; + +public class WindowDemo { + + @Test + public void testWindowsFixedSize() { + // 与缓冲区类似,能够将`Flux`生成的事件分割,但聚合的结果为Processor,每个分割得到的Processor都能重新发布订阅事件 + Flux fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> { + if (state.getT1() < 0) sink.complete(); + else sink.next(state.getT1()); + return Tuples.of(state.getT2(), state.getT1() + state.getT2()); + }); + // 将 Flux 流每10个事件为一组分割为window,每个window都是一个 UnicastProcessor,所以需要用 concatMap 或 flatMap 将其组合起来,再订阅消费 + fibonacciGenerator.window(10).concatMap(x -> x).subscribe(x -> System.out.print(x + " ")); + } + + + @Test + public void testWindowsPredicate() { + Flux fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> { + if (state.getT1() < 0) sink.complete(); + else sink.next(state.getT1()); + return Tuples.of(state.getT2(), state.getT1() + state.getT2()); + }); + fibonacciGenerator.windowWhile(x -> x < 500).concatMap(x -> x).subscribe(x -> System.out.println(x)); + } + + +} diff --git a/reactive/src/main/java/com/dover/reactive/operator/FactorialDemo.java b/reactive/src/main/java/com/dover/reactive/operator/FactorialDemo.java new file mode 100644 index 0000000..3394d86 --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/operator/FactorialDemo.java @@ -0,0 +1,23 @@ +package com.dover.reactivedemo.operator; + +import reactor.core.publisher.Flux; +import reactor.util.function.Tuples; + +/** + * @author dover + * @since 2023/8/18 + */ +public class FactorialDemo { + + + Flux generateFactorial(long number) { + Flux factorialStream = Flux.generate(() -> Tuples.of(0L, 1.0d), (state, sink) -> { + Long factNumber = state.getT1(); + Double factValue = state.getT2(); + if (factNumber <= number) sink.next(factValue); + else sink.complete(); + return Tuples.of(factNumber + 1, (factNumber + 1) * factValue); + }); + return factorialStream; + } +} diff --git a/reactive/src/main/java/com/dover/reactive/operator/FibonacciDemo.java b/reactive/src/main/java/com/dover/reactive/operator/FibonacciDemo.java new file mode 100644 index 0000000..cfc18ff --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/operator/FibonacciDemo.java @@ -0,0 +1,167 @@ +package com.dover.reactivedemo.operator; + +import com.dover.reactivedemo.publisher.FluxDemo; +import com.dover.reactivedemo.util.Factorization; +import com.dover.reactivedemo.util.RomanNumber; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author dover + * @since 2023/8/17 + */ +public class FibonacciDemo { + + + @Test + public void testConcatWith() { + // concatWith:将多个流合并为一个流 + Flux fibonacciGenerator = FluxDemo.buildFibonacciGenerator(); + fibonacciGenerator.take(10).concatWith(Flux.just(new Long[]{-1L, -2L, -3L, -4L})).subscribe(t -> { + System.out.println(t); + }); + } + + + @Test + public void testReduce() { + // reduce:将流中所有值聚合为一个单值 + Flux fibonacciGenerator = FluxDemo.buildFibonacciGenerator(); + fibonacciGenerator.take(10).reduce((x, y) -> x + y).subscribe(t -> { + System.out.println(t); + }); + } + + + @Test + public void testCollect() { + // 收集斐波那契数列的前十个值 + System.out.println("====================收集斐波那契数列的前十个值======================"); + Flux fibonacciGenerator = FluxDemo.buildFibonacciGenerator(); + fibonacciGenerator.take(10).collectList().subscribe(t -> { + System.out.println(t); + }); + // 将斐波那契数列的前十个值收集为有序的list集合 + System.out.println("====================将斐波那契数列的前十个值收集为倒序的list集合======================"); + fibonacciGenerator.take(10).collectSortedList((x, y) -> -1 * Long.compare(x, y)).subscribe(t -> { + System.out.println(t); + }); + // 将斐波那契数列的前十个值收集为map集合 + System.out.println("====================将斐波那契数列的前十个值收集为奇偶map集合======================"); + fibonacciGenerator.take(10).collectMap(t -> t % 2 == 0 ? "even" : "odd").subscribe(t -> { + System.out.println(t); + }); + // 将斐波那契数列的前十个值收集为map集合 + System.out.println("====================将斐波那契数列的前十个值收集为奇偶map集合======================"); + fibonacciGenerator.take(10).collectMultimap(t -> t % 2 == 0 ? "even" : "odd").subscribe(t -> { + System.out.println(t); + }); + } + + @Test + public void testRepeat() { + // 将斐波那契数列流前十个值,重复一次 + Flux fibonacciGenerator = FluxDemo.buildFibonacciGenerator(); + fibonacciGenerator.take(10).repeat(2).subscribe(t -> { + System.out.println(t); + }); + } + + + @Test + public void testFlatMap() { + // 展开因子集合,将所有生成的斐波那契数列的每一项的因子集合均合并展开 + Flux fibonacciGenerator = FluxDemo.buildFibonacciGenerator(); + fibonacciGenerator.skip(1) + .take(10) + .flatMap(t -> Flux.fromIterable(Factorization.findfactor(t.intValue()))) + .subscribe(t -> { + System.out.println(t); + }); + } + + @Test + public void testMap() { + Flux fibonacciGenerator = FluxDemo.buildFibonacciGenerator(); + // 转换为罗马数字 + System.out.println("====================转换为罗马数字======================"); + fibonacciGenerator.skip(1).take(10).map(t -> RomanNumber.toRomanNumeral(t.intValue())).subscribe(t -> { + System.out.println(t); + }); + System.out.println("====================查找因子======================"); + // 查找因子 + fibonacciGenerator.skip(1).take(10).map(t -> Factorization.findfactor(t.intValue())).subscribe(t -> { + System.out.println(t); + }); + } + + + @Test + public void testSkip() { + Flux fibonacciGenerator = FluxDemo.buildFibonacciGenerator(); + System.out.println("====================跳过前10======================"); + // 跳过前10 + fibonacciGenerator.skip(10).subscribe(t -> { + System.out.println(t); + }); + System.out.println("====================跳过前10======================"); + // 跳过10毫秒 + System.out.println("====================跳过10毫秒======================"); + fibonacciGenerator.skip(Duration.ofMillis(10)).subscribe(t -> { + System.out.println(t); + }); + System.out.println("====================跳过10毫秒======================"); + // 跳过直至值大于100 + System.out.println("====================跳过直至值大于100======================"); + fibonacciGenerator.skipUntil(t -> t > 100).subscribe(t -> { + System.out.println(t); + }); + System.out.println("====================跳过直至值大于100======================"); + } + + + @Test + public void testFilter() { + Flux fibonacciGenerator = FluxDemo.buildFibonacciGenerator(); + // 同步方式,得到偶数 + fibonacciGenerator.filter(x -> (x & 1) == 0).subscribe(System.out::println); + System.out.println("==============================================="); + // 异步方式,判断mono是否为true + fibonacciGenerator.filterWhen(x -> Mono.just(x > 100)).subscribe(System.out::println); + } + + + @Test + public void testFibonacciFluxSink() { + // fluxSink 可异步生成任意数量的事件,不关注 backpressure ,也不关注订阅关系,即使订阅关系废弃,也能继续生产事件 + // fluxSink 的实现必须监听取消事件,以及显式初始化 stream 闭包 + Flux fibonacciGenerator = Flux.create(e -> { + long current = 1, prev = 0; + AtomicBoolean stop = new AtomicBoolean(false); + e.onDispose(() -> { + stop.set(true); + System.out.println("******* Stop Received ****** "); + }); + while (current > 0) { + e.next(current); + System.out.println("generated " + current); + current = current + prev; + prev = current - prev; + } + e.complete(); + }); + List fibonacciSeries = new LinkedList<>(); + fibonacciGenerator.take(50).subscribe(t -> { + System.out.println("consuming " + t); + fibonacciSeries.add(t); + }); + System.out.println(fibonacciSeries); + } + +} diff --git a/reactive/src/main/java/com/dover/reactive/processor/DirectProcessorDemo.java b/reactive/src/main/java/com/dover/reactive/processor/DirectProcessorDemo.java new file mode 100644 index 0000000..f75f76e --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/processor/DirectProcessorDemo.java @@ -0,0 +1,44 @@ +package com.dover.reactivedemo.processor; + +import org.junit.Test; +import reactor.core.publisher.DirectProcessor; + +/** + * @author dover + * @since 2023/8/22 + */ +public class DirectProcessorDemo { + + + @Test + public void testBackpressure() { + // DirectProcessor 无法处理背压,一旦超出订阅者所需事件数,则抛出异常 + DirectProcessor data = DirectProcessor.create(); + data.subscribe(t -> System.out.println(t), e -> e.printStackTrace(), () -> System.out.println("Finished"), + s -> s.request(1)); + data.onNext(10L); + data.onNext(11L); + data.onNext(12L); + } + + + @Test + public void testDirectProcessor() { + // 一旦DirectProcessor收到完成事件,则不再接收任何data事件 + DirectProcessor data = DirectProcessor.create(); + data.subscribe(t -> System.out.println(t), e -> e.printStackTrace(), () -> System.out.println("Finished 1")); + data.onNext(10L); + data.onComplete(); + data.subscribe(t -> System.out.println(t), e -> e.printStackTrace(), () -> System.out.println("Finished 2")); + data.onNext(12L); + } + + @Test + public void testProcessor() { + DirectProcessor data = DirectProcessor.create(); + data.take(2).subscribe(t -> System.out.println(t)); + data.onNext(10L); + data.onNext(11L); + data.onNext(12L); + } +} diff --git a/reactive/src/main/java/com/dover/reactive/processor/EmitterProcessorDemo.java b/reactive/src/main/java/com/dover/reactive/processor/EmitterProcessorDemo.java new file mode 100644 index 0000000..6822ec6 --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/processor/EmitterProcessorDemo.java @@ -0,0 +1,42 @@ +package com.dover.reactivedemo.processor; + +import org.junit.Test; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.UnicastProcessor; + +/** + * @author dover + * @since 2023/8/22 + */ +public class EmitterProcessorDemo { + + + @Test + public void testBackpressure() { + // EmitterProcessor 支持背压,支持多个订阅者,每个订阅者依据自己的消费速度来获取Value事件 + UnicastProcessor data = UnicastProcessor.create(); + data.subscribe(t -> { + System.out.println(t); + }); + // sink 中的方法是线程安全的 + data.sink().next(10L); + } + + @Test + public void testConsume() { + EmitterProcessor data = EmitterProcessor.create(1); + data.subscribe(t -> System.out.println(t)); + // 通过 sink 来发布事件 + FluxSink sink = data.sink(); + sink.next(10L); + sink.next(11L); + sink.next(12L); + data.subscribe(t -> System.out.println("==2:" + t)); + sink.next(13L); + sink.next(14L); + sink.next(15L); + } + + +} diff --git a/reactive/src/main/java/com/dover/reactive/processor/ReplayProcessorDemo.java b/reactive/src/main/java/com/dover/reactive/processor/ReplayProcessorDemo.java new file mode 100644 index 0000000..44e7391 --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/processor/ReplayProcessorDemo.java @@ -0,0 +1,30 @@ +package com.dover.reactivedemo.processor; + +import org.junit.Test; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.ReplayProcessor; + +/** + * @author dover + * @since 2023/8/22 + */ +public class ReplayProcessorDemo { + + + @Test + public void testReplay() { + // ReplayProcessor 具备缓存和重放事件的能力,当一个新的订阅者订阅后能重放指定数量的事件给该订阅者 + ReplayProcessor data = ReplayProcessor.create(4); + data.subscribe(t -> System.out.println(t)); + FluxSink sink = data.sink(); + sink.next(10L); + sink.next(11L); + sink.next(12L); + sink.next(13L); + sink.next(14L); + data.subscribe(t -> System.out.println("==2:" + t)); + } + + +} diff --git a/reactive/src/main/java/com/dover/reactive/processor/TopicProcessorDemo.java b/reactive/src/main/java/com/dover/reactive/processor/TopicProcessorDemo.java new file mode 100644 index 0000000..277858d --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/processor/TopicProcessorDemo.java @@ -0,0 +1,55 @@ +package com.dover.reactivedemo.processor; + +import org.junit.Test; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.TopicProcessor; +import reactor.util.concurrent.WaitStrategy; + +import java.util.concurrent.Executors; + +/** + * @author dover + * @since 2023/8/22 + */ +public class TopicProcessorDemo { + + + @Test + public void testTopicProcessor() { + // TopicProcessor 支持多个订阅者,会交付所有事件给每一个订阅者(类似于广播模式),通过事件循环机制实现,异步并发方式交付事件,RingBuffer 结构来支持背压 + // RingBuffer 的等待策略分为:blocking(阻塞,适用于对吞吐量和低延迟要求不高,不如CPU资源重要的场景)、busySpin(自旋,消耗CPU资源来避免系统调用,非常适合线程可绑定CPU核心的场景)、 + TopicProcessor data = TopicProcessor.builder() + .executor(Executors.newFixedThreadPool(2)) + .bufferSize(8) + .waitStrategy(WaitStrategy.busySpin()) + .build(); + data.subscribe(t -> { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("订阅者1:========" + t); + }); + data.subscribe(t -> { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("订阅者2:========" + t); + }); + FluxSink sink = data.sink(); + for (int i = 0; i < 20; i++) { + sink.next((long) i); + } + // 主线程等待 + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + +} diff --git a/reactive/src/main/java/com/dover/reactive/processor/UnicastProcessorDemo.java b/reactive/src/main/java/com/dover/reactive/processor/UnicastProcessorDemo.java new file mode 100644 index 0000000..dde2aad --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/processor/UnicastProcessorDemo.java @@ -0,0 +1,25 @@ +package com.dover.reactivedemo.processor; + +import org.junit.Test; +import reactor.core.publisher.UnicastProcessor; + +/** + * @author dover + * @since 2023/8/22 + */ +public class UnicastProcessorDemo { + + + @Test + public void testUnicastProcessor() { + // UnicastProcessor 支持背压,内部会通过一个队列来缓存未被消费的事件,但仅支持一个订阅者 + UnicastProcessor data = UnicastProcessor.create(); + data.subscribe(t -> { + System.out.println(t); + }); + // sink 中的方法是线程安全的 + data.sink().next(10L); + } + + +} diff --git a/reactive/src/main/java/com/dover/reactive/processor/WorkQueueProcessorDemo.java b/reactive/src/main/java/com/dover/reactive/processor/WorkQueueProcessorDemo.java new file mode 100644 index 0000000..7fa5d5c --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/processor/WorkQueueProcessorDemo.java @@ -0,0 +1,34 @@ +package com.dover.reactivedemo.processor; + +import org.junit.Test; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.WorkQueueProcessor; +import reactor.util.concurrent.WaitStrategy; + +import java.util.concurrent.Executors; + +/** + * @author dover + * @since 2023/8/22 + */ +public class WorkQueueProcessorDemo { + + + @Test + public void testWorkQueueProcessor() { + // WorkQueueProcessor 支持多个订阅者,但事件会被所有订阅者瓜分消费(类似mq集群模式),通过轮询来交付事件给对应订阅者, + WorkQueueProcessor data = WorkQueueProcessor.builder() + .executor(Executors.newFixedThreadPool(2)) + .waitStrategy(WaitStrategy.blocking()) + .bufferSize(100) + .build(); + data.subscribe(t -> System.out.println("1. " + t)); + data.subscribe(t -> System.out.println("2. " + t)); + FluxSink sink = data.sink(); + sink.next(10L); + sink.next(11L); + sink.next(12L); + } + + +} diff --git a/reactive/src/main/java/com/dover/reactive/publisher/ColdAndHotPublishDemo.java b/reactive/src/main/java/com/dover/reactive/publisher/ColdAndHotPublishDemo.java new file mode 100644 index 0000000..abfe508 --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/publisher/ColdAndHotPublishDemo.java @@ -0,0 +1,58 @@ +package com.dover.reactivedemo.publisher; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.UnicastProcessor; +import reactor.util.function.Tuples; + +import java.util.concurrent.CountDownLatch; + +/** + * @author dover + * @since 2023/8/23 + */ +public class ColdAndHotPublishDemo { + + + @Test + public void testCodePub() { + // 冷发布,只有存在订阅关系,生产者才会生产数据 + Flux fibonacciGenerator = Flux.generate(() -> Tuples.of(0L, 1L), (state, sink) -> { + sink.next(state.getT1()); + return Tuples.of(state.getT2(), state.getT1() + state.getT2()); + }); + fibonacciGenerator.take(5).subscribe(t -> System.out.println("1. " + t)); + fibonacciGenerator.take(5).subscribe(t -> System.out.println("2. " + t)); + } + + + @Test + public void testHotPub() { + // 热发布,不论是否存在订阅关系,生产者都会产生数据,比如 processor 作为 publisher,新的订阅者也能收到已经发布过的数据 + UnicastProcessor hotSource = UnicastProcessor.create(); + Flux hotFlux = hotSource.publish().autoConnect(); + hotFlux.take(5).subscribe(t -> System.out.println("1. " + t)); + CountDownLatch latch = new CountDownLatch(2); + new Thread(() -> { + int c1 = 0, c2 = 1; + while (c1 < 1000) { + hotSource.onNext(Long.valueOf(c1)); + int sum = c1 + c2; + c1 = c2; + c2 = sum; + if (c1 == 144) { + hotFlux.subscribe(t -> System.out.println("2. " + t)); + } + } + hotSource.onComplete(); + latch.countDown(); + }).start(); + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + +} diff --git a/reactive/src/main/java/com/dover/reactive/publisher/FluxDemo.java b/reactive/src/main/java/com/dover/reactive/publisher/FluxDemo.java new file mode 100644 index 0000000..b042153 --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/publisher/FluxDemo.java @@ -0,0 +1,37 @@ +package com.dover.reactivedemo.publisher; + +import reactor.core.publisher.Flux; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author dover + * @since 2023/8/22 + */ +public class FluxDemo { + + + public static void main(String[] args) { + Flux fibonacciGenerator = buildFibonacciGenerator(); + fibonacciGenerator.subscribe(System.out::println); + } + + + public static Flux buildFibonacciGenerator() { + return Flux.create(e -> { + long current = 1, prev = 0; + AtomicBoolean stop = new AtomicBoolean(false); + e.onDispose(() -> { + stop.set(true); + System.out.println("******* Stop Received ****** "); + }); + while (current > 0) { + e.next(current); + System.out.println("generated " + current); + current = current + prev; + prev = current - prev; + } + e.complete(); + }); + } +} diff --git a/reactive/src/main/java/com/dover/reactive/publisher/MonoDemo.java b/reactive/src/main/java/com/dover/reactive/publisher/MonoDemo.java new file mode 100644 index 0000000..37ca8ae --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/publisher/MonoDemo.java @@ -0,0 +1,33 @@ +package com.dover.reactivedemo.publisher; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Arrays; + +/** + * @author dover + * @since 2023/8/17 + */ +public class MonoDemo { + + + @Test + public void monoFromXxx() { + // 生成一次值事件以及完成事件,即时值为多值,也会包装在一个对象中 + Mono.fromCallable(() -> new String[]{"color"}) + .subscribe(t -> System.out.println("received " + Arrays.toString(t))); + // 生成一次单值事件及完成事件 + Mono.fromSupplier(() -> 1); + // 生成一次无值事件及完成事件 + Mono.fromRunnable(() -> System.out.println(" ")) + .subscribe(t -> System.out.println("received " + t), null, () -> System.out.println("Finished")); + } + + @Test + public void monoFrom() { + // 将Flux 流转为Mono 流,仅会从中获取第一个值 + Mono.from(Flux.just("Red", "Blue", "Yellow", "Black")).subscribe(t -> System.out.println("received " + t)); + } +} diff --git a/reactive/src/main/java/com/dover/reactive/subscriber/SubscriberDemo.java b/reactive/src/main/java/com/dover/reactive/subscriber/SubscriberDemo.java new file mode 100644 index 0000000..fdb2500 --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/subscriber/SubscriberDemo.java @@ -0,0 +1,70 @@ +package com.dover.reactivedemo.subscriber; + +import com.dover.reactivedemo.publisher.FluxDemo; +import org.junit.Test; +import org.reactivestreams.Subscription; +import reactor.core.Disposable; +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; + +/** + * @author dover + * @since 2023/8/18 + */ +public class SubscriberDemo { + + + @Test + public void testSubscribe() { + Flux fibonacciGenerator = FluxDemo.buildFibonacciGenerator(); + // 不消费 + fibonacciGenerator.take(10).subscribe(); + // 仅消费值事件 + fibonacciGenerator.take(10).subscribe(t -> System.out.println("consuming " + t)); + // 消费值事件,并打印异常事件 + fibonacciGenerator.take(10).subscribe(t -> System.out.println("consuming " + t), e -> e.printStackTrace()); + // 消费值事件,异常事件,完成事件 + fibonacciGenerator.take(10).subscribe(t -> System.out.println("consuming " + t) // 值事件 + , e -> e.printStackTrace() // 异常事件 + , () -> System.out.println("Finished") // 完成事件 + ); + // 消费值事件,异常事件,完成事件,订阅事件,返回的 disposable 可用于取消订阅 + Disposable disposable = fibonacciGenerator.take(10).subscribe(t -> System.out.println("consuming " + t) // Value事件 + , e -> e.printStackTrace() // 异常事件 + , () -> System.out.println("Finished") // 完成事件 + , s -> System.out.println("Subscribed :" + s) // 订阅事件 + ); + // 触发取消事件 + disposable.dispose(); + + } + + @Test + public void testBaseSubscribe() { + Flux fibonacciGenerator = FluxDemo.buildFibonacciGenerator(); + // 五种钩子函数 + BaseSubscriber fibonacciSubscriber = new BaseSubscriber() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + } + @Override + protected void hookOnNext(Long value) { + } + @Override + protected void hookOnComplete() { + } + @Override + protected void hookOnError(Throwable throwable) { + } + @Override + protected void hookOnCancel() { + } + }; + // 结交订阅关系 + fibonacciGenerator.subscribe(fibonacciSubscriber); + // 订阅者主动拉取10个Value事件,触发 Request 事件 + fibonacciSubscriber.request(10); + // 可忽略backpressure + fibonacciSubscriber.requestUnbounded(); + } +} diff --git a/reactive/src/main/java/com/dover/reactive/util/Factorization.java b/reactive/src/main/java/com/dover/reactive/util/Factorization.java new file mode 100644 index 0000000..49686b7 --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/util/Factorization.java @@ -0,0 +1,25 @@ +package com.dover.reactivedemo.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * @author dover + * @since 2023/8/21 + */ +public class Factorization { + + /** + * 查找因子 + */ + public static Collection findfactor(int number) { + List factors = new ArrayList<>(); + for (int i = 1; i <= number; i++) { + if (number % i == 0) { + factors.add(i); + } + } + return factors; + } +} diff --git a/reactive/src/main/java/com/dover/reactive/util/RomanNumber.java b/reactive/src/main/java/com/dover/reactive/util/RomanNumber.java new file mode 100644 index 0000000..49686b7 --- /dev/null +++ b/reactive/src/main/java/com/dover/reactive/util/RomanNumber.java @@ -0,0 +1,25 @@ +package com.dover.reactivedemo.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * @author dover + * @since 2023/8/21 + */ +public class Factorization { + + /** + * 查找因子 + */ + public static Collection findfactor(int number) { + List factors = new ArrayList<>(); + for (int i = 1; i <= number; i++) { + if (number % i == 0) { + factors.add(i); + } + } + return factors; + } +} diff --git a/text-handle-demo/utils/AesUtils.java b/text-handle-demo/utils/AesUtils.java new file mode 100644 index 0000000..980ea9b --- /dev/null +++ b/text-handle-demo/utils/AesUtils.java @@ -0,0 +1,210 @@ +package com.dover.util; + +import lombok.SneakyThrows; + +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import javax.crypto.SecretKeyFactory; +import javax.crypto.spec.IvParameterSpec; +import javax.crypto.spec.PBEKeySpec; +import javax.crypto.spec.SecretKeySpec; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; +import java.security.spec.KeySpec; +import java.util.UUID; +import java.util.function.Function; + +/** + * AES对称加密,对明文进行加密、解密处理,推荐使用AES_CBC_256, 数据库使用的是 AES_ECB_128 + * + * @author liudezhi + */ +public class AesUtils { + private static final int IV_SIZE_16 = 16; + private static final int AES_KEY_LENGTH_128 = 128; + private static final int AES_KEY_LENGTH_256 = 256; + private static final String AES_ALGORITHM = "AES"; + private static final String AES_ECB_PKCS5PADDING_ALGORITHM = "AES/ECB/PKCS5Padding"; + private static final String SECURE_RANDOM_ALGORITHM = "PBKDF2WithHmacSHA256"; + private static final String AES_CBC_PKCS5PADDING_ALGORITHM = "AES/CBC/PKCS5Padding"; + private static final String AES_KEY = "AES.key"; + private static final String DEFAULT_KEY = "RU1jNUdQU0RVTnVqekhOU3pWY3ZTTFpYZHViWjNrSEs="; + + + @SneakyThrows + public static void main(String[] args) { +// System.out.println(createBase64EncodedAES256Key()); + System.out.println(AesUtils.encrypt("15804926464")); + System.out.println(AesUtils.encrypt("15642266788")); +// System.out.println(AesUtils.decrypt("ho0fNpD2s5VKuZUPzn2gSw==")); + } + + /** + * 使用Aes128 加密 PaddingMode = PKCS5, CipherMode = ECB + * + * @param text 明文 + * @return Base64编码后的加密密文 + */ + public static String encrypt(String text) { + try { + String key = DoverProperty.get(AES_KEY, DEFAULT_KEY); + Cipher cipher = Cipher.getInstance(AES_ECB_PKCS5PADDING_ALGORITHM); + cipher.init(Cipher.ENCRYPT_MODE, generateMySQLAESKey(key, Base64::decodeBase64)); + byte[] encryptData = cipher.doFinal(text.getBytes(StandardCharsets.UTF_8)); + return Base64.encodeBase64String(encryptData); + } catch (Exception e) { + throw new DoverServiceException(ReserveResultCode.ENCRYPTED_ERROR); + } + } + + /** + * 使用Aes128 解密 PaddingMode = PKCS5, CipherMode = ECB + * + * @param encryptedText Base64编码后的加密密文 + * @return 明文 + */ + public static String decrypt(String encryptedText) { + try { + String key = DoverProperty.get(AES_KEY, DEFAULT_KEY); + Cipher cipher = Cipher.getInstance(AES_ECB_PKCS5PADDING_ALGORITHM); + cipher.init(Cipher.DECRYPT_MODE, generateMySQLAESKey(key, Base64::decodeBase64)); + byte[] encryptData = cipher.doFinal(Base64.decodeBase64(encryptedText)); + return new String(encryptData, StandardCharsets.UTF_8); + } catch (Exception e) { + throw new DoverServiceException(ReserveResultCode.DECRYPTED_ERROR); + } + } + + /** + * mySQL特殊的Key生成方式,必须是128位即16个字节 + * 参考 https://dev.mysql.com/doc/refman/8.0/en/encryption-functions.html#function_aes-decrypt, + * https://stackoverflow.com/questions/19518447/aes-encryption-method-equivalent-to-mysql-aes-encrypt-function + * + * @param key + * @param encoding + * @return + */ + private static SecretKeySpec generateMySQLAESKey(final String key, Function encoding) { + final byte[] finalKey = new byte[32]; + int i = 0; + for (byte b : encoding.apply(key)) + finalKey[i++ % 32] ^= b; + return new SecretKeySpec(finalKey, AES_ALGORITHM); + } + + /** + * 使用Aes256 加密 PaddingMode = PKCS5, CipherMode = CBC + * + * @param text 明文 + * @param key 256位 byte 使用Base64编码后的字符串 + * @param iv Base64编码后的偏移量 + * @return Base64编码后的加密密文 + */ + public static String AesCbc256Encrypt(String text, String key, String iv) throws Exception { + Cipher cipher = Cipher.getInstance(AES_CBC_PKCS5PADDING_ALGORITHM); + byte[] keyBytes = Base64.decodeBase64(key); + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(keyBytes, AES_ALGORITHM), + new IvParameterSpec(Base64.decodeBase64(iv))); + byte[] encryptData = cipher.doFinal(text.getBytes(StandardCharsets.UTF_8)); + return Base64.encodeBase64String(encryptData); + } + + /** + * 使用Aes256 解密 PaddingMode = PKCS5, CipherMode = CBC + * + * @param encryptedText Base64编码后的加密密文 + * @param key 256位 byte 使用Base64编码后的字符串 + * @param iv Base64编码后的偏移量 + * @return 明文 + */ + public static String AesCbc256Decrypt(String encryptedText, String key, String iv) throws Exception { + Cipher cipher = Cipher.getInstance(AES_CBC_PKCS5PADDING_ALGORITHM); + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(Base64.decodeBase64(key), AES_ALGORITHM), + new IvParameterSpec(Base64.decodeBase64(iv))); + byte[] encryptData = cipher.doFinal(Base64.decodeBase64(encryptedText)); + return new String(encryptData, StandardCharsets.UTF_8); + } + + /** + * 生成IV Byte[16] + * + * @return + */ + public static IvParameterSpec createIV() throws Exception { + final byte[] iv = new byte[IV_SIZE_16]; + SecureRandom random = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM); + random.setSeed(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)); + random.nextBytes(iv); + return new IvParameterSpec(iv); + } + + /** + * 生成IV Byte[16] + * + * @return Base64编码的字符串 + */ + public static String createBase64EncodedIV() throws Exception { + final byte[] iv = new byte[IV_SIZE_16]; + SecureRandom random = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM); + random.setSeed(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)); + random.nextBytes(iv); + return Base64.encodeBase64String(iv); + } + + /** + * 生成Aes 128 KEY + * + * @return Base64编码的字符串 + * @throws Exception + */ + public static String createBase64EncodedAES128Key() throws Exception { + byte[] keys = createAESKey(AES_KEY_LENGTH_128).getEncoded(); + return Base64.encodeBase64String(keys); + } + + /** + * 生成Aes 256 KEY + * + * @return Base64编码的字符串 + * @throws Exception + */ + public static String createBase64EncodedAES256Key() throws Exception { + byte[] keys = createAESKey(AES_KEY_LENGTH_256).getEncoded(); + return Base64.encodeBase64String(keys); + } + + /** + * 生成Aes 128 KEY + * + * @return + * @throws Exception + */ + public static SecretKey createAES128Key() throws Exception { + return createAESKey(AES_KEY_LENGTH_128); + } + + /** + * 生成Aes 256 KEY + * + * @return + * @throws Exception + */ + public static SecretKey createAES256Key() throws Exception { + return createAESKey(AES_KEY_LENGTH_256); + } + + /** + * 生成Aes KEY + * + * @param keySize KEY的大小: 128 or 256 + * @return + * @throws Exception + */ + private static SecretKey createAESKey(int keySize) throws Exception { + String uuid = UUID.randomUUID().toString(); + SecretKeyFactory factory = SecretKeyFactory.getInstance(SECURE_RANDOM_ALGORITHM); + KeySpec spec = new PBEKeySpec(uuid.toCharArray(), uuid.getBytes(StandardCharsets.UTF_8), 65536, keySize); + SecretKey tmp = factory.generateSecret(spec); + return new SecretKeySpec(tmp.getEncoded(), AES_ALGORITHM); + } +} diff --git a/text-handle-demo/utils/IdUtils.java b/text-handle-demo/utils/IdUtils.java new file mode 100644 index 0000000..28106fe --- /dev/null +++ b/text-handle-demo/utils/IdUtils.java @@ -0,0 +1,141 @@ +package com.dover.util; + +import cn.hutool.core.util.IdUtil; +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.ZoneId; +import java.util.Random; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * 53 bits unique id: + *

+ * |--------|--------|--------|--------|--------|--------|--------|--------| + * |00000000|00011111|11111111|11111111|11111111|11111111|11111111|11111111| + * |--------|---xxxxx|xxxxxxxx|xxxxxxxx|xxxxxxxx|xxx-----|--------|--------| + * |--------|--------|--------|--------|--------|---xxxxx|xxxxxxxx|xxx-----| + * |--------|--------|--------|--------|--------|--------|--------|---xxxxx| + *

+ * Maximum ID = 11111_11111111_11111111_11111111_11111111_11111111_11111111 + *

+ * Maximum TS = 11111_11111111_11111111_11111111_111 + *

+ * Maximum NT = ----- -------- -------- -------- ---11111_11111111_111 = 65535 + *

+ * Maximum SH = ----- -------- -------- -------- -------- -------- ---11111 = 31 + *

+ * It can generate 64k unique id per IP and up to 2106-02-07T06:28:15Z. + */ +public class IdUtils { + + private static final Logger logger = LoggerFactory.getLogger(IdUtil.class); + + private static final Pattern PATTERN_LONG_ID = Pattern.compile("^([0-9]{15})([0-9a-f]{32})([0-9a-f]{3})$"); + +// private static final Pattern PATTERN_HOSTNAME = Pattern.compile("^.*\\D+([0-9]+)$"); + + private static final long OFFSET = LocalDate.of(2000, 1, 1).atStartOfDay(ZoneId.of("Z")).toEpochSecond(); + + private static final long MAX_NEXT = 0b11111_11111111_111L; + /** + * 服务启动向redis上报自身serverId,若冲突,则自增,直至不冲突 + */ + private static final String SERVER_ID = "reserve-union:server-id:"; + // 改为懒初始化 + private static long SHARD_ID = 32; + + private static long offset = 0; + + private static long lastEpoch = 0; + + public static String nextIdStr() { + return String.valueOf(nextId(System.currentTimeMillis() / 1000)); + } + + public static Long nextId() { + return nextId(System.currentTimeMillis() / 1000); + } + + public static String uuid() { + return UUID.randomUUID().toString().replace("-", ""); + } + + private static synchronized long nextId(long epochSecond) { + if (epochSecond < lastEpoch) { + // warning: clock was turn back: + logger.warn("clock has been back: " + epochSecond + " from previous:" + lastEpoch); + epochSecond = lastEpoch; + } + if (lastEpoch != epochSecond) { + lastEpoch = epochSecond; + reset(); + } + offset++; + long next = offset & MAX_NEXT; + if (next == 0) { + logger.warn("maximum id reached in 1 second in epoch: " + epochSecond); + return nextId(epochSecond + 1); + } + return generateId(epochSecond, next, getServerIdAsLong()); + } + + private static void reset() { + offset = 0; + } + + private static long generateId(long epochSecond, long next, long shardId) { + return ((epochSecond - OFFSET) << 21) | (next << 5) | shardId; + } + + private static long getServerIdAsLong() { + // shardId 只可能是 [0 , 31] 之间的值 + if (SHARD_ID != 32) return SHARD_ID; + try { + // 生成 serverId 后,尝试上报 serverId 至 redis,若上报成功,则使用该 serverId,若失败,则自增重新上报 + String hostname = InetAddress.getLocalHost().getHostName(); + int serverId = Math.abs(hostname.hashCode()) % 31; + for (int i = 0; i < 32; i++) { + boolean res = DoverRedisCommon.setNx(SERVER_ID + serverId, 1); + if (res) { + DoverLog.info("hostname={},serverId={},上报成功", hostname, serverId); + // 上报成功后,将serverId设置为一天失效(一般union+union2c两个项目会一起发布,共4台主机,一天之内全部重新部署8次也不会冲突) + DoverRedisCommon.expire(SERVER_ID + serverId, 60 * 60 * 24); + SHARD_ID = serverId; + return SHARD_ID; + } + DoverLog.info("hostname={},serverId={},上报失败", hostname, serverId); + serverId = serverId == 31 ? 0 : serverId + 1; + } + } catch (UnknownHostException e) { + logger.warn("无法获取主机名,将 serverId 设置为随机值 random(0, 31)"); + } + Random random = new Random(); + int serverId = random.nextInt(32); + DoverLog.info("根据hostname生成serverId上报已达上限,随机生成serverId={}", serverId); + SHARD_ID = serverId; + return serverId; + } + + public static long stringIdToLongId(String stringId) { + // a stringId id is composed as timestamp (15) + uuid (32) + serverId (000~fff). + Matcher matcher = PATTERN_LONG_ID.matcher(stringId); + if (matcher.matches()) { + long epoch = Long.parseLong(matcher.group(1)) / 1000; + String uuid = matcher.group(2); + byte[] sha1 = DigestUtils.sha1(uuid.getBytes(StandardCharsets.UTF_8)); + long next = ((sha1[0] << 24) | (sha1[1] << 16) | (sha1[2] << 8) | sha1[3]) & MAX_NEXT; + long serverId = Long.parseLong(matcher.group(3), 16); + return generateId(epoch, next, serverId); + } + throw new DoverServiceException("invalid id"); + } +} diff --git a/text-handle-demo/utils/JwtUtil.java b/text-handle-demo/utils/JwtUtil.java new file mode 100644 index 0000000..445f127 --- /dev/null +++ b/text-handle-demo/utils/JwtUtil.java @@ -0,0 +1,88 @@ +package com.dover.util; + +import io.jsonwebtoken.Claims; +import io.jsonwebtoken.Jws; +import io.jsonwebtoken.JwtException; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import io.jsonwebtoken.io.Decoders; +import io.jsonwebtoken.security.Keys; +import org.apache.commons.lang3.StringUtils; + +import javax.crypto.SecretKey; +import java.time.LocalDate; +import java.time.ZoneId; +import java.util.Date; + +/** + * @author dover + * @since 2022/5/24 + */ +public final class JwtUtil { + /** + * jwt 签名秘钥 + */ + private static final String SECRET_NAME = "constant.jwt.secret"; + /** + * token有效时间(秒) + */ + private static final String EXPIRE = "constant.jwt.expire-day"; + /** + * 默认secret + */ + public static final String DEFAULT_SECRET = "111111"; + + public static void main(String[] args) { +// SecretKey key = Keys.secretKeyFor(SignatureAlgorithm.HS256); +// String secretString = Encoders.BASE64.encode(key.getEncoded()); +// System.out.println(secretString); + String subject = "13851874332"; + DoverProperty.getPropertyMap().put(EXPIRE, 30); + System.out.println(generate(subject)); +// Claims claims = verify( +// "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0IiwiaWF0IjoxNjUzNDYzNjk0LCJleHAiOjE2NTM1NTAwOTR9.wQhGAljKbu8miv2D4oMIp7ivAnLsofccO6Ve97T7ipM"); +// System.out.println(claims.getSubject().equals(subject)); + } + + /** + * 生成jwt + * + * @param salerPhoneNo 导购的手机号 + * @return jwt + */ + public static String generate(String salerPhoneNo) { + Integer validDays = DoverProperty.getInteger(EXPIRE, ReserveConst.ONE); + // 过期日期(次日凌晨过期) + Date expiredDate = new Date(LocalDate.now() + .plusDays(validDays) + .atStartOfDay() + .atZone(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli()); + SecretKey secretKey = Keys.hmacShaKeyFor(Decoders.BASE64.decode(DoverProperty.get(SECRET_NAME, DEFAULT_SECRET))); + return Jwts.builder() + .setSubject(salerPhoneNo) + .setIssuedAt(new Date()) + .setExpiration(expiredDate) + .signWith(secretKey, SignatureAlgorithm.HS256) + .compact(); + } + + /** + * 验证jwt有效性 + */ + public static Claims verify(String jwt) { + if (StringUtils.isBlank(jwt)) { + throw new AuthException(SalerConst.INVALID_SALER_TOKEN); + } + try { + String secret = DoverProperty.get(SECRET_NAME, DEFAULT_SECRET); + SecretKey secretKey = Keys.hmacShaKeyFor(Decoders.BASE64.decode(secret)); + Jws jws = Jwts.parserBuilder().setSigningKey(secretKey).build().parseClaimsJws(jwt); + return jws.getBody(); + } catch (JwtException ex) { + throw new AuthException(SalerConst.INVALID_SALER_TOKEN, ex); + } + } + +} diff --git a/text-handle-demo/utils/RsaUtil.java b/text-handle-demo/utils/RsaUtil.java new file mode 100644 index 0000000..35043c8 --- /dev/null +++ b/text-handle-demo/utils/RsaUtil.java @@ -0,0 +1,107 @@ +package com.dover.util; + + +import org.springframework.stereotype.Component; + +import javax.crypto.Cipher; +import java.nio.charset.StandardCharsets; +import java.security.*; +import java.security.spec.KeySpec; +import java.security.spec.PKCS8EncodedKeySpec; +import java.security.spec.X509EncodedKeySpec; + +/** + * @author dover + * @since 2022/6/28 + **/ +@Component +public class RsaUtil { + private static final int CRYPTO_BITS = 2048; + private static final String CRYPTO_METHOD = "RSA"; + private static final String CYPHER = "RSA/ECB/PKCS1Padding"; + private static final String RSA_PUBLIC_KEY = "RSA.public-key"; + private static final String RSA_PRIVATE_KEY = "RSA.private-key"; + + private static final String RSA_PC_PRIVATE_KEY = "rsa.privateKey"; + private static String DEFAULT_PUBLIC_KEY = "1"; + private static String DEFAULT_PRIVATE_KEY = "2"; + //private static String DEFAULT_PRIVATE_KEY = "3"; + + + public static void main(String[] args) throws Exception { +// getKeyPair(); +// System.out.println("pub_key: " + DEFAULT_PUBLIC_KEY); +// System.out.println("=============================================================================="); +// System.out.println("private_key: " + DEFAULT_PRIVATE_KEY); +// System.out.println("=============================================================================="); +// System.out.printf("%s=%s%n", "17626042021", RsaUtil.encrypt("17626042021")); +// System.out.printf("%s=%s%n", "13851874332", RsaUtil.encrypt("13851874332")); +// System.out.println("=============================================================================="); + + + String s = RsaUtil.decryptPC("aHj9Kc82V0ebhUCFnUuUeO1SSUAF3DUqGPih/1rZ2jD5jfnFwvOmKkaArkKVgVcxXCs/La4wDLoy4yyETL4EwLLghkry6A5LeyH+TtgZSfzTMEPtquOGI6jiEa8WK9LnvSxRkCxNqj8WJUKEv6vl5gMRf4GJpGlZ04V+CvlR6EnhMn7k2ohx96re4brLziv2OAnT+BP8BiZYpeg6E9OBbwn3rJ3aF9NbBgnoNnxvCfbjLIGZLch3BWO31jNBETD4POobNZoKDTaCzPX/tSN6rYxXd5gXnfAQwKD7sD5O1L+MdgA1XJnmJSG3bxPbhGUHk3Etf2mLcC+efLYbtgy6gg=="); + System.out.println(s); +// System.out.println(RsaUtil.decrypt(encryptedMessage)); +// System.out.println(RsaUtil.decrypt( +// "tmZJDzGqgjR6sIJgVT0RMfLu1BkoQP0YiwO4aPlyP96LlSyxyqUCk36bFmtIRJtgeeHXxG0X1pWRQsL/mzkQtJLj+inRhs5lSKhYtUHnd8alkMTbS3VoAIEUYcL9InbpcDgRMw7PuEIpFltBmGqQJs4XyEy2/TIbU6/Pib/C8UeI6HJL6qDtRfhsfADJcuXyr3lcZ3h5B9R//TeteNgwHm1dpPCXUsiBK4txX3G7ARDoKdj2j7MN0flLXX7yV+pe6EA3g8QGdiKDVNc4ksMb1/sIpQ84MOpxRi75eQZ7e1gvGZlWyJrZAXYxMUKcZQmw0Ad0XV28zBJzepb/JIyddQ==")); + } + + public static void getKeyPair() throws Exception { + KeyPairGenerator kpg = KeyPairGenerator.getInstance(CRYPTO_METHOD); + kpg.initialize(CRYPTO_BITS); + KeyPair kp = kpg.generateKeyPair(); + PublicKey publicKey = kp.getPublic(); + byte[] publicKeyBytes = publicKey.getEncoded(); + DEFAULT_PUBLIC_KEY = Base64.encode(publicKeyBytes); + PrivateKey privateKey = kp.getPrivate(); + byte[] privateKeyBytes = privateKey.getEncoded(); + DEFAULT_PRIVATE_KEY = Base64.encode(privateKeyBytes); + } + + public static String encrypt(String clearText) { + try { + KeyFactory keyFac = KeyFactory.getInstance(CRYPTO_METHOD); + KeySpec keySpec = new X509EncodedKeySpec( + Base64.decode(DoverProperty.get(RSA_PUBLIC_KEY, DEFAULT_PUBLIC_KEY).trim())); + Key key = keyFac.generatePublic(keySpec); + final Cipher cipher = Cipher.getInstance(CYPHER); + cipher.init(Cipher.ENCRYPT_MODE, key); + byte[] encryptedBytes = cipher.doFinal(clearText.getBytes(StandardCharsets.UTF_8)); + return Base64.encode(encryptedBytes).replaceAll("([\\r\\n])", ""); + } catch (Exception e) { + throw new DoverServiceException(ReserveResultCode.ENCRYPTED_ERROR); + } + } + + public static String decrypt(String encryptedBase64) { + try { + KeyFactory keyFac = KeyFactory.getInstance(CRYPTO_METHOD); + KeySpec keySpec = new PKCS8EncodedKeySpec( + Base64.decode(DoverProperty.get(RSA_PRIVATE_KEY, DEFAULT_PRIVATE_KEY).trim())); + Key key = keyFac.generatePrivate(keySpec); + final Cipher cipher = Cipher.getInstance(CYPHER); + cipher.init(Cipher.DECRYPT_MODE, key); + byte[] encryptedBytes = Base64.decode(encryptedBase64); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + return new String(decryptedBytes, StandardCharsets.UTF_8); + } catch (Exception e) { + throw new DoverServiceException(ReserveResultCode.DECRYPTED_ERROR, e); + } + } + + public static String decryptPC(String encryptedBase64) { + try { + KeyFactory keyFac = KeyFactory.getInstance(CRYPTO_METHOD); + KeySpec keySpec = new PKCS8EncodedKeySpec( + Base64.decode(DoverProperty.get(RSA_PC_PRIVATE_KEY, DEFAULT_PRIVATE_KEY).trim())); + Key key = keyFac.generatePrivate(keySpec); + final Cipher cipher = Cipher.getInstance(CYPHER); + cipher.init(Cipher.DECRYPT_MODE, key); + byte[] encryptedBytes = Base64.decode(encryptedBase64); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + return new String(decryptedBytes, StandardCharsets.UTF_8); + } catch (Exception e) { + throw new DoverServiceException(ReserveResultCode.DECRYPTED_ERROR, e); + } + } +} diff --git a/text-handle-demo/utils/TxUtil.java b/text-handle-demo/utils/TxUtil.java new file mode 100644 index 0000000..ebd1199 --- /dev/null +++ b/text-handle-demo/utils/TxUtil.java @@ -0,0 +1,24 @@ +package com.dover.util; + +import org.springframework.transaction.support.TransactionSynchronizationAdapter; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +/** + * @author dover + * @since 2022/11/14 + */ +public final class TxUtil { + + + /** + * 注册spring事务提交后逻辑 + */ + public static void afterCommit(Runnable runnable) { + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { + @Override + public void afterCommit() { + runnable.run(); + } + }); + } +}