rxjava和rxAndroid

简介

rxjava是Java一个rx最近的多线程技术。。。

他是一个扩展观察者模式实现的一个多线程技术

观察者模式:

观察者模式就是相当于警察(观察者)去观察小偷(被观察者),当小偷去偷东西(事件)的时候,警察能立刻知道,并做出相应的反应。不过在编程中是告诉被观察者在执行事件的过程中去提醒观察者。。现实中小偷不会偷东西时候提醒观察者

rxJava

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

基本创建:

Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() {            
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("hello");
                e.onComplete();
                e.onNext("hello2");

            }
        });

        Observer<String> oser = new Observer<String>() {            
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.w("","onSubscribe");
            }            @Override
            public void onNext(@NonNull String s) {
                Log.w("","onNext = "+s);
            }            @Override
            public void onError(@NonNull Throwable e) {
                Log.w("","onError" + e);
            }            @Override
            public void onComplete() {
                Log.w("","onComplete");
            }
        };

        Log.w("","subscribe");
        oble.subscribe(oser);

subscribe
onSubscribe
onNext = hello
onComplete

其中oble是一个被观察者,oser是一个观察者,被观察者可以调用onNext向观察者发送内容,此时观察者就能通过重写的onNext获取到数据,执行相应的操作

另外观察者还有oncomplete和onerror,如果执行了onComplete方法,那么就会断开联系,所以hello2没有显示出来,如果发生了错误会调用了onerror也会立马断开联系。

另一些

简写被观察者

上面的例子是create一个最基本的被观察者,当如果被观察者只有一个动作的时候就不需要那么复杂的操作,可以用一个just

Observable<String> observable = Observable.just("hello");

这样就是只执行一个onNext('hello');


简写观察者

当然对于观察者也是一样,如果不用考虑oncomplete和onerror也可以简写,创建一个consumer对象,重写accept方法就行,然后通过被观察者.subscribe(观察者)来建立联系

Observable<String> observable = Observable.just("hello");
        Consumer<String> consumer = new Consumer<String>() {
           @Override
           public void accept(String s) throws Exception {
               System.out.println(s);
           }
        };
        observable.subscribe(consumer);


创建完成或者错误的另一些方法

可以创建一个action对象来处理oncomplete的事件,用一个consumer来处理onnext和onerror事件,最后重载subscribe的一些方法达到建立关系的目的

Observable<String> observable = Observable.just("hello");
    Action onCompleteAction = new Action() {
        @Override
        public void run() throws Exception {
            Log.i("kaelpu", "complete");
        }
    };
    Consumer<String> onNextConsumer = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.i("kaelpu", s);
        }
    };
    Consumer<Throwable> onErrorConsumer = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.i("kaelpu", "error");
        }
    };
    observable.subscribe(onNextConsumer, onErrorConsumer, onCompleteAction);

}
public final Disposable subscribe() {} 
public final Disposable subscribe(Consumer<? super T> onNext) {} 
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {} 
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {} 
public final void subscribe(Observer<? super T> observer) {}

上面是subscribe一些重载方法

线程调度

rxJava最大的好处就是能够在多线程的情况下去实现,主要能应用在Android更新UI上。。

在建立关系subscribe的时候会有一些方法

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d("kaelpu", "Observable thread is : " + Thread.currentThread().getName());
            Log.d("kaelpu", "emitter 1");
            emitter.onNext(1);
        }
    });

    Consumer<Integer> consumer = new Consumer<Integer>() {        @Override
        public void accept(Integer integer) throws Exception {
            Log.d("kaelpu", "Observer thread is :" + Thread.currentThread().getName());
            Log.d("kaelpu", "onNext: " + integer);
        }
    };

    observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(consumer);
}

最后那个建立关系的意思是让被监听者在Schedulers.newThread()这个新线程上,然后让观察者在AndroidSchedulers.mainThread()这个主线程上,就实现了主线程更新UI的操作,主要就是subscribeOn是让被观察者运行的线程,observeOn是观察者运行的线程

操作符的使用和Android的一些扩展可以看原文

作者:蒲文辉
链接:https://www.jianshu.com/p/7eb5ccf5ab1e
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。


xwm

还是一个菜鸟


Comments are closed.