前三篇 Java Lambda 教學 Part 1, Part 2, Part 3 可以說是基礎入門篇,如果要真真正正發揮 Java Lambda 的威力,就不得不提 Java 8 新加入的 Stream,這個 Java Lambda Stream API 可以說是為 Lambda 而生的,而我將會在這篇介紹它的用途。

Java Lambda Stream

Stream 是一個 Collection Wrapper,可以讓把集合內的元素 ( Elements ) 以串流 ( Stream ) 形式交到 Stream Methods 以 Lambda Expression 執行,其運用例子如下:

staffList.stream().filter(
    staff -> staff.getServiceOfYear() > 5).forEach(
    staff -> staff.setSalary(staff.getSalary() * 1.1)) ;

Stream.of(staffIdArray).map(
    StaffDao::getStaffById).filter(
    staff -> staff.getJobRole().equals("IT Architect"))
       .findFirst().orElse(null));

以上的例子利用了 Collection.stream() 和 Stream.of() 把 staffList 集合和 staffId Array 中的元素以串流形式送到 filter(), forEach(), map(), findFirst() 等 Stream Methods 去執行,而實際的執行邏輯就是用我們已經見慣見熟的 Java Lambda Expression 來表逹。

使用 Java Lambda Streams 來遍歷 ( Traverse ) 元素並以 Stream Methods 來處理比傳統用 for-loop 來遍歷來處理,除了讓程序看來精簡外,主要還有以下三個好處:

  1. Lazy evalutation – JVM 在沒有遇到 terminal operations 前,並不會執行任何 Stream method。
  2. Short-circuit execution – JVM 會自行判斷 short-circuit,提早完成集合元素遍歷 ( Element traversal )
  3. Automatic parallelism – JVM 可以把 Java Lambda Streams 同時交給多核心處理器 ( Multi-Core )或多線程 (Multi-Thread) 來執行,而閣下是無須去編寫任何多線程代碼。

怎樣可以做到以上三大好處,容下會再以解釋。但現在得先看看 Java 8 究竟有幾多道扳斧可以建立 Java Lambda Stream。

如何建立 Java Lambda Stream?

由於大部份 Java Lambda Stream 的操作都是應用於集合上,如 List 、Set、Map 等,所以我們最常見的就是通過 Collection Interface 提供的 stream() 和 parallelStream() 來建立 Java Lambda Stream,其例子如下:

Stream<Staff> staffStream = staffList.stream();

Stream<JobRole> jobRoleStream = jobRoleSet.parallelStream();

Stream<Staff> staffStream = projectStaffMap.
    values().stream();

至於 Java Array,就可以利用 Stream.of() 或 Arrays.stream() 把 Array 中的原素串流化,其例子如下:

Stream<Staff> staffStream = Stream.of(staffArray);

Stream<Integer> intStream = Stream.of(integerArray);
// int[] intArray
IntStream anotherIntStream = Arrays.stream(intArray);

如何收集 Java Lambda Stream 中的原素?

通過以上的方法,我們可以把 Java Collection 和 Array 串流化成 Java Lambda Stream ,經過 Stream Methods 中的 Java Lambda 處理後,我們可以利用Stream.collect() 及 Collectors 把 Stream 中的原素還原回 Java Collection 或利用 Stream.toArray() 把 Stream 中的原素還原回 Java Array。其應用例子如下:

List<Staff> staffList = staffStream.collect(Collectors.toList());
Set<Staff> staffSet = staffStream.collect(Collectors.toSet());

