# 插件开发规范

FogEdge 采用事件驱动架构设计,各个插件间使用事件总线进行数据通信;这种架构使得 FogEdge 具备低耦合和高可扩展性。

# 1 插件名称及约束

  • 插件名称需要以英文字母开头,支持数字、英文字母和-的组合,长度为 1~32 个字符;

  • 插件名称不能重复;

  • 插件可以使用fogedge的事件总线,使用方式是和sidecar进程进行 http 通信;

  • 插件运行时写入本地文件的数据不会保留,在插件更新或卸载后,数据会被删除;若想存储数据,可以在插件中连接数据库应用,并进行数据写入;

# 2 插件创建

插件可以使用fogctl命令行程序进行创建

  • 创建插件命令:
fogctl create addons ${addon_uid} --image=${image_registry} --version=${image_tag} --pub-events="fogedge/app/topic1,fogedge/app/topic2" --sub-events="fogedge/app/topic1,fogedge/app/topic2" --env-vars="a=1,b=2"
  • flags 说明:

flags

格式

说明

image

字符串

镜像地址

version

字符串

镜像版本

pub-events

fogedge/app/event1,fogedge/app/event2

插件发布的事件列表

sub-events

fogedge/app/event1,fogedge/app/event2

插件订阅的事件列表

env-vars

key1=value1,key2=value2,key3=value3

插件内的系统环境变量

# 3 插件订阅/发布事件定义

# 3.1 事件数据格式

{
    "event_id": "string",   //事件唯一标识,不能重复
    "event_type": "string", //事件类型
    "timestamp": "int",     //事件发生时间戳,单位:毫秒
    "metadata": "string"    //事件元数据,即事件详情
}

# 3.2 事件类型规范

事件类型类似 MQTT 主题,使用斜杆/进行分层,每个分层的字符支持数字、英文字母和_的组合,且必须包含前缀fogedge/app,订阅事件类型可以包含#+通配符,示例:

fogedge/app/device/mqtt/up
fogedge/app/device/mqtt/down
fogedge/app/device/status
fogedge/app/api/+
fogedge/app/rule/#

通配符规则同 MQTT 主题。

  • 单层通配符 "+" (U+002B) 用于单个层级匹配通配符,单层通配符必须占据整个层级,例如:
fogedge/app/api/+/log(有效)
fogedge/app/api+ (无效)

如果插件订阅了事件类型fogedge/app/api/+/log,将会收到以下事件类型的事件消息:

fogedge/app/api/rest/log
fogedge/app/api/grpc/log

但不会匹配到以下事件类型:

fogedge/app/api/rest/log/v1
fogedge/app/api/v1/rest/log
  • 多层通配符 "#" (U+0023) 用于匹配任意层级的通配符。其必须占据整个层级 如果插件订阅了事件类型fogedge/app/#,将会收到一下事件类型的事件消息:
fogedge/app/device/mqtt/up
fogedge/app/api/rest

# 3.3 系统预定义事件

  • mqtt 设备上线事件:
{
  "event_id": "string",
  "event_type": "fogedge/app/mqtt/login",
  "timestamp": "int",
  "metadata": "{"clientid": "string", "username":"string", "password": "string"}"
}
  • mqtt 设备下线事件:
{
  "event_id": "string",
  "event_type": "fogedge/app/mqtt/logout",
  "timestamp": "int",
  "metadata": "{"clientid": "string", "username":"string"}"
}

# 4 插件编写

您可以使用任意编程语言编写您自己的插件,您需要将编写好的插件编译成 docker 镜像,并发布至可以公开访问的镜像仓库;创建插件时需要填写对应镜像地址和镜像版本。插件可以使用系统环境变量进行传参,您可以在创建插件时自定义系统环境变量。

# 4.1 插件可用的系统环境变量

插件可以使用自定义的系统环境变量,也可以直接使用预定义的系统环境变量,预定义的系统环境变量会在创建插件时自动设置。

