上一节中我们说到了 Hot Observable 的一个实现 ---- ConnectableObservable。这一节中我们说一说 Hot Observable 的另一种实现 ---- Subject
Subject
按照惯例,先来一段能跑的代码
// 5.1.kt
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val observable = Observable.interval(10, TimeUnit.MILLISECONDS)
val subject = PublishSubject.create<Long>() // 注释1
observable.subscribe(subject) // 描点1 Subject 充当 Observer 角色
subject.subscribe({ println("Received $it") }) // 描点2 Subject 充当 Observable 角色
Thread.sleep(60)
}
输出
Received 0
Received 1
Received 2
Received 3
Received 4
Received 5
注释1
我们可以用 PublishSubject.create() 来创建 PublishSubject (PublishSubject 下方介绍)
-
PublishSubject是Subject的一种 -
Subject是Hot Observable的一种
(这里有空我补一个关系图)
Subject 是 Observable 与 Observer 的组合体
-
Observable的所有操作符它都有(操作符会在之后的章节介绍) - 它也可以像
Observer一样接收值 - 左耳朵进右耳朵出(对,我妈经常这么说我)。如果你向它的
Observer接口传入值(描点1), 它会把这些值选择性地从Observable接口处弹出去(描点2)
(PublishSubject会把所有从Observer接口传入的值按照时间顺序全部传出去)
这么做有什么用处,既然我们可以直接从 源Observable 订阅,为什么要在中间加一层 PublishSubject? 来看下一个例子
PublishSubject 的作用
// 5.2.kt
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
observable.subscribe({ println("Subscription 1 Received $it") })
Thread.sleep(200)
observable.subscribe({ println("Subscription 2 Received $it") })
Thread.sleep(300)
}
输出
Subscription 1 Received 0
Subscription 1 Received 1
Subscription 1 Received 2
Subscription 2 Received 0 // 注释1
Subscription 1 Received 3
Subscription 2 Received 1
Subscription 1 Received 4
Subscription 2 Received 2
注释1
订阅2 从 0 开始接收消息(因为它订阅的 observable 是一个 Cold Observable,所以会从头发送)
这里的输出结果和下面对比一下
// 5.3.kt
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val subject = PublishSubject.create<Long>()
observable.subscribe(subject)
subject.subscribe({ println("Subscription 1 Received $it") })
Thread.sleep(300)
subject.subscribe({ println("Subscription 2 Received $it") })
Thread.sleep(200)
}
输出
Subscription 1 Received 0
Subscription 1 Received 1
Subscription 1 Received 2
Subscription 1 Received 3
Subscription 2 Received 3 // 注释1
Subscription 1 Received 4
Subscription 2 Received 4
注释1
订阅2 从 3 开始接收消息(它错过了 0 1 2, 我们说过 Subject 是 Hot Observable 的一种)
在这里,我们通过 PublishSubject 把原来的 Cold 变成了 Hot(上一节的 publish 也能实现此功能,只不过得到的是 ConnectableObservable)
Subject 的各种实现
AsyncSubject
下面这张图是为了阐述 ReactiveX 原理常用的 Marble Diagram ,我会在明天专门去说 Marble Diagram 如何看(之前我也是各种看不懂,捂脸)

(图片来自 ReactiveX documentation)
AsyncSubject 会从 源Observable(Subject 的 Observer 接口传入值来自 源Observable) 接收所有值,并把最后一个值从 Observable 接口处弹出去,看一个例子
// 5.4.kt
import io.reactivex.Observable
import io.reactivex.subjects.AsyncSubject
fun main(args: Array<String>) {
val observable = Observable.just(1, 2, 3, 4)
val subject = AsyncSubject.create<Int>()
observable.subscribe(subject)
subject.subscribe(observer)
}
输出
New Subscription
Next 4
All Completed
我们可以不订阅任何的 Observable 而直接调用 Subject 的 onNext 方法(Observer 接口)传入值(其实上面 Subject 订阅 Observable 的时候,Subject 会在内部对每一个从 Observable 得到的值调用 onNext 方法)。就像这个例子
// 5.5.kt
import io.reactivex.subjects.AsyncSubject
fun main(args: Array<String>) {
val subject = AsyncSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe(observer) // 订阅1
subject.onNext(3)
subject.subscribe(observer) // 订阅2
subject.onNext(4)
subject.onComplete()
}
输出
New Subscription
New Subscription
Next 4 // 订阅1(我知道你要问为什么不输出 2 而是 4,下面有解释)
All Completed
Next 4 // 订阅2
All Completed
AsyncSubject 当且仅当调用 onComplete 方法时才会弹出值(和( ConnectableObservable 与 connect 方法的关系)差不多)
所以 订阅1 并没有输出 Next 2 而是输出 Next 4。
PublishSubject
PublishSubject 会把所有从 Observer 接口传入的值按照时间顺序全部传出

(图片来自 ReactiveX documentation)
BehaviorSubject
把 PublishSubject 与 AsyncSubject 组合在一起差不多就是 BehaviorSubject。
BehaviorSubject 会弹出订阅 BehaviorSubject 之前的最后一个值(AsyncSubject 的特性)和订阅 BehaviorSubject 之后的所有值(PublishSubject 的特性)
// 5.6.kt
import io.reactivex.subjects.BehaviorSubject
fun main(args: Array<String>) {
val subject = BehaviorSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe(observer)
subject.onNext(3)
subject.subscribe(observer)
subject.onNext(4)
subject.onComplete()
}
输出
/*
New Subscription
Next 2 // 订阅1 获取到了 `2` 而跳过了 `1`
Next 3 // 订阅1 获取到了订阅之后的值
New Subscription
Next 3 // 订阅2
Next 4 // 订阅1
Next 4 // 订阅2
All Completed
All Completed
*/
ReplaySubject
它和 Cold Observable 的性质差不多(我还不知道它有什么用,麻烦哪位同学告诉我,我加在这里,先谢过了)

(图片来自 ReactiveX documentation)
// 5.7.kt
import io.reactivex.subjects.ReplaySubject
fun main(args: Array<String>) {
val subject = ReplaySubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe(observer)
subject.onNext(3)
subject.subscribe(observer)
subject.onComplete()
}
输出
/*
New Subscription
Next 1
Next 2
Next 3
New Subscription
Next 1
Next 2
Next 3
All Completed
All Completed
*/
这节 OK 了,明天我们来一起学习一下看图(Marble Diagram)识字....
RxKotlin 例子不超过15行教程 1----环境配置与初体验
RxKotlin 例子不超过15行教程 2----Observable Observer 与 Subscribe 简介
RxKotlin 例子不超过15行教程 3----Observable 的创建
RxKotlin 例子不超过15行教程 4----Observer Subscribe 与 Hot/Cold Observable
RxKotlin 例子不超过15行教程 5----Subject
RxKotlin 例子不超过15行教程 6----Operator 与 Marble Diagram
RxKotlin 例子不超过15行教程 7----Backpressure Flowable 与 Subscriber 简介
