赞
踩
1、概述
Http 异步客户端
设置:并行度=2,capacity=2,HttpMaxConn=2,client 为静态
输入:同时发起4条查询
输出:间隔10秒,同时返回4条数据
JDBC 线程池+连接池
设置:并行度=2,capacity=2,HttpMaxConn=2,client 为静态
输入:同时发起4条查询
输出:间隔10秒,先返回两条数据,间隔10秒,再返回两条数据
2、代码示例
package com.xu.flink.datastream.day11;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.util.concurrent.Executors;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;
/**
 * Demo job: reads JSON order lines from a local socket, enriches each one through
 * {@code AsyncHttpQueryFunction} using Flink's unordered async I/O, and prints the results.
 */
public class _06_HttpAsyncQueryGaoDe {
    /** Host of the text socket the job reads from. */
    private static final String SOURCE_HOST = "localhost";
    /** Port of the text socket the job reads from. */
    private static final int SOURCE_PORT = 8888;
    /** Parallelism applied to the whole job. */
    private static final int JOB_PARALLELISM = 2;
    /** Timeout handed to the async operator, in seconds. */
    private static final long ASYNC_TIMEOUT_SECONDS = 20;
    /** Capacity argument handed to the async operator. */
    private static final int ASYNC_CAPACITY = 2;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails while executing
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(JOB_PARALLELISM);

        final DataStreamSource<String> orderLines =
                environment.socketTextStream(SOURCE_HOST, SOURCE_PORT);

        AsyncDataStream
                .unorderedWait(
                        orderLines,
                        new AsyncHttpQueryFunction(),
                        ASYNC_TIMEOUT_SECONDS,
                        TimeUnit.SECONDS,
                        ASYNC_CAPACITY)
                .print();

        environment.execute();
    }
}
/**
 * Async function that enriches an order with province / city / district by calling the
 * AMap (GaoDe) reverse-geocoding HTTP API with the order's longitude and latitude.
 *
 * <p>Original author's experiment notes (kept for reference):
 * <ul>
 *   <li>Async HTTP client: parallelism=2, capacity=2, HttpMaxConn=2, static client.
 *       Input: 4 queries issued at once. Output: after 10 seconds all 4 results arrive together.</li>
 *   <li>JDBC thread pool + connection pool: parallelism=2, capacity=2, HttpMaxConn=2, static client.
 *       Input: 4 queries issued at once. Output: after 10 seconds two results arrive, and after
 *       another 10 seconds the remaining two.</li>
 * </ul>
 *
 * <p>The HTTP client is a static field shared by every instance of this function. It is created
 * by the first {@link #open(Configuration)} and closed by the last {@link #close()}, so one
 * instance can neither overwrite nor close a client another instance is still using.
 *
 * <p>Failure policy: a line that cannot be parsed produces no output record; a failed or
 * unusable HTTP lookup emits the order as parsed, without the lookup result. A {@code null}
 * element is never emitted.
 */
class AsyncHttpQueryFunction extends RichAsyncFunction<String, _06_OrderBean> {
    private static final String key = "***";

    /** Guards creation, reference counting and shutdown of the shared client. */
    private static final Object CLIENT_LOCK = new Object();
    /** Shared async HTTP client; non-null while at least one opened instance exists. */
    private static CloseableHttpAsyncClient httpClient;
    /** Number of opened instances currently relying on {@link #httpClient}. */
    private static int clientUsers;

    /** Whether this instance currently holds a reference on the shared client. */
    private transient boolean clientAcquired;

    /**
     * Creates and starts the shared async HTTP client if no other instance has done so yet,
     * and registers this instance as a user of it.
     *
     * @param parameters the configuration passed in by the runtime (unused)
     * @throws Exception if the client cannot be created or started
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        synchronized (CLIENT_LOCK) {
            if (httpClient == null) {
                // 3-second socket and connect timeouts, at most 2 connections in total.
                RequestConfig requestConfig = RequestConfig.custom()
                        .setSocketTimeout(3000)
                        .setConnectTimeout(3000)
                        .build();
                CloseableHttpAsyncClient client = HttpAsyncClients.custom()
                        .setMaxConnTotal(2)
                        .setDefaultRequestConfig(requestConfig)
                        .build();
                client.start();
                // Publish only after a successful start so a failed start leaves no stale client.
                httpClient = client;
            }
            clientUsers++;
            clientAcquired = true;
            System.out.println("open 方法被调用");
            System.out.println("httpClient=>" + httpClient.hashCode());
        }
    }

    /**
     * Submits the reverse-geocoding request for one order and completes {@code resultFuture}
     * from the HTTP client's callback, so the calling thread never waits for the response.
     *
     * @param line         one JSON-encoded order
     * @param resultFuture future to complete with zero or one enriched order
     * @throws Exception declared by the overridden method; failures are reported through
     *                   {@code resultFuture} instead
     */
    @Override
    public void asyncInvoke(String line, ResultFuture<_06_OrderBean> resultFuture) throws Exception {
        // 1. Parse the JSON line to obtain longitude and latitude.
        final _06_OrderBean orderBean = parseOrder(line);
        if (orderBean == null) {
            // Nothing usable to emit; complete with no elements rather than a null element.
            resultFuture.complete(Collections.<_06_OrderBean>emptyList());
            return;
        }
        try {
            double longitude = orderBean.getLongitude();
            double latitude = orderBean.getLatitude();
            HttpGet httpGet = new HttpGet("https://restapi.amap.com/v3/geocode/regeo?&location=" + longitude + "," + latitude + "&key=" + key);
            // 2. Submit the request; the callback is invoked by the client once the exchange ends.
            //    The result is taken from the callback instead of Future.get(), which would block.
            httpClient.execute(httpGet, new FutureCallback<HttpResponse>() {
                @Override
                public void completed(HttpResponse response) {
                    try {
                        // 3. Copy the address fields from the response into the order.
                        applyRegeoResponse(orderBean, response);
                    } catch (IOException | RuntimeException e) {
                        System.err.println("Failed to read regeo response for: " + line + " (" + e + ")");
                    }
                    emit(resultFuture, orderBean);
                }

                @Override
                public void failed(Exception ex) {
                    System.err.println("Regeo request failed for: " + line + " (" + ex + ")");
                    emit(resultFuture, orderBean);
                }

                @Override
                public void cancelled() {
                    System.err.println("Regeo request cancelled for: " + line);
                    emit(resultFuture, orderBean);
                }
            });
        } catch (RuntimeException e) {
            // The request could not even be built or submitted.
            System.err.println("Could not submit regeo request for: " + line + " (" + e + ")");
            emit(resultFuture, orderBean);
        }
    }

