当前位置:   article > 正文

Java WebSocket对接币安区块链K线行情API_bi安 websocket

bi安 websocket

今天我们来说说我在做Java后端对接币安区块链时,遇到的问题及解决方式。

既然要对接币安区块链K线接口,我们首先必须先了解这个行情api在哪里?

1. 行情K线接口

https://binance-docs.github.io/apidocs/futures/cn/#k-8

图片

注意:这里一定要是连续合约K线,而不是K线接口。这是在看官网K线socket接口对比得到的。

如果使用K线Api,数据是不正确的。

而且有一点我需要表述一下,不能说网络上其他博主的步骤不对,应该是版本升级的问题,其他博主的教程已经无法使用。

2. Java实现websockt客户端

Java实现websocket其实有很多种方式,比如:javax.websocket Java标准库、再如OkHttp、Apache HttpClient还有一个开源的库Java-WebSocket等等

Java-WebSocket在我碰到这个库之后,我基本都没用过其实的库,假如它适用我的业务,必须用它,因为它简单、高效。

来写代码了:

pom.xml先引入Java-WebSocket

  1. <dependency>
  2. <groupId>org.java-websocket</groupId>
  3. <artifactId>Java-WebSocket</artifactId>
  4. <version>1.5.4</version>
  5. </dependency>
然后我们写一个websocket的工具类。
  1. /**
  2. * 合约站行情推送处理器
  3. */
  4. @Slf4j
  5. public class WssMarketHandle implements Cloneable {
  6. private WebSocketClient webSocketClient;
  7. // 合约站行情请求以及订阅地址
  8. private String pushUrl = "";
  9. private WssMarketHandle() {
  10. }
  11. public WssMarketHandle(String pushUrl) {
  12. this.pushUrl = pushUrl;
  13. }
  14.     // ....
  15. }

由于K线数据时序要实时去接收的,所以我这里需要它连接之后,就让他去订阅接口,所以我的代码只写一个订阅的外部方式:

  1. public void sub(List<String> channels, SubscriptionListener<String> callback) throws URISyntaxException {
  2. doConnect(channels, callback);
  3. }

​​​​ List<String> channels 多币种订阅的模式参数

SubscriptionListener<String> callback 回调方法类

来看下doConnect方法,他是实现是主体

  1. private void doConnect(List<String> channels, SubscriptionListener<String> callback) throws URISyntaxException {
  2. webSocketClient = new WebSocketClient(new URI(pushUrl)) {
  3. @Override
  4. public void onOpen(ServerHandshake serverHandshake) {
  5. logger.debug("onOpen Success");
  6. doSub(channels);
  7. dealReconnect();
  8.             }
  9. @Override
  10. public void onMessage(String socketJson) {
  11. fixedThreadPool.execute(() -> {
  12. try {
  13. JSONObject entries = JSONUtil.parseObj(socketJson);
  14. String ping = entries.getStr("ping");
  15. String stream = entries.getStr("stream");
  16. if (StrUtil.isNotEmpty(ping)) {
  17. log.info("WssMarketHandle onMessage stream: " + socketJson);
  18. dealPing();
  19. } else if (StrUtil.isNotEmpty(stream)) {
  20. callback.onReceive(socketJson);
  21. } else {
  22. log.info("WssMarketHandle onMessage other: " + socketJson);
  23. }
  24. } catch (Throwable e) {
  25. logger.error("onMessage异常", e);
  26. }
  27. });
  28.             }
  29. @Override
  30. public void onMessage(ByteBuffer bytes) {
  31. fixedThreadPool.execute(() -> {
  32. try {
  33. byte[] temp = bytes.array();
  34. // gzip 解压
  35. byte[] decompress = GZipUtils.decompress(temp);
  36. String socketJson = new String(decompress, StandardCharsets.UTF_8);
  37. JSONObject JSONMessage = JSONUtil.parseObj(socketJson);
  38. String ch = JSONMessage.getStr("ch");
  39. String ping = JSONMessage.getStr("ping");
  40. if (StrUtil.isNotEmpty(ch)) {
  41. callback.onReceive(socketJson);
  42. }
  43. if (StrUtil.isNotEmpty(ping)) {
  44. dealPing();
  45. }
  46. } catch (Throwable e) {
  47. logger.error("onMessage异常", e);
  48. }
  49. });
  50.             }
  51. @Override
  52. public void onClose(int i, String s, boolean b) {
  53. logger.error("onClose i:{},s:{},b:{}", i, s, b);
  54.             }
  55. @Override
  56. public void onError(Exception e) {
  57. logger.error("onError:", e);
  58. }
  59.         };
  60.         webSocketClient.connect();
  61. }

其他的方法,包含取消订阅、断开重连以及Ping处理:

  1. public void close() {
  2. webSocketClient.connect();
  3. }
  4. /**
  5. * 开始订阅
  6. * @param channels
  7. */
  8. private void doSub(List<String> channels) {
  9. try {
  10. if (IterUtil.isNotEmpty(channels)) {
  11. channels.stream().forEach(e -> {
  12. webSocketClient.send(e);
  13. });
  14. }
  15. } catch (Exception ex) {
  16. }
  17. }
  18. /**
  19. * 取消订阅
  20. * @param subStr
  21. */
  22. public void unSub(String subStr) {
  23. try {
  24. webSocketClient.send(subStr);
  25. } catch (Exception ex) {
  26. }
  27. }
  28. /**
  29. * Ping处理
  30. */
  31. private void dealPing() {
  32. try {
  33. JSONObject jsonMessage = new JSONObject();
  34. jsonMessage.put("pong", pong.incrementAndGet());
  35. logger.debug("发送pong:{}", jsonMessage.toString());
  36. webSocketClient.send(jsonMessage.toString());
  37. } catch (Throwable t) {
  38. logger.error("dealPing出现了异常");
  39. }
  40. }
  41. /**
  42. * 重连
  43. */
  44. private void dealReconnect() {
  45. try {
  46. scheduledExecutorService.scheduleAtFixedRate(() -> {
  47. try {
  48. if ((webSocketClient.isClosed() && !webSocketClient.isClosing())) {
  49. logger.error("isClosed:{},isClosing:{},准备重连", webSocketClient.isClosed(), webSocketClient.isClosing());
  50. Boolean reconnectResult = webSocketClient.reconnectBlocking();
  51. logger.error("重连的结果为:{}", reconnectResult);
  52. if (!reconnectResult) {
  53. webSocketClient.closeBlocking();
  54. logger.error("closeBlocking");
  55. }
  56. }
  57. } catch (Throwable e) {
  58. logger.error("dealReconnect异常", e);
  59. }
  60. }, 60, 10, TimeUnit.SECONDS);
  61. } catch (Exception e) {
  62. logger.error("dealReconnect scheduledExecutorService异常", e);
  63. }
  64. }

​​​​​​​ 至此整个开发过程已然明了。

调用websockt接口实现,写个main方法测试下。

  1. private static final String BIAN_WS_URL = "wss://fstream.binance.com/stream";
  2. public static void main(String[] args) throws Exception {
  3. WssMarketHandle marketHandle = new WssMarketHandle(BIAN_WS_URL);
  4. List<String> channels = new ArrayList<>();
  5. JSONObject object = new JSONObject();
  6. object.set("method", "SUBSCRIBE");
  7. object.set("id", RandomUtil.randomInt(99999));
  8. JSONArray params = new JSONArray();
  9. String pair = ExchangeEnum.BTC.getExchangeName().toLowerCase() + "usdt";
  10. String perpetual = "perpetual";
  11. String urlParam = StrUtil.format("{}_{}@continuousKline_1m", pair, perpetual);
  12. params.add(urlParam);
  13. object.set("params", params);
  14. channels.add(object.toString());
  15. marketHandle.sub(channels, log::info);
  16. }

​​​​​​​历史K线我找了好久,都没有适用的,所以使用的是接口方式,因为它本来就是一次性获取的,所以不需要socket长连接获取也是可行的。接口如下:

https://fapi.binance.com/fapi/v1/continuousKlines

有需要帮助的小伙伴,可以关注,联系我,谢谢!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/646350
推荐阅读
相关标签
  

闽ICP备14008679号