# AMQP 客户端接入
# 1. 服务端订阅概述
消息订阅服务使用AMQP 0.9.1协议,第三方云服务可以直接订阅多种类型的消息,例如:设备状态变化通知、设备事件通知、设备生命周期变更、设备拓扑结构变更、产品变更、OTA 任务通知等,消息会直接转发至您的服务器。目前暂不支持自定义订阅类型。
# 1.1 使用场景
AMQP 客户端以项目
access_key为队列名,接收项目下全部设备的订阅数据;AMQP 客户端可以根据需求设置
exclusive参数:若设置为 false,则可以部署多个消费者程序,消费同一项目的订阅消息,消息随机转发至任一客户端,适用于高并发场景;若设置为 true,则仅允许唯一消费者;
# 1.2 使用限制
不支持规则转发;
当消费者离线或消费慢时,消息会进入堆积队列,消息堆积数量可自定义(私有化部署可任意配置),公共实例默认为 1000 条;
当前版本不支持消息类型自定义;
# 2. 使用 AMQP 服务端订阅
消息订阅服务使用标准AMQP协议,FogCloud 仅支持AMQP 0.9.1版协议标准
# 2.1 amqp 客户端接入说明
# 2.1.1 连接配置
私有化部署连接地址:
连接地址  |  说明  | 
amqp://${AMQP_domain}:5672  |  amqp-tcp 协议,生产环境不建议使用  | 
amqps://${AMQP_domain}:5671  |  amqp-tls 协议  | 
公共实例连接地址:
地域  |  amqp 连接地址  | 
中国  |  amqps://app.amqp.fogcloud.io:5671  | 
德国  |  amqps://amqp-eu.fogcloud.io:5671  | 
# 2.1.2 连接认证
AMQP 0.9.1协议连接的 URI 标准格式为:
amqp://[username:password@]host[:port] (over tcp)
amqps://[username:password@]host[:port] (over tls)
- username
 
固定格式为${clientID}<span>&</span>${accessKey}<span>&</span>${timestamp}
- password
 
使用hmacsha1算法,以${accessSecret}为密钥,对固定格式字符串计算签名,并将得到字节码转为 hex 字符串即为password;被签名的字符串格式为:clientId${clientID}accessKey${accessKey}timestamp${timestamp}
- 参数说明
 
参数  |  是否必填  |  说明  | 
${clientID}  |  是  |  客户端标识,推荐使用 uuid  | 
${accessKey}  |  是  |  项目 ID  | 
${timestamp}  |  是  |  时间戳,单位秒  | 
${accessSecret}  |  -  |  项目 Secret  | 
- 连接参数示例
 
从 FogCloud 后台获取到项目的accessKey和accessSecret,示例:
{
  "accessKey": "U1wLs3cdb6554962",
  "accessSecret": "96ce4bd8b42c6e9035d968f1f6341ba4"
} 生成随机字符串作为${clientID}:"hello";
获取当前时间戳作为${timestamp}:"1675931243";
需要计算签名的固定字符串为:"clientIdhelloaccessKeyU1wLs3cdb6554962timestamp1675931243"
密钥为:"96ce4bd8b42c6e9035d968f1f6341ba4"
使用 hmacsha1 算法计算签名的伪代码:
hmacsha1("96ce4bd8b42c6e9035d968f1f6341ba4", "clientIdhelloaccessKeyU1wLs3cdb6554962timestamp1675931243").toHexString() 
得到 amqp 连接参数的username:"hello&U1wLs3cdb6554962&1675930742"
得到 amqp 连接参数的password:"14058d9291247709420e01f7e18606545882b3cb"
最后得到 amqp uri 为:amqps://hello<span>&</span>U1wLs3cdb6554962<span>&</span>1675930742:14058d9291247709420e01f7e18606545882b3cb@app.amqp.fogcloud.io:5671
# 2.1.3 消费队列
amqp 客户端连接成功后,客户端需要使用access_key作为队列名称,读取队列消息;目前客户端仅拥有队列读权限;
# 2.2 接入示例
# 2.2.1 golang sdk 接入示例
FogCloud 提供了 Golang SDK,您可以查看详细使用说明 (opens new window)
安装依赖:
go get github.com/fogcloud-io/fog-amqp
代码示例:
var (
	AMQPHost        = "localhost"
	AMQPPort        = "5672"
	AMQPTLS         = false
	FogAccessKey    = "xgHc40bf04fb020c"
	FogAccessSecret = "c3bad348bb34390558f7f1aacce17877"
	clientID        = "fog-consumer"
)
func main() {
	cli, err := amqp.NewFogAMQPClient(AMQPHost, AMQPPort, FogAccessKey, FogAccessSecret, clientID, AMQPTLS)
	if err != nil {
		log.Fatal(err)
	}
	err = cli.ConsumeWithHandler(
		100,
		FogAccessKey,
		func(b amqp.Delivery) { log.Printf("amqp receive: %s", b.Body); b.Ack(true) },
		amqp.WithConsumerOptionsConsumerTag("fog-consumer-1"),
		amqp.WithConsumerOptionsAutoAck(false),
		amqp.WithConsumerOptionsNoWait(true),
	)
	if err != nil {
		log.Print(err)
	}
}
 # 2.2.2 python sdk 接入示例
详细使用参见第三方库文档 (opens new window)
安装依赖:
python -m pip install pika --upgrade
代码示例:
#!/usr/bin/env python
import pika
import time,sys,os,ssl,hmac,hashlib,random
amqp_host = "app.amqp.fogcloud.io"
amqp_port = 5671
client_id =  random.randbytes(10).hex()
fog_access_key = "xxx"
fog_access_secret = "xxxxx"
def generate_amqp_username(client_id, access_key, timestamp):
    return '{}&{}&{}'.format(client_id, access_key, timestamp)
def generate_amqp_password(plaintext, secret):
    return hmac.new(bytes(secret, encoding="utf-8"), bytes(plaintext, encoding="utf-8"), hashlib.sha1).hexdigest()
def generate_plaintext(client_id, access_key, timestamp):
    return 'clientId{}accessKey{}timestamp{}'.format(client_id, access_key, timestamp)
def main():
    timestamp = int(time.time())
    username = generate_amqp_username(client_id=client_id, access_key=fog_access_key, timestamp=timestamp)
    password = generate_amqp_password(plaintext=generate_plaintext(client_id=client_id, access_key=fog_access_key, timestamp=timestamp), secret=fog_access_secret)
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    conn = pika.BlockingConnection(pika.ConnectionParameters(host=amqp_host, port=amqp_port, virtual_host="/", credentials=pika.PlainCredentials(username=username, password=password), ssl_options=pika.SSLOptions(context=context)))
    ch = conn.channel()
    def callback(ch, method, properties, body):
        print("amqp receive: %r" % body)
    ch.basic_consume(queue=fog_access_key, on_message_callback=callback, auto_ack=True)
    ch.start_consuming()    
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)