赞
踩
反应式编程将函数式范式与复杂的大规模编程能力结合在一起。这些能力允许在应用程序架构中使用类似函数式的语义。ReactiveX是反应式世界中最强大的项目之一,为语言实现者提供了一套通用的规范。本文是对RxJava的实践探索,RxJava是ReactiveX的Java实现。
开始使用RxJava
为了测试RxJava,我们将编写一个命令行应用程序,监视CoinCap开发的公共事件流。该事件流提供了一个WebSocket API,类似于一个广泛的加密货币交易所上每个交易的JSON格式事件的消防栓。我们将首先简单地获取这些事件并将它们打印到控制台。然后,我们将添加一些更复杂的处理来展示RxJava的能力。清单1让我们从Maven快速入门原型开始,它为我们的演示应用程序提供了脚手架。
清单1:Maven快速入门
mvn archetype:generate -DgroupId=com.infoworld -DartifactId=rxjava -DarchetypeArtifactId=maven-archetype-quickstar
现在我们在目录中存储了一个简单的项目脚手架。我们可以修改它来包含我们需要的依赖项。我们还设置了程序的Java版本,如清单2所示。
清单2:修改pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.infoworld</groupId> <artifactId>rxjava</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <name>rxjava</name> <url>http://maven.apache.org</url> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>16</source> <target>16</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.2.21</version> </dependency> <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>4.9.1</version> </dependency> <!-- JSON library for parsing GitHub API response --> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.9</version> </dependency> </dependencies> </project>
为了确认一切正常工作,请输入以下命令:. 这个命令应该会输出经典的“Hello World”。现在,我们将添加从WebSocket端点获取事件并在控制台中显示它们的基本功能的代码。可以在清单3中看到这段代码。
清单3:添加一个特性
package com.infoworld; import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; import io.reactivex.schedulers.Schedulers; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.WebSocket; import okhttp3.WebSocketListener; import okio.ByteString; public class App { public static void main(String[] args) { String websocketUrl = "wss://ws.coincap.io/trades/binance"; OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder().url(websocketUrl).build(); Observable<String> observable = Observable.create(emitter -> { WebSocket webSocket = client.newWebSocket(request, new WebSocketListener() { @Override public void onOpen(WebSocket webSocket, okhttp3.Response response) { // WebSocket connection is open } @Override public void onMessage(WebSocket webSocket, String text) { emitter.onNext(text); // Emit received message } @Override public void onMessage(WebSocket webSocket, ByteString bytes) { // Handle binary message if needed } @Override public void onClosing(WebSocket webSocket, int code, String reason) { webSocket.close(code, reason); } @Override public void onClosed(WebSocket webSocket, int code, String reason) { emitter.onComplete(); // WebSocket connection is closed } @Override public void onFailure(WebSocket webSocket, Throwable t, okhttp3.Response response) { emitter.onError(t); // WebSocket connection failure } }); // Dispose WebSocket connection when the observer is disposed emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket"))); }); observable .subscribeOn(Schedulers.io()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { // No-op } @Override public void onNext(String event) { // Process each event here System.out.println(event); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("Completed"); } }); // Wait indefinitely (or use another mechanism to keep the program running) try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } }
如果运行这个程序,将会逐行输出JSON事件,每行一个事件。要停止程序,可以按下Ctrl/Command-c。
建模事件流
清单3向我们展示了一些RxJava的基本原理。我们使用OkHttpClient获取到与binance推送端点(wss://ws.coincap.io/trades/binance)的连接,这使得消费WebSocket API变得简单。
当我们打开了连接,我们就创建了一个新的Observable。Observable是发出事件的基本类型,它是一个可以被观察(或监听)的对象。换句话说,Observable是某种类型的事件源,它可以模拟许多不同的事件源。在这种情况下,我们使用Observable.create方法创建了一个新的事件源,它接受一个带有单个参数的函数,我们将其命名为emitter。Observable对象具有我们需要的所有回调方法,以便生成我们的事件流。从某种意义上说,我们希望将WebSocket流包装在一个自定义的RxJava事件源中。为此,我们从WebSocketListener中选择我们想要的回调方法,特别是onMessage的版本,并调用我们想要的方法,即emitter.onNext(text)。(还有一些用于生命周期事件的回调方法,如onClosed和onError。)
这样我们就得到了一个Observable,可以传递给需要的任何人,以便让他们了解正在发生的事情。这是一种标准化、可移植的建模事件流的方式。此外,它非常灵活,具有各种功能转换。以下是如何关闭emitter:
emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket")));
});.
通过这种方式关闭emitter可以确保在emitter完成时关闭WebSocket连接。
观察事件
为了观察从Observable中发出的事件,我们使用Observable对象的subscribe()方法。我们首先调用subscribeOn(Schedulers.io())方法,告诉RxJava在后台线程中运行。这是一种(非常)简单的方式来实现多线程并发。RxJava还使用了一个线程池。处理事件的主要工作是通过将Observer对象传递给subscribe()方法来完成的。Observer是与Observable相对应的另一面:它是任何想要观察事件的基本类型。在这种情况下,我们在调用中创建了一个新的匿名Observer(使用泛型进行参数化)。将事件写入控制台的实际工作发生在Observer的onNext(String)方法中。
操作事件流
现在让我们对事件流执行一些操作。首先,我们将使用GSON将字符串转换为JSON对象。然后,我们将使用filter()方法过滤出仅在Solana区块链上的交易。为了实现这一点,我们可以使用Observable的map()和filter()方法。使用map()方法,我们可以逐个事件地将字符串转换为JSON对象。然后,我们在filter()方法中使用JSON对象,只保留货币为“Solana”的事件(在CoinCap规范中,使用的加密货币在“base”字段中)。可以在清单4中看到这段新代码。
清单4:使用map()和filter()
import com.google.gson.Gson; import com.google.gson.JsonObject; //… The rest is the same observable .subscribeOn(Schedulers.io()) .map(event -> { Gson gson = new Gson(); JsonObject jsonObject = gson.fromJson(event, JsonObject.class); return jsonObject; }) .filter(jsonObject -> { String base = jsonObject.get("base").getAsString(); return base.equals("solana"); }) .subscribe( jsonObject -> System.out.println(jsonObject), Throwable::printStackTrace, () -> System.out.println("Completed") );
这段代码中的map()和filter()方法非常容易理解。map()方法将我们的事件流转换为另一个事件流。filter()方法在事件到达时进行筛选,只保留base字段等于“solana”的事件。 清单4还展示了方法的另一种重载形式。这个重载版本接受三个参数:Observable对象、onNext函数和onError函数。它的工作方式相同。还有一个单参数版本,只接受onNext函数作为参数。另外,需要注意的是,map()和filter()是我们熟悉的函数式操作,我们在Java流和其他语言(如JavaScript)中都很喜欢使用它们。但现在,我们可以将它们应用于各种事件源。实际上,我们可以将这些操作应用于任何可以使用Observable和Observer处理的内容。
结论
RxJava中的响应式编程为我们提供了强大的功能和灵活的语法。它可以在各种情况下使用。正如我们所见,它在处理像CoinCap API这样的流数据源时非常方便。将事件流作为对象传递的能力是现代软件中的重要抽象。所以我觉得每个开发人员都应该了解它。
作者:Matthew Tyson
更多技术干货请关注公众号“云原生数据库”
squids.cn,目前可体验全网zui低价RDS,免费的迁移工具DBMotion、SQL开发工具等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。