
ThreadLocal 是 Java 中线程间数据访问冲突中的一种解决方式,常用于取代锁。其会将线程间共享的数据复制为多份(每个线程的 ThreadLocal 数据维护在一张 map 表中,其中的 key 就是每个线程对象。这张 map 中,key 是 ThreadLocal 自己,value 是复制后的数据)。这样,每个线程操作一份数据,从而解决访问冲突。
但在协程环境中,这个问题变得复杂。因为协程并没有和线程绑定,一个协程在执行过程中是可以切换线程的(例如之前 Dispatchers.Unconfined 的例子)。
Kotlin 通过一系列的扩张方法解决这个问题。当协程从线程 A 切换到 B 然后又回到 A,线程 A 的 ThreadLocal 属性自动恢复。这就是 asContextElement(value:) 方法,它是 ThreadLocal 的扩展方法,用于将一个 ThreadLocal 包装成 ThreadContextElement。ThreadContextElement 会将 ThreadLocal 值复制到协程中,但不与特定线程绑定,value 参数会覆盖 ThreadLocal 中的 value。
来看这个例子。
val threadLocal = ThreadLocal() fun main() = runBlocking { threadLocal.set("Jim") println("${Thread.currentThread()}: ${threadLocal.get()}") // 打印:Thread[main @coroutine#1,5,main]: Jim val job = launch(Dispatchers.Default+threadLocal.asContextElement(value="Ann")){// ThreadLocal -> ThreadContextElement,同时覆盖 threadLocal 的 value,因此,threadLocal 的值会从 Jim 变成 Ann。 println("${Thread.currentThread()}: ${threadLocal.get()}") // 打印:Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann yield() // 将协程切换到(线程池中的)其它线程执行 println("${Thread.currentThread()}: ${threadLocal.get()}") // 打印:Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann } job.join() // 回到主协程,threadLocal 值自动恢复 println("${Thread.currentThread()}, ${threadLocal.get()}") // 打印:Thread[main @coroutine#1,5,main]: Jim }
程序的整个输出如下(打开 coroutines.debug 开关):
Thread[main @coroutine#1,5,main]: Jim Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann Thread[main @coroutine#1,5,main]: Jim
FlowThreadContextElement 不会跟踪 thread local 的值,它只是原来值的拷贝,所以对它的任何修改都不会影响原来 thread local 的值。
如果你在协程中直接使用了 thread local(没有使用 asContextElement 方法),并修改了 thread local 的值,则 thread local 的值可能会变得不确定,如:
val t1 = ThreadLocal.withInitial{ "initial" } runBlocking { // 注意,这里没有用 asContextElement(value:) 方法 println(t1.get()) // 打印:initial withContext(t1.asContextElement("modified")) { println(t1.get()) // 打印:modified } // 返回原来的上下文 println(t1.get())// 可能打印:initial,也可能打印:modified,不确定 }
Flow 表示一个异步流,类似于 Java stream。如果一个函数需要返回多个值,除了使用集合外,还可以返回 Flow:
// 方法一、使用集合 private fun myMethod(): List= listOf("How are you doing", "Not bad","Thank you", "How about you") fun main() { myMethod().forEach { println(it) } } // 方法二、使用 Sequence private fun myMethod1(): Sequence = sequence { for(i in 100..105){ Thread.sleep(1000) // 阻塞主线程 yield(i) // 返回一个元素 } } fun main() { myMethod1().forEach{ println(it) } // 打印 100 ~ 105 } // 方法三、使用协程 private suspend fun myMethod2():List { delay(1000) return listOf("How are you doing", "Not bad","Thank you", "How about you") } fun main()=runBlocking { myMethod2().forEach{ println(it) } // 每一秒打印一个字符串 } // 方法四、使用 Flow fun myMethod3():Flow = flow{ // 调用 flow 构建器,自动变成挂起函数(无需显式使用 suspend 关键字),因而可以调用其他挂起函数 for (i in 1..4){ delay(1000) emit(i) // 类似 yield,异步返回一个结果 } } fun main() = runBlocking { launch{ for (i in 1..2) { println("group $i ----") delay(2000) } } myMethod3().collect { println(it) } // collect 用于接收 emit 返回的一个结果,二者是配对的 }
第一种方法的特点是:
第二种方法的特点是:
Thread.sleep 模拟阻塞方式的求解过程
sequence 中每 yield 一次,就返回给调用者一次,所以5条打印语句并不是一次性打印出来的,而是每隔一秒打印出一个数字。
第三种方法的特点是:
第四种方法的特点是:
求解过程是异步的,不会阻塞线程
返回过程是异步的,每 emit 一次就返回一个,而不是全部一次性返回
因此打印结果会先打印 group 1 ----,然后是数字 1 和 2(间隔1秒),2 秒后是group 2 ----,然后是 3 和 4(间隔1秒):
group 1 ---- 1 2 group 2 ---- 3 4
可以看出 Flow 非常类似于 Sequence,但是它是异步执行的,而 Sequence 是同步执行的。
此外,如果将 Flow 中的 delay 换成 Thread.sleep,则 Flow 的 emit 失去作用,异步返回变成一次性返回:
1 2 3 4 group 1 ---- group 3 ----
很显然,Thread.sleep 阻塞了主线程的执行。
Flow 构建器Flow 通过构建器进行构建,它有 4 种构建器:
之前我们已经使用的就是 flow{…} 构建器 。它的使用较为简单。接下来我们看一下另外3个构建器。
flowOf
定义一个发射固定数量值的流。它接收一个可变参数的值,并对这个可变参数进行循环 emit。
fun main() = runBloking {
flowOf(10,20,29,30).collect{ println(it) }
}
asFlow
集合和序列都提供了 asFlow 扩展方法,可以将自身转换为一个 flow 流。
fun main() = runBlocking {
(1..10).asFlow().collect{ println(it) }
}
中间操作和终止操作
中间操作不会导致 Flow 中的代码被执行,比如 emit 就是一种中间操作,它不会导致代码真正被执行。而终止操作才会执行代码,collect 就是一种终止操作。
private fun myMethod():Flow= flow{ println("I'm fine.") for(i in 1..3) { delay(1000) emit(i) } } fun main() = runBlocking { println("Let it go!") val flow = myMethod() // 调用方法并不会导致代码被执行 println("See you.") }
打印结果是“I’m fine."一句并不会被打印:
Let it go! See you.
要想让 flow 真正执行,需要加上:
flow.collect{ println(it) }
注意,如果多次调用终止操作,将会导致 flow 多次执行。
Flow 的中间操作中可以调用挂起函数的,这与 Sequence 是不同的。
private suspend fun myExecution(input:Int): String{
delay(1000)
return "output:$input"
}
fun main() = runBlocking {
(1..10).asFlow()
.filter{ it -> it > 5 }
.map{ input -> myExecution(input) }// 调用挂起函数 myExecution
.collect{ println(it) }
}
输出结果如下:
output:6 output:7 ... output:10
除了 filter 和 map,Flow 还支持 transform 操作:
private suspend fun myExecution(input:Int): String{
delay(1000)
return "output: $input"
}
fun main() = runBlocking {
(1..10).asFlow().transform{ input ->
emit( "transform: $input")
emit(myExecution(input))
emit("------")
}.collect{ println(it) }
}
输出结果如下:
transform: 1 output: 1 ------ transform: 2 output: 2 ------ ... transform: 10 output: 10 ------
transform 中可以执行任何逻辑,不需要返回任何值,如果需要向 flow 发送值,可以使用 emit (可以发射多次)。它比 filter 或 map 更自由和强大。
中间操作中可以限定元素数量:
fun myNumbers():Flow= flow{ try { emit(1) emit(2) println("----") emit(3) }catch(e: Exception) { println(e) }finally{ println("--finally") } } fun main() = runBlocking { myNumbers().take(2).collect( println(it) )// take(2) 仅获取前 2 个元素 }
输出结果如下:
1 2 kotlinx.coroutines.flow.interal.AbortFlowException: Flow was aborted, no more elements needed finally
可以看到当 take 到指定元素后,flow 会直接抛出一个异常,从而导致流被取消。
所有的终止操作都是挂起函数。终止操作才会真正执行流的代码。除了 collect 操作,flow 还有其他终止操作 ,比如 toList,toSet,reduce 等。
fun main() = runBlocking {
val result = (1..4).asFlow().map( it*it )
.reduce( a,b -> a+b ) // 汇聚操作,将元素值进行累加
println(result)
}
输出结果为 30,因为 1+4+9+16 = 30。
Flow 是顺序执行的。collect 操作运行在终止操作的协程,默认不会开启新的协程。每个 emit 的元素都会由所有的中间操作进行处理,最后由终止操作处理。
fun main() = runBlocking {
(1..10).asFlow().filter{ // 只允许偶数元素通过,奇数被过滤
it % 2 == 0
}.map {// 偶数元素进入 map 操作
println("map: $it")
it
}.collect {
println("collect: $it")
}
}
输出结果如下:
filter: 2 map: 2 collect: 2 filter: 4 map: 4 collect: 4 ... filter: 10 map: 10 collect: 10