Java Stream

1. 概念

流是Java8 API添加的一个新的抽象,称为流Stream,以一种声明性方式处理数据集合(侧重对于源数据计算能力的封装,并且支持序列与并行两种操作方式)

Stream流是从支持数据处理操作的源生成的元素序列,源可以是数组、文件、集合、函数。流不是集合元素,它不是数据结构并不保存数据,它的主要目的在于计算

Stream流是对集合(Collection)对象功能的增强,与Lambda表达式结合,可以提高编程效率、间接性和程序可读性。

2. 创建流

Stream 通常从一个数据源(如集合、数组、文件等)创建。你可以使用集合类的 stream() 方法或数组的 Arrays.stream() 方法来获得 Stream。

  • 从集合获取流

    List获取流:

    List<String> list = Arrays.asList("apple", "banana", "cherry");
    Stream<String> stream = list.stream();
    

    Set获取流:

    Set<Integer> set = new HashSet<>(Arrays.asList(1, 2, 3, 4, 5));
    Stream<Integer> stream = set.stream();
    

    Map获取键的流或值的流:

    Map<String, Integer> map = new HashMap<>();
    Stream<String> keyStream = map.keySet().stream();
    Stream<Integer> valueStream = map.values().stream();
    
  • 从数组获取流

    int[] array = {1, 2, 3, 4, 5};
    IntStream stream = Arrays.stream(array);
    
  • 从文件获取流

    Path path = Paths.get("file.txt");
    Stream<String> lines = Files.lines(path);
    
  • 通过静态工厂方法创建流

    使用Stream.of()创建流:

    Stream<String> stream = Stream.of("apple", "banana", "cherry");
    

    使用IntStream.range()创建范围流:

    IntStream rangeStream = IntStream.range(1, 6); // 1, 2, 3, 4, 5
    
  • 通过生成方法创建流

    使用Stream.generate()创建无限流:

    Stream<Double> randomStream = Stream.generate(Math::random);
    

    使用Stream.iterate()创建无限流:

    Stream<Integer> infiniteStream = Stream.iterate(1, n -> n + 1);
    

3. 中间操作

在Java Stream中,中间操作是一类操作,用于构建流的处理链,但不会立即执行实际的操作。它们允许你对流中的元素进行转换、筛选、排序等处理。中间操作通常用于创建一个新的流,以供后续操作使用。

这类操作都是惰性化的,仅仅调用到这类方法,并没有真正开始流的遍历,真正的遍历需等到终端操作时

以下是一些常见的Java Stream中间操作:

流方法含义示例
filter(Predicate<T> predicate)根据给定的条件筛选流中的元素,只保留满足条件的元素。Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5); Stream<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0); // 筛选偶数
map(Function<T, R> mapper)将流中的元素映射到另一种类型。可以用于转换元素的类型或提取元素的某个属性。Stream<String> words = Stream.of("apple", "banana", "cherry");
Stream<Integer> wordLengths = words.map(String::length); // 获取字符串长度
flatMap(Function<T, Stream<R>> mapper)将流中的每个元素映射为一个流,然后将这些流合并为一个新的流。嵌套list展平Stream<String> lines = Stream.of("hello world", "how are you");
Stream<String> words = lines.flatMap(line -> Stream.of(line.split(" ")));
sorted()对流中的元素进行排序。Stream<Integer> numbers = Stream.of(3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5);
Stream<Integer> sortedNumbers = numbers.sorted();
limit(long maxSize)截断流,使其最多包含指定数量的元素。Stream<String> colors = Stream.of("red", "green", "blue", "yellow", "purple");
Stream<String> limitedColors = colors.limit(3); // 限制为前3个元素
skip(long n)跳过流中的前N个元素,获取后面的元素。Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
Stream<Integer> skippedNumbers = numbers.skip(2); // 跳过前2个元素
distinct()去重操作,消除流中的重复元素。Stream<String> colors = Stream.of("red", "green", "blue", "red", "green");
Stream<String> uniqueColors = colors.distinct();
peek(Consumer<T> action)用于在处理元素时执行某个操作,通常用于调试或日志记录。Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
Stream<Integer> peekedNumbers = numbers.peek(n -> System.out.println("Processing: " + n));

