123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- package mqtt
- import (
- "scd_check_tools/conf"
- "scd_check_tools/logger"
- "scd_check_tools/tools"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "os"
- "strconv"
- "strings"
- "time"
- //import the Paho Go MQTT library
- MQTT "github.com/eclipse/paho.mqtt.golang"
- "github.com/streadway/amqp"
- //"golang.org/x/net/proxy"
- )
- //数据消息通道
- var MqttMessageChan = make(chan string, 1000)
- var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
- mqttDeal := new(ReceiveDataMessage)
- mqttDeal.Receive(msg.Topic(), string(msg.Payload()))
- }
- //接收rabbitmq的消息
- //接收到消息后通过messageChan通道传递给数据库处理过程
- var frabbit = func(msg amqp.Delivery) {
- msgStr := fmt.Sprintf("%s|%s", msg.RoutingKey, string(msg.Body))
- fmt.Println(tools.NowTime() + " ==== received data:" + msgStr)
- MqttMessageChan <- msgStr
- }
- var (
- url string
- topic string
- cnffile string
- )
- //连接RabbitMQ时,topic属性值为router key
- //仅RabbitMQ需要指定exchangename属性
- type Config struct {
- Mqtt string `json:"mqttname"`
- Host string `json:"host"`
- Port int `json:"port,string"`
- User string `json:"user"`
- Pwd string `json:"pwd"`
- Topic string `json:"topic"`
- ExchangeName string `json:"exchangename"`
- Clientid string `json:"clientid"`
- ExtShell_webtest string `json:"extshell_webtest"`
- }
- var rabbitMqClient *MessagingClient
- var mqttserverconn MQTT.Client
- func failOnError(err error, msg string) {
- if err != nil {
- fmt.Println("%s: %s", msg, err)
- os.Exit(1)
- }
- }
- func Start() {
- cnffile = "conf/mqtt.cnf"
- fmt.Println(tools.NowTime()+" 加载MQTT配置文件", cnffile)
- fileHanlder, err := os.Open(cnffile)
- if err != nil {
- fmt.Println(tools.NowTime()+" 配置文件mqtt.cnf:", err)
- return
- }
- txt, _ := ioutil.ReadAll(fileHanlder)
- fileHanlder.Close()
- txtStr := string(txt)
- if txtStr == "" {
- fmt.Println(tools.NowTime() + " 配置文件mqtt.cnf配置不正确,请正确配置")
- return
- }
- cfgdata := Config{}
- err = json.Unmarshal(txt, &cfgdata)
- if err != nil {
- fmt.Println(tools.NowTime()+" MQTT配置文件解析失败:", err)
- return
- }
- if cfgdata.Topic == "" {
- fmt.Println(tools.NowTime() + " 注意:当前未订阅任何主题,是否需要修复?")
- }
- if cfgdata.Mqtt == "" {
- cfgdata.Mqtt = "MQTT"
- }
- fmt.Println(tools.NowTime() + " 正在连接" + cfgdata.Mqtt + "服务器" + cfgdata.Host + ":" + strconv.Itoa(cfgdata.Port))
- if cfgdata.Mqtt == "rabbitmq" {
- RabbitMQ(cfgdata)
- } else {
- Mqtt(cfgdata)
- }
- //go PublishMessage()
- /*
- go func() {
- //向通道模拟写入数据
- for i := 0; i < 10; i++ {
- msg := "msg id:" + strconv.Itoa(i)
- messageChan <- msg
- fmt.Println(NowTime() + " " + strconv.Itoa(len(messageChan)))
- time.Sleep(1 * time.Second)
- }
- }()
- */
- }
- func RabbitMQ(cfgdata Config) {
- if cfgdata.ExchangeName == "" {
- fmt.Println("exchangename属性未配置")
- os.Exit(1)
- }
- //rabbitMqClient = new(MessagingClient)
- connstr := fmt.Sprintf("amqp://%s:%s@%s:%s", cfgdata.User, cfgdata.Pwd, cfgdata.Host, strconv.Itoa(cfgdata.Port))
- rbconn, err := amqp.Dial(fmt.Sprintf("%s/", connstr))
- failOnError(err, "rabbitMQ["+connstr+"] 连接失败")
- //rabbitMqClient.ConnectToRabbitBroker(connstr)
- RabbitMQSubscribe(rbconn, cfgdata.ExchangeName, "topic", cfgdata.Topic, "", frabbit)
- }
- func RabbitMQSubscribe(mconn *amqp.Connection, exchangeName string, exchangeType string, routerkey string, consumerName string, handlerFunc func(amqp.Delivery)) error {
- if mconn == nil {
- panic("Tried to Subscribe before connection was initialized. Don't do that.")
- }
- ch, err := mconn.Channel()
- failOnError(err, "Failed to open a channel")
- // defer ch.Close()
- err = ch.ExchangeDeclare(
- exchangeName, // name of the exchange
- exchangeType, // type
- true, // durable
- false, // delete when complete
- false, // internal
- false, // noWait
- nil, // arguments
- )
- failOnError(err, "Failed to register an Exchange")
- fmt.Printf("declared Exchange, declaring Queue (%s)", "")
- queue, err := ch.QueueDeclare(
- "", // name of the queue
- false, // durable
- false, // delete when usused
- false, // exclusive
- false, // noWait
- nil, // arguments
- )
- failOnError(err, "Failed to register an Queue")
- fmt.Printf("declared Queue (%d messages, %d consumers), binding to Exchange (key '%s')",
- queue.Messages, queue.Consumers, exchangeName)
- for _, v := range strings.Split(routerkey, ",") {
- err = ch.QueueBind(
- queue.Name, // name of the queue
- v, // bindingKey
- exchangeName, // sourceExchange
- false, // noWait
- nil, // arguments
- )
- if err != nil {
- return fmt.Errorf("Queue Bind: %s", err)
- }
- }
- msgs, err := ch.Consume(
- queue.Name, // queue
- consumerName, // consumer
- true, // auto-ack
- false, // exclusive
- false, // no-local
- false, // no-wait
- nil, // args
- )
- failOnError(err, "Failed to register a consumer")
- go consumeLoop2(msgs, handlerFunc)
- return nil
- }
- func consumeLoop2(deliveries <-chan amqp.Delivery, handlerFunc func(d amqp.Delivery)) {
- for d := range deliveries {
- // Invoke the handlerFunc func we passed as parameter.
- handlerFunc(d)
- }
- }
- func Mqtt(cfgdata Config) MQTT.Client {
- opt := MQTT.NewClientOptions().AddBroker(cfgdata.Host + ":" + strconv.Itoa(cfgdata.Port))
- if cfgdata.Clientid == "" {
- cfgdata.Clientid = "mqtt-receive-" + conf.GlobalConfig["appname"]
- }
- opt.ConnectTimeout = 10 * time.Second
- opt.SetClientID(cfgdata.Clientid)
- opt.SetDefaultPublishHandler(f)
- opt.SetUsername(cfgdata.User)
- opt.SetPassword(cfgdata.Pwd)
- mqttserverconn = MQTT.NewClient(opt)
- for {
- if token := mqttserverconn.Connect(); token.Wait() && token.Error() != nil {
- fmt.Println(tools.NowTime() + " MQTT服务器连接失败:" + token.Error().Error())
- time.Sleep(5 * time.Second)
- } else {
- break
- }
- }
- fmt.Println(tools.NowTime() + " MQTT服务器连接成功...")
- if cfgdata.Topic != "" {
- topiclist := strings.Split(cfgdata.Topic, ",")
- for _, v := range topiclist {
- errcnt := 5
- for {
- if token := mqttserverconn.Subscribe(v, 0, nil); token.Wait() && token.Error() != nil {
- fmt.Println(tools.NowTime() + v + " 主题订阅失败:" + token.Error().Error())
- time.Sleep(5 * time.Second)
- errcnt = errcnt - 1
- if errcnt == 0 {
- break
- }
- } else {
- fmt.Println(tools.NowTime() + " 主题" + v + "订阅成功")
- break
- }
- }
- }
- }
- return mqttserverconn
- }
- func PublishMessage(topic string, text string) {
- if mqttserverconn == nil {
- return
- }
- logger.Logger.Debug(" 发布数据" + text + "到主题" + topic)
- token := mqttserverconn.Publish(topic, 0, false, text)
- token.Wait()
- }
|