一、引言
RxJava 到底是什么?RxJava 是 Rx 在Java上的实现,Rx(Reactive Extensions)是一个库,用来处理事件和异步任务,在很多语言上都有实现。简单来说,RxJava就是处理异步的一个库,最基本是基于观察者模式来实现的。通过Obserable和Observer的机制,实现所谓响应式的编程体验。 Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各 Android开发者的欢迎。
二、RxJava介绍
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
RxJava是一个实现反应性扩展框架的Java虚拟机:用于通过使用观察序列构成异步和基于事件的程序库。扩展了观察者模式,以支持数据/事件序列,并增加了操作符,他可以将将序列清晰的组合在一起的。这些序列组合可以是抽象出来的某些数据/事件,如低级别的线程,同步,线程安全和并发数据结构。
RxJava 有四个基本概念:Observable (可观察者,即被观察者)、Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
RxJava最核心的两个东西就是Observables(被观察者,也就是事件源)和Subscribers(观察者),由Observables发出一系列的事件,Subscribers进行订阅接收并进行处理,看起来就好像是设计模式中的观察者模式,但是跟观察者模式不同的地方就在于,如果没有观察者(即Subscribers),Observables是不会发出任何事件的。
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError():
- onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志
- onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出
- 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个
RxJava的优势:简洁,而且当业务越繁琐越复杂时这一点就越显出优势——它能够保持简洁;它提供的各种功能强悍的操作符真的很强大。
Github:RxJava
三、RxJava应用
3.1 引入依赖
Rxjava有两个分支并行维护(估计还会存在一段时间)
compile 'io.reactivex:rxjava:1.x.y' //1.3.8
及
compile 'io.reactivex.rxjava2:rxjava:x.y.z' ////2.2.1
或
implementation "io.reactivex.rxjava2:rxjava:2.x.y" //2.2.1
项目中用到:
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
3.2 创建Subscriber(Observer)
//创建观察者或者订阅者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//Disposable是1.x的Subscription改名的,因为Reactive-Streams规范用这个名称,为了避免重复
//这个回调方法是在2.0之后新添加的
//可以使用d.dispose()方法来取消订阅
}
@Override
public void onNext(String value) {
Log.e("onNext", value);
}
@Override
public void onError(Throwable e) {
Log.e("onError", e.getMessage());
}
@Override
public void onComplete() {
Log.e("onComplete", "complete");
}
};
3.3 创建Observable
//创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("Hello Amiga!");
}
});
3.4 subscribe (订阅)
tempObservable.subscribe(tempSubsciber);
subscribe()主要做了一下工作:
- 调用subscriber.onStart()
- 调用 Observable 中的 OnSubscribe.call(Subscriber) 。处理事件发送的逻辑,在 RxJava 中, Observable并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候
- 将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe()
//创建订阅者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
//这一步是必须,我们通常可以在这里做一些初始化操作,调用request()方法表示初始化工作已经完成
//调用request()方法,会立即触发onNext()方法
//在onComplete()方法完成,才会再执行request()后边的代码
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String value) {
Log.e("onNext", value);
}
@Override
public void onError(Throwable t) {
Log.e("onError", t.getMessage());
}
@Override
public void onComplete() {
//由于Reactive-Streams的兼容性,方法onCompleted被重命名为onComplete
Log.e("onComplete", "complete");
}
};
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
e.onNext("Hello,I am China!");
}
}, BackpressureStrategy.BUFFER)
.subscribe(subscriber);
在onSubscribe()回调中必须调用s.request()方法去请求资源,参数就是要请求的数量,一般如果不限制请求数量,可以写成Long.MAX_VALUE,之后会立即触发onNext()方法!所以在onSubscribe()/onStart()中做了一些初始化的工作,而这些工作是在request()后面时,会出现一些问题,在onNext()执行时,初始化工作的那部分代码还没有执行。为了避免这种情况,请确保调用request()时,已经把所有初始化工作做完了。
3.5 实例
目前最经典的就是RxJava 与 Retrofit 结合使用:
public interface PrintData {
@GET("request")
Observable<Translation> getCall();
// 注解里传入 网络请求 的部分URL地址
// Retrofit把网络请求的URL分成了两部分:一部分放在Retrofit对象里,另一部分放在网络请求接口里
// 如果接口里的url是一个完整的网址,那么放在Retrofit对象里的URL可以忽略
// 采用Observable<...>接口
// getCall()是接受网络请求数据的方法
}
//通过Retrofit发送网络请求
// 创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava,1.X为RxJavaCallAdapterFactory
.build();
// 创建 网络请求接口 的实例
PrintData request = retrofit.create(PrintData.class);
// 采用Observable<...>形式 对 网络请求 进行封装
Observable<Translation> observable = request.getCall();
// 通过线程切换发送网络请求
observable.subscribeOn(Schedulers.io()) // 切换到IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) // 切换回到主线程 处理请求结果
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation result) {
// e.接收服务器返回的数据
result.show() ;
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "请求失败");
}
@Override
public void onComplete() {
}
});
3.6 取消订阅
Observable.subscribe()的返回值是一个 Subscription 对象。Subscription 类只有两个方法,unsubscribe() 和 isUnsubscribed()。
四、RxJava 操作符
4.1 create操作符
用来创建一个Observable的:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//TODO
}
})
4.2 just操作符
输入什么,输出什么
Observable.just("Hello world").subscribe(s -> showLog(s));
4.3 fromArray操作符
将输入的数组都发射出来
Observable.fromArray("Hello", "World", "girl").subscribe(s -> showLog(s));
4.4 map操作符
将一个对象使用指定的方法转换为另一个对象再发射出去
observableSigle = Observable.create(new ObservableOnSubscribe<Student>() {
@Override
public void subscribe(ObservableEmitter<Student> e) throws Exception {
//学生
Student student = new Student("CCHIP", 100);
e.onNext(student);
e.onComplete();
}
}).map(new Function<Student, Teacher>() {//使用map操作符进行转换
@Override
public Teacher apply(Student student) throws Exception {
//将学生根据业务要求转换为老师并发射出去
String name = student.getName() + "的老师";
boolean isGood = student.getScore() >= 90;
Teacher teacher = new Teacher(name, isGood);
return teacher;
}
});
RxJava操作符功能非常强大,但大部份目前没有用过,所以就不作过多介绍。
五、总结
一开始我认为RxJava 写的代码理解起来很困难,可是它真的很牛逼,学习及使用之后,让我后受益匪浅。