# AMQP 客户端接入

# 1 服务端订阅概述

第三方服务器可以直接订阅多种类型的消息,例如:设备状态变化通知、设备事件通知、设备生命周期变更、设备拓扑结构变更、产品变更、OTA 任务通知等,消息会直接转发至您的服务器。目前暂不支持自定义订阅类型

# 1.1 使用场景

  • 服务端接收项目下全部设备的订阅数据;

  • 服务端可以部署集群,消费同一项目的订阅消息,消息随机转发至任一服务器,适用于高并发场景;

# 1.2 使用限制

  • 不支持规则转发;

  • 当消费者离线或消费慢时,消息会进入堆积队列,堆积数量有限制;

  • 当前版本不支持消息类型自定义;

# 2 使用 AMQP 服务端订阅

# 2.1 amqp 客户端接入说明

  1. 连接配置说明 接入地址:${domainName}:5672

  2. 连接认证

userName: clientId&accessKey&timestamp
password: hmacsha1(accessSecret, "clientId"+clientId+"accessKey"+accessKey+"timestamp"+timestamp)

参数说明:

参数

是否必填

说明

clientId

客户端标识,推荐使用 uuid

accessKey

项目 id

timestamp

时间戳,单位秒

accessSecret

-

项目 secret

# 2.2 接入示例

# 2.2.1 golang sdk 接入示例
package main

import (
	"crypto/hmac"
	"crypto/md5"
	"crypto/sha1"
	"encoding/hex"
	"errors"
	"fmt"
	"hash"
	"log"
	"strconv"
	"time"

	"github.com/streadway/amqp"
)

var (
	RABBITMQ_HOST = "localhost"
	RABBITMQ_PORT = "5672"

	ACCESS_KEY    = "xgHc40bf04fb020c"
	ACCESS_SECRET = "6651cccf71b43a5e3c8e2c05bd278a85"
	CLIENT_ID     = "test"
)

func main() {
	username, password := getAMQPAccess(CLIENT_ID, ACCESS_KEY, ACCESS_SECRET)
	client, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s", username, password, RABBITMQ_HOST, RABBITMQ_PORT))
	if err != nil {
		return
	}
	amqpCh, err := client.Channel()
	if err != nil {
		return
	}

	msgCh, err := amqpCh.Consume(ACCESS_KEY, "", true, false, false, true, amqp.Table{})
	if err != nil {
		return
	}

	for i := range msgCh {
		log.Printf("msg from amqp: %s", i.Body)
	}
}

func getAMQPAccess(clientID, accessKey, accessSecret string) (username, password string) {
	timestamp := strconv.Itoa(int(time.Now().Unix()))
	sign, _ := authAMQPSign(clientID, accessKey, timestamp, accessSecret, "hmacsha1")
	username = fmt.Sprintf("%s&%s&%s", clientID, accessKey, timestamp)
	password = sign
	return
}

func authAMQPSign(clientId, accessKey, timestamp, accessSecret, signMethod string) (string, error) {
	src := ""
	src = fmt.Sprintf("clientId%saccessKey%s", clientId, accessKey)
	if timestamp != "" {
		src = src + "timestamp" + timestamp
	}

	var h hash.Hash
	switch signMethod {
	case "hmacsha1":
		h = hmac.New(sha1.New, []byte(accessSecret))
	case "hmacmd5":
		h = hmac.New(md5.New, []byte(accessSecret))
	default:
		return "", errors.New("invalid sign method")
	}

	_, err := h.Write([]byte(src))
	if err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}
更新时间: 2022/6/6 下午4:10:39