JAVA和Nginx 教程大全

网站首页 > 精选教程 正文

应用Stream API与并行流处理大数据量集合操作

wys521 2024-12-23 11:19:11 精选教程 29 ℃ 0 评论

一、引言

在大数据时代,处理大规模数据集合成为了开发者面临的重要挑战。Java 8 引入的 Stream API 和并行流为我们提供了高效处理大数据量集合的工具。Stream API 允许开发者以声明式的方式处理数据集合,只需指定做什么,而不需要关心如何做。这种方式使得代码更加简洁、易于理解。

并行流则是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。Java 8 中将并行进行了优化,我们可以很容易地对数据进行并行操作。Stream API 可以声明性地通过 parallel() 与 sequential() 在并行流与顺序流之间进行切换。

例如,在处理大规模数据集合时,我们可以使用并行流来提高处理效率。以计算从 0 到一个较大数值的累计和为例,普通的累加和可能会对 CPU 的利用率不高,而采用并行流计算则可以将数据分成多个小块,分配到多个线程中进行处理,从而提高程序的执行效率。

Java 8 中的并行流实例展示了其在实际应用中的强大之处。例如,我们可以使用类似 LongStream.rangeClosed(0, 10000000L).parallel().reduce(0, Long::sum) 的代码来使用 Java 8 中的并行流处理数据。在 Java 8 中,我们还可以优雅地切换并行流和串行流,只需要使用 parallel() 和 sequential() 方法即可。

Fork/Join 框架是 Java 8 中并行流的重要组成部分。它采用 “工作窃取” 模式,当执行新的任务时,它可以将其拆分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。相对于一般的线程池实现,Fork/Join 框架的优势体现在对其中包含的任务的处理方式上。在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在 Fork/Join 框架的实现中,如果某个子任务由于等待另外一个子任务的完成而无法继续运行,那么处理该子问题的线程会主动寻找其他尚未运行的子任务来执行。这种方式减少了线程的等待时间,提高了程序的性能。

我们可以通过手动编写一个使用 Fork/Join 框架实现累加和的示例程序来更好地理解 Fork/Join 框架。例如:

package io.binghe.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;


@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;

    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;

        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1,
                    end);
            // 执行子任务
            leftTask.fork();
            rightTask.fork();

            // 等待任务执行结束合并其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            // 合并子任务
            sum = leftResult + rightResult;
        }

        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        //执行一个任务
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

二、Stream API 与并行流的概念

1. Stream API 简介

Stream API 是 Java 8 中用于处理集合数据的新工具包。它以声明式的方式处理数据集合,支持多种操作如 map、filter、reduce 等。Stream API 本质上是对集合的一种抽象,它不会修改原始集合,而是通过链式操作产生新的 Stream 对象。例如,我们可以使用 Stream API 对一个包含整数的集合进行筛选、映射和求和操作。假设现有一个包含大量整数的 List 集合,我们希望对其中大于 10 的元素进行筛选,并对其进行求和。以下是使用 Stream API 处理该任务的示例代码:

List<Integer> numbers = Arrays.asList(7,14, 2, 9, 18, 25, 11, 5);
int sum = numbers.stream().filter(n -> n > 10).mapToInt(Integer::intValue).sum();
System.out.println("Sum: " + sum);

在这个示例中,我们首先将 List 集合转换为 Stream 对象,然后使用 filter () 方法过滤掉小于等于 10 的元素。接着,使用 mapToInt () 方法将 Stream 对象转换为 IntStream 对象,并最后使用 sum () 方法求和。

2. 并行流的定义

并行流是 Stream API 的一种特殊流,允许流中的元素在多个线程上并行处理,利用多核处理器优势加速数据处理。

并行流的主要优势在于性能提升。然而,并行流的性能提升并不是在所有情况下都能显现,具体取决于以下因素:

  1. 数据量:处理的数据量越大,并行流的性能提升越明显。
  1. 操作复杂度:操作越复杂,并行流的优势越明显。
  1. 系统资源:多核处理器的数量和性能对并行流的影响很大。

例如,我们可以使用以下代码来测试串行流和并行流的性能:

