# AMQP 客户端接入
# 1. 服务端订阅概述
消息订阅服务使用AMQP 0.9.1
协议,第三方云服务可以直接订阅多种类型的消息,例如:设备状态变化通知、设备事件通知、设备生命周期变更、设备拓扑结构变更、产品变更、OTA 任务通知等,消息会直接转发至您的服务器。目前暂不支持自定义订阅类型。
# 1.1 使用场景
AMQP 客户端以项目
access_key
为队列名,接收项目下全部设备的订阅数据;AMQP 客户端可以根据需求设置
exclusive
参数:若设置为 false,则可以部署多个消费者程序,消费同一项目的订阅消息,消息随机转发至任一客户端,适用于高并发场景;若设置为 true,则仅允许唯一消费者;
# 1.2 使用限制
不支持规则转发;
当消费者离线或消费慢时,消息会进入堆积队列,消息堆积数量可自定义(私有化部署可任意配置),公共实例默认为 1000 条;
当前版本不支持消息类型自定义;
# 2. 使用 AMQP 服务端订阅
消息订阅服务使用标准AMQP
协议,FogCloud 仅支持AMQP 0.9.1
版协议标准
# 2.1 amqp 客户端接入说明
# 2.1.1 连接配置
私有化部署连接地址:
连接地址 | 说明 |
amqp://${AMQP_domain}:5672 | amqp-tcp 协议,生产环境不建议使用 |
amqps://${AMQP_domain}:5671 | amqp-tls 协议 |
公共实例连接地址:
地域 | amqp 连接地址 |
中国 | amqps://app.amqp.fogcloud.io:5671 |
德国 | amqps://amqp-eu.fogcloud.io:5671 |
# 2.1.2 连接认证
AMQP 0.9.1
协议连接的 URI 标准格式为:
amqp://[username:password@]host[:port] (over tcp)
amqps://[username:password@]host[:port] (over tls)
- username
固定格式为${clientID}<span>&</span>${accessKey}<span>&</span>${timestamp}
- password
使用hmacsha1
算法,以${accessSecret}
为密钥,对固定格式字符串计算签名,并将得到字节码转为 hex 字符串即为password
;被签名的字符串格式为:clientId${clientID}accessKey${accessKey}timestamp${timestamp}
- 参数说明
参数 | 是否必填 | 说明 |
${clientID} | 是 | 客户端标识,推荐使用 uuid |
${accessKey} | 是 | 项目 ID |
${timestamp} | 是 | 时间戳,单位秒 |
${accessSecret} | - | 项目 Secret |
- 连接参数示例
从 FogCloud 后台获取到项目的accessKey
和accessSecret
,示例:
{ "accessKey": "U1wLs3cdb6554962", "accessSecret": "96ce4bd8b42c6e9035d968f1f6341ba4" }
生成随机字符串作为${clientID}
:"hello";
获取当前时间戳作为${timestamp}
:"1675931243";
需要计算签名的固定字符串为:"clientIdhelloaccessKeyU1wLs3cdb6554962timestamp1675931243"
密钥为:"96ce4bd8b42c6e9035d968f1f6341ba4"
使用 hmacsha1 算法计算签名的伪代码:
hmacsha1("96ce4bd8b42c6e9035d968f1f6341ba4", "clientIdhelloaccessKeyU1wLs3cdb6554962timestamp1675931243").toHexString()
得到 amqp 连接参数的username
:"hello&U1wLs3cdb6554962&1675930742"
得到 amqp 连接参数的password
:"14058d9291247709420e01f7e18606545882b3cb"
最后得到 amqp uri 为:amqps://hello<span>&</span>U1wLs3cdb6554962<span>&</span>1675930742:14058d9291247709420e01f7e18606545882b3cb@app.amqp.fogcloud.io:5671
# 2.1.3 消费队列
amqp 客户端连接成功后,客户端需要使用access_key
作为队列名称,读取队列消息;目前客户端仅拥有队列读权限;
# 2.2 接入示例
# 2.2.1 golang sdk 接入示例
FogCloud 提供了 Golang SDK,您可以查看详细使用说明 (opens new window)
安装依赖:
go get github.com/fogcloud-io/fog-amqp
代码示例:
var ( AMQPHost = "localhost" AMQPPort = "5672" AMQPTLS = false FogAccessKey = "xgHc40bf04fb020c" FogAccessSecret = "c3bad348bb34390558f7f1aacce17877" clientID = "fog-consumer" ) func main() { cli, err := amqp.NewFogAMQPClient(AMQPHost, AMQPPort, FogAccessKey, FogAccessSecret, clientID, AMQPTLS) if err != nil { log.Fatal(err) } err = cli.ConsumeWithHandler( 100, FogAccessKey, func(b amqp.Delivery) { log.Printf("amqp receive: %s", b.Body); b.Ack(true) }, amqp.WithConsumerOptionsConsumerTag("fog-consumer-1"), amqp.WithConsumerOptionsAutoAck(false), amqp.WithConsumerOptionsNoWait(true), ) if err != nil { log.Print(err) } }
# 2.2.2 python sdk 接入示例
详细使用参见第三方库文档 (opens new window)
安装依赖:
python -m pip install pika --upgrade
代码示例:
#!/usr/bin/env python import pika import time,sys,os,ssl,hmac,hashlib,random amqp_host = "app.amqp.fogcloud.io" amqp_port = 5671 client_id = random.randbytes(10).hex() fog_access_key = "xxx" fog_access_secret = "xxxxx" def generate_amqp_username(client_id, access_key, timestamp): return '{}&{}&{}'.format(client_id, access_key, timestamp) def generate_amqp_password(plaintext, secret): return hmac.new(bytes(secret, encoding="utf-8"), bytes(plaintext, encoding="utf-8"), hashlib.sha1).hexdigest() def generate_plaintext(client_id, access_key, timestamp): return 'clientId{}accessKey{}timestamp{}'.format(client_id, access_key, timestamp) def main(): timestamp = int(time.time()) username = generate_amqp_username(client_id=client_id, access_key=fog_access_key, timestamp=timestamp) password = generate_amqp_password(plaintext=generate_plaintext(client_id=client_id, access_key=fog_access_key, timestamp=timestamp), secret=fog_access_secret) context = ssl.SSLContext(ssl.PROTOCOL_TLS) conn = pika.BlockingConnection(pika.ConnectionParameters(host=amqp_host, port=amqp_port, virtual_host="/", credentials=pika.PlainCredentials(username=username, password=password), ssl_options=pika.SSLOptions(context=context))) ch = conn.channel() def callback(ch, method, properties, body): print("amqp receive: %r" % body) ch.basic_consume(queue=fog_access_key, on_message_callback=callback, auto_ack=True) ch.start_consuming() if __name__ == '__main__': try: main() except KeyboardInterrupt: print('Interrupted') try: sys.exit(0) except SystemExit: os._exit(0)