这篇将会是 Java Lambda 教学系列的最后一篇了,将会继续讨论有关 Java Lambda Stream 的进阶应用,例如 MapReduce 及强大的 Collectors 方法,希望大家将来可以应用于自已的程序上。
MapReduce – 处理大数据的模型
不知大家有没有听过 MapReduce 这个名字?其实 MapReduce 是 Google 所开发专门处理大数据 ( BigData ) 的程序设计模型 ( Programming Model )。基本概念是把要处理的事情主要分成两个阶段操作,分别是 Map (映射) 及 Reduce (化简),其主要分工如下:
Map (映射) – 为资料中每一个元素进行独立操作 ( Independent Operation ),例如过滤 ( Filter )、提取 ( Extract )、转换 ( Convert )等。由于映射操作讲求独立性,可高度分散平行操作 ( Distributed Parallel Operation ),这个概念跟上篇提及的 Stream.map() 方法的理念一致。
Reduce (化简) – 当所有元素都经过 Map 操作并得出过渡性结果 (transient results) 后,就可以进入化简程序。在化简程序中,众多的过渡性结果就会演算出最后的结果。化简程序 ( Reduce ) 通常比映射程序 ( Map ) 有较少的分散平行操作的可行性,但由于化简程序一般的演算化都比较简单,都可以进行一定程度的平行操作。
举一个简单例子说明,如果我们想知道公司内员工最高的薪金,传统的演算法 ( Algorithm ) 如下:
- 先拿取其中一名员工的资料
- 提取其薪金数值,并设定为当前最大薪金
- 然后拿取下一名员工的资料
- 提取其薪金数值
- 如果薪金比当前薪金为大,就设定为当前最大薪金
- 重复步骤 3 – 5 ,直至所有员工的资料都遍历完毕
以上的演算法有一个缺点,就是不能分散平行地执行,只能单线性地执行。如果员工数量很多,就会成为效能瓶颈。
那 MapReduce 又如何解决这个效能瓶颈,让这项工作得以 scalable 开去平行处理呢?如果公司有 10,000 名员工,我们可先把他们分成十组,每组各 1,000 名员工,然后把他们的资料分发到十个不同的执行单位 ( Execution Unit ),去进行以下的映射程序 ( Map Operation )
映射程序 ( Map Operation )
- 拿取每一名员工的资料
- 提取其薪金数值并输出到化简程序 ( Reduce Operation )
映射程序的重要前提是其独立性 ( Independent ),可以无须要理会资料中其他原素而逹至分散地完成操作。当十组员工的薪金都提取后,可以进行第二阶段的化简程序 ( Reduce Operation )
化简程序 ( Reduce Operation )
- 比对每一项从映射程序回来的薪金资料并记下最大的薪金数值
以上的化简程序其实还可以作一些平行处理优化,就是各十组的员工薪金中先平行地得出各自最大的薪金值 ( 即各自的执行单位先告自进行以上的化简程序),然后才把十个各自最大的薪金值集中在一起进行最后的化简程序得出最后值。
由此可见,MapReduce 可以把一些电脑经常出现的问题分散多工地执行,而得到执行上效能可伸缩性 ( Performance Scalability )。
Java Lambda Stream 的 MapReduce
Java 8 以前,如果要于 Java Platform 上应用到 MapReduce ,可能只得 Apache Hadoop 或其衍生产品才能办得到。 但到了 Java 8 ,SDK 提供的 Java Lambda Stream reduce() 方法再配合上篇介绍的 parallelStream() 和 map() ,开发人员就可以无须其他第三方的软件或程式库 ( Library ) ,而实作一个小型的 MapReduce 程序。

