前言
JAVA8 的特性相信许多开发者都已经非常了解了,其中很重要的一个特性— Stream
,这个特性让我们能够以声明性的方式在集合上构建复杂的查询。并且,Stream API
为并行执行也提供了一种简单的方法。只需添加 parallel()
语句或使用 parallelStream()
函数。但是如果开发者盲目的使用并行流,不仅不会提高性能,反而会引发致命的错误。
示例
现在给你一组指定的数组,需要你计算出每个数字的乘积。这种情况我们采用 stream 可以用一个链式代码直接一步到位,省去了写 for
循环的这样臃肿的代码。
1 | public static void main(String[] args) { |
如果我们拿到的结果还需要乘以一个固定的数字 m ,那么我们只需要修改代码为:
1 | int total = Arrays.stream(array) |
如果数字过多串行流的顺序执行会不会导致效率很低呢?于是我又尝试采用 parallel()
来执行程序。
1 | public static void main(String[] args) { |
我意外的发现,当 m=1
的时候,串行流和并行流取得的结果是一致的,而当 m 不为 1 时,两者的结果并不匹配。比如当 m=3
的时候,串行流的运算结果为 2578991184
而并行流的运算结果为 1880084573136
。是什么导致了这样的误差呢?
ForkJoinPool
Java Streams 默认使用同一个 ForkJoinPool
执行并行流。 ForkJoinPool 主要就是将任务递归拆分为多个块,然后可以独立地计算每个块。
Stream.reduce
顺序执行的时候是这样的:
并行流的算法其实也非常简单,我们假设任务仅被分成 2 部分:
每个块都多乘了一次 m ,并行流给每个任务块都应用了给定的标识 m 。知道了这个刚刚的 bug 我们也就可以解决了。我们可以将每个标识 m 都采用 1 ,乘 1 并不会影响程序结果,然后得到最后的结果只会再乘以 m :
1 | public static void main(String[] args) { |
通过这个示例,我们再使用流的时候,有哪些小细节应该注意呢?
Reduce 应当可拆分
如果不确定流是串行流(比如它作为函数参数来提供),则 reduce 函数的 identity
不应影响单个任务块的结果。即求和函数的 identity 必须为 0 ,而求乘积的 identity 必须为 1。
合理采用并行流
并不是所有流操作都应该并行化。例如 map
,flatMap
和 filter
是无状态的,因此我们可以采用并行流的做法。而 sort
,distinct
和 limit
不但不会带来性能提升,反而可能会引发错误。
并且,并行化的有效性在很大程度上取决于流的来源。 ArrayList
,array
或IntStream.range
支持随机访问,这意味着它们可以轻松拆分。但是 LinkedList
分解需要 O(n) 时间。还有 Stream.iterate
和 BufferedReader
也要尽量避免采用并行流,因为它们的开头都有未知的长度,因此很难估算拆分来源。
编写单元测试
并行流虽然具备潜在的性能优势,但是同时也可能带来一些致命的错误,因此,每次将串行流替换为并行流时,为了确保功能未被破坏。需要编写一定的单元侧试。
总结
今天主要介绍了并行流一些使用上的小细节,但是对 ForkJoinPool
并没有做具体解析。觉得写的不错的小伙伴点个赞支持一下吧。
...
...
Copyright by @maybelence.