ホームページ > バックエンド開発 > Python チュートリアル > Pandas と PySpark が連携して、機能性と速度の両方を実現します。

Pandas と PySpark が連携して、機能性と速度の両方を実現します。

WBOY
リリース: 2023-05-01 21:19:05
転載
1973 人が閲覧しました

データ処理に Python を使用するデータ サイエンティストやデータ実務者は、データ サイエンス パッケージ pandas に馴染みがあり、Yun Duojun のような pandas のヘビー ユーザーもいます。コードの最初の行のほとんどはプロジェクトの開始時に書かれています。パンダを pd としてインポートします。 Pandas はデータ処理の yyds と言えます。欠点も非常に明白で、パンダは 1 台のマシンでしか処理できず、データ量に応じて線形に拡張できません。たとえば、パンダがマシンの利用可能なメモリよりも大きいデータ セットを読み取ろうとすると、メモリ不足により失敗します。

さらに、pandas は大規模なデータの処理が非常に遅いため、データ処理速度を最適化して向上させるための Dask や Vaex などのライブラリは他にもありますが、Spark という神フレームワークの前では朝飯前です。ビッグデータ処理。

幸いなことに、新しい Spark 3.2 バージョンでは、新しい Pandas API が登場し、ほとんどの pandas 関数が PySpark に統合されています。Spark には Pandas API が Spark を使用しているため、pandas インターフェイスを使用すると、Spark を使用できます。バックグラウンドで強力な協力の効果を達成できるため、非常に強力で非常に便利であると言えます。

すべては Spark AI Summit 2019 から始まりました。 Koalas は、Spark 上で Pandas を使用するオープンソース プロジェクトです。当初は Pandas の機能のほんの一部しかカバーしていませんでしたが、徐々に規模が大きくなっていきました。新しい Spark 3.2 バージョンでは、Koalas が PySpark に統合されました。

Spark に Pandas API が統合されたため、Spark 上で Pandas を実行できるようになりました。変更する必要があるコードは 1 行だけです:

import pyspark.pandas as ps
ログイン後にコピー

これにより、多くの利点が得られます:

  • Python と Pandas の使用には慣れているが、Spark には詳しくない場合、複雑さを省略できますので、学習プロセスにすぐに PySpark を使い始めてください。
  • 小規模データとビッグデータ、単一マシンと分散マシンなど、すべてに 1 つのコード ベースを使用できます。
  • Spark 分散フレームワークでは、Pandas コードをより速く実行できます。
#最後の点は特に注目に値します。

一方で、分散コンピューティングは Pandas のコードに適用できます。また、Spark エンジンを使用すると、1 台のマシンでもコードが高速になります。以下のグラフは、96 個の vCPU と 384 GiB のメモリを備えたマシンで Spark を実行した場合と、pandas を単独で呼び出して 130 GB CSV データセットを分析した場合のパフォーマンスの比較を示しています。

Pandas 与 PySpark 强强联手,功能与速度齐飞!

マルチスレッドと Spark SQL Catalyst Optimizer はどちらもパフォーマンスの最適化に役立ちます。たとえば、結合カウント操作は、フルステージのコード生成を使用すると 4 倍高速になり、コード生成なしの場合は 5.9 秒、コード生成ありの場合は 1.6 秒になります。

Spark には、操作の連鎖において特に大きな利点があります。 Catalyst クエリ オプティマイザーはフィルターを認識してデータをインテリジェントにフィルターし、ディスクベースの結合を適用できますが、Pandas は各ステップですべてのデータをメモリにロードすることを好みます。

Spark で Pandas API を使用してコードを記述する方法を試してみませんか?今すぐ始めましょう!

Pandas 間の切り替え / Pandas-on-Spark / Spark

最初に知っておく必要があるのは、正確に何を使用しているのかということです。 Pandas を使用する場合は、pandas.core.frame.DataFrame クラスを使用します。 Spark で pandas API を使用する場合は、pyspark.pandas.frame.DataFrame を使用します。この 2 つは似ていますが、同じではありません。主な違いは、前者は単一マシン内にあるのに対し、後者は分散されていることです。

Pandas-on-Spark を使用してデータフレームを作成し、それを Pandas に変換することも、その逆も可能です。

# import Pandas-on-Spark 
import pyspark.pandas as ps

# 使用 Pandas-on-Spark 创建一个 DataFrame 
ps_df = ps.DataFrame(range(10))

