Heim > Backend-Entwicklung > Python-Tutorial > Verarbeiten Sie große Datensätze mit Python PySpark

Verarbeiten Sie große Datensätze mit Python PySpark

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
Freigeben: 2023-08-29 09:09:08
nach vorne
794 Leute haben es durchsucht

使用Python PySpark处理大型数据集

In diesem Tutorial werden wir die leistungsstarke Kombination von Python und PySpark für die Verarbeitung großer Datenmengen erkunden. PySpark ist eine Python-Bibliothek, die eine Schnittstelle zu Apache Spark bereitstellt, einem schnellen und vielseitigen Cluster-Computing-System. Durch die Nutzung von PySpark können wir Daten effizient auf eine Reihe von Maschinen verteilen und verarbeiten, sodass wir große Datensätze problemlos verarbeiten können.

In diesem Artikel befassen wir uns mit den Grundlagen von PySpark und zeigen, wie man verschiedene Datenverarbeitungsaufgaben an großen Datensätzen durchführt. Wir behandeln Schlüsselkonzepte wie RDDs (Resilient Distributed Datasets) und Datenrahmen und zeigen ihre praktische Anwendung anhand von Schritt-für-Schritt-Beispielen. Durch das Studium dieses Tutorials verfügen Sie über ein solides Verständnis dafür, wie Sie PySpark effektiv zur Verarbeitung und Analyse großer Datensätze nutzen können.

Abschnitt 1: Erste Schritte mit PySpark

Die chinesische Übersetzung lautet:

Teil 1: Erste Schritte mit PySpark

In diesem Abschnitt richten wir die Entwicklungsumgebung ein und machen uns mit den Grundkonzepten von PySpark vertraut. Wir behandeln die Installation von PySpark, die Initialisierung einer SparkSession und das Laden von Daten in RDDs und DataFrames. Beginnen wir mit der Installation von PySpark:

# Install PySpark
!pip install pyspark
Nach dem Login kopieren

Ausgabe

Collecting pyspark
...
Successfully installed pyspark-3.1.2

Nach dem Login kopieren

Nach der Installation von PySpark können wir eine SparkSession initialisieren, um eine Verbindung zu unserem Spark-Cluster herzustellen:

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("LargeDatasetProcessing").getOrCreate()
Nach dem Login kopieren

Da unsere SparkSession bereit ist, können wir jetzt Daten in RDDs oder DataFrames laden. RDDs sind die grundlegende Datenstruktur in PySpark, die eine verteilte Sammlung von Elementen bereitstellt. DataFrames hingegen organisieren Daten in benannten Spalten, ähnlich wie Tabellen in relationalen Datenbanken. Laden wir eine CSV-Datei als DataFrame:

# Load a CSV file as a DataFrame
df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True)
Nach dem Login kopieren

Ausgabe

+---+------+--------+
|id |name  |age     |
+---+------+--------+
|1  |John  |32      |
|2  |Alice |28      |
|3  |Bob   |35      |
+---+------+--------+
Nach dem Login kopieren

Wie Sie dem obigen Codeausschnitt entnehmen können, verwenden wir die Methode „read.csv()“, um die CSV-Datei in einen Datenrahmen einzulesen. Der Parameter „header=True“ bedeutet, dass die erste Zeile Spaltennamen enthält, und „inferSchema=True“ leitet automatisch den Datentyp jeder Spalte ab.

Teil 2: Daten transformieren und analysieren

In diesem Abschnitt werden wir verschiedene Datentransformations- und Analysetechniken mit PySpark untersuchen. Wir behandeln Vorgänge wie das Filtern, Aggregieren und Zusammenführen von Datensätzen. Beginnen wir damit, die Daten nach bestimmten Kriterien zu filtern:

# Filter data
filtered_data = df.filter(df["age"] > 30)
Nach dem Login kopieren

Ausgabe

+---+----+---+
|id |name|age|
+---+----+---+
|1  |John|32 |
|3  |Bob |35 |
+---+----+---+
Nach dem Login kopieren

Im obigen Codeausschnitt verwenden wir die Methode „filter()“, um Zeilen auszuwählen, in denen die Spalte „Alter“ größer als 30 ist. Mit dieser Operation können wir relevante Teilmengen aus großen Datensätzen extrahieren.

Als nächstes führen wir eine Aggregation des Datensatzes mit den Methoden „groupBy()“ und „agg()“ durch:

# Aggregate data
aggregated_data = df.groupBy("gender").agg({"salary": "mean", "age": "max"})
Nach dem Login kopieren

Ausgabe

+------+-----------+--------+
|gender|avg(salary)|max(age)|
+------+-----------+--------+
|Male  |2500       |32      |
|Female|3000       |35      |
+------+-----------+--------+
Nach dem Login kopieren

Hier gruppieren wir die Daten nach der Spalte „Geschlecht“ und berechnen das Durchschnittsgehalt und das Höchstalter für jede Gruppe. Der resultierende Datenrahmen „aggreated_data“ liefert uns wertvolle Einblicke in den Datensatz.