public class ParallelStreamPerformance {

    public static void main(String[] args) {
        long startTime, endTime;

        // 测试串行流性能
        startTime = System.currentTimeMillis();
        LongStream.rangeClosed(1, 10_000_000)
                  .sum();
        endTime = System.currentTimeMillis();
        System.out.println("Sequential stream time: " + (endTime - startTime) + " ms");

        // 测试并行流性能
        startTime = System.currentTimeMillis();
        LongStream.rangeClosed(1, 10_000_000)
                  .parallel()
                  .sum();
        endTime = System.currentTimeMillis();
        System.out.println("Parallel stream time: " + (endTime - startTime) + " ms");
    }
}

创建并行流有几种方式,以下是一些常见的方法:

  1. 从集合创建并行流:
List<String> data = Arrays.asList("A", "B", "C", "D", "E");
// 创建并行流
data.parallelStream().forEach(System.out::println);
  1. 使用 Stream 接口的 parallel () 方法:
// 创建一个范围流并转换为并行流
IntStream.range(1, 10).parallel().forEach(System.out::println);

并行流在多个线程中并行执行,操作必须是线程安全的。适用场景为 CPU 密集型任务,不适合 I/O 密集型任务。在实际使用中,需要进行性能测试,确保并行流能够带来性能提升。

例如,我们可以使用并行流进行过滤和映射操作:

List<String> data = Arrays.asList("apple", "banana", "cherry", "date", "elderberry");
// 使用并行流进行过滤和映射
List<String> result = data.parallelStream().filter(s -> s.startsWith("b")).map(String::toUpperCase).collect(Collectors.toList());
System.out.println(result);

三、Stream API 与并行流的区别

1. 执行方式

Stream API 中的串行流在单线程上顺序执行操作,每个操作按照先后顺序依次处理集合中的元素。例如在对一个整数列表进行遍历求和时,串行流会逐个元素进行处理,从第一个元素开始,依次进行累加操作,直到处理完所有元素。这种执行方式简单直观,易于理解和调试,特别是在处理小规模数据或者顺序依赖的操作时非常有效。

而并行流则将数据分成多个部分在多个线程上并行执行。它利用多核处理器的优势,同时在多个线程上处理数据的不同部分,从而提高处理效率。以同样的整数列表求和为例,并行流会将列表分成若干个小块,每个线程负责处理一个小块,最后将各个线程的结果合并起来得到最终的总和。这种并行执行的方式在处理大规模数据和计算密集型任务时,能够显著减少处理时间。

2. 创建方法

  1. 通过 stream () 方法创建串行流,任何实现了 java.util.Collection 接口的集合都可以通过调用 stream () 方法来创建一个串行流。例如,对于一个 List 集合:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Stream<Integer> stream = numbers.stream();

这里通过调用 numbers 集合的 stream () 方法创建了一个串行流,这个流可以进行各种中间操作和终端操作,如过滤、映射、求和等。

  1. 通过 parallelStream () 方法创建并行流。同样以一个集合为例:
List<String> data = Arrays.asList("A", "B", "C", "D", "E");
// 创建并行流
data.parallelStream().forEach(System.out::println);

这里通过调用 data 集合的 parallelStream () 方法创建了一个并行流,并行流中的元素可以在多个线程上同时进行处理,提高了处理大规模数据的效率。

此外,还可以通过将串行流转换为并行流,即使用串行流的 parallel () 方法。例如:

Stream<Integer> serialStream = Arrays.asList(1, 2, 3, 4, 5).stream();
Stream<Integer> parallelStream = serialStream.parallel();

这里先创建了一个串行流 serialStream,然后通过调用 parallel () 方法将其转换为并行流 parallelStream。

四、并行流处理大数据量集合的优势

1. 提高性能

  1. 利用多核处理器优势,在处理大量数据时加速处理速度。

在大数据量集合的处理中,并行流能够充分利用多核处理器的优势。例如,当处理一个包含大量元素的集合时,并行流会将数据自动划分为多个子任务,每个子任务在不同的线程上运行,从而可以同时利用多个处理器核心进行计算。这样可以显著提高处理速度,尤其是对于计算密集型任务。

