Rxjava采用观察者模式,主要用于处理异步操作和事件分发相关操作。是一款非常火热的开源库。下面的描述摘自github

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

RxJava的官网为:RxJava

使用示例

在介绍RxJava的源码之前,可以参考google的mvp+rxJava项目mvp+rxjava,简单看看rxjava的使用

示例1

TaskRemoteDataSource在项目中用于模拟访问远程数据源,因此在使用过程中,通过rxJava的delay方法,控制数据的延迟到达

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class TasksRemoteDataSource implements TasksDataSource {
private static TasksRemoteDataSource INSTANCE;
private static final int SERVICE_LATENCY_IN_MILLIS = 5000;
private final static Map<String, Task> TASKS_SERVICE_DATA;
static {
TASKS_SERVICE_DATA = new LinkedHashMap<>(2);
addTask("Build tower in Pisa", "Ground looks good, no foundation work required.");
addTask("Finish bridge in Tacoma", "Found awesome girders at half the cost!");
}
......
@Override
public Observable<List<Task>> getTasks() {
return Observable
.from(TASKS_SERVICE_DATA.values())
.delay(SERVICE_LATENCY_IN_MILLIS, TimeUnit.MILLISECONDS) //控制数据源延迟到达
.toList(); //将每一个task进行组装成List
}
@Override
public Observable<Task> getTask(@NonNull String taskId) {
final Task task = TASKS_SERVICE_DATA.get(taskId);
if(task != null) {
return Observable.just(task).delay(SERVICE_LATENCY_IN_MILLIS, TimeUnit.MILLISECONDS);
} else {
return Observable.empty();
}
}
......
}

示例2

TaskLocalDataSource在项目中模拟本地db中的数据源,该类中使用了sqlbrite库对SQLiteOpenHelper进行封装,从而更方便的使用rxjava

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class TasksLocalDataSource implements TasksDataSource {
@NonNull
private final BriteDatabase mDatabaseHelper;
@NonNull
private Func1<Cursor, Task> mTaskMapperFunction;
// Prevent direct instantiation.
private TasksLocalDataSource(@NonNull Context context,
@NonNull BaseSchedulerProvider schedulerProvider) {
checkNotNull(context, "context cannot be null");
checkNotNull(schedulerProvider, "scheduleProvider cannot be null");
TasksDbHelper dbHelper = new TasksDbHelper(context);
SqlBrite sqlBrite = SqlBrite.create();
mDatabaseHelper = sqlBrite.wrapDatabaseHelper(dbHelper, schedulerProvider.io());
mTaskMapperFunction = this::getTask;
}
@NonNull
private Task getTask(@NonNull Cursor c) {
String itemId = c.getString(c.getColumnIndexOrThrow(TaskEntry.COLUMN_NAME_ENTRY_ID));
String title = c.getString(c.getColumnIndexOrThrow(TaskEntry.COLUMN_NAME_TITLE));
String description =
c.getString(c.getColumnIndexOrThrow(TaskEntry.COLUMN_NAME_DESCRIPTION));
boolean completed =
c.getInt(c.getColumnIndexOrThrow(TaskEntry.COLUMN_NAME_COMPLETED)) == 1;
return new Task(title, description, itemId, completed);
}
@Override
public Observable<List<Task>> getTasks() {
String[] projection = {
TaskEntry.COLUMN_NAME_ENTRY_ID,
TaskEntry.COLUMN_NAME_TITLE,
TaskEntry.COLUMN_NAME_DESCRIPTION,
TaskEntry.COLUMN_NAME_COMPLETED
};
String sql = String.format("SELECT %s FROM %s", TextUtils.join(",", projection), TaskEntry.TABLE_NAME);
return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, sql)
.mapToList(mTaskMapperFunction);
}
@Override
public Observable<Task> getTask(@NonNull String taskId) {
String[] projection = {
TaskEntry.COLUMN_NAME_ENTRY_ID,
TaskEntry.COLUMN_NAME_TITLE,
TaskEntry.COLUMN_NAME_DESCRIPTION,
TaskEntry.COLUMN_NAME_COMPLETED
};
String sql = String.format("SELECT %s FROM %s WHERE %s LIKE ?",
TextUtils.join(",", projection), TaskEntry.TABLE_NAME, TaskEntry.COLUMN_NAME_ENTRY_ID);
return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, sql, taskId)
.mapToOneOrDefault(mTaskMapperFunction, null);
}
......
}

