不要再盲目使用parallelStream啦

Posted by maybelence on 2021-12-31

前言

JAVA8 的特性相信许多开发者都已经非常了解了,其中很重要的一个特性— Stream ,这个特性让我们能够以声明性的方式在集合上构建复杂的查询。并且,Stream API 为并行执行也提供了一种简单的方法。只需添加 parallel() 语句或使用 parallelStream() 函数。但是如果开发者盲目的使用并行流,不仅不会提高性能,反而会引发致命的错误。

示例

现在给你一组指定的数组,需要你计算出每个数字的乘积。这种情况我们采用 stream 可以用一个链式代码直接一步到位,省去了写 for 循环的这样臃肿的代码。

1
2
3
4
5
6
public static void main(String[] args) {
long[] array = new long[]{3, 123, 1, 31, 56, 61, 22};
long total = Arrays.stream(array)
.reduce(1, (acc, next) -> acc * next);
System.out.println(total);
}

如果我们拿到的结果还需要乘以一个固定的数字 m ,那么我们只需要修改代码为:

1
2
int total = Arrays.stream(array)
.reduce(m, (acc, next) -> acc * next);

如果数字过多串行流的顺序执行会不会导致效率很低呢?于是我又尝试采用 parallel() 来执行程序。

1
2
3
4
5
6
7
public static void main(String[] args) {
long[] array = new long[]{3, 123, 1, 31, 56, 61, 22};
long total = Arrays.stream(array)
.parallel()
.reduce(1, (acc, next) -> acc * next);
System.out.println(total);
}

我意外的发现,当 m=1 的时候,串行流和并行流取得的结果是一致的,而当 m 不为 1 时,两者的结果并不匹配。比如当 m=3 的时候,串行流的运算结果为 2578991184 而并行流的运算结果为 1880084573136 。是什么导致了这样的误差呢?

ForkJoinPool

Java Streams 默认使用同一个 ForkJoinPool 执行并行流。 ForkJoinPool 主要就是将任务递归拆分为多个块,然后可以独立地计算每个块。

Stream.reduce 顺序执行的时候是这样的:

未命名文件 (6).png

并行流的算法其实也非常简单,我们假设任务仅被分成 2 部分:

未命名文件 (8).png

每个块都多乘了一次 m ,并行流给每个任务块都应用了给定的标识 m 。知道了这个刚刚的 bug 我们也就可以解决了。我们可以将每个标识 m 都采用 1 ,乘 1 并不会影响程序结果,然后得到最后的结果只会再乘以 m :

1
2
3
4
5
6
7
public static void main(String[] args) {
long[] array = new long[]{3, 123, 1, 31, 56, 61, 22};
long total = Arrays.stream(array)
.parallel()
.reduce(1, (acc, next) -> acc * next) * m;
System.out.println(total);
}

通过这个示例,我们再使用流的时候,有哪些小细节应该注意呢?

Reduce 应当可拆分

如果不确定流是串行流(比如它作为函数参数来提供),则 reduce 函数的 identity 不应影响单个任务块的结果。即求和函数的 identity 必须为 0 ,而求乘积的 identity 必须为 1。

合理采用并行流

并不是所有流操作都应该并行化。例如 mapflatMapfilter 是无状态的,因此我们可以采用并行流的做法。而 sortdistinctlimit 不但不会带来性能提升,反而可能会引发错误。

并且,并行化的有效性在很大程度上取决于流的来源。 ArrayListarrayIntStream.range 支持随机访问,这意味着它们可以轻松拆分。但是 LinkedList 分解需要 O(n) 时间。还有 Stream.iterateBufferedReader 也要尽量避免采用并行流,因为它们的开头都有未知的长度,因此很难估算拆分来源。

编写单元测试

并行流虽然具备潜在的性能优势,但是同时也可能带来一些致命的错误,因此,每次将串行流替换为并行流时,为了确保功能未被破坏。需要编写一定的单元侧试。

总结

今天主要介绍了并行流一些使用上的小细节,但是对 ForkJoinPool 并没有做具体解析。觉得写的不错的小伙伴点个赞支持一下吧。


Copyright by @maybelence.

...

...

00:00
00:00