azure
Support formation Microsoft Azure
azure
Support formation Microsoft Azure
DP-203

icon picker
Transformer des données avec Spark dans Azure Synapse Analytics

Gonzague Ducos
Apache Spark fournit l’objet dataframe comme structure principale pour travailler avec des données
Pour charger des données dans un dataframe, vous utilisez la fonction spark.read.
order_details = spark.read.csv('/orders/*.csv', header=True, inferSchema=True)
display(order_details.limit(5))

Transformer la structure des données

Opérations classiques sur un dataframe :
Filtrage des lignes et des colonnes
Renommage des colonnes
Création de nouvelles colonnes, souvent dérivées de colonnes existantes
Remplacement de valeurs null ou d’autres valeurs
from pyspark.sql.functions import split, col
# Créer les nouveaux champs FirstName et LastName en séparant sur l'espace
transformed_df = order_details.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))
# Suprime le champ CustomerName
transformed_df = transformed_df.drop("CustomerName")
display(transformed_df.limit(5))

Enregistrer les données transformées

## Enregistre au format Parquet dans le lac
transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')
print ("Transformed data saved!")
Parfois, votre besoin de transformation de données peut en fait simplement consister à convertir des données d’un autre format (comme CSV) vers Parquet !

Partitionner des fichiers de données

Technique d’optimisation qui permet à Spark d’obtenir les meilleures performances sur les nœuds Worker.

Partitionner le fichier de sortie

Pour enregistrer un dataframe en tant que jeu de fichiers partitionné, utilisez la méthode partitionBy lors de l’écriture des données

from pyspark.sql.functions import year, col
# Charge la source de données
df = spark.read.csv('/orders/*.csv', header=True, inferSchema=True)
# Ajoute la colonne Year
dated_df = df.withColumn("Year", year(col("OrderDate")))
# Partitionne par year
dated_df.write.partitionBy("Year").mode("overwrite").parquet("/data")
Les noms de dossiers générés lors du partitionnement d’un dataframe incluent le nom et la valeur de la colonne de partitionnement au format colonne=valeur.

Filtrer des fichiers Parquet dans une requête

Les colonnes de partitionnement spécifiées dans le chemin de fichier sont omises dans le dataframe résultant.
orders_2020 = spark.read.parquet('/partitioned_data/Year=2020')
display(orders_2020.limit(5))

Transformer des données avec SQL

La bibliothèque SparkSQL vous permet aussi d’utiliser SQL comme méthode pour travailler avec des données.
Les tables sont des abstractions de métadonnées sur des fichiers. Les données ne sont pas stockées dans une table relationnelle, mais la table fournit une couche relationnelle sur les fichiers du lac de données.

Créer des tables et des vues

Enregistre un dataframe (chargé depuis des fichiers CSV) sous la forme d’une table externe nommée sales_orders. Les fichiers sont stockés dans le dossier /sales_orders_table du lac de données :
order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')
Les définitions de table dans Spark sont stockées dans le metastore, une couche de métadonnées qui encapsule les abstractions relationnelles sur les fichiers.
Les tables externes sont des tables relationnelles du metastore qui référencent des fichiers à un emplacement du lac de données que vous spécifiez.
Vous pouvez accéder à ces données :
en interrogeant la table
en lisant les fichiers directement à partir du lac de données
Les tables externes sont « faiblement liées » aux fichiers sous-jacents. La suppression de la table ne supprime pas les fichiers. Utiliser Spark pour effectuer le gros travail de transformation, puis d’enregistrer les données dans le lac. Une fois cette opération effectuée, vous pouvez supprimer la table.
Vous pouvez aussi définir des tables managées, pour lesquelles les fichiers de données sous-jacents sont stockés à un emplacement de stockage géré en interne associé au metastore. Les tables managées sont « étroitement liées » aux fichiers et la suppression d’une table managée supprime les fichiers associés.

Utiliser SQL pour interroger et transformer les données

# Crée des colonnées dérivées
sql_transform = spark.sql("SELECT *, YEAR(OrderDate) AS Year, MONTH(OrderDate) AS Month FROM sales_orders")
# Enregistre le résultat
sql_transform.write.partitionBy("Year","Month").saveAsTable('transformed_orders', format='parquet', mode='overwrite', path='/transformed_orders_table')
Hiérarchie de dossiers au format Year=*NNNN* / Month=*N*.

Interroger le metastore

Table précédente créée dans le metastore, peut donc être interrogée :
%%sql
SELECT * FROM transformed_orders
WHERE Year = 2021
AND Month = 1

Supprimer des tables

%%sql
DROP TABLE transformed_orders;
DROP TABLE sales_orders;

Want to print your doc?
This is not the way.
Try clicking the ⋯ next to your doc name or using a keyboard shortcut (
CtrlP
) instead.