当前位置:   article > 正文

沪深websocket level2/level1行情推送接入示例

沪深websocket level2/level1行情推送接入示例

行情接入包

golang package:

  1. package hangqing
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/flate"
  6. "encoding/json"
  7. "github.com/gorilla/websocket"
  8. "io/ioutil"
  9. "log"
  10. "net/http"
  11. "net/url"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. type ServerAddrRsp struct {
  17. Code string `json:"code"`
  18. Server string `json:"server"`
  19. }
  20. type Hq struct {
  21. token string //jvQuant Token
  22. server string //websocket服务器地址
  23. conn *websocket.Conn //websocket连接
  24. cmdChan chan string
  25. exitChan chan int
  26. lv1Deal func(string string) //level1行情处理方法
  27. lv2Deal func(string string) //level2行情处理方法
  28. wg *sync.WaitGroup
  29. }
  30. //实例初始化
  31. func (hq *Hq) Construct(token, serAddr string, lv1Handle, lv2Handle func(string string)) {
  32. hq.token = token
  33. if serAddr == "" {
  34. hq.server = hq.initServer()
  35. }
  36. hq.lv1Deal = lv1Handle
  37. hq.lv2Deal = lv2Handle
  38. hq.conn = hq.connect()
  39. hq.wg = &sync.WaitGroup{}
  40. hq.cmdChan = make(chan string, 128)
  41. hq.exitChan = make(chan int)
  42. //接收协程
  43. hq.wg.Add(2)
  44. go func() {
  45. hq.receive()
  46. hq.wg.Done()
  47. }()
  48. //发送协程
  49. go func() {
  50. hq.cmd()
  51. hq.wg.Done()
  52. }()
  53. }
  54. //获取行情服务器地址
  55. func (hq *Hq) initServer() (server string) {
  56. params := url.Values{
  57. "market": []string{"ab"},
  58. "type": []string{"websocket"},
  59. "token": []string{hq.token},
  60. }
  61. req := "http://jvquant.com/query/server?" + params.Encode()
  62. rb, err := HttpOnce(req, nil, nil, 3000)
  63. if err != nil {
  64. log.Fatalln("获取行情服务器地址失败:", req, err)
  65. }
  66. rspMap := ServerAddrRsp{}
  67. err = json.Unmarshal(rb, &rspMap)
  68. if err != nil {
  69. log.Fatalln("解析行情服务器地址失败:", string(rb), err)
  70. }
  71. server = rspMap.Server
  72. if rspMap.Code != "0" || server == "" {
  73. log.Fatalln("解析行情服务器地址失败:", string(rb))
  74. }
  75. log.Println("获取行情服务器地址成功:", server)
  76. return
  77. }
  78. //连接行情服务器
  79. func (hq Hq) connect() (conn *websocket.Conn) {
  80. wsUrl := hq.server + "?token=" + hq.token
  81. conn, _, err := websocket.DefaultDialer.Dial(wsUrl, nil)
  82. if err != nil {
  83. log.Fatalln("行情服务器连接错误:", err)
  84. }
  85. return
  86. }
  87. //增加level1行情订阅
  88. func (hq Hq) AddLv1(codeArr []string) {
  89. cmd := "add="
  90. cmdArr := []string{}
  91. for _, code := range codeArr {
  92. cmdArr = append(cmdArr, "lv1_"+code)
  93. }
  94. cmd = cmd + strings.Join(cmdArr, ",")
  95. hq.SendRawCmd(cmd)
  96. }
  97. //增加level2行情订阅
  98. func (hq Hq) AddLv2(codeArr []string) {
  99. cmd := "add="
  100. cmdArr := []string{}
  101. for _, code := range codeArr {
  102. cmdArr = append(cmdArr, "lv2_"+code)
  103. }
  104. cmd = cmd + strings.Join(cmdArr, ",")
  105. hq.SendRawCmd(cmd)
  106. }
  107. //指令入队列
  108. func (hq Hq) SendRawCmd(cmd string) {
  109. hq.cmdChan <- cmd
  110. }
  111. //关闭行情连接
  112. func (hq Hq) Close() {
  113. close(hq.cmdChan)
  114. hq.exitChan <- 1
  115. hq.conn.Close()
  116. }
  117. //线程阻塞等待
  118. func (hq Hq) Wait() {
  119. hq.wg.Wait()
  120. }
  121. //websocket指令发送
  122. func (hq Hq) cmd() {
  123. for cmd := range hq.cmdChan {
  124. log.Println("发送指令:" + cmd)
  125. err := hq.conn.WriteMessage(websocket.TextMessage, []byte(cmd))
  126. if err != nil {
  127. log.Println("指令发送错误:", err)
  128. }
  129. }
  130. }
  131. //websocket行情接收处理
  132. func (hq Hq) receive() {
  133. for {
  134. select {
  135. case <-hq.exitChan:
  136. log.Print("接收协程退出")
  137. return
  138. default:
  139. //阻塞接收
  140. messageType, rb, err := hq.conn.ReadMessage()
  141. if err != nil {
  142. log.Print("接收错误:", err)
  143. }
  144. //文本消息
  145. if messageType == websocket.TextMessage {
  146. log.Println("Text响应:", string(rb))
  147. }
  148. //二进制消息
  149. if messageType == websocket.BinaryMessage {
  150. unZipByte := DeCompress(rb)
  151. text := string(unZipByte)
  152. ex1 := strings.Split(text, "\n")
  153. for _, ex1r := range ex1 {
  154. ex2 := strings.Split(ex1r, "=")
  155. if len(ex2) == 2 {
  156. code := ex2[0]
  157. hqs := ex2[1]
  158. if strings.HasPrefix(code, "lv1_") {
  159. hq.lv1Deal(hqs)
  160. }
  161. if strings.HasPrefix(code, "lv2_") {
  162. hq.lv2Deal(hqs)
  163. }
  164. }
  165. }
  166. }
  167. }
  168. }
  169. }
  170. //二进制数据解压方法
  171. func DeCompress(b []byte) []byte {
  172. var buffer bytes.Buffer
  173. buffer.Write([]byte(b))
  174. reader := flate.NewReader(&buffer)
  175. var result bytes.Buffer
  176. result.ReadFrom(reader)
  177. reader.Close()
  178. return result.Bytes()
  179. }
  180. //http请求封装
  181. func HttpOnce(Url string, headers, postData map[string]string, msTimeOut int) (r []byte, err error) {
  182. client := &http.Client{
  183. Timeout: time.Duration(time.Duration(msTimeOut) * time.Millisecond),
  184. }
  185. method := http.MethodGet
  186. r = []byte{}
  187. err = nil
  188. if len(headers) == 0 {
  189. headers = map[string]string{}
  190. }
  191. if len(postData) != 0 {
  192. method = http.MethodPost
  193. headers["Content-Type"] = "application/x-www-form-urlencoded"
  194. }
  195. postParam := url.Values{}
  196. for k, v := range postData {
  197. postParam.Set(k, v)
  198. }
  199. postParamBuff := bytes.NewBufferString(postParam.Encode())
  200. req, err := http.NewRequest(method, Url, postParamBuff)
  201. if err != nil {
  202. return r, err
  203. }
  204. for k, v := range headers {
  205. req.Header.Add(k, v)
  206. }
  207. resp, er := client.Do(req)
  208. if er != nil {
  209. err = er
  210. return
  211. }
  212. defer resp.Body.Close()
  213. if err != nil {
  214. return r, err
  215. }
  216. br := bufio.NewReader(resp.Body)
  217. r, err = ioutil.ReadAll(br)
  218. return r, err
  219. }

参考地址:https://github.com/jvQuant/OpenAPIDemo

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/521122
推荐阅读
相关标签
  

闽ICP备14008679号