4. 终端操作

在Java Stream中,终端操作是用于触发流处理的操作,当调用终端操作时,流的处理链将开始执行,并产生最终结果。终端操作通常是流处理的最后一步,它们可以返回一个值、收集元素到集合中、迭代元素或输出结果等。

以下是一些常见的Java Stream终端操作

流方法含义示例
collect收集器,将流转换为其他形式List strings = Arrays.asList(“cv”, “abd”, “aba”, “efg”, “abcd”,“jkl”, “jkl”);
Set set = strings.stream().collect(Collectors.toSet());
List list = strings.stream().collect(Collectors.toList());
Map<String, String> map = strings.stream().collect(Collectors.toMap(v ->v.concat("_name"), v1 -> v1, (v1, v2) -> v1));
forEach遍历流List strings = Arrays.asList(“cv”, “abd”, “aba”, “efg”, “abcd”,“jkl”, “jkl”);
strings.stream().forEach(s -> out.println(s));
findFirst返回第一个元素List strings = Arrays.asList(“cv”, “abd”, “aba”, “efg”, “abcd”,“jkl”, “jkl”);
Optional first = strings.stream().findFirst();
findAny将返回当前流中的任意元素List strings = Arrays.asList(“cv”, “abd”, “aba”, “efg”, “abcd”,“jkl”, “jkl”);
Optional any = strings.stream().findAny();
count返回流中元素总数List strings = Arrays.asList(“cv”, “abd”, “aba”, “efg”, “abcd”,“jkl”, “jkl”);
long count = strings.stream().count();
sum求和int sum = userList.stream().mapToInt(User::getId).sum();
max最大值int max = userList.stream().max(Comparator.comparingInt(User::getId)).get().getId();
min最小值int min = userList.stream().min(Comparator.comparingInt(User::getId)).get().getId();
anyMatch检查是否至少匹配一个元素,返回booleanList strings = Arrays.asList(“abc”, “abd”, “aba”, “efg”, “abcd”,“jkl”, “jkl”);
boolean b = strings.stream().anyMatch(s -> s == “abc”);
allMatch检查是否匹配所有元素,返回booleanList strings = Arrays.asList(“abc”, “abd”, “aba”, “efg”, “abcd”,“jkl”, “jkl”);
boolean b = strings.stream().allMatch(s -> s == “abc”);
noneMatch检查是否没有匹配所有元素,返回booleanList strings = Arrays.asList(“abc”, “abd”, “aba”, “efg”, “abcd”,“jkl”, “jkl”);
boolean b = strings.stream().noneMatch(s -> s == “abc”);
reduce可以将流中元素反复结合起来,得到一个值List strings = Arrays.asList(“cv”, “abd”, “aba”, “efg”, “abcd”,“jkl”, “jkl”);
Optional reduce = strings.stream().reduce((acc,item) -> {return acc+item;});
if(reduce.isPresent())out.println(reduce.get());

5. Collect收集

在Java Stream中,Collect是一个终端操作,用于将流中的元素收集到集合或其他数据结构中。它允许你按照自定义规则将流的元素聚合成一个集合或映射为一个新的数据结构。 Java提供了Collectors类,其中包含了各种用于收集元素的静态工厂方法。

