azure
Support formation Microsoft Azure
azure
Support formation Microsoft Azure
Labs 203 | Azure Date Engineer

Lab 07 | Utiliser Delta Lake avec Spark dans Azure Synapse Analytics

Gonzague Ducos
Delta Lake est un projet open source visant à créer une couche de stockage de données transactionnelles au-dessus d’un lac de données. Delta Lake ajoute la prise en charge de la sémantique relationnelle pour les opérations de données par lots et en continu, et permet la création d’une architecture Lakehouse dans laquelle Apache Spark peut être utilisé pour traiter et interroger des données dans des tables basées sur des fichiers sous-jacents dans le lac de données.
Cet exercice devrait durer environ 40 minutes.

Avant de commencer

Vous aurez besoin d’un dans lequel vous disposez d’un accès de niveau administratif.

Approvisionner un espace de travail Azure Synapse Analytics

Vous aurez besoin d’un espace de travail Azure Synapse Analytics avec accès au stockage du lac de données et d’un pool Apache Spark que vous pouvez utiliser pour interroger et traiter des fichiers dans le lac de données.
Dans cet exercice, vous allez utiliser une combinaison d’un script PowerShell et d’un modèle ARM pour provisionner un espace de travail Azure Synapse Analytics.
Connectez-vous au à l’adresse https://portal.azure.com.
Utilisez le bouton [>_] à droite de la barre de recherche en haut de la page pour créer un Cloud Shell dans le portail Azure, en sélectionnant un environnement PowerShell et en créant un stockage si vous y êtes invité. Cloud Shell fournit une interface de ligne de commande dans un volet situé au bas du portail Azure, comme illustré ici : ​
image.png
Remarque : Si vous avez déjà créé un Cloud Shell qui utilise un environnement Bash, utilisez le menu déroulant en haut à gauche du volet Cloud Shell pour le remplacer par PowerShell.
Notez que vous pouvez redimensionner la coque du nuage en faisant glisser la barre de séparation en haut du volet ou en utilisant les icônes , et X en haut à droite du volet pour réduire, agrandir et fermer le volet. Pour plus d’informations sur l’utilisation d’Azure Cloud Shell, consultez la .
Dans le volet PowerShell, entrez les commandes suivantes pour cloner ce référentiel :
rm -r dp-203 -f
git clone https://github.com/MicrosoftLearning/dp-203-azure-data-engineer dp-203
Une fois le référentiel cloné, entrez les commandes suivantes pour accéder au dossier de cet exercice et exécutez le script setup.ps1 qu’il contient :
cd dp-203/Allfiles/labs/07
./setup.ps1
Si vous y êtes invité, choisissez l’abonnement que vous souhaitez utiliser (cela ne se produira que si vous avez accès à plusieurs abonnements Azure).
Lorsque vous y êtes invité, entrez un mot de passe approprié à définir pour votre pool SQL Azure Synapse. ​Remarque : N’oubliez pas ce mot de passe !
Attendez que le script soit terminé - cela prend généralement environ 10 minutes, mais dans certains cas, cela peut prendre plus de temps. Pendant que vous attendez, consultez l’article dans la documentation Azure Synapse Analytics.

Créer des tables delta

Le script provisionne un espace de travail Azure Synapse Analytics et un compte de stockage Azure pour héberger le lac de données, puis charge un fichier de données dans le lac de données.

Explorer les données du lac de données