# 将 Pandas-on-Spark Dataframe 转换为 Pandas Dataframe 
pd_df = ps_df.to_pandas()

# 将 Pandas Dataframe 转换为 Pandas-on-Spark Dataframe 
ps_df = ps.from_pandas(pd_df)
ログイン後にコピー

複数のマシンを使用する場合は、 Pandas に変換する前の Pandas-on-Spark: Spark データフレームを Pandas データフレームに変換するとき、データは複数のマシンから 1 つのマシンに、またはその逆に転送されます (PySpark ガイド [1] を参照)。

Pandas-on-Spark データフレームを Spark データフレームに変換したり、その逆に変換したりすることもできます。

# 使用 Pandas-on-Spark 创建一个 DataFrame 
ps_df = ps.DataFrame(range(10))

# 将 Pandas-on-Spark Dataframe 转换为 Spark Dataframe 
spark_df = ps_df.to_spark()

# 将 Spark Dataframe 转换为 Pandas-on-Spark Dataframe 
ps_df_new = spark_df.to_pandas_on_spark()
ログイン後にコピー

データ型はどのように変わりますか?

Pandas-on-Spark と Pandas を使用する場合、データ型は基本的に同じです。 Pandas-on-Spark DataFrame を Spark DataFrame に変換すると、データ型は自動的に適切な型に変換されます (PySpark ガイド [2] を参照)

次の例は、変換時にデータがどのように変換されるかを示しています。 PySpark DataFrame から pandas-on-Spark DataFrame への型変換。

>>> sdf = spark.createDataFrame([
... (1, Decimal(1.0), 1., 1., 1, 1, 1, datetime(2020, 10, 27), "1", True, datetime(2020, 10, 27)),
... ], 'tinyint tinyint, decimal decimal, float float, double double, integer integer, long long, short short, timestamp timestamp, string string, boolean boolean, date date')
>>> sdf
ログイン後にコピー
DataFrame[tinyint: tinyint, decimal: decimal(10,0),
float: float, double: double, integer: int,
long: bigint, short: smallint, timestamp: timestamp, 
string: string, boolean: boolean, date: date]
ログイン後にコピー
psdf = sdf.pandas_api()
psdf.dtypes
ログイン後にコピー
tinyintint8
decimalobject
float float32
doublefloat64
integer int32
longint64
short int16
timestampdatetime64[ns]
string object
booleanbool
date object
dtype: object
ログイン後にコピー

Pandas-on-Spark と Spark 関数

Spark の DataFrame と、Pandas-on-Spark で最もよく使用される関数。 Pandas-on-Spark と Pandas の唯一の構文の違いは、ps 行としての import pyspark.pandas であることに注意してください。

当你看完如下内容后,你会发现,即使您不熟悉 Spark,也可以通过 Pandas API 轻松使用。

导入库

# 运行Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder 
.appName("Spark") 
.getOrCreate()
# 在Spark上运行Pandas
import pyspark.pandas as ps
ログイン後にコピー

读取数据

以 old dog iris 数据集为例。

# SPARK 
sdf = spark.read.options(inferSchema='True', 
header='True').csv('iris.csv')
# PANDAS-ON-SPARK 
pdf = ps.read_csv('iris.csv')
ログイン後にコピー

选择

# SPARK 
sdf.select("sepal_length","sepal_width").show()
# PANDAS-ON-SPARK 
pdf[["sepal_length","sepal_width"]].head()
ログイン後にコピー

删除列

# SPARK 
sdf.drop('sepal_length').show()# PANDAS-ON-SPARK 
pdf.drop('sepal_length').head()
ログイン後にコピー

删除重复项

# SPARK 
sdf.dropDuplicates(["sepal_length","sepal_width"]).show()
# PANDAS-ON-SPARK 
pdf[["sepal_length", "sepal_width"]].drop_duplicates()
ログイン後にコピー

筛选

# SPARK 
sdf.filter( (sdf.flower_type == "Iris-setosa") & (sdf.petal_length > 1.5) ).show()
# PANDAS-ON-SPARK 
pdf.loc[ (pdf.flower_type == "Iris-setosa") & (pdf.petal_length > 1.5) ].head()
ログイン後にコピー

计数

# SPARK 
sdf.filter(sdf.flower_type == "Iris-virginica").count()
# PANDAS-ON-SPARK 
pdf.loc[pdf.flower_type == "Iris-virginica"].count()
ログイン後にコピー

唯一值

# SPARK 
sdf.select("flower_type").distinct().show()
# PANDAS-ON-SPARK 
pdf["flower_type"].unique()
ログイン後にコピー

排序

# SPARK 
sdf.sort("sepal_length", "sepal_width").show()
# PANDAS-ON-SPARK 
pdf.sort_values(["sepal_length", "sepal_width"]).head()
ログイン後にコピー

分组

# SPARK 
sdf.groupBy("flower_type").count().show()
# PANDAS-ON-SPARK 
pdf.groupby("flower_type").count()
ログイン後にコピー

替换

# SPARK 
sdf.replace("Iris-setosa", "setosa").show()
# PANDAS-ON-SPARK 
pdf.replace("Iris-setosa", "setosa").head()
ログイン後にコピー

连接

#SPARK 
sdf.union(sdf)
# PANDAS-ON-SPARK 
pdf.append(pdf)
ログイン後にコピー

transform 和 apply 函数应用

有许多 API 允许用户针对 pandas-on-Spark DataFrame 应用函数,例如:

DataFrame.transform() 
DataFrame.apply()
DataFrame.pandas_on_spark.transform_batch()
DataFrame.pandas_on_spark.apply_batch()
Series.pandas_on_spark.transform_batch()
ログイン後にコピー

每个 API 都有不同的用途,并且在内部工作方式不同。

transform 和 apply

DataFrame.transform()和DataFrame.apply()之间的主要区别在于,前者需要返回相同长度的输入,而后者不需要。

# transform
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pser):
return pser + 1# 应该总是返回与输入相同的长度。

psdf.transform(pandas_plus)

# apply
psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]})
def pandas_plus(pser):
return pser[pser % 2 == 1]# 允许任意长度

psdf.apply(pandas_plus)
ログイン後にコピー

在这种情况下,每个函数采用一个 pandas Series,Spark 上的 pandas API 以分布式方式计算函数,如下所示。

Pandas 与 PySpark 强强联手,功能与速度齐飞!

在“列”轴的情况下,该函数将每一行作为一个熊猫系列。

psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pser):
return sum(pser)# 允许任意长度
psdf.apply(pandas_plus, axis='columns')
ログイン後にコピー

上面的示例将每一行的总和计算为pands Series

Pandas 与 PySpark 强强联手,功能与速度齐飞!

pandas_on_spark.transform_batch和pandas_on_spark.apply_batch

batch 后缀表示 pandas-on-Spark DataFrame 或 Series 中的每个块。API 对 pandas-on-Spark DataFrame 或 Series 进行切片,然后以 pandas DataFrame 或 Series 作为输入和输出应用给定函数。请参阅以下示例:

psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pdf):
return pdf + 1# 应该总是返回与输入相同的长度。

psdf.pandas_on_spark.transform_batch(pandas_plus)

psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pdf):
return pdf[pdf.a > 1]# 允许任意长度

psdf.pandas_on_spark.apply_batch(pandas_plus)
ログイン後にコピー

两个示例中的函数都将 pandas DataFrame 作为 pandas-on-Spark DataFrame 的一个块,并输出一个 pandas DataFrame。Spark 上的 Pandas API 将 pandas 数据帧组合为 pandas-on-Spark 数据帧。

Pandas 与 PySpark 强强联手,功能与速度齐飞!

在 Spark 上使用 pandas API的注意事项

避免shuffle

某些操作,例如sort_values在并行或分布式环境中比在单台机器上的内存中更难完成,因为它需要将数据发送到其他节点,并通过网络在多个节点之间交换数据。

避免在单个分区上计算

另一种常见情况是在单个分区上进行计算。目前, DataFrame.rank 等一些 API 使用 PySpark 的 Window 而不指定分区规范。这会将所有数据移动到单个机器中的单个分区中,并可能导致严重的性能下降。对于非常大的数据集,应避免使用此类 API。

不要使用重复的列名

不允许使用重复的列名,因为 Spark SQL 通常不允许这样做。Spark 上的 Pandas API 继承了这种行为。例如,见下文:

import pyspark.pandas as ps
psdf = ps.DataFrame({'a': [1, 2], 'b':[3, 4]})
psdf.columns = ["a", "a"]
ログイン後にコピー
Reference 'a' is ambiguous, could be: a, a.;
ログイン後にコピー
ログイン後にコピー

此外,强烈建议不要使用区分大小写的列名。Spark 上的 Pandas API 默认不允许它。

import pyspark.pandas as ps
psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
ログイン後にコピー
Reference 'a' is ambiguous, could be: a, a.;
ログイン後にコピー
ログイン後にコピー

但可以在 Spark 配置spark.sql.caseSensitive中打开以启用它,但需要自己承担风险。

from pyspark.sql import SparkSession
builder = SparkSession.builder.appName("pandas-on-spark")
builder = builder.config("spark.sql.caseSensitive", "true")
builder.getOrCreate()

import pyspark.pandas as ps
psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
psdf
ログイン後にコピー
aA
013
124
ログイン後にコピー

使用默认索引

pandas-on-Spark 用户面临的一个常见问题是默认索引导致性能下降。当索引未知时,Spark 上的 Pandas API 会附加一个默认索引,例如 Spark DataFrame 直接转换为 pandas-on-Spark DataFrame。

如果计划在生产中处理大数据,请通过将默认索引配置为distributed或distributed-sequence来使其确保为分布式。

有关配置默认索引的更多详细信息,请参阅默认索引类型[3]。

在 Spark 上使用 pandas API

尽管 Spark 上的 pandas API 具有大部分与 pandas 等效的 API,但仍有一些 API 尚未实现或明确不受支持。因此尽可能直接在 Spark 上使用 pandas API。

例如,Spark 上的 pandas API 没有实现__iter__(),阻止用户将所有数据从整个集群收集到客户端(驱动程序)端。不幸的是,许多外部 API,例如 min、max、sum 等 Python 的内置函数,都要求给定参数是可迭代的。对于 pandas,它开箱即用,如下所示:

>>> import pandas as pd
>>> max(pd.Series([1, 2, 3]))
3
>>> min(pd.Series([1, 2, 3]))
1
>>> sum(pd.Series([1, 2, 3]))
6
ログイン後にコピー

Pandas 数据集存在于单台机器中,自然可以在同一台机器内进行本地迭代。但是,pandas-on-Spark 数据集存在于多台机器上,并且它们是以分布式方式计算的。很难在本地迭代,很可能用户在不知情的情况下将整个数据收集到客户端。因此,最好坚持使用 pandas-on-Spark API。上面的例子可以转换如下:

>>> import pyspark.pandas as ps
>>> ps.Series([1, 2, 3]).max()
3
>>> ps.Series([1, 2, 3]).min()
1
>>> ps.Series([1, 2, 3]).sum()
6
ログイン後にコピー

pandas 用户的另一个常见模式可能是依赖列表推导式或生成器表达式。但是,它还假设数据集在引擎盖下是本地可迭代的。因此,它可以在 pandas 中无缝运行,如下所示:

import pandas as pd
data = []
countries = ['London', 'New York', 'Helsinki']
pser = pd.Series([20., 21., 12.], index=countries)
for temperature in pser:
 assert temperature > 0
 if temperature > 1000:
 temperature = None
 data.append(temperature ** 2)

pd.Series(data, index=countries)
ログイン後にコピー
London400.0
New York441.0
Helsinki144.0
dtype: float64
ログイン後にコピー

但是,对于 Spark 上的 pandas API,它的工作原理与上述相同。上面的示例也可以更改为直接使用 pandas-on-Spark API,如下所示:

import pyspark.pandas as ps
import numpy as np
countries = ['London', 'New York', 'Helsinki']
psser = ps.Series([20., 21., 12.], index=countries)
def square(temperature) -> np.float64:
 assert temperature > 0
 if temperature > 1000:
 temperature = None
 return temperature ** 2

psser.apply(square)
ログイン後にコピー
London400.0
New York441.0
Helsinki144.0
ログイン後にコピー

减少对不同 DataFrame 的操作

Spark 上的 Pandas API 默认不允许对不同 DataFrame(或 Series)进行操作,以防止昂贵的操作。只要有可能,就应该避免这种操作。

写在最后

到目前为止,我们将能够在 Spark 上使用 Pandas。这将会导致Pandas 速度的大大提高,迁移到 Spark 时学习曲线的减少,以及单机计算和分布式计算在同一代码库中的合并。

参考资料

[1]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/pandas_pyspark.html

[2]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html

[3]默认索引类型: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type

以上がPandas と PySpark が連携して、機能性と速度の両方を実現します。の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:51cto.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート