ReactiveX is a wonderful framework that allows to write event based code in a very elegant and readable way. Still, getting started in it can be challenging, and intimidating. In practice once you understand few key principles of ReactiveX, you can start writing reactive code easily.
ReactiveX是一个很棒的框架,它允许以非常优雅和可读的方式编写基于事件的代码。 但是,开始使用它可能会充满挑战并且令人生畏。 在实践中,一旦您了解了ReactiveX的几个关键原理,就可以轻松地开始编写React式代码。
The aim of this article is to explain these keys principles, and show how they apply though a simple example. By the way, the first half of this article is language agnostic. So feel free to read it even if you use another programming languages than Python.
本文的目的是解释这些关键原理,并通过一个简单的示例展示它们如何应用。 顺便说一句,本文的前半部分与语言无关。 因此,即使您使用的是Python以外的其他编程语言,也可以随时阅读。
Before reading on, be aware of one important thing: Reactive Programming is addictive ! Once you start thinking as a data flows instead of control flows, you trend to consider that it solves problems better than other programming approaches, and you use reactive programming more and more.
在继续阅读之前,请注意一件事:React式编程会让人上瘾! 一旦开始以数据流而不是控制流的方式思考,就会倾向于认为它比其他编程方法更好地解决了问题,并且越来越多地使用React式编程。
响应式编程和ReactiveX (Reactive Programming and ReactiveX)
So what is reactive programming ? It is a way to write event driven code. The name comes from the fact that a reactive code is composed of entities that react to events being emitted by sources. These entities apply transformations on these events, and return other events as a result. So these entities — named operators — can be chained together, to create computation graphs.
那么什么是React式编程? 这是一种编写事件驱动代码的方法。 该名称源于以下事实:React性代码由对源发出的事件做出React的实体组成。 这些实体对这些事件应用转换,并因此返回其他事件。 因此,可以将这些名为操作符的实体链接在一起,以创建计算图。
Reactive computation graphs are always directed. They flow in only one way. Some graphs are Directed Acyclic Graphs — DAG — like this one:
React性计算图始终是有向的。 它们仅以一种方式流动。 一些图是有向无环图 -DAG-像这样:
On this diagram the nodes represent computations, and the edge the link between computations.
在此图上,节点代表计算,边缘代表计算之间的链接。
Some graphs may also be Cycle Graphs like this one:
一些图也可能是像这样的循环图:
Cycle graphs are very common when writing a fully reactive application. Most of the time the major part of an application graph is acyclic, and a sub-part may have cycles.
编写完全React性的应用程序时,循环图非常常见。 大多数情况下,应用程序图的主要部分是非循环的,而子部分可能具有循环。
ReactiveX is the most popular implementation of Reactive Programming libraries. One reason is that it was one of the firsts reactive libraries. It was initially developed by Microsoft for the .net platform. Since 2012 the code is open source, and has been ported to more than 20 programming languages.
ReactiveX是Reactive编程库的最流行的实现。 一个原因是它是最早的React式库之一。 它最初是由Microsoft为.net平台开发的。 自2012年以来,该代码是开源的,并已移植到20多种编程语言中。
The python implementation of ReactiveX is RxPY. The library is available on pypi and can be installed with pip:
ReactiveX的python实现是RxPY 。 该库在pypi上可用,可以与pip一起安装:
pip3 install rx
可观察,可观察者,可操作者 (Observable, Observer, Operator)
The foundation of ReactiveX is based on only few key principles described in the Observable Contract. Once you understand these principles, you will clearly understand the behavior of any ReactiveX code.
ReactiveX的基础仅基于可观察合同中描述的几个关键原则。 一旦了解了这些原理,便会清楚地了解任何ReactiveX代码的行为。
The base entity in ReactiveX is the Observable. An Observable is an entity that is a source of items. Item is the ReactiveX term for an event. You can consider that an Observable is a stream of event.
ReactiveX中的基本实体是Observable 。 可观察对象是作为项目来源的实体。 项目是事件的ReactiveX术语。 您可以认为Observable是事件流。
The second entity is the Observer. An Observer is the entity that subscribes to Observers, so that it can process items as they are emitted. The subscription to an Observable is explicit. This means that an Observable does not emit items until an Observer subscribes to it. When an Observable is created, no data flow occurs. The data flow starts at subscription time, not at creation time.
第二个实体是观察者 。 观察者是订阅观察者的实体,因此它可以在项目发出时进行处理。 对Observable的订阅是显式的。 这意味着,直到观察者订阅了观察者之后,观察者才发出项目。 创建Observable时,不会发生数据流。 数据流在订阅时开始,而不是在创建时开始。
We can then combine an Observer and an Observable to create an Operator. An operator subscribes to a source Observable, applies some transformations to the incoming items, and emits new items on another Observable.
然后我们可以结合一个Observer和Observable来创建一个Operator 。 运算符订阅源Observable,对传入的项目进行一些转换,然后在另一个Observable上发出新项目。
This is all you need to understand how ReactiveX works ! We will go in more details in the following paragraphs, but it all ends up to understanding these four notions: Observable, Observer, Subscription, Operator.
这就是您了解ReactiveX如何工作的全部! 在下面的段落中,我们将做更多详细的介绍,但是所有这些都会最终理解这四个概念:可观察,观察,订阅,运算符。
大理石图 (Marble Diagrams)
A picture is worth a thousand words.
一张图片胜过千言万语。
This is the motto of marble diagrams: A by example way to represent the behavior of an operator. You will find such diagrams almost everywhere in documentations. Consider the map operator. This operator takes items from a source Observable, applies a transformation function, and returns a sink observable with the transformation function applied on source items. This is a very simple behavior, but rather verbose to explain in plain English. The marble diagram of the map operator is an easier way to explain this, and also an intuitive way to understand how ReactiveX works:
这是大理石图的座右铭:通过一种示例方式来表示操作员的行为。 您可以在文档的几乎所有地方找到这样的图。 考虑地图运算符。 此运算符从可观察的源中获取项目,应用转换函数,并通过对源项目应用转换函数返回可观察到的接收器。 这是一个非常简单的行为,但用普通英语解释时比较冗长。 地图运算符的大理石图是一种更简单的解释方式,也是理解ReactiveX原理的直观方式:
There are three parts in this diagram:
此图分为三个部分:
- The top arrow represents the source observable: When being subscribed, this source Observable emits the numbers 1 to 4. 上方的箭头表示可观察的源:被订阅时,该可观察的源发射数字1到4。
- The rectangle represents the computation done by the operator. In this example, one is subtracted to each received item. 矩形代表操作员完成的计算。 在这个例子中,每个接收到的项目减去一个。
- The bottom arrow represents the sink Observable. As a result of subtracting 1 to each item, it emits items 0 to 3. 底部箭头表示接收器“可观察”。 由于每个项目都减去1,因此发出项目0到3。
On marble diagrams, time increases from left to right. So the leftmost item is emitted before the rightmost. The end of the arrows can have different shapes, each indicating different ways that the Observable completes:
在大理石图上,时间从左到右增加。 因此,最左边的项目在最右边的项目之前发出。 箭头的末端可以具有不同的形状,每个形状表示Observable完成的方式不同:
A Line ending with an arrow means that the Observable will continue to emit items in the future. Circles on the line are time positions when items are emitted.
以箭头结尾的线表示Observable在将来会继续发射物品。 线上的圆圈是发出项目时的时间位置。
A line ending with a pipe — | — indicates that the Observable completes on success. No more items can be emitted after.
以管道结尾的线— | | —表示Observable成功完成。 之后将不再发射任何物品。
A line ending with a cross — X — indicates that the Observable completes on error. No more items can be emitted after.
以叉号X结束的线表示Observable发生错误。 之后将不再发射任何物品。
React图 (Reactivity Diagrams)
Reactivity diagrams are another kind of visualization. They are used to describe the behavior of an application or a component. They are similar to UML Activity Diagrams, but they describe a data flow instead of a control flow. Let’s consider a simple application that takes a source observable as input, decrease the value, and keep only even values. Here is the reactivity diagram of this application:
React图是另一种可视化形式。 它们用于描述应用程序或组件的行为。 它们类似于UML活动图 ,但是它们描述的是数据流而不是控制流。 让我们考虑一个简单的应用程序,该应用程序将可观察到的源作为输入,减少其值,并仅保留偶数个值。 这是此应用程序的React性图:
The black circle indicates a source Observable. The rounded rectangles are operators. Here we chain two operators: map and filter. The encircled black circle is a sink of the data flow.
黑色圆圈表示可观察到的源。 圆角矩形是运算符。 在这里,我们链接两个运算符: map和filter 。 黑色圆圈是数据流的汇聚点。
More complex graphs can be described in a similar way. Reactivity diagrams are a good way to work on architecture before coding. See here another simple example with a cycle graph:
可以类似的方式描述更复杂的图。 React性图是在编码之前在体系结构上工作的好方法。 参见另一个带有循环图的简单示例:
给我看一些代码! (Show Me some code !)
You are now ready to read and write ReactiveX code ! Let’s implement the code corresponding to the first reactivity diagram of the previous section. We need two imports to use the ReactiveX operators:
您现在可以读写ReactiveX代码了! 让我们实现与上一部分的第一个React性图相对应的代码。 我们需要两次导入才能使用ReactiveX运算符:
import rximport rx.operators as ops
The first import is to use utility functions and factory operators. Factory operators are operators that create Observable from an external source of data instead of an Observable. The second import is a shortcut for using all other operators.
首先导入的是使用实用程序功能和工厂操作员。 工厂操作员是从外部数据源而不是Observable创建Observable的操作员。 第二个导入是使用所有其他运算符的快捷方式。
The first step is to create a source Observable. We do not use real data here, but instead we create an Observable from a list. This is done with the from_iterable factory operator:
第一步是创建一个源Observable。 我们在这里不使用实际数据,而是从列表中创建一个Observable。 这是通过from_iterable工厂操作员完成的:
import rximport rx.operators as opssource = rx.from_iterable([1, 2, 3, 4])
Then we build the computation graph. This one is composed of two operators: map and filter.
然后我们建立计算图。 这是由两个运算符组成: map和filter 。
import rximport rx.operators as opssource = rx.from_iterable([1, 2, 3, 4])source.pipe( ops.map(lambda i: i - 1), ops.filter(lambda i: i % 2 == 0),)
The pipe operator allows to chain other operators. It is an easy and readable way to create graphs. The map and filter operators take functions as parameters. We use lambdas here for these simple computations.
管道运算符允许链接其他运算符。 这是创建图形的简单易懂的方法。 映射和过滤运算符将函数作为参数。 我们在这里使用lambda进行这些简单的计算。
You can execute this code already. However, remember that nothing will happen yet: The graph is created but nobody subscribed to it, so no data flows yet. Let’s do that to consume the source observable:
您已经可以执行此代码。 但是,请记住,什么都不会发生:创建了图,但是没有人订阅它,因此还没有数据流。 让我们这样做来消耗可观察的源:
import rximport rx.operators as opssource = rx.from_iterable([1, 2, 3, 4])source.pipe( ops.map(lambda i: i - 1), ops.filter(lambda i: i % 2 == 0),).subscribe( on_next=lambda i: print("on_next: {}".format(i)), on_completed=lambda: print("on_completed"), on_error=lambda e: print("on_error: {}".format(e)))
The subscribe method… subscribes to an observable. It takes three callbacks as arguments. These callbacks will be called at different times:
订阅方法…订阅一个可观察对象。 它需要三个回调作为参数。 这些回调将在不同时间调用:
on_next is called each time an item is received.
每次接收到项目时都会调用on_next 。
on_completed is called when the observable completes on success.
当可观察对象成功完成时,将调用on_completed 。
on_error is called when the Observable completes on error.
当Observable发生错误时,将调用on_error 。
Note that according to the Observable Contract, the on_next callback will never be called after the on_completed and the on_error callbacks.
请注意,根据可观察到合同中,on_next回调将永远是on_completed后并称为ON_ERROR回调。
There is a final step needed to clean up the resources on completion. The subscription creates some resources. These resources have to be cleaned up when they are not needed anymore. For this, the subscribe method returns a Disposable object. The dispose method of this Disposable object can be called to clean up these resources:
最后需要清理完成的资源。 订阅将创建一些资源。 这些资源在不再需要时必须清理。 为此,subscribe方法返回一个Disposable对象。 可以调用此Disposable对象的dispose方法来清理这些资源:
import rximport rx.operators as opssource = rx.from_iterable([1, 2, 3, 4])disposable = source.pipe( ops.map(lambda i: i - 1), ops.filter(lambda i: i % 2 == 0),).subscribe( on_next=lambda i: print("on_next: {}".format(i)), on_completed=lambda: print("on_completed"), on_error=lambda e: print("on_error: {}".format(e)),)disposable.dispose()print("Done!")
错误管理 (Error Management)
Ok, our code is fine, but what happens if the source observable contains strings instead of integers ? For sure the map operator will raise an exception because subtracting 1 to a string is not allowed in python. Ideally, this should be handled explicitly, so that we can deal with this error. The good news is that this is already the case.
好的,我们的代码很好,但是如果可观察的源包含字符串而不是整数会怎样? 可以肯定的是, 地图操作符会引发异常,因为python中不允许对字符串减去1。 理想情况下,应对此进行明确处理,以便我们可以处理此错误。 好消息是事实已经如此。
Replacing the source observable with this:
以此替换可观察的源:
source = rx.from iterable([1, ”foo” , 3, 4])
gives the following result:
给出以下结果:
$ python demo1_error.pyon_next: 0on_error: unsupported operand type(s) for -: ’str’ and ’int’Done!
The on_error callback has been called with the exception raised by the map operator as an argument. So what happened here ? What is the consequence on the filter operator that is executed after the map operator ?
调用了on_error回调,但地图操作员提出了异常作为参数。 那么这里发生了什么? 在map运算符之后执行的filter运算符的结果是什么?
The full explanation — sometimes called Railway Oriented Programming — is in this figure:
在此图中,完整的解释(有时称为“ 铁路定向编程”) :
One can see an operator as working on two data flows in parallel:
可以看到操作员正在并行处理两个数据流:
- The happy path processes all incoming items. 快乐路径处理所有传入项目。
- The error path processes the errors. 错误路径处理错误。
As a consequence, here is the behavior of our application: Each time an item is emitted, it goes through the happy path of the map operator (i.e. its value is decreased). If all goes well, the resulting item continues on the happy path of the filter operator. On success, the on_next callback is called.
因此,这就是我们应用程序的行为:每次发出一个项目时,它都会经过地图运算符的满意路径(即,其值减小)。 如果一切顺利,则结果项将继续在过滤器运算符的满意路径上。 成功后,将调用on_next回调。
In case of error in the map function, the map operator catches the exception, and emits it on the error path. The exception is forwarded to the error path of the filter operator. The filter operator just forwards it downstream, and the on_error callback is called.
如果map函数发生错误,则map运算符会捕获异常,并在错误路径上发出该异常。 异常被转发到过滤器运算符的错误路径。 过滤器运算符仅将其转发到下游,然后调用on_error回调。
So operators deal explicitly with errors. There are also some operators dedicated to error management, such as retrying subscriptions, or generating errors on timeout.
因此,运算符可以明确处理错误。 也有一些专门用于错误管理的运算符,例如重试订阅或在超时时生成错误。
The great thing with this structure is that in many cases, you have error management for free. For people using functional programming, this is an implementation of the Try monad.
这种结构的优点在于,在许多情况下,您可以免费进行错误管理。 对于使用函数式编程的人,这是Try monad的实现。
并发 (Concurrency)
The example that we implemented is blocking: All the computation is done in the call to the subscribe method. This is the default behavior of ReactiveX, but it is not always desirable. Sometimes there is a need for concurrency management, either IO or CPU. ReactiveX deals with concurrency via dedicated operators and schedulers.
我们实现的示例是阻塞的 :所有计算都在对subscription方法的调用中完成。 这是ReactiveX的默认行为,但并不总是令人满意的。 有时需要IO或CPU并发管理。 ReactiveX通过专用的运算符和调度程序处理并发。
Schedulers are objects that manage threads and event loops. RxPY implements schedulers to deal with CPU concurrency via threads and thread pools. It also provides schedulers for IO concurrency with AsyncIO, Twisted, GEvent and Eventlet.
调度程序是管理线程和事件循环的对象。 RxPY实现调度程序,以通过线程和线程池处理CPU并发。 它还通过AsyncIO,Twisted,GEvent和Eventlet提供IO并发的调度程序。
For more details and examples on concurrency, see the RxPY documentation.
有关并发的更多详细信息和示例,请参阅RxPY文档 。
结论 (Conclusion)
You now have all the keys to start using Reactive Programming. Understanding these concepts is all you need to understand existing code and write your own one. Start playing with the library, get familiar with the existing operators. The real hard part of ReactiveX is to know the existing operators to avoid re-writing them in you code !
现在,您已具有所有键来开始使用React式编程。 了解这些概念是了解现有代码并编写自己的代码所需要的。 开始使用库,熟悉现有的运算符。 ReactiveX真正困难的部分是了解现有的运算符,以避免在您的代码中重写它们!
The RxPY documentation contains examples on all key features, and — hopefully — detailed information on each operator.
RxPY文档包含有关所有关键功能的示例,并希望提供有关每个操作员的详细信息。
Once you are familiar with this, the other concepts like multicasting, hot/cold Observables, and higher order Observables will be easy to grasp.
一旦熟悉了这一点,其他概念(例如多播,热/冷可观察对象和高阶可观察对象)将很容易掌握。
Originally published at https://blog.oakbits.com on September 1, 2020.
最初于 2020年9月1日 发布在 https://blog.oakbits.com 上。
翻译自: https://medium.com/swlh/an-introduction-to-reactive-programming-in-python-a9985e4c43b9