示例3

TasksRepository在提供本地内存数据源缓存的基础上,用于管理本地db数据源和远程数据源集合,提供透明而方便的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
public class TasksRepository implements TasksDataSource {
@Nullable
private static TasksRepository INSTANCE = null;
@NonNull
private final TasksDataSource mTasksRemoteDataSource;
@NonNull
private final TasksDataSource mTasksLocalDataSource;
/**
* This variable has package local visibility so it can be accessed from tests.
*/
@VisibleForTesting
@Nullable
Map<String, Task> mCachedTasks;
/**
* Marks the cache as invalid, to force an update the next time data is requested. This variable
* has package local visibility so it can be accessed from tests.
*/
@VisibleForTesting
boolean mCacheIsDirty = false;
// Prevent direct instantiation.
private TasksRepository(@NonNull TasksDataSource tasksRemoteDataSource,
@NonNull TasksDataSource tasksLocalDataSource) {
mTasksRemoteDataSource = checkNotNull(tasksRemoteDataSource);
mTasksLocalDataSource = checkNotNull(tasksLocalDataSource);
}
/**
* Gets tasks from cache, local data source (SQLite) or remote data source, whichever is
* available first.
*/
@Override
public Observable<List<Task>> getTasks() {
// Respond immediately with cache if available and not dirty
if (mCachedTasks != null && !mCacheIsDirty) {
return Observable.from(mCachedTasks.values()).toList();
} else if (mCachedTasks == null) {
mCachedTasks = new LinkedHashMap<>();
}
Observable<List<Task>> remoteTasks = getAndSaveRemoteTasks();
if (mCacheIsDirty) {
return remoteTasks;
} else {
// Query the local storage if available. If not, query the network.
Observable<List<Task>> localTasks = getAndCacheLocalTasks();
return Observable.concat(localTasks, remoteTasks)
.filter(tasks -> !tasks.isEmpty())
.first(); //这个地方非常精彩,通过contact的方法,提供本地db,和远程获取的List<Task>,如果本地db存在数据源,那么first将会返回由localTasks提供的数据源,否则使用远程提供的数据源
}
}
private Observable<List<Task>> getAndCacheLocalTasks() {
return mTasksLocalDataSource.getTasks()
.flatMap(new Func1<List<Task>, Observable<List<Task>>>() {
@Override
public Observable<List<Task>> call(List<Task> tasks) {
return Observable.from(tasks)
.doOnNext(task -> mCachedTasks.put(task.getId(), task))
.toList();
}
});//通过flatMap的方式,强行在流程中间插入保存内存cache的处理
}
private Observable<List<Task>> getAndSaveRemoteTasks() {
return mTasksRemoteDataSource
.getTasks()
.flatMap(new Func1<List<Task>, Observable<List<Task>>>() {
@Override
public Observable<List<Task>> call(List<Task> tasks) {
return Observable.from(tasks).doOnNext(task -> {
mTasksLocalDataSource.saveTask(task);
mCachedTasks.put(task.getId(), task);
}).toList();
}
})
.doOnCompleted(() -> mCacheIsDirty = false); //通过flatmap的方式,强行切换数据源的过程中插入中间流程,用于将数据源保存到本地db和内存之中
}
/**
* Gets tasks from local data source (sqlite) unless the table is new or empty. In that case it
* uses the network data source. This is done to simplify the sample.
*/
@Override
public Observable<Task> getTask(@NonNull final String taskId) {
checkNotNull(taskId);
final Task cachedTask = getTaskWithId(taskId);
// Respond immediately with cache if available
if (cachedTask != null) {
return Observable.just(cachedTask);
}
// Load from server/persisted if needed.
// Do in memory cache update to keep the app UI up to date
if (mCachedTasks == null) {
mCachedTasks = new LinkedHashMap<>();
}
// Is the task in the local data source? If not, query the network.
Observable<Task> localTask = getTaskWithIdFromLocalRepository(taskId);
Observable<Task> remoteTask = mTasksRemoteDataSource
.getTask(taskId)
.doOnNext(task -> {
mTasksLocalDataSource.saveTask(task);
mCachedTasks.put(task.getId(), task);
});
return Observable.concat(localTask, remoteTask).first()
.map(task -> {
if (task == null) {
throw new NoSuchElementException("No task found with taskId " + taskId);
}
return task;
});//此处通过map操作进行插入中间处理流程,插入对task的非空判断
}
@NonNull
Observable<Task> getTaskWithIdFromLocalRepository(@NonNull final String taskId) {
return mTasksLocalDataSource
.getTask(taskId)
.doOnNext(task -> mCachedTasks.put(taskId, task))
.first();
}
}

