這篇將會是 Java Lambda 教學系列的最後一篇了,將會繼續討論有關 Java Lambda Stream 的進階應用,例如 MapReduce 及強大的 Collectors 方法,希望大家將來可以應用於自已的程序上。

MapReduce –  處理大數據的模型

不知大家有沒有聽過 MapReduce 這個名字?其實 MapReduce 是 Google 所開發專門處理大數據 ( BigData ) 的程序設計模型 ( Programming Model )。基本概念是把要處理的事情主要分成兩個階段操作,分別是 Map (映射) 及 Reduce (化簡),其主要分工如下:

Map (映射) – 為資料中每一個元素進行獨立操作 ( Independent Operation ),例如過濾 ( Filter )、提取 ( Extract )、轉換 ( Convert )等。由於映射操作講求獨立性,可高度分散平行操作 ( Distributed Parallel Operation ),這個概念跟上篇提及的 Stream.map() 方法的理念一致。

Reduce (化簡) – 當所有元素都經過 Map 操作並得出過渡性結果 (transient results) 後,就可以進入化簡程序。在化簡程序中,眾多的過渡性結果就會演算出最後的結果。化簡程序 ( Reduce ) 通常比映射程序 ( Map ) 有較少的分散平行操作的可行性,但由於化簡程序一般的演算化都比較簡單,都可以進行一定程度的平行操作。

舉一個簡單例子說明,如果我們想知道公司內員工最高的薪金,傳統的演算法 ( Algorithm ) 如下:

  1. 先拿取其中一名員工的資料
  2. 提取其薪金數值,並設定為當前最大薪金
  3. 然後拿取下一名員工的資料
  4. 提取其薪金數值
  5. 如果薪金比當前薪金為大,就設定為當前最大薪金
  6. 重覆步驟 3 – 5 ,直至所有員工的資料都遍歷完畢

以上的演算法有一個缺點,就是不能分散平行地執行,只能單線性地執行。如果員工數量很多,就會成為效能瓶頸。

那 MapReduce 又如何解決這個效能瓶頸,讓這項工作得以 scalable 開去平行處理呢?如果公司有 10,000 名員工,我們可先把他們分成十組,每組各 1,000 名員工,然後把他們的資料分發到十個不同的執行單位 ( Execution Unit ),去進行以下的映射程序 ( Map Operation )

映射程序 ( Map Operation )

  1. 拿取每一名員工的資料
  2. 提取其薪金數值並輸出到化簡程序 ( Reduce Operation )

映射程序的重要前提是其獨立性 ( Independent ),可以無須要理會資料中其他原素而逹至分散地完成操作。當十組員工的薪金都提取後,可以進行第二階段的化簡程序 ( Reduce Operation )

化簡程序 ( Reduce Operation )

  1. 比對每一項從映射程序回來的薪金資料並記下最大的薪金數值

以上的化簡程序其實還可以作一些平行處理優化,就是各十組的員工薪金中先平行地得出各自最大的薪金值 ( 即各自的執行單位先告自進行以上的化簡程序),然後才把十個各自最大的薪金值集中在一起進行最後的化簡程序得出最後值。

由此可見,MapReduce 可以把一些電腦經常出現的問題分散多工地執行,而得到執行上效能可伸缩性 ( Performance Scalability )。

Java Lambda Stream 的 MapReduce

Java 8 以前,如果要於 Java Platform 上應用到 MapReduce ,可能只得 Apache Hadoop 或其衍生產品才能辦得到。 但到了 Java 8 ,SDK 提供的 Java Lambda Stream reduce() 方法再配合上篇介紹的 parallelStream() 和 map() ,開發人員就可以無須其他第三方的軟件或程式庫 ( Library ) ,而實作一個小型的 MapReduce 程序。

Java Lambda Stream 可以實現 Apache Hadoop MapReduce
Java Lambda Stream 可以實現 Apache Hadoop MapReduce

現在我們來看看 Stream.reduce() 的 method signature

Optional<T> reduce(BinaryOperator<T> accumulator)

