Maison > développement back-end > Tutoriel Python > Traitez de grands ensembles de données avec Python PySpark

Traitez de grands ensembles de données avec Python PySpark

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
Libérer: 2023-08-29 09:09:08
avant
793 Les gens l'ont consulté

使用Python PySpark处理大型数据集

Dans ce tutoriel, nous explorerons la puissante combinaison de Python et PySpark pour traiter de grands ensembles de données. PySpark est une bibliothèque Python qui fournit une interface à Apache Spark, un système informatique en cluster rapide et polyvalent. En tirant parti de PySpark, nous pouvons distribuer et traiter efficacement les données sur un ensemble de machines, ce qui nous permet de gérer facilement des ensembles de données à grande échelle.

Dans cet article, nous approfondirons les principes fondamentaux de PySpark et démontrerons comment effectuer diverses tâches de traitement de données sur de grands ensembles de données. Nous aborderons des concepts clés tels que les RDD (Resilient Distributed Datasets) et les trames de données, et montrerons leur application pratique avec des exemples étape par étape. En étudiant ce didacticiel, vous comprendrez parfaitement comment utiliser efficacement PySpark pour traiter et analyser des ensembles de données à grande échelle.

Section 1 : Démarrer avec PySpark

La traduction chinoise est :

Partie 1 : Démarrer avec PySpark

Dans cette section, nous allons mettre en place l'environnement de développement et nous familiariser avec les concepts de base de PySpark. Nous verrons comment installer PySpark, initialiser une SparkSession et charger des données dans des RDD et des DataFrames. Commençons par installer PySpark :

# Install PySpark
!pip install pyspark
Copier après la connexion

Sortie

Collecting pyspark
...
Successfully installed pyspark-3.1.2

Copier après la connexion

Après avoir installé PySpark, nous pouvons initialiser une SparkSession pour nous connecter à notre cluster Spark :

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("LargeDatasetProcessing").getOrCreate()
Copier après la connexion

Avec notre SparkSession prête, nous pouvons désormais charger des données dans des RDD ou des DataFrames. Les RDD constituent la structure de données de base de PySpark, qui fournit une collection distribuée d'éléments. Les DataFrames, quant à eux, organisent les données en colonnes nommées, similaires aux tables des bases de données relationnelles. Chargeons un fichier CSV en tant que DataFrame :

# Load a CSV file as a DataFrame
df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True)
Copier après la connexion

Sortie

+---+------+--------+
|id |name  |age     |
+---+------+--------+
|1  |John  |32      |
|2  |Alice |28      |
|3  |Bob   |35      |
+---+------+--------+
Copier après la connexion

Comme vous pouvez le voir dans l'extrait de code ci-dessus, nous utilisons la méthode `read.csv()` pour lire le fichier CSV dans un bloc de données. Le paramètre `header=True` signifie que la première ligne contient des noms de colonnes, et `inferSchema=True` déduira automatiquement le type de données de chaque colonne.

Partie 2 : Transformer et analyser les données

Dans cette section, nous explorerons diverses techniques de transformation et d'analyse de données à l'aide de PySpark. Nous couvrirons des opérations telles que le filtrage, l'agrégation et la jointure d'ensembles de données. Commençons par filtrer les données selon des critères précis :

# Filter data
filtered_data = df.filter(df["age"] > 30)
Copier après la connexion

Sortie

+---+----+---+
|id |name|age|
+---+----+---+
|1  |John|32 |
|3  |Bob |35 |
+---+----+---+
Copier après la connexion

Dans l'extrait de code ci-dessus, nous utilisons la méthode `filter()` pour sélectionner les lignes où la colonne « âge » est supérieure à 30. Cette opération nous permet d'extraire des sous-ensembles pertinents à partir de grands ensembles de données.

Ensuite, effectuons l'agrégation sur l'ensemble de données en utilisant les méthodes « groupBy() » et « agg() » :

# Aggregate data
aggregated_data = df.groupBy("gender").agg({"salary": "mean", "age": "max"})
Copier après la connexion

Sortie

+------+-----------+--------+
|gender|avg(salary)|max(age)|
+------+-----------+--------+
|Male  |2500       |32      |
|Female|3000       |35      |
+------+-----------+--------+
Copier après la connexion

Ici, nous regroupons les données par colonne « Genre » et calculons le salaire moyen et l'âge maximum pour chaque groupe. La trame de données « aggregated_data » résultante nous fournit des informations précieuses sur l'ensemble de données.