/* The 1st argument of toMap() is key-mapper function, 2nd argument is value-mapper function, while the 3rd argument is merge value function when the key collision is detected */  
Map<String, Staff> staffMap = staffStream.collect(
    Collectors.toMap(staff -> staff.getStaffId(), 
        Function.identity(), 
        (staff1, staff2) -> {
            throw new SystemException("Duplicated Staff ID detected");
        }
    );

/* toArray() requires providing a generator function to construct the typed array */
Staff[] staffArray = staffStream.toArray(Staff[]::new);

以上的例子中,Collectors.toList() 和 Collectors.toSet() 比較直觀,不用詳加解說,Collectors.toMap() 的用法則比較複雜,它的 method signature 如下:

Collector<T,?,Map<K,U>> toMap(Function<? super T,? extends K> keyMapper,
    Function<? super T,? extends U> valueMapper,
    BinaryOperator<U> mergeFunction);

嘩!只看了 Method Signature 就已經頭痛了,根本不知從何入手。不用擔心,我會慢慢解拆的。首先,先不管那些 Generics syntax,它們不是我們要討論的主題。

Collectors.toMap() 主要要求三個 Function Lambda:

第一個 keyMapper,即是我們要指定如何把 Stream 中元素抽出其 Key-value pair 中的 Key。在我的例子中,由於我的 Map Key 是 Staff ID,所以我的 keyMapper Function Lambda 就是 staff -> staff.getStaffId()。

第二個 Function Lambda 就是 valueMapper,即要提供如何抽出 Key-value pair 中的 value,由於在這裡我們只想把 Staff Object 原封不動的放回 Map 中,所以這裡我給予了 Function.identity() 這個 static method。( 這是利用 Function.identity() 一個十分典型的例子 )

最後,我們要提供一個 mergeFunction ,它的作用是當 JVM 進行 Collectors.toMap() 的操作發現有 key conflict 的時候,我們應該怎樣把conflict key 中的 values 進行合併 (Merge)。由於我們的 Staff ID 應該是唯一 ( Unique ) 的,理應不會出現須要合併的情況,所以我們這裡的會把合併動作看成錯誤並中止執行。其 Lambda Expression 為 (staff1, staff2) -> { throw new SystemException(“Duplicated Staff ID detected”); }。

Collectors 還提供了很多其他的 Utility Methods 去收集 Stream 中元素,大家不妨參考 Collectors JavaDoc,去了解其他 collectors 的用法。

至於 Stream.toArray() 的 Method Signature 為:

A[] toArray(IntFunction<A[]> generator)

技術上,它須要提供一個 integer function generator。甚麼是 generator 呢?

其實說穿了只是一個 target array 的 Java Constructor,這裡的例子用了 Method Reference。Staff[]::new 就是 Staff Array Constructor 的 method reference。

雖然 Stream interface 提供了一個沒有 argument 的 toArray() ,但這個零 argument 的版本只能傳回 Object[],而 Object[] 在 Java 中是不能 Cast 成為 Staff[] 的,所以大部份情況下,大家應該會用到 generator 版本的 toArray()。

看到這裡,如果還沒有完全明白我以上的解說,又或者不清楚甚麼是 Lambda Expression、Method Reference 或 Function 等,建議您可以回顧我早陣子的 Java Lambda 教學

Stream Methods

介紹了如何建立 Stream 和如何收集 Stream 中元素轉換回 Collections 和 Array 後,終於來到戲玉了,就是如何運用各式各樣的 Stream Methods,這篇將主力介紹當中比較常用的 forEach() 、filter()、map() 和 findFirst()。

Stream.forEach(Consumer<? super T> action)

使用 forEach() 須要提供一個 Consumer Lambda 去處理每一個流過 forEach() 的元素。為甚麼叫每一個流過的元素呢?由於不一定所有 Stream 中的原素都有可能執行 Consumer Lambda 的邏輯,因為在這之前已被過濾掉 ( Filter,下節將會介紹)。

例如,我們想把每一名員工加薪 10%,我們可以這樣做:

staffList.stream().forEach(staff -> staff.setSalary(staff.getSalary() * 1.1));

如果用傳統的 for-loop 去表逹,即是以下的相同邏輯:

for (Staff staff : staffList) {
    staff.setSalary(staff.getSalary() * 1.1);
}

要注意的是 forEach() 被視為終結操作 (Terminal Operation),意則 Invoke 後Stream 就會終結,不能再有下文,而且 JVM 一旦遇到 forEach() ,就會立刻執行整條 Stream chained operations。(不明白我說甚麼,姑且暫時不用理會,再看下去就會明白。)

如果有下文,可以怎樣?行!有一個孖生 method,叫 Stream.peek(),它是一個中途操作 (Intermediate Operation),可以保留 Stream 來串連下一個 Stream Method。其例子如下:

staffList.stream().peek(staff -> staff.setSalary(
    staff.getSalary() * 1.1)).forEach(
    staff -> staff.setLastModified(new Date());

在以上 peek() 的例子中,當JVM 遇到 peek() 時,並不會立刻執行它,因它不是一個 terminal operation ,直至踫上 forEach() ,才一併對 Stream 中的原素一個又一個地執行 peek() 和 forEach() 中的 Lambda Expression。

Stream.filter(Predicate<? super T> predicate)

使用 filter() 須要提供一個 Predicate Lambda 去驗証每一個流過的元素是否符合所提供的條件。符合條件的原素,可以繼續在 Java Stream 流動到下個 chained operation。filter() 是一個 Intermediate Operation 。
例如,公司中只有 IT Architect 才有加薪,我們可以這樣寫:

staffList.stream().filter(
    staff -> staff.getJobRole().equals("IT Architect")).forEach(
    staff -> staff.setSalary(staff.getSalary() * 1.1));

其對應的傳統寫法為:

for (Staff staff : staffList) {
    if (staff.getJobRole().equals("IT Architect")) {
        staff.setSalary(staff.getSalary() * 1.1);
    }
}

Stream.map(Function<? super T,? extends R> mapper)

map() 可以把 Stream 中的原素通過其提供的 mapper function,轉換成另一個 Object Stream 並繼續下一個 chained operation。map() 亦是一個 Intermediate Operation。

例如,如果我們手上只有 Staff IDs ,要執行早前的加薪例子,就得先要從數據庫拿取相關員工的資料後,才能進行加薪操作,其應用如下:

staffIdList.stream().map(StaffDao::getStaffById).filter(
    staff -> staff != null).filter(
    staff -> staff.getJobRole().equals("IT Architect")).forEach(
    staff -> staff.setSalary(staff.getSalary() * 1.1));

其對應的傳統寫法為:

for (String staffId : staffIdList) {
    Staff staff = StaffDao.getStaffById(staffId);
    if (staff != null && staff.getJobRole().equals("IT Architect")) {
        staff.setSalary(staff.getSalary() * 1.1);
    }
}

Stream.findFirst()

findFirst() 通常配合 filter() 使用,它會傳回 Stream 中第一個流過它的原素,它是一個短路終結操作 (Short-circuit Terminal Operation)。除了會終結 Stream 之外,還會提早結束 stream chained-operations。例如,我們想知道公司究竟有沒有服務超過二十年的 IT Architect,我們可以這樣寫:

Staff seniorITArchitect = staffList.stream().filter(
    staff -> staff.getYearOfService() > 20).filter(
    staff.getJobRole().equals("IT Architect")
        .findFirst().orElse(null);

其對應的傳統寫法為:

Staff seniorITArchitect = null;
for (Staff staff : staffList) { 
    if (staff.getYearOfService() > 20 
            && staff.getJobRole().equals("IT Architect")) {
        seniorITArchitect = staff;
        break;
     }
}

在這裡,當 JVM 發現了有第一個流動到 findFirst() 的元素後,就會立刻中斷 Stream chained-operations。如果真的沒有一個原素符合條件並流動到 findFirst(),orElse() 中的 Object 就會被返回,作為找不到符合條件的預設值。

除了 Stream.findFirst() 外,其他 short-circuit 的 terminal operations 有 Stream.allMatch() 、Stream.anyMatch()、Stream.findAny() 及 Stream.nonMatch() 等。

Java Lambda Stream 的平行操作

看到這裡,大家大概對 Java Lambda Stream 的操作應該有了一點認識,而且亦明白 Java Stream 怎樣為 JVM 提供了 Lazy evaluation 及 Short-circuit 的程序優化 ( Optimization )。

最後,我想說明平行操作 ( Automatic Parallelism ) 的概念是如何逹成的。

依據我們最後的例子,如果我們把 staffIdList.stream() 改為 staffIdList.parallelStream(),那麼我們的代碼就會自動支援多核心處理器或多線程執行,方法就是這麼簡單:

staffIdList.parallelStream().map(StaffDao::getStaffById).filter(
    staff -> staff != null).filter(
    staff -> staff.getJobRole().equals("IT Architect")).forEach(
    staff -> staff.setSalary(staff.getSalary() * 1.1));

當然,改成 parallelStream() 並不代表一定會變成平行處理,要視乎執行硬件、JVM 配置和當時 JVM 的負荷而定,還得要留意平行處理會不會為程序帶來負作用 ( Side-effect )。

但基於 functional programming paradigm 的特性, Lambda operation 大都可以平行化而不會帶來負作用,把 stream() 改成為 parallelStream() 在大多數情況應該百利而無一害的。

下篇待續,如有問題,請留言給我。謝謝!

Java Lambda Stream 教學 Part 1
Tagged on:                     

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

zh_HKChinese (Hong Kong)