// Package mqtt loads the broker configuration from conf/mqtt.cnf and connects
// the application to either an MQTT broker (via the Paho client) or a RabbitMQ
// broker (via AMQP), subscribing to the configured topics / routing keys.
package mqtt

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"strconv"
	"strings"
	"time"

	"scd_check_tools/conf"
	"scd_check_tools/logger"
	"scd_check_tools/tools"

	// Paho Go MQTT library.
	MQTT "github.com/eclipse/paho.mqtt.golang"
	"github.com/streadway/amqp"
	//"golang.org/x/net/proxy"
)

// MqttMessageChan is the data message channel. RabbitMQ deliveries are written
// to it as "<routing key>|<body>" strings by frabbit. It is buffered to 1000
// entries; once full, the sender blocks until a reader drains it.
var MqttMessageChan = make(chan string, 1000)

// f is installed as the default publish handler of the MQTT client. Each
// incoming message is handed to a fresh ReceiveDataMessage.
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
	mqttDeal := new(ReceiveDataMessage)
	mqttDeal.Receive(msg.Topic(), string(msg.Payload()))
}

// frabbit receives RabbitMQ messages. Every delivery is echoed to stdout and
// then forwarded through MqttMessageChan for downstream processing.
var frabbit = func(msg amqp.Delivery) {
	msgStr := fmt.Sprintf("%s|%s", msg.RoutingKey, string(msg.Body))
	fmt.Println(tools.NowTime() + " ==== received data:" + msgStr)
	MqttMessageChan <- msgStr
}

var (
	url     string
	topic   string
	cnffile string
)

// Config mirrors the JSON content of the broker configuration file.
//
// When connecting to RabbitMQ, Topic holds the routing key(s).
// ExchangeName is only required for RabbitMQ.
// Port carries the ",string" JSON option, so the file must store it as a
// quoted number (for example "1883").
type Config struct {
	Mqtt             string `json:"mqttname"`
	Host             string `json:"host"`
	Port             int    `json:"port,string"`
	User             string `json:"user"`
	Pwd              string `json:"pwd"`
	Topic            string `json:"topic"`
	ExchangeName     string `json:"exchangename"`
	Clientid         string `json:"clientid"`
	ExtShell_webtest string `json:"extshell_webtest"`
}

var rabbitMqClient *MessagingClient

// mqttserverconn is the MQTT client created by Mqtt and used by PublishMessage.
var mqttserverconn MQTT.Client

// failOnError prints "<msg>: <err>" and terminates the process with exit code 1
// when err is non-nil. It does nothing when err is nil.
func failOnError(err error, msg string) {
	if err != nil {
		// Printf, not Println: the message contains formatting verbs.
		fmt.Printf("%s: %s\n", msg, err)
		os.Exit(1)
	}
}

// Start reads conf/mqtt.cnf, parses it into a Config and connects to the
// configured broker: RabbitMQ when "mqttname" is "rabbitmq", MQTT otherwise.
// Configuration problems are reported on stdout and make Start return without
// connecting.
func Start() {
	cnffile = "conf/mqtt.cnf"
	fmt.Println(tools.NowTime()+" 加载MQTT配置文件", cnffile)

	fh, err := os.Open(cnffile)
	if err != nil {
		fmt.Println(tools.NowTime()+" 配置文件mqtt.cnf:", err)
		return
	}
	defer fh.Close()

	txt, err := ioutil.ReadAll(fh)
	if err != nil {
		// A read failure used to be ignored and surfaced later as a
		// misleading "empty configuration" or JSON parse error.
		fmt.Println(tools.NowTime()+" 配置文件mqtt.cnf读取失败:", err)
		return
	}
	if len(txt) == 0 {
		fmt.Println(tools.NowTime() + " 配置文件mqtt.cnf配置不正确,请正确配置")
		return
	}

	cfgdata := Config{}
	if err = json.Unmarshal(txt, &cfgdata); err != nil {
		fmt.Println(tools.NowTime()+" MQTT配置文件解析失败:", err)
		return
	}
	if cfgdata.Topic == "" {
		fmt.Println(tools.NowTime() + " 注意:当前未订阅任何主题,是否需要修复?")
	}
	if cfgdata.Mqtt == "" {
		cfgdata.Mqtt = "MQTT"
	}

	fmt.Println(tools.NowTime() + " 正在连接" + cfgdata.Mqtt + "服务器" + cfgdata.Host + ":" + strconv.Itoa(cfgdata.Port))
	if cfgdata.Mqtt == "rabbitmq" {
		RabbitMQ(cfgdata)
	} else {
		Mqtt(cfgdata)
	}

	//go PublishMessage()
	/*
		go func() {
			// simulate writing data to the channel
			for i := 0; i < 10; i++ {
				msg := "msg id:" + strconv.Itoa(i)
				messageChan <- msg
				fmt.Println(NowTime() + " " + strconv.Itoa(len(messageChan)))
				time.Sleep(1 * time.Second)
			}
		}()
	*/
}

