Home > Java > javaTutorial > body text

Java8 new features streaming data processing

黄舟
Release: 2017-02-23 10:28:39
Original
1888 people have browsed it

Abstract: java8's streaming data processing greatly simplifies our operations on structures such as collections and arrays, allowing us to operate with functional thinking. This article will explore the basic use of java8's streaming data processing.

1. Introduction to streaming processing

When I came into contact with Java8 streaming processing, my first impression was that streaming processing makes collection operations simpler. Many operations that usually require multiple lines of code can be implemented in one line with the help of streaming. For example, if we want to filter out all even numbers from a collection containing integers and encapsulate them into a new List, then before java8, we need to implement it through the following code:

List<Integer> evens = new ArrayList<>();
for (final Integer num : nums) {
    if (num % 2 == 0) {
        evens.add(num);
    }
}
Copy after login

Through the streaming processing of java8, we can simplify the code to:

List<Integer> evens = nums.stream().filter(num -> num % 2 == 0).collect(Collectors.toList());
Copy after login


Let’s briefly explain the meaning of the above line of statement,

stream()
Copy after login
Copy after login
Copy after login
Copy after login

The operation converts the collection into a stream,

filter()
Copy after login

performs our custom filtering process, here we filter out all even numbers through lambda expressions, and finally we use

collect()
Copy after login

to filter the results Encapsulate the processing and specify its encapsulation into a List collection through

Collectors.toList()
Copy after login
Copy after login

.

As can be seen from the above example, Java8's streaming processing greatly simplifies the operation of collections. In fact, it is not just collections, including arrays, files, etc., as long as it can be converted into a stream, we can You can use streaming processing to operate it similar to how we write SQL statements. Java8 implements stream processing through internal iteration. A stream processing can be divided into three parts: conversion to stream, intermediate operation, and terminal operation. As shown below:

Java8 new features streaming data processing

# Taking a collection as an example, for a streaming operation we first need to call the

stream()
Copy after login
Copy after login
Copy after login
Copy after login

function to convert it into a stream, and then call The corresponding

中间操作
Copy after login

achieves the operations we need to perform on the collection, such as filtering, conversion, etc. Finally,

终端操作
Copy after login

is used to encapsulate the previous results and return the form we need.

2. Intermediate operation

We define a simple student entity class for the following example demonstration:

public class Student {
    /** 学号 */
    private long id;
    private String name;
    private int age;
    /** 年级 */
    private int grade;
    /** 专业 */
    private String major;
    /** 学校 */
    private String school;
    // 省略getter和setter
}
// 初始化
List<Student> students = new ArrayList<Student>() {
    {
        add(new Student(20160001, "孔明", 20, 1, "土木工程", "武汉大学"));
        add(new Student(20160002, "伯约", 21, 2, "信息安全", "武汉大学"));
        add(new Student(20160003, "玄德", 22, 3, "经济管理", "武汉大学"));
        add(new Student(20160004, "云长", 21, 2, "信息安全", "武汉大学"));
        add(new Student(20161001, "翼德", 21, 2, "机械与自动化", "华中科技大学"));
        add(new Student(20161002, "元直", 23, 4, "土木工程", "华中科技大学"));
        add(new Student(20161003, "奉孝", 23, 4, "计算机科学", "华中科技大学"));
        add(new Student(20162001, "仲谋", 22, 3, "土木工程", "浙江大学"));
        add(new Student(20162002, "鲁肃", 23, 4, "计算机科学", "浙江大学"));
        add(new Student(20163001, "丁奉", 24, 5, "土木工程", "南京大学"));
    }
};
Copy after login

2.1 Filtering

Filtering, as the name suggests, is to filter the elements that meet the conditions in the collection according to the given requirements. The filtering operations provided by java8 include: filter, distinct, limit, and skip.

filter

In the previous example we have demonstrated how to use filter, which is defined as:

Stream<T> filter(Predicate<? super T> predicate)
Copy after login

, filter accepts a predicate

Predicate
Copy after login
Copy after login

, we can define filter conditions through this predicate. When introducing lambda expressions, we introduced that

Predicate
Copy after login
Copy after login

is a functional interface, which contains a

test(T t)
Copy after login

method. The method returns

boolean
Copy after login

. Now we want to filter out all Wuhan University students from the collection

students
Copy after login

, then we can achieve it through filter and pass the filtering operation as a parameter to filter:

List<Student> whuStudents = students.stream()
                                    .filter(student -> "武汉大学".equals(student.getSchool()))
                                    .collect(Collectors.toList());
Copy after login

distinct

The distinct operation is similar to the

DISTINCT
Copy after login

keyword we add when writing SQL statements, which is used for deduplication processing. distinct is based on

Object.equals(Object)
Copy after login

Implementation, back to the original example, assuming we want to filter out all non-repeating even numbers, then we can add the distinct operation:

List<Integer> evens = nums.stream()
                        .filter(num -> num % 2 == 0).distinct()
                        .collect(Collectors.toList());
Copy after login

limit# The
##limit operation is also similar to the

LIMIT
Copy after login

keyword in the SQL statement, but its function is relatively weak. Limit returns a stream containing the first n elements. When the set size is less than n, the actual length is returned. For example, the following example returns students whose first two majors are

土木工程
Copy after login

:



List<Student> civilStudents = students.stream()
                                    .filter(student -> "土木工程".equals(student.getMajor())).limit(2)
                                    .collect(Collectors.toList());
Copy after login

Speaking of limit, I have to mention it Here’s another stream operation:


sorted
Copy after login

. This operation is used to sort the elements in the stream. sorted requires that the elements to be compared must implement the

Comparable
Copy after login

interface. It doesn’t matter if it does not. We can pass the comparator as a parameter to

sorted(Comparator<? super T> comparator)
Copy after login

. For example, if we want to filter out students majoring in civil engineering, sort them by age from youngest to oldest, and filter out the two youngest students, then it can be implemented as:



List<Student> sortedCivilStudents = students.stream()
                                            .filter(student -> "土木工程".equals(student.getMajor())).sorted((s1, s2) -> s1.getAge() - s2.getAge())
                                            .limit(2)
                                            .collect(Collectors.toList());
Copy after login

The skip
skip operation is the opposite of the limit operation. Just like its literal meaning, it skips the first n elements. For example, if we want to find the civil engineering students sorted after 2, then we can achieve For:


List<Student> civilStudents = students.stream()
                                    .filter(student -> "土木工程".equals(student.getMajor()))
                                    .skip(2)
                                    .collect(Collectors.toList());
Copy after login

By skip, the first two elements will be skipped and a stream constructed from all subsequent elements will be returned. If n is greater than the length of the set that meets the condition, a Empty collection.



2.2 Mapping

In SQL, by adding the required field name after the

SELECT
Copy after login

keyword, we can only output the field data we need. The mapping operation of streaming processing also achieves this purpose. In streaming processing of Java 8, there are mainly two types of mapping operations: map and flatMap.



map
For example, suppose we want to filter out the names of all students majoring in computer science, then we can filter through the map based on filter The student entity is mapped into a student name string, and the specific implementation is as follows:


List<String> names = students.stream()
                            .filter(student -> "计算机科学".equals(student.getMajor()))
                            .map(Student::getName).collect(Collectors.toList());
Copy after login

除了上面这类基础的map,java8还提供了

mapToDouble(ToDoubleFunction<? super T> mapper)
Copy after login

mapToInt(ToIntFunction<? super T> mapper)
Copy after login

mapToLong(ToLongFunction<? super T> mapper)
Copy after login

,这些映射分别返回对应类型的流,java8为这些流设定了一些特殊的操作,比如我们希望计算所有专业为计算机科学学生的年龄之和,那么我们可以实现如下:

int totalAge = students.stream()
                    .filter(student -> "计算机科学".equals(student.getMajor()))
                    .mapToInt(Student::getAge).sum();
Copy after login

通过将Student按照年龄直接映射为

IntStream
Copy after login
Copy after login
Copy after login

,我们可以直接调用提供的

sum()
Copy after login

方法来达到目的,此外使用这些数值流的好处还在于可以避免jvm装箱操作所带来的性能消耗。

flatMap

flatMap与map的区别在于 flatMap是将一个流中的每个值都转成一个个流,然后再将这些流扁平化成为一个流 。举例说明,假设我们有一个字符串数组

String[] strs = {"java8", "is", "easy", "to", "use"};
Copy after login

,我们希望输出构成这一数组的所有非重复字符,那么我们可能首先会想到如下实现:

List<String[]> distinctStrs = Arrays.stream(strs)
                                .map(str -> str.split(""))  // 映射成为Stream<String[]>
                                .distinct()
                                .collect(Collectors.toList());
Copy after login

在执行map操作以后,我们得到是一个包含多个字符串(构成一个字符串的字符数组)的流,此时执行distinct操作是基于在这些字符串数组之间的对比,所以达不到我们希望的目的,此时的输出为:

[j, a, v, a, 8]
[i, s]
[e, a, s, y]
[t, o]
[u, s, e]
Copy after login

distinct只有对于一个包含多个字符的流进行操作才能达到我们的目的,即对

Stream<String>
Copy after login
Copy after login

进行操作。此时flatMap就可以达到我们的目的:

List distinctStrs = Arrays.stream(strs)
                                .map(str -> str.split(""))  // 映射成为Stream<String[]>
                                .flatMap(Arrays::stream)  // 扁平化为Stream<String>
                                .distinct()
                                .collect(Collectors.toList());
Copy after login

flatMap将由map映射得到的

Stream<String[]>
Copy after login

,转换成由各个字符串数组映射成的流

Stream<String>
Copy after login
Copy after login

,再将这些小的流扁平化成为一个由所有字符串构成的大流

Steam<String>
Copy after login

,从而能够达到我们的目的。

与map类似,flatMap也提供了针对特定类型的映射操作:

flatMapToDouble(Function<? super T,? extends DoubleStream> mapper)
Copy after login

flatMapToInt(Function<? super T,? extends IntStream> mapper)
Copy after login

flatMapToLong(Function<? super T,? extends LongStream> mapper)
Copy after login



三. 终端操作

终端操作是流式处理的最后一步,我们可以在终端操作中实现对流查找、归约等操作。

3.1 查找

allMatch

allMatch用于检测是否全部都满足指定的参数行为,如果全部满足则返回true,例如我们希望检测是否所有的学生都已满18周岁,那么可以实现为:

boolean isAdult = students.stream().allMatch(student -> student.getAge() >= 18);
anyMatch

anyMatch则是检测是否存在一个或多个满足指定的参数行为,如果满足则返回true,例如我们希望检测是否有来自武汉大学的学生,那么可以实现为:

boolean hasWhu = students.stream().anyMatch(student -> "武汉大学".equals(student.getSchool()));
noneMathch

noneMatch用于检测是否不存在满足指定行为的元素,如果不存在则返回true,例如我们希望检测是否不存在专业为计算机科学的学生,可以实现如下:

boolean noneCs = students.stream().noneMatch(student -> "计算机科学".equals(student.getMajor()));
findFirst

findFirst用于返回满足条件的第一个元素,比如我们希望选出专业为土木工程的排在第一个学生,那么可以实现如下:

Optional optStu = students.stream().filter(student -> "土木工程".equals(student.getMajor())).findFirst();
findFirst不携带参数,具体的查找条件可以通过filter设置,此外我们可以发现findFirst返回的是一个Optional类型,关于该类型的具体讲解可以参考上一篇:Java8新特性 – Optional类。

findAny

findAny相对于findFirst的区别在于,findAny不一定返回第一个,而是返回任意一个,比如我们希望返回任意一个专业为土木工程的学生,可以实现如下:

Optional optStu = students.stream().filter(student -> "土木工程".equals(student.getMajor())).findAny();
实际上对于顺序流式处理而言,findFirst和findAny返回的结果是一样的,至于为什么会这样设计,是因为在下一篇我们介绍的并行流式处理,当我们启用并行流式处理的时候,查找第一个元素往往会有很多限制,如果不是特别需求,在并行流式处理中使用findAny的性能要比findFirst好。

3.2 归约

前面的例子中我们大部分都是通过

collect(Collectors.toList())
Copy after login
Copy after login

对数据封装返回,如我的目标不是返回一个新的集合,而是希望对经过参数化操作后的集合进行进一步的运算,那么我们可用对集合实施归约操作。java8的流式处理提供了

reduce
Copy after login

方法来达到这一目的。

前面我们通过mapToInt将

Stream<Student>
Copy after login

映射成为

IntStream
Copy after login
Copy after login
Copy after login

,并通过

IntStream
Copy after login
Copy after login
Copy after login

的sum方法求得所有学生的年龄之和,实际上我们通过归约操作,也可以达到这一目的,实现如下:

// 前面例子中的方法
int totalAge = students.stream()
                .filter(student -> "计算机科学".equals(student.getMajor()))
                .mapToInt(Student::getAge).sum();
// 归约操作
int totalAge = students.stream()
                .filter(student -> "计算机科学".equals(student.getMajor()))
                .map(Student::getAge)
                .reduce(0, (a, b) -> a + b);
// 进一步简化
int totalAge2 = students.stream()
                .filter(student -> "计算机科学".equals(student.getMajor()))
                .map(Student::getAge)
                .reduce(0, Integer::sum);
// 采用无初始值的重载版本,需要注意返回Optional
Optional<Integer> totalAge = students.stream()
                .filter(student -> "计算机科学".equals(student.getMajor()))
                .map(Student::getAge)
                .reduce(Integer::sum);  // 去掉初始值
Copy after login

3.3 收集

前面利用

collect(Collectors.toList())
Copy after login
Copy after login

是一个简单的收集操作,是对处理结果的封装,对应的还有

toSet
Copy after login

toMap
Copy after login

,以满足我们对于结果组织的需求。这些方法均来自于

java.util.stream.Collectors
Copy after login

,我们可以称之为收集器。

3.3.1 归约

收集器也提供了相应的归约操作,但是与reduce在内部实现上是有区别的,收集器更加适用于可变容器上的归约操作,这些收集器广义上均基于

Collectors.reducing()
Copy after login

实现。

例1:求学生的总人数

long count = students.stream().collect(Collectors.counting());
// 进一步简化
long count = students.stream().count();
Copy after login

例2:求年龄的最大值和最小值

// 求最大年龄
Optional<Student> olderStudent = students.stream().collect(Collectors.maxBy((s1, s2) -> s1.getAge() - s2.getAge()));
// 进一步简化
Optional<Student> olderStudent2 = students.stream().collect(Collectors.maxBy(Comparator.comparing(Student::getAge)));
// 求最小年龄
Optional<Student> olderStudent3 = students.stream().collect(Collectors.minBy(Comparator.comparing(Student::getAge)));
Copy after login

例3:求年龄总和

int totalAge4 = students.stream().collect(Collectors.summingInt(Student::getAge));
对应的还有

summingLong
Copy after login

summingDouble
Copy after login



例4:求年龄的平均值

double avgAge = students.stream().collect(Collectors.averagingInt(Student::getAge));
对应的还有

averagingLong
Copy after login

averagingDouble
Copy after login



例5:一次性得到元素个数、总和、均值、最大值、最小值

IntSummaryStatistics statistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));
输出:

IntSummaryStatistics{count=10, sum=220, min=20, average=22.000000, max=24}
对应的还有

summarizingLong
Copy after login

summarizingDouble
Copy after login



例6:字符串拼接

String names = students.stream().map(Student::getName).collect(Collectors.joining());
// 输出:孔明伯约玄德云长翼德元直奉孝仲谋鲁肃丁奉
String names = students.stream().map(Student::getName).collect(Collectors.joining(", "));
// 输出:孔明, 伯约, 玄德, 云长, 翼德, 元直, 奉孝, 仲谋, 鲁肃, 丁奉

3.3.2 分组

在数据库操作中,我们可以通过

GROUP BY
Copy after login

关键字对查询到的数据进行分组,java8的流式处理也为我们提供了这样的功能

Collectors.groupingBy
Copy after login

来操作集合。比如我们可以按学校对上面的学生进行分组:

Map> groups = students.stream().collect(Collectors.groupingBy(Student::getSchool));

groupingBy
Copy after login
Copy after login

接收一个分类器

Function<? super T, ? extends K> classifier
Copy after login

,我们可以自定义分类器来实现需要的分类效果。

上面演示的是一级分组,我们还可以定义多个分类器实现 多级分组,比如我们希望在按学校分组的基础之上再按照专业进行分组,实现如下:

Map>> groups2 = students.stream().collect(
Collectors.groupingBy(Student::getSchool, // 一级分组,按学校
Collectors.groupingBy(Student::getMajor))); // 二级分组,按专业
实际上在

groupingBy
Copy after login
Copy after login

的第二个参数不是只能传递groupingBy,还可以传递任意

Collector
Copy after login

类型,比如我们可以传递一个

Collector.counting
Copy after login

,用以统计每个组的个数:

Map groups = students.stream().collect(Collectors.groupingBy(Student::getSchool, Collectors.counting()));
如果我们不添加第二个参数,则编译器会默认帮我们添加一个

Collectors.toList()
Copy after login
Copy after login



3.3.3 分区

分区可以看做是分组的一种特殊情况,在分区中key只有两种情况:true或false,目的是将待分区集合按照条件一分为二,java8的流式处理利用

ollectors.partitioningBy()
Copy after login

方法实现分区,该方法接收一个谓词,例如我们希望将学生分为武大学生和非武大学生,那么可以实现如下:

Map> partition = students.stream().collect(Collectors.partitioningBy(student -> "武汉大学".equals(student.getSchool())));
分区相对分组的优势在于,我们可以同时得到两类结果,在一些应用场景下可以一步得到我们需要的所有结果,比如将数组分为奇数和偶数。

以上介绍的所有收集器均实现自接口

java.util.stream.Collector
Copy after login

,该接口的定义如下:

public interface Collector<T, A, R> {
    /**
     * A function that creates and returns a new mutable result container.
     *
     * @return a function which returns a new, mutable result container
     */
    Supplier<A> supplier();
    /**
     * A function that folds a value into a mutable result container.
     *
     * @return a function which folds a value into a mutable result container
     */
    BiConsumer<A, T> accumulator();
    /**
     * A function that accepts two partial results and merges them.  The
     * combiner function may fold state from one argument into the other and
     * return that, or may return a new result container.
     *
     * @return a function which combines two partial results into a combined
     * result
     */
    BinaryOperator<A> combiner();
    /**
     * Perform the final transformation from the intermediate accumulation type
     * {@code A} to the final result type {@code R}.
     *
     * <p>If the characteristic {@code IDENTITY_TRANSFORM} is
     * set, this function may be presumed to be an identity transform with an
     * unchecked cast from {@code A} to {@code R}.
     *
     * @return a function which transforms the intermediate result to the final
     * result
     */
    Function<A, R> finisher();
    /**
     * Returns a {@code Set} of {@code Collector.Characteristics} indicating
     * the characteristics of this Collector.  This set should be immutable.
     *
     * @return an immutable set of collector characteristics
     */
    Set<Characteristics> characteristics();
}
Copy after login

我们也可以实现该接口来定义自己的收集器,此处不再展开。

四. 并行流式数据处理

流式处理中的很多都适合采用 分而治之 的思想,从而在处理集合较大时,极大的提高代码的性能,java8的设计者也看到了这一点,所以提供了 并行流式处理。上面的例子中我们都是调用

stream()
Copy after login
Copy after login
Copy after login
Copy after login

方法来启动流式处理,java8还提供了

parallelStream()
Copy after login
Copy after login
Copy after login

来启动并行流式处理,

parallelStream()
Copy after login
Copy after login
Copy after login

本质上基于java7的Fork-Join框架实现,其默认的线程数为宿主机的内核数。

启动并行流式处理虽然简单,只需要将

stream()
Copy after login
Copy after login
Copy after login
Copy after login

替换成

parallelStream()
Copy after login
Copy after login
Copy after login

即可,但既然是并行,就会涉及到多线程安全问题,所以在启用之前要先确认并行是否值得(并行的效率不一定高于顺序执行),另外就是要保证线程安全。此两项无法保证,那么并行毫无意义,毕竟结果比速度更加重要,以后有时间再来详细分析一下并行流式数据处理的具体实现和最佳实践。

 以上就是Java8 新特性之流式数据处理 的内容,更多相关内容请关注PHP中文网(www.php.cn)!


source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!