webSocketHandler.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package controllers
  2. import (
  3. "rtzh_elec_temperature/datachannel"
  4. "rtzh_elec_temperature/logger"
  5. "rtzh_elec_temperature/tools"
  6. "encoding/json"
  7. "log"
  8. "net/http"
  9. "github.com/gin-gonic/gin"
  10. "github.com/gorilla/websocket"
  11. )
  12. var StageConn map[string]interface{}
  13. // websocket 升级并跨域
  14. var (
  15. upgrade = &websocket.Upgrader{
  16. // 允许跨域
  17. CheckOrigin: func(r *http.Request) bool {
  18. return true
  19. },
  20. }
  21. )
  22. // WebSocketBase TODO:服务基本函数
  23. func WebSocketBase(c *gin.Context) {
  24. var (
  25. err error
  26. conn *websocket.Conn
  27. ws *wsConn
  28. )
  29. if conn, err = upgrade.Upgrade(c.Writer, c.Request, nil); err != nil {
  30. return
  31. }
  32. if ws, err = InitWebSocket(conn); err != nil {
  33. return
  34. }
  35. // 使得inChan和outChan耦合起来
  36. for {
  37. var data []byte
  38. if data, err = ws.InChanRead(); err != nil {
  39. log.Println(err)
  40. goto ERR
  41. }
  42. logger.Logger.Debug("接收数据:" + string(data))
  43. datachannel.DataQueue <- map[string]interface{}{"data": data, "conn": ws}
  44. }
  45. ERR:
  46. ws.CloseConn()
  47. }
  48. func ReceiveData() {
  49. for {
  50. msg, ok := <-datachannel.DataQueue
  51. if ok {
  52. data := msg["data"]
  53. conn := msg["conn"]
  54. if data != nil && conn != nil {
  55. conn2 := conn.(*wsConn)
  56. stage_id := conn2.conn.RemoteAddr().String()
  57. if StageConn == nil {
  58. StageConn = map[string]interface{}{stage_id: conn2}
  59. } else {
  60. StageConn[stage_id] = conn2
  61. }
  62. //返回的数据
  63. returndata := []byte{}
  64. //接收到数据
  65. resdata := string(data.([]byte))
  66. if resdata == "" {
  67. log.Println("接收数据为空")
  68. } else {
  69. // responseData := map[string]string{}
  70. //接收到的业务数据对应处理
  71. // if strings.Index(resdata, "ONLINE") > -1 {
  72. // //场景上线加载
  73. // stage_id := strings.Split(resdata, "ONLINE")[0]
  74. // if stage_id == "" {
  75. // log.Println("无效的场景上线数据")
  76. // }
  77. // if StageConn == nil {
  78. // StageConn = map[string]interface{}{stage_id: conn2}
  79. // } else {
  80. // StageConn[stage_id] = conn2
  81. // }
  82. // } else if strings.Index(resdata, "HEARTCMD00000") > -1 {
  83. //场景心跳
  84. // responseData = map[string]string{"MESSAGETYPE": "05", "MESSAGECONTENT": ""}
  85. // returndata, _ = json.Marshal(responseData)
  86. // } else if strings.Index(resdata, "CONTROLCMD") > -1 {
  87. // //控制命令。需要将该命令转发给设备
  88. // responseData = map[string]string{"MESSAGETYPE": "04", "MESSAGECONTENT": ""}
  89. // returndata, _ = json.Marshal(responseData)
  90. // } else {
  91. // log.Println("无效的请求数据:", resdata)
  92. // }
  93. // if len(returndata) > 0 {
  94. // if err := conn2.OutChanWrite(returndata); err != nil {
  95. // log.Println(err)
  96. // delete(StageConn, conn2.conn.RemoteAddr().String())
  97. // conn2.CloseConn()
  98. // } else {
  99. // if configure.GlobalConfig["loglevel"] == "5" {
  100. // log.Println("回复数据:" + string(returndata))
  101. // }
  102. // }
  103. returndata, _ = json.Marshal("{\"code\":\"1\",\"msg\":\"Socket is connected!\"}")
  104. conn2.OutChanWrite(returndata)
  105. }
  106. }
  107. } else {
  108. break
  109. }
  110. }
  111. }
  112. //向连接通道发送消息数据
  113. //按连接地址进行数据发送.未指定地址时向所有连接发送
  114. func SendData() {
  115. for {
  116. //map[string]interface{}
  117. msg, ok := <-datachannel.SendDataQueue
  118. if ok {
  119. stageid := tools.IsEmpty(msg["ip"])
  120. if stageid != "" {
  121. conn2 := StageConn[stageid]
  122. if conn2 == nil {
  123. continue
  124. }
  125. conn := conn2.(*wsConn)
  126. if conn.isClose {
  127. delete(StageConn, stageid)
  128. continue
  129. }
  130. data, _ := json.Marshal(msg)
  131. if len(data) > 0 {
  132. if err := conn.OutChanWrite(data); err != nil {
  133. log.Println(err)
  134. delete(StageConn, stageid)
  135. conn.CloseConn()
  136. } else {
  137. logger.Logger.Debug("回复数据:" + string(data))
  138. }
  139. }
  140. } else {
  141. for ip, connObj := range StageConn {
  142. if connObj == nil {
  143. delete(StageConn, ip)
  144. continue
  145. }
  146. conn := connObj.(*wsConn)
  147. if conn.isClose {
  148. delete(StageConn, ip)
  149. continue
  150. }
  151. data, _ := json.Marshal(msg)
  152. if len(data) > 0 {
  153. if err := conn.OutChanWrite(data); err != nil {
  154. log.Println(err)
  155. delete(StageConn, stageid)
  156. conn.CloseConn()
  157. } else {
  158. logger.Logger.Debug("回复数据:" + string(data))
  159. }
  160. }
  161. }
  162. }
  163. }
  164. }
  165. }