package mqtt import ( "encoding/json" "log" "scd_check_tools/conf" "scd_check_tools/global" "scd_check_tools/sms" "scd_check_tools/tools" "strings" "github.com/astaxie/beego/orm" ) //mqtt消息处理控制器 type ReceiveDataMessage struct { } func (c *ReceiveDataMessage) Receive(topic string, msg string) error { if conf.GlobalConfig["loglevel"] == "5" { log.Println("ReceiveDataMessage:" + msg) } obj := map[string]interface{}{} err := json.Unmarshal([]byte(msg), &obj) if err != nil { log.Println(err) return err } //imsi黑名单查询 if topic == "/jujutong/police/blacklist/imsi/query" { c.MqttQueryList("imsi") return nil } appid := tools.IsEmpty(obj["appid"]) //人脸特征值检测 if topic == "/jujutong/police/face/extractfeatureresult" || topic == "/jujutong/police/face/extractfeatureerror" { if appid == "full_search_face" { imgid := tools.IsEmpty(obj["imgid"]) if imgid != "" { global.ImgFeartyueCheckListLock.Lock() global.ImgFeartyueCheckList[imgid] = obj global.ImgFeartyueCheckListLock.Unlock() } return nil } return nil } if topic == "/jujutong/police/face/comperror" { //log.Println(msg) return nil } client_id := tools.IsEmpty(obj["client_id"]) //布控分析结果 if client_id == "mqtt_client_control_publish" && topic == "/jujutong/police/face/compresult" { count := tools.IsEmpty(obj["count"]) if count == "" || count == "0" { return nil } result := obj["result"].([]interface{}) row1 := result[0].(map[string]interface{}) control_id := tools.IsEmpty(row1["control_id"]) control_sub_id := tools.IsEmpty(row1["control_sub_id"]) if control_id == "" || control_sub_id == "" { return nil } go c.SendControlTaskSms(control_id, control_sub_id) return nil } //碰撞分析结果 if client_id == "mqtt_client_collision_publish" && topic == "/jujutong/police/face/compresult" { return nil } //伴随分析结果 if appid == "accompany_task_process" && strings.HasPrefix(topic, "/jujutong/police/accompany/") { state := tools.IsEmpty(obj["state"]) if state == "2" { accompanyid := tools.IsEmpty(obj["accompanyid"]) go c.SendAccompanyTaskSms(accompanyid) } return nil } if appid == "ftp_face_comp" && topic == "/jujutong/police/face/compresult" { faceid := tools.IsEmpty(obj["faceid"]) if faceid == "" { return nil } camera_sn := tools.IsEmpty(obj["camera_sn"]) floor := tools.IsEmpty(obj["floor"]) go func() { c.saveSameFace(faceid, floor, camera_sn, obj["result"]) }() } return nil } //布控任务短信通知 func (c *ReceiveDataMessage) SendControlTaskSms(controlid, controlsubid string) { if controlid == "" || controlsubid == "" { return } db := orm.NewOrm() sql := "select s1.name, s.control_name,(select mobilephone from t_data_user where s1.cr=id) phone from t_data_control_sub s,t_data_control s1 where s.controlid=s1.id and s.id=? and s1.id=?" var tableData []orm.Params _, err := db.Raw(sql, controlsubid, controlid).Values(&tableData) if err != nil { log.Println(err) return } if len(tableData) > 0 { taskname := tools.IsEmpty(tableData[0]["name"]) + "-" + tools.IsEmpty(tableData[0]["control_name"]) phone := tools.IsEmpty(tableData[0]["phone"]) if phone != "" { smsplatform := conf.GlobalConfig["sms.platform"] if smsplatform == "" { log.Println("未配置短信消息发送平台") } else { smsSend := sms.GetSmsInstance(smsplatform) datatype := sms.TemplateCode_SetControl_Task params := []string{"布控分析", ":" + taskname + " 新"} phone := strings.Split(phone, ",") smsSend.Send(datatype, params, phone) } } } } //伴随任务完成短信通知 func (c *ReceiveDataMessage) SendAccompanyTaskSms(accompanyid string) { if accompanyid == "" { return } o := orm.NewOrm() sqlCommandText := "select a.name,(select mobilephone from t_data_user where a.cr=id) phone,(select count(DISTINCT face_id) from t_data_accompany_result where accompany_id=a.id) cnt from t_data_accompany a where a.id=?;" var tableData []orm.Params _, err := o.Raw(sqlCommandText, accompanyid).Values(&tableData) if err != nil { log.Println(err) return } if len(tableData) > 0 { taskname := tools.IsEmpty(tableData[0]["name"]) phone := tools.IsEmpty(tableData[0]["phone"]) cnt := tools.IsEmpty(tableData[0]["cnt"]) if phone != "" { smsplatform := conf.GlobalConfig["sms.platform"] if smsplatform == "" { log.Println("未配置短信消息发送平台") } else { smsSend := sms.GetSmsInstance(smsplatform) datatype := sms.TemplateCode_Follow_Collision_task params := []string{"伴随分析", ":" + taskname + " ", cnt} phone := strings.Split(phone, ",") smsSend.Send(datatype, params, phone) } } } } func (c *ReceiveDataMessage) MqttQueryList(typecode string) error { o := orm.NewOrm() var sqlCommandText string var sqlCondition []string var sqlParameter []interface{} sqlCommandText = "select * from t_data_blacklist a " if typecode != "" { sqlCondition = append(sqlCondition, "a.typecode=?") sqlParameter = append(sqlParameter, typecode) } if len(sqlCondition) > 0 { sqlCommandText += " where " + strings.Join(sqlCondition, " and ") } sqlCommandText += " order by a.id desc " var tableData []orm.Params _, err := o.Raw(sqlCommandText, sqlParameter).Values(&tableData) if err != nil { log.Println(err) return err } msg := map[string]interface{}{} msg["opt"] = "queryresult" list := []string{} for _, row := range tableData { list = append(list, tools.IsEmpty(row["v"])) } msg["list"] = list msgStr, _ := json.Marshal(msg) PublishMessage("/jujutong/police/blacklist/imsi", string(msgStr)) return nil } //获取到人像匹配结果 //加入到有特征人像分组表及子表中 //floor与camera_sn都有值时表示进行了娄层比对;仅指定了camera_sn时表示楼栋比对 func (c *ReceiveDataMessage) saveSameFace(faceid, floor, camera_sn string, result interface{}) { db := orm.NewOrm() var err error if result != nil { resultLst := result.([]interface{}) if len(resultLst) > 0 { faces1 := []string{} for _, r := range resultLst { r1 := r.(map[string]interface{}) faces1 = append(faces1, tools.IsEmpty(r1["fid"])) } sql := "select id,faceid from t_face_hasfeature_group where faceid in ('" + strings.Join(faces1, "','") + "')" gary := []orm.Params{} db.Raw(sql, faceid).Values(&gary) faceInGroup := map[string]string{} for _, t := range gary { faceInGroup[t["faceid"].(string)] = t["id"].(string) } sql = "insert into t_relation_face_same(groupid, faceid,camearsn,samefaceid,samevalue)values(?,?,?,?,?)" if len(faceInGroup) == 0 { sqlresult, _ := db.Raw("insert into t_face_hasfeature_group(camearsn,faceid)values(?,?)", camera_sn, faceid).Exec() newgid, _ := sqlresult.LastInsertId() if newgid > 0 { for _, r := range resultLst { r1 := r.(map[string]interface{}) sameFaceID := tools.IsEmpty(r1["fid"]) cv := tools.IsEmpty(r1["cv"]) if cv == "" { cv = "0" } _, err = db.Raw(sql, newgid, faceid, camera_sn, sameFaceID, cv).Exec() if err != nil { log.Println(err) } } } return } for _, r := range resultLst { r1 := r.(map[string]interface{}) groupFaceID := tools.IsEmpty(r1["fid"]) if gid, has := faceInGroup[groupFaceID]; has { cv := tools.IsEmpty(r1["cv"]) if cv == "" { cv = "0" } _, err = db.Raw(sql, gid, groupFaceID, camera_sn, faceid, cv).Exec() if err != nil { log.Println(err) } } } return } } if floor != "" { //楼层比对没有结果,进行楼栋主人像比对 dst := []orm.Params{} _, err = db.Raw("select faceid from t_face_hasfeature_group where camearsn=?", camera_sn).Values(&dst) if err != nil { log.Println(err) } else if len(dst) > 0 { faces := []string{} for _, r := range dst { faces = append(faces, r["faceid"].(string)) } //发送mqtt消息到人像处理服务,进行相似人像匹配 msgObj := map[string]string{} msgObj["appid"] = "ftp_face_comp" msgObj["faceid"] = faceid msgObj["score"] = global.FaceSameValue msgObj["camera_sn"] = camera_sn msgObj["compfaces"] = strings.Join(faces, ",") msgStr, _ := json.Marshal(msgObj) PublishMessage("/jujutong/police/face/comp", string(msgStr)) return } else { _, err = db.Raw("insert into t_face_hasfeature_group(faceid,camearsn)values(?,?)", faceid, camera_sn).Exec() if err != nil { log.Println(err) } } } else if floor == "" && camera_sn != "" { //对楼栋进行比对:当前人像还没有相似人像,加入到有特征人像分组表做为主记录 _, err = db.Raw("insert into t_face_hasfeature_group(faceid,camearsn)values(?,?)", faceid, camera_sn).Exec() if err != nil { log.Println(err) } return } }