栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 软件开发 > 后端开发 > Java

【深入kotlin】 - 初识 Flow

Java 更新时间:发布时间: 百科书网 趣学号
asContextElement

ThreadLocal 是 Java 中线程间数据访问冲突中的一种解决方式,常用于取代锁。其会将线程间共享的数据复制为多份(每个线程的 ThreadLocal 数据维护在一张 map 表中,其中的 key 就是每个线程对象。这张 map 中,key 是 ThreadLocal 自己,value 是复制后的数据)。这样,每个线程操作一份数据,从而解决访问冲突。

但在协程环境中,这个问题变得复杂。因为协程并没有和线程绑定,一个协程在执行过程中是可以切换线程的(例如之前 Dispatchers.Unconfined 的例子)。

Kotlin 通过一系列的扩张方法解决这个问题。当协程从线程 A 切换到 B 然后又回到 A,线程 A 的 ThreadLocal 属性自动恢复。这就是 asContextElement(value:) 方法,它是 ThreadLocal 的扩展方法,用于将一个 ThreadLocal 包装成 ThreadContextElement。ThreadContextElement 会将 ThreadLocal 值复制到协程中,但不与特定线程绑定,value 参数会覆盖 ThreadLocal 中的 value。

来看这个例子。

val threadLocal = ThreadLocal()
fun main() = runBlocking {
  threadLocal.set("Jim")
  println("${Thread.currentThread()}: ${threadLocal.get()}") // 打印:Thread[main @coroutine#1,5,main]: Jim
  
  val job = launch(Dispatchers.Default+threadLocal.asContextElement(value="Ann")){// ThreadLocal -> ThreadContextElement,同时覆盖 threadLocal 的 value,因此,threadLocal 的值会从 Jim 变成 Ann。
    println("${Thread.currentThread()}: ${threadLocal.get()}") // 打印:Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann
    yield() // 将协程切换到(线程池中的)其它线程执行
		println("${Thread.currentThread()}: ${threadLocal.get()}") // 打印:Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann
  }
  job.join() // 回到主协程,threadLocal 值自动恢复
  println("${Thread.currentThread()}, ${threadLocal.get()}") // 打印:Thread[main @coroutine#1,5,main]: Jim
}

程序的整个输出如下(打开 coroutines.debug 开关):