2. 方便易用

  1. 简单调用 parallel () 方法即可将顺序流转换为并行流,无需手动管理线程和任务调度。

使用并行流非常方便,只需要在顺序流上调用 parallel () 方法,就可以将其转换为并行流。这使得开发者无需手动管理线程和任务调度,大大简化了并行编程的复杂性。例如,对于一个 List 集合,可以通过以下方式将其转换为并行流并进行操作:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.parallelStream().map(n -> n * 2).forEach(System.out::println);

3. 自动任务划分

  1. 流框架自动将数据划分为多个子任务,并在后台管理任务执行,简化并行编程复杂性。

并行流的一个重要优势是自动任务划分。当使用并行流处理大数据量集合时,流框架会自动将数据划分为多个子任务,每个子任务由不同的线程负责处理。在后台,流框架会管理这些任务的执行,包括任务的分配、调度和结果的合并。这种自动任务划分的机制大大简化了并行编程的复杂性,使得开发者可以更加专注于业务逻辑的实现,而无需关心底层的并行执行细节。例如,在处理一个大型数据集时,并行流可以自动将数据划分为多个小块,分别在不同的线程上进行处理,最后将各个子任务的结果合并起来得到最终的结果。

五、Stream API 处理大数据量集合操作方法

1. 使用 map 函数进行转换

  1. 可以将流中的元素进行类型转换、属性提取、计算等操作。

Stream API 中的map函数是一个强大的工具,它允许我们对流中的元素进行各种转换操作。例如,我们可以将一个包含对象的流转换为包含对象某个属性的流。假设我们有一个包含员工对象的列表,每个员工对象有一个name属性和一个age属性,我们可以使用map函数将这个流转换为只包含员工名字的流。以下是一个示例代码:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class Employee {
    private String name;
    private int age;

    public Employee(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }
}

public class MapFunctionExample {

    public static void main(String[] args) {
        // 创建员工列表
        List<Employee> employees = new ArrayList<>();
        employees.add(new Employee("Alice", 30));
        employees.add(new Employee("Bob", 25));
        employees.add(new Employee("Charlie", 35));

        // 使用 map 函数将员工列表转换为名字列表
        List<String> names = employees.stream()
                                      .map(Employee::getName)
                                      .collect(Collectors.toList());

        // 打印名字列表
        System.out.println(names);
    }
}

在这个例子中,map(Employee::getName)将每个员工对象转换为其名字,最终生成一个包含所有员工名字的新流。我们还可以使用map函数进行计算操作。例如,对于一个包含整数的流,我们可以使用map函数将每个整数乘以 2。以下是示例代码:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapFunctionCalculationExample {

    public static void main(String[] args) {
        // 创建整数列表
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // 使用 map 函数将每个整数乘以2
        List<Integer> doubledNumbers = numbers.stream()
                                              .map(n -> n * 2)
                                              .collect(Collectors.toList());

        // 打印加倍后的数字列表
        System.out.println(doubledNumbers);
    }
}

map函数还可以用于类型转换。假设我们有一个包含字符串的流,每个字符串表示一个整数,我们可以使用map函数将这个流转换为包含整数的流。以下是示例代码:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapFunctionTypeConversionExample {

    public static void main(String[] args) {
        // 创建字符串数字列表
        List<String> stringNumbers = Arrays.asList("1", "2", "3", "4", "5");

        // 使用 map 函数将字符串流转换为整数流
        List<Integer> integerNumbers = stringNumbers.stream()
                                                    .map(Integer::parseInt)
                                                    .collect(Collectors.toList());

        // 打印转换后的整数列表
        System.out.println(integerNumbers);
    }
}

2. 使用 mapToInt 函数进行映射

  1. 专门用于将流中的元素映射为 int 类型的值,提供更高效的操作和更少的内存消耗。

