mqtt_receive_message.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. package mqtt
  2. import (
  3. "encoding/json"
  4. "log"
  5. "scd_check_tools/conf"
  6. "scd_check_tools/global"
  7. "scd_check_tools/sms"
  8. "scd_check_tools/tools"
  9. "strings"
  10. "github.com/astaxie/beego/orm"
  11. )
  12. //mqtt消息处理控制器
  13. type ReceiveDataMessage struct {
  14. }
  15. func (c *ReceiveDataMessage) Receive(topic string, msg string) error {
  16. if conf.GlobalConfig["loglevel"] == "5" {
  17. log.Println("ReceiveDataMessage:" + msg)
  18. }
  19. obj := map[string]interface{}{}
  20. err := json.Unmarshal([]byte(msg), &obj)
  21. if err != nil {
  22. log.Println(err)
  23. return err
  24. }
  25. //imsi黑名单查询
  26. if topic == "/jujutong/police/blacklist/imsi/query" {
  27. c.MqttQueryList("imsi")
  28. return nil
  29. }
  30. appid := tools.IsEmpty(obj["appid"])
  31. //人脸特征值检测
  32. if topic == "/jujutong/police/face/extractfeatureresult" || topic == "/jujutong/police/face/extractfeatureerror" {
  33. if appid == "full_search_face" {
  34. imgid := tools.IsEmpty(obj["imgid"])
  35. if imgid != "" {
  36. global.ImgFeartyueCheckListLock.Lock()
  37. global.ImgFeartyueCheckList[imgid] = obj
  38. global.ImgFeartyueCheckListLock.Unlock()
  39. }
  40. return nil
  41. }
  42. return nil
  43. }
  44. if topic == "/jujutong/police/face/comperror" {
  45. //log.Println(msg)
  46. return nil
  47. }
  48. client_id := tools.IsEmpty(obj["client_id"])
  49. //布控分析结果
  50. if client_id == "mqtt_client_control_publish" && topic == "/jujutong/police/face/compresult" {
  51. count := tools.IsEmpty(obj["count"])
  52. if count == "" || count == "0" {
  53. return nil
  54. }
  55. result := obj["result"].([]interface{})
  56. row1 := result[0].(map[string]interface{})
  57. control_id := tools.IsEmpty(row1["control_id"])
  58. control_sub_id := tools.IsEmpty(row1["control_sub_id"])
  59. if control_id == "" || control_sub_id == "" {
  60. return nil
  61. }
  62. go c.SendControlTaskSms(control_id, control_sub_id)
  63. return nil
  64. }
  65. //碰撞分析结果
  66. if client_id == "mqtt_client_collision_publish" && topic == "/jujutong/police/face/compresult" {
  67. return nil
  68. }
  69. //伴随分析结果
  70. if appid == "accompany_task_process" && strings.HasPrefix(topic, "/jujutong/police/accompany/") {
  71. state := tools.IsEmpty(obj["state"])
  72. if state == "2" {
  73. accompanyid := tools.IsEmpty(obj["accompanyid"])
  74. go c.SendAccompanyTaskSms(accompanyid)
  75. }
  76. return nil
  77. }
  78. if appid == "ftp_face_comp" && topic == "/jujutong/police/face/compresult" {
  79. faceid := tools.IsEmpty(obj["faceid"])
  80. if faceid == "" {
  81. return nil
  82. }
  83. camera_sn := tools.IsEmpty(obj["camera_sn"])
  84. floor := tools.IsEmpty(obj["floor"])
  85. go func() {
  86. c.saveSameFace(faceid, floor, camera_sn, obj["result"])
  87. }()
  88. }
  89. return nil
  90. }
  91. //布控任务短信通知
  92. func (c *ReceiveDataMessage) SendControlTaskSms(controlid, controlsubid string) {
  93. if controlid == "" || controlsubid == "" {
  94. return
  95. }
  96. db := orm.NewOrm()
  97. sql := "select s1.name, s.control_name,(select mobilephone from t_data_user where s1.cr=id) phone from t_data_control_sub s,t_data_control s1 where s.controlid=s1.id and s.id=? and s1.id=?"
  98. var tableData []orm.Params
  99. _, err := db.Raw(sql, controlsubid, controlid).Values(&tableData)
  100. if err != nil {
  101. log.Println(err)
  102. return
  103. }
  104. if len(tableData) > 0 {
  105. taskname := tools.IsEmpty(tableData[0]["name"]) + "-" + tools.IsEmpty(tableData[0]["control_name"])
  106. phone := tools.IsEmpty(tableData[0]["phone"])
  107. if phone != "" {
  108. smsplatform := conf.GlobalConfig["sms.platform"]
  109. if smsplatform == "" {
  110. log.Println("未配置短信消息发送平台")
  111. } else {
  112. smsSend := sms.GetSmsInstance(smsplatform)
  113. datatype := sms.TemplateCode_SetControl_Task
  114. params := []string{"布控分析", ":" + taskname + " 新"}
  115. phone := strings.Split(phone, ",")
  116. smsSend.Send(datatype, params, phone)
  117. }
  118. }
  119. }
  120. }
  121. //伴随任务完成短信通知
  122. func (c *ReceiveDataMessage) SendAccompanyTaskSms(accompanyid string) {
  123. if accompanyid == "" {
  124. return
  125. }
  126. o := orm.NewOrm()
  127. sqlCommandText := "select a.name,(select mobilephone from t_data_user where a.cr=id) phone,(select count(DISTINCT face_id) from t_data_accompany_result where accompany_id=a.id) cnt from t_data_accompany a where a.id=?;"
  128. var tableData []orm.Params
  129. _, err := o.Raw(sqlCommandText, accompanyid).Values(&tableData)
  130. if err != nil {
  131. log.Println(err)
  132. return
  133. }
  134. if len(tableData) > 0 {
  135. taskname := tools.IsEmpty(tableData[0]["name"])
  136. phone := tools.IsEmpty(tableData[0]["phone"])
  137. cnt := tools.IsEmpty(tableData[0]["cnt"])
  138. if phone != "" {
  139. smsplatform := conf.GlobalConfig["sms.platform"]
  140. if smsplatform == "" {
  141. log.Println("未配置短信消息发送平台")
  142. } else {
  143. smsSend := sms.GetSmsInstance(smsplatform)
  144. datatype := sms.TemplateCode_Follow_Collision_task
  145. params := []string{"伴随分析", ":" + taskname + " ", cnt}
  146. phone := strings.Split(phone, ",")
  147. smsSend.Send(datatype, params, phone)
  148. }
  149. }
  150. }
  151. }
  152. func (c *ReceiveDataMessage) MqttQueryList(typecode string) error {
  153. o := orm.NewOrm()
  154. var sqlCommandText string
  155. var sqlCondition []string
  156. var sqlParameter []interface{}
  157. sqlCommandText = "select * from t_data_blacklist a "
  158. if typecode != "" {
  159. sqlCondition = append(sqlCondition, "a.typecode=?")
  160. sqlParameter = append(sqlParameter, typecode)
  161. }
  162. if len(sqlCondition) > 0 {
  163. sqlCommandText += " where " + strings.Join(sqlCondition, " and ")
  164. }
  165. sqlCommandText += " order by a.id desc "
  166. var tableData []orm.Params
  167. _, err := o.Raw(sqlCommandText, sqlParameter).Values(&tableData)
  168. if err != nil {
  169. log.Println(err)
  170. return err
  171. }
  172. msg := map[string]interface{}{}
  173. msg["opt"] = "queryresult"
  174. list := []string{}
  175. for _, row := range tableData {
  176. list = append(list, tools.IsEmpty(row["v"]))
  177. }
  178. msg["list"] = list
  179. msgStr, _ := json.Marshal(msg)
  180. PublishMessage("/jujutong/police/blacklist/imsi", string(msgStr))
  181. return nil
  182. }
  183. //获取到人像匹配结果
  184. //加入到有特征人像分组表及子表中
  185. //floor与camera_sn都有值时表示进行了娄层比对;仅指定了camera_sn时表示楼栋比对
  186. func (c *ReceiveDataMessage) saveSameFace(faceid, floor, camera_sn string, result interface{}) {
  187. db := orm.NewOrm()
  188. var err error
  189. if result != nil {
  190. resultLst := result.([]interface{})
  191. if len(resultLst) > 0 {
  192. faces1 := []string{}
  193. for _, r := range resultLst {
  194. r1 := r.(map[string]interface{})
  195. faces1 = append(faces1, tools.IsEmpty(r1["fid"]))
  196. }
  197. sql := "select id,faceid from t_face_hasfeature_group where faceid in ('" + strings.Join(faces1, "','") + "')"
  198. gary := []orm.Params{}
  199. db.Raw(sql, faceid).Values(&gary)
  200. faceInGroup := map[string]string{}
  201. for _, t := range gary {
  202. faceInGroup[t["faceid"].(string)] = t["id"].(string)
  203. }
  204. sql = "insert into t_relation_face_same(groupid, faceid,camearsn,samefaceid,samevalue)values(?,?,?,?,?)"
  205. if len(faceInGroup) == 0 {
  206. sqlresult, _ := db.Raw("insert into t_face_hasfeature_group(camearsn,faceid)values(?,?)", camera_sn, faceid).Exec()
  207. newgid, _ := sqlresult.LastInsertId()
  208. if newgid > 0 {
  209. for _, r := range resultLst {
  210. r1 := r.(map[string]interface{})
  211. sameFaceID := tools.IsEmpty(r1["fid"])
  212. cv := tools.IsEmpty(r1["cv"])
  213. if cv == "" {
  214. cv = "0"
  215. }
  216. _, err = db.Raw(sql, newgid, faceid, camera_sn, sameFaceID, cv).Exec()
  217. if err != nil {
  218. log.Println(err)
  219. }
  220. }
  221. }
  222. return
  223. }
  224. for _, r := range resultLst {
  225. r1 := r.(map[string]interface{})
  226. groupFaceID := tools.IsEmpty(r1["fid"])
  227. if gid, has := faceInGroup[groupFaceID]; has {
  228. cv := tools.IsEmpty(r1["cv"])
  229. if cv == "" {
  230. cv = "0"
  231. }
  232. _, err = db.Raw(sql, gid, groupFaceID, camera_sn, faceid, cv).Exec()
  233. if err != nil {
  234. log.Println(err)
  235. }
  236. }
  237. }
  238. return
  239. }
  240. }
  241. if floor != "" {
  242. //楼层比对没有结果,进行楼栋主人像比对
  243. dst := []orm.Params{}
  244. _, err = db.Raw("select faceid from t_face_hasfeature_group where camearsn=?", camera_sn).Values(&dst)
  245. if err != nil {
  246. log.Println(err)
  247. } else if len(dst) > 0 {
  248. faces := []string{}
  249. for _, r := range dst {
  250. faces = append(faces, r["faceid"].(string))
  251. }
  252. //发送mqtt消息到人像处理服务,进行相似人像匹配
  253. msgObj := map[string]string{}
  254. msgObj["appid"] = "ftp_face_comp"
  255. msgObj["faceid"] = faceid
  256. msgObj["score"] = global.FaceSameValue
  257. msgObj["camera_sn"] = camera_sn
  258. msgObj["compfaces"] = strings.Join(faces, ",")
  259. msgStr, _ := json.Marshal(msgObj)
  260. PublishMessage("/jujutong/police/face/comp", string(msgStr))
  261. return
  262. } else {
  263. _, err = db.Raw("insert into t_face_hasfeature_group(faceid,camearsn)values(?,?)", faceid, camera_sn).Exec()
  264. if err != nil {
  265. log.Println(err)
  266. }
  267. }
  268. } else if floor == "" && camera_sn != "" {
  269. //对楼栋进行比对:当前人像还没有相似人像,加入到有特征人像分组表做为主记录
  270. _, err = db.Raw("insert into t_face_hasfeature_group(faceid,camearsn)values(?,?)", faceid, camera_sn).Exec()
  271. if err != nil {
  272. log.Println(err)
  273. }
  274. return
  275. }
  276. }