本节会以各种各样的 Operator 为例来看奇奇怪怪的 Marble Diagram
- 本教程中所有
Operator均依照ReactiveX规范, 首字母大写 - 本节代码中的 observer 就是第二节中的
- 本节中的
Map是一种函数, 不是数据结构 - 本节中所有
Marble Diagram上面是源 Observable下面是生成的Observable - 这一节重点是
Marble Diagram, 所以Operator不会选择很生僻的
本节术语的中文翻译
Operator 运算符
Marble Diagram 弹珠图
Operator 是什么
请允许我用前面举了很多次的例子做开头,不过这一次我会给它配图
// 6.1.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
val observable = Observable.just(1, 2, 3)
observable.map { x -> 10 * x }.subscribe(observer) // 这里完全可以用 10*it ,为了和下面的图片一致我没有这么做
}
输出
New Subscription
Next 10
Next 20
Next 30
All Completed
Marble Diagram

Map 函数会把当前 Observable 中的每个值用传入的函数做一下变换再放入新的数据流中
- 重点在
源 Observable并不会被修改 -
新的 这个词不恰当,因为这个计算有可能是
惰性的, 多次应用Map方法会在一次变换中完成,并不会创建多个中间 Observable
上面例子中的 Map 就是一个 Operator (一会我们会更全面的理解它)。
链式 Operator (在 Kotlin 中的实现是 链式方法)
大多数 Operator 的作用对象可以是 Observable, 返回值可以是 Observable (还能作用到其他类型上,返回其他类型的值。之后会见到 Flowable Single 等类型)。所以我们可以一个接一个的使用 Operator ,每个 Operator 从上一个 Observable 中获取值经过变换传给下一个 Operator。
Operator 的范畴(截止 2018.2.26)
目前 ReactiveX 规范共定义有 454 个 Operator (Observable Operator 列表)
这里先概述一下,之后分别介绍。按照 ReactiveX 规范, Operator 有如下几类。
| 名称 | 职能 | 代表 |
|---|---|---|
| Creating | 创建新的 Observable
|
Just |
| Transforming | 把 源 Observable 中的值进行变换后弹出 |
Map |
| Filtering | 把 源 Observable 中的值选择性地弹出 |
Filter |
| Combining | 把多个 源 Observable 处理为一个 Observable
|
Zip |
| Error Handling | 失败恢复 | Retry |
| Observable Utility | 工具集 | Subscribe |
| Conditional and Boolean | 条件相关 | DefaultIfEmpty |
| Mathematical and Aggregate | 数学,聚合相关 | Max |
| Backpressure | 用于处理 Observable 弹射值的速度远大于 Observer 接收的速度的情况(下一节介绍) |
略 |
| Connectable Observable | 把 源 Observable 转换成一个 特化 的 Observable, 使其满足特定要求(如 精确订阅) |
Publish |
| Convert | 把 Observable 变成另一种对象或数据结构 |
To |
ReactiveX在很多平台上都有相应实现(如 RxJava RxJS Rx.NET 等等)
这些实现是有一些差异的 (如Rx.NET的SelectMany Operator对应ReactiveX的FlatMap Operator, 而这个Operator在RxKotlin中叫flatMap)
这套教程的最后会有一个 决策树 帮助你选择合适的 Operator 解决遇到的问题。
那么下面来看各种 Operator 的 Marble Diagram
Marble Diagram
接下来我们用各大范畴的 Operator 看 Marble Diagram
Filter(Filtering)
// 6.2.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
val observable = Observable.just(2, 30, 22, 5, 60, 1)
observable.filter { x -> x > 10 }.subscribe(observer) // it > 10
}
输出
New Subscription
Next 30
Next 22
Next 60
All Completed
Marble Diagram

FlatMap(Transforming)
和 Kotlin List 的 flatMap 相似
多说一句,
Map是Functor的方法,FlatMap是Monad的方法。
这篇教程不是专门讲函数式的, 此处不展开。有兴趣的话可以看 第二节 的参考链接
// 6.3.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
val observable = Observable.just(1, 5, 9) // 数字没有特殊含义
observable
.flatMap { x -> Observable.just(x + 1, x + 2) } // 这个例子非常牵强
.subscribe(observer)
}
输出
New Subscription
Next 2
Next 3
Next 6
Next 7
Next 10
Next 11
All Completed
Marble Diagram(和之前的有些不太一样, 这个是 形状风格 的, 之前是 数字风格 的)

DefaultIfEmpty(Conditional and Boolean)
当 Observable 中没有值的时候,我们订阅什么也得不到。比如下面的例子
// 6.4.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.range(0, 10)
.filter { it > 15 }
.subscribe(observer)
}
输出
New Subscription
All Completed
看, 什么也没有
那如果我们想在 Observable 没有值的时候给出一个默认值呢, 见下例
// 6.5.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.range(0, 10)
.filter { it > 15 }
.defaultIfEmpty(15)
.subscribe(observer)
}
输出
New Subscription
Next 15
All Completed
Marble Diagram

注意看图, 因为之前没有值,所以
源 Observable 的终点变为一个值, 又生成了一个新的终点。
StartWith(Combining)
// 6.6.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.just(2, 3)
.startWith(1)
.subscribe(observer)
}
输出
New Subscription
Next 1
Next 2
Next 3
All Completed
Marble Diagram

Count(Mathematical and Aggregate)
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.rxkotlin.subscribeBy
fun main(args: Array<String>) {
// Single 会在之后介绍
val count:Single<Long> = Observable.just(2, 30, 22, 5, 60, 1).count()
// subscribeBy 会在之后介绍
count.subscribeBy { println(it) }
}
输出
6
Marble Diagram(这个图和上面的代码不一致, 因为 RxKotlin 貌似没有办法在 count 后面加参数)

Scan(和 Map 一样, 是 Transforming)
// 6.8.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.just(1, 2, 3, 4, 5)
.scan { x, y -> x + y }
.subscribe(observer)
}
输出
New Subscription
Next 1
Next 3
Next 6
Next 10
Next 15
All Completed
Marble Diagram

Scan 接受一个有两个参数的函数为参数, 函数的第一个参数为累计值, 第二个参数为当前值, 这个函数返回一个值, 返回值被放在新的 Observable 中(同前,这里的 新的 也不准确)
再来看一个
Scan 的例子 (这个例子是为了更进一步了解 Scan, 不是为了演示 Marble Diagram)
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.just("1", "2", "3", "4", "5")
.scan { x, y -> x + " " + y }
.subscribe(observer)
}
输出
New Subscription
Next 1
Next 1 2
Next 1 2 3
Next 1 2 3 4
Next 1 2 3 4 5
All Completed
这一节 OK 了,明天我们说 Backpressure
RxKotlin 例子不超过15行教程 1----环境配置与初体验
RxKotlin 例子不超过15行教程 2----Observable Observer 与 Subscribe 简介
RxKotlin 例子不超过15行教程 3----Observable 的创建
RxKotlin 例子不超过15行教程 4----Observer Subscribe 与 Hot/Cold Observable
RxKotlin 例子不超过15行教程 5----Subject
RxKotlin 例子不超过15行教程 6----Operator 与 Marble Diagram
RxKotlin 例子不超过15行教程 7----Backpressure Flowable 与 Subscriber 简介