以下是一些常见的Java Stream Collect操作:

  1. Collectors.toList():将流中的元素收集到一个List集合中。
  2. Collectors.toSet():将流中的元素收集到一个Set集合中,去重重复元素。
  3. Collectors.toMap(keyMapper, valueMapper):将流中的元素按照给定的键和值映射规则收集到Map中。
  4. Collectors.joining():将流中的字符串元素连接成一个单一的字符串,可以指定分隔符。
  5. Collectors.toCollection(collectionFactory):将元素收集到指定类型的集合中,你可以提供集合工厂函数。
  6. Collectors.toConcurrentMap(keyMapper, valueMapper):将流中的元素收集到并发ConcurrentMap中。
  7. Collectors.groupingBy(classifier):将元素按照给定的分类器函数进行分组,并返回一个Map,其中键是分组的标准,值是属于每个组的元素列表。
  8. Collectors.groupingBy(classifier, downstream):类似于groupingBy,但可以指定一个下游Collector来进一步处理分组中的元素。
  9. Collectors.partitioningBy(predicate):根据给定的条件将元素分成两个组(true和false),返回一个Map
  10. Collectors.toUnmodifiableList():将流中的元素收集到一个不可修改的List中。
  11. Collectors.toUnmodifiableSet():将流中的元素收集到一个不可修改的Set中。
  12. Collectors.collectingAndThen(downstream, finisher):将收集操作的结果应用于另一个Finisher函数,以获得最终结果。
  13. Collectors.summingInt(ToIntFunction<T> mapper):将流中元素的整数属性求和。
  14. Collectors.summingLong(ToLongFunction<T> mapper):将流中元素的长整数属性求和。
  15. Collectors.summingDouble(ToDoubleFunction<T> mapper):将流中元素的双精度浮点数属性求和。
  16. Collectors.averagingInt(ToIntFunction<T> mapper):计算流中元素的整数属性的平均值。
  17. Collectors.averagingLong(ToLongFunction<T> mapper):计算流中元素的长整数属性的平均值。
  18. Collectors.averagingDouble(ToDoubleFunction<T> mapper):计算流中元素的双精度浮点数属性的平均值。
  19. Collectors.counting():计算流中元素的数量。
  20. Collectors.minBy(comparator):查找流中的最小元素,可以指定自定义比较器。
  21. Collectors.maxBy(comparator):查找流中的最大元素,可以指定自定义比较器。
  22. Collectors.collectingAndThen(downstream, finisher):将收集操作的结果应用于另一个Finisher函数,以获得最终结果。
  23. Collectors.mapping(mapper, downstream):将元素映射为不同类型,然后使用另一个Collector收集结果。
  24. Collectors.reducing(identity, op):对流中的元素进行归约操作,返回一个结果。
  25. Collectors.reducing(identity, mapper, op):对流中的元素进行归约操作,可以指定映射函数和累加器函数。

6. 处理Optional

java.util.Optional 是 Java 8 引入的类,用于处理可能包含或不包含值的情况,以防止空指针异常。它是一种容器,可以包含或不包含非空值。Optional 类提供了一种优雅的方式来处理可能为 null 的情况,并强制开发者更仔细地处理这些情况。以下是关于 Optional 的详细解释和用法:

创建 Optional 对象

你可以使用 Optional 类的工厂方法来创建 Optional 对象:

// 创建包含非空值的 Optional
Optional<String> optionalWithValue = Optional.of("Hello, World!");

// 创建一个空的 Optional
Optional<String> emptyOptional = Optional.empty();

// 创建一个可以包含或不包含值的 Optional
Optional<String> optional = Optional.ofNullable(getValueFromSomeSource());

访问 Optional 中的值

  • get():获取 Optional 中的值,如果值为 null 则抛出 NoSuchElementException 异常。
Optional<String> optional = Optional.of("Hello, World!");
String value = optional.get(); // 获取值
  • orElse(T other):获取 Optional 中的值,如果值为 null,则返回指定默认值。
Optional<String> optional = Optional.empty();
String value = optional.orElse("Default Value"); // 如果值为 null,返回 "Default Value"
  • orElseGet(Supplier<? extends T> other):获取 Optional 中的值,如果值为 null,则使用提供的 Supplier 来生成默认值。
