# AMQP 客户端接入

# 1. 服务端订阅概述

消息订阅服务使用AMQP 0.9.1协议,第三方云服务可以直接订阅多种类型的消息,例如:设备状态变化通知、设备事件通知、设备生命周期变更、设备拓扑结构变更、产品变更、OTA 任务通知等,消息会直接转发至您的服务器。目前暂不支持自定义订阅类型

# 1.1 使用场景

  • AMQP 客户端以项目access_key为队列名,接收项目下全部设备的订阅数据;

  • AMQP 客户端可以根据需求设置exclusive参数:若设置为 false,则可以部署多个消费者程序,消费同一项目的订阅消息,消息随机转发至任一客户端,适用于高并发场景;若设置为 true,则仅允许唯一消费者;

# 1.2 使用限制

  • 不支持规则转发;

  • 当消费者离线或消费慢时,消息会进入堆积队列,消息堆积数量可自定义(私有化部署可任意配置),公共实例默认为 1000 条;

  • 当前版本不支持消息类型自定义;

# 2. 使用 AMQP 服务端订阅

消息订阅服务使用标准AMQP协议,FogCloud 仅支持AMQP 0.9.1版协议标准

# 2.1 amqp 客户端接入说明

# 2.1.1 连接配置

私有化部署连接地址:

连接地址

说明

amqp://${AMQP_domain}:5672

amqp-tcp 协议,生产环境不建议使用

amqps://${AMQP_domain}:5671

amqp-tls 协议

公共实例连接地址:

地域

amqp 连接地址

中国

amqps://app.amqp.fogcloud.io:5671

德国

amqps://amqp-eu.fogcloud.io:5671

# 2.1.2 连接认证

AMQP 0.9.1协议连接的 URI 标准格式为:

amqp://[username:password@]host[:port] (over tcp)

amqps://[username:password@]host[:port] (over tls)

  • username

固定格式为${clientID}<span>&</span>${accessKey}<span>&</span>${timestamp}

  • password

使用hmacsha1算法,以${accessSecret}为密钥,对固定格式字符串计算签名,并将得到字节码转为 hex 字符串即为password;被签名的字符串格式为:clientId${clientID}accessKey${accessKey}timestamp${timestamp}

  • 参数说明

参数

是否必填

说明

${clientID}

客户端标识,推荐使用 uuid

${accessKey}

项目 ID

${timestamp}

时间戳,单位秒

${accessSecret}

-

项目 Secret

  • 连接参数示例

从 FogCloud 后台获取到项目的accessKeyaccessSecret,示例:

{
  "accessKey": "U1wLs3cdb6554962",
  "accessSecret": "96ce4bd8b42c6e9035d968f1f6341ba4"
}

生成随机字符串作为${clientID}:"hello";

获取当前时间戳作为${timestamp}:"1675931243";

需要计算签名的固定字符串为:"clientIdhelloaccessKeyU1wLs3cdb6554962timestamp1675931243"

密钥为:"96ce4bd8b42c6e9035d968f1f6341ba4"

使用 hmacsha1 算法计算签名的伪代码:

hmacsha1("96ce4bd8b42c6e9035d968f1f6341ba4", "clientIdhelloaccessKeyU1wLs3cdb6554962timestamp1675931243").toHexString()

得到 amqp 连接参数的username:"hello&U1wLs3cdb6554962&1675930742"

得到 amqp 连接参数的password:"14058d9291247709420e01f7e18606545882b3cb"

最后得到 amqp uri 为:amqps://hello<span>&</span>U1wLs3cdb6554962<span>&</span>1675930742:14058d9291247709420e01f7e18606545882b3cb@app.amqp.fogcloud.io:5671

# 2.1.3 消费队列

amqp 客户端连接成功后,客户端需要使用access_key作为队列名称,读取队列消息;目前客户端仅拥有队列读权限;

# 2.2 接入示例

# 2.2.1 golang sdk 接入示例

FogCloud 提供了 Golang SDK,您可以查看详细使用说明 (opens new window)

安装依赖:

go get github.com/fogcloud-io/fog-amqp

代码示例:

var (
	AMQPHost        = "localhost"
	AMQPPort        = "5672"
	AMQPTLS         = false
	FogAccessKey    = "xgHc40bf04fb020c"
	FogAccessSecret = "c3bad348bb34390558f7f1aacce17877"
	clientID        = "fog-consumer"
)

func main() {
	cli, err := amqp.NewFogAMQPClient(AMQPHost, AMQPPort, FogAccessKey, FogAccessSecret, clientID, AMQPTLS)
	if err != nil {
		log.Fatal(err)
	}

	err = cli.ConsumeWithHandler(
		100,
		FogAccessKey,
		func(b amqp.Delivery) { log.Printf("amqp receive: %s", b.Body); b.Ack(true) },
		amqp.WithConsumerOptionsConsumerTag("fog-consumer-1"),
		amqp.WithConsumerOptionsAutoAck(false),
		amqp.WithConsumerOptionsNoWait(true),
	)
	if err != nil {
		log.Print(err)
	}
}

# 2.2.2 python sdk 接入示例

详细使用参见第三方库文档 (opens new window)

安装依赖:

python -m pip install pika --upgrade

代码示例:

#!/usr/bin/env python
import pika
import time,sys,os,ssl,hmac,hashlib,random

amqp_host = "app.amqp.fogcloud.io"
amqp_port = 5671
client_id =  random.randbytes(10).hex()
fog_access_key = "xxx"
fog_access_secret = "xxxxx"

def generate_amqp_username(client_id, access_key, timestamp):
    return '{}&{}&{}'.format(client_id, access_key, timestamp)

def generate_amqp_password(plaintext, secret):
    return hmac.new(bytes(secret, encoding="utf-8"), bytes(plaintext, encoding="utf-8"), hashlib.sha1).hexdigest()

def generate_plaintext(client_id, access_key, timestamp):
    return 'clientId{}accessKey{}timestamp{}'.format(client_id, access_key, timestamp)

def main():
    timestamp = int(time.time())
    username = generate_amqp_username(client_id=client_id, access_key=fog_access_key, timestamp=timestamp)
    password = generate_amqp_password(plaintext=generate_plaintext(client_id=client_id, access_key=fog_access_key, timestamp=timestamp), secret=fog_access_secret)
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    conn = pika.BlockingConnection(pika.ConnectionParameters(host=amqp_host, port=amqp_port, virtual_host="/", credentials=pika.PlainCredentials(username=username, password=password), ssl_options=pika.SSLOptions(context=context)))
    ch = conn.channel()
    def callback(ch, method, properties, body):
        print("amqp receive: %r" % body)

    ch.basic_consume(queue=fog_access_key, on_message_callback=callback, auto_ack=True)
    ch.start_consuming()    

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)    
更新时间: 2023/2/28 下午3:49:02