Une fois le script terminé, dans le portail Azure, accédez au groupe de ressources dp203-xxxxxxx qu’il a créé, puis sélectionnez votre espace de travail Synapse.
Dans la page Vue d’ensemble de votre espace de travail Synapse, dans la carte Ouvrir Synapse Studio, sélectionnez Ouvrir pour ouvrir Synapse Studio dans un nouvel onglet du navigateur. Connectez-vous si vous y êtes invité.
Sur le côté gauche de Synapse Studio, utilisez l’icône ›› pour développer le menu, ce qui révèle les différentes pages de Synapse Studio que vous utiliserez pour gérer les ressources et effectuer des tâches d’analyse de données.
Dans la page Données, affichez l’onglet Lié et vérifiez que votre espace de travail inclut un lien vers votre compte de stockage Azure Data Lake Storage Gen2, qui doit avoir un nom similaire à synapsexxxxxxx (Principal - datalakexxxxxxx).
Développez votre compte de stockage et vérifiez qu’il contient un conteneur de système de fichiers nommé files.
Sélectionnez le conteneur files et notez qu’il contient un dossier nommé products. Ce dossier contient les données que vous allez utiliser dans cet exercice.
Ouvrez le dossier products et observez qu’il contient un fichier nommé products.csv.
Sélectionnez products.csv, puis dans la liste Nouveau bloc-notes de la barre d’outils, sélectionnez Charger dans DataFrame.
Dans le volet Notebook 1 qui s’ouvre, dans la liste Attacher à, sélectionnez le pool Sparkxxxxxxx et assurez-vous que la langue est définie sur PySpark (Python).
Examinez le code de la première (et unique) cellule du bloc-notes, qui doit ressembler à ceci :
%%pyspark
df = spark.read.load('abfss://files@datalakexxxxxxx.dfs.core.windows.net/products/products.csv', format='csv'
## If header exists uncomment line below
##, header=True
)
display(df.limit(10))
Décommentez la ligne ,header=True (car le fichier products.csv a les en-têtes de colonne dans la première ligne), de sorte que votre code ressemble à ceci :
%%pyspark
df = spark.read.load('abfss://files@datalakexxxxxxx.dfs.core.windows.net/products/products.csv', format='csv'
## If header exists uncomment line below
, header=True
)
display(df.limit(10))
Utilisez l’icône à gauche de la cellule de code pour l’exécuter et attendez les résultats. La première fois que vous exécutez une cellule dans un bloc-notes, le pool Spark est démarré, de sorte que le retour des résultats peut prendre environ une minute. Finalement, les résultats devraient apparaître sous la cellule, et ils devraient être similaires à ceci :
Productid
Nom du produit
Catégorie
Liste de prix
771
Mountain-100 Argent, 38
Vtt
3399.9900
772
Montagne-100 Argent, 42
Vtt
3399.9900
There are no rows in this table

Charger les données du fichier dans une table delta

Sous les résultats renvoyés par la première cellule de code, utilisez le bouton + Code pour ajouter une nouvelle cellule de code. Entrez ensuite le code suivant dans la nouvelle cellule et exécutez-le :
delta_table_path = "/delta/products-delta"
df.write.format("delta").save(delta_table_path)
Dans l’onglet files, utilisez l’icône dans la barre d’outils pour revenir à la racine du conteneur files et notez qu’un nouveau dossier nommé delta a été créé. Ouvrez ce dossier et la table products-delta qu’il contient, où vous devez voir le(s) fichier(s) au format parquet contenant les données.
Revenez à l’onglet Notebook 1 et ajoutez une autre nouvelle cellule de code. Ensuite, dans la nouvelle cellule, ajoutez le code suivant et exécutez-le :
from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of product 771 by 10%)
deltaTable.update(
condition = "ProductID == 771",
set = { "ListPrice": "ListPrice * 0.9" })

# View the updated data as a dataframe
deltaTable.toDF().show(10)
Les données sont chargées dans un objet DeltaTable et mises à jour. Vous pouvez voir la mise à jour reflétée dans les résultats de la requête.
Ajoutez une autre nouvelle cellule de code avec le code suivant et exécutez-la :
new_df = spark.read.format("delta").load(delta_table_path)
new_df.show(10)
Le code charge les données de la table delta dans une trame de données à partir de son emplacement dans le lac de données, en vérifiant que la modification que vous avez apportée via un objet DeltaTable a été conservée.
Modifiez le code que vous venez d’exécuter comme suit, en spécifiant l’option permettant d’utiliser la fonction de voyage dans le temps de delta lake pour afficher une version précédente des données.
new_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
new_df.show(10)
Lorsque vous exécutez le code modifié, les résultats affichent la version d’origine des données.
Ajoutez une autre nouvelle cellule de code avec le code suivant et exécutez-la :
deltaTable.history(10).show(20, False, True)
L’historique des 20 dernières modifications apportées à la table est affiché - il devrait y en avoir deux (la création originale et la mise à jour que vous avez faite).

Créer des tables de catalogue

Jusqu’à présent, vous avez utilisé des tables delta en chargeant des données à partir du dossier contenant les fichiers parquet sur lesquels la table est basée. Vous pouvez définir des tables de catalogue qui encapsulent les données et fournissent une entité de table nommée que vous pouvez référencer dans le code SQL. Spark prend en charge deux types de tables de catalogue pour delta lake :
Tables externes définies par le chemin d’accès aux fichiers Parquet contenant les données de la table.
Tables gérées, qui sont définies dans le metastore Hive pour le pool Spark.

