【更好的理解ThingsBoard规则引擎的】深入理解Akka Actor模型

  • 作者: 凯哥Java(公众号:凯哥Java)
  • java游戏
  • 时间:2023-05-05 14:52
  • 3397人已阅读
简介 CarlHewitt在1973年对Actor模型进行了如下定义:"Actor模型是一个把'Actor'作为并发计算的通用原语".Actor是异步驱动,可以并行和分布式部署及运行的最小颗粒。也就是说,它可以被分配,分布,调度到不同的CPU,不同的节点,乃至不同的时间片上运行,而不影响最终的结果。因此Actor在空间(分布式)和时间(异步驱动)上解耦的。而Akka是

🔔🔔好消息!好消息!🔔🔔

 如果您需要注册ChatGPT,想要升级ChatGPT4。凯哥可以代注册ChatGPT账号代升级ChatGPT4

有需要的朋友👉:微信号 kaigejava2022

Carl Hewitt 在1973年对Actor模型进行了如下定义:"Actor模型是一个把'Actor'作为并发计算的通用原语". Actor是异步驱动,可以并行和分布式部署及运行的最小颗粒。也就是说,它可以被分配,分布,调度到不同的CPU,不同的节点,乃至不同的时间片上运行,而不影响最终的结果。因此Actor在空间(分布式)和时间(异步驱动)上解耦的。而Akka是Lightbend(前身是Typesafe)公司在JVM上的Actor模型的实现。我们在了解actor模型之前,首先来了解actor模型主要是为了解决什么样的问题。

Why modern systems need a new programming model

在akka系统的官网上主要介绍了现代并发编程模型所遇到的问题,里面主要提到了三个点

1) 在面向对象的语言中一个显著的特点是封装,然后通过对象提供的一些方法来操作其状态,但是共享内存的模型下,多线程对共享对象的并发访问会造成并发安全问题。一般会采用加锁的方式去解决

473fa06b23b1a5c4ab25d440acba9e81.png

加锁会带来一些问题

  • 加锁的开销很大,线程上下文切换的开销大

  • 加锁导致线程block,无法去执行其他的工作,被block无法执行的线程,其实也是占据了一种系统资源

  • 加锁在编程语言层面无法防止隐藏的死锁问题


2)我们知道Java中并发模型是通过共享内存来实现。而cpu中会利用局部cache来加速主存的访问,为了解决多线程间缓存不一致的问题,在java中一般会通过使用volatile或者Atmoic来标记变量,通过Jmm的happens before机制来保障多线程间共享变量的可见性。因此从某种意义上来说是没有共享内存的,而是通过cpu将cache line的数据刷新到主存的方式来实现可见。
因此与其去通过标记共享变量或者加锁的方式,依赖cpu缓存更新,倒不如每个并发实例之间只保存local的变量,而在不同的实例之间通过message来传递。

3)call stack的问题
当我们编程模型异步化之后,还有一个比较大的问题是调用栈转移的问题,如下图中主线程提交了一个异步任务到队列中,Worker thread 从队列提取任务执行,调用栈就变成了workthread发起的,当任务出现异常时,处理和排查就变得困难。

73d3450994de001f62528d0553c8695a.png

How the Actor Model Meets the Needs of Modern Distributed Systems

那么akka 的actor的模型是怎样处理这些问题的,actor模型中的抽象主体变为了actor,

  • actor之间可以互相发送message。

  • actor在收到message之后会将其存入其绑定的Mailbox中。

  • Actor从Mailbox中提取消息,执行内部方法,修改内部状态。

  • 继续给其他actor发送message。

可以看到下图,actor内部的执行流程是顺序的,同一时刻只有一个message在进行处理,也就是actor的内部逻辑可以实现无锁化的编程。actor和线程数解耦,可以创建很多actor绑定一个线程池来进行处理,no lock,no block的方式能减少资源开销,并提升并发的性能

65c076f57af0592d69f0bc680d1b4976.png

actor编程样例

下面简单来看一个actor的样例

依赖

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.11</artifactId>
    <version>2.4.20</version>
</dependency>