示例4

TasksPresenter通过使用TaskRepository来获取数据,从而更新界面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
public class TasksPresenter implements TasksContract.Presenter {
@NonNull
private final TasksRepository mTasksRepository;
@NonNull
private final TasksContract.View mTasksView;
@NonNull
private final BaseSchedulerProvider mSchedulerProvider;
@NonNull
private TasksFilterType mCurrentFiltering = TasksFilterType.ALL_TASKS;
private boolean mFirstLoad = true;
@NonNull
private CompositeSubscription mSubscriptions;
public TasksPresenter(@NonNull TasksRepository tasksRepository,
@NonNull TasksContract.View tasksView,
@NonNull BaseSchedulerProvider schedulerProvider) {
mTasksRepository = checkNotNull(tasksRepository, "tasksRepository cannot be null");
mTasksView = checkNotNull(tasksView, "tasksView cannot be null!");
mSchedulerProvider = checkNotNull(schedulerProvider, "schedulerProvider cannot be null");
mSubscriptions = new CompositeSubscription();
mTasksView.setPresenter(this);
}
@Override
public void subscribe() {
loadTasks(false);
}
@Override
public void unsubscribe() {
mSubscriptions.clear();
}
@Override
public void loadTasks(boolean forceUpdate) {
// Simplification for sample: a network reload will be forced on first load.
loadTasks(forceUpdate || mFirstLoad, true);
mFirstLoad = false;
}
/**
* @param forceUpdate Pass in true to refresh the data in the {@link TasksDataSource}
* @param showLoadingUI Pass in true to display a loading icon in the UI
*/
private void loadTasks(final boolean forceUpdate, final boolean showLoadingUI) {
if (showLoadingUI) {
mTasksView.setLoadingIndicator(true);
}
if (forceUpdate) {
mTasksRepository.refreshTasks();
}
// The network request might be handled in a different thread so make sure Espresso knows
// that the app is busy until the response is handled.
EspressoIdlingResource.increment(); // App is busy until further notice
mSubscriptions.clear();
Subscription subscription = mTasksRepository
.getTasks()
.flatMap(new Func1<List<Task>, Observable<Task>>() {
@Override
public Observable<Task> call(List<Task> tasks) {
return Observable.from(tasks);
}
})
.filter(task -> {
switch (mCurrentFiltering) {
case ACTIVE_TASKS:
return task.isActive();
case COMPLETED_TASKS:
return task.isCompleted();
case ALL_TASKS:
default:
return true;
}
})
.toList()
.subscribeOn(mSchedulerProvider.computation())
.observeOn(mSchedulerProvider.ui())
.doOnTerminate(() -> {
if (!EspressoIdlingResource.getIdlingResource().isIdleNow()) {
EspressoIdlingResource.decrement(); // Set app as idle.
}
})
.subscribe(
// onNext
this::processTasks,
// onError
throwable -> mTasksView.showLoadingTasksError(),
// onCompleted
() -> mTasksView.setLoadingIndicator(false));
mSubscriptions.add(subscription);
}
private void processTasks(@NonNull List<Task> tasks) {
if (tasks.isEmpty()) {
// Show a message indicating there are no tasks for that filter type.
processEmptyTasks();
} else {
// Show the list of tasks
mTasksView.showTasks(tasks);
// Set the filter label's text.
showFilterLabel();
}
}
private void showFilterLabel() {
switch (mCurrentFiltering) {
case ACTIVE_TASKS:
mTasksView.showActiveFilterLabel();
break;
case COMPLETED_TASKS:
mTasksView.showCompletedFilterLabel();
break;
default:
mTasksView.showAllFilterLabel();
break;
}
}
private void processEmptyTasks() {
switch (mCurrentFiltering) {
case ACTIVE_TASKS:
mTasksView.showNoActiveTasks();
break;
case COMPLETED_TASKS:
mTasksView.showNoCompletedTasks();
break;
default:
mTasksView.showNoTasks();
break;
}
}
@Override
public void addNewTask() {
mTasksView.showAddTask();
}
......
}

示例5

StatisticsPresenter通过统计不同type的Task,通过Observable的zip方法将不同数据源的task进行组合,进行统一处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class StatisticsPresenter implements StatisticsContract.Presenter {
@NonNull
private final TasksRepository mTasksRepository;
@NonNull
private final StatisticsContract.View mStatisticsView;
@NonNull
private final BaseSchedulerProvider mSchedulerProvider;
@NonNull
private CompositeSubscription mSubscriptions;
public StatisticsPresenter(@NonNull TasksRepository tasksRepository,
@NonNull StatisticsContract.View statisticsView,
@NonNull BaseSchedulerProvider schedulerProvider) {
mTasksRepository = checkNotNull(tasksRepository, "tasksRepository cannot be null");
mStatisticsView = checkNotNull(statisticsView, "statisticsView cannot be null!");
mSchedulerProvider = checkNotNull(schedulerProvider, "schedulerProvider cannot be null");
mSubscriptions = new CompositeSubscription();
mStatisticsView.setPresenter(this);
}
@Override
public void subscribe() {
loadStatistics();
}
@Override
public void unsubscribe() {
mSubscriptions.clear();
}
private void loadStatistics() {
mStatisticsView.setProgressIndicator(true);
// The network request might be handled in a different thread so make sure Espresso knows
// that the app is busy until the response is handled.
EspressoIdlingResource.increment(); // App is busy until further notice
Observable<Task> tasks = mTasksRepository
.getTasks()
.flatMap(Observable::from);
Observable<Integer> completedTasks = tasks.filter(Task::isCompleted).count();
Observable<Integer> activeTasks = tasks.filter(Task::isActive).count();
Subscription subscription = Observable
.zip(completedTasks, activeTasks, (completed, active) -> Pair.create(active, completed))
.subscribeOn(mSchedulerProvider.computation())
.observeOn(mSchedulerProvider.ui())
.doOnTerminate(() -> {
if (!EspressoIdlingResource.getIdlingResource().isIdleNow()) {
EspressoIdlingResource.decrement(); // Set app as idle.
}
})
.subscribe(
// onNext
stats -> mStatisticsView.showStatistics(stats.first, stats.second),
// onError
throwable -> mStatisticsView.showLoadingStatisticsError(),
// onCompleted
() -> mStatisticsView.setProgressIndicator(false));
mSubscriptions.add(subscription);
} //这里的zip很精彩,将completeTask和activityTask的count信息进行统一组合,当获取二者数据之后,在同一进行数据下放
}

示例6

如何配合retrofit进行使用,那么效果非常好,下面例子摘自https://juejin.im/post/59a83f146fb9a0248f4a9aa2

场景一: 单请求异步处理

由于在Android UI线程中不能做一些耗时操作,比如网络请求,大文件保存等,所以在开发中经常会碰到异步处理的情况,我们最典型的使用场景是RxJava+Retrofit处理网络请求

1
2
3
4
5
MyService myService = retrofit.create(MyService.class);
myService.getSomething()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::updateUI, this::showError);

场景二: 多异步请求连续调用

这种场景其实也很常见,我们做用户头像编辑的使用,一般就会有三个请求需要连续调用:

请求头像上传的地址
上传头像
更新用户信息
在平时的代码里,我们需要一步步callback嵌套下来,代码冗长太难看,而且不好维护,使用RxJava链式调用处理代码逻辑就会非常清晰

1
2
3
4
5
6
7
MyService myService = retrofit.create(MyService.class);
myService.getUploadUrl()
.flatMap(this::uploadImageTask)
.flatMap(this::updateUserInfo)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::updateUI, this::showError);

先获取请求,再上传头像,最后更新用户信息,后面的任务依赖上一步的处理结果,依次执行。

场景三: 多异步请求合并处理

有时候在项目中,我们会碰到组合多个请求的结果后,再更新UI的情况,比如我们项目中就有一个从多个请求地址获取通知数据,然后在APP上再按时间顺序组合后展示的需求,这时候我们就可以用RxJava的zip函数来处理了

1
2
3
4
5
6
7
8
9
10
11
12
MyService myService = retrofit.create(MyService.class);
Observable o1 = myService.getNotification1();
Observable o2 = myService.getNotification2();
Observable.zip(o1,o2, this::combiNotification)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::updateUI, this::showError);
public List<Notification> combiNotification(List<Notification> n1, List<Notification> n2){
//TODO 合并通知列表
}

