RxJava——线程控制

前言

对于一般的需求场景,需要在子线程中实现耗时的操作;然后回到主线程实现 UI操作应用到 RxJava模型中,可理解为:

被观察者 (Observable)子线程 中生产事件(如实现耗时操作等等)
观察者(Observer)主线程 接收 & 响应事件(即实现UI操作


实现方式

采用 RxJava内置的线程调度器( Scheduler ),即通过 功能性操作符subscribeOn() & observeOn()实现


subscribeOn

使用该方法可以指定被观察者执行方法位于的线程。

注意:该方法调用只能生效一次,即第一次调用后,再调用subscribeOn无法改变其执行线程的位置。

observeOn

使用该方法指定观察者事件响应位于的线程。

注意:该方法可调用多次,每一次调用observeOn,后续操作线程就会切换一次,这里的后续操作指的是调用observeOn后,在下一个observeOn前指定的事件监听操作


代码实现

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.v("rxJava","Observable thread :" + Thread.currentThread().getName());
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.io()) // 指定被观察者执行线程
        .observeOn(Schedulers.newThread()) // 切换到新线程
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.v("rxJava","do on next thread :" + Thread.currentThread().getName());
            }
        })
        .observeOn(Schedulers.newThread()) // 切换到另一个新线程
        .filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                Log.v("rxJava","filter thread:"+ Thread.currentThread().getName());
                return true;
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("rxJava","Observer onSubscribe thread :" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(Integer value) {
                Log.v("rxJava","Observer onNext thread :" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                Log.v("rxJava","Observer onError thread :" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                Log.v("rxJava","Observer onComplete thread :" + Thread.currentThread().getName());
            }
        });

运行结果

V/rxJava: Observer onSubscribe thread :main
V/rxJava: Observable thread :RxCachedThreadScheduler-1
V/rxJava: do on next thread :RxNewThreadScheduler-1
V/rxJava: do on next thread :RxNewThreadScheduler-1
V/rxJava: do on next thread :RxNewThreadScheduler-1
V/rxJava: filter thread:RxNewThreadScheduler-2
V/rxJava: Observer onNext thread :RxNewThreadScheduler-2
V/rxJava: filter thread:RxNewThreadScheduler-2
V/rxJava: Observer onNext thread :RxNewThreadScheduler-2
V/rxJava: filter thread:RxNewThreadScheduler-2
V/rxJava: Observer onNext thread :RxNewThreadScheduler-2
V/rxJava: Observer onComplete thread :RxNewThreadScheduler-2

注意:onSubscribe方法总是执行在调用subscribe方法的线程


线程可选参数

schedulers.io()

这个调度器时用于I/O操作。它基于根据需要,增长或缩减来自适应的线程池。我们将使用它来修复StrictMode检测到的违规做法。由于它专用于I/O操作,所以并不是RxJava的默认方法;正确的使用它是由开发者决定的。 重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存。一如既往的是,我们需要在性能和简捷两者之间找到一个有效的平衡点。

Schedulers.computation()

这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer(),debounce(),delay(),interval(),sample(),skip()。

Schedulers.immediate()

这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器。

Schedulers.newThread()

指定一个新线程来执行任务,如果有多个步骤且每个步骤都使用这个方法调度,则每个步骤都是在一个新的线程中,而不是同一个线程。

Schedulers.trampoline()

当我们想在当前线程执行一个任务时,并不是立即,我们可以用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。它是repeat()和retry()方法默认的调度器。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 本篇代码见:RxJava_Demo_Translater 这里开始学习RxJava线程控制(切换/调度)。一、Rx...
    Jotyy阅读 2,555评论 0 1
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 10,915评论 7 62
  • 最近项目里面有用到Rxjava框架,感觉很强大的巨作,所以在网上搜了很多相关文章,发现一片文章很不错,今天把这篇文...
    Scus阅读 11,784评论 2 50
  • 人生不存在走弯路,一切都是最好的安排! 二十几岁时没加过班,没考过证。上班下班过着正常的生活,虽然抱怨过工资低位,...
    小八杂谈阅读 1,352评论 1 1
  • 10月15日晚上,以“怦然新动”为主题的南岳学院2017级迎新晚会在西校区排演厅举行。学院党总支书记匡云山、院长王...
    衡阳师范学院阅读 1,059评论 0 0