Main

    public static void main(String[] args) throws InterruptedException {
        final ActorSystem actorSystem = ActorSystem.create("actor-system");
 
        final ActorRef actorRef = actorSystem.actorOf(Props.create(BankActor.class), "bank-actor");
 
        CountDownLatch addCount = new CountDownLatch(20);
        CountDownLatch minusCount = new CountDownLatch(10);
 
        Thread addCountT = new Thread(new Runnable() {
            @Override
            public void run() {
                while (addCount.getCount() > 0) {
                    actorRef.tell(Command.ADD, null);
                    addCount.countDown();
                }
            }
        });
 
        Thread minusCountT = new Thread(new Runnable() {
            @Override
            public void run() {
                while (minusCount.getCount() > 0) {
                    actorRef.tell(Command.MINUS, null);
                    minusCount.countDown();
                }
            }
        });
 
        minusCountT.start();
        addCountT.start();
        minusCount.await();
        addCount.await();
 
        Future<Object> count = Patterns.ask(actorRef, Command.GET, 1000);
        count.onComplete(
                new OnComplete<Object>() {
                    @Override
                    public void onComplete(Throwable failure, Object success) throws Throwable {
                        if (failure != null) {
                            failure.printStackTrace();
                        } else {
                            log.info("Get result from " + success);
                        }
                    }
                },
                Executors.directExecutionContext());
        actorSystem.shutdown();
    }


  1. 创建actor

  2. 通过actorRef和actor并发交互

  3. 获取actor最后的状态


actor

public class BankActor extends UntypedActor {
 
    private static final Logger log = LoggerFactory.getLogger(BankActor.class);
    private int count;
 
    @Override
    public void preStart() throws Exception, Exception {
        super.preStart();
        count = 0;
    }
 
    @Override
    public void onReceive(Object message) throws Throwable {
        // 可以使用枚举或者动态代理类来实现方法调用
        if (message instanceof Command) {
            Command cmd = (Command) message;
            switch (cmd) {
                case ADD:
                    log.info("Add 1 from {} to {}", count, ++count);
                    break;
                case MINUS:
                    log.info("Minus 1 from {} to {}", count, --count);
                    break;
                case GET:
                    log.info("Return current count " + getSender());
                    getSender().tell(count, this.getSelf());
                    break;
                default:
                    log.warn("UnSupport cmd: " + cmd);
            }
        } else {
            log.warn("Discard unknown message: {}", message);
        }
    }
}

枚举:

enum Command {
    ADD,
    MINUS,
    GET
}

执行结果:

15:36:46.376 [actor-system-akka.actor.default-dispatcher-5] INFO akka.BankActor - Add 1 from 0 to 1
15:36:46.385 [actor-system-akka.actor.default-dispatcher-5] INFO akka.BankActor - Minus 1 from 1 to 0
15:36:46.385 [actor-system-akka.actor.default-dispatcher-5] INFO akka.BankActor - Minus 1 from 0 to -1
15:36:46.385 [actor-system-akka.actor.default-dispatcher-5] INFO akka.BankActor - Minus 1 from -1 to -2
15:36:46.386 [actor-system-akka.actor.default-dispatcher-5] INFO akka.BankActor - Minus 1 from -2 to -3
15:36:46.386 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Minus 1 from -3 to -4
15:36:46.386 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Minus 1 from -4 to -5
15:36:46.386 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -5 to -4
15:36:46.386 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Minus 1 from -4 to -5
15:36:46.386 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -5 to -4
15:36:46.386 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Minus 1 from -4 to -5
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Minus 1 from -5 to -6
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Minus 1 from -6 to -7
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -7 to -6
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -6 to -5
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -5 to -4
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -4 to -3
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -3 to -2
15:36:46.393 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -2 to -1
15:36:46.393 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -1 to 0
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 0 to 1
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 1 to 2
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 2 to 3
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 3 to 4
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 4 to 5
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 5 to 6
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 6 to 7
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 7 to 8
15:36:46.395 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 8 to 9
15:36:46.395 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 9 to 10
15:36:46.402 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Return current count Actor[akka://actor-system/temp/$a]
15:36:46.403 [actor-system-akka.actor.default-dispatcher-2] INFO akka.ActorTest - Get result from 10


在这个例子中简单模拟了并发加减操作,例子中最终是+20 , -10,最终结果为10。我们可以看到actor内部并不需要通过加锁或设置volatile的方式来维护其并发安全性,使用起来非常的方便,貌似再也不用担心并发安全的问题了,那么akka具体是帮我们怎么做的呢?

actor内部实现

首先当我们讨论并发安全的时候我们实际上是在说在多线程运行的情况下,如何保证程序的原子性,有序性和可见性,akka也是基于java的应用,因此分析akka是怎么做的也逃不开java内存模型。JSR-133使用happens-before的概念来阐述操作之间的内存可见性,其中主要包括

  • 程序顺序规则:一个线程中的每个操作,happens-before于该线程中的任意后续操作。

  • 监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。

  • volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读。

  • 传递性:如果A happens-before B,且B happens-before C,那么A happens-before C。

基于此,akka也提出了他的happens-before原则:

310a9920ca9889179faf444d0197023e.png

f74d6b61df838d77e227bb3273783ed1.png

简单的说就是akka能够保证在处理下一个message时,对于上一个message对actor内部状态的改动是可见的。

在Stack Overflow上也有小伙伴提出这个问题,可以参考一下
https://stackoverflow.com/questions/27484460/akka-what-is-the-reason-of-processing-messages-one-at-a-time-in-an-actor

那么akka具体是怎么实现的呢?

原子性,有序性

e3d0904bc7a095c97fb28cc231104bfd.png

在akka模型中,每一个actor都有一个相应的邮箱用于存放接受到的message, 然后一个Dispatcher负责线程调度,处理mailbox中的message

Enqueue

a015e3bf9e65fd9ce4026064181f5530.png

在actor接受到邮件后, 首先会将message放入队列中,并提交一个异步任务

registerForExecution

c4e8750b6b03eba4616fd2dad02977e5.png



提交的异步任务是执行MailBox (MailBox是一个Runnable实现)

Mailbox#run

fff1a50a03f00789cea2046da73e21e9.png

主体是先处理系统消息,然后处理MailBox中的用户message

processMailBox

2496a94af76f157330bc52be17cb3e8f.pngd5a23f15f14d9c75c7b22c386ea9dc79.png

处理message的逻辑比较简单,这里可以看到每一个时刻都是只有一个线程在执行mailbox中的message,一次处理多少是有throughput的参数来决定。因此这种模型下消息处理就是顺序的(但是这也有坏处,message处理的主流程不能block,否则message处理都会被拖慢,因此在Flink工程中mater的actor内部大量采用了基于CompletableFuture异步编程的方式)。actor内部的变量,如上面例子中的count,是不会有并发访问的,因此原子性和有序性都得到了保障。

但是,细心的同学可能发现,虽然每次执行的线程都只有一个,但是具体是哪个线程并不是绑定的,两次执行的线程完全可能不相同,甚至可能调度在不同的cpu上。不同线程更改count变量之后,这个变量也没有声明成volatile,如何保证线程1 执行message1更新完后,线程2执行message2时能看到1的变更结果呢?

也有同学产生了类似的问题,可以参考
https://stackoverflow.com/questions/10165603/should-my-akka-actors-properties-be-marked-volatile/

Because an actor will only ever handle one message at any given time, we can guarantee that accessing the actor's local state is safe to access, even though the Actor itself may be switching Threads which it is executing on. Akka guarantees that the state written while handling message M1 are visible to the Actor once it handles M2, even though it may now be running on a different thread (normally guaranteeing this kind of safety comes at a huge cost, Akka handles this for you).

有人回答了Akka handles this for you. 但是我还是很奇怪,akka是怎么做的呢?

可见性

Java Memory Model

首先我们再来回顾一下Java内存模型,JMM内存模型中抽象了四种内存屏障用于处理cpu指令重排带来的线程安全问题。

  • LoadLoad屏障:对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。

  • StoreStore屏障:对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。

  • LoadStore屏障:对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。

  • StoreLoad屏障:对于这样的语句Store1; StoreLoad; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。它的开销是四种屏障中最大的。在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能


详细解释可以参考:https://github.com/openjdk/jdk/blob/6bab0f539fba8fb441697846347597b4a0ade428/src/jdk.internal.vm.ci/share/classes/jdk.vm.ci.code/src/jdk/vm/ci/code/MemoryBarriers.java

除了以上四种,HotSpot VM还定义了特殊的acquire和release内存屏障,acquire防止它后面的读写操作重排序到acquire的前面;release防止它前面的读写操作重排序到release后面。
acquire和release两者放在一起就像一个栅栏,可禁止栅栏内的事务跑到栅栏外,但是它不阻止栅栏外的事务跑到栅栏内。
acquire可以由LoadLoad + LoadStore组成,release可以由StoreStore 和LoadStore组成,他们都没有使用StoreLoad屏障,这意味着x86架构原生就具有acquire和release的语义。

因为x86架构下是强内存模型,只允许Store和Load顺序重排,因此内存barrier实际也只有StoreLoad一种实现。但是这里我们不去过多的纠结不同cpu架构的细节。

-- 《深入解析Java虚拟机HotSpot》 第六章

9cc0096548cc88a3cb760c33864a6120.png

volatile关键字会生成的memory barrier

Synchronizes-With

在回忆完Java内存模型后,我们再来看上面提到的MailBox#run方法的实现

8b30de52171011292373a8fa840d10a6.png

上面代码中Volatile read 和 Volatile write分别通过unsafe工具以Volatile的方式去读取和修改Mailbox对象的内部变量。

// volatile read
Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)   
    