// RabbitMQ connects to the RabbitMQ broker described by cfgdata and subscribes
// frabbit to the "topic" exchange cfgdata.ExchangeName, using cfgdata.Topic as
// a comma-separated list of routing keys. The process exits when the exchange
// name is missing or the connection cannot be established.
func RabbitMQ(cfgdata Config) {
	if cfgdata.ExchangeName == "" {
		fmt.Println("exchangename属性未配置")
		os.Exit(1)
	}
	//rabbitMqClient = new(MessagingClient)
	addr := cfgdata.Host + ":" + strconv.Itoa(cfgdata.Port)
	connstr := fmt.Sprintf("amqp://%s:%s@%s", cfgdata.User, cfgdata.Pwd, addr)
	rbconn, err := amqp.Dial(fmt.Sprintf("%s/", connstr))
	// The failure message masks the password so credentials never reach the logs.
	failOnError(err, "rabbitMQ[amqp://"+cfgdata.User+":***@"+addr+"] 连接失败")
	//rabbitMqClient.ConnectToRabbitBroker(connstr)
	if err = RabbitMQSubscribe(rbconn, cfgdata.ExchangeName, "topic", cfgdata.Topic, "", frabbit); err != nil {
		// Previously dropped: a failed queue binding left the service
		// running without any subscription and without any hint why.
		fmt.Println(tools.NowTime()+" RabbitMQ订阅失败:", err)
	}
}

// RabbitMQSubscribe declares the exchange and a server-named queue on mconn,
// binds the queue once per comma-separated entry of routerkey, and starts a
// goroutine that passes every delivery to handlerFunc (auto-ack enabled).
//
// It panics when mconn is nil, exits the process (through failOnError) when
// the channel, exchange, queue or consumer cannot be set up, and returns an
// error when a queue binding fails.
func RabbitMQSubscribe(mconn *amqp.Connection, exchangeName string, exchangeType string, routerkey string, consumerName string, handlerFunc func(amqp.Delivery)) error {
	if mconn == nil {
		panic("Tried to Subscribe before connection was initialized. Don't do that.")
	}
	ch, err := mconn.Channel()
	failOnError(err, "Failed to open a channel")
	// defer ch.Close()

	err = ch.ExchangeDeclare(
		exchangeName, // name of the exchange
		exchangeType, // type
		true,         // durable
		false,        // delete when complete
		false,        // internal
		false,        // noWait
		nil,          // arguments
	)
	failOnError(err, "Failed to register an Exchange")

	fmt.Printf("declared Exchange, declaring Queue (%s)\n", "")
	queue, err := ch.QueueDeclare(
		"",    // name of the queue (empty: the broker generates one)
		false, // durable
		false, // delete when unused
		false, // exclusive
		false, // noWait
		nil,   // arguments
	)
	failOnError(err, "Failed to register an Queue")

	// Report the routing keys being bound; the exchange name was printed here
	// by mistake.
	fmt.Printf("declared Queue (%d messages, %d consumers), binding to Exchange (key '%s')\n",
		queue.Messages, queue.Consumers, routerkey)

	for _, key := range strings.Split(routerkey, ",") {
		err = ch.QueueBind(
			queue.Name,   // name of the queue
			key,          // bindingKey
			exchangeName, // sourceExchange
			false,        // noWait
			nil,          // arguments
		)
		if err != nil {
			return fmt.Errorf("Queue Bind: %s", err)
		}
	}

	msgs, err := ch.Consume(
		queue.Name,   // queue
		consumerName, // consumer
		true,         // auto-ack
		false,        // exclusive
		false,        // no-local
		false,        // no-wait
		nil,          // args
	)
	failOnError(err, "Failed to register a consumer")

	go consumeLoop2(msgs, handlerFunc)
	return nil
}