mapToInt函数是 Stream API 中的一个专门用于将流中的元素映射为int类型值的方法。它接受一个函数作为参数,该函数将流中的元素转换为int类型的值。例如,对于一个包含整数的列表,我们可以使用mapToInt函数将每个整数平方后转换为int类型的值。以下是示例代码:

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class MapToIntExample {

    public static void main(String[] args) {
        // 创建整数列表
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // 使用 mapToInt 函数将每个整数平方后转换为 int 类型的值,并求和
        int sum = numbers.stream()
                         .mapToInt(n -> n * n) // 将每个整数平方,并转换为原始 int 流
                         .sum();              // 计算所有平方数的总和

        // 打印平方数的总和
        System.out.println(sum);
    }
}

在这个例子中,mapToInt(n -> n * n)将每个整数平方后转换为int类型的值,然后使用sum方法计算这些平方值的总和。mapToInt函数返回的是一个IntStream,它提供了一些专门用于处理int类型值的方法,如sum、average、max、min等。这些方法可以更高效地处理int类型的值,并且消耗更少的内存。与map函数相比,mapToInt函数在处理int类型值时更加高效,因为它不需要进行装箱和拆箱操作。装箱和拆箱操作会消耗额外的内存和时间,特别是在处理大量数据时。

3. 使用 filter 函数进行过滤

  1. 根据指定条件过滤出流中满足条件的元素。

filter函数是 Stream API 中的一个重要方法,它允许我们根据指定条件过滤出流中的元素。例如,对于一个包含整数的流,我们可以使用filter函数过滤出大于 10 的整数。以下是示例代码:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterExample {

    public static void main(String[] args) {
        // 创建整数列表
        List<Integer> numbers = Arrays.asList(7, 14, 2, 9, 18, 25, 11, 5);

        // 使用 filter 函数过滤出大于10的整数
        List<Integer> filteredNumbers = numbers.stream()
                                               .filter(n -> n > 10)
                                               .collect(Collectors.toList());

        // 打印过滤后的整数列表
        System.out.println(filteredNumbers);
    }
}

在这个例子中,filter(n -> n > 10)将流中的每个整数与 10 进行比较,如果大于 10 则保留该整数,否则过滤掉。最后,使用collect方法将过滤后的元素收集到一个新的列表中。filter函数可以接受任何Predicate对象作为参数,Predicate是一个函数式接口,它接受一个参数并返回一个布尔值,表示该参数是否满足条件。我们可以使用 Lambda 表达式或方法引用来创建Predicate对象。例如,对于一个包含字符串的流,我们可以使用filter函数过滤出以特定字符开头的字符串。以下是示例代码:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterStringExample {

    public static void main(String[] args) {
        // 创建字符串列表
        List<String> words = Arrays.asList("apple", "banana", "cherry", "date", "elderberry");

        // 使用 filter 函数过滤出以字母 'b' 开头的字符串
        List<String> filteredWords = words.stream()
                                          .filter(s -> s.startsWith("b"))
                                          .collect(Collectors.toList());

        // 打印过滤后的字符串列表
        System.out.println(filteredWords);
    }
}

在这个例子中,filter(s -> s.startsWith("b"))将流中的每个字符串与字母 'b' 进行比较,如果以 'b' 开头则保留该字符串,否则过滤掉。最后,使用collect方法将过滤后的元素收集到一个新的列表中。filter函数还可以与其他 Stream API 方法结合使用,以实现更复杂的过滤逻辑。例如,我们可以先使用map函数将流中的元素进行转换,然后再使用filter函数进行过滤。以下是示例代码:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class Person {
    private String name;
    private int age;

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }
}

public class FilterAndMapExample {

    public static void main(String[] args) {
        // 创建Person对象列表
        List<Person> people = Arrays.asList(
            new Person("Alice", 30),
            new Person("Bob", 25),
            new Person("Charlie", 35)
        );

        // 先使用 filter 函数过滤出年龄大于30的人,然后使用 map 函数将 Person 对象转换为年龄
        List<Integer> ages = people.stream()
                                   .filter(person -> person.getAge() > 30) // 先筛选
                                   .map(Person::getAge)                    // 再转换
                                   .collect(Collectors.toList());

        // 打印年龄列表
        System.out.println(ages);
    }
}

