Stream流常规用法
JDK8
对Stream
接口的定义,Stream
接口继承BaseStream
接口
1 | public interface Stream<T> extends BaseStream<T, Stream<T>> |
BaseStream
接口定义
1 | public interface BaseStream<T, S extends BaseStream<T, S>> extends AutoCloseable |
BaseStream接口
Base interface for streams, which are sequences of elements supporting sequential and parallel aggregate operations.
1 | BaseStream是流的基本接口,它支持顺序和并行聚合操作的元素序列。 |
1 | // demo 用法:计算红色的权重和 |
BaseStream
接口定义了7
个接口方法,重写了AutoCloseable
的close
方法
1 | Iterator<T> iterator(); // 流迭代器 |
Stream接口
Stream
A sequence of elements supporting sequential and parallel aggregate operations.
支持顺序和并行聚合操作的元素序列
1 | Stream(流)是一个来自数据源的元素队列并支持聚合操作 |
为了执行计算,流操作被组合成一个流管道。一个流管道由一个源(可能是一个数组,一个集合,一个生成器函数,一个I/O
通道等),零个或多个中间操作(将一个流转换为另一个流,例如filter
和终端操作(产生结果,如count
或者forEach
))
除了Stream
这种对象引用流,还有一些特殊流IntStream
,LongStream
,DoubleStream
等。
图中4种stream接口继承自BaseStream
,其中IntStream, LongStream, DoubleStream
对应三种基本类型(int, long, double
,注意不是包装类型),Stream
对应所有剩余类型的stream视图。为不同数据类型设置不同stream接口,可以
- 提高性能
- 增加特定接口函数
你可能会奇怪为什么不把IntStream
等设计成Stream
的子接口?毕竟这接口中的方法名大部分是一样的。答案是这些方法的名字虽然相同,但是返回类型不同,如果设计成父子接口关系,这些方法将不能共存,因为Java不允许只有返回类型不同的方法重载
虽然大部分情况下stream是容器调用Collection.stream()
方法得到的,但stream和collections有以下不同
- 无存储。stream不是一种数据结构,它只是某种数据源的一个视图,数据源可以是一个数组,Java容器或I/O channel等。
- 为函数式编程而生。对stream的任何修改都不会修改背后的数据源,比如对stream执行过滤操作并不会删除被过滤的元素,而是会产生一个不包含被过滤元素的新stream。
- 惰式执行。stream上的操作并不会立即执行,只有等到用户真正需要结果的时候才会执行。
- 可消费性。stream只能被“消费”一次,一旦遍历过就会失效,就像容器的迭代器那样,想要再次遍历必须重新生成
对stream的操作分为为两类,**中间操作(*intermediate operations*)和结束操作(*terminal operations*)**,二者特点是:
- 中间操作总是会惰式执行,调用中间操作只会生成一个标记了该操作的新stream,仅此而已。
- 结束操作会触发实际计算,计算发生时会把所有中间操作积攒的操作以pipeline的方式执行,这样可以减少迭代次数。计算完成之后stream就会失效。
Stream API
中大量使用Lambda
表达式作为回调方法,但这并不是关键。理解Stream我们更关心的是另外两个问题:流水线和自动并行。使用Stream
或许很容易写入如下形式的代码:
1 | int longestStringLengthStartingWithA |
上述代码求出以字母*A
开头的字符串的最大长度,一种直白的方式是为每一次函数调用都执一次迭代,这样做能够实现功能,但效率上肯定是无法接受的。类库的实现着使用流水线(Pipeline*)的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中尽可能多的执行用户指定的操作。
Stream操作分类 | ||
---|---|---|
中间操作(Intermediate operations ) |
无状态(Stateless ) |
unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek() |
有状态(Stateful ) |
distinct() sorted() sorted() limit() skip() |
|
结束操作(Terminal operations ) |
非短路操作 | forEach() forEachOrdered() toArray() reduce() collect() max() min() count() |
短路操作(short-circuiting ) |
anyMatch() allMatch() noneMatch() findFirst() findAny() |
Stream
上的所有操作分为两类:中间操作和结束操作,中间操作只是一种标记,只有结束操作才会触发实际计算。中间操作又可以分为无状态的(Stateless)和有状态的(Stateful),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序是有状态操作,在读取所有元素之前并不能确定排序结果;结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果,比如找到第一个满足条件的元素。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。
区分中间操作和结束操作最简单的方法,就是看方法的返回值,返回值为stream的大都是中间操作,否则是结束操作
Stream的实现原理
仍然考虑上述求最长字符串的程序,一种直白的流水线实现方式是为每一次函数调用都执一次迭代,并将处理中间结果放到某种数据结构中(比如数组,容器等)。具体说来,就是调用filter()
方法后立即执行,选出所有以A开头的字符串并放到一个列表list1
中,之后让list1传递给mapToInt()
方法并立即执行,生成的结果放到list2
中,最后遍历list2找出最大的数字作为最终结果。程序的执行流程如如所示:
这样做实现起来非常简单直观,但有两个明显的弊端:
- 迭代次数多。迭代次数跟函数调用的次数相等
- 频繁产生中间结果。每次函数调用都产生一次中间结果,存储开销无法接受
这些弊端使得效率底下,根本无法接受。如果不使用Stream API我们都知道上述代码该如何在一次迭代中完成,大致是如下形式:
1 | int longest = 0; |
采用这种方式我们不但减少了迭代次数,也避免了存储中间结果,显然这就是流水线,因为我们把三个操作放在了一次迭代当中。只要我们事先知道用户意图,总是能够采用上述方式实现跟Stream API等价的功能,但问题是Stream类库的设计者并不知道用户的意图是什么。如何在无法假设用户行为的前提下实现流水线,是类库的设计者要考虑的问题
Stream流水线解决方案
我们大致能够想到,应该采用某种方式记录用户每一步的操作,当用户调用结束操作时将之前记录的操作叠加到一起在一次迭代中全部执行掉。沿着这个思路,有几个问题需要解决:
- 用户的操作如何记录?
- 操作如何叠加?
- 叠加之后的操作如何执行?
- 执行后的结果(如果有)在哪里?
操作如何记录
注意这里使用的是操作(operation
)一词,指的是Stream
中间操作的操作,很多Stream
操作会需要一个回调函数(Lambda
表达式),因此一个完整的操作是**数据来源,操作,回调函数
**构成的三元组。Stream
中使用Stage
的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper
来代表Stage
,将具有先后顺序的各个Stage
连到一起,就构成了整个流水线。跟Stream
相关类和接口的继承关系图示。
还有IntPipeline
, LongPipeline
, DoublePipeline
没在图中画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟ReferencePipeline
是并列关系。图中Head
用于表示第一个Stage
,即调用调用诸如Collection.stream()
方法产生的Stage
,很显然这个Stage
里不包含任何操作;StatelessOp
和StatefulOp
分别表示无状态和有状态的Stage
,对应于无状态和有状态的中间操作。
Stream流水线组织结构示意图如下:
图中通过Collection.stream()
方法得到Head
也就是stage0
,紧接着调用一系列的中间操作,不断产生新的Stream
。这些Stream
对象以双向链表的形式组织在一起,构成整个流水线,由于每个Stage
都记录了前一个Stage
和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作。这就是Stream
记录操作的方式。
操作如何叠加
以上只是解决了操作记录的问题,要想让流水线起到应有的作用我们需要一种将所有操作叠加到一起的方案。你可能会觉得这很简单,只需要从流水线的head
开始依次执行每一步的操作(包括回调函数)就行了。这听起来似乎是可行的,但是你忽略了前面的Stage
并不知道后面Stage
到底执行了哪种操作,以及回调函数是哪种形式。换句话说,只有当前Stage
本身才知道该如何执行自己包含的动作。这就需要有某种协议来协调相邻Stage
之间的调用关系。
这种协议由Sink
接口完成,Sink
接口包含的方法如下表所示:
方法名 | 作用 |
---|---|
void begin(long size) |
开始遍历元素之前调用该方法,通知Sink 做好准备 |
void end() |
所有元素遍历完成之后调用,通知Sink 没有更多的元素了 |
boolean cancellationRequested() |
是否可以结束操作,可以让短路操作尽早结束 |
void accept(T t) |
遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage 把自己包含的操作和回调方法封装到该方法里,前一个Stage 只需要调用当前Stage.accept(T t) 方法就行了 |
有了上面的协议,相邻Stage
之间调用就很方便了,每个Stage
都会将自己的操作封装到一个Sink
里,前一个Stage
只需调用后一个Stage
的accept()
方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink
的begin()
和end()
方法也是必须实现的。比如Stream.sorted()
是一个有状态的中间操作,其对应的Sink.begin()
方法可能创建一个乘放结果的容器,而accept()方法负责将元素添加到该容器,最后end()
负责对容器进行排序。对于短路操作,Sink.cancellationRequested()
也是必须实现的,比如Stream.findFirst()
是短路操作,只要找到一个元素,cancellationRequested()
就应该返回true
,以便调用者尽快结束查找。Sink
的四个接口方法常常相互协作,共同完成计算任务。实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法。
有了Sink
对操作的包装,Stage
之间的调用问题就解决了,执行时只需要从流水线的head
开始对数据源依次调用每个Stage
对应的Sink.{begin(), accept(), cancellationRequested(), end()}
方法就可以了。一种可能的Sink.accept()
方法流程是这样的:
1 | void accept(U u){ |
Sink
接口的其他几个方法也是按照这种[处理->转发]的模型实现。下面我们结合具体例子看看Stream
的中间操作是如何将自身的操作包装成Sink
以及Sink
是如何将处理结果转发给下一个Sink
的。先看Stream.map()
方法:
1 | // Stream.map(),调用该方法将产生一个新的Stream |
上述代码看似复杂,其实逻辑很简单,就是将回调函数mapper
包装到一个Sink
当中。由于Stream.map()
是一个无状态的中间操作,所以map()
方法返回了一个StatelessOp
内部类对象(一个新的Stream
),调用这个新Stream
的opWripSink()
方法将得到一个包装了当前回调函数的Sink
。
再来看一个复杂一点的例子。Stream.sorted()
方法将对Stream
中的元素进行排序,显然这是一个有状态的中间操作,因为读取所有元素之前是没法得到最终顺序的。抛开模板代码直接进入问题本质,sorted()
方法是如何将操作封装成Sink
的呢?sorted()
一种可能封装的Sink
代码如下:
1 | // Stream.sort()方法用到的Sink实现 |
上述代码完美的展现了Sink
的四个接口方法是如何协同工作的:
- 首先
beging()
方法告诉Sink
参与排序的元素个数,方便确定中间结果容器的的大小; - 之后通过
accept()
方法将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历所有元素; - 最后
end()
方法告诉Sink
所有元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink
; - 如果下游的
Sink
是短路操作,将结果传递给下游时不断询问下游cancellationRequested()
是否可以结束处理
叠加之后的操作如何执行
Sink
完美封装了Stream
每一步操作,并给出了[处理->转发]的模式来叠加操作。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。是什么启动这一连串的操作呢?也许你已经想到了启动的原始动力就是结束操作(Terminal Operation
),一旦调用某个结束操作,就会触发整个流水线的执行。
结束操作之后不能再有别的操作,所以结束操作不会创建新的流水线阶段(Stage
),直观的说就是流水线的链表不会在往后延伸了。结束操作会创建一个包装了自己操作的Sink
,这也是流水线中最后一个Sink
,这个Sink
只需要处理数据而不需要将结果传递给下游的Sink
(因为没有下游)。对于Sink
的[处理->转发]模型,结束操作的Sink
就是调用链的出口。
我们再来考察一下上游的Sink
是如何找到下游Sink
的。一种可选的方案是在PipelineHelper
中设置一个Sink
字段,在流水线中找到下游Stage
并访问Sink
字段即可。但Stream
类库的设计者没有这么做,而是设置了一个Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)
方法来得到Sink
,该方法的作用是返回一个新的包含了当前Stage
代表的操作以及能够将结果传递给downstream
的Sink
对象。为什么要产生一个新对象而不是返回一个Sink
字段?这是因为使用opWrapSink()
可以将当前操作与下游Sink
(上文中的downstream
参数)结合成新Sink
。试想只要从流水线的最后一个Stage
开始,不断调用上一个Stage
的opWrapSink()
方法直到最开始(不包括stage0
,因为stage0
代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的Sink
,用代码表示就是这样:
1 | // AbstractPipeline.wrapSink() |
现在流水线上从开始到结束的所有的操作都被包装到了一个Sink
里,执行这个Sink
就相当于执行整个流水线,执行Sink
的代码如下:
1 | // AbstractPipeline.copyInto(), 对spliterator代表的数据执行wrappedSink代表的操作。 |
上述代码首先调用wrappedSink.begin()
方法告诉Sink
数据即将到来,然后调用spliterator.forEachRemaining()
方法对数据进行迭代(Spliterator
是容器的一种迭代器,[参阅](https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/3-Lambda and Collections.md#spliterator)),最后调用wrappedSink.end()
方法通知Sink
数据处理结束。逻辑如此清晰。
执行后的结果在哪里
最后一个问题是流水线上所有操作都执行后,用户所需要的结果(如果有)在哪里?首先要说明的是不是所有的Stream结束操作都需要返回结果,有些操作只是为了使用其副作用(Side-effects),比如使用Stream.forEach()
方法将结果打印出来就是常见的使用副作用的场景(事实上,除了打印之外其他场景都应避免使用副作用),对于真正需要返回结果的结束操作结果存在哪里呢?
特别说明:副作用不应该被滥用,也许你会觉得在Stream.forEach()里进行元素收集是个不错的选择,就像下面代码中那样,但遗憾的是这样使用的正确性和效率都无法保证,因为Stream可能会并行执行。大多数使用副作用的地方都可以使用[归约操作](http://www.cnblogs.com/CarpenterLee/p/5-Streams API(II).md)更安全和有效的完成。
1 | // 错误的收集方式 |
回到流水线执行结果的问题上来,需要返回结果的流水线结果存在哪里呢?这要分不同的情况讨论,下表给出了各种有返回结果的Stream结束操作。
返回类型 | 对应的结束操作 |
---|---|
boolean |
anyMatch() allMatch() noneMatch() |
Optional |
findFirst() findAny() |
归约结果 | reduce() collect() |
数组 | toArray() |
- 对于表中返回
boolean
或者Optional
的操作(Optional
是存放 一个 值的容器)的操作,由于值返回一个值,只需要在对应的Sink
中记录这个值,等到执行结束时返回就可以了。 - 对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过[收集器](http://www.cnblogs.com/CarpenterLee/p/5-Streams API(II).md#收集器)指定)。
collect(), reduce(), max(), min()
都是归约操作,虽然max()和min()
也是返回一个Optional
,但事实上底层是通过调用[reduce()](http://www.cnblogs.com/CarpenterLee/p/5-Streams API(II).md#多面手reduce)方法实现的。 - 对于返回是数组的情况,毫无疑问的结果会放在数组当中。这么说当然是对的,但在最终返回数组之前,结果其实是存储在一种叫做*
Node
*的数据结构中的。Node
是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。
Stream创建
最常用的创建Stream
有两种途径:
- 通过
Stream
接口的静态工厂方法(注意:Java8
里面接口可以带静态方法) - 通过
Collection
接口的默认方法:stream()
,把一个Collection
对象转成Stream
stream() − 为集合创建串行流
parallelStream() − 为集合创建并行流
使用Stream静态方法来创建Stream
of
方法:有两个overload
方法,一个接受可变长参数,一个接受单一值generator
方法:生成无限长度的Stream
,其元素的生成是通过给定的Supplier
(这个接口可以看成一个对象的工厂,每次调用返回一个给定类型的对象)iterate
方法:也是生成无限长度的Stream
,和generator
不同的是,其元素的生成是重复对给定的种子值(seed
)调用用户指定函数来生成的。其中包含的元素可以认为是:seed
,f(seed)
,f(f(seed))
无限循环
1 | Stream.iterate(1, item -> item + 1).limit(10).forEach(System.out::println); // limit |
1 | // 数组 |
转换Stream
转换Stream
其实就是把一个Stream
通过某些行为转换成一个新的Stream
。Stream
接口中定义了几个常用的转换方法
distinct
:对于Stream
中包含的元素进行去重操作(去重逻辑依赖元素的equals
方法),新生成的Stream
中没有重复的元素filter
:对于Stream
中包含的元素使用给定的过滤函数进行过滤操作,新生成的Stream
只包含符合条件的元素map
:对于Stream中包含的元素使用给定的转换函数进行转换操作,新生成的Stream
只包含转换生成的元素。这个方法有三个原始类型的变种方法,分别是:mapToInt
,mapToLong
和mapToDouble
。这三个方法也比较好理解,比如mapToInt
就是把原始Stream
转换成一个新的Stream
,这个新生成的Stream
中的元素都是int
类型。之所以会有这三个变种方法,可以免除自动装箱/拆箱的额外消耗。flatMap
:和map
类型,不同的是其每个元素转换得到的是Stream
对象,会把Stream
中的元素压缩到父集合中peek
:生成一个包含原Stream
的所有元素的新的Stream
,同时会提供一个消费函数(Consumer
实例),新Stream
每个元素被消费的时候都会执行给定的消费函数limit
:对一个Stream
进行截断操作,获取其前N
个元素,如果原Stream
中包含的元素个数小于N
,那就获取其所有的元素skip
:返回一个丢弃原Stream
的前N
个元素后剩下元素组成的新Stream
,如果原Stream
中包含的元素个数小于N
,那么返回空Stream
Reduce Stream
A reduction operation (also called a fold) takes a sequence of input elements and combines them into a single summary result by repeated application of a combining operation, such as finding the sum or maximum of a set of numbers, or accumulating elements into a list. The streams classes have multiple forms of general reduction operations, called reduce() and collect(), as well as multiple specialized reduction forms such as sum(), max(), or count().
汇聚操作(也称为折叠)接口一个元素序列作为输入,反复使用某个合并操作,把序列中的元素合并成一个汇总的结果。比如查找一个数字列表的总和或者最大值,或者把这些数字累积成一个List
对象。Stream
接口有一些通用的汇聚操作,比如reduce()
和collect()
;也有一些特定用途的汇聚操作,比如sum()
,max()
,count()
。sum
方法不是所有的Stream
对象都有的,只有IntStream
,LongStream
和DoubleStream
才有
- 可变汇聚:把输入的元素累积到一个可变的容器中,比如
Collection
或者StringBuilder
- 其他汇聚:除去可变汇聚剩下的,一般都不是通过反复修改某个可变对象,而是通过把前一次的汇聚结果当成下一次的入参,反复如此。比如
reduce
,count
,allMatch
可变汇聚
可变汇聚对应的只有一个方法:collect
,它可以把Stream
中的元素收集到一个结果容器中(比如Collection
):
1 | <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner); |
Supplier supplier
是一个工厂函数,用来生成一个新的容器;BiConsumer accumulator
也是一个函数,用来把Stream
中的元素添加到结果容器中;BiConsumer combiner
还是一个函数,用来把中间状态的多个结果容器合并成为一个(并发的时候会用到)。
其他汇聚
reduce
1 | List<Integer> ints = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); |
可以看到reduce
方法接受一个函数,这个函数有两个参数,第一个参数是上次函数执行的返回值(也称为中间结果),第二个参数是stream
中的元素,这个函数把这两个值相加,得到的和会被赋值给下次执行这个函数的第一个参数。要注意的是:第一次执行的时候第一个参数的值是Stream
的第一个元素,第二个参数是Stream
的第二个元素。这个方法返回值类型是Optional
,这是Java8
防止出现NPE
的一种可行方法
1 | List<Integer> ints = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); |
它允许用户提供一个循环计算的初始值,如果Stream
为空,就直接返回该值。而且这个方法不会返回Optional
,因为其不会出现null
值
count
获取Stream
中元素的个数
1 | List<Integer> ints = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); |
allMatch
:是不是Stream
中的所有元素都满足给定的匹配条件
anyMatch
:Stream
中是否存在任何一个元素满足匹配条件
findFirst
: 返回Stream
中的第一个元素,如果Stream
为空,返回空Optional
noneMatch
:是不是Stream
中的所有元素都不满足给定的匹配条件
max
和min
:使用给定的比较器(Operator
),返回Stream
中的最大|最小值
Lazy
懒操作:对于会消耗较多资源的对象,使用延迟初始化是比较好的选择。这不仅能够节省一些资源,同事能够加快对象的创建速度,从而从整体上提升性能。但是对于一个对象的延迟初始化,需要注意的一点就是这些实现细节不应该暴露给用户,即用户能够按照正常的操作流程来使用该对象。
1 | public synchronized Heavy getHeavy() { |
Lazy Evaluation
Lazy Evaluation
(延迟求值)
延迟求值的主要目的是减少需要执行的代码量来提高执行速度
其实Java
语言中有一些地方已经应用了延迟求值的概念,比如对逻辑表达式的求值
在执行fn1() || fn2()
时,当fn1()
返回true
的时候,fn2()
是不会被执行的。同样的,在执行fn1()&&fn2()
时,当fn1()
返回false
的时候,fn2()
是不会被执行的。这就是大家熟知的短路(Short-circuiting
)操作。
然而对于方法调用,在发生实际调用前所传入的参数都会被求值,即使某些参数在方法中根本就没有被用到。因此这就造成了潜在的性能浪费,我们可以使用Lambda
表达式来进行改进。
当参数列表中有Lambda
表达式和方法引用时,这种类型的参数只有在真正地需要被使用时才会由Java
编译器求值,我们可以利用这一点来实现延迟求值。Java 8
中新添加的Stream
类型的许多方法都实现了延迟求值。比如filter
方法接受的Predicate
函数接口,并不一定会被集合中的所有元素调用。因此,我们可以考虑将方法的参数冲构成函数接口来实现延迟求值。
Eager求值
1 | public class Evaluation { |
以上代码中,虽然希望使用短路操作来得到最后的结果(input1 && input2
),但是已经晚了。在对参数进行求值的时候,input1
和input2
的值实际上就已经被确认了,从上面的输出可以看出这一点。这段代码会执行至少4
秒,显然这不是最优解。
延迟求值的设计
如果我们知道方法中的某些参数可能不会被用到,那么就可以对它们进行重构,将它们替换成函数接口来实现延迟求值。比如上述代码中使用了短路操作,说明input2
的求值也许是不必要的,这是可以将它替换成Supplier
接口
1 | public static void lazyEvaluator( |
替换成Supplier
类型的函数接口后,只有在调用它的get
方法,才会真正执行求值操作。那么上述的短路操作就有意义了,当input1.get()
返回的是false
,input2.get()
根本就不会被调用
1 | lazyEvaluator(() -> evaluate(1), () -> evaluate(2)); |
此时的执行时间只有2
秒多一点,比之前的4秒而言,性能提高了接近100%。在某些参数不被需要的场合下,借助Lambda
表达式或者方法引用来实现哪些参数确实能够增加性能,但是也使得代码稍微的复杂了一点,但是为了性能的提升这些代价也是值得的。
Stream懒操作
Stream
有两种类型的方法:
- 中间操作(
Intermediate Operation
) - 结束操作(
Terminal Operation
)
Stream
之所以懒在于每次在使用Stream
时,都会连接多个中间操作,并在最后附上一个结束操作。像map()
和filter()
这样的方法是中间操作,在调用它们时,会立即返回另一个Stream
对象。而对于reduce()
及findFirst()
这样的方法,它们是结束操作,在调用它们时才会在执行真正的操作来获取需要的值。
Example
数据
1 |
|
1 | Person person1 = Person.builder().id(1).name("11").room(1).tags(Lists.newArrayList("111", "aaa")).build(); |
collect
1 | List<Integer> ids = personList.parallelStream().map(Person::getId).collect(Collectors.toList()); |
1 | // {1=Person(id=1, name=11, room=1, tags=[111, aaa]), |
distinct
1 | List<Integer> distinctRooms = personList.stream().map(Person::getRoom).distinct().collect(Collectors.toList()); |
1 | personList.add(Person.builder().id(1).name("11").room(1).tags(Lists.newArrayList("111", "aaa")).build()); |
max\min\sum\average
1 | long sum = personList.stream().mapToInt(Person::getId).sum(); |
groupby
1 | // {1=[Person(id=1, name=11, room=1, tags=[111, aaa]), Person(id=2, name=22, room=1, tags=[222, bbb]), Person(id=3, name=33, room=1, tags=[333, ccc])], |
1 | // 分组之后再次流处理 |
reduce
1 | System.out.println(personList.stream().reduce((c1, c2) -> { |
1 | System.out.println(personList.stream().reduce((c1, c2) -> { |
sorted
1 | List<Person> sortedList = personList.stream().sorted(Comparator.comparing(Person::getId).reversed()).collect(Collectors.toList()); |
joining
1 | String names = personList.stream().map(Person::getName).collect(Collectors.joining()); |
flatmap
1 | List<String> flatMapResult = personList.stream().flatMap(inner -> inner.getTags().stream()).collect(Collectors.toList()); |
anyMatch
1 | boolean b = personList.stream().anyMatch(c -> "11".equals(c.getName())); // true |
filter
1 | List<Person> collect = personList.stream().filter(c -> "11".equals(c.getName())).collect(Collectors.toList()); |
peek
peek不是一个终止操作
流编程中,必须有一个终止操作
1 | String[] arr = {"a", "b", "c", "d"}; |
1 | a |
手动分页
1 | int pageSize = 2; |
统计
1 | IntSummaryStatistics intSummaryStatistics = personList.stream().mapToInt(Person::getId).summaryStatistics(); |
迭代器
1 | Stream.iterate(0, i -> i + 1).limit(bizIds.size()).forEach() |
参考文献
- [1] 深入理解Java Stream流水线
- [2] Stream Pipelines
- [3] JavaLambdaInternals