Rxjava采用观察者模式,主要用于处理异步操作和事件分发相关操作。是一款非常火热的开源库。下面的描述摘自github
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
RxJava的官网为:RxJava
使用示例 在介绍RxJava的源码之前,可以参考google的mvp+rxJava项目mvp+rxjava ,简单看看rxjava的使用
示例1 TaskRemoteDataSource在项目中用于模拟访问远程数据源,因此在使用过程中,通过rxJava的delay方法,控制数据的延迟到达1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class TasksRemoteDataSource implements TasksDataSource {
private static TasksRemoteDataSource INSTANCE;
private static final int SERVICE_LATENCY_IN_MILLIS = 5000 ;
private final static Map<String, Task> TASKS_SERVICE_DATA;
static {
TASKS_SERVICE_DATA = new LinkedHashMap<>(2 );
addTask("Build tower in Pisa" , "Ground looks good, no foundation work required." );
addTask("Finish bridge in Tacoma" , "Found awesome girders at half the cost!" );
}
......
@Override
public Observable<List<Task>> getTasks() {
return Observable
.from(TASKS_SERVICE_DATA.values())
.delay(SERVICE_LATENCY_IN_MILLIS, TimeUnit.MILLISECONDS)
.toList();
}
@Override
public Observable<Task> getTask (@NonNull String taskId) {
final Task task = TASKS_SERVICE_DATA.get(taskId);
if (task != null ) {
return Observable.just(task).delay(SERVICE_LATENCY_IN_MILLIS, TimeUnit.MILLISECONDS);
} else {
return Observable.empty();
}
}
......
}
示例2 TaskLocalDataSource在项目中模拟本地db中的数据源,该类中使用了sqlbrite库对SQLiteOpenHelper进行封装,从而更方便的使用rxjava1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class TasksLocalDataSource implements TasksDataSource {
@NonNull
private final BriteDatabase mDatabaseHelper;
@NonNull
private Func1<Cursor, Task> mTaskMapperFunction;
private TasksLocalDataSource (@NonNull Context context,
@NonNull BaseSchedulerProvider schedulerProvider) {
checkNotNull(context, "context cannot be null" );
checkNotNull(schedulerProvider, "scheduleProvider cannot be null" );
TasksDbHelper dbHelper = new TasksDbHelper(context);
SqlBrite sqlBrite = SqlBrite.create();
mDatabaseHelper = sqlBrite.wrapDatabaseHelper(dbHelper, schedulerProvider.io());
mTaskMapperFunction = this ::getTask;
}
@NonNull
private Task getTask (@NonNull Cursor c) {
String itemId = c.getString(c.getColumnIndexOrThrow(TaskEntry.COLUMN_NAME_ENTRY_ID));
String title = c.getString(c.getColumnIndexOrThrow(TaskEntry.COLUMN_NAME_TITLE));
String description =
c.getString(c.getColumnIndexOrThrow(TaskEntry.COLUMN_NAME_DESCRIPTION));
boolean completed =
c.getInt(c.getColumnIndexOrThrow(TaskEntry.COLUMN_NAME_COMPLETED)) == 1 ;
return new Task(title, description, itemId, completed);
}
@Override
public Observable<List<Task>> getTasks() {
String[] projection = {
TaskEntry.COLUMN_NAME_ENTRY_ID,
TaskEntry.COLUMN_NAME_TITLE,
TaskEntry.COLUMN_NAME_DESCRIPTION,
TaskEntry.COLUMN_NAME_COMPLETED
};
String sql = String.format("SELECT %s FROM %s" , TextUtils.join("," , projection), TaskEntry.TABLE_NAME);
return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, sql)
.mapToList(mTaskMapperFunction);
}
@Override
public Observable<Task> getTask (@NonNull String taskId) {
String[] projection = {
TaskEntry.COLUMN_NAME_ENTRY_ID,
TaskEntry.COLUMN_NAME_TITLE,
TaskEntry.COLUMN_NAME_DESCRIPTION,
TaskEntry.COLUMN_NAME_COMPLETED
};
String sql = String.format("SELECT %s FROM %s WHERE %s LIKE ?" ,
TextUtils.join("," , projection), TaskEntry.TABLE_NAME, TaskEntry.COLUMN_NAME_ENTRY_ID);
return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, sql, taskId)
.mapToOneOrDefault(mTaskMapperFunction, null );
}
......
}
示例3 TasksRepository在提供本地内存数据源缓存的基础上,用于管理本地db数据源和远程数据源集合,提供透明而方便的接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
public class TasksRepository implements TasksDataSource {
@Nullable
private static TasksRepository INSTANCE = null ;
@NonNull
private final TasksDataSource mTasksRemoteDataSource;
@NonNull
private final TasksDataSource mTasksLocalDataSource;
@VisibleForTesting
@Nullable
Map<String, Task> mCachedTasks;
@VisibleForTesting
boolean mCacheIsDirty = false ;
private TasksRepository (@NonNull TasksDataSource tasksRemoteDataSource,
@NonNull TasksDataSource tasksLocalDataSource) {
mTasksRemoteDataSource = checkNotNull(tasksRemoteDataSource);
mTasksLocalDataSource = checkNotNull(tasksLocalDataSource);
}
@Override
public Observable<List<Task>> getTasks() {
if (mCachedTasks != null && !mCacheIsDirty) {
return Observable.from(mCachedTasks.values()).toList();
} else if (mCachedTasks == null ) {
mCachedTasks = new LinkedHashMap<>();
}
Observable<List<Task>> remoteTasks = getAndSaveRemoteTasks();
if (mCacheIsDirty) {
return remoteTasks;
} else {
Observable<List<Task>> localTasks = getAndCacheLocalTasks();
return Observable.concat(localTasks, remoteTasks)
.filter(tasks -> !tasks.isEmpty())
.first();
}
}
private Observable<List<Task>> getAndCacheLocalTasks() {
return mTasksLocalDataSource.getTasks()
.flatMap(new Func1<List<Task>, Observable<List<Task>>>() {
@Override
public Observable<List<Task>> call(List<Task> tasks) {
return Observable.from(tasks)
.doOnNext(task -> mCachedTasks.put(task.getId(), task))
.toList();
}
});
}
private Observable<List<Task>> getAndSaveRemoteTasks() {
return mTasksRemoteDataSource
.getTasks()
.flatMap(new Func1<List<Task>, Observable<List<Task>>>() {
@Override
public Observable<List<Task>> call(List<Task> tasks) {
return Observable.from(tasks).doOnNext(task -> {
mTasksLocalDataSource.saveTask(task);
mCachedTasks.put(task.getId(), task);
}).toList();
}
})
.doOnCompleted(() -> mCacheIsDirty = false );
}
@Override
public Observable<Task> getTask (@NonNull final String taskId) {
checkNotNull(taskId);
final Task cachedTask = getTaskWithId(taskId);
if (cachedTask != null ) {
return Observable.just(cachedTask);
}
if (mCachedTasks == null ) {
mCachedTasks = new LinkedHashMap<>();
}
Observable<Task> localTask = getTaskWithIdFromLocalRepository(taskId);
Observable<Task> remoteTask = mTasksRemoteDataSource
.getTask(taskId)
.doOnNext(task -> {
mTasksLocalDataSource.saveTask(task);
mCachedTasks.put(task.getId(), task);
});
return Observable.concat(localTask, remoteTask).first()
.map(task -> {
if (task == null ) {
throw new NoSuchElementException("No task found with taskId " + taskId);
}
return task;
});
}
@NonNull
Observable<Task> getTaskWithIdFromLocalRepository (@NonNull final String taskId) {
return mTasksLocalDataSource
.getTask(taskId)
.doOnNext(task -> mCachedTasks.put(taskId, task))
.first();
}
}
示例4 TasksPresenter通过使用TaskRepository来获取数据,从而更新界面1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
public class TasksPresenter implements TasksContract .Presenter {
@NonNull
private final TasksRepository mTasksRepository;
@NonNull
private final TasksContract.View mTasksView;
@NonNull
private final BaseSchedulerProvider mSchedulerProvider;
@NonNull
private TasksFilterType mCurrentFiltering = TasksFilterType.ALL_TASKS;
private boolean mFirstLoad = true ;
@NonNull
private CompositeSubscription mSubscriptions;
public TasksPresenter (@NonNull TasksRepository tasksRepository,
@NonNull TasksContract.View tasksView,
@NonNull BaseSchedulerProvider schedulerProvider) {
mTasksRepository = checkNotNull(tasksRepository, "tasksRepository cannot be null" );
mTasksView = checkNotNull(tasksView, "tasksView cannot be null!" );
mSchedulerProvider = checkNotNull(schedulerProvider, "schedulerProvider cannot be null" );
mSubscriptions = new CompositeSubscription();
mTasksView.setPresenter(this );
}
@Override
public void subscribe () {
loadTasks(false );
}
@Override
public void unsubscribe () {
mSubscriptions.clear();
}
@Override
public void loadTasks (boolean forceUpdate) {
loadTasks(forceUpdate || mFirstLoad, true );
mFirstLoad = false ;
}
private void loadTasks (final boolean forceUpdate, final boolean showLoadingUI) {
if (showLoadingUI) {
mTasksView.setLoadingIndicator(true );
}
if (forceUpdate) {
mTasksRepository.refreshTasks();
}
EspressoIdlingResource.increment();
mSubscriptions.clear();
Subscription subscription = mTasksRepository
.getTasks()
.flatMap(new Func1<List<Task>, Observable<Task>>() {
@Override
public Observable<Task> call (List<Task> tasks) {
return Observable.from(tasks);
}
})
.filter(task -> {
switch (mCurrentFiltering) {
case ACTIVE_TASKS:
return task.isActive();
case COMPLETED_TASKS:
return task.isCompleted();
case ALL_TASKS:
default :
return true ;
}
})
.toList()
.subscribeOn(mSchedulerProvider.computation())
.observeOn(mSchedulerProvider.ui())
.doOnTerminate(() -> {
if (!EspressoIdlingResource.getIdlingResource().isIdleNow()) {
EspressoIdlingResource.decrement();
}
})
.subscribe(
this ::processTasks,
throwable -> mTasksView.showLoadingTasksError(),
() -> mTasksView.setLoadingIndicator(false ));
mSubscriptions.add(subscription);
}
private void processTasks (@NonNull List<Task> tasks) {
if (tasks.isEmpty()) {
processEmptyTasks();
} else {
mTasksView.showTasks(tasks);
showFilterLabel();
}
}
private void showFilterLabel () {
switch (mCurrentFiltering) {
case ACTIVE_TASKS:
mTasksView.showActiveFilterLabel();
break ;
case COMPLETED_TASKS:
mTasksView.showCompletedFilterLabel();
break ;
default :
mTasksView.showAllFilterLabel();
break ;
}
}
private void processEmptyTasks () {
switch (mCurrentFiltering) {
case ACTIVE_TASKS:
mTasksView.showNoActiveTasks();
break ;
case COMPLETED_TASKS:
mTasksView.showNoCompletedTasks();
break ;
default :
mTasksView.showNoTasks();
break ;
}
}
@Override
public void addNewTask () {
mTasksView.showAddTask();
}
......
}
示例5 StatisticsPresenter通过统计不同type的Task,通过Observable的zip方法将不同数据源的task进行组合,进行统一处理1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class StatisticsPresenter implements StatisticsContract .Presenter {
@NonNull
private final TasksRepository mTasksRepository;
@NonNull
private final StatisticsContract.View mStatisticsView;
@NonNull
private final BaseSchedulerProvider mSchedulerProvider;
@NonNull
private CompositeSubscription mSubscriptions;
public StatisticsPresenter (@NonNull TasksRepository tasksRepository,
@NonNull StatisticsContract.View statisticsView,
@NonNull BaseSchedulerProvider schedulerProvider) {
mTasksRepository = checkNotNull(tasksRepository, "tasksRepository cannot be null" );
mStatisticsView = checkNotNull(statisticsView, "statisticsView cannot be null!" );
mSchedulerProvider = checkNotNull(schedulerProvider, "schedulerProvider cannot be null" );
mSubscriptions = new CompositeSubscription();
mStatisticsView.setPresenter(this );
}
@Override
public void subscribe () {
loadStatistics();
}
@Override
public void unsubscribe () {
mSubscriptions.clear();
}
private void loadStatistics () {
mStatisticsView.setProgressIndicator(true );
EspressoIdlingResource.increment();
Observable<Task> tasks = mTasksRepository
.getTasks()
.flatMap(Observable::from);
Observable<Integer> completedTasks = tasks.filter(Task::isCompleted).count();
Observable<Integer> activeTasks = tasks.filter(Task::isActive).count();
Subscription subscription = Observable
.zip(completedTasks, activeTasks, (completed, active) -> Pair.create(active, completed))
.subscribeOn(mSchedulerProvider.computation())
.observeOn(mSchedulerProvider.ui())
.doOnTerminate(() -> {
if (!EspressoIdlingResource.getIdlingResource().isIdleNow()) {
EspressoIdlingResource.decrement();
}
})
.subscribe(
stats -> mStatisticsView.showStatistics(stats.first, stats.second),
throwable -> mStatisticsView.showLoadingStatisticsError(),
() -> mStatisticsView.setProgressIndicator(false ));
mSubscriptions.add(subscription);
}
}
示例6 如何配合retrofit进行使用,那么效果非常好,下面例子摘自https://juejin.im/post/59a83f146fb9a0248f4a9aa2
场景一: 单请求异步处理
由于在Android UI线程中不能做一些耗时操作,比如网络请求,大文件保存等,所以在开发中经常会碰到异步处理的情况,我们最典型的使用场景是RxJava+Retrofit处理网络请求1
2
3
4
5
MyService myService = retrofit.create(MyService.class);
myService.getSomething()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this ::updateUI, this ::showError);
场景二: 多异步请求连续调用
这种场景其实也很常见,我们做用户头像编辑的使用,一般就会有三个请求需要连续调用:
请求头像上传的地址 上传头像 更新用户信息 在平时的代码里,我们需要一步步callback嵌套下来,代码冗长太难看,而且不好维护,使用RxJava链式调用处理代码逻辑就会非常清晰1
2
3
4
5
6
7
MyService myService = retrofit.create(MyService.class);
myService.getUploadUrl()
.flatMap(this ::uploadImageTask)
.flatMap(this ::updateUserInfo)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this ::updateUI, this ::showError);
先获取请求,再上传头像,最后更新用户信息,后面的任务依赖上一步的处理结果,依次执行。
场景三: 多异步请求合并处理
有时候在项目中,我们会碰到组合多个请求的结果后,再更新UI的情况,比如我们项目中就有一个从多个请求地址获取通知数据,然后在APP上再按时间顺序组合后展示的需求,这时候我们就可以用RxJava的zip函数来处理了1
2
3
4
5
6
7
8
9
10
11
12
MyService myService = retrofit.create(MyService.class);
Observable o1 = myService.getNotification1();
Observable o2 = myService.getNotification2();
Observable.zip(o1,o2, this ::combiNotification)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this ::updateUI, this ::showError);
public List<Notification> combiNotification (List<Notification> n1, List<Notification> n2) {
}
zip函数会等待两个请求都完成后,调用我们的合并方法combiNotification,等合并处理后再回调subscribe中的方法。
场景四: 定时轮询
RxJava还特别适合对定时轮询任务的处理, 一种典型的例子就是APP提交了一个任务给后台异步处理,假设后台处理需要1-2分钟左右,我们需要定时到后台查询进度,并更新到UI上, 传统的做法是用Handler的postDelay方法,用RxJava实现的话就会非常简洁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Subscription subscription = Observable.interval(2 , TimeUnit.SECONDS)
.map(this ::getProgress)
.takeUntil(progress -> progress != 100 )
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted () {
}
@Override
public void onError (Throwable e) {
}
@Override
public void onNext (int progress) {
}
});
重要的类和字段功能 考虑到RxJava提供的接口和类非常多,本文主要是关注与几个特殊的点进行代码分析
重要接口 Observer<T> Observer定义了作为观察者,收到信息的接口,通过onSubscribe接口,可以获取数据源disposable, 通过onNext可以接收到订阅的消息。通过onComplete或者onError可以获取接收完成抑或者接收失败1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public interface Observer <T > {
void onSubscribe (@NonNull Disposable d) ;
void onNext (@NonNull T t) ;
void onError (@NonNull Throwable e) ;
void onComplete () ;
}
Function<T,R> Function接口则定一个简单apply接口1
2
3
4
public interface Function <T , R > {
@NonNull
R apply (@NonNull T t) throws Exception ;
}
Observable 使用RxJava机制时,一般做法是先创建一个可被观察的对象。而这个可被观察的对象,通常是通过Observable/Flowable的接口创建出来,在此挑出Observable出来进行简单的剖析
just/fromArray(T… items) just/fromArray接口分别用于生成上述的ObservableJust/ObservableFromArray。过程非常简单,不做多述。其中有意思的是,Observable很多地方都调用了RxJavaPlugin.onAssembly方法,不要被该方法迷惑,他是用来提供hook使用的,当没有配置RxJavaPlugins中的hook时,通常为直接返回,下面给出部分简要代码1
2
3
4
public static <T> Observable<T> just (T item) {
ObjectHelper.requireNonNull(item, "The item is null" );
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
1
2
3
4
5
6
7
8
9
10
public static <T> Observable<T> fromArray (T... items) {
ObjectHelper.requireNonNull(items, "items is null" );
if (items.length == 0 ) {
return empty();
} else
if (items.length == 1 ) {
return just(items[0 ]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
1
2
3
4
5
6
7
8
public static <T> Observable<T> onAssembly (@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null ) {
return apply(f, source);
}
return source;
}
subscribe subscribe方法用于obersable被oberser观察时进行调用,从源码上可以非常清楚的看到,最终会调用observable的subscribeActual方法上,这也正是ObservableJust/ObservableFromArray复写的方法1
2
3
4
5
6
7
8
9
10
11
public final void subscribe (Observer<? super T> observer) {
......
try {
observer = RxJavaPlugins.onSubscribe(this , observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer" );
subscribeActual(observer);
}
......
}
ObservableJust/ObservableFromArray 这两类都非常简单,都继承Observable,并复写subscribeActual接口,用于自定义执行数据的分发。区别在于前者只保留一个值,对Observer进行一次onNext,onComplete调用,而后者保存一个数组,用于将多个数据通过多次onNext进行分发,最后调用onComplete。下面仅给出ObservableJust的代码,而ObservableFromArray代码类似就不做更多说明1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final class ObservableJust <T > extends Observable <T > implements ScalarCallable <T > {
private final T value;
public ObservableJust (final T value) {
this .value = value;
}
@Override
protected void subscribeActual (Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call () {
return value;
}
}
ObservableMap 当Observable.map时创建并返回,map的作用是将可被观察的对象从一种类别转换到另一个类别。而转换的过程是通过生成一个新的observer插入到observer序列之前的方式实现的。我们具体看一下源码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public final class ObservableMap <T , U > extends AbstractObservableWithUpstream <T , U > {
......
public ObservableMap (ObservableSource<T> source, Function<? super T, ? extends U> function) {
super (source);
this .function = function;
}
@Override
public void subscribeActual (Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver <T , U > extends BasicFuseableObserver <T , U > {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super (actual);
this .mapper = mapper;
}
@Override
public void onNext (T t) {
......
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value." );
} catch (Throwable ex) {
fail(ex);
return ;
}
actual.onNext(v);
}
......
}
}
从源码上可以看到,在ObservableMap中,将类型转换的function进行保存,并生成一个新的observer对象MapObserver,插入到目标observer序列之前,对类型进行function变换之后,在调用目标observer进行后续处理 这里存在一个非常巧妙的递归处理方式。ObservableMap的source字段保存是上一个Observable对象,在最后一个observable对象调用subscribe时,链表中的每一个observableMap,会触发source.subscribe操作(在subscribeActual中),依次到最上层的observable的subscribe操作。接着进行递归回来,依次从最上层的observer的onNext,一直到最下层的observer的onNext。因此此处的MapObserver相当于被插入到了observer处理链表中,用于类型转换
ObservableFlatMap 当Observable.flatMap调用时创建并返回。不同于ObservableMap,不是单纯的将一个数据进行类型转换,而是用于将一个被处理的类型转换成一批Observable类型。 从代码上来看,ObservableFlatMap在subscribe阶段返回一个MergeObserver类型对象,该对象既作为一个observer用于类型转换,同时也作为一个disposable,用于下发数据给下游的observer 下面给出部分核心代码,看看大致流程1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public final class ObservableFlatMap <T , U > extends AbstractObservableWithUpstream <T , U > {
@Override
public void subscribeActual (Observer<? super U> t) {
......
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
static final class MergeObserver <T , U > extends AtomicInteger implements Disposable , Observer <T > {
......
@Override
public void onSubscribe (Disposable s) {
if (DisposableHelper.validate(this .s, s)) {
this .s = s;
actual.onSubscribe(this );
}
}
......
@Override
public void onNext (T t) {
ObservableSource<? extends U> p;
......
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource" );
......
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this ) {
if (wip == maxConcurrency) {
sources.offer(p);
return ;
}
wip++;
}
}
subscribeInner(p);
}
void subscribeInner (ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
tryEmitScalar(((Callable<? extends U>)p));
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this ) {
p = sources.poll();
if (p == null ) {
wip--;
break ;
}
}
} else {
break ;
}
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this , uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break ;
}
}
}
....
ObservableLift 当Observable.lift时创建并返回,observableLift主要用于observer的转换,转换过程依赖接口ObservableOperator。代码很简单,如下所示:1
2
3
4
5
6
7
8
9
10
public interface ObservableOperator <Downstream , Upstream > {
@NonNull
Observer<? super Upstream> apply(@NonNull Observer<? super Downstream> observer) throws Exception;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final class ObservableLift <R , T > extends AbstractObservableWithUpstream <T , R > {
final ObservableOperator<? extends R, ? super T> operator;
public ObservableLift (ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
super (source);
this .operator = operator;
}
@Override
public void subscribeActual (Observer<? super R> s) {
Observer<? super T> observer;
observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer" );
......
source.subscribe(observer);
}
}
Observable.lift操作是一个非常有用的操作,可以通过lift,在observer处理链上,加上自定义的操作,例如切换线程等等重要操作,可以通过lift来实现
ObservableSubscribeOn 这个对象非常关键,在Observable.subScribeOn被调用的时候创建。用于线程调度控制。从源码上来看主要控制的subscribe过程中的线程调度,即控制的是调用subScribeOn位置以上的源头生成数据过程所在线程。从另一个角度来说最早调用subScribeOn的地方,才是生成数据最终被控制生效的地方。下面看一下源码1
2
3
4
5
6
7
8
9
10
public final class ObservableSubscribeOn <T > extends AbstractObservableWithUpstream <T , T > {
......
@Override
public void subscribeActual (final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
......
}
从源码中可以看到封装了一个SubscribeTask,该task是一个runnable,在run()过程中,只做subscribe处理,而这个runnable在哪个线程执行,受到初始化ObservableSubscribeOn时输入的scheduler控制。而对于SubscribeOnObserver对象的parent来说,起到一个承上启下的作用。最主要的作用是在dispose过程中,负责dispose上流源头,同时dispose scheduler返回的disposable1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
static final class SubscribeOnObserver <T > extends AtomicReference <Disposable > implements Observer <T >, Disposable {
private static final long serialVersionUID = 8094547886072529208L ;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this .actual = actual;
this .s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe (Disposable s) {
DisposableHelper.setOnce(this .s, s);
}
@Override
public void onNext (T t) {
actual.onNext(t);
}
@Override
public void onError (Throwable t) {
actual.onError(t);
}
@Override
public void onComplete () {
actual.onComplete();
}
@Override
public void dispose () {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this );
}
@Override
public boolean isDisposed () {
return DisposableHelper.isDisposed(get());
}
void setDisposable (Disposable d) {
DisposableHelper.setOnce(this , d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this .parent = parent;
}
@Override
public void run () {
source.subscribe(parent);
}
}
ObservableObserveOn 这个对象和ObservableSubscribeOn类似,同样是用来控制执行线程的,但是和ObservableSubscribeOn又存在极大的不同,主要的不同点在于ObservableSubscribeOn主要控制subscribe过程中,也就是源头数据生成操作所在的线程。而对于ObservableObserveOn主要用于控制数据生成后,被observer收到数据时所在的线程,下面我们来看一下具体的核心代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
public final class ObservableObserveOn <T > extends AbstractObservableWithUpstream <T , T > {
......
@Override
protected void subscribeActual (Observer<? super T> observer) {
......
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
......
}
static final class ObserveOnObserver <T > extends BasicIntQueueDisposable <T >
implements Observer <T >, Runnable {
......
@Override
public void onSubscribe (Disposable s) {
if (DisposableHelper.validate(this .s, s)) {
this .s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings ("unchecked" )
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true ;
actual.onSubscribe(this );
schedule();
return ;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this );
return ;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this );
}
}
@Override
public void onNext (T t) {
if (done) {
return ;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError (Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return ;
}
error = t;
done = true ;
schedule();
}
@Override
public void onComplete () {
if (done) {
return ;
}
done = true ;
schedule();
}
......
void schedule () {
if (getAndIncrement() == 0 ) {
worker.schedule(this );
}
}
@Override
public void run () {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal () {
int missed = 1 ;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return ;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return ;
}
boolean empty = v == null ;
if (checkTerminated(d, empty, a)) {
return ;
}
if (empty) {
break ;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0 ) {
break ;
}
}
}
void drainFused () {
int missed = 1 ;
for (;;) {
if (cancelled) {
return ;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null ) {
actual.onError(error);
worker.dispose();
return ;
}
actual.onNext(null );
if (d) {
ex = error;
if (ex != null ) {
actual.onError(ex);
} else {
actual.onComplete();
}
worker.dispose();
return ;
}
missed = addAndGet(-missed);
if (missed == 0 ) {
break ;
}
}
}