Optional<String> optional = Optional.empty();
String value = optional.orElseGet(() -> generateDefaultValue()); // 使用 Supplier 生成默认值
  • orElseThrow(Supplier<? extends X> exceptionSupplier):获取 Optional 中的值,如果值为 null,则抛出指定异常。
Optional<String> optional = Optional.empty();
String value = optional.orElseThrow(() -> new IllegalStateException("Value is not present"));

检查是否存在值

  • isPresent():检查 Optional 中是否存在值。
Optional<String> optional = Optional.of("Hello, World!");
boolean isPresent = optional.isPresent(); // 返回 true
  • ifPresent(Consumer<? super T> consumer):如果 Optional 中存在值,执行指定的操作。
Optional<String> optional = Optional.of("Hello, World!");
optional.ifPresent(value -> System.out.println("Value is present: " + value));

映射和过滤值

  • map(Function<? super T, ? extends U> mapper):对 Optional 中的值应用映射函数,并返回一个新的 Optional 包含映射的结果。
Optional<String> optional = Optional.of("Hello, World!");
Optional<Integer> lengthOptional = optional.map(String::length); // 包含字符串长度
  • filter(Predicate<? super T> predicate):根据指定的条件过滤 Optional 中的值,如果值满足条件,返回原始 Optional,否则返回空的 Optional
Optional<String> optional = Optional.of("Hello, World!");
Optional<String> filteredOptional = optional.filter(s -> s.startsWith("Hello")); // 包含值

注意事项

  • 不要滥用 Optional,它不适用于所有情况。通常,Optional 适用于方法返回值,以表示可能的空值情况,或者在方法参数中表示是否接受 null 值。
  • 不要将 Optional 用于集合。Java集合已经有了空值处理机制,如 null 值表示集合为空。
  • Optional 视为一种工具,而不是在代码中的常规数据结构。避免在字段中使用 Optional,而应该在方法中使用它来表达可能的缺失值情况。
  • 注意 Optional 的性能开销,如果值存在,它会在内部进行额外的包装,因此不应该用于性能敏感的部分。

Optional 是一种有用的工具,可以帮助你更安全地处理可能的空值情况,但要谨慎使用,只在合适的情况下使用。

7. 理解归约操作的概念

Stream.reduce() 是 Java Stream API 中用于执行归约操作的方法。归约是将流中的元素通过某种操作(通常是二元操作)进行合并,最终得到一个结果的过程。reduce() 方法可以用来计算总和、最大/最小值、连接字符串等各种归约操作。

reduce() 方法有两个重载的版本:

  1. reduce(BinaryOperator<T> accumulator):这个版本的 reduce() 接受一个二元操作符,它将流中的元素依次合并起来,最终返回一个包含归约结果的 Optional 对象。如果流为空,Optional 将包含空值。

    Optional<Integer> sum = Stream.of(1, 2, 3, 4, 5)
            .reduce((x, y) -> x + y);
    int result = sum.orElse(0); // 如果流为空,返回 0
    
  2. reduce(T identity, BinaryOperator<T> accumulator):这个版本的 reduce() 接受一个初始值 identity,以及一个二元操作符,它将初始值和流中的元素依次合并起来,最终返回一个结果。

    int sum = Stream.of(1, 2, 3, 4, 5)
            .reduce(0, (x, y) -> x + y);
    

下面是一些示例,演示如何使用 reduce() 方法:

// 计算集合中的元素总和
int sum = Stream.of(1, 2, 3, 4, 5)
        .reduce(0, (x, y) -> x + y); // 结果为 15

// 查找最大值
int max = Stream.of(3, 6, 1, 9, 4)
        .reduce(Integer.MIN_VALUE, (x, y) -> Integer.max(x, y)); // 结果为 9

// 连接字符串
String concat = Stream.of("Hello", " ", "World")
        .reduce("", (x, y) -> x + y); // 结果为 "Hello World"