// volatile write
Unsafe.instance.putIntVolatile(this, AbstractMailbox.mailboxStatusOffset, newStatus)

根据上面JMM所定义的内容,实际上会在volatile read write 前后插入相应的memory barrier。形成如下的交互模式。第一个线程执行完mailbox内容后执行volatile write 插入release barrier,第二个线程启动后执行volatile read 插入acquire barrier,这样在这两个线程之间就形成了happens before的关系,从而保障了可见性。

6b6c6da5a6a0a17b20ba35f11c89a5e3.png

因此我们可以得出结论,通过对Mailbox内部volatile变量的读写,借助volatile的内存屏障语义,再加上单线程执行模型,实现了actor 内部状态变量的可见性。

这种方式也被称作synchronize-with,这是一种保障线程间变量可见的机制。在实现中一般会有两种变量。

  • guard variable 门禁变量

  • payload 真正在两个线程间需要传递/共享的变量

在akka这个实现中Mailbox的volatile status变量就是门禁,actor internal state 就是payload。

In Java version 5 onward, every store to a volatile variable is a write-release, while every load from a volatile variable is a read-acquire. Therefore, any volatile variable in Java can act as a guard variable, and can be used to propagate a payload of any size between threads.


另外synchronize with实现有很多种方式,可以通过volatile也可以通过原子变量,还可以通过锁等等。通过volatile实现的也被称做volatile-piggyback

1709e34b3277b80844cca3ac339f7249.png

通过volatile-piggyback这种方式实现有几个好处

  1. 首先显而易见的就是避免使用lock

  2. 用户actor代码可以不用使用volatile来标记变量,可以减少barrier的数量,提升性能。也降低了并发程序的复杂度

  3. 这种方式下不会产生较重的StoreLoad barrier,所以真实的性能开销应该也比较低,特别对于x86架构来说LoadLoad,LoadStore等都是空操作。和基于lock实现的同步就更占优势了。


关于x86强内存模型的补充说明:
可能有同学还有疑问(其实是我之前还有疑问),因为上面提到,在x86强内 存模型下,本身就带有acquire 和 release语义,那加上这个volatile关键字有什么意义呢?当然我们开发java代码的时候肯定要写跨平台统一的代码,但是除开这个之外,volatile关键字还会禁止编译器的指令重排,从而从编译层面保障不会破坏Happen-before的语义。

例如以下HotSpot Vm中,指令内存屏障实现的OrderAccess模块,其中除了StoreLoad,其他三者都只执行了comiler_barrier方法,这里方法中volatile表示禁止编译器优化汇编代码。memory表示告知编译器汇编代码执行内存读取和写入的操作,编译器可能需要在执行汇编前将一些指定的寄存器刷入内存。而StoreLoad方法则是使用指令加上lock前缀来用作内存屏障指令

015a52197106fa829ef0fc0940506245.png


《深入java虚拟机HotSpot》第六章

总结

从我个人理解角度说,akka的actor模型采用基于消息传递的机制实现并发编程,可以实现无锁异步化的编程模型,并且通过亲和性调度等方案,可以更好的利用cpu cache,对于高并发场景来说应该是一大利器。

参考


preshing大神的一系列并发内存模型的文章















TopTop