Créer une table externe

Dans une nouvelle cellule de code, ajoutez et exécutez le code suivant :
spark.sql("CREATE DATABASE AdventureWorks")
spark.sql("CREATE TABLE AdventureWorks.ProductsExternal USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsExternal").show(truncate=False)
Ce code crée une base de données nommée AdventureWorks, puis crée une table externe nommée ProductsExternal dans cette base de données en fonction du chemin d’accès aux fichiers Parquet que vous avez définis précédemment. Il affiche ensuite une description des propriétés de la table. Notez que la propriété Location est le chemin d’accès que vous avez spécifié.
Ajoutez une nouvelle cellule de code, puis entrez et exécutez le code suivant :
%%sql
USE AdventureWorks;
SELECT * FROM ProductsExternal;
Le code utilise SQL pour basculer le contexte vers la base de données AdventureWorks (qui ne renvoie aucune donnée), puis interroger la table ProductsExternal (qui renvoie un ensemble de résultats contenant les données des produits dans la table Delta Lake).

Créer une table gérée

Dans une nouvelle cellule de code, ajoutez et exécutez le code suivant :
df.write.format("delta").saveAsTable("AdventureWorks.ProductsManaged")
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsManaged").show(truncate=False)
Ce code crée une table gérée nommée ProductsManaged basée sur le DataFrame que vous avez chargé à l’origine à partir du fichier products.csv (avant de mettre à jour le prix du produit 771). Vous ne spécifiez pas de chemin d’accès pour les fichiers Parquet utilisés par la table : il est géré pour vous dans le metastore Hive et affiché dans la propriété Location de la description de la table (dans le chemin files/synapse/workspaces/synapsexxxxxxx/warehouse).
Ajoutez une nouvelle cellule de code, puis entrez et exécutez le code suivant :
%%sql
USE AdventureWorks;
SELECT * FROM ProductsManaged;
Le code utilise SQL pour interroger la table ProductsManaged.

Comparer les tables externes et gérées

Dans une nouvelle cellule de code, ajoutez et exécutez le code suivant :
%%sql
USE AdventureWorks;
SHOW TABLES;
Ce code répertorie les tables de la base de données AdventureWorks.
Modifiez la cellule de code comme suit, ajoutez-la, exécutez-la :
%%sql
USE AdventureWorks;
DROP TABLE IF EXISTS ProductsExternal;
DROP TABLE IF EXISTS ProductsManaged;
Ce code supprime les tables du metastore.
Revenez à l’onglet files et affichez le dossier files/delta/products-delta. Notez que les fichiers de données existent toujours à cet emplacement. La suppression de la table externe a supprimé la table du metastore, mais a laissé les fichiers de données intacts.
Affichez le dossier files/synapse/workspaces/synapsexxxxxxx/warehouse et notez qu’il n’existe pas de dossier pour les données de la table ProductsManaged. La suppression d’une table gérée supprime la table du metastore et supprime également les fichiers de données de la table.

Création d’une table à l’aide de SQL

Ajoutez une nouvelle cellule de code, puis entrez et exécutez le code suivant :
%%sql
USE AdventureWorks;
CREATE TABLE Products USING DELTA LOCATION '/delta/products-delta';
Ajoutez une nouvelle cellule de code, puis entrez et exécutez le code suivant :
%%sql
USE AdventureWorks;
SELECT * FROM Products;
Notez que la nouvelle table de catalogue a été créée pour le dossier de table Delta Lake existant, ce qui reflète les modifications apportées précédemment.

Utiliser des tables delta pour les données en continu

Le lac Delta prend en charge les données en continu. Les tables delta peuvent être un récepteur ou une source pour les flux de données créés à l’aide de l’API Spark Structured Streaming. Dans cet exemple, vous allez utiliser une table delta comme récepteur pour certaines données de streaming dans un scénario IoT (Internet des objets) simulé.
Revenez à l’onglet Notebookl 1 et ajoutez une nouvelle cellule de code. Ensuite, dans la nouvelle cellule, ajoutez le code suivant et exécutez-le :
from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a folder
inputPath = '/data/'
mssparkutils.fs.mkdirs(inputPath)

# Create a stream that reads data from the folder, using a JSON schema
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write some event data to the folder
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
Want to print your doc?
This is not the way.
Try clicking the ⋯ next to your doc name or using a keyboard shortcut (
CtrlP
) instead.