ReactiveX 之 Observable

阿里云产品限时红包,最高 ¥1888 元,立即领取

原文:http://reactivex.io/documentation/observable.html

对译文有建议,请发邮件或下方评论给我,万分感谢。

更新历史:1月13日

在RxJava中一个对象实现了Observer接口则被一个Observable类对象所订阅。订阅者则针对Obserable对象产生的值(包括项及项的序列)做响应。这种模式便利了异步操作,因为不用在等待Obserable产生对象时进行阻塞,它以订阅者的形式创建了一个哨兵,用于在Observable在将来的时间产生任何输出时提供合适的响应。

这篇文章解释什么是响应式模式以及什么是Observable与观察者(观察者怎样对Observable进行订阅)。

文档中的解释将采用”marble diagrams”的形式。下图说明”marble diagrams”怎样展示Observable以及Observable之间的转换。

Observables

背景

在许多软件编程任务中,你或多或少都希望自己编写的代码能够逐步的运行完成,因为你是一个一个的写下来的。但是在响应式编程范式中,许多代码并行的执行,其结果将在后来被观察者以任意的顺序捕获下来。在这种情况下,你不是调用一个方法,而是以Observable的形式为获取和转换数据定义一种机制,并将Observable订阅给订阅者。在预置的机制下当Observable产生的值到达时,观察者的哨兵捕获并对此进行响应。

这种解决方案的优点在于,当你有许多没有相互依赖的任务需要运行时,你可以在同一时刻启动它们,而不是需要在一个任务开始之前,等待其中的一个任务的结束。这样,你执行所有任务的所花费的时间只是其中耗时最长的任务的时间。

有许多种形式来描述这种异步编程和设计的模型。这篇文档将使用下面的形式:Subscriber(有时是Observer)订阅Observable类对象。也就是Subscriber对象订阅ObservableObservable产生值,并通过调用Subscriber的方法来发送通知给Subscriber

在其他文档或者上下文,有时我们也会将Subscriber称为watcherreactor。这个模型通常被被认为是响应模式

建立订阅者

这篇文档通常使用Groovy来做代码示例,实际上,你可以在任何基于JVM的语言上使用RxJava,如Clojure,Scala,JRuby或是Java本身。

与典型的响应式编程中无序的异步、并行不同,在传统的方法调用中,流程一般是这样的:

  1. 调用一个方法。
  2. 将方法的返回值保存在一个变量中。
  3. 使用这个变量以及它的新值做一些有用的事情。

或是用代码表示:

1
2
3
// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal

在异步模型中,流程则更像是这样:

  1. 定义一个方法来使用异步调用的返回值进行处理,这个方法是Subscriber的一部分。
  2. 用一个Observable类对象来定义异步调用本身。
  3. 通过订阅来将Subscriber关联Observable(这也同时初始化方法调用)。
  4. 继续你的业务逻辑;无论方法调用何时返回,Subscriber的方法开始对Observable所产生的项(返回值)进行操作。

用代码表示是这样:

1
2
3
4
5
6
7
8
// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the Subscriber is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business

onNext,onCompleted,以及onError

subscribe()方法可以接受1-3个方法,或者是一个Subscriber对象,或是任何实现了Observer接口(包含了这3个方法)的对象:

onNext:当Observable产生了一个值时,Observable将调用它的Subscriber上的这个方法。这个方法将Observable所产生的值作为它的参数。

onError:当Observable无法产生所预期的数据或是遇到了其他一些错误,Observable将调用它的Subscriber上的这个方法。这会使Observable停止,且不再掉调用onNextonCompletedonError将产生错误的误差指示作为它的参数。

onCompleted:在没有发生任何错误的情形下,Observable将在最后一次调用onNext之后调用其观察者的这个方法。

一个更加完整的subscribe()示例如下所示:

1
2
3
4
5
6
def myOnNext     = { item -> /* do something useful with item */ };
def myError = { throwable -> /* react sensibly to a failed call */ };
def myComplete = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business

Unsubscribing(退订)

在一些响应式扩展实现中,有专门的观察者接口,Subscriber来实现unsubscribe()方法。你可以调用这个方法来表明Subscriber不再对当前订阅的任何Observable感兴趣了。当不再有其他对其感兴趣的Observer,这些Observable可以选择停止产生新的值。

这个退订的结果会级联的影响到Observer所订阅的Observable上的操作链,这将导致操作链上的每个链接都停止产生项目。但这个并无法保证立即发生,当不再有Subscriber关注这些产生的值时,在短时间内,Observable可能仍然会产生新的值。

命名规范的注意事项

每一种特定语言的响应式扩展的实现都有自己的命名怪癖。这之间没有统一的命名标准,但在每个实现之间有许多的共性。

此外,在某些上下文中这些名称具有不同的含义,或者在一些特定的语言实现中显得尴尬。

举个例子:存在onEvent的命名模式(如:onNext, onCompleted, onError)。在许多上下文中,这些命名表明的是被注册的事件处理器上的方法。在ReactiveX中,这些命名则表示事件处理器本身。

“热”和”冷”Observable

Observable什么时候开始产生它的值序列呢?这个依赖于Observable。一个”热”的Observable将在它被新建之后便开始产生值,因此,任何订阅这种ObservableObserver将从这些序列的中间开始观察。另一方面,一个”冷”的Observable在开发产生值之前,会一直等待Observer订阅它。这样可以保证一个Observer看到从头开始的完整序列。

在响应式扩展的某些实现中,存在称为ConnectableObservable。这样的Observable直到它的connect方法被调用才开始产生值,不能是否有Observer已经订阅它。

组装Observable Operator

ObservableObserver仅仅是响应式扩展的开始。它是标准观察者模式的轻量扩展,比起一个简单的回调,它更适合与处理事件序列。

响应式扩展的真正力量在于,所有的Operator可以被变换,组合,操作,并和Observable所产生的值序列一起工作。

这些响应式扩展操作能让你通过利用所有回调的优点,以声明的方式将异步序列组织在一起。同时避免传统异步编程系统中的嵌套回调处理。

该文章将不同的操作及他们的示例用户组织在如下的页面中:

这些页面所包含的信息一些操作并不是响应式扩展的核心组成部分,但却是一些特定语言的实现或是可选模块。