Neben der Filterung und Aggregation ermöglicht uns PySpark auch die effiziente Verknüpfung mehrerer Datensätze. Betrachten wir ein Beispiel, in dem wir zwei DataFrames haben: „df1“ und „df2“. Wir können sie anhand einer gemeinsamen Kolumne verbinden:

# Join two DataFrames
joined_data = df1.join(df2, on="id", how="inner")
Nach dem Login kopieren

Ausgabe

+---+----+---------+------+
|id |name|department|salary|
+---+----+---------+------+
|1  |John|HR       |2500  |
|2  |Alice|IT      |3000  |
|3  |Bob |Sales    |2000  |
+---+----+---------+------+
Nach dem Login kopieren

Mit der Methode „join()“ können wir DataFrames basierend auf den gemeinsamen Spalten zusammenführen, die durch den Parameter „on“ angegeben werden. Je nach Bedarf können wir verschiedene Verbindungsarten wählen, wie zum Beispiel „innen“, „außen“, „links“ oder „rechts“.

Teil 3: Erweiterte PySpark-Technologie

In diesem Abschnitt werden wir die fortschrittliche PySpark-Technologie untersuchen, um unsere Datenverarbeitungsfähigkeiten weiter zu verbessern. Wir behandeln Themen wie benutzerdefinierte Funktionen (UDFs), Fensterfunktionen und Caching. Beginnen wir mit der Definition und Verwendung von UDFs:

from pyspark.sql.functions import udf

# Define a UDF
def square(x):
    return x ** 2

# Register the UDF
square_udf = udf(square)

# Apply the UDF to a column
df = df.withColumn("age_squared", square_udf(df["age"]))
Nach dem Login kopieren

Ausgabe

+---+------+---+------------+
|id |name  |age|age_squared |
+---+------+---+------------+
|1  |John  |32 |1024        |
|2  |Alice |28 |784         |
|3  |Bob   |35 |1225        |
+---+------+---+------------+
Nach dem Login kopieren

Im obigen Codeausschnitt haben wir eine einfache UDF-Funktion namens „square()“ definiert, die zum Quadrieren der gegebenen Eingabe verwendet wird. Anschließend registrieren wir diese UDF mit der Funktion „udf()“ und wenden sie auf die Spalte „Alter“ an, wodurch eine neue Spalte mit dem Namen „age_squared“ in unserem DataFrame erstellt wird.

PySpark bietet außerdem leistungsstarke Fensterfunktionen, mit denen wir Berechnungen innerhalb eines bestimmten Fensterbereichs durchführen können. Betrachten wir die vorherige und die nächste Zeile, um das Durchschnittsgehalt pro Mitarbeiter zu berechnen:

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, avg

# Define the window
window = Window.orderBy("id")

# Calculate average salary with lag and lead
df = df.withColumn("avg_salary", (lag(df["salary"]).over(window) + lead(df["salary"]).over(window) + df["salary"]) / 3)
Nach dem Login kopieren

Ausgabe

+---+----+---------+------+----------+
|id |name|department|salary|avg_salary|
+---+----+---------+------+----------+
|1  |John|HR       |2500  |2666.6667 |
|2  |Alice|

IT      |3000  |2833.3333 |
|3  |Bob |Sales    |2000  |2500      |
+---+----+---------+------+----------+
Nach dem Login kopieren

Im obigen Codeauszug verwenden wir die Methode „Window.orderBy()“, um ein Fenster zu definieren, das die Reihenfolge der Zeilen basierend auf der Spalte „id“ angibt. Anschließend verwenden wir die Funktionen „lag()“ und „lead()“, um auf die vorherige bzw. nächste Zeile zuzugreifen. Abschließend berechnen wir das Durchschnittsgehalt, indem wir die aktuelle Zeile und ihre Nachbarn berücksichtigen.

Schließlich ist Caching eine wichtige Technologie in PySpark, um die Leistung iterativer Algorithmen oder wiederholter Berechnungen zu verbessern. Mit der Methode „cache()“ können wir einen DataFrame oder ein RDD im Speicher zwischenspeichern:

# Cache a DataFrame
df.cache()
Nach dem Login kopieren

Caching zeigt keine Ausgabe an, aber nachfolgende Vorgänge, die auf dem zwischengespeicherten DataFrame basieren, werden schneller sein, da die Daten im Speicher gespeichert werden.

Fazit

In diesem Tutorial erkunden wir die Leistungsfähigkeit von PySpark für die Verarbeitung großer Datensätze in Python. Wir haben zunächst die Entwicklungsumgebung eingerichtet und die Daten in RDDs und DataFrames geladen. Anschließend befassten wir uns mit Datentransformations- und Analysetechniken, einschließlich Filtern, Aggregieren und Zusammenführen von Datensätzen. Abschließend besprechen wir fortgeschrittene PySpark-Techniken wie benutzerdefinierte Funktionen, Fensterfunktionen und Caching.

Das obige ist der detaillierte Inhalt vonVerarbeiten Sie große Datensätze mit Python PySpark. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:tutorialspoint.com
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage