一套万能的异步处理方案(VIP珍藏版)

简介 大家好,我是凯哥Java(小绿泡:kaigejava)。 前言良好的系统设计必须要做到开闭原则,随着业务的不断迭代更新,核心代码也会被不断改动,出错的概率也会大大增加。但是大部分增加的功能都是在扩展原有的功能,既要保证性能又要保证质量,我们往往都会使用异步线程池来处理,然而却增加了很多不确定性因素。由此我设计了一套通用的异步处理SDK,可以很轻松的实现各种异步处理!目的通过异步处理不仅能

🔔🔔🔔好消息!好消息!🔔🔔🔔

有需要的朋友👉:联系凯哥 微信号 kaigejava2022

大家好,我是凯哥Java(小绿泡:kaigejava)。 

前言

良好的系统设计必须要做到开闭原则,随着业务的不断迭代更新,核心代码也会被不断改动,出错的概率也会大大增加。

但是大部分增加的功能都是在扩展原有的功能,既要保证性能又要保证质量,我们往往都会使用异步线程池来处理,然而却增加了很多不确定性因素。

由此我设计了一套通用的异步处理 SDK,可以很轻松的实现各种异步处理!

目的

通过异步处理不仅能够保证方法能够得到有效的执行而且不影响主流程

更重要的是各种兜底方法保证数据不丢失,从而达到最终一致性

优点

无侵入设计,独立数据库,独立定时任务,独立消息队列,独立人工执行界面(统一登录认证)

使用 spring 事务事件机制,即使异步策略解析失败也不会影响业务

如果你的方法正在运行事务,会等事务提交后或回滚后再处理事件

就算事务提交了,异步策略解析失败了,我们还有兜底方案执行(除非数据库有问题,消息队列有问题,方法有 bug)

原理

容器初始化 bean 完成后遍历所有方法,把有@AsyncExec 注解的方法缓存起来

方法运行时通过 AOP 切面发布事件

事务事件监听处理异步执行策略

@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)


fallbackExecution=true 没有事务正在运行,依然处理事件TransactionPhase.AFTER_COMPLETION 事务提交后和事务回滚后都处理事件

组件

kafka 消息队列xxl job 定时任务mysql 数据库spring 切面vue 界面

设计模式

策略模板方法动态代理

d2c814b0f28775b648db5bfcd912a35f.png


数据库脚本

CREATE TABLE`async_scene` (

`id`bigintNOTNULL AUTO_INCREMENT COMMENT'主键ID',

`application_name`varchar(100) NOTNULLDEFAULT''COMMENT'应用名称',

`method_sign`varchar(50) NOTNULLDEFAULT''COMMENT'方法签名',

`scene_name`varchar(200) NOTNULLDEFAULT''COMMENT'业务场景描述',

`async_type`varchar(50) NOTNULLDEFAULT''COMMENT'异步策略类型',

`queue_name`varchar(200) NOTNULLDEFAULT''COMMENT'队列名称',

`theme_value`varchar(100) NOTNULLDEFAULT''COMMENT'消费主题',

`exec_count`intNOTNULLDEFAULT'0'COMMENT'失败重试次数',

`exec_deleted`intNOTNULLDEFAULT'0'COMMENT'执行后是否删除',

`async_version`varchar(50) NOTNULLDEFAULT''COMMENT'组件版本号',

`create_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPCOMMENT'创建时间',

`update_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPCOMMENT'更新时间',

`cdc_crt_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPCOMMENT'记录新增时间',

`cdc_upd_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPCOMMENT'记录修改时间',

  PRIMARY KEY (`id`) USING BTREE,

UNIQUEKEY`uk_application_sign` (`application_name`,`method_sign`) USING BTREE,

KEY`idx_cdc_upd_time` (`cdc_upd_time`)

) ENGINE=InnoDBCOMMENT='异步场景表';




异步策略

14b6c7c328a5318f38f1ed9132f49f21.png

安全级别

9f07435739dbe5156da063f7b6797e90.png

执行状态

c8715dab8e43ab8cf5407e4ae6774382.png

流程图

b738f0d9b2e49113ee01ec49ade63559.png


3a7919026be27b020ee041490de9df91.png

e50f979ef22154e0f220dd32b8c23f73.png

apollo 配置

# 开关:默认关闭

async.enabled=true


# 应用名称

spring.application.name=xxx


# 数据源 druid

async.datasource.driver-class-name=com.mysql.jdbc.Driver

async.datasource.url=jdbc:mysql://127.0.0.1:3306/fc_async?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true

async.datasource.username=user

async.datasource.password=xxxx

async.datasource.filters=config

async.datasource.connectionProperties=config.decrypt=true;config.decrypt.key=yyy

#静态地址

spring.resources.add-mappings=true

spring.resources.static-locations=classpath:/static/



# 以下配置都有默认值

# 核心线程数

async.executor.thread.corePoolSize=10

# 最大线程数

async.executor.thread.maxPoolSize=50

# 队列容量

async.executor.thread.queueCapacity=10000

# 活跃时间

async.executor.thread.keepAliveSeconds=600


# 执行成功是否删除记录:默认删除

async.exec.deleted=true


# 自定义队列名称前缀:默认应用名称

async.topic=${spring.application.name}


# 重试执行次数:默认5次

async.exec.count=5


# 重试最大查询数量

async.retry.limit=100


# 补偿最大查询数量

async.comp.limit=100


# 登录拦截:默认false

async.login=false

用法

1,异步开关


scm.async.enabled=true

2,在需要异步执行的方法加注解 (必须是spring代理方法)

@AsyncExec(type = AsyncExecEnum.SAVE_ASYNC, remark = "数据字典")

3,人工处理地址

http://localhost:8004/async/index.html


注意

1,应用名称


spring.application.name

2,队列名称

${async.topic:${spring.application.name}}_async_queue

3,自己业务要做幂等
4,一个应用公用一个队列


自产自消


5,定时任务

异步重试定时任务(2分钟重试一次,可配置重试次数)


异步补偿定时任务(一小时补偿一次,创建时间在一小时之前的)

效果展示

48160eb99d7ce7e9d1ea85579200847a.png


GitHub 地址

https://github.com/xiongyanokok/fc-async

juejin.cn/post/7266087843239084090



TopTop