package service import ( "encoding/base64" "encoding/json" "errors" "fmt" "reflect" "rtzh_elec_temperature/conf" "rtzh_elec_temperature/global" "rtzh_elec_temperature/logger" "rtzh_elec_temperature/models/bo" "rtzh_elec_temperature/mqtt" "rtzh_elec_temperature/rtelec_app_public_lib/utils" "rtzh_elec_temperature/tools" "time" "github.com/astaxie/beego/orm" "git.rtzhtech.cn/iss/public-lib/logic" "github.com/spf13/cast" ) //命令执行服务 type CommandService struct { BaseService //当前操作的设备ID DeviceId int //命令信息模型对象 CommandData CommandParameter } type CommandMgr struct { DeviceId int AttrName string State int ModelId int Mpname string Actid int64 } /** 2022-11-1修改http访问海康硬件的接口为modbus模式 */ type CommandParameter struct { CommandMgr DeviceId int Condition map[string]interface{} MessageId string CommandType int ValueType string } //执行次数、总次数、间隔周期 var ExecuteIndex = 0 var ExecuteTimer = 0 var interval, _ = time.ParseDuration("0s") //下发控制指令 func (t *CommandService) PushCommand(cmdParameter CommandMgr) (string, error) { if RtelecManageApp().AppToken == "" { return "", errors.New("发送指令时Token不允许为空!") } //获取设备信息 //-----2022-12-08-------- devinfo := new(DeviceService) devinfo.DeviceId = cmdParameter.DeviceId devlst, err2 := devinfo.GetCommandRestart() if err2 != nil { return "", err2 } if len(devlst) == 0 { return "", errors.New("无效的设备ID" + tools.IsEmpty(cmdParameter.DeviceId)) } devicename := tools.IsEmpty(devlst[0]["devicename"]) //----------------------------- sendCount, _ := bo.GetSysParamValue("sendCount", "3") ExecuteTimer = cast.ToInt(sendCount) sendInterval, _ := bo.GetSysParamValue("sendInterval", "3") interval, _ = time.ParseDuration(sendInterval + "s") var ExecuteState = false var message = "远端设备无响应!" resultvalue := "" timeout, _ := bo.GetSysParamValue("timeout", "5") for ExecuteIndex = 0; ExecuteIndex < ExecuteTimer; ExecuteIndex++ { var messageId = tools.GetUid() msgObj := new(utils.MsgStateManage) msgObj.SetMessageStateValue(messageId, utils.MsgState{Success: false, State: false, Message: ""}) logger.Logger.Debug(fmt.Sprintf("====发送id为%s指令:%+v", messageId, cmdParameter)) t.Command(cmdParameter, messageId, 1) Result := msgObj.HanderMesage(messageId, cast.ToInt(timeout)) //2022-11-04修改:把延迟秒数变为一个变量,从配置文件或者数据库中读取 logger.Logger.Debug(fmt.Sprintf("====接收ID为%s结果:%+v", messageId, Result)) if Result.Success { //控制命令下发成功 //获取控制点关联的采集点状态值。如果未关联,则获取控制点本身的状态值(需要控制点的寄存器地址支持读) ExecuteState, resultvalue = t.CheckState(cmdParameter) if ExecuteState { //状态值获取成功,并且与下发值完全匹配时,进入控制点联动控制处理 //注:控制点可能没有配置联动策略,此时无需任何处理 t.Handle_Link_Event(cmdParameter) //此处是否应该记录一次操作成功的操作日志???? new(LogService).SaveLog(fmt.Sprintf("控制成功。设备名称:%s 操作:%s 操作时间:%s", devicename, "控制", tools.NowTime())) return resultvalue, nil } else { continue } } else { //如果执行命令失败直接返回结果。[!!该逻辑已废弃!!] //如果执行命令失败,尝试重新执行 logger.Logger.Debug(fmt.Sprintf("执行命令%s[%v]失败,结果为:%v,尝试重新执行...", messageId, cmdParameter, Result)) new(LogService).SaveLog(fmt.Sprintf("设备%s执行命令%s[%v]失败,结果为:%v,操作时间:%s", devicename, messageId, cmdParameter, Result, tools.NowTime())) //return errors.New(Result.Message) ExecuteState = false message = Result.Message continue } } //此处是否应该记录一次操作失败的操作日志???? if !ExecuteState { return "", errors.New(message) } else { return resultvalue, nil } } //处理联动事件 func (field *CommandService) Handle_Link_Event(parameter CommandMgr) { var Field = new(HistoryService) Field.Model.Deviceid = int32(parameter.DeviceId) //接收数据后设置设备为在线状态 new(DeviceService).SetDeviceState(parameter.DeviceId, 2) Field.Model.Date = tools.NowTime() var AttrInfo = make(map[string]interface{}) AttrInfo[parameter.AttrName] = float64(parameter.State) Field.AttrInfo = AttrInfo go new(LinkEventService).HandleLinkEvent(Field) } //判断状态 func (field *CommandService) CheckState(cmdParameter CommandMgr) (bool, string) { appId := cast.ToInt(RtelecManageApp().RegAppID) time.Sleep(interval) var messageId = tools.GetUid() var sendStatus = cmdParameter.State //保存下发的命令值 msgObj := new(utils.MsgStateManage) msgObj.SetMessageStateValue(messageId, utils.MsgState{Success: false, State: false, Message: ""}) //2022-11-4判断状态通过发送的dev_cpinfo的attrname去获取对应采集点的属性attrname //--start-- type ControlMpname struct { Mpid int64 Mpname string Successval float64 } var mpobj ControlMpname l := logic.NewDeviceLogic() var sqlCommandText = "select ifnull(mp.attrname,'') mpname,ifnull(a.mpid,0) mpid,ifnull(t1.successval,t1.value) successval from dev_cpinfo a inner join dev_devinfo b on a.deviceid=b.deviceid left join dev_mpinfo mp on a.mpid=mp.mpid inner join dev_cpaction t1 on a.cpid=t1.cpid where 1=1" sqlCommandText += fmt.Sprintf(" and a.attrname='%s'", cmdParameter.AttrName) sqlCommandText += fmt.Sprintf(" and a.deviceid=%d", cmdParameter.DeviceId) if cmdParameter.Actid > 0 { sqlCommandText += fmt.Sprintf(" and t1.actid=%d", cmdParameter.Actid) } sqlCommandText += fmt.Sprintf(" and b.appid=%d limit 1", appId) logger.Logger.Debug("=====CheckState sqlCommandText:" + sqlCommandText) err := l.SvcCtx.AlarmStrategy.Base.Raw(sqlCommandText, &mpobj) logger.Logger.Debug(fmt.Sprintf("=====CheckState mpobj:%+v", mpobj)) if err == nil && mpobj.Mpname != "" { // if mpobj.Mpid != 0 { // //获取优化测点的信息 // var tmp = ControlMpname{} // sqlCommandText = fmt.Sprintf("SELECT d.refid mpid,m.attrname mpname from dev_data_optimize d,dev_mpinfo m where d.refid = m.mpid and d.mpid=%d", mpobj.Mpid) // err = l.SvcCtx.AlarmStrategy.Base.Raw(sqlCommandText, &tmp) // if err == nil { // if tmp.Mpname != "" { // cmdParameter.Mpname = tmp.Mpname //这因为command函数里面还要用这个去找采集点的attrname // } // } // } cmdParameter.Mpname = mpobj.Mpname //这因为command函数里面还要用这个去找采集点的attrname } //如果单独设置了关联测点的控制成功值:successval,采用该值与采集结果进行一致性比对 sendStatus = cast.ToInt(mpobj.Successval) //--end-- logger.Logger.Debug(fmt.Sprintf("===CheckState===发送id为%s采集命令:%+v", messageId, cmdParameter)) gatherInter, _ := bo.GetSysParamValue("gatherInterval", "2") gatherInterval, _ := time.ParseDuration(cast.ToString(gatherInter + "s")) //计数读取15秒 gatherCount, _ := bo.GetSysParamValue("gatherCount", "15") var countDown = cast.ToInt(gatherCount) //从数据库中读取配置周期 field.Command(cmdParameter, messageId, 0) timeout, _ := bo.GetSysParamValue("timeout", "3") Result := msgObj.HanderMesage(messageId, cast.ToInt(timeout)) //先读1次 //控制结果状态。只有硬件进行了动作并且动作结果与预期相同时才算控制成功 controlSuccess := false for { logger.Logger.Debug(fmt.Sprintf("第%d次===CheckState==接收ID为%s采集结果:%+v 控制成功值:%d", countDown, messageId, Result, sendStatus)) if Result.Success && Result.State && sendStatus == cast.ToInt(Result.Value) { controlSuccess = true break } time.Sleep(gatherInterval) //每次读取间隔时间 messageId = tools.GetUid() field.Command(cmdParameter, messageId, 0) Result = msgObj.HanderMesage(messageId, cast.ToInt(timeout)) countDown -= 1 if countDown < 1 { break } } var state = false if controlSuccess { //如果状态读取成功则写入历史表 //--start-- type fields struct { Appid int Deviceid int Devicename string Mpid string Mpname string Attrname string Val string Date int64 } var field = fields{} var sql = "" if tools.IsEmpty(conf.GlobalConfig["DoorWay"]) == "http" { sql = fmt.Sprintf("SELECT a.appid,a.deviceid,a.devicename,m.mpid,m.cpname mpname,m.attrname FROM dev_devinfo a INNER JOIN dev_cpinfo m ON a.deviceid = m.deviceid AND m.cpname='%s' WHERE a.appid =%d AND a.deviceid =%d LIMIT 1;", cmdParameter.Mpname, appId, cmdParameter.DeviceId) } else { sql = fmt.Sprintf("select a.appid,a.deviceid,a.devicename,m.mpid,m.mpname,m.attrname from dev_devinfo a left join dev_mpinfo m on a.deviceid=m.deviceid and m.mpname='%s' where a.appid=%d and a.deviceid=%d limit 1;", cmdParameter.Mpname, appId, cmdParameter.DeviceId) } err = l.SvcCtx.AlarmStrategy.Base.Raw(sql, &field) if err == nil { field.Val = cast.ToString(Result.Value) field.Date = time.Now().Unix() } else { logger.Logger.Debug(err) return false, "" } o := orm.NewOrm() var sqlParamber []interface{} var sqlCommandText = "insert into dev_history(appid,deviceid,devicename,mpid,mpname,attrname,val,date)values(?,?,?,?,?,?,?,from_unixtime(?));" //因为采集硬件方式的不同,采集点和控制点是同一个寄存器的时候,历史数据写入控制点的attrname,如果是分开的则写入采集点的attrname if field.Mpid == "" { field.Mpid = "0" } if tools.IsEmpty(conf.GlobalConfig["DoorWay"]) == "http" { sqlParamber = []interface{}{appId, field.Deviceid, field.Devicename, field.Mpid, field.Mpname, cmdParameter.AttrName, field.Val, field.Date} } else { sqlParamber = []interface{}{appId, field.Deviceid, field.Devicename, field.Mpid, field.Mpname, cmdParameter.Mpname, field.Val, field.Date} } _, err := o.Raw(sqlCommandText, sqlParamber).Exec() if err != nil { logger.Logger.Debug(err) return false, "" } //--end-- new(LogService).SaveLog(fmt.Sprintf("采集成功。设备名称:%s 操作:%s 操作时间:%s", field.Devicename, "采集", tools.NowTime())) if Result.Success && sendStatus == cast.ToInt(Result.Value) { state = true } logger.Logger.Debug(fmt.Sprintf("==CheckState==本次%s控制结果:%v", messageId, state)) return state, tools.IsEmpty(Result.Value) } return state, Result.Message } //获取状态信息 func (field *CommandService) GetCommandState(cmdParameter CommandMgr) (interface{}, string) { AppId := cast.ToInt(RtelecManageApp().RegAppID) var messageId = tools.GetUid() msgObj := new(utils.MsgStateManage) msgObj.SetMessageStateValue(messageId, utils.MsgState{Success: false, State: false, Message: ""}) //2022-11-4判断状态通过发送的dev_cpinfo的attrname去获取对应mpname type ControlMpname struct { Mpid int Mpname string } var mpobj ControlMpname l := logic.NewDeviceLogic() var sqlCommandText = "select ifnull(mp.attrname,'') mpname,ifnull(a.mpid,0) mpid from dev_cpinfo a inner join dev_devinfo b on a.deviceid=b.deviceid left join dev_mpinfo mp on a.mpid=mp.mpid where 1=1" sqlCommandText += fmt.Sprintf(" and a.attrname='%s'", cmdParameter.AttrName) sqlCommandText += fmt.Sprintf(" and a.deviceid=%d", cmdParameter.DeviceId) sqlCommandText += fmt.Sprintf(" and b.appid=%d limit 1", AppId) err := l.SvcCtx.AlarmStrategy.Base.Raw(sqlCommandText, &mpobj) if err == nil { if mpobj.Mpid != 0 { //获取优化测点的信息 var tmp = ControlMpname{} sqlCommandText = fmt.Sprintf("SELECT d.refid mpid,m.attrname mpname from dev_data_optimize d,dev_mpinfo m where d.refid = m.mpid and d.mpid=%d", mpobj.Mpid) err = l.SvcCtx.AlarmStrategy.Base.Raw(sqlCommandText, &tmp) if err == nil { if tmp.Mpname != "" { cmdParameter.Mpname = tmp.Mpname } } } } field.Command(cmdParameter, messageId, 0) timeout, _ := bo.GetSysParamValue("timeout", "3") Result := msgObj.HanderMesage(messageId, cast.ToInt(timeout)) if Result.Success { return Result.Value, "" } else { return -1, Result.Message } } //命令管理 func (field *CommandService) Command(Field CommandMgr, messageId string, commandType int) { var deviceObject = new(DeviceService) deviceObject.DeviceId = Field.DeviceId deviceObject.Model.AttrName = Field.AttrName list, err := deviceObject.GetCommandParameter() if err == nil && len(list) > 0 { apptoken := RtelecManageApp().AppToken for _, record := range list { var parameter = make(map[string]interface{}) parameter["mid"] = messageId parameter["token"] = apptoken utils.MessageIds.Set(messageId, messageId) //mqtt消息中的message数据处理 var para = make(map[string]interface{}) para["timestamp"] = time.Now().Unix() para["deviceName"] = record.Devicename para["modelId"] = record.Modelid para["attrName"] = record.AttrName //这里的mpname其实是采集点的attrname,在CheckState函数中的sql查询的时候,查询的字段是attrname别名为mpname if commandType == 0 && Field.Mpname != "" { para["attrName"] = Field.Mpname } para["commandType"] = commandType para["valueType"] = "number" para["value"] = Field.State Message, _ := json.Marshal(para) msg, _ := new(utils.ToolsLogic).EncryptMsg(Message, []byte(conf.GlobalConfig["rtelec_manage_password"]), true) parameter["message"] = msg value, _ := json.Marshal(parameter) mqtt.PublishMessage(global.Rtelec_Topics["command"], string(value)) } } } //设备重启 func (c *CommandService) DeviceRestart() error { var deviceObject = new(DeviceService) deviceObject.DeviceId = c.DeviceId var modelId float64 modellist := new(ModelService).GetModelListObj() if len(modellist) > 0 { for _, record := range modellist { row := record.(map[string]interface{}) if row["model_name"].(string) == "lightcontorller" { modelId = cast.ToFloat64(row["id"]) } } } if modelId == 0 { return errors.New("模型id不允许为空!") } list, err := deviceObject.GetCommandRestart() if err == nil && len(list) > 0 { for _, record := range list { logger.Logger.Debug(record) var parameter = make(map[string]interface{}) var mid = tools.GetUid() parameter["mid"] = mid parameter["token"] = RtelecManageApp().AppToken utils.MessageIds.Set(mid, mid) //mqtt消息中的message数据处理 var para = make(map[string]interface{}) para["timestamp"] = time.Now().Unix() para["deviceName"] = tools.IsEmpty(record["devicename"]) para["modelId"] = modelId para["attrName"] = "isrest" para["commandType"] = 1 para["valueType"] = "number" para["value"] = 1 Message, _ := json.Marshal(para) msg, _ := new(utils.ToolsLogic).EncryptMsg(Message, []byte(conf.GlobalConfig["rtelec_manage_password"]), true) parameter["message"] = msg value, _ := json.Marshal(parameter) mqtt.PublishMessage(global.Rtelec_Topics["command"], string(value)) new(LogService).SaveLog(fmt.Sprintf("命令发送成功。设备名称:%+v 操作:%s 操作时间:%s", para["deviceName"], "重启", tools.NowTime())) } } if len(list) == 0 { return errors.New("未查询到需要发送指令对应的设备!") } return nil } //处理命令执行后的返回结果 func (field *CommandService) HandleCommandResult(data map[string]interface{}, MessageId string) { logger.Logger.Debug("接收控制结果数据:") logger.Logger.Debug(data) if message, ok := data["message"]; ok && tools.IsEmpty(message) != "" { var tempData map[string]interface{} if reflect.TypeOf(message).Kind() == reflect.String { bytes := []byte(message.(string)) err := json.Unmarshal(bytes, &tempData) if err == nil { if cast.ToInt(tempData["code"]) == 200 { new(utils.MsgStateManage).SetMessageStateValue(MessageId, utils.MsgState{Success: true, State: true, Message: "操作成功", Value: tempData["value"]}) } else { logger.Logger.Debug(fmt.Sprintf("指令%s执行返回失败:%+v", MessageId, data)) new(utils.MsgStateManage).SetMessageStateObj(MessageId, utils.MsgState{Success: true, State: false, Message: tools.IsEmpty(tempData["msg"]), Value: -1}) } } else { logger.Logger.Debug("接收到数据不是有效的JSON格式:") logger.Logger.Debug(data) } } } if MessageId != "" { //对比使用后删除map中的key utils.MessageIds.Remove(MessageId) new(utils.MsgStateManage).RemoveMessageStateObj(MessageId) } } //调用命令公共接口 //attrname:向指定属性(测点)发送指令,未指定时向所有属性发送 func (field *CommandService) CommonInterface(attrname ...string) error { staticAttrName := "" if attrname != nil && len(attrname) > 0 { staticAttrName = attrname[0] } var deviceObject = new(DeviceService) deviceObject.DeviceId = field.DeviceId list, err := deviceObject.GetCmdParameter() if err == nil { if len(list) == 0 { return errors.New("指定的设备不正确") } var messageId = field.CommandData.MessageId for _, record := range list { if staticAttrName == "" || staticAttrName == record.AttrName { var parameter = make(map[string]interface{}) parameter["mid"] = messageId parameter["token"] = RtelecManageApp().AppToken utils.MessageIds.Set(messageId, messageId) var para = make(map[string]interface{}) para["timestamp"] = time.Now().Unix() para["deviceName"] = record.Devicename para["modelId"] = record.Modelid para["attrName"] = record.AttrName para["commandType"] = field.CommandData.CommandType if field.CommandData.ValueType == "" { para["valueType"] = "string" } else { para["valueType"] = field.CommandData.ValueType } field.CommandData.Condition["mid"] = messageId msgContent, _ := json.Marshal(field.CommandData.Condition) para["value"] = base64.StdEncoding.EncodeToString(msgContent) Message, _ := json.Marshal(para) msg, _ := new(utils.ToolsLogic).EncryptMsg(Message, []byte(conf.GlobalConfig["rtelec_manage_password"]), true) parameter["message"] = msg command_text, _ := json.Marshal(parameter) logger.Logger.Debug(fmt.Sprintf("控制操作,操作参数:%+v", para)) mqtt.PublishMessage(global.Rtelec_Topics["command"], string(command_text)) if staticAttrName != "" { //如果指定了定向属性时,发送后立即退出 break } } } } return nil } //发送事件获取指令 //需要订阅dataPush接口,解析查询返回结果数据 <---错误的订阅主题,应该订阅/iss_v1.0.0/isecurity/command/response 2022-11-24 //消息value数据格式: //{ // "code": "getEvent", // "pageNo": 1, // "pageSize": 20, // "startTime": "2022-09-07T00:00:00+08:00", // "endTime": "2022-09-07T23:59:59+08:00", // "mid": "1a41fd60-f393-4ff8-b1dd-8b9b8dc077e3" //} func (field *CommandService) SendGetEvent(comm_code string, attrname ...string) error { staticAttrName := "" if attrname != nil && len(attrname) > 0 { staticAttrName = attrname[0] } if comm_code == "" { comm_code = "getEvent" } if _, has := field.CommandData.Condition["pageNo"]; !has { field.CommandData.Condition["pageNo"] = 1 } if _, has := field.CommandData.Condition["pageSize"]; !has { field.CommandData.Condition["pageSize"] = 20 } field.CommandData.Condition["code"] = comm_code //field.Condition["starttime"] = starttime //field.Condition["endTime"] = tools.NowTime() if _, has := field.CommandData.Condition["startTime"]; !has { field.CommandData.Condition["startTime"] = time.Now().Add(-12 * time.Hour).Format("2006-01-02T15:04:05+08:00") } if _, has := field.CommandData.Condition["endTime"]; !has { field.CommandData.Condition["endTime"] = time.Now().Format("2006-01-02T15:04:05+08:00") } var deviceObject = new(DeviceService) deviceObject.DeviceId = field.DeviceId deviceInfo, _ := deviceObject.GetCommandRestart() if deviceInfo == nil || len(deviceInfo) == 0 { return errors.New(fmt.Sprintf("未查询到设备[ID:%d]对应的设备信息", field.DeviceId)) } deviceSign := deviceInfo[0] var parameter = make(map[string]interface{}) // parameter["code"] = comm_code //附加参数,用于解析结果时标识该数据为事件查询数据 parameter["mid"] = field.CommandData.MessageId parameter["token"] = RtelecManageApp().AppToken utils.MessageIds.Set(field.CommandData.MessageId, field.CommandData.MessageId) var para = make(map[string]interface{}) para["timestamp"] = time.Now().Unix() para["deviceName"] = tools.IsEmpty(deviceSign["devicename"]) para["modelId"] = tools.IsEmpty(deviceSign["modelid"]) para["attrName"] = "Contrl" para["valueType"] = "string" para["commandType"] = 0 if staticAttrName != "" { para["attrName"] = staticAttrName } field.CommandData.Condition["mid"] = field.CommandData.MessageId msgContent, _ := json.Marshal(field.CommandData.Condition) para["value"] = base64.StdEncoding.EncodeToString(msgContent) Message, _ := json.Marshal(para) msg, _ := new(utils.ToolsLogic).EncryptMsg(Message, []byte(conf.GlobalConfig["rtelec_manage_password"]), true) parameter["message"] = msg command_text, _ := json.Marshal(parameter) logger.Logger.Debug(fmt.Sprintf("SendGetEvent操作,操作参数:%+v", para)) mqtt.PublishMessage(global.Rtelec_Topics["command"], string(command_text)) return nil }