
本文深入探讨了如何利用java stream api对数据进行分组、计数,并高效地提取出现频率最高的n个元素。文章首先介绍了一种基于全量排序的简洁方案,随后进一步优化,提出了一种使用自定义collector结合priorityqueue进行部分排序的策略,以应对大规模数据场景下对性能的更高要求,并提供了详细的代码示例与性能分析。
在日常数据处理中,我们经常会遇到这样的需求:给定一个数据集,需要按某个字段进行分组,统计每个组的元素数量,然后找出数量最多的前N个组。例如,在一个城市列表中,找出拥有城市数量最多的前3个国家代码。Java Stream API为这类问题提供了强大而灵活的解决方案。
方案一:分组、计数与全量排序
最直观的实现方式是先将数据分组并计数,然后对结果进行排序,最后截取前N个元素。
实现步骤
-
分组与计数: 使用 Collectors.groupingBy() 结合 Collectors.counting() 来创建一个 Map
,其中键是分组字段(如国家代码),值是该组的元素数量(如城市数量)。 - 转换为Stream并排序: 将这个Map的 entrySet() 转换为一个Stream,并根据值(计数)进行降序排序。
- 截取前N个: 使用 limit(N) 方法获取排序后的前N个元素。
- 提取键: 最后,通过 map(Map.Entry::getKey) 提取出我们需要的国家代码。
示例代码
假设我们有一个 City 实体类:
public class City {
private int id;
private String name;
private String countryCode;
// 构造函数, getters, setters
public City(int id, String name, String countryCode) {
this.id = id;
this.name = name;
this.countryCode = countryCode;
}
public String getCountryCode() {
return countryCode;
}
// ... 其他方法
}获取拥有城市数量最多的前N个国家代码的实现:
立即学习“Java免费学习笔记(深入)”;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class CityAnalyzer {
public static List getTopNCodes(List cities, int limit) {
return cities.stream()
.collect(Collectors.groupingBy( // 1. 分组并计数
City::getCountryCode,
Collectors.counting()
)) // 结果为 Map,例如 { "DE": 3, "FR": 1, "DK": 1 }
.entrySet().stream() // 2. 转换为 Stream>
.sorted(Map.Entry.comparingByValue().reversed()) // 3. 按值降序排序
.limit(limit) // 4. 截取前N个
.map(Map.Entry::getKey) // 5. 提取键 (国家代码)
.toList(); // 收集为List
}
public static void main(String[] args) {
List cities = List.of(
new City(1, "Berlin", "DE"),
new City(2, "Munich", "DE"),
new City(3, "Köln", "DE"),
new City(4, "Paris", "FR"),
new City(5, "Copenhagen", "DK"),
new City(6, "Hamburg", "DE"),
new City(7, "Lyon", "FR")
);
List top3CountryCodes = getTopNCodes(cities, 3);
System.out.println("Top 3 country codes by city count: " + top3CountryCodes);
// 预期输出: [DE, FR, DK] (或 [DE, DK, FR] 取决于FR和DK的相对顺序,但DE肯定排第一)
}
} 注意事项与性能分析
- 优点: 代码简洁易懂,适用于大多数场景。
- 缺点: 这种方法的时间复杂度为 O(M log M),其中 M 是分组后的唯一键的数量(即 Map 的大小)。如果 M 非常大,而我们只需要非常小的 N(例如 N=3),那么对整个 M 个元素进行排序会造成不必要的开销。对于海量数据且 N 很小的情况,性能可能成为瓶颈。
方案二:使用自定义Collector结合PriorityQueue进行部分排序
为了优化性能,我们可以避免对所有分组结果进行完全排序,转而使用数据结构 PriorityQueue(优先级队列)来实现部分排序。PriorityQueue 默认是一个最小堆,我们可以利用它来维护一个大小为 N 的堆,其中包含当前遇到的 N 个最大元素。
实现思路
-
分组与计数: 这一步与方案一相同,生成 Map
。 - 自定义Collector: 创建一个自定义的 Collector,其可变容器是一个 PriorityQueue。
-
PriorityQueue维护Top N:
- PriorityQueue 被配置为最小堆,用于存储 Map.Entry
。这意味着堆顶元素始终是当前堆中计数值最小的那个。 - 当处理每个 Map.Entry 时:
- 如果 PriorityQueue 的大小尚未达到 N,直接将当前 Entry 加入队列。
- 如果 PriorityQueue 的大小已达到 N,则将当前 Entry 的计数值与堆顶元素(即当前堆中计数值最小的元素)进行比较。如果当前 Entry 的计数值更大,则移除堆顶元素,并将当前 Entry 加入队列。这样,队列始终维护着计数值最大的 N 个元素。
- PriorityQueue 被配置为最小堆,用于存储 Map.Entry
- 结果提取: Collector 的 finisher 阶段将 PriorityQueue 中的元素取出,并根据计数值进行降序排序,然后提取键。
泛型化实现
为了提高代码复用性,我们可以将这个逻辑泛型化,使其适用于任何类型的数据和任何提取键的函数。
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
public class CityAnalyzerOptimized {
/**
* 获取列表中按某个键分组后,计数最多的前N个键。
*
* @param list 原始数据列表
* @param keyExtractor 提取分组键的函数
* @param limit 需要获取的前N个元素数量
* @param 列表元素类型
* @param 分组键类型
* @return 计数最多的前N个键的列表
*/
public static List getTopN(List list,
Function keyExtractor,
int limit) {
if (list == null || list.isEmpty() || limit <= 0) {
return List.of();
}
// 1. 分组并计数,结果为 Map
Map countedMap = list.stream()
.collect(Collectors.groupingBy(
keyExtractor,
Collectors.counting()
));
// 2. 使用自定义Collector进行部分排序
return countedMap.entrySet().stream()
.collect(getMaxNCollector(
limit,
Map.Entry.comparingByValue(), // PriorityQueue作为最小堆,基于值进行比较
Map.Entry::getKey
));
}
/**
* 创建一个自定义Collector,用于从Stream中找出最大的N个元素。
*
* @param size 要保留的元素数量N
* @param comparatorForMinHeap 用于PriorityQueue的比较器,应使PriorityQueue成为最小堆(例如,按值升序)
* @param keyExtractor 从Map.Entry中提取最终结果键的函数
* @param Stream中元素的类型 (Map.Entry)
* @param 最终结果列表元素的类型 (K)
* @return 维护Top N元素的Collector
*/
public static Collector> getMaxNCollector(int size,
Comparator comparatorForMinHeap,
Function keyExtractor) {
return Collector.of(
() -> new PriorityQueue<>(size, comparatorForMinHeap), // Supplier: 创建一个指定大小和比较器的最小堆
(Queue queue, T next) -> { // Accumulator: 处理每个元素
if (queue.size() < size) {
queue.add(next); // 队列未满,直接添加
} else {
// 队列已满,与堆顶元素(当前最小的Top N元素)比较
if (comparatorForMinHeap.compare(queue.peek(), next) < 0) { // 如果新元素比堆顶元素大
queue.poll(); // 移除堆顶元素
queue.add(next); // 添加新元素
}
}
},
(Queue left, Queue right) -> { // Combiner: 合并两个部分结果(两个PriorityQueue)
// 将右侧队列的元素逐个添加到左侧队列,并保持Top N的逻辑
right.forEach(next -> {
if










