# AMQP 客户端接入
# 1. 服务端订阅概述
消息订阅服务使用AMQP 0.9.1协议,第三方云服务可以直接订阅多种类型的消息,例如:设备状态变化通知、设备事件通知、设备生命周期变更、设备拓扑结构变更、产品变更、OTA 任务通知等,消息会直接转发至您的服务器。目前暂不支持自定义订阅类型。
# 1.1 使用场景
AMQP 客户端以项目
access_key为队列名,接收项目下全部设备的订阅数据;AMQP 客户端可以根据需求设置
exclusive参数:若设置为 false,则可以部署多个消费者程序,消费同一项目的订阅消息,消息随机转发至任一客户端,适用于高并发场景;若设置为 true,则仅允许唯一消费者;
# 1.2 使用限制
不支持规则转发;
当消费者离线或消费慢时,消息会进入堆积队列,消息堆积数量可自定义(私有化部署可任意配置),公共实例默认为 1000 条;
当前版本不支持消息类型自定义;
# 2. 使用 AMQP 服务端订阅
消息订阅服务使用标准AMQP协议,FogCloud 仅支持AMQP 0.9.1版协议标准
# 2.1 amqp 客户端接入说明
# 2.1.1 连接配置
私有化部署连接地址:
连接地址 | 说明 |
amqp://${AMQP_domain}:5672 | amqp-tcp 协议,生产环境不建议使用 |
amqps://${AMQP_domain}:5671 | amqp-tls 协议 |
公共实例连接地址:
地域 | amqp 连接地址 |
中国 | amqps://app.amqp.fogcloud.io:5671 |
德国 | amqps://amqp-eu.fogcloud.io:5671 |
# 2.1.2 连接认证
AMQP 0.9.1协议连接的 URI 标准格式为:
amqp://[username:password@]host[:port] (over tcp)
amqps://[username:password@]host[:port] (over tls)
- username
固定格式为${clientID}<span>&</span>${accessKey}<span>&</span>${timestamp}
- password
使用hmacsha1算法,以${accessSecret}为密钥,对固定格式字符串计算签名,并将得到字节码转为 hex 字符串即为password;被签名的字符串格式为:clientId${clientID}accessKey${accessKey}timestamp${timestamp}
- 参数说明
参数 | 是否必填 | 说明 |
${clientID} | 是 | 客户端标识,推荐使用 uuid |
${accessKey} | 是 | 项目 ID |
${timestamp} | 是 | 时间戳,单位秒 |
${accessSecret} | - | 项目 Secret |
- 连接参数示例
从 FogCloud 后台获取到项目的accessKey和accessSecret,示例:
{
"accessKey": "U1wLs3cdb6554962",
"accessSecret": "96ce4bd8b42c6e9035d968f1f6341ba4"
} 生成随机字符串作为${clientID}:"hello";
获取当前时间戳作为${timestamp}:"1675931243";
需要计算签名的固定字符串为:"clientIdhelloaccessKeyU1wLs3cdb6554962timestamp1675931243"
密钥为:"96ce4bd8b42c6e9035d968f1f6341ba4"
使用 hmacsha1 算法计算签名的伪代码:
hmacsha1("96ce4bd8b42c6e9035d968f1f6341ba4", "clientIdhelloaccessKeyU1wLs3cdb6554962timestamp1675931243").toHexString()
得到 amqp 连接参数的username:"hello&U1wLs3cdb6554962&1675930742"
得到 amqp 连接参数的password:"14058d9291247709420e01f7e18606545882b3cb"
最后得到 amqp uri 为:amqps://hello<span>&</span>U1wLs3cdb6554962<span>&</span>1675930742:14058d9291247709420e01f7e18606545882b3cb@app.amqp.fogcloud.io:5671
# 2.1.3 消费队列
amqp 客户端连接成功后,客户端需要使用access_key作为队列名称,读取队列消息;目前客户端仅拥有队列读权限;
# 2.2 接入示例
# 2.2.1 golang sdk 接入示例
FogCloud 提供了 Golang SDK,您可以查看详细使用说明 (opens new window)
安装依赖:
go get github.com/fogcloud-io/fog-amqp
代码示例:
var (
AMQPHost = "localhost"
AMQPPort = "5672"
AMQPTLS = false
FogAccessKey = "xgHc40bf04fb020c"
FogAccessSecret = "c3bad348bb34390558f7f1aacce17877"
clientID = "fog-consumer"
)
func main() {
cli, err := amqp.NewFogAMQPClient(AMQPHost, AMQPPort, FogAccessKey, FogAccessSecret, clientID, AMQPTLS)
if err != nil {
log.Fatal(err)
}
err = cli.ConsumeWithHandler(
100,
FogAccessKey,
func(b amqp.Delivery) { log.Printf("amqp receive: %s", b.Body); b.Ack(true) },
amqp.WithConsumerOptionsConsumerTag("fog-consumer-1"),
amqp.WithConsumerOptionsAutoAck(false),
amqp.WithConsumerOptionsNoWait(true),
)
if err != nil {
log.Print(err)
}
}
# 2.2.2 python sdk 接入示例
详细使用参见第三方库文档 (opens new window)
安装依赖:
python -m pip install pika --upgrade
代码示例:
#!/usr/bin/env python
import pika
import time,sys,os,ssl,hmac,hashlib,random
amqp_host = "app.amqp.fogcloud.io"
amqp_port = 5671
client_id = random.randbytes(10).hex()
fog_access_key = "xxx"
fog_access_secret = "xxxxx"
def generate_amqp_username(client_id, access_key, timestamp):
return '{}&{}&{}'.format(client_id, access_key, timestamp)
def generate_amqp_password(plaintext, secret):
return hmac.new(bytes(secret, encoding="utf-8"), bytes(plaintext, encoding="utf-8"), hashlib.sha1).hexdigest()
def generate_plaintext(client_id, access_key, timestamp):
return 'clientId{}accessKey{}timestamp{}'.format(client_id, access_key, timestamp)
def main():
timestamp = int(time.time())
username = generate_amqp_username(client_id=client_id, access_key=fog_access_key, timestamp=timestamp)
password = generate_amqp_password(plaintext=generate_plaintext(client_id=client_id, access_key=fog_access_key, timestamp=timestamp), secret=fog_access_secret)
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
conn = pika.BlockingConnection(pika.ConnectionParameters(host=amqp_host, port=amqp_port, virtual_host="/", credentials=pika.PlainCredentials(username=username, password=password), ssl_options=pika.SSLOptions(context=context)))
ch = conn.channel()
def callback(ch, method, properties, body):
print("amqp receive: %r" % body)
ch.basic_consume(queue=fog_access_key, on_message_callback=callback, auto_ack=True)
ch.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)