# AMQP 客户端接入
# 1 服务端订阅概述
第三方服务器可以直接订阅多种类型的消息,例如:设备状态变化通知、设备事件通知、设备生命周期变更、设备拓扑结构变更、产品变更、OTA 任务通知等,消息会直接转发至您的服务器。目前暂不支持自定义订阅类型。
# 1.1 使用场景
服务端接收项目下全部设备的订阅数据;
服务端可以部署集群,消费同一项目的订阅消息,消息随机转发至任一服务器,适用于高并发场景;
# 1.2 使用限制
不支持规则转发;
当消费者离线或消费慢时,消息会进入堆积队列,堆积数量有限制;
当前版本不支持消息类型自定义;
# 2 使用 AMQP 服务端订阅
# 2.1 amqp 客户端接入说明
连接配置说明 接入地址:${domainName}:5672
连接认证
userName: clientId&accessKey×tamp password: hmacsha1(accessSecret, "clientId"+clientId+"accessKey"+accessKey+"timestamp"+timestamp)
参数说明:
参数 | 是否必填 | 说明 |
clientId | 是 | 客户端标识,推荐使用 uuid |
accessKey | 是 | 项目 id |
timestamp | 是 | 时间戳,单位秒 |
accessSecret | - | 项目 secret |
# 2.2 接入示例
# 2.2.1 golang sdk 接入示例
package main import ( "crypto/hmac" "crypto/md5" "crypto/sha1" "encoding/hex" "errors" "fmt" "hash" "log" "strconv" "time" "github.com/streadway/amqp" ) var ( RABBITMQ_HOST = "localhost" RABBITMQ_PORT = "5672" ACCESS_KEY = "xgHc40bf04fb020c" ACCESS_SECRET = "6651cccf71b43a5e3c8e2c05bd278a85" CLIENT_ID = "test" ) func main() { username, password := getAMQPAccess(CLIENT_ID, ACCESS_KEY, ACCESS_SECRET) client, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s", username, password, RABBITMQ_HOST, RABBITMQ_PORT)) if err != nil { return } amqpCh, err := client.Channel() if err != nil { return } msgCh, err := amqpCh.Consume(ACCESS_KEY, "", true, false, false, true, amqp.Table{}) if err != nil { return } for i := range msgCh { log.Printf("msg from amqp: %s", i.Body) } } func getAMQPAccess(clientID, accessKey, accessSecret string) (username, password string) { timestamp := strconv.Itoa(int(time.Now().Unix())) sign, _ := authAMQPSign(clientID, accessKey, timestamp, accessSecret, "hmacsha1") username = fmt.Sprintf("%s&%s&%s", clientID, accessKey, timestamp) password = sign return } func authAMQPSign(clientId, accessKey, timestamp, accessSecret, signMethod string) (string, error) { src := "" src = fmt.Sprintf("clientId%saccessKey%s", clientId, accessKey) if timestamp != "" { src = src + "timestamp" + timestamp } var h hash.Hash switch signMethod { case "hmacsha1": h = hmac.New(sha1.New, []byte(accessSecret)) case "hmacmd5": h = hmac.New(md5.New, []byte(accessSecret)) default: return "", errors.New("invalid sign method") } _, err := h.Write([]byte(src)) if err != nil { return "", err } return hex.EncodeToString(h.Sum(nil)), nil }