前三篇 Java Lambda 教学 Part 1, Part 2, Part 3 可以说是基础入门篇,如果要真真正正发挥 Java Lambda 的威力,就不得不提 Java 8 新加入的 Stream,这个 Java Lambda Stream API 可以说是为 Lambda 而生的,而我将会在这篇介绍它的用途。

Java Lambda Stream

Stream 是一个 Collection Wrapper,可以让把集合内的元素 ( Elements ) 以串流 ( Stream ) 形式交到 Stream Methods 以 Lambda Expression 执行,其运用例子如下:

staffList.stream().filter(
    staff -> staff.getServiceOfYear() > 5).forEach(
    staff -> staff.setSalary(staff.getSalary() * 1.1)) ;

Stream.of(staffIdArray).map(
    StaffDao::getStaffById).filter(
    staff -> staff.getJobRole().equals("IT Architect"))
       .findFirst().orElse(null));

以上的例子利用了 Collection.stream() 和 Stream.of() 把 staffList 集合和 staffId Array 中的元素以串流形式送到 filter(), forEach(), map(), findFirst() 等 Stream Methods 去执行,而实际的执行逻辑就是用我们已经见惯见熟的 Java Lambda Expression 来表逹。

使用 Java Lambda Streams 来遍历 ( Traverse ) 元素并以 Stream Methods 来处理比传统用 for-loop 来遍历来处理,除了让程序看来精简外,主要还有以下三个好处:

  1. Lazy evalutation – JVM 在没有遇到 terminal operations 前,并不会执行任何 Stream method。
  2. Short-circuit execution – JVM 会自行判断 short-circuit,提早完成集合元素遍历 ( Element traversal )
  3. Automatic parallelism – JVM 可以把 Java Lambda Streams 同时交给多核心处理器 ( Multi-Core )或多线程 (Multi-Thread) 来执行,而阁下是无须去编写任何多线程代码。

怎样可以做到以上三大好处,容下会再以解释。但现在得先看看 Java 8 究竟有几多道扳斧可以建立 Java Lambda Stream。

如何建立 Java Lambda Stream?

由于大部份 Java Lambda Stream 的操作都是应用于集合上,如 List 、Set、Map 等,所以我们最常见的就是通过 Collection Interface 提供的 stream() 和 parallelStream() 来建立 Java Lambda Stream,其例子如下:

Stream<Staff> staffStream = staffList.stream();

Stream<JobRole> jobRoleStream = jobRoleSet.parallelStream();

Stream<Staff> staffStream = projectStaffMap.
    values().stream();

至于 Java Array,就可以利用 Stream.of() 或 Arrays.stream() 把 Array 中的原素串流化,其例子如下:

Stream<Staff> staffStream = Stream.of(staffArray);

Stream<Integer> intStream = Stream.of(integerArray);
// int[] intArray
IntStream anotherIntStream = Arrays.stream(intArray);

如何收集 Java Lambda Stream 中的原素?

通过以上的方法,我们可以把 Java Collection 和 Array 串流化成 Java Lambda Stream ,经过 Stream Methods 中的 Java Lambda 处理后,我们可以利用Stream.collect() 及 Collectors 把 Stream 中的原素还原回 Java Collection 或利用 Stream.toArray() 把 Stream 中的原素还原回 Java Array。其应用例子如下:

List<Staff> staffList = staffStream.collect(Collectors.toList());
Set<Staff> staffSet = staffStream.collect(Collectors.toSet());

