123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- package mqtt
- import (
- "encoding/json"
- "log"
- "scd_check_tools/conf"
- "scd_check_tools/global"
- "scd_check_tools/sms"
- "scd_check_tools/tools"
- "strings"
- "github.com/astaxie/beego/orm"
- )
- //mqtt消息处理控制器
- type ReceiveDataMessage struct {
- }
- func (c *ReceiveDataMessage) Receive(topic string, msg string) error {
- if conf.GlobalConfig["loglevel"] == "5" {
- log.Println("ReceiveDataMessage:" + msg)
- }
- obj := map[string]interface{}{}
- err := json.Unmarshal([]byte(msg), &obj)
- if err != nil {
- log.Println(err)
- return err
- }
- //imsi黑名单查询
- if topic == "/jujutong/police/blacklist/imsi/query" {
- c.MqttQueryList("imsi")
- return nil
- }
- appid := tools.IsEmpty(obj["appid"])
- //人脸特征值检测
- if topic == "/jujutong/police/face/extractfeatureresult" || topic == "/jujutong/police/face/extractfeatureerror" {
- if appid == "full_search_face" {
- imgid := tools.IsEmpty(obj["imgid"])
- if imgid != "" {
- global.ImgFeartyueCheckListLock.Lock()
- global.ImgFeartyueCheckList[imgid] = obj
- global.ImgFeartyueCheckListLock.Unlock()
- }
- return nil
- }
- return nil
- }
- if topic == "/jujutong/police/face/comperror" {
- //log.Println(msg)
- return nil
- }
- client_id := tools.IsEmpty(obj["client_id"])
- //布控分析结果
- if client_id == "mqtt_client_control_publish" && topic == "/jujutong/police/face/compresult" {
- count := tools.IsEmpty(obj["count"])
- if count == "" || count == "0" {
- return nil
- }
- result := obj["result"].([]interface{})
- row1 := result[0].(map[string]interface{})
- control_id := tools.IsEmpty(row1["control_id"])
- control_sub_id := tools.IsEmpty(row1["control_sub_id"])
- if control_id == "" || control_sub_id == "" {
- return nil
- }
- go c.SendControlTaskSms(control_id, control_sub_id)
- return nil
- }
- //碰撞分析结果
- if client_id == "mqtt_client_collision_publish" && topic == "/jujutong/police/face/compresult" {
- return nil
- }
- //伴随分析结果
- if appid == "accompany_task_process" && strings.HasPrefix(topic, "/jujutong/police/accompany/") {
- state := tools.IsEmpty(obj["state"])
- if state == "2" {
- accompanyid := tools.IsEmpty(obj["accompanyid"])
- go c.SendAccompanyTaskSms(accompanyid)
- }
- return nil
- }
- if appid == "ftp_face_comp" && topic == "/jujutong/police/face/compresult" {
- faceid := tools.IsEmpty(obj["faceid"])
- if faceid == "" {
- return nil
- }
- camera_sn := tools.IsEmpty(obj["camera_sn"])
- floor := tools.IsEmpty(obj["floor"])
- go func() {
- c.saveSameFace(faceid, floor, camera_sn, obj["result"])
- }()
- }
- return nil
- }
- //布控任务短信通知
- func (c *ReceiveDataMessage) SendControlTaskSms(controlid, controlsubid string) {
- if controlid == "" || controlsubid == "" {
- return
- }
- db := orm.NewOrm()
- sql := "select s1.name, s.control_name,(select mobilephone from t_data_user where s1.cr=id) phone from t_data_control_sub s,t_data_control s1 where s.controlid=s1.id and s.id=? and s1.id=?"
- var tableData []orm.Params
- _, err := db.Raw(sql, controlsubid, controlid).Values(&tableData)
- if err != nil {
- log.Println(err)
- return
- }
- if len(tableData) > 0 {
- taskname := tools.IsEmpty(tableData[0]["name"]) + "-" + tools.IsEmpty(tableData[0]["control_name"])
- phone := tools.IsEmpty(tableData[0]["phone"])
- if phone != "" {
- smsplatform := conf.GlobalConfig["sms.platform"]
- if smsplatform == "" {
- log.Println("未配置短信消息发送平台")
- } else {
- smsSend := sms.GetSmsInstance(smsplatform)
- datatype := sms.TemplateCode_SetControl_Task
- params := []string{"布控分析", ":" + taskname + " 新"}
- phone := strings.Split(phone, ",")
- smsSend.Send(datatype, params, phone)
- }
- }
- }
- }
- //伴随任务完成短信通知
- func (c *ReceiveDataMessage) SendAccompanyTaskSms(accompanyid string) {
- if accompanyid == "" {
- return
- }
- o := orm.NewOrm()
- sqlCommandText := "select a.name,(select mobilephone from t_data_user where a.cr=id) phone,(select count(DISTINCT face_id) from t_data_accompany_result where accompany_id=a.id) cnt from t_data_accompany a where a.id=?;"
- var tableData []orm.Params
- _, err := o.Raw(sqlCommandText, accompanyid).Values(&tableData)
- if err != nil {
- log.Println(err)
- return
- }
- if len(tableData) > 0 {
- taskname := tools.IsEmpty(tableData[0]["name"])
- phone := tools.IsEmpty(tableData[0]["phone"])
- cnt := tools.IsEmpty(tableData[0]["cnt"])
- if phone != "" {
- smsplatform := conf.GlobalConfig["sms.platform"]
- if smsplatform == "" {
- log.Println("未配置短信消息发送平台")
- } else {
- smsSend := sms.GetSmsInstance(smsplatform)
- datatype := sms.TemplateCode_Follow_Collision_task
- params := []string{"伴随分析", ":" + taskname + " ", cnt}
- phone := strings.Split(phone, ",")
- smsSend.Send(datatype, params, phone)
- }
- }
- }
- }
- func (c *ReceiveDataMessage) MqttQueryList(typecode string) error {
- o := orm.NewOrm()
- var sqlCommandText string
- var sqlCondition []string
- var sqlParameter []interface{}
- sqlCommandText = "select * from t_data_blacklist a "
- if typecode != "" {
- sqlCondition = append(sqlCondition, "a.typecode=?")
- sqlParameter = append(sqlParameter, typecode)
- }
- if len(sqlCondition) > 0 {
- sqlCommandText += " where " + strings.Join(sqlCondition, " and ")
- }
- sqlCommandText += " order by a.id desc "
- var tableData []orm.Params
- _, err := o.Raw(sqlCommandText, sqlParameter).Values(&tableData)
- if err != nil {
- log.Println(err)
- return err
- }
- msg := map[string]interface{}{}
- msg["opt"] = "queryresult"
- list := []string{}
- for _, row := range tableData {
- list = append(list, tools.IsEmpty(row["v"]))
- }
- msg["list"] = list
- msgStr, _ := json.Marshal(msg)
- PublishMessage("/jujutong/police/blacklist/imsi", string(msgStr))
- return nil
- }
- //获取到人像匹配结果
- //加入到有特征人像分组表及子表中
- //floor与camera_sn都有值时表示进行了娄层比对;仅指定了camera_sn时表示楼栋比对
- func (c *ReceiveDataMessage) saveSameFace(faceid, floor, camera_sn string, result interface{}) {
- db := orm.NewOrm()
- var err error
- if result != nil {
- resultLst := result.([]interface{})
- if len(resultLst) > 0 {
- faces1 := []string{}
- for _, r := range resultLst {
- r1 := r.(map[string]interface{})
- faces1 = append(faces1, tools.IsEmpty(r1["fid"]))
- }
- sql := "select id,faceid from t_face_hasfeature_group where faceid in ('" + strings.Join(faces1, "','") + "')"
- gary := []orm.Params{}
- db.Raw(sql, faceid).Values(&gary)
- faceInGroup := map[string]string{}
- for _, t := range gary {
- faceInGroup[t["faceid"].(string)] = t["id"].(string)
- }
- sql = "insert into t_relation_face_same(groupid, faceid,camearsn,samefaceid,samevalue)values(?,?,?,?,?)"
- if len(faceInGroup) == 0 {
- sqlresult, _ := db.Raw("insert into t_face_hasfeature_group(camearsn,faceid)values(?,?)", camera_sn, faceid).Exec()
- newgid, _ := sqlresult.LastInsertId()
- if newgid > 0 {
- for _, r := range resultLst {
- r1 := r.(map[string]interface{})
- sameFaceID := tools.IsEmpty(r1["fid"])
- cv := tools.IsEmpty(r1["cv"])
- if cv == "" {
- cv = "0"
- }
- _, err = db.Raw(sql, newgid, faceid, camera_sn, sameFaceID, cv).Exec()
- if err != nil {
- log.Println(err)
- }
- }
- }
- return
- }
- for _, r := range resultLst {
- r1 := r.(map[string]interface{})
- groupFaceID := tools.IsEmpty(r1["fid"])
- if gid, has := faceInGroup[groupFaceID]; has {
- cv := tools.IsEmpty(r1["cv"])
- if cv == "" {
- cv = "0"
- }
- _, err = db.Raw(sql, gid, groupFaceID, camera_sn, faceid, cv).Exec()
- if err != nil {
- log.Println(err)
- }
- }
- }
- return
- }
- }
- if floor != "" {
- //楼层比对没有结果,进行楼栋主人像比对
- dst := []orm.Params{}
- _, err = db.Raw("select faceid from t_face_hasfeature_group where camearsn=?", camera_sn).Values(&dst)
- if err != nil {
- log.Println(err)
- } else if len(dst) > 0 {
- faces := []string{}
- for _, r := range dst {
- faces = append(faces, r["faceid"].(string))
- }
- //发送mqtt消息到人像处理服务,进行相似人像匹配
- msgObj := map[string]string{}
- msgObj["appid"] = "ftp_face_comp"
- msgObj["faceid"] = faceid
- msgObj["score"] = global.FaceSameValue
- msgObj["camera_sn"] = camera_sn
- msgObj["compfaces"] = strings.Join(faces, ",")
- msgStr, _ := json.Marshal(msgObj)
- PublishMessage("/jujutong/police/face/comp", string(msgStr))
- return
- } else {
- _, err = db.Raw("insert into t_face_hasfeature_group(faceid,camearsn)values(?,?)", faceid, camera_sn).Exec()
- if err != nil {
- log.Println(err)
- }
- }
- } else if floor == "" && camera_sn != "" {
- //对楼栋进行比对:当前人像还没有相似人像,加入到有特征人像分组表做为主记录
- _, err = db.Raw("insert into t_face_hasfeature_group(faceid,camearsn)values(?,?)", faceid, camera_sn).Exec()
- if err != nil {
- log.Println(err)
- }
- return
- }
- }
|