java8的Stream流水线,用起来很爽,但是他是怎么做到的呢。
Stream流水线记录用户的每一步操作步骤(map,filter等),当用户调用结束操作(Collect,reduce 等)时将用户之前记录的操作一并执行。这里就有几个问题要解决了
- 如何记录用户操作
- 如何将用户操作串联起来
- 如何触发整个任务获取结果
记录串联用户操作
首先查看类图

查看源码我们可以看到 list.stream().map(x->x+”123”).filter(x->x.startsWith(“test”)).collect(Collectors.toList());
这样的流水线实际上每次都创建一个Stream的实现类并且返回,并且新创建的实现类持有上一个的引用。

中间操作分类
- unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()等操作时无状态的 创建的是 StatelessOp对象
- distinct() sorted() sorted() limit() skip() 创建的是 StateFullOp对象
以map为例看如何记录用户串联操作:
1 | public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { |
这里创建了一个 StatelessOp 对象 并返回,看构造函数的第一个参数 新创建的Stream对象,持有了原来对象的引用,这样就以链表的形势记录了用户操作。
sink
这里复写了opWrapSink方法,返回值为sink,sink接口是串联用户操作的关键:
sink 接口 主要有四个方法
- begin 开始遍历元素之前回调
- end 元素遍历后回调
- cancellationRequested 是否可以结束操作,可以让短路操作尽早结束,比如findAny这种就不用遍历所有的元素。
- accept 接受一个元素,并对一个元进行处理
1 | return new Sink.ChainedReference<P_OUT, R>(sink) { |
这里复写了 accept方法,对传入的元素 调用了用户传入的方法(mapper.apply(u)) ,并把其返回结果交个下游的sink 处理。这样就把用户的操作串联起来了。
看一个复杂的sink实现,
1 | private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> { |
如何触发任务
终端操作最终会调用下面这个函数
1 |
|
最终对每个元素触发上面的到sink
1 |
|
获取结果
TerminalSink 一下3种
- FindSink findAny optional 等操作 ,在对应的Sink中记录这个值,等到执行结束时返回。
- AccumulatingSink collect reduce等 归约操作,在sink 中 收集元素放入相应容器中。
- ForEachOp forEach ,在sink 中对元素回调 传入的 consumer 方法。
异步实现
stream.parallel()方法可以将流 转换会并行流,多个线程一起处理该流实现方式就是
java.util.Spliterator#trySplit() 方法,将流分成一个个子流,在不同的线程中执行。