/* The 1st argument of toMap() is key-mapper function, 2nd argument is value-mapper function, while the 3rd argument is merge value function when the key collision is detected */  
Map<String, Staff> staffMap = staffStream.collect(
    Collectors.toMap(staff -> staff.getStaffId(), 
        Function.identity(), 
        (staff1, staff2) -> {
            throw new SystemException("Duplicated Staff ID detected");
        }
    );

/* toArray() requires providing a generator function to construct the typed array */
Staff[] staffArray = staffStream.toArray(Staff[]::new);

以上的例子中,Collectors.toList() 和 Collectors.toSet() 比较直观,不用详加解说,Collectors.toMap() 的用法则比较复杂,它的 method signature 如下:

Collector<T,?,Map<K,U>> toMap(Function<? super T,? extends K> keyMapper,
    Function<? super T,? extends U> valueMapper,
    BinaryOperator<U> mergeFunction);

哗!只看了 Method Signature 就已经头痛了,根本不知从何入手。不用担心,我会慢慢解拆的。首先,先不管那些 Generics syntax,它们不是我们要讨论的主题。

Collectors.toMap() 主要要求三个 Function Lambda:

第一个 keyMapper,即是我们要指定如何把 Stream 中元素抽出其 Key-value pair 中的 Key。在我的例子中,由于我的 Map Key 是 Staff ID,所以我的 keyMapper Function Lambda 就是 staff -> staff.getStaffId()。

第二个 Function Lambda 就是 valueMapper,即要提供如何抽出 Key-value pair 中的 value,由于在这里我们只想把 Staff Object 原封不动的放回 Map 中,所以这里我给予了 Function.identity() 这个 static method。( 这是利用 Function.identity() 一个十分典型的例子 )

最后,我们要提供一个 mergeFunction ,它的作用是当 JVM 进行 Collectors.toMap() 的操作发现有 key conflict 的时候,我们应该怎样把conflict key 中的 values 进行合并 (Merge)。由于我们的 Staff ID 应该是唯一 ( Unique ) 的,理应不会出现须要合并的情况,所以我们这里的会把合并动作看成错误并中止执行。其 Lambda Expression 为 (staff1, staff2) -> { throw new SystemException(“Duplicated Staff ID detected”); }。

Collectors 还提供了很多其他的 Utility Methods 去收集 Stream 中元素,大家不妨参考 Collectors JavaDoc,去了解其他 collectors 的用法。

至于 Stream.toArray() 的 Method Signature 为:

A[] toArray(IntFunction<A[]> generator)

技术上,它须要提供一个 integer function generator。甚么是 generator 呢?

其实说穿了只是一个 target array 的 Java Constructor,这里的例子用了 Method Reference。Staff[]::new 就是 Staff Array Constructor 的 method reference。

虽然 Stream interface 提供了一个没有 argument 的 toArray() ,但这个零 argument 的版本只能传回 Object[],而 Object[] 在 Java 中是不能 Cast 成为 Staff[] 的,所以大部份情况下,大家应该会用到 generator 版本的 toArray()。

看到这里,如果还没有完全明白我以上的解说,又或者不清楚甚么是 Lambda Expression、Method Reference 或 Function 等,建议您可以回顾我早阵子的 Java Lambda 教学

Stream Methods

介绍了如何建立 Stream 和如何收集 Stream 中元素转换回 Collections 和 Array 后,终于来到戏玉了,就是如何运用各式各样的 Stream Methods,这篇将主力介绍当中比较常用的 forEach() 、filter()、map() 和 findFirst()。

Stream.forEach(Consumer<? super T> action)

使用 forEach() 须要提供一个 Consumer Lambda 去处理每一个流过 forEach() 的元素。为甚么叫每一个流过的元素呢?由于不一定所有 Stream 中的原素都有可能执行 Consumer Lambda 的逻辑,因为在这之前已被过滤掉 ( Filter,下节将会介绍)。

例如,我们想把每一名员工加薪 10%,我们可以这样做:

staffList.stream().forEach(staff -> staff.setSalary(staff.getSalary() * 1.1));

如果用传统的 for-loop 去表逹,即是以下的相同逻辑:

for (Staff staff : staffList) {
    staff.setSalary(staff.getSalary() * 1.1);
}

要注意的是 forEach() 被视为终结操作 (Terminal Operation),意则 Invoke 后Stream 就会终结,不能再有下文,而且 JVM 一旦遇到 forEach() ,就会立刻执行整条 Stream chained operations。(不明白我说甚么,姑且暂时不用理会,再看下去就会明白。)

如果有下文,可以怎样?行!有一个孖生 method,叫 Stream.peek(),它是一个中途操作 (Intermediate Operation),可以保留 Stream 来串连下一个 Stream Method。其例子如下:

staffList.stream().peek(staff -> staff.setSalary(
    staff.getSalary() * 1.1)).forEach(
    staff -> staff.setLastModified(new Date());

在以上 peek() 的例子中,当JVM 遇到 peek() 时,并不会立刻执行它,因它不是一个 terminal operation ,直至踫上 forEach() ,才一并对 Stream 中的原素一个又一个地执行 peek() 和 forEach() 中的 Lambda Expression。

Stream.filter(Predicate<? super T> predicate)

使用 filter() 须要提供一个 Predicate Lambda 去验証每一个流过的元素是否符合所提供的条件。符合条件的原素,可以继续在 Java Stream 流动到下个 chained operation。filter() 是一个 Intermediate Operation 。
例如,公司中只有 IT Architect 才有加薪,我们可以这样写:

staffList.stream().filter(
    staff -> staff.getJobRole().equals("IT Architect")).forEach(
    staff -> staff.setSalary(staff.getSalary() * 1.1));

其对应的传统写法为:

for (Staff staff : staffList) {
    if (staff.getJobRole().equals("IT Architect")) {
        staff.setSalary(staff.getSalary() * 1.1);
    }
}

Stream.map(Function<? super T,? extends R> mapper)

map() 可以把 Stream 中的原素通过其提供的 mapper function,转换成另一个 Object Stream 并继续下一个 chained operation。map() 亦是一个 Intermediate Operation。

例如,如果我们手上只有 Staff IDs ,要执行早前的加薪例子,就得先要从数据库拿取相关员工的资料后,才能进行加薪操作,其应用如下:

staffIdList.stream().map(StaffDao::getStaffById).filter(
    staff -> staff != null).filter(
    staff -> staff.getJobRole().equals("IT Architect")).forEach(
    staff -> staff.setSalary(staff.getSalary() * 1.1));

其对应的传统写法为:

for (String staffId : staffIdList) {
    Staff staff = StaffDao.getStaffById(staffId);
    if (staff != null && staff.getJobRole().equals("IT Architect")) {
        staff.setSalary(staff.getSalary() * 1.1);
    }
}

Stream.findFirst()

findFirst() 通常配合 filter() 使用,它会传回 Stream 中第一个流过它的原素,它是一个短路终结操作 (Short-circuit Terminal Operation)。除了会终结 Stream 之外,还会提早结束 stream chained-operations。例如,我们想知道公司究竟有没有服务超过二十年的 IT Architect,我们可以这样写:

Staff seniorITArchitect = staffList.stream().filter(
    staff -> staff.getYearOfService() > 20).filter(
    staff.getJobRole().equals("IT Architect")
        .findFirst().orElse(null);

其对应的传统写法为:

Staff seniorITArchitect = null;
for (Staff staff : staffList) { 
    if (staff.getYearOfService() > 20 
            && staff.getJobRole().equals("IT Architect")) {
        seniorITArchitect = staff;
        break;
     }
}

在这里,当 JVM 发现了有第一个流动到 findFirst() 的元素后,就会立刻中断 Stream chained-operations。如果真的没有一个原素符合条件并流动到 findFirst(),orElse() 中的 Object 就会被返回,作为找不到符合条件的预设值。

除了 Stream.findFirst() 外,其他 short-circuit 的 terminal operations 有 Stream.allMatch() 、Stream.anyMatch()、Stream.findAny() 及 Stream.nonMatch() 等。

Java Lambda Stream 的平行操作

看到这里,大家大概对 Java Lambda Stream 的操作应该有了一点认识,而且亦明白 Java Stream 怎样为 JVM 提供了 Lazy evaluation 及 Short-circuit 的程序优化 ( Optimization )。

最后,我想说明平行操作 ( Automatic Parallelism ) 的概念是如何逹成的。

依据我们最后的例子,如果我们把 staffIdList.stream() 改为 staffIdList.parallelStream(),那么我们的代码就会自动支援多核心处理器或多线程执行,方法就是这么简单:

staffIdList.parallelStream().map(StaffDao::getStaffById).filter(
    staff -> staff != null).filter(
    staff -> staff.getJobRole().equals("IT Architect")).forEach(
    staff -> staff.setSalary(staff.getSalary() * 1.1));

当然,改成 parallelStream() 并不代表一定会变成平行处理,要视乎执行硬件、JVM 配置和当时 JVM 的负荷而定,还得要留意平行处理会不会为程序带来负作用 ( Side-effect )。

但基于 functional programming paradigm 的特性, Lambda operation 大都可以平行化而不会带来负作用,把 stream() 改成为 parallelStream() 在大多数情况应该百利而无一害的。

下篇待续,如有问题,请留言给我。谢谢!

Java Lambda Stream 教学 Part 1
标签:                    

发布留言

发布留言必须填写的电子邮件地址不会公开。 必填栏位标示为 *