zip函数会等待两个请求都完成后,调用我们的合并方法combiNotification,等合并处理后再回调subscribe中的方法。

场景四: 定时轮询

RxJava还特别适合对定时轮询任务的处理, 一种典型的例子就是APP提交了一个任务给后台异步处理,假设后台处理需要1-2分钟左右,我们需要定时到后台查询进度,并更新到UI上, 传统的做法是用Handler的postDelay方法,用RxJava实现的话就会非常简洁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Subscription subscription = Observable.interval(2, TimeUnit.SECONDS)
.map(this::getProgress)
.takeUntil(progress -> progress != 100)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
//TODO finished
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(int progress) {
//TODO update progress
}
});

重要的类和字段功能

考虑到RxJava提供的接口和类非常多,本文主要是关注与几个特殊的点进行代码分析

重要接口

Observer<T>

Observer定义了作为观察者,收到信息的接口,通过onSubscribe接口,可以获取数据源disposable, 通过onNext可以接收到订阅的消息。通过onComplete或者onError可以获取接收完成抑或者接收失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public interface Observer<T> {
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}

Function<T,R>

Function接口则定一个简单apply接口

1
2
3
4
public interface Function<T, R> {
@NonNull
R apply(@NonNull T t) throws Exception;
}

Observable

使用RxJava机制时,一般做法是先创建一个可被观察的对象。而这个可被观察的对象,通常是通过Observable/Flowable的接口创建出来,在此挑出Observable出来进行简单的剖析

just/fromArray(T… items)

just/fromArray接口分别用于生成上述的ObservableJust/ObservableFromArray。过程非常简单,不做多述。其中有意思的是,Observable很多地方都调用了RxJavaPlugin.onAssembly方法,不要被该方法迷惑,他是用来提供hook使用的,当没有配置RxJavaPlugins中的hook时,通常为直接返回,下面给出部分简要代码

1
2
3
4
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

1
2
3
4
5
6
7
8
9
10
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
1
2
3
4
5
6
7
8
//摘自RxJavaPlugins
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

subscribe

subscribe方法用于obersable被oberser观察时进行调用,从源码上可以非常清楚的看到,最终会调用observable的subscribeActual方法上,这也正是ObservableJust/ObservableFromArray复写的方法

1
2
3
4
5
6
7
8
9
10
11
public final void subscribe(Observer<? super T> observer) {
......
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
}
......
}

ObservableJust/ObservableFromArray

这两类都非常简单,都继承Observable,并复写subscribeActual接口,用于自定义执行数据的分发。区别在于前者只保留一个值,对Observer进行一次onNext,onComplete调用,而后者保存一个数组,用于将多个数据通过多次onNext进行分发,最后调用onComplete。下面仅给出ObservableJust的代码,而ObservableFromArray代码类似就不做更多说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();//scalardisposable会在run的过程中,调用observer的onNext,onComplete,因此非常简单
}
@Override
public T call() {
return value;
}
}

ObservableMap

当Observable.map时创建并返回,map的作用是将可被观察的对象从一种类别转换到另一个类别。而转换的过程是通过生成一个新的observer插入到observer序列之前的方式实现的。我们具体看一下源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
......
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
......
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
......
}
}

从源码上可以看到,在ObservableMap中,将类型转换的function进行保存,并生成一个新的observer对象MapObserver,插入到目标observer序列之前,对类型进行function变换之后,在调用目标observer进行后续处理
这里存在一个非常巧妙的递归处理方式。ObservableMap的source字段保存是上一个Observable对象,在最后一个observable对象调用subscribe时,链表中的每一个observableMap,会触发source.subscribe操作(在subscribeActual中),依次到最上层的observable的subscribe操作。接着进行递归回来,依次从最上层的observer的onNext,一直到最下层的observer的onNext。因此此处的MapObserver相当于被插入到了observer处理链表中,用于类型转换

ObservableFlatMap