En plus du filtrage et de l'agrégation, PySpark nous permet également de joindre efficacement plusieurs ensembles de données. Prenons un exemple où nous avons deux DataFrames : "df1" et "df2". Nous pouvons les rejoindre sur la base d'une colonne commune :

# Join two DataFrames
joined_data = df1.join(df2, on="id", how="inner")
Copier après la connexion

Sortie

+---+----+---------+------+
|id |name|department|salary|
+---+----+---------+------+
|1  |John|HR       |2500  |
|2  |Alice|IT      |3000  |
|3  |Bob |Sales    |2000  |
+---+----+---------+------+
Copier après la connexion

La méthode `join()` nous permet de fusionner des DataFrames en fonction des colonnes communes spécifiées par le paramètre `on`. En fonction de nos besoins, nous pouvons choisir différents types de connexion, tels que « interne », « externe », « gauche » ou « droite ».

Partie 3 : Technologie PySpark avancée

Dans cette section, nous explorerons la technologie avancée PySpark pour améliorer davantage nos capacités de traitement de données. Nous aborderons des sujets tels que les fonctions définies par l'utilisateur (UDF), les fonctions de fenêtre et la mise en cache. Commençons par définir et utiliser les UDF :

from pyspark.sql.functions import udf

# Define a UDF
def square(x):
    return x ** 2

# Register the UDF
square_udf = udf(square)

# Apply the UDF to a column
df = df.withColumn("age_squared", square_udf(df["age"]))
Copier après la connexion

Sortie

+---+------+---+------------+
|id |name  |age|age_squared |
+---+------+---+------------+
|1  |John  |32 |1024        |
|2  |Alice |28 |784         |
|3  |Bob   |35 |1225        |
+---+------+---+------------+
Copier après la connexion

Dans l'extrait de code ci-dessus, nous avons défini une fonction UDF simple nommée `square()` qui est utilisée pour mettre au carré l'entrée donnée. Nous enregistrons ensuite cet UDF à l'aide de la fonction `udf()` et l'appliquons à la colonne "age", créant une nouvelle colonne appelée "age_squared" dans notre DataFrame.

PySpark fournit également de puissantes fonctions de fenêtre qui nous permettent d'effectuer des calculs dans une plage de fenêtre spécifique. Considérons les lignes précédentes et suivantes pour calculer le salaire moyen par employé :

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, avg

# Define the window
window = Window.orderBy("id")

# Calculate average salary with lag and lead
df = df.withColumn("avg_salary", (lag(df["salary"]).over(window) + lead(df["salary"]).over(window) + df["salary"]) / 3)
Copier après la connexion

Sortie

+---+----+---------+------+----------+
|id |name|department|salary|avg_salary|
+---+----+---------+------+----------+
|1  |John|HR       |2500  |2666.6667 |
|2  |Alice|

IT      |3000  |2833.3333 |
|3  |Bob |Sales    |2000  |2500      |
+---+----+---------+------+----------+
Copier après la connexion

Dans l'extrait de code ci-dessus, nous utilisons la méthode "Window.orderBy()" pour définir une fenêtre qui spécifie l'ordre des lignes en fonction de la colonne "id". Nous utilisons ensuite les fonctions "lag()" et "lead()" pour accéder respectivement à la ligne précédente et suivante. Enfin, nous calculons le salaire moyen en considérant la ligne actuelle et ses voisines.

Enfin, la mise en cache est une technologie importante dans PySpark pour améliorer les performances des algorithmes itératifs ou des calculs répétés. Nous pouvons mettre en cache un DataFrame ou un RDD en mémoire en utilisant la méthode `cache()` :

# Cache a DataFrame
df.cache()
Copier après la connexion

La mise en cache n'affichera aucune sortie, mais les opérations ultérieures reposant sur le DataFrame mis en cache seront plus rapides puisque les données sont stockées en mémoire.

Conclusion

Dans ce tutoriel, nous explorons la puissance de PySpark pour traiter de grands ensembles de données en Python. Nous avons d'abord configuré l'environnement de développement et chargé les données dans des RDD et des DataFrames. Nous avons ensuite approfondi les techniques de transformation et d'analyse des données, notamment le filtrage, l'agrégation et la jointure d'ensembles de données. Enfin, nous discutons des techniques avancées de PySpark telles que les fonctions définies par l'utilisateur, les fonctions de fenêtre et la mise en cache.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:tutorialspoint.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal