Pandas 与 PySpark 强强联手,功能与速度齐飞!
使用Python做数据处理的数据科学家或数据从业者,对数据科学包pandas并不陌生,也不乏像云朵君一样的pandas重度使用者,项目开始写的第一行代码,大多是 import pandas as pd。pandas做数据处理可以说是yyds!而他的缺点也是非常明显,pandas 只能单机处理,它不能随数据量线性伸缩。例如,如果 pandas 试图读取的数据集大于一台机器的可用内存,则会因内存不足而失败。
另外 pandas 在处理大型数据方面非常慢,虽然有像Dask 或 Vaex 等其他库来优化提升数据处理速度,但在大数据处理神之框架Spark面前,也是小菜一碟。
幸运的是,在新的 Spark 3.2 版本中,出现了一个新的Pandas API,将pandas大部分功能都集成到PySpark中,使用pandas的接口,就能使用Spark,因为 Spark 上的 Pandas API 在后台使用 Spark,这样就能达到强强联手的效果,可以说是非常强大,非常方便。
这一切都始于 2019 年 Spark + AI 峰会。Koalas 是一个开源项目,可以在 Spark 之上使用 Pandas。一开始,它只覆盖了 Pandas 的一小部分功能,但后来逐渐壮大起来。现在,在新的 Spark 3.2 版本中,Koalas 已合并到 PySpark。
Spark 现在集成了 Pandas API,因此可以在 Spark 上运行 Pandas。只需要更改一行代码:
import pyspark.pandas as ps
由此我们可以获得诸多的优势:
- 如果我们熟悉使用Python 和 Pandas,但不熟悉 Spark,可以省略了需复杂的学习过程而立即使用PySpark。
- 可以为所有内容使用一个代码库:无论是小数据和大数据,还是单机和分布式机器。
- 可以在Spark分布式框架上,更快地运行 Pandas 代码。
最后一点尤其值得注意。
一方面,可以将分布式计算应用于在 Pandas 中的代码。且借助 Spark 引擎,代码即使在单台机器上也会更快!下图展示了在一台机器(具有 96 个 vCPU 和 384 GiBs 内存)上运行 Spark 和单独调用 pandas 分析 130GB 的 CSV 数据集的性能对比。
多线程和 Spark SQL Catalyst Optimizer 都有助于优化性能。例如,Join count 操作在整个阶段代码生成时快 4 倍:没有代码生成时为 5.9 秒,代码生成时为 1.6 秒。
Spark 在链式操作(chaining operations)中具有特别显着的优势。Catalyst 查询优化器可以识别过滤器以明智地过滤数据并可以应用基于磁盘的连接(disk-based joins),而 Pandas 倾向于每一步将所有数据加载到内存中。
现在是不是迫不及待的想尝试如何在 Spark 上使用 Pandas API 编写一些代码?我们现在就开始吧!
在 Pandas / Pandas-on-Spark / Spark 之间切换
需要知道的第一件事是我们到底在使用什么。在使用 Pandas 时,使用类pandas.core.frame.DataFrame。在 Spark 中使用 pandas API 时,使用pyspark.pandas.frame.DataFrame。虽然两者相似,但不相同。主要区别在于前者在单机中,而后者是分布式的。
可以使用 Pandas-on-Spark 创建一个 Dataframe 并将其转换为 Pandas,反之亦然:
# import Pandas-on-Spark import pyspark.pandas as ps # 使用 Pandas-on-Spark 创建一个 DataFrame ps_df = ps.DataFrame(range(10)) # 将 Pandas-on-Spark Dataframe 转换为 Pandas Dataframe pd_df = ps_df.to_pandas() # 将 Pandas Dataframe 转换为 Pandas-on-Spark Dataframe ps_df = ps.from_pandas(pd_df)
注意,如果使用多台机器,则在将 Pandas-on-Spark Dataframe 转换为 Pandas Dataframe 时,数据会从多台机器传输到一台机器,反之亦然(可参阅PySpark 指南[1])。
还可以将 Pandas-on-Spark Dataframe 转换为 Spark DataFrame,反之亦然:
# 使用 Pandas-on-Spark 创建一个 DataFrame ps_df = ps.DataFrame(range(10)) # 将 Pandas-on-Spark Dataframe 转换为 Spark Dataframe spark_df = ps_df.to_spark() # 将 Spark Dataframe 转换为 Pandas-on-Spark Dataframe ps_df_new = spark_df.to_pandas_on_spark()
数据类型如何改变?
在使用 Pandas-on-Spark 和 Pandas 时,数据类型基本相同。将 Pandas-on-Spark DataFrame 转换为 Spark DataFrame 时,数据类型会自动转换为适当的类型(请参阅PySpark 指南[2])
下面的示例显示了在转换时是如何将数据类型从 PySpark DataFrame 转换为 pandas-on-Spark DataFrame。
>>> sdf = spark.createDataFrame([ ... (1, Decimal(1.0), 1., 1., 1, 1, 1, datetime(2020, 10, 27), "1", True, datetime(2020, 10, 27)), ... ], 'tinyint tinyint, decimal decimal, float float, double double, integer integer, long long, short short, timestamp timestamp, string string, boolean boolean, date date') >>> sdf
DataFrame[tinyint: tinyint, decimal: decimal(10,0), float: float, double: double, integer: int, long: bigint, short: smallint, timestamp: timestamp, string: string, boolean: boolean, date: date]
psdf = sdf.pandas_api() psdf.dtypes
tinyintint8 decimalobject float float32 doublefloat64 integer int32 longint64 short int16 timestampdatetime64[ns] string object booleanbool date object dtype: object
Pandas-on-Spark vs Spark 函数
在 Spark 中的 DataFrame 及其在 Pandas-on-Spark 中的最常用函数。注意,Pandas-on-Spark 和 Pandas 在语法上的唯一区别就是 import pyspark.pandas as ps 一行。
当你看完如下内容后,你会发现,即使您不熟悉 Spark,也可以通过 Pandas API 轻松使用。
导入库
# 运行Spark from pyspark.sql import SparkSession spark = SparkSession.builder .appName("Spark") .getOrCreate() # 在Spark上运行Pandas import pyspark.pandas as ps
读取数据
以 old dog iris 数据集为例。
# SPARK sdf = spark.read.options(inferSchema='True', header='True').csv('iris.csv') # PANDAS-ON-SPARK pdf = ps.read_csv('iris.csv')
选择
# SPARK sdf.select("sepal_length","sepal_width").show() # PANDAS-ON-SPARK pdf[["sepal_length","sepal_width"]].head()
删除列
# SPARK sdf.drop('sepal_length').show()# PANDAS-ON-SPARK pdf.drop('sepal_length').head()
删除重复项
# SPARK sdf.dropDuplicates(["sepal_length","sepal_width"]).show() # PANDAS-ON-SPARK pdf[["sepal_length", "sepal_width"]].drop_duplicates()
筛选
# SPARK sdf.filter( (sdf.flower_type == "Iris-setosa") & (sdf.petal_length > 1.5) ).show() # PANDAS-ON-SPARK pdf.loc[ (pdf.flower_type == "Iris-setosa") & (pdf.petal_length > 1.5) ].head()
计数
# SPARK sdf.filter(sdf.flower_type == "Iris-virginica").count() # PANDAS-ON-SPARK pdf.loc[pdf.flower_type == "Iris-virginica"].count()
唯一值
# SPARK sdf.select("flower_type").distinct().show() # PANDAS-ON-SPARK pdf["flower_type"].unique()
排序
# SPARK sdf.sort("sepal_length", "sepal_width").show() # PANDAS-ON-SPARK pdf.sort_values(["sepal_length", "sepal_width"]).head()
分组
# SPARK sdf.groupBy("flower_type").count().show() # PANDAS-ON-SPARK pdf.groupby("flower_type").count()
替换
# SPARK sdf.replace("Iris-setosa", "setosa").show() # PANDAS-ON-SPARK pdf.replace("Iris-setosa", "setosa").head()
连接
#SPARK sdf.union(sdf) # PANDAS-ON-SPARK pdf.append(pdf)
transform 和 apply 函数应用
有许多 API 允许用户针对 pandas-on-Spark DataFrame 应用函数,例如:
DataFrame.transform() DataFrame.apply() DataFrame.pandas_on_spark.transform_batch() DataFrame.pandas_on_spark.apply_batch() Series.pandas_on_spark.transform_batch()
每个 API 都有不同的用途,并且在内部工作方式不同。
transform 和 apply
DataFrame.transform()和DataFrame.apply()之间的主要区别在于,前者需要返回相同长度的输入,而后者不需要。
# transform psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]}) def pandas_plus(pser): return pser + 1# 应该总是返回与输入相同的长度。 psdf.transform(pandas_plus) # apply psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]}) def pandas_plus(pser): return pser[pser % 2 == 1]# 允许任意长度 psdf.apply(pandas_plus)
在这种情况下,每个函数采用一个 pandas Series,Spark 上的 pandas API 以分布式方式计算函数,如下所示。
在“列”轴的情况下,该函数将每一行作为一个熊猫系列。
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]}) def pandas_plus(pser): return sum(pser)# 允许任意长度 psdf.apply(pandas_plus, axis='columns')
上面的示例将每一行的总和计算为pands Series
pandas_on_spark.transform_batch和pandas_on_spark.apply_batch
batch 后缀表示 pandas-on-Spark DataFrame 或 Series 中的每个块。API 对 pandas-on-Spark DataFrame 或 Series 进行切片,然后以 pandas DataFrame 或 Series 作为输入和输出应用给定函数。请参阅以下示例:
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]}) def pandas_plus(pdf): return pdf + 1# 应该总是返回与输入相同的长度。 psdf.pandas_on_spark.transform_batch(pandas_plus) psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]}) def pandas_plus(pdf): return pdf[pdf.a > 1]# 允许任意长度 psdf.pandas_on_spark.apply_batch(pandas_plus)
两个示例中的函数都将 pandas DataFrame 作为 pandas-on-Spark DataFrame 的一个块,并输出一个 pandas DataFrame。Spark 上的 Pandas API 将 pandas 数据帧组合为 pandas-on-Spark 数据帧。
在 Spark 上使用 pandas API的注意事项
避免shuffle
某些操作,例如sort_values在并行或分布式环境中比在单台机器上的内存中更难完成,因为它需要将数据发送到其他节点,并通过网络在多个节点之间交换数据。
避免在单个分区上计算
另一种常见情况是在单个分区上进行计算。目前, DataFrame.rank 等一些 API 使用 PySpark 的 Window 而不指定分区规范。这会将所有数据移动到单个机器中的单个分区中,并可能导致严重的性能下降。对于非常大的数据集,应避免使用此类 API。
不要使用重复的列名
不允许使用重复的列名,因为 Spark SQL 通常不允许这样做。Spark 上的 Pandas API 继承了这种行为。例如,见下文:
import pyspark.pandas as ps psdf = ps.DataFrame({'a': [1, 2], 'b':[3, 4]}) psdf.columns = ["a", "a"]
Reference 'a' is ambiguous, could be: a, a.;
此外,强烈建议不要使用区分大小写的列名。Spark 上的 Pandas API 默认不允许它。
import pyspark.pandas as ps psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
Reference 'a' is ambiguous, could be: a, a.;
但可以在 Spark 配置spark.sql.caseSensitive中打开以启用它,但需要自己承担风险。
from pyspark.sql import SparkSession builder = SparkSession.builder.appName("pandas-on-spark") builder = builder.config("spark.sql.caseSensitive", "true") builder.getOrCreate() import pyspark.pandas as ps psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]}) psdf
aA 013 124
使用默认索引
pandas-on-Spark 用户面临的一个常见问题是默认索引导致性能下降。当索引未知时,Spark 上的 Pandas API 会附加一个默认索引,例如 Spark DataFrame 直接转换为 pandas-on-Spark DataFrame。
如果计划在生产中处理大数据,请通过将默认索引配置为distributed或distributed-sequence来使其确保为分布式。
有关配置默认索引的更多详细信息,请参阅默认索引类型[3]。
在 Spark 上使用 pandas API
尽管 Spark 上的 pandas API 具有大部分与 pandas 等效的 API,但仍有一些 API 尚未实现或明确不受支持。因此尽可能直接在 Spark 上使用 pandas API。
例如,Spark 上的 pandas API 没有实现__iter__(),阻止用户将所有数据从整个集群收集到客户端(驱动程序)端。不幸的是,许多外部 API,例如 min、max、sum 等 Python 的内置函数,都要求给定参数是可迭代的。对于 pandas,它开箱即用,如下所示:
>>> import pandas as pd >>> max(pd.Series([1, 2, 3])) 3 >>> min(pd.Series([1, 2, 3])) 1 >>> sum(pd.Series([1, 2, 3])) 6
Pandas 数据集存在于单台机器中,自然可以在同一台机器内进行本地迭代。但是,pandas-on-Spark 数据集存在于多台机器上,并且它们是以分布式方式计算的。很难在本地迭代,很可能用户在不知情的情况下将整个数据收集到客户端。因此,最好坚持使用 pandas-on-Spark API。上面的例子可以转换如下:
>>> import pyspark.pandas as ps >>> ps.Series([1, 2, 3]).max() 3 >>> ps.Series([1, 2, 3]).min() 1 >>> ps.Series([1, 2, 3]).sum() 6
pandas 用户的另一个常见模式可能是依赖列表推导式或生成器表达式。但是,它还假设数据集在引擎盖下是本地可迭代的。因此,它可以在 pandas 中无缝运行,如下所示:
import pandas as pd data = [] countries = ['London', 'New York', 'Helsinki'] pser = pd.Series([20., 21., 12.], index=countries) for temperature in pser: assert temperature > 0 if temperature > 1000: temperature = None data.append(temperature ** 2) pd.Series(data, index=countries)
London400.0 New York441.0 Helsinki144.0 dtype: float64
但是,对于 Spark 上的 pandas API,它的工作原理与上述相同。上面的示例也可以更改为直接使用 pandas-on-Spark API,如下所示:
import pyspark.pandas as ps import numpy as np countries = ['London', 'New York', 'Helsinki'] psser = ps.Series([20., 21., 12.], index=countries) def square(temperature) -> np.float64: assert temperature > 0 if temperature > 1000: temperature = None return temperature ** 2 psser.apply(square)
London400.0 New York441.0 Helsinki144.0
减少对不同 DataFrame 的操作
Spark 上的 Pandas API 默认不允许对不同 DataFrame(或 Series)进行操作,以防止昂贵的操作。只要有可能,就应该避免这种操作。
写在最后
到目前为止,我们将能够在 Spark 上使用 Pandas。这将会导致Pandas 速度的大大提高,迁移到 Spark 时学习曲线的减少,以及单机计算和分布式计算在同一代码库中的合并。
参考资料
[1]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/pandas_pyspark.html
[2]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html
[3]默认索引类型: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type
以上是Pandas 与 PySpark 强强联手,功能与速度齐飞!的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

pandas安装教程:解析常见安装错误及其解决方法,需要具体代码示例引言:Pandas是一个强大的数据分析工具,广泛应用于数据清洗、数据处理和数据可视化等方面,因此在数据科学领域备受推崇。然而,由于环境配置和依赖问题,安装pandas可能会遇到一些困难和错误。本文将为大家提供一份pandas安装教程,并解析一些常见的安装错误及其解决方法。一、安装pandas

如何使用pandas正确读取txt文件,需要具体代码示例Pandas是一个广泛使用的Python数据分析库,它可以用于处理各种各样的数据类型,包括CSV文件、Excel文件、SQL数据库等。同时,它也可以用于读取文本文件,例如txt文件。但是,在读取txt文件时,我们有时会遇到一些问题,例如编码问题、分隔符问题等。本文将介绍如何使用pandas正确读取txt

python可以通过使用pip、使用conda、从源代码、使用IDE集成的包管理工具来安装pandas。详细介绍:1、使用pip,在终端或命令提示符中运行pip install pandas命令即可安装pandas;2、使用conda,在终端或命令提示符中运行conda install pandas命令即可安装pandas;3、从源代码安装等等。

Pandas是一个强大的数据分析工具,可以方便地读取和处理各种类型的数据文件。其中,CSV文件是最常见和常用的数据文件格式之一。本文将介绍如何使用Pandas读取CSV文件并进行数据分析,同时提供具体的代码示例。一、导入必要的库首先,我们需要导入Pandas库和其他可能需要的相关库,如下所示:importpandasaspd二、读取CSV文件使用Pan

python安装pandas的步骤:1、打开终端或命令提示符;2、输入“pip install pandas”命令安装pandas库;3、等待安装完成,可以在Python脚本中导入并使用pandas库了;4、使用的是特定的虚拟环境,确保在安装pandas之前激活相应的虚拟环境;5、使用的是集成开发环境,可以添加“import pandas as pd”代码来导入pandas库。

使用pandas读取txt文件的实用技巧,需要具体代码示例在数据分析和数据处理中,txt文件是一种常见的数据格式。使用pandas读取txt文件可以快速、方便地进行数据处理。本文将介绍几种实用的技巧,以帮助你更好的使用pandas读取txt文件,并配以具体的代码示例。读取带有分隔符的txt文件使用pandas读取带有分隔符的txt文件时,可以使用read_c

数据处理利器:Pandas读取SQL数据库中的数据,需要具体代码示例随着数据量的不断增长和复杂性的提高,数据处理成为了现代社会中一个重要的环节。在数据处理过程中,Pandas成为了许多数据分析师和科学家们的首选工具之一。本文将介绍如何使用Pandas库来读取SQL数据库中的数据,并提供一些具体的代码示例。Pandas是基于Python的一个强大的数据处理和分

Pandas去重方法大揭秘:快速、高效的数据去重方式,需要具体代码示例在数据分析和处理过程中,经常会遇到数据中存在重复的情况。重复数据可能会对分析结果产生误导,因此去重是一个非常重要的工作环节。在Pandas这个强大的数据处理库中,提供了多种方法来实现数据去重,本文将介绍一些常用的去重方法,并附上具体的代码示例。基于单列去重最常见的情况是根据某一列的值是否重
