Skip to content

Commit

Permalink
feat:装换操作符
Browse files Browse the repository at this point in the history
  • Loading branch information
daleige committed Oct 23, 2021
1 parent f074cea commit 17163be
Showing 1 changed file with 33 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,41 +68,42 @@ class TransfActivity : AppCompatActivity() {
}

private fun flatMapOption() {
Observable.just("a-", "b-", "c-", "d-")
.flatMap(object : Function<String, ObservableSource<String>> {
override fun apply(value: String): ObservableSource<String> {
return Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(e: ObservableEmitter<String>?) {
Log.d(TAG, "flatMap 操作符 Int 转 ObservableOnSubscribe")
//flatMap操作符不是顺序执行的
for (i in 1..10) {
e?.onNext(value + (100 * i))
}
Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(e: ObservableEmitter<String>?) {
e?.onNext("A ")
e?.onNext("B ")
e?.onNext("C ")
}
}).flatMap(object : Function<String, ObservableSource<String>> {
override fun apply(value: String): ObservableSource<String> {
return Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(e: ObservableEmitter<String>?) {
Log.d(TAG, "flatMap 操作符 Int 转 ObservableOnSubscribe")
//flatMap操作符不是顺序执行的
for (i in 1..10) {
e?.onNext(value + (100 * i))
}
}).delay(1, TimeUnit.SECONDS)
}
})
.subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable?) {
Log.d(TAG, "flatMap 操作符 onSubscribe()")
}

override fun onNext(t: String?) {
Log.d(TAG, "flatMap 操作符 onNext() =$t")
}

override fun onError(e: Throwable?) {
Log.d(TAG, "flatMap 操作符 onError()")
}

override fun onComplete() {
Log.d(TAG, "flatMap 操作符 onComplete()")
}
}).delay(1, TimeUnit.SECONDS)
}
})
.subscribe(object : Consumer<String> {
override fun accept(t: String) {
Log.d(TAG, "flatMap 操作符 accept() = $t")
}
})
}

private fun concatMapOption() {
Observable.just("a-", "b-", "c-", "d-")
Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(e: ObservableEmitter<String>?) {
//只会接收第一个onNext ,后面的会被丢弃
e?.onNext("A ")
e?.onNext("B ")
e?.onNext("C ")
e?.onComplete()
}
})
.concatMap(object : Function<String, ObservableSource<String>> {
override fun apply(value: String): ObservableSource<String> {
return Observable.create(object : ObservableOnSubscribe<String> {
Expand All @@ -116,21 +117,9 @@ class TransfActivity : AppCompatActivity() {
}).delay(1, TimeUnit.SECONDS)
}
})
.subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable?) {
Log.d(TAG, "concatMap 操作符 onSubscribe()")
}

override fun onNext(t: String?) {
Log.d(TAG, "concatMap 操作符 onNext() =$t")
}

override fun onError(e: Throwable?) {
Log.d(TAG, "concatMap 操作符 onError()")
}

override fun onComplete() {
Log.d(TAG, "concatMap 操作符 onComplete()")
.subscribe(object : Consumer<String> {
override fun accept(t: String) {
Log.d(TAG, "concatMap 操作符 accept() = $t")
}
})
}
Expand Down

0 comments on commit 17163be

Please sign in to comment.