要注意的是,reduce() 方法通常在终端操作中使用,因为它会消费流中的元素。归约操作是一个强大的工具,用于在处理数据时执行各种操作,从求和到查找最大/最小值等等

8. 处理基本类型值的流

ava Stream API 提供了用于处理基本类型值的流的特化版本,以避免装箱拆箱操作,提高性能。这些特化版本主要包括 IntStreamLongStreamDoubleStream,分别用于处理基本数据类型 intlongdouble 的流。

下面是如何创建和使用这些特化流的示例:

创建特化流

// 创建一个 IntStream
IntStream intStream = IntStream.of(1, 2, 3, 4, 5);

// 创建一个 LongStream
LongStream longStream = LongStream.of(100L, 200L, 300L);

// 创建一个 DoubleStream
DoubleStream doubleStream = DoubleStream.of(1.1, 2.2, 3.3, 4.4);

常见操作

与普通的 Stream 类似,特化流也支持各种中间操作和终端操作。以下是一些示例:

// 中间操作:过滤、映射
IntStream.of(1, 2, 3, 4, 5)
    .filter(x -> x % 2 == 0)
    .map(x -> x * 2)
    .forEach(System.out::println);

// 终端操作:计算总和、最大值
int sum = IntStream.of(1, 2, 3, 4, 5).sum();
int max = IntStream.of(1, 2, 3, 4, 5).max().orElse(0);

使用特化流处理基本类型数组

你还可以将基本类型数组转换为相应类型的特化流进行处理。例如:

int[] intArray = {1, 2, 3, 4, 5};
int sum = Arrays.stream(intArray).sum();

这些特化流非常有用,因为它们消除了装箱和拆箱操作,提高了性能,特别是在处理大量基本类型数据时。特化流使得在处理基本类型值时,可以使用流操作的便利性和功能性。

9. 使用并行流加快流操作

Java Stream API 提供了并行流(Parallel Stream)来加快流操作,特别是在多核处理器上并行执行操作时。并行流允许将集合分成多个部分,并同时处理这些部分,以提高处理速度。在这里,我将详细介绍如何使用并行流以及一些注意事项。

使用并行流的基本步骤

  1. 创建流:首先,你需要创建一个普通的流,例如 StreamIntStreamLongStreamDoubleStream
  2. 调用 parallel() 方法:使用流的 parallel() 方法将其转换为并行流。这会将底层数据分成多个部分,以便并行处理。
  3. 执行流操作:在并行流上执行操作,就像在普通流上一样。流操作可以包括 filtermapreduce 等。
  4. 终端操作:最后,执行一个终端操作(如 forEachcollectsum 等)来触发实际的计算。在这个步骤中,流操作将并行执行。

示例

以下是一个示例,演示如何使用并行流来计算一组数字的总和:

int sum = IntStream.rangeClosed(1, 1000000)
    .parallel() // 将流转换为并行流
    .sum();      // 终端操作,计算总和
System.out.println("Sum: " + sum);

注意事项

  1. 性能提升:使用并行流可以显著提高处理大数据集的性能,特别是在多核处理器上。然而,在小数据集上使用并行流可能不会带来性能提升,甚至会导致性能下降。
  2. 线程安全:并行流是线程安全的,因为它使用并行处理来处理数据,但要确保你的流操作是无状态的,不依赖于共享状态。避免在并行流中修改共享变量。
  3. 注意副作用:在并行流操作中,如果有副作用(side effect),它们可能会出现不确定性行为,因此要小心使用。
  4. 调试:调试并行流操作可能更加复杂,因为多个线程在同时执行操作。可以使用调试工具和技术来排查问题。
  5. 选择合适的场景:并行流适用于处理大数据集,但在小数据集或简单计算中,使用普通流可能更为简单和有效。

总之,Java 并行流是一种用于提高流操作性能的有用工具,特别适合处理大数据集。然而,它需要小心处理,以确保线程安全和正确性。选择合适的场景和适当的数据量来使用并行流,以获得最佳性能。