// consumeLoop2 calls handlerFunc for every delivery until the deliveries
// channel is closed.
func consumeLoop2(deliveries <-chan amqp.Delivery, handlerFunc func(d amqp.Delivery)) {
	for d := range deliveries {
		// Invoke the handlerFunc func we passed as parameter.
		handlerFunc(d)
	}
}

// Mqtt creates the package-level MQTT client from cfgdata, connects to the
// broker (retrying every 5 seconds until it succeeds) and subscribes to each
// comma-separated topic in cfgdata.Topic with QoS 0. Each subscription is
// attempted at most 5 times. Messages are delivered to the default handler f.
// The connected client is returned.
func Mqtt(cfgdata Config) MQTT.Client {
	opt := MQTT.NewClientOptions().AddBroker(cfgdata.Host + ":" + strconv.Itoa(cfgdata.Port))
	if cfgdata.Clientid == "" {
		cfgdata.Clientid = "mqtt-receive-" + conf.GlobalConfig["appname"]
	}
	opt.ConnectTimeout = 10 * time.Second
	opt.SetClientID(cfgdata.Clientid)
	opt.SetDefaultPublishHandler(f)
	opt.SetUsername(cfgdata.User)
	opt.SetPassword(cfgdata.Pwd)
	mqttserverconn = MQTT.NewClient(opt)

	// Keep trying until the broker accepts the connection.
	for {
		token := mqttserverconn.Connect()
		if token.Wait() && token.Error() != nil {
			fmt.Println(tools.NowTime() + " MQTT服务器连接失败:" + token.Error().Error())
			time.Sleep(5 * time.Second)
			continue
		}
		break
	}
	fmt.Println(tools.NowTime() + " MQTT服务器连接成功...")

	if cfgdata.Topic == "" {
		return mqttserverconn
	}
	for _, v := range strings.Split(cfgdata.Topic, ",") {
		// Up to 5 attempts per topic, pausing 5 seconds after each failure.
		for attempts := 5; attempts > 0; attempts-- {
			// A nil callback routes messages to the default publish handler.
			token := mqttserverconn.Subscribe(v, 0, nil)
			if token.Wait() && token.Error() != nil {
				fmt.Println(tools.NowTime() + v + " 主题订阅失败:" + token.Error().Error())
				time.Sleep(5 * time.Second)
				continue
			}
			fmt.Println(tools.NowTime() + " 主题" + v + "订阅成功")
			break
		}
	}
	return mqttserverconn
}

// PublishMessage publishes text to topic with QoS 0 (not retained) on the
// client created by Mqtt and waits for the publish to complete. It is a no-op
// when no client has been created yet. A publish failure is reported on
// stdout.
func PublishMessage(topic string, text string) {
	if mqttserverconn == nil {
		return
	}
	logger.Logger.Debug(" 发布数据" + text + "到主题" + topic)
	token := mqttserverconn.Publish(topic, 0, false, text)
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Println(tools.NowTime() + " 发布数据到主题" + topic + "失败:" + err.Error())
	}
}