赞
踩
- package hangqing
-
- import (
- "bufio"
- "bytes"
- "compress/flate"
- "encoding/json"
- "github.com/gorilla/websocket"
- "io/ioutil"
- "log"
- "net/http"
- "net/url"
- "strings"
- "sync"
- "time"
- )
-
// ServerAddrRsp is the JSON document decoded from the quote-server
// allocation query issued by initServer.
type ServerAddrRsp struct {
	// Code is the status field; initServer accepts only the value "0".
	Code string `json:"code"`
	// Server is the address of the allocated quote server.
	Server string `json:"server"`
}
-
- type Hq struct {
- token string //jvQuant Token
- server string //websocket服务器地址
- conn *websocket.Conn //websocket连接
- cmdChan chan string
- exitChan chan int
- lv1Deal func(string string) //level1行情处理方法
- lv2Deal func(string string) //level2行情处理方法
- wg *sync.WaitGroup
- }
-
- //实例初始化
- func (hq *Hq) Construct(token, serAddr string, lv1Handle, lv2Handle func(string string)) {
- hq.token = token
- if serAddr == "" {
- hq.server = hq.initServer()
- }
- hq.lv1Deal = lv1Handle
- hq.lv2Deal = lv2Handle
- hq.conn = hq.connect()
- hq.wg = &sync.WaitGroup{}
- hq.cmdChan = make(chan string, 128)
- hq.exitChan = make(chan int)
-
- //接收协程
- hq.wg.Add(2)
- go func() {
- hq.receive()
- hq.wg.Done()
- }()
- //发送协程
- go func() {
- hq.cmd()
- hq.wg.Done()
- }()
- }
-
- //获取行情服务器地址
- func (hq *Hq) initServer() (server string) {
- params := url.Values{
- "market": []string{"ab"},
- "type": []string{"websocket"},
- "token": []string{hq.token},
- }
- req := "http://jvquant.com/query/server?" + params.Encode()
- rb, err := HttpOnce(req, nil, nil, 3000)
- if err != nil {
- log.Fatalln("获取行情服务器地址失败:", req, err)
- }
- rspMap := ServerAddrRsp{}
- err = json.Unmarshal(rb, &rspMap)
- if err != nil {
- log.Fatalln("解析行情服务器地址失败:", string(rb), err)
- }
- server = rspMap.Server
- if rspMap.Code != "0" || server == "" {
- log.Fatalln("解析行情服务器地址失败:", string(rb))
- }
- log.Println("获取行情服务器地址成功:", server)
- return
- }
-
- //连接行情服务器
- func (hq Hq) connect() (conn *websocket.Conn) {
- wsUrl := hq.server + "?token=" + hq.token
- conn, _, err := websocket.DefaultDialer.Dial(wsUrl, nil)
- if err != nil {
- log.Fatalln("行情服务器连接错误:", err)
- }
- return
- }
-
- //增加level1行情订阅
- func (hq Hq) AddLv1(codeArr []string) {
- cmd := "add="
- cmdArr := []string{}
- for _, code := range codeArr {
- cmdArr = append(cmdArr, "lv1_"+code)
- }
- cmd = cmd + strings.Join(cmdArr, ",")
- hq.SendRawCmd(cmd)
- }
-
- //增加level2行情订阅
- func (hq Hq) AddLv2(codeArr []string) {
- cmd := "add="
- cmdArr := []string{}
- for _, code := range codeArr {
- cmdArr = append(cmdArr, "lv2_"+code)
- }
- cmd = cmd + strings.Join(cmdArr, ",")
- hq.SendRawCmd(cmd)
- }
-
- //指令入队列
- func (hq Hq) SendRawCmd(cmd string) {
- hq.cmdChan <- cmd
- }
-
- //关闭行情连接
- func (hq Hq) Close() {
- close(hq.cmdChan)
- hq.exitChan <- 1
- hq.conn.Close()
- }
-
- //线程阻塞等待
- func (hq Hq) Wait() {
- hq.wg.Wait()
- }
-
- //websocket指令发送
- func (hq Hq) cmd() {
- for cmd := range hq.cmdChan {
- log.Println("发送指令:" + cmd)
- err := hq.conn.WriteMessage(websocket.TextMessage, []byte(cmd))
- if err != nil {
- log.Println("指令发送错误:", err)
- }
- }
- }
-
- //websocket行情接收处理
- func (hq Hq) receive() {
- for {
- select {
- case <-hq.exitChan:
- log.Print("接收协程退出")
- return
- default:
- //阻塞接收
- messageType, rb, err := hq.conn.ReadMessage()
- if err != nil {
- log.Print("接收错误:", err)
- }
- //文本消息
- if messageType == websocket.TextMessage {
- log.Println("Text响应:", string(rb))
- }
- //二进制消息
- if messageType == websocket.BinaryMessage {
- unZipByte := DeCompress(rb)
- text := string(unZipByte)
- ex1 := strings.Split(text, "\n")
- for _, ex1r := range ex1 {
- ex2 := strings.Split(ex1r, "=")
- if len(ex2) == 2 {
- code := ex2[0]
- hqs := ex2[1]
- if strings.HasPrefix(code, "lv1_") {
- hq.lv1Deal(hqs)
- }
- if strings.HasPrefix(code, "lv2_") {
- hq.lv2Deal(hqs)
- }
- }
- }
- }
- }
- }
- }
-
// DeCompress inflates b, a raw DEFLATE stream, and returns the decompressed
// bytes. Read and close errors are not reported: whatever was decoded before
// a failure is returned, which is an empty result for empty or invalid input.
func DeCompress(b []byte) []byte {
	inflater := flate.NewReader(bytes.NewReader(b))
	defer inflater.Close()

	var out bytes.Buffer
	out.ReadFrom(inflater)
	return out.Bytes()
}
// HttpOnce performs a single HTTP request and returns the response body.
//
// Url is the request URL. headers are added to the request as given; the map
// is only read, never modified. When postData is non-empty the request is a
// POST whose body is postData encoded as a URL-encoded form, and the
// Content-Type header is set to "application/x-www-form-urlencoded"
// (replacing any Content-Type supplied in headers); otherwise it is a GET.
// msTimeOut is the overall client timeout in milliseconds.
//
// The HTTP status code is not inspected: the body is returned for any
// response. On a request-construction or transport error the returned slice
// is empty and err is non-nil; on a body read error the bytes read so far are
// returned together with the error.
func HttpOnce(Url string, headers, postData map[string]string, msTimeOut int) (r []byte, err error) {
	client := &http.Client{
		Timeout: time.Duration(msTimeOut) * time.Millisecond,
	}
	r = []byte{}

	method := http.MethodGet
	if len(postData) != 0 {
		method = http.MethodPost
	}

	form := url.Values{}
	for k, v := range postData {
		form.Set(k, v)
	}
	req, err := http.NewRequest(method, Url, bytes.NewBufferString(form.Encode()))
	if err != nil {
		return r, err
	}

	for k, v := range headers {
		req.Header.Add(k, v)
	}
	if method == http.MethodPost {
		// Set the form content type on the request itself instead of writing
		// it into the caller's headers map.
		req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	}

	resp, err := client.Do(req)
	if err != nil {
		return r, err
	}
	defer resp.Body.Close()

	r, err = ioutil.ReadAll(bufio.NewReader(resp.Body))
	return r, err
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。