当Observable.flatMap调用时创建并返回。不同于ObservableMap,不是单纯的将一个数据进行类型转换,而是用于将一个被处理的类型转换成一批Observable类型。
从代码上来看,ObservableFlatMap在subscribe阶段返回一个MergeObserver类型对象,该对象既作为一个observer用于类型转换,同时也作为一个disposable,用于下发数据给下游的observer
下面给出部分核心代码,看看大致流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
@Override
public void subscribeActual(Observer<? super U> t) {
......
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
......
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
actual.onSubscribe(this);
}
}
......
@Override
public void onNext(T t) {
ObservableSource<? extends U> p;
......
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
......
//当达到最大同时处理数目时,将p添加到sources队列中等待处理,否则通过subscribeInner进行处理
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
subscribeInner(p);
}
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
//如果p属于callable可以直接进行数据下放
if (p instanceof Callable) {
tryEmitScalar(((Callable<? extends U>)p));
//从缓存队列中取出ObservableSource进行数据下放
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
break;
}
}
} else {
break;
}
} else {
//构建一个InnerObserver对象,并用生成的p,进行subscribe操作
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
....

ObservableLift

当Observable.lift时创建并返回,observableLift主要用于observer的转换,转换过程依赖接口ObservableOperator。代码很简单,如下所示:

1
2
3
4
5
6
7
8
9
10
public interface ObservableOperator<Downstream, Upstream> {
/**
* Applies a function to the child Observer and returns a new parent Observer.
* @param observer the child Observer instance
* @return the parent Observer instance
* @throws Exception on failure
*/
@NonNull
Observer<? super Upstream> apply(@NonNull Observer<? super Downstream> observer) throws Exception;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final class ObservableLift<R, T> extends AbstractObservableWithUpstream<T, R> {
/** The actual operator. */
final ObservableOperator<? extends R, ? super T> operator;
public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
super(source);
this.operator = operator;
}
@Override
public void subscribeActual(Observer<? super R> s) {
Observer<? super T> observer;
observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer");
......
source.subscribe(observer);
}
}

Observable.lift操作是一个非常有用的操作,可以通过lift,在observer处理链上,加上自定义的操作,例如切换线程等等重要操作,可以通过lift来实现

ObservableSubscribeOn

这个对象非常关键,在Observable.subScribeOn被调用的时候创建。用于线程调度控制。从源码上来看主要控制的subscribe过程中的线程调度,即控制的是调用subScribeOn位置以上的源头生成数据过程所在线程。从另一个角度来说最早调用subScribeOn的地方,才是生成数据最终被控制生效的地方。下面看一下源码

1
2
3
4
5
6
7
8
9
10
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
......
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
......
}

从源码中可以看到封装了一个SubscribeTask,该task是一个runnable,在run()过程中,只做subscribe处理,而这个runnable在哪个线程执行,受到初始化ObservableSubscribeOn时输入的scheduler控制。而对于SubscribeOnObserver对象的parent来说,起到一个承上启下的作用。最主要的作用是在dispose过程中,负责dispose上流源头,同时dispose scheduler返回的disposable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s); //dispose上流源头
DisposableHelper.dispose(this); //dispose scheduler返回的disposable
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}

ObservableObserveOn

这个对象和ObservableSubscribeOn类似,同样是用来控制执行线程的,但是和ObservableSubscribeOn又存在极大的不同,主要的不同点在于ObservableSubscribeOn主要控制subscribe过程中,也就是源头数据生成操作所在的线程。而对于ObservableObserveOn主要用于控制数据生成后,被observer收到数据时所在的线程,下面我们来看一下具体的核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
......
@Override
protected void subscribeActual(Observer<? super T> observer) {
......
//通过指定的scheduler创建对应的worker,并传入新生成的ObserveOnObserver中
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
......
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
......
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
//数据上流支持同步模式,则表示可以可以直接通过上流queue不停的poll获取数据
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
//这一步很关键,线程切换主要在这一步进行
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
//数据上流支持异步模式,数据准备好时,push到queue中,并通过onNext进行通知
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
//当数据上流不是QueueDisposable时,本地建立一个queue用户将受到数据push其中
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
......
//schedule方法很关键,线程切换就是在这个地方发生的
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
//worker.schedule之后,最终run接口会在指定的线程中被调用
@Override
public void run() {
//outputFused为true的情况是下流请求异步时执行
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
//正常情况下在worker的线程中,会执行这个方法进行数据分发
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
//循环不断的从queue中获取,调用下游observer的onNext方法进行数据下流
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//数据下流
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
//当下流请求异步模式时,worker线程中最终会调用该接口
void drainFused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null) {
actual.onError(error);
worker.dispose();
return;
}
//由于是async模式,因此只需要将数据放到queue中,通过onNext通知下流observer从queue中取就好,因此传递的是null
actual.onNext(null);
if (d) {
ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}