规则引擎解析

  • 作者: 凯哥Java(公众号:凯哥Java)
  • thingsboard
  • 时间:2023-05-04 17:20
  • 2144人已阅读
简介 描述规则引擎是Thingsboard的核心部分,基于Actor编程模型,类似事件驱动;每个actor都有自己的消息队列(mailBox)保存接收到的消息actor可以创建actoractor可以将消息转发给其他actor分析Actor模型实现系统中与Actor模型相关类都在工程common/actor下,几个核心类说明如下:TbActorSystemActor系统接口,Actor系统类实现该接口T

🔔🔔好消息!好消息!🔔🔔

 如果您需要注册ChatGPT,想要升级ChatGPT4。凯哥可以代注册ChatGPT账号代升级ChatGPT4

有需要的朋友👉:微信号 kaigejava2022

描述

规则引擎是Thingsboard的核心部分,基于Actor编程模型,类似事件驱动;

每个actor都有自己的消息队列(mailBox)保存接收到的消息

actor可以创建actor

actor可以将消息转发给其他actor

20d658e853274cc190c36d3898059fa8.png

分析
Actor模型实现
系统中与Actor模型相关类都在工程common/actor下,几个核心类说明如下:

TbActorSystem Actor系统接口,Actor系统类实现该接口
TbActor Actor接口,所有Actor需直接或间接实现该接口
TbActorCreator Actor创建接口,所有Actor创建器直接或间接实现该接口
TbActorRef Actor句柄接口,使用TbActorCreator创建Actor后返回此句柄,通常指向Actor的邮箱。
TbActorCtx Actor上下文接口,继承TbActorRef接口。
AbstractTbActor 抽象Actor,实现TbActor接口。
DefaultTbActorSystem Actor系统,用于Dispatcher的创建删除、Actor的创建查找和Actor之间消息传递等。
Dispatcher 调度器,用于调度Actor的创建或消息分发。
TbActorMailbox 邮箱,实现TbActorCtx接口,指向某个actor,同时存储消息到队列并使用调度器处理队列中的消息。
类继承关系图如下:

154be2460fdaca8c6daa4910afdc3884.png

TbActorMailBox中有两个队列用于存储待处理的消息:

af2a64357e30e5600f717d97bc4707b3.png

两个队列的类型是ConcurrentLinkedQueue,非阻塞并发队列,减少了线程切换,性能好。

基于以上类,实现一个Actor代码如下:

类继承关系如下:

f6c22b865b81782c27b5dd1bcaf9724b.png

引擎初始化

回到正题,DefaultActorService构造一个使用Actor模型系统的规则引擎,分为两个阶段:

阶段1:类初始化,方法:initActorSystem

默认会将全部租户下的全部规则节点加载到内存中

阶段2:应用准备完成,方法为onApplicationEvent  ==> AppActor收到消息后,在doProcess方法中进行处理  ==> TenantActorinit阶段进行租户下规则链RuleChainActor创建 ==>

RuleChainActorinit阶段,创建RuleChainActorMessageProcessor并调用其start,进行规则节点RuleNodeActor的创建 ==>

RuleNodeActorinit阶段,创建RuleNodeActorMessageProcessor并调用其start,进行TbNode的创建 ==>

当与设备相关消息开始上传时,TenantActor还会初始化DeviceActor:

973c2893a1b15bb7d697d4cc1e05cd9b.png

消息传输

完成规则初始化后,规则引擎接受消息传输,以普通设备上传时序数据并存储为例,规则引擎处理的核心处理流程如下:

完整流程图:


核心流程图(去掉一些非必要的)如下:

a5749e101218e5087d9c8a49d0bf4eec.png

规则链加载

应用启动时,从DefaultActorService的PostConstruct的initActorSystem方法开始:

通过类DefaultActorSystem的createActor方法创建AppActor,该方法时创建Actor的实现,其他Actor也是通过该方法创建的。创建actor之后会执行actor中的initActor方法

AppActor的init方法:

定时向AppActor的mail box中发送一条消息;

而tryProcessQueue方法最终会执行actor的process方法

在创建租户Actor后,也执行TanentActor的init方法:

同样创建RuleChainActor后也会执行init方法:

进入processor.start方法:

然后查看RuleNodeActor的init方法,发现和RuleChainActor是一样的,定义在抽象类ComponentActor中,顺着查看到RuleNodeActorMessageProcessor的start方法。


Actor System

服务启用后,只有一个AppActor,但会包含所有的TalentActor;一个TalentActor有且只有一条RootRuleChainActor,同时可以有多条RuleChainActor;每个RuleChainActor可以包含多个RuleNodeActor和RuleChainActor

f57893734f827dd9ae10e12d0b21887b.png

Actor执行

Actor的执行逻辑定义在process方法中

通过TbActorRef类的tell方法将消息传递给Actor执行:




TopTop