(注意:如果自定义环境变量和预定义环境变量同名,自定义变量会被覆盖)。

预定义系统环境变量:

FOG_EDGE_UUID: FogEdge 唯一标识

FOG_MQTT_BROKER_URL: FogEdge 的 MQTT 地址,用户插件可以直连至该 MQTT 地址,用于和设备通信;

# 4.2 插件和设备通信

插件可以使用预定义系统环境变量FOG_MQTT_BROKER_URL,连接 MQTT 服务器,并订阅和发布感兴趣的设备相关的 topic。

# 4.3 插件和插件通信

多个插件间通过事件进行通信。FogEdge 在创建插件时会同时创建一个 sidecar 进程,sidecar 进程和插件进程共享一个网络空间,插件通过本地网络和 sidecar 进行通信,来完成事件的订阅和发布。

# 4.3.1 事件订阅

自定义插件需要编写一个监听端口为8000的 http 服务器,用于接收事件消息。 插件需要实现以下 http 接口:

  • 接口地址
POST http://localhost:8000
  • 请求体(application/json)
{
    "event_id": "string",   //事件唯一标识,不能重复
    "event_type": "string", //事件类型
    "timestamp": "int",     //事件发生时间戳,单位:毫秒
    "metadata": "string"    //事件元数据,即事件详情
}
  • 返回

若返回状态码 200,则确认该事件已成功处理;

若返回其他状态码,则该事件未被成功处理,后续会重复收到该事件;

sidecar 会将插件订阅的事件通过以上 http 接口发送给插件。

# 4.3.2 事件发布

插件可以调用 http 接口进行事件发布,事件发布接口为:

  • 接口地址
POST http://localhost:8001
  • 请求体(application/json)
{
    "event_id": "string",   //事件唯一标识,不能重复
    "event_type": "string", //事件类型
    "timestamp": "int",     //事件发生时间戳,单位:毫秒
    "metadata": "string"    //事件元数据,即事件详情
}

sidecar 运行一个监听8001端口的 http 服务器,用于接收事件发布消息。

# 5 插件示例

# go 代码示例

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

type Event struct {
	EventID   string `json:"event_id"`
	EventType string `json:"event_type"`
	Timestamp int64  `json:"timestamp"`
	Metadata  string `json:"metadata"`
}

func (e Event) String() string {
	t := time.UnixMilli(e.Timestamp)
	return fmt.Sprintf("event_type[%s], event_id[%s], happen_at[%s], metadata: %s", e.EventType, e.EventID, t, e.Metadata)
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", subEventHandler)

	svc := &http.Server{
		Addr:    ":8000",
		Handler: mux,
	}
	go func() {
		err := svc.ListenAndServe()
		if err != nil {
			log.Fatalf("ListenAndServe: %s\n", err)
		}
	}()

	log.Println("http service listen: 8000")
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-sig
	err := svc.Shutdown(context.Background())
	if err != nil {
		log.Printf("http service shutdown: %s\n", err)
	}

	log.Println("http service shutdown gracefully")
}

func subEventHandler(w http.ResponseWriter, r *http.Request) {
	rawBytes, err := ioutil.ReadAll(r.Body)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	eventMsg := Event{}
	err = json.Unmarshal(rawBytes, &eventMsg)
	if err != nil {
		w.WriteHeader(http.StatusOK)
		return
	}

	log.Printf("receive event: %s\n", eventMsg)
	err = handleSubEvent(eventMsg)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
	} else {
		w.WriteHeader(http.StatusOK)
	}
}

// 处理订阅事件
func handleSubEvent(event Event) error {
	// 事件处理逻辑...
	return nil
}

// 发布事件
func handlePubEvent(e Event) error {
	data, _ := json.Marshal(e)
	buf := bytes.NewBuffer(data)
	_, err := http.DefaultClient.Post("http://localhost:8001/", "application/json", buf)
	if err != nil {
		return err
	}
	return nil
} 
更新时间: 2024/7/31 14:20:29