在这个例子中,map(Person::getAge)将每个Person对象转换为其年龄,然后filter(age -> age > 30)将年龄大于 30 的人过滤出来。最后,使用collect方法将过滤后的年龄收集到一个新的列表中。

六、应用 Stream API 和并行流处理大数据的实际案例

1. 处理用户列表

  1. 找出年龄大于 30 岁的用户并按名字排序。

假设有一个用户列表,我们可以使用 Stream API 和并行流来找出年龄大于 30 岁的用户并按名字排序。以下是使用并行流的示例代码:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class User {
    private String name;
    private int age;

    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    // Optionally override toString for better print output
    @Override
    public String toString() {
        return getName() + " - " + getAge();
    }
}

public class ProcessUserListExample {

    public static void main(String[] args) {
        // 创建User对象列表
        List<User> users = new ArrayList<>();
        users.add(new User("Alice", 30));
        users.add(new User("Bob", 25));
        users.add(new User("Charlie", 35));
        users.add(new User("David", 40));

        // 使用并行流找出年龄大于 30 岁的用户并按名字排序
        List<User> filteredUsers = users.parallelStream()
                                        .filter(user -> user.getAge() > 30)
                                        .sorted(Comparator.comparing(User::getName))
                                        .collect(Collectors.toList());

        // 打印筛选并排序后的用户信息
        System.out.println("Filtered and sorted users:");
        filteredUsers.forEach(System.out::println);
    }
}

在这个例子中,我们首先创建了一个包含用户对象的列表。然后,使用并行流的parallelStream()方法创建并行流。接着,使用filter()方法筛选出年龄大于 30 岁的用户,再使用sorted()方法按用户名字进行排序。最后,使用collect()方法将结果收集到一个新的列表中。

2. 统计字符串长度之和

  1. 分别使用普通流和并行流计算字符串列表中所有字符串的长度之和。

以下是分别使用普通流和并行流计算字符串列表中所有字符串长度之和的示例代码:

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class StringLengthSumExample {

    public static void main(String[] args) {
        // 创建字符串列表
        List<String> words = Arrays.asList("apple", "banana", "cherry", "date", "elderberry");

        // 使用普通流计算字符串长度之和
        int sumWithSequentialStream = words.stream()
                                           .mapToInt(String::length)
                                           .sum();

        System.out.println("Sum with sequential stream: " + sumWithSequentialStream);

        // 使用并行流计算字符串长度之和
        int sumWithParallelStream = words.parallelStream()
                                         .mapToInt(String::length)
                                         .sum();

        System.out.println("Sum with parallel stream: " + sumWithParallelStream);
    }
}

在这个例子中,我们首先创建了一个包含字符串的列表。然后,分别使用普通流和并行流计算字符串长度之和。对于普通流,我们使用stream()方法创建流,然后使用mapToInt()方法将每个字符串映射为其长度,最后使用sum()方法计算长度之和。对于并行流,我们使用parallelStream()方法创建并行流,然后执行相同的操作。

七、结论

Stream API 和并行流为大数据处理提供了强大的工具,正确使用和适当优化可以显著提高处理性能。在实际应用中,需要根据数据量、计算复杂度等因素选择合适的处理方式。

对于大数据处理而言,Stream API 和并行流各有其优势和适用场景。在处理大数据量集合时,并行流能够利用多核处理器的优势,提高处理速度。然而,并非在所有情况下并行流都比串行流更快,这取决于数据量、数据结构、任务复杂度和硬件资源等因素。

对于小数据集,并行流的额外开销可能会超过并行处理带来的性能提升,导致并行流比串行流更慢。而对于大数据集,并行流通常能够显著提高处理速度,特别是在计算密集型任务中。此外,某些数据结构不适合并行处理,如链表,因为随机访问成本较高,导致并行流性能不佳。任务复杂度也是一个重要因素,对于简单任务,并行流的性能提升可能不明显;对于复杂任务,并行流的优势可能更显著。硬件资源也会影响并行流的性能,如果硬件资源有限,如单核处理器,并行流的性能提升

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表