# 插件开发规范
FogEdge 采用事件驱动架构设计,各个插件间使用事件总线进行数据通信;这种架构使得 FogEdge 具备低耦合和高可扩展性。
# 1 插件名称及约束
插件名称需要以英文字母开头,支持数字、英文字母和
-
的组合,长度为 1~32 个字符;插件名称不能重复;
插件可以使用
fogedge
的事件总线,使用方式是和sidecar
进程进行 http 通信;插件运行时写入本地文件的数据不会保留,在插件更新或卸载后,数据会被删除;若想存储数据,可以在插件中连接数据库应用,并进行数据写入;
# 2 插件创建
插件可以使用fogctl
命令行程序进行创建
- 创建插件命令:
fogctl create addons ${addon_uid} --image=${image_registry} --version=${image_tag} --pub-events="fogedge/app/topic1,fogedge/app/topic2" --sub-events="fogedge/app/topic1,fogedge/app/topic2" --env-vars="a=1,b=2"
- flags 说明:
flags | 格式 | 说明 |
image | 字符串 | 镜像地址 |
version | 字符串 | 镜像版本 |
pub-events | fogedge/app/event1,fogedge/app/event2 | 插件发布的事件列表 |
sub-events | fogedge/app/event1,fogedge/app/event2 | 插件订阅的事件列表 |
env-vars | key1=value1,key2=value2,key3=value3 | 插件内的系统环境变量 |
# 3 插件订阅/发布事件定义
# 3.1 事件数据格式
{
"event_id": "string", //事件唯一标识,不能重复
"event_type": "string", //事件类型
"timestamp": "int", //事件发生时间戳,单位:毫秒
"metadata": "string" //事件元数据,即事件详情
}
# 3.2 事件类型规范
事件类型类似 MQTT 主题,使用斜杆/
进行分层,每个分层的字符支持数字、英文字母和_
的组合,且必须包含前缀fogedge/app
,订阅事件类型可以包含#
和+
通配符,示例:
fogedge/app/device/mqtt/up
fogedge/app/device/mqtt/down
fogedge/app/device/status
fogedge/app/api/+
fogedge/app/rule/#
通配符规则同 MQTT 主题。
- 单层通配符 "+" (U+002B) 用于单个层级匹配通配符,单层通配符必须占据整个层级,例如:
fogedge/app/api/+/log(有效)
fogedge/app/api+ (无效)
如果插件订阅了事件类型fogedge/app/api/+/log
,将会收到以下事件类型的事件消息:
fogedge/app/api/rest/log
fogedge/app/api/grpc/log
但不会匹配到以下事件类型:
fogedge/app/api/rest/log/v1
fogedge/app/api/v1/rest/log
- 多层通配符 "#" (U+0023) 用于匹配任意层级的通配符。其必须占据整个层级 如果插件订阅了事件类型
fogedge/app/#
,将会收到一下事件类型的事件消息:
fogedge/app/device/mqtt/up
fogedge/app/api/rest
# 3.3 系统预定义事件
- mqtt 设备上线事件:
{
"event_id": "string",
"event_type": "fogedge/app/mqtt/login",
"timestamp": "int",
"metadata": "{"clientid": "string", "username":"string", "password": "string"}"
}
- mqtt 设备下线事件:
{
"event_id": "string",
"event_type": "fogedge/app/mqtt/logout",
"timestamp": "int",
"metadata": "{"clientid": "string", "username":"string"}"
}
# 4 插件编写
您可以使用任意编程语言编写您自己的插件,您需要将编写好的插件编译成 docker 镜像,并发布至可以公开访问的镜像仓库;创建插件时需要填写对应镜像地址和镜像版本。插件可以使用系统环境变量进行传参,您可以在创建插件时自定义系统环境变量。
# 4.1 插件可用的系统环境变量
插件可以使用自定义的系统环境变量,也可以直接使用预定义的系统环境变量,预定义的系统环境变量会在创建插件时自动设置。
(注意:如果自定义环境变量和预定义环境变量同名,自定义变量会被覆盖)。
预定义系统环境变量:
FOG_EDGE_UUID
: FogEdge 唯一标识
FOG_MQTT_BROKER_URL
: FogEdge 的 MQTT 地址,用户插件可以直连至该 MQTT 地址,用于和设备通信;
# 4.2 插件和设备通信
插件可以使用预定义系统环境变量FOG_MQTT_BROKER_URL
,连接 MQTT 服务器,并订阅和发布感兴趣的设备相关的 topic。
# 4.3 插件和插件通信
多个插件间通过事件进行通信。FogEdge 在创建插件时会同时创建一个 sidecar 进程,sidecar 进程和插件进程共享一个网络空间,插件通过本地网络和 sidecar 进行通信,来完成事件的订阅和发布。
# 4.3.1 事件订阅
自定义插件需要编写一个监听端口为8000
的 http 服务器,用于接收事件消息。 插件需要实现以下 http 接口:
- 接口地址
POST http://localhost:8000
- 请求体(application/json)
{
"event_id": "string", //事件唯一标识,不能重复
"event_type": "string", //事件类型
"timestamp": "int", //事件发生时间戳,单位:毫秒
"metadata": "string" //事件元数据,即事件详情
}
- 返回
若返回状态码 200,则确认该事件已成功处理;
若返回其他状态码,则该事件未被成功处理,后续会重复收到该事件;
sidecar 会将插件订阅的事件通过以上 http 接口发送给插件。
# 4.3.2 事件发布
插件可以调用 http 接口进行事件发布,事件发布接口为:
- 接口地址
POST http://localhost:8001
- 请求体(application/json)
{
"event_id": "string", //事件唯一标识,不能重复
"event_type": "string", //事件类型
"timestamp": "int", //事件发生时间戳,单位:毫秒
"metadata": "string" //事件元数据,即事件详情
}
sidecar 运行一个监听8001
端口的 http 服务器,用于接收事件发布消息。
# 5 插件示例
# go 代码示例
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
type Event struct {
EventID string `json:"event_id"`
EventType string `json:"event_type"`
Timestamp int64 `json:"timestamp"`
Metadata string `json:"metadata"`
}
func (e Event) String() string {
t := time.UnixMilli(e.Timestamp)
return fmt.Sprintf("event_type[%s], event_id[%s], happen_at[%s], metadata: %s", e.EventType, e.EventID, t, e.Metadata)
}
func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", subEventHandler)
svc := &http.Server{
Addr: ":8000",
Handler: mux,
}
go func() {
err := svc.ListenAndServe()
if err != nil {
log.Fatalf("ListenAndServe: %s\n", err)
}
}()
log.Println("http service listen: 8000")
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-sig
err := svc.Shutdown(context.Background())
if err != nil {
log.Printf("http service shutdown: %s\n", err)
}
log.Println("http service shutdown gracefully")
}
func subEventHandler(w http.ResponseWriter, r *http.Request) {
rawBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
eventMsg := Event{}
err = json.Unmarshal(rawBytes, &eventMsg)
if err != nil {
w.WriteHeader(http.StatusOK)
return
}
log.Printf("receive event: %s\n", eventMsg)
err = handleSubEvent(eventMsg)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
}
// 处理订阅事件
func handleSubEvent(event Event) error {
// 事件处理逻辑...
return nil
}
// 发布事件
func handlePubEvent(e Event) error {
data, _ := json.Marshal(e)
buf := bytes.NewBuffer(data)
_, err := http.DefaultClient.Post("http://localhost:8001/", "application/json", buf)
if err != nil {
return err
}
return nil
}
← 系统部署 FogCloud 边云对接插件 →