根據 JavaDoc 的說明,reduce() 是一個終結操作 ( Terminal Operation ),它要求給予一個 BinaryOperator 的 Java Lambda ,而這個 BinaryOperator Lambda 將會是化簡程序的邏輯,它會把每一個串流到它的原素跟上次 BinaryOperator 的結果合併,直至所有原素都合併完畢,最後的化簡結果將會以 Optional<T> 返回。

為甚麼是 Optional<T> 呢?因為如果串流根本沒有原素,就沒有化簡的情況可言,Optional<T> 意指不一定有數值返回。

另外的一個版本的 reduce() 方法因為有 initial value ,所以就不用返回 Optional<T>,見如下:

T reduce(T identity, BinaryOperator<T> accumulator)

舉一個我們早前應用的例子,如果現在我們想找出全公司薪金最高的員工,應用 Java Lambda Stream 的語法如下:

Optional<Staff> richStaff = staffIds.parallelStream().map(
    StaffDao::getStaffById).reduce(
        (staff1, staff2) -> { 
            staff1.getSalary() > staff2.getSalary() ? staff1 : staff2
        }
    );

如果我們現在想知道公司的 IT Architect 的總薪金支出,套用 JDK MapReduce ,其語法如下:

Double totalSalary = staffIds.parallelStream().map(
    StaffDao::getStaffById).filter(
        staff -> staff.getJobRole().equals("IT Architect")
    ).reduce(new Double(0d), Double::sum);

以上例子由於給予了原始值 ( Initial Identity ) ,所以 reduce() 可以直接返回 Double。

MapReduce 的應用例子不勝枚舉,例子很多搜尋器 (e.g. Google、Bing 等) 或社交網站 (e.g. Facebook、Twitter 等),很多時候都想知道搜尋關鍵字的頻率或大眾比較喜歡的事物 (計算有多少個讚好等),面對海量的搜尋和讚好,如果沒有 MapReduce,很難於有效時間內得出結果而作為下一個商業策略的部署。作為此章的收結,我模擬一個類此搜尋器或社交網站經常遇到的問題 - 關鍵字計數 ( Keyword Count )。

假設我們有一個很大的 String List ,存有所有曾經搜尋或讚好的關鍵字,現在我們想得知每一個關鍵字出現的頻率,以 Map<String, AtomicInteger> 的 key-value pair 顯示,例如:{ (“IT Architect”, 10), (“WordPress”, 7″), (“Blog”, “5”) }

Map<String, AtomicInteger> resultCountMap = 
    // First upper case all keywords
    wordList.parallelStream().map(String::toUpperCase).reduce(
        // Provide the initial ConcurrentHashMap for storage
        new ConcurrentHashMap<String, AtomicInteger>(),
        (countMap, keyword) -> {
            // Accumulator function to convert String to Map 
            AtomicInteger one = new AtomicInteger(1);
            AtomicInteger raceConditionValue = 
                    countMap.putIfAbsent(keyword, one);
            if (raceConditionValue != null && raceConditionValue != one) {
                raceConditionValue.incrementAndGet();
            }
            return countMap;
        }, (map1, map2) -> {
            for (Iterator it = map2.keySet().iterator();
                    it.hasNext(); ) {
                // Combiner function to merge 2 map result
                String keyword = it.next();
                AtomicInteger incrementValue = map2.get(keyword);
                AtomicInteger raceConditionValue = 
                        map1.putIfAbsent(keyword, incrementValue);
                if (raceConditionValue != null 
                        && raceConditionValue != incrementValue) {
                    raceConditionValue.addAndGet(incrementValue.get());
                }
            }
            return map1;
        });

以上的例子用上了 Stream.reduce() 的第三形態,要求給予 Initial identity、Accumulator 及 Combiner。
在映射程序首先把所有關鍵字轉成大寫,然後再交給 reduce() 做累積 ( Accumulate ) 和合併 ( Combine ) 的化簡程序。
累積程序主要把 Keyword 轉換成 Map 並進行關鍵字計數,每遇到相同的關鍵字就加一。
合併程序則把各個累積程序的結果合併,得出最後的關鍵字計數 Map。
另外得要大家注意的是,由於 Java Lambda Stream 有平行操作的考慮,大家要注意 thread-safety 的問題,所以以上例子用都上了 ConcurrentHashMap 及 AtomicInteger 來避免了 synchronization 的問題。