Thread[main @coroutine#1,5,main]: Jim
Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann
Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann
Thread[main @coroutine#1,5,main]: Jim

ThreadContextElement 不会跟踪 thread local 的值,它只是原来值的拷贝,所以对它的任何修改都不会影响原来 thread local 的值。

如果你在协程中直接使用了 thread local(没有使用 asContextElement 方法),并修改了 thread local 的值,则 thread local 的值可能会变得不确定,如:

val t1 = ThreadLocal.withInitial{ "initial" }
runBlocking { // 注意,这里没有用 asContextElement(value:) 方法
	println(t1.get()) // 打印:initial
	withContext(t1.asContextElement("modified")) { 
		println(t1.get()) // 打印:modified
	}
	// 返回原来的上下文
	println(t1.get())// 可能打印:initial,也可能打印:modified,不确定
}
Flow

Flow 表示一个异步流,类似于 Java stream。如果一个函数需要返回多个值,除了使用集合外,还可以返回 Flow:

// 方法一、使用集合
private fun myMethod(): List = listOf("How are you doing", "Not bad","Thank you", "How about you")
fun main() {
  myMethod().forEach {
    println(it)
  }
}
// 方法二、使用 Sequence
private fun myMethod1(): Sequence = sequence {
  for(i in 100..105){
    Thread.sleep(1000) // 阻塞主线程
    yield(i) // 返回一个元素
  }
} 
fun main() {
  myMethod1().forEach{ println(it) } // 打印 100 ~ 105
}
// 方法三、使用协程
private suspend fun myMethod2():List{
  delay(1000)
  return listOf("How are you doing", "Not bad","Thank you", "How about you")
}
fun main()=runBlocking {
  myMethod2().forEach{ println(it) } // 每一秒打印一个字符串
}
// 方法四、使用 Flow
fun myMethod3():Flow = flow{ // 调用 flow 构建器,自动变成挂起函数(无需显式使用 suspend 关键字),因而可以调用其他挂起函数
  for (i in 1..4){
    delay(1000)
    emit(i) // 类似 yield,异步返回一个结果
  }
}
fun main() = runBlocking {
  launch{
    for (i in 1..2) {
      println("group $i ----")
      delay(2000)
    }
  }
  myMethod3().collect { println(it) } // collect 用于接收 emit 返回的一个结果,二者是配对的
}

第一种方法的特点是:

  • myMethod 方法是阻塞的
  • 集合中所有值只能全部求解后一次性返回,不分先后

第二种方法的特点是:

  • Thread.sleep 模拟阻塞方式的求解过程

  • sequence 中每 yield 一次,就返回给调用者一次,所以5条打印语句并不是一次性打印出来的,而是每隔一秒打印出一个数字。

第三种方法的特点是:

  • delay 是异步的,模拟异步求解过程,它不会阻塞主线程
  • myMethod2 是挂起函数,所以只能在另外一个挂起函数或协程中调用,因此 main() 函数使用了 runBlocking
  • 但是仍然是一次性返回所有结果

第四种方法的特点是:

  • 求解过程是异步的,不会阻塞线程

  • 返回过程是异步的,每 emit 一次就返回一个,而不是全部一次性返回

    因此打印结果会先打印 group 1 ----,然后是数字 1 和 2(间隔1秒),2 秒后是group 2 ----,然后是 3 和 4(间隔1秒):

    group 1 ----
    1
    2
    group 2 ----
    3
    4
    

可以看出 Flow 非常类似于 Sequence,但是它是异步执行的,而 Sequence 是同步执行的。

此外,如果将 Flow 中的 delay 换成 Thread.sleep,则 Flow 的 emit 失去作用,异步返回变成一次性返回:

1
2
3
4
group 1 ----
group 3 ----

很显然,Thread.sleep 阻塞了主线程的执行。

Flow 构建器

Flow 通过构建器进行构建,它有 4 种构建器:

  • flowOf(…)
  • asFlow(…)
  • flow{…}
  • channelFlow{ … }

之前我们已经使用的就是 flow{…} 构建器 。它的使用较为简单。接下来我们看一下另外3个构建器。

  • flowOf

    定义一个发射固定数量值的流。它接收一个可变参数的值,并对这个可变参数进行循环 emit。

fun main() = runBloking {
	flowOf(10,20,29,30).collect{ println(it) }
}
  • asFlow

    集合和序列都提供了 asFlow 扩展方法,可以将自身转换为一个 flow 流。

fun main() = runBlocking {
	(1..10).asFlow().collect{ println(it) }
}
中间操作和终止操作

中间操作不会导致 Flow 中的代码被执行,比如 emit 就是一种中间操作,它不会导致代码真正被执行。而终止操作才会执行代码,collect 就是一种终止操作。

private fun myMethod():Flow = flow{
	println("I'm fine.")
	for(i in 1..3) {
		delay(1000)
		emit(i)
	}
}
fun main() = runBlocking {
  println("Let it go!")
  val flow = myMethod() // 调用方法并不会导致代码被执行
  println("See you.")
}

打印结果是“I’m fine."一句并不会被打印:

Let it go!
See you.

要想让 flow 真正执行,需要加上:

flow.collect{ println(it) }

注意,如果多次调用终止操作,将会导致 flow 多次执行。

Flow 的中间操作中可以调用挂起函数的,这与 Sequence 是不同的。

private suspend fun myExecution(input:Int): String{
	delay(1000)
	return "output:$input"
}
fun main() = runBlocking {
  (1..10).asFlow()
  .filter{ it -> it > 5 }
  .map{ input -> myExecution(input) }// 调用挂起函数 myExecution
  .collect{ println(it) }
}

输出结果如下:

output:6
output:7
...
output:10

除了 filter 和 map,Flow 还支持 transform 操作:

private suspend fun myExecution(input:Int): String{
	delay(1000)
	return "output: $input"
}
fun main() = runBlocking {
	(1..10).asFlow().transform{ input -> 
                             emit( "transform: $input")
                             emit(myExecution(input))
                             emit("------")
                            }.collect{ println(it) }
}

输出结果如下:

transform: 1
output: 1
------
transform: 2
output: 2
------
...
transform: 10
output: 10
------

transform 中可以执行任何逻辑,不需要返回任何值,如果需要向 flow 发送值,可以使用 emit (可以发射多次)。它比 filter 或 map 更自由和强大。

中间操作中可以限定元素数量:

fun myNumbers():Flow = flow{
  try {
		emit(1)
 	 emit(2)
  	println("----")
 	 emit(3)
  }catch(e: Exception) {
		println(e)
  }finally{
    println("--finally")
  }
	
}
fun main() = runBlocking {
	myNumbers().take(2).collect( println(it) )// take(2) 仅获取前 2 个元素
}

输出结果如下:

1
2
kotlinx.coroutines.flow.interal.AbortFlowException: Flow was aborted, no more elements needed
finally

可以看到当 take 到指定元素后,flow 会直接抛出一个异常,从而导致流被取消。

所有的终止操作都是挂起函数。终止操作才会真正执行流的代码。除了 collect 操作,flow 还有其他终止操作 ,比如 toList,toSet,reduce 等。

fun main() = runBlocking {
	val result = (1..4).asFlow().map( it*it )
	.reduce( a,b -> a+b ) // 汇聚操作,将元素值进行累加
	println(result)
}

输出结果为 30,因为 1+4+9+16 = 30。

Flow 是顺序执行的。collect 操作运行在终止操作的协程,默认不会开启新的协程。每个 emit 的元素都会由所有的中间操作进行处理,最后由终止操作处理。

fun main() = runBlocking {
	(1..10).asFlow().filter{ // 只允许偶数元素通过,奇数被过滤
    it % 2 == 0
  }.map {// 偶数元素进入 map 操作
    println("map: $it")
    it
  }.collect {
    println("collect: $it")
  }
}

输出结果如下:

filter: 2
map: 2
collect: 2
filter: 4
map: 4
collect: 4
...
filter: 10
map: 10
collect: 10
转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/956643.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号