赞
踩
今天我们来说说我在做Java后端对接币安区块链时,遇到的问题及解决方式。
既然要对接币安区块链K线接口,我们首先必须先了解这个行情api在哪里?
1. 行情K线接口
https://binance-docs.github.io/apidocs/futures/cn/#k-8
注意:这里一定要是连续合约K线,而不是K线接口。这是在看官网K线socket接口对比得到的。
如果使用K线Api,数据是不正确的。
而且有一点我需要表述一下,不能说网络上其他博主的步骤不对,应该是版本升级的问题,其他博主的教程已经无法使用。
2. Java实现websockt客户端
Java实现websocket其实有很多种方式,比如:javax.websocket Java标准库、再如OkHttp、Apache HttpClient还有一个开源的库Java-WebSocket等等
Java-WebSocket在我碰到这个库之后,我基本都没用过其实的库,假如它适用我的业务,必须用它,因为它简单、高效。
来写代码了:
pom.xml先引入Java-WebSocket
- <dependency>
- <groupId>org.java-websocket</groupId>
- <artifactId>Java-WebSocket</artifactId>
- <version>1.5.4</version>
- </dependency>
然后我们写一个websocket的工具类。
- /**
- * 合约站行情推送处理器
- */
- @Slf4j
- public class WssMarketHandle implements Cloneable {
- private WebSocketClient webSocketClient;
- // 合约站行情请求以及订阅地址
- private String pushUrl = "";
-
- private WssMarketHandle() {
- }
-
- public WssMarketHandle(String pushUrl) {
- this.pushUrl = pushUrl;
- }
- // ....
- }
由于K线数据时序要实时去接收的,所以我这里需要它连接之后,就让他去订阅接口,所以我的代码只写一个订阅的外部方式:
- public void sub(List<String> channels, SubscriptionListener<String> callback) throws URISyntaxException {
- doConnect(channels, callback);
- }
List<String> channels 多币种订阅的模式参数
SubscriptionListener<String> callback 回调方法类
来看下doConnect方法,他是实现是主体
- private void doConnect(List<String> channels, SubscriptionListener<String> callback) throws URISyntaxException {
- webSocketClient = new WebSocketClient(new URI(pushUrl)) {
- @Override
- public void onOpen(ServerHandshake serverHandshake) {
- logger.debug("onOpen Success");
- doSub(channels);
- dealReconnect();
- }
- @Override
- public void onMessage(String socketJson) {
- fixedThreadPool.execute(() -> {
- try {
- JSONObject entries = JSONUtil.parseObj(socketJson);
- String ping = entries.getStr("ping");
- String stream = entries.getStr("stream");
- if (StrUtil.isNotEmpty(ping)) {
- log.info("WssMarketHandle onMessage stream: " + socketJson);
- dealPing();
- } else if (StrUtil.isNotEmpty(stream)) {
- callback.onReceive(socketJson);
- } else {
- log.info("WssMarketHandle onMessage other: " + socketJson);
- }
- } catch (Throwable e) {
- logger.error("onMessage异常", e);
- }
- });
- }
- @Override
- public void onMessage(ByteBuffer bytes) {
- fixedThreadPool.execute(() -> {
- try {
- byte[] temp = bytes.array();
- // gzip 解压
- byte[] decompress = GZipUtils.decompress(temp);
- String socketJson = new String(decompress, StandardCharsets.UTF_8);
- JSONObject JSONMessage = JSONUtil.parseObj(socketJson);
- String ch = JSONMessage.getStr("ch");
- String ping = JSONMessage.getStr("ping");
- if (StrUtil.isNotEmpty(ch)) {
- callback.onReceive(socketJson);
- }
- if (StrUtil.isNotEmpty(ping)) {
- dealPing();
- }
- } catch (Throwable e) {
- logger.error("onMessage异常", e);
- }
- });
- }
- @Override
- public void onClose(int i, String s, boolean b) {
- logger.error("onClose i:{},s:{},b:{}", i, s, b);
- }
- @Override
- public void onError(Exception e) {
- logger.error("onError:", e);
- }
- };
- webSocketClient.connect();
- }
其他的方法,包含取消订阅、断开重连以及Ping处理:
- public void close() {
- webSocketClient.connect();
- }
-
- /**
- * 开始订阅
- * @param channels
- */
- private void doSub(List<String> channels) {
- try {
- if (IterUtil.isNotEmpty(channels)) {
- channels.stream().forEach(e -> {
- webSocketClient.send(e);
- });
- }
- } catch (Exception ex) {
- }
- }
-
- /**
- * 取消订阅
- * @param subStr
- */
- public void unSub(String subStr) {
- try {
- webSocketClient.send(subStr);
- } catch (Exception ex) {
- }
- }
-
- /**
- * Ping处理
- */
- private void dealPing() {
- try {
- JSONObject jsonMessage = new JSONObject();
- jsonMessage.put("pong", pong.incrementAndGet());
- logger.debug("发送pong:{}", jsonMessage.toString());
- webSocketClient.send(jsonMessage.toString());
- } catch (Throwable t) {
- logger.error("dealPing出现了异常");
- }
- }
-
- /**
- * 重连
- */
- private void dealReconnect() {
- try {
- scheduledExecutorService.scheduleAtFixedRate(() -> {
- try {
- if ((webSocketClient.isClosed() && !webSocketClient.isClosing())) {
- logger.error("isClosed:{},isClosing:{},准备重连", webSocketClient.isClosed(), webSocketClient.isClosing());
- Boolean reconnectResult = webSocketClient.reconnectBlocking();
- logger.error("重连的结果为:{}", reconnectResult);
- if (!reconnectResult) {
- webSocketClient.closeBlocking();
- logger.error("closeBlocking");
- }
-
- }
- } catch (Throwable e) {
- logger.error("dealReconnect异常", e);
- }
-
- }, 60, 10, TimeUnit.SECONDS);
- } catch (Exception e) {
- logger.error("dealReconnect scheduledExecutorService异常", e);
- }
- }
至此整个开发过程已然明了。
调用websockt接口实现,写个main方法测试下。
- private static final String BIAN_WS_URL = "wss://fstream.binance.com/stream";
-
- public static void main(String[] args) throws Exception {
- WssMarketHandle marketHandle = new WssMarketHandle(BIAN_WS_URL);
- List<String> channels = new ArrayList<>();
- JSONObject object = new JSONObject();
- object.set("method", "SUBSCRIBE");
- object.set("id", RandomUtil.randomInt(99999));
-
- JSONArray params = new JSONArray();
- String pair = ExchangeEnum.BTC.getExchangeName().toLowerCase() + "usdt";
- String perpetual = "perpetual";
- String urlParam = StrUtil.format("{}_{}@continuousKline_1m", pair, perpetual);
- params.add(urlParam);
- object.set("params", params);
-
- channels.add(object.toString());
- marketHandle.sub(channels, log::info);
- }
历史K线我找了好久,都没有适用的,所以使用的是接口方式,因为它本来就是一次性获取的,所以不需要socket长连接获取也是可行的。接口如下:
https://fapi.binance.com/fapi/v1/continuousKlines
有需要帮助的小伙伴,可以关注,联系我,谢谢!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。