现在我们来看看 Stream.reduce() 的 method signature
Optional<T> reduce(BinaryOperator<T> accumulator)
根据 JavaDoc 的说明,reduce() 是一个终结操作 ( Terminal Operation ),它要求给予一个 BinaryOperator 的 Java Lambda ,而这个 BinaryOperator Lambda 将会是化简程序的逻辑,它会把每一个串流到它的原素跟上次 BinaryOperator 的结果合并,直至所有原素都合并完毕,最后的化简结果将会以 Optional<T> 返回。
为甚么是 Optional<T> 呢?因为如果串流根本没有原素,就没有化简的情况可言,Optional<T> 意指不一定有数值返回。
另外的一个版本的 reduce() 方法因为有 initial value ,所以就不用返回 Optional<T>,见如下:
T reduce(T identity, BinaryOperator<T> accumulator)
举一个我们早前应用的例子,如果现在我们想找出全公司薪金最高的员工,应用 Java Lambda Stream 的语法如下:
Optional<Staff> richStaff = staffIds.parallelStream().map( StaffDao::getStaffById).reduce( (staff1, staff2) -> { staff1.getSalary() > staff2.getSalary() ? staff1 : staff2 } );
如果我们现在想知道公司的 IT Architect 的总薪金支出,套用 JDK MapReduce ,其语法如下:
Double totalSalary = staffIds.parallelStream().map( StaffDao::getStaffById).filter( staff -> staff.getJobRole().equals("IT Architect") ).reduce(new Double(0d), Double::sum);
以上例子由于给予了原始值 ( Initial Identity ) ,所以 reduce() 可以直接返回 Double。
MapReduce 的应用例子不胜枚举,例子很多搜寻器 (e.g. Google、Bing 等) 或社交网站 (e.g. Facebook、Twitter 等),很多时候都想知道搜寻关键字的频率或大众比较喜欢的事物 (计算有多少个赞好等),面对海量的搜寻和赞好,如果没有 MapReduce,很难于有效时间内得出结果而作为下一个商业策略的部署。作为此章的收结,我模拟一个类此搜寻器或社交网站经常遇到的问题 - 关键字计数 ( Keyword Count )。
假设我们有一个很大的 String List ,存有所有曾经搜寻或赞好的关键字,现在我们想得知每一个关键字出现的频率,以 Map<String, AtomicInteger> 的 key-value pair 显示,例如:{ (“IT Architect”, 10), (“WordPress”, 7″), (“Blog”, “5”) }
Map<String, AtomicInteger> resultCountMap = // First upper case all keywords wordList.parallelStream().map(String::toUpperCase).reduce( // Provide the initial ConcurrentHashMap for storage new ConcurrentHashMap<String, AtomicInteger>(), (countMap, keyword) -> { // Accumulator function to convert String to Map AtomicInteger one = new AtomicInteger(1); AtomicInteger raceConditionValue = countMap.putIfAbsent(keyword, one); if (raceConditionValue != null && raceConditionValue != one) { raceConditionValue.incrementAndGet(); } return countMap; }, (map1, map2) -> { for (Iterator it = map2.keySet().iterator(); it.hasNext(); ) { // Combiner function to merge 2 map result String keyword = it.next(); AtomicInteger incrementValue = map2.get(keyword); AtomicInteger raceConditionValue = map1.putIfAbsent(keyword, incrementValue); if (raceConditionValue != null && raceConditionValue != incrementValue) { raceConditionValue.addAndGet(incrementValue.get()); } } return map1; });
以上的例子用上了 Stream.reduce() 的第三形态,要求给予 Initial identity、Accumulator 及 Combiner。
在映射程序首先把所有关键字转成大写,然后再交给 reduce() 做累积 ( Accumulate ) 和合并 ( Combine ) 的化简程序。
累积程序主要把 Keyword 转换成 Map 并进行关键字计数,每遇到相同的关键字就加一。
合并程序则把各个累积程序的结果合并,得出最后的关键字计数 Map。
另外得要大家注意的是,由于 Java Lambda Stream 有平行操作的考虑,大家要注意 thread-safety 的问题,所以以上例子用都上了 ConcurrentHashMap 及 AtomicInteger 来避免了 synchronization 的问题。
Collectors – 强大的 Java Lambda Stream 收集器
用指定的 Java Collection 来收集 Stream 中元素
上篇有略略介绍了 Collectors 的使用方法,教晓如何把 Java Lambda Stream 中的元素收集成 List 、Set 或 Map。但查看 JavaDoc ,我们并不能确定 Collectos.toList()、toSet() 会给予我们甚么 List 和 Set 的 Implementation。如果我们想返回 Stack、TreeSet 、ConcurrentHashMap,又可以怎样做呢?
答案就是使用 Collectors.toCollection() 、toConcurrentMap() 等方法,例子如下:
// Collect as Stack Stack<Staff> stackStaff = staffList.stream().collect(Collectors.toCollection(Stack::new)); // Collect as TreeSet TreeSet<Staff> treeSetStaff = staffList.stream().collect(Collectors.toCollection(TreeSet::new)); // Collect as LinkedList LinkedList<Staff> linkedListStaff = staffList.stream().collect(Collectors.toCollection(LinkedList::new)); // Collect as HashMap HashMap<String, Staff> hashMapStaff = staffList.stream().collect(Collectors.toMap( keyMapper, valueMapper, mergeFunc, HashMap::new)); // Collect as ConcurrentHashMap, more efficient than toMap() counterpart ConcurrentHashMap <String, Staff> concurrentMapStaff = staffList.stream().collect( Collectors.toConcurrentMap(keyMapper, valueMapper, mergeFunc, ConcurrentHashMap::new));
分类收集 groupingBy() 和 partitioningBy()
Collectors 提供了很多十分有用的 Utility 方法来帮助我们收集 Java Lambda Stream 元素,其中不得不提的就是 groupingBy() 和 partitioningBy() 这两个方法。
假设公司的员工有 PM 、 IT Architect、Developer 和 Tester 等不同职能,我们想把员工按职能区分,利用 Collectors.groupingBy() 可以:
/* Collect to map with job role as key, the resulted map key includes "PM", "IT Architect", "Developer", "Tester" etc */ Map<String, List<Staff>> staffJobRoleMap = staffList.stream().collect(Collectors.groupingBy(staff -> staff.getJobRole()));
以上的例子中,透过提供 GroupingBy Lambda - staff -> staff.getJobRole(), 我们就可以把 Staff List 转换成按职能区分的 Map。
至于 Collectors.partitioningBy() 的功能就很类似,它要求一个 Predicate Lambda 去作是非判断,最后返回一个 Map<Boolean, List<T>> 。例如我们想区分员工服务年资少于 10 年、等于或大于 10 年,利用 Collectors.partitioningBy() 可以:
/* Collect to map with True/False of the provided Predicate as key */ Map<Boolean, List<Staff>> tenYearStaffMap = staffList.stream().collect(Collectors.partitioningBy( staff -> staff.getYearOfService() < 10));
年资少于 10 年的员工,就会收集在 Key 是 True 的 List 中,至于等于或大于 10 年的员工,则会收集在 Key 是 False 的 List 中。
Collectors 还有其他有趣的收集方法,这里就不一一列举它们的使用方法,但相信如果您已经看懂我早前所有的教学,到现在您应该可以触类旁通、举一反三,看看 JavaDoc 的说明就应该懂得怎样使用它们了。
结语
一连五篇的 Java Lambda 教学终于写毕,其实在写作的途中,自己亦反复尝试了不同的语法和 Java Lambda 的应用,都是边学边写的。
看看 Java 8 的 JavaDoc,您会发现到处都出现 Java Lambda 的影子。其实,大部份企业应用程序的逻辑,万变不离其宗,不外乎数据 CRUD 操作、for-loop、if-then-else 、数据转换等,这些统统都可以用 Lambda Expression 来取代。尤其 Spring 这些用那么多 Templates Method 的 Pattern 的框架,可以想像将来一定会把 Java Lambda 大派用场。