知识图库
Java知识库
JDK线程池实现原理
Java中的强、软、弱、虚引用
深入拆解Java虚拟机
01 开篇词 | 为什么我们要学习Java虚拟机?
02 Java代码是怎么运行的?
03 Java的基本类型
04 Java虚拟机是如何加载Java类的?
05 JVM是如何执行方法调用的?(上)
06 JVM是如何执行方法调用的?(下)
7 JVM是如何处理异常的?
Java面试常见问题整理
Java面试常见问题-Java 基础篇
Java面试常见问题-Jvm篇
Java面试常见问题-并发篇
Android知识库
Kotlin编程第一课
1 开篇词 | 入门Kotlin有多容易,精通Kotlin就有多难
2 Kotlin基础语法:正式开启学习之旅
3 面向对象:理解Kotlin设计者的良苦用心
4 Kotlin原理:编译器在幕后干了哪些“好事”?
5 实战:构建一个Kotlin版本的四则运算计算器
6 object关键字:你到底有多少种用法?
7 扩展:你的能力边界到底在哪里?
8 高阶函数:为什么说函数是Kotlin的“一等公民”?
9 实战:用Kotlin写一个英语词频统计程序
10 加餐一 | 初识Kotlin函数式编程
11 委托:你为何总是被低估?
12 泛型:逆变or协变,傻傻分不清?
13 注解与反射:进阶必备技能
14 实战:用Kotlin实现一个网络请求框架KtHttp
15 加餐二 | 什么是“表达式思维”?
16 加餐三 | 什么是“不变性思维”?
17 加餐四 | 什么是“空安全思维”?
18 春节刷题计划(一)| 当Kotlin遇上LeetCode
19 春节刷题计划(二)| 一题三解,搞定版本号判断
20 春节刷题计划(三)| 一题双解,搞定求解方程
21 春节刷题计划(四)| 一题三解,搞定分式加减法
22 什么是“协程思维模型”?
23 如何启动协程?
24 挂起函数:Kotlin协程的核心
25 Job:协程也有生命周期吗?
26 Context:万物皆为Context?
27 实战:让KtHttp支持挂起函数
28 期中考试 | 用Kotlin实现图片处理程序
29 题目解答 | 期中考试版本参考实现
30 Channel:为什么说Channel是“热”的?
31 Flow:为什么说Flow是“冷”的?
32 select:到底是在选择什么?
33 并发:协程不需要处理同步吗?
34 异常:try-catch居然会不起作用?坑!
35 实战:让KtHttp支持Flow
36 答疑(一)| Java和Kotlin到底谁好谁坏?
37 集合操作符:你也会“看完就忘”吗?
38 协程源码的地图:如何读源码才不会迷失?
39 图解挂起函数:原来你就是个状态机?
40 加餐五 | 深入理解协程基础元素
41 launch的背后到底发生了什么?
42 Dispatchers是如何工作的?
43 CoroutineScope是如何管理协程的?
44 图解Channel:如何理解它的CSP通信模型?
45 图解Flow:原来你是只纸老虎?
46 Java Android开发者还会有未来吗?
47 Kotlin与Jetpack简直是天生一对!
48 用Kotlin写一个GitHub Trending App
49 结课测试 | “Kotlin编程第一课”100分试卷等你来挑战!
50 结束语 | 不忘初心
Android Framework 教程—基础篇
01 Ubuntu 使用快速入门
02 Make 构建工具入门
03 理解 Unicode UTF-8 UTF-16 UTF-32
04 Linux Shell 脚本编程入门1——核心基础语法
05 SeAndroid 使用极速上手
06 理解 C++ 的 Memory Order
07 AOSP 极速上手
08 系统开发工具推荐
09 添加 Product
运动相关知识
爱上跑步
01 开篇词 | 跑步,不那么简单的事儿
02 跑两步就喘了,是不是我不适合跑步?
03 正确的跑步姿势是什么样的?
04 为什么跑步要先热身?
05 怎样制定你的第一个10公里跑步计划?
06 快跑和慢跑,哪个更燃脂?
07 普通跑步者应该如何选择跑鞋?
08 买跑步装备,不要踩这些坑儿
09 跑步前到底应不应该吃东西?
10 跑步到底伤不伤膝盖?
11 参加了20场马拉松,我是如何准备的?
12 除了马拉松,还能参加哪些跑步赛事?
13 热点问题答疑 :跑完第二天浑身疼,还要不要继续跑?
健身房计划
[DeepSeek]减脂塑形计划
【DeepSeek】训练周期安排
每日餐饮热量控制
减脂期间食物推荐避坑指南
HarmonyOS知识库
其他知识类目
心理学相关
如何学点心理学——关于非专业人士学心理学的一点建议
投射性认同
-
+
首页
44 图解Channel:如何理解它的CSP通信模型?
今天我们来分析Channel的源码。 Kotlin的Channel是一个非常重要的组件,在它出现之前,协程之间很难进行通信,有了它以后,协程之间的通信就轻而易举了。在第22讲当中,我们甚至还借助Channel实现的Actor做到了并发安全。 那么总的来说,Channel是热的,同时它还是一个线程安全的数据管道。而由于Channel具有线程安全的特性,因此,它最常见的用法,就是建立CSP通信模型(Communicating Sequential Processes)。 不过你可能会觉得,CSP太抽象了不好理解,但其实,这个通信模型我们在第22讲里就接触过了。当时我们虽然是通过Actor来实现的,但却是把它当作CSP在用,它们两者的差异其实很小。  关于CSP的理论,它的精确定义其实比较复杂,不过它的核心理念用一句话就可以概括:不要共享内存来通信;而是要用通信来共享内存(Don’t communicate by sharing memory; share memory by communicating)。 可是,我们为什么可以通过Channel实现CSP通信模型呢?这背后的技术细节,则需要我们通过源码来发掘了。 ### Channel背后的数据结构 为了研究Channel的源代码,我们仍然是以一个简单的Demo为例,来跟踪它的代码执行流程。 ```kotlin // 代码段1 fun main() { val scope = CoroutineScope(Job() + mySingleDispatcher) // 1,创建管道 val channel = Channel() scope.launch { // 2,在一个单独的协程当中发送管道消息 repeat(3) { channel.send(it) println("Send: $it") } channel.close() } scope.launch { // 3,在一个单独的协程当中接收管道消息 repeat(3) { val result = channel.receive() println("Receive ${result}") } } println("end") Thread.sleep(2000000L) } /* 输出结果: end Receive 0 Send: 0 Send: 1 Receive 1 Receive 2 Send: 2 */ ``` 以上代码主要分为三个部分,分别是:Channel创建、发送数据、接收数据。 我们先来分析注释1处的Channel创建逻辑。我们都知道Channel其实是一个接口,它是通过组合SendChannel、ReceiveChannel得来的。而注释1处调用的Channel(),其实是一个普通的顶层函数,只是它发挥的作用是构造函数,因此它的首字母是大写的,这跟我们上节课分析的CoroutineScope、Job也是类似的。 ```kotlin // 代码段2 public interface Channel : SendChannel, ReceiveChannel { public fun Channel( capacity: Int = RENDEZVOUS, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement: ((E) -> Unit)? = null ): Channel = when (capacity) { RENDEZVOUS -> { if (onBufferOverflow == BufferOverflow.SUSPEND) RendezvousChannel(onUndeliveredElement) else ArrayChannel(1, onBufferOverflow, onUndeliveredElement) } CONFLATED -> { ConflatedChannel(onUndeliveredElement) } UNLIMITED -> LinkedListChannel(onUndeliveredElement) BUFFERED -> ArrayChannel( if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1, onBufferOverflow, onUndeliveredElement ) else -> { if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST) ConflatedChannel(onUndeliveredElement) else ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement) } } ``` 然后,从上面的代码里,我们可以看到,Channel()方法的核心逻辑就是一个when表达式,它根据传入的参数,会创建不同类型的Channel实例,包括了:RendezvousChannel、ArrayChannel、ConflatedChannel、LinkedListChannel。而这些实现类都有一个共同的父类:AbstractChannel。 ```kotliin // 代码段3 internal abstract class AbstractSendChannel( @JvmField protected val onUndeliveredElement: OnUndeliveredElement? ) : SendChannel { protected val queue = LockFreeLinkedListHead() // 省略 internal abstract class AbstractChannel( onUndeliveredElement: OnUndeliveredElement? ) : AbstractSendChannel(onUndeliveredElement), Channel {} } ``` 可以看到,AbstractChannel其实是AbstractSendChannel的内部类,同时它也是AbstractSendChannel的子类。而Channel当中的核心逻辑,都是依靠AbstractSendChannel当中的 LockFreeLinkedListHead 实现的。我们接着来看下它的源代码: ```kotlin // 代码段4 public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { public actual val isEmpty: Boolean get() = next === this } public actual open class LockFreeLinkedListNode { // 1 private val _next = atomic(this) private val _prev = atomic(this) private val _removedRef = atomic(null) } ``` 可见,LockFreeLinkedListHead其实继承自 LockFreeLinkedListNode,而LockFreeLinkedListNode则是实现Channel核心功能的关键数据结构。整个数据结构的核心思想,来自于2004年的一篇论文:《Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap》。如果你对其中的原理感兴趣,可以去看看这篇论文。这里,为了不偏离主题,我们只分析它的核心思想。 LockFreeLinkedListNode,我们可以将其区分开来看待,即LockFree和LinkedList。 - 第一个部分:LockFree,它是通过CAS(Compare And Swap)的思想来实现的,比如JDK提供的java.util.concurrent.atomic。这一点,我们从上面注释1的atomic也可以看出来。 - 第二个部分:LinkedList,这说明LockFreeLinkedList本质上还是一个链表。简单来说,它其实是一个循环双向链表,而LockFreeLinkedListHead其实是一个哨兵节点,如果你熟悉链表这个数据结构,也可以将其看作是链表当中的虚拟头结点,这个节点本身不会用于存储任何数据,它的next指针会指向整个链表的头节点,而它的prev指针会指向整个链表的尾节点。 为了方便你理解,我画了一张图描述这个链表的结构:  请看图片左边的部分,当链表为空的时候,LockFreeLinkedListHead的next指针和prev指针,都是指向自身的。这也就意味着,这个Head节点是不会存储数据,同时,也是不会被删除的。 然后再看图片右边的部分,当链表有2个元素的时候,这时LockFreeLinkedListHead节点的next指针才是第一个节点,而Head的prev指针则是指向尾结点。 实际上,寻常的循环双向链表是可以在首尾添加元素的,同时也支持“正向遍历、逆向遍历”的。但Channel内部的这个数据结构只能在末尾添加,而它遍历的顺序则是从队首开始的。这样的设计,就让它的行为在变成了先进先出单向队列的同时,还实现了队尾添加操作,只需要O(1)的时间复杂度。  可以说,正是因为LockFreeLinkedList这个数据结构,我们才能使用Channel实现CSP通信模型。 好,在弄清楚LockFreeLinkedList这个数据结构以后,Channel后续的源码分析就很简单了。让我们来分别分析一下Channel的send()、receive()的流程。 ### 发送和接收的流程 我们回过头来看代码段1当中的逻辑,我们分别启动了两个协程,在这两个协程中,我们分别发送了三次数据,也接收了三次数据。程序首先会执行send(),由于Channel在默认情况下容量是0,所以,send()首先会被挂起。让我们来看看这部分的逻辑: ```kotlin // 代码段5 public final override suspend fun send(element: E) { // 1 if (offerInternal(element) === OFFER_SUCCESS) return // 2 return sendSuspend(element) } protected open fun offerInternal(element: E): Any { while (true) { // 3 val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILE // 省略 } } private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont -> loop@ while (true) { if (isFullImpl) { // 4 val send = if (onUndeliveredElement == null) SendElement(element, cont) else SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement) val enqueueResult = enqueueSend(send) when { enqueueResult == null -> { // 5 cont.removeOnCancellation(send) return@sc } enqueueResult is Closed<*> -> { } enqueueResult === ENQUEUE_FAILED -> {} enqueueResult is Receive<*> -> {} else -> error("enqueueSend returned $enqueueResult") } } // 省略 } } ``` 上面的挂起函数send()分为两个部分: - 注释1,尝试向Channel发送数据,如果这时候Channel已经有了消费者,那么if就会为true,send()方法就会return。不过,按照代码段1的逻辑,首次调用send()的时候,Channel还不存在消费者,因此在注释3处,尝试从LockFreeLinkedList取出消费者是不可能的。所以,程序会继续执行注释2处的逻辑。 - 注释2,会调用挂起函数sendSuspend(),它是由高阶函数suspendCancellableCoroutineReusable{} 实现的。我们看它的名字就能知道,它跟suspendCancellableCoroutine{} 是类似的(如果你有些忘了,可以回过头去看看加餐五)。另外,请留意下这个方法的注释4,它会将发送的元素封装成SendElement对象,然后调用enqueueSend()方法,将其添加到LockFreeLinkedList这个队列的末尾。如果enqueueSend()执行成功了,就会执行注释5,注册一个回调,用于将SendElement从队列中移除掉。 如果你足够细心的话,你会发现这整个流程并没有涉及到resume的调用,因此,这也意味着sendSuspend()会一直被挂起,而这就意味着send()会一直被挂起!那么,问题来了,**send()会在什么时候被恢复?** 答案当然是:**receive()被调用的时候!** ```kotlin // 代码段6 public final override suspend fun receive(): E { // 1 val result = pollInternal() @Suppress("UNCHECKED_CAST") if (result !== POLL_FAILED && result !is Closed<*>) return result as E // 2 return receiveSuspend(RECEIVE_THROWS_ON_CLOSE) } protected open fun pollInternal(): Any? { while (true) { // 3 val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED val token = send.tryResumeSend(null) if (token != null) { assert { token === RESUME_TOKEN } //4 send.completeResumeSend() return send.pollResult } send.undeliveredElement() } } // CancellableContinuationImpl private fun dispatchResume(mode: Int) { if (tryResume()) return // 5 dispatch(mode) } internal fun DispatchedTask.dispatch(mode: Int) { // 省略 if (!undispatched && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) { val dispatcher = delegate.dispatcher val context = delegate.context if (dispatcher.isDispatchNeeded(context)) { // 6 dispatcher.dispatch(context, this) } else { resumeUnconfined() } } else { // 省略 } } ``` 可以看到,挂起函数receive()的逻辑,跟代码段5当中的send()是类似的。 - 注释1,尝试从LockFree队列当中找出是否有正在被挂起的发送方。具体的逻辑在注释3处,它会从队首开始遍历,寻找Send节点。 接着上面的代码段1的案例分析,此时我们一定是可以从队列中找到一个Send节点的,因此程序会继续执行注释4处的代码。 - 注释4,completeResumeSend(),它最终会调用注释5处的dispatch(mode),而dispatch(mode)其实就是DispatchedTask的dispatch(),是不是觉得很熟悉?这个DispatchedTask其实就是我们在第29讲当中分析过的DispatchedTask,这里的dispatch()就是协程体当中的代码在线程执行的时机。最终,它会执行在Java的Executor之上。至此,我们之前被挂起的send()方法,其实就算是恢复了。 另外,你可以再留意上面的注释2,当LockFree队列当中没有正在挂起的发送方时,它会执行receiveSuspend(),而receiveSuspend()也同样会被挂起: ```kotlin private suspend fun receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont -> val receive = if (onUndeliveredElement == null) ReceiveElement(cont as CancellableContinuation, receiveMode) else ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation, receiveMode, onUndeliveredElement) while (true) { if (enqueueReceive(receive)) { removeReceiveOnCancel(cont, receive) return@sc } val result = pollInternal() if (result is Closed<*>) { receive.resumeReceiveClosed(result) return@sc } if (result !== POLL_FAILED) { cont.resume(receive.resumeValue(result as E), receive.resumeOnCancellationFun(result as E)) return@sc } } } ``` 所以,这里的逻辑其实跟之前的sendSuspend()是类似的。首先,它会封装一个ReceiveElement对象,并且将其添加到LockFree队列的末尾,如果添加成功的话,这个receiveSuspend就会继续挂起,这就意味着receive()也会被挂起。而receive()被恢复的时机,其实就对应了代码段5当中注释1的代码:offerInternal(element)。 至此,Channel的发送和接收流程,我们就都已经分析完了。按照惯例,我们还是通过一个视频来回顾代码的整体执行流程:  ### 小结 通过这节课,我们知道,Channel其实是一个线程安全的管道。它最常见的用法,就是实现CSP通信模型。它的核心理念是:**不要共享内存来通信;而是要用通信来共享内存**。而Channel之所以可以用来实现CSP通信模型,主要还是因为它底层用到的数据结构:LockFreeLinkedList。 LockFreeLinkedList虽然是一个循环双向链表,但在Channel的源码中,它会被当做先进先出的单向队列,它只在队列末尾插入节点,而遍历则只正向遍历。 还有Channel的send(),它会分为两种情况,一种是当前的LockFree队列当中已经有被挂起的接收方,这时候,send()会恢复Receive节点的执行,并且将数据发送给对方。第二种情况是:当前队列当中没有被挂起的接收方,这时候send()就会被挂起,而被发送的数据会被封装成SendElement对象插入到队列的末尾,等待被下次的receive()恢复执行。 而Channel的receive(),也是分为两种情况,一种是当前的LockFree队列当中已经存在被挂起的发送方,这时候receive()会恢复Send节点的执行,并且取出Send节点当中带过来的数据。第二种情况是:当前队列没有被挂起的发送方,这时候receive()就会被挂起,同时它也会被封装成一个ReceiveElement对象插入到队列的末尾,等待被下次的send()恢复执行。 其实,Kotlin推崇CSP模型进行并发的原因还有很多,比如门槛低、可读性高、扩展性好,还有一点是会被很多人提到的:不容易发生死锁。 不过,这里需要特别注意的是,CSP场景下的并发模型,并非不可能发生死锁,在一些特殊场景下,它也是可能发生死锁的,比如:通信死锁(Communication Deadlock)。因此,CSP也并不是解决所有并发问题的万能解药,我们还是要具体问题具体分析。 ### 思考题 在课程的开头,我们分析了Channel一共有四种实现方式:RendezvousChannel、ArrayChannel、ConflatedChannel、LinkedListChannel,请问你能结合今天学习的知识,分析LinkedListChannel的原理吗? ```kotlin internal open class LinkedListChannel(onUndeliveredElement: OnUndeliveredElement?) : AbstractChannel(onUndeliveredElement) { protected final override val isBufferAlwaysEmpty: Boolean get() = true protected final override val isBufferEmpty: Boolean get() = true protected final override val isBufferAlwaysFull: Boolean get() = false protected final override val isBufferFull: Boolean get() = false protected override fun offerInternal(element: E): Any { while (true) { val result = super.offerInternal(element) when { result === OFFER_SUCCESS -> return OFFER_SUCCESS result === OFFER_FAILED -> { // try to buffer when (val sendResult = sendBuffered(element)) { null -> return OFFER_SUCCESS is Closed<*> -> return sendResult } // otherwise there was receiver in queue, retry super.offerInternal } result is Closed<*> -> return result else -> error("Invalid offerInternal result $result") } } } protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any { while (true) { val result = if (hasReceiveOrClosed) super.offerSelectInternal(element, select) else (select.performAtomicTrySelect(describeSendBuffered(element)) ?: OFFER_SUCCESS) when { result === ALREADY_SELECTED -> return ALREADY_SELECTED result === OFFER_SUCCESS -> return OFFER_SUCCESS result === OFFER_FAILED -> {} // retry result === RETRY_ATOMIC -> {} // retry result is Closed<*> -> return result else -> error("Invalid result $result") } } } } ```
嘿手大叔
2024年10月31日 14:37
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码