Collectors – 強大的 Java Lambda Stream 收集器

用指定的 Java Collection 來收集 Stream 中元素

上篇有略略介紹了 Collectors 的使用方法,教曉如何把 Java Lambda Stream 中的元素收集成 List 、Set 或 Map。但查看 JavaDoc ,我們並不能確定 Collectos.toList()、toSet()  會給予我們甚麼 List 和 Set 的 Implementation。如果我們想返回 Stack、TreeSet 、ConcurrentHashMap,又可以怎樣做呢?

答案就是使用 Collectors.toCollection() 、toConcurrentMap() 等方法,例子如下:

// Collect as Stack
Stack<Staff> stackStaff = 
  staffList.stream().collect(Collectors.toCollection(Stack::new));
// Collect as TreeSet
TreeSet<Staff> treeSetStaff = 
    staffList.stream().collect(Collectors.toCollection(TreeSet::new));
// Collect as LinkedList
LinkedList<Staff> linkedListStaff = 
    staffList.stream().collect(Collectors.toCollection(LinkedList::new));
// Collect as HashMap
HashMap<String, Staff> hashMapStaff = 
    staffList.stream().collect(Collectors.toMap(
        keyMapper, valueMapper, mergeFunc, HashMap::new));
// Collect as ConcurrentHashMap, more efficient than toMap() counterpart
ConcurrentHashMap <String, Staff> concurrentMapStaff = 
    staffList.stream().collect(
        Collectors.toConcurrentMap(keyMapper, valueMapper, 
            mergeFunc, ConcurrentHashMap::new));

分類收集 groupingBy() 和 partitioningBy()

Collectors 提供了很多十分有用的 Utility 方法來幫助我們收集 Java Lambda Stream 元素,其中不得不提的就是 groupingBy() 和 partitioningBy() 這兩個方法。

假設公司的員工有 PM 、 IT Architect、Developer 和 Tester 等不同職能,我們想把員工按職能區分,利用 Collectors.groupingBy() 可以:

/* Collect to map with job role as key,
   the resulted map key includes "PM", 
   "IT Architect", "Developer", "Tester" etc */
Map<String, List<Staff>> staffJobRoleMap = 
    staffList.stream().collect(Collectors.groupingBy(staff -> staff.getJobRole()));

以上的例子中,透過提供 GroupingBy Lambda - staff -> staff.getJobRole(), 我們就可以把 Staff List 轉換成按職能區分的 Map。

至於 Collectors.partitioningBy() 的功能就很類似,它要求一個 Predicate Lambda 去作是非判斷,最後返回一個 Map<Boolean, List<T>> 。例如我們想區分員工服務年資少於 10 年、等於或大於 10 年,利用 Collectors.partitioningBy() 可以:

/* Collect to map with True/False
   of the provided Predicate as key
*/  
Map<Boolean, List<Staff>> tenYearStaffMap = 
    staffList.stream().collect(Collectors.partitioningBy(
        staff -> staff.getYearOfService() < 10));

年資少於 10 年的員工,就會收集在 Key 是 True 的 List 中,至於等於或大於 10 年的員工,則會收集在 Key 是 False 的 List 中。

Collectors 還有其他有趣的收集方法,這裡就不一一列舉它們的使用方法,但相信如果您已經看懂我早前所有的教學,到現在您應該可以觸類旁通、舉一反三,看看 JavaDoc 的說明就應該懂得怎樣使用它們了。

結語

一連五篇的 Java Lambda 教學終於寫畢,其實在寫作的途中,自己亦反覆嘗試了不同的語法和 Java Lambda 的應用,都是邊學邊寫的。

看看 Java 8 的 JavaDoc,您會發現到處都出現 Java Lambda 的影子。其實,大部份企業應用程序的邏輯,萬變不離其宗,不外乎數據 CRUD 操作、for-loop、if-then-else 、數據轉換等,這些統統都可以用 Lambda Expression 來取代。尤其 Spring 這些用那麼多 Templates Method 的 Pattern 的框架,可以想像將來一定會把 Java Lambda 大派用場。

 

Java Lambda Stream 教學 Part 2
Tagged on:                         

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

zh_HKChinese (Hong Kong)