    /**
     * Fails the pending result with a {@link RuntimeException} naming the input that timed out.
     *
     * @param input        the input element whose lookup timed out
     * @param resultFuture future of the timed-out element
     */
    @Override
    public void timeout(String input, ResultFuture<_06_OrderBean> resultFuture) throws Exception {
        resultFuture.completeExceptionally(new RuntimeException(input + "=查询超时"));
    }

    /**
     * Releases this instance's reference on the shared client and closes the client when the
     * last reference is released. Safe to call when {@link #open(Configuration)} never succeeded.
     *
     * @throws Exception if closing the client fails
     */
    @Override
    public void close() throws Exception {
        synchronized (CLIENT_LOCK) {
            if (!clientAcquired) {
                return;
            }
            clientAcquired = false;
            clientUsers--;
            if (clientUsers == 0 && httpClient != null) {
                try {
                    httpClient.close();
                } finally {
                    // Drop the reference even if close() throws so a later open() builds a new client.
                    httpClient = null;
                }
            }
        }
    }

    /** Parses one JSON line into an order; returns {@code null} (after reporting) if it cannot be parsed. */
    private static _06_OrderBean parseOrder(String line) {
        try {
            _06_OrderBean parsed = JSON.parseObject(line, _06_OrderBean.class);
            if (parsed == null) {
                System.err.println("Skipping empty order record: " + line);
            }
            return parsed;
        } catch (RuntimeException e) {
            System.err.println("Skipping unparsable order record: " + line + " (" + e + ")");
            return null;
        }
    }

    /**
     * Sets province, city and district on the order from a regeo response; all three are set to
     * {@code null} when the status is not 200 or the response carries no address component.
     */
    private static void applyRegeoResponse(_06_OrderBean orderBean, HttpResponse response) throws IOException {
        String province = null;
        String city = null;
        String district = null;
        if (response.getStatusLine().getStatusCode() == 200) {
            HttpEntity entity = response.getEntity();
            if (entity != null) {
                // UTF-8 is only the fallback for responses that do not declare a charset.
                JSONObject jsonObject = JSON.parseObject(EntityUtils.toString(entity, StandardCharsets.UTF_8));
                JSONObject regeocodeObject = jsonObject == null ? null : jsonObject.getJSONObject("regeocode");
                JSONObject addObject = regeocodeObject == null ? null : regeocodeObject.getJSONObject("addressComponent");
                if (addObject != null) {
                    district = addObject.getString("district");
                    city = addObject.getString("city");
                    province = addObject.getString("province");
                }
            }
        }
        orderBean.setProvince(province);
        orderBean.setCity(city);
        orderBean.setDistrict(district);
    }

    /** Completes the result future with exactly one order. */
    private static void emit(ResultFuture<_06_OrderBean> resultFuture, _06_OrderBean orderBean) {
        resultFuture.complete(Collections.singleton(orderBean));
    }
}
-----------------------------------------------------------------
/**
 * Order record carried through the job: the fields parsed from the input JSON plus the
 * province / city / district fields set by the async lookup.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class _06_OrderBean {
    // Field order is kept as-is because the all-args constructor follows declaration order.
    /** Order id. */
    private String oid;
    /** User id. */
    private String uid;
    /** Order amount. */
    private double money;
    /** Longitude sent to the geocoding lookup. */
    private double longitude;
    /** Latitude sent to the geocoding lookup. */
    private double latitude;
    /** Province set by the lookup. */
    private String province;
    /** City set by the lookup. */
    private String city;
    /** District set by the lookup. */
    private String district;

    /**
     * Renders every field as {@code OrderBean{name=value, ...}}, with string fields wrapped
     * in single quotes.
     *
     * @return the textual form of this order
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("OrderBean{");
        text.append("oid='").append(oid).append('\'');
        text.append(", uid='").append(uid).append('\'');
        text.append(", money=").append(money);
        text.append(", longitude=").append(longitude);
        text.append(", latitude=").append(latitude);
        text.append(", province='").append(province).append('\'');
        text.append(", city='").append(city).append('\'');
        text.append(", district='").append(district).append('\'');
        text.append('}');
        return text.toString();
    }
}
3、测试用例
{"oid":"o001","uid":"u001","money":99.99,"longitude":115.690417, "latitude":36.239344}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。