package controllers

import (
	"encoding/json"
	"log"
	"net/http"

	"rtzh_elec_temperature/datachannel"
	"rtzh_elec_temperature/logger"
	"rtzh_elec_temperature/tools"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
)

// StageConn maps a connection key (the peer's remote address string, as
// registered by ReceiveData) to the *wsConn serving that peer. It is allocated
// lazily on the first received message.
//
// NOTE(review): the map is read and written by ReceiveData and SendData with
// no locking. If those two loops run in different goroutines this is a data
// race — confirm how they are started and guard the map if so.
var StageConn map[string]interface{}

// upgrade turns an HTTP request into a WebSocket connection. CheckOrigin
// always returns true, so requests from any origin are accepted.
var (
	upgrade = &websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
)

// WebSocketBase is the gin handler for the WebSocket endpoint. It upgrades the
// request, wraps the connection with InitWebSocket and then forwards every
// message read from it to datachannel.DataQueue as
// {"data": []byte, "conn": *wsConn}. The wrapped connection is closed once a
// read fails.
func WebSocketBase(c *gin.Context) {
	conn, err := upgrade.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		log.Println(err)
		return
	}

	ws, err := InitWebSocket(conn)
	if err != nil {
		log.Println(err)
		// The request has already been upgraded, so the raw connection must
		// be released here or it leaks. The close error is not actionable.
		_ = conn.Close()
		return
	}
	defer ws.CloseConn()

	// Pump inbound messages into the shared queue until the read side fails.
	for {
		data, err := ws.InChanRead()
		if err != nil {
			log.Println(err)
			return
		}
		logger.Logger.Debug("接收数据:" + string(data))
		datachannel.DataQueue <- map[string]interface{}{"data": data, "conn": ws}
	}
}

// ReceiveData consumes datachannel.DataQueue until the channel is closed. For
// every message it registers the originating connection in StageConn under
// its remote address and, when the payload is non-empty, replies with a fixed
// "Socket is connected!" acknowledgement. Malformed queue entries are logged
// and skipped instead of panicking the loop.
func ReceiveData() {
	// NOTE(review): this text is itself passed to json.Marshal below, so the
	// peer receives a JSON *string* containing escaped JSON rather than a JSON
	// object. Kept as-is because clients may rely on the current wire format.
	const connectedAck = "{\"code\":\"1\",\"msg\":\"Socket is connected!\"}"

	for msg := range datachannel.DataQueue {
		rawData, rawConn := msg["data"], msg["conn"]
		if rawData == nil || rawConn == nil {
			continue
		}

		ws, ok := rawConn.(*wsConn)
		if !ok || ws == nil {
			log.Printf("unexpected conn value of type %T in data queue", rawConn)
			continue
		}

		// Remember the connection so SendData can address it later.
		stageID := ws.conn.RemoteAddr().String()
		if StageConn == nil {
			StageConn = make(map[string]interface{})
		}
		StageConn[stageID] = ws

		payload, ok := rawData.([]byte)
		if !ok {
			log.Printf("unexpected data value of type %T in data queue", rawData)
			continue
		}
		if len(payload) == 0 {
			log.Println("接收数据为空")
			continue
		}

		// Disabled legacy protocol handling, kept for reference:
		//
		// responseData := map[string]string{}
		// // Dispatch on the business command contained in the payload.
		// if strings.Index(resdata, "ONLINE") > -1 {
		// 	// Stage came online: register it under the id preceding "ONLINE".
		// 	stage_id := strings.Split(resdata, "ONLINE")[0]
		// 	if stage_id == "" {
		// 		log.Println("无效的场景上线数据")
		// 	}
		// 	if StageConn == nil {
		// 		StageConn = map[string]interface{}{stage_id: conn2}
		// 	} else {
		// 		StageConn[stage_id] = conn2
		// 	}
		// } else if strings.Index(resdata, "HEARTCMD00000") > -1 { // stage heartbeat
		// 	responseData = map[string]string{"MESSAGETYPE": "05", "MESSAGECONTENT": ""}
		// 	returndata, _ = json.Marshal(responseData)
		// } else if strings.Index(resdata, "CONTROLCMD") > -1 {
		// 	// Control command; it needs to be forwarded to the device.
		// 	responseData = map[string]string{"MESSAGETYPE": "04", "MESSAGECONTENT": ""}
		// 	returndata, _ = json.Marshal(responseData)
		// } else {
		// 	log.Println("无效的请求数据:", resdata)
		// }
		// if len(returndata) > 0 {
		// 	if err := conn2.OutChanWrite(returndata); err != nil {
		// 		log.Println(err)
		// 		delete(StageConn, conn2.conn.RemoteAddr().String())
		// 		conn2.CloseConn()
		// 	} else {
		// 		if configure.GlobalConfig["loglevel"] == "5" {
		// 			log.Println("回复数据:" + string(returndata))
		// 		}
		// 	}
		// }

		ack, err := json.Marshal(connectedAck)
		if err != nil {
			log.Println(err)
			continue
		}
		if err := ws.OutChanWrite(ack); err != nil {
			// Previously dropped silently; surface it so failed replies are visible.
			log.Println(err)
		}
	}
}

// SendData consumes datachannel.SendDataQueue until the channel is closed and
// pushes each message, JSON-encoded, to WebSocket peers. When the message
// carries a non-empty "ip" value (as extracted by tools.IsEmpty) it is sent
// only to the connection registered under that key; otherwise it is broadcast
// to every connection in StageConn. Closed or failing connections are removed
// from StageConn along the way.
func SendData() {
	for msg := range datachannel.SendDataQueue {
		// Encode once per message; the payload is identical for every target.
		data, err := json.Marshal(msg)
		if err != nil {
			log.Println(err)
			continue
		}
		if len(data) == 0 {
			continue
		}

		if stageID := tools.IsEmpty(msg["ip"]); stageID != "" {
			sendToStage(stageID, data)
			continue
		}

		// No target given: broadcast. Deleting entries while ranging over a
		// map is permitted in Go.
		for key := range StageConn {
			sendToStage(key, data)
		}
	}
}

// sendToStage writes data to the connection registered in StageConn under key.
// Entries that are missing, not a *wsConn, already closed, or whose write
// fails are removed from StageConn; a connection whose write fails is closed.
func sendToStage(key string, data []byte) {
	ws, ok := StageConn[key].(*wsConn)
	if !ok || ws == nil || ws.isClose {
		delete(StageConn, key)
		return
	}

	if err := ws.OutChanWrite(data); err != nil {
		log.Println(err)
		delete(StageConn, key)
		ws.CloseConn()
		return
	}
	logger.Logger.Debug("回复数据:" + string(data))
}