Lab 07 | Utiliser Delta Lake avec Spark dans Azure Synapse Analytics
Gonzague Ducos
Delta Lake est un projet open source visant à créer une couche de stockage de données transactionnelles au-dessus d’un lac de données. Delta Lake ajoute la prise en charge de la sémantique relationnelle pour les opérations de données par lots et en continu, et permet la création d’une architecture Lakehouse dans laquelle Apache Spark peut être utilisé pour traiter et interroger des données dans des tables basées sur des fichiers sous-jacents dans le lac de données.
dans lequel vous disposez d’un accès de niveau administratif.
Approvisionner un espace de travail Azure Synapse Analytics
Vous aurez besoin d’un espace de travail Azure Synapse Analytics avec accès au stockage du lac de données et d’un pool Apache Spark que vous pouvez utiliser pour interroger et traiter des fichiers dans le lac de données.
Dans cet exercice, vous allez utiliser une combinaison d’un script PowerShell et d’un modèle ARM pour provisionner un espace de travail Azure Synapse Analytics.
Utilisez le bouton [>_] à droite de la barre de recherche en haut de la page pour créer un Cloud Shell dans le portail Azure, en sélectionnant un environnement PowerShell et en créant un stockage si vous y êtes invité. Cloud Shell fournit une interface de ligne de commande dans un volet situé au bas du portail Azure, comme illustré ici :
Remarque : Si vous avez déjà créé un Cloud Shell qui utilise un environnement Bash, utilisez le menu déroulant en haut à gauche du volet Cloud Shell pour le remplacer par PowerShell.
Notez que vous pouvez redimensionner la coque du nuage en faisant glisser la barre de séparation en haut du volet ou en utilisant les icônes —, ◻ et X en haut à droite du volet pour réduire, agrandir et fermer le volet. Pour plus d’informations sur l’utilisation d’Azure Cloud Shell, consultez la
Une fois le référentiel cloné, entrez les commandes suivantes pour accéder au dossier de cet exercice et exécutez le script setup.ps1 qu’il contient :
cd dp-203/Allfiles/labs/07
./setup.ps1
Si vous y êtes invité, choisissez l’abonnement que vous souhaitez utiliser (cela ne se produira que si vous avez accès à plusieurs abonnements Azure).
Lorsque vous y êtes invité, entrez un mot de passe approprié à définir pour votre pool SQL Azure Synapse.
Remarque : N’oubliez pas ce mot de passe !
Attendez que le script soit terminé - cela prend généralement environ 10 minutes, mais dans certains cas, cela peut prendre plus de temps. Pendant que vous attendez, consultez l’article
Le script provisionne un espace de travail Azure Synapse Analytics et un compte de stockage Azure pour héberger le lac de données, puis charge un fichier de données dans le lac de données.
Explorer les données du lac de données
Une fois le script terminé, dans le portail Azure, accédez au groupe de ressources dp203-xxxxxxx qu’il a créé, puis sélectionnez votre espace de travail Synapse.
Dans la page Vue d’ensemble de votre espace de travail Synapse, dans la carte Ouvrir Synapse Studio, sélectionnez Ouvrir pour ouvrir Synapse Studio dans un nouvel onglet du navigateur. Connectez-vous si vous y êtes invité.
Sur le côté gauche de Synapse Studio, utilisez l’icône ›› pour développer le menu, ce qui révèle les différentes pages de Synapse Studio que vous utiliserez pour gérer les ressources et effectuer des tâches d’analyse de données.
Dans la page Données, affichez l’onglet Lié et vérifiez que votre espace de travail inclut un lien vers votre compte de stockage Azure Data Lake Storage Gen2, qui doit avoir un nom similaire à synapsexxxxxxx (Principal - datalakexxxxxxx).
Développez votre compte de stockage et vérifiez qu’il contient un conteneur de système de fichiers nommé files.
Sélectionnez le conteneur files et notez qu’il contient un dossier nommé products. Ce dossier contient les données que vous allez utiliser dans cet exercice.
Ouvrez le dossier products et observez qu’il contient un fichier nommé products.csv.
Sélectionnez products.csv, puis dans la liste Nouveau bloc-notes de la barre d’outils, sélectionnez Charger dans DataFrame.
Dans le volet Notebook 1 qui s’ouvre, dans la liste Attacher à, sélectionnez le pool Sparkxxxxxxx et assurez-vous que la langue est définie sur PySpark (Python).
Examinez le code de la première (et unique) cellule du bloc-notes, qui doit ressembler à ceci :
Décommentez la ligne ,header=True (car le fichier products.csv a les en-têtes de colonne dans la première ligne), de sorte que votre code ressemble à ceci :
Utilisez l’icône ▷ à gauche de la cellule de code pour l’exécuter et attendez les résultats. La première fois que vous exécutez une cellule dans un bloc-notes, le pool Spark est démarré, de sorte que le retour des résultats peut prendre environ une minute. Finalement, les résultats devraient apparaître sous la cellule, et ils devraient être similaires à ceci :
Productid
Nom du produit
Catégorie
Liste de prix
771
Mountain-100 Argent, 38
Vtt
3399.9900
772
Montagne-100 Argent, 42
Vtt
3399.9900
…
…
…
…
There are no rows in this table
Charger les données du fichier dans une table delta
Sous les résultats renvoyés par la première cellule de code, utilisez le bouton + Code pour ajouter une nouvelle cellule de code. Entrez ensuite le code suivant dans la nouvelle cellule et exécutez-le :
delta_table_path ="/delta/products-delta"
df.write.format("delta").save(delta_table_path)
Dans l’onglet files, utilisez l’icône ↑ dans la barre d’outils pour revenir à la racine du conteneur files et notez qu’un nouveau dossier nommé delta a été créé. Ouvrez ce dossier et la table products-delta qu’il contient, où vous devez voir le(s) fichier(s) au format parquet contenant les données.
Revenez à l’onglet Notebook 1 et ajoutez une autre nouvelle cellule de code. Ensuite, dans la nouvelle cellule, ajoutez le code suivant et exécutez-le :
Le code charge les données de la table delta dans une trame de données à partir de son emplacement dans le lac de données, en vérifiant que la modification que vous avez apportée via un objet DeltaTable a été conservée.
Modifiez le code que vous venez d’exécuter comme suit, en spécifiant l’option permettant d’utiliser la fonction de voyage dans letemps de delta lake pour afficher une version précédente des données.
Lorsque vous exécutez le code modifié, les résultats affichent la version d’origine des données.
Ajoutez une autre nouvelle cellule de code avec le code suivant et exécutez-la :
deltaTable.history(10).show(20, False, True)
L’historique des 20 dernières modifications apportées à la table est affiché - il devrait y en avoir deux (la création originale et la mise à jour que vous avez faite).
Créer des tables de catalogue
Jusqu’à présent, vous avez utilisé des tables delta en chargeant des données à partir du dossier contenant les fichiers parquet sur lesquels la table est basée. Vous pouvez définir des tables de catalogue qui encapsulent les données et fournissent une entité de table nommée que vous pouvez référencer dans le code SQL. Spark prend en charge deux types de tables de catalogue pour delta lake :
Tables externes définies par le chemin d’accès aux fichiers Parquet contenant les données de la table.
Tables gérées, qui sont définies dans le metastore Hive pour le pool Spark.
Créer une table externe
Dans une nouvelle cellule de code, ajoutez et exécutez le code suivant :
spark.sql("CREATE DATABASE AdventureWorks")
spark.sql("CREATE TABLE AdventureWorks.ProductsExternal USING DELTA LOCATION '{0}'".format(delta_table_path))
Ce code crée une base de données nommée AdventureWorks, puis crée une table externe nommée ProductsExternal dans cette base de données en fonction du chemin d’accès aux fichiers Parquet que vous avez définis précédemment. Il affiche ensuite une description des propriétés de la table. Notez que la propriété Location est le chemin d’accès que vous avez spécifié.
Ajoutez une nouvelle cellule de code, puis entrez et exécutez le code suivant :
%%sql
USE AdventureWorks;
SELECT*FROM ProductsExternal;
Le code utilise SQL pour basculer le contexte vers la base de données AdventureWorks (qui ne renvoie aucune donnée), puis interroger la table ProductsExternal (qui renvoie un ensemble de résultats contenant les données des produits dans la table Delta Lake).
Créer une table gérée
Dans une nouvelle cellule de code, ajoutez et exécutez le code suivant :
Ce code crée une table gérée nommée ProductsManaged basée sur le DataFrame que vous avez chargé à l’origine à partir du fichier products.csv (avant de mettre à jour le prix du produit 771). Vous ne spécifiez pas de chemin d’accès pour les fichiers Parquet utilisés par la table : il est géré pour vous dans le metastore Hive et affiché dans la propriété Location de la description de la table (dans le chemin files/synapse/workspaces/synapsexxxxxxx/warehouse).
Ajoutez une nouvelle cellule de code, puis entrez et exécutez le code suivant :
%%sql
USE AdventureWorks;
SELECT*FROM ProductsManaged;
Le code utilise SQL pour interroger la table ProductsManaged.
Comparer les tables externes et gérées
Dans une nouvelle cellule de code, ajoutez et exécutez le code suivant :
%%sql
USE AdventureWorks;
SHOWTABLES;
Ce code répertorie les tables de la base de données AdventureWorks.
Modifiez la cellule de code comme suit, ajoutez-la, exécutez-la :
%%sql
USE AdventureWorks;
DROPTABLEIFEXISTS ProductsExternal;
DROPTABLEIFEXISTS ProductsManaged;
Ce code supprime les tables du metastore.
Revenez à l’onglet files et affichez le dossier files/delta/products-delta. Notez que les fichiers de données existent toujours à cet emplacement. La suppression de la table externe a supprimé la table du metastore, mais a laissé les fichiers de données intacts.
Affichez le dossier files/synapse/workspaces/synapsexxxxxxx/warehouse et notez qu’il n’existe pas de dossier pour les données de la table ProductsManaged. La suppression d’une table gérée supprime la table du metastore et supprime également les fichiers de données de la table.
Création d’une table à l’aide de SQL
Ajoutez une nouvelle cellule de code, puis entrez et exécutez le code suivant :
%%sql
USE AdventureWorks;
CREATETABLE Products USING DELTA LOCATION '/delta/products-delta';
Ajoutez une nouvelle cellule de code, puis entrez et exécutez le code suivant :
%%sql
USE AdventureWorks;
SELECT*FROM Products;
Notez que la nouvelle table de catalogue a été créée pour le dossier de table Delta Lake existant, ce qui reflète les modifications apportées précédemment.
Utiliser des tables delta pour les données en continu
Le lac Delta prend en charge les données en continu. Les tables delta peuvent être un récepteur ou une source pour les flux de données créés à l’aide de l’API Spark Structured Streaming. Dans cet exemple, vous allez utiliser une table delta comme récepteur pour certaines données de streaming dans un scénario IoT (Internet des objets) simulé.
Revenez à l’onglet Notebookl 1 et ajoutez une nouvelle cellule de code. Ensuite, dans la nouvelle cellule, ajoutez le code suivant et exécutez-le :
from notebookutils import mssparkutils
from pyspark.sql.types import*
from pyspark.sql.functions import*
# Create a folder
inputPath ='/data/'
mssparkutils.fs.mkdirs(inputPath)
# Create a stream that reads data from the folder, using a JSON schema
Assurez-vous que le message Flux source créé... est affiché. Le code que vous venez d’exécuter a créé une source de données de streaming basée sur un dossier dans lequel certaines données ont été enregistrées, représentant des lectures d’appareils IoT hypothétiques.
Dans une nouvelle cellule de code, ajoutez et exécutez le code suivant :
Ce code lit les données diffusées en continu au format delta dans une trame de données. Notez que le code de chargement des données de streaming n’est pas différent de celui utilisé pour charger des données statiques à partir d’un dossier delta.
Dans une nouvelle cellule de code, ajoutez et exécutez le code suivant :
# create a catalog table based on the streaming sink
spark.sql("CREATE TABLE IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))
Ce code crée une table de catalogue nommée IotDeviceData (dans la base de données par défaut) basée sur le dossier delta. Encore une fois, ce code est le même que celui utilisé pour les données pas en streaming.
Dans une nouvelle cellule de code, ajoutez et exécutez le code suivant :
%%sql
SELECT*FROM IotDeviceData;
Ce code interroge la table IotDeviceData, qui contient les données de l’appareil à partir de la source de diffusion en continu.
Dans une nouvelle cellule de code, ajoutez et exécutez le code suivant :
Ce code écrit d’autres données d’appareil hypothétiques dans la source de diffusion en continu.
Dans une nouvelle cellule de code, ajoutez et exécutez le code suivant :
%%sql
SELECT*FROM IotDeviceData;
Ce code interroge à nouveau la table IotDeviceData, qui doit maintenant inclure les données supplémentaires qui ont été ajoutées à la source de streaming.
Dans une nouvelle cellule de code, ajoutez et exécutez le code suivant :
deltastream.stop()
Ce code arrête le flux.
Interroger une table delta à partir d’un pool SQL serverless
En plus des pools Spark, Azure Synapse Analytics inclut un pool SQL serverless intégré. Vous pouvez utiliser le moteur de base de données relationnelle de ce pool pour interroger les tables delta à l’aide de SQL.
Dans l’onglet files, accédez au dossier files/delta.
Sélectionnez le dossier products-delta et, dans la barre d’outils, dans la liste déroulante Nouveau script SQL, sélectionnez Sélectionner les 100 premières lignes.
Dans le volet Sélectionner les 100 premières lignes, dans la liste Type de fichier, sélectionnez Format Delta, puis Appliquer.
Examinez le code SQL généré, qui doit ressembler à ceci :
Utilisez l’icône ▷ Exécuter pour exécuter le script et examiner les résultats. Ils devraient ressembler à ceci :
Productid
Nom du produit
Catégorie
Liste de prix
771
Mountain-100 Argent, 38
Vtt
3059.991
772
Montagne-100 Argent, 42
Vtt
3399.9900
…
…
…
…
There are no rows in this table
Cela montre comment vous pouvez utiliser un pool SQL serverless pour interroger des fichiers au format delta créés à l’aide de Spark et utiliser les résultats pour la création de rapports ou l’analyse.
Remplacez la requête par le code SQL suivant :
USE AdventureWorks;
SELECT*FROM Products;
Exécutez le code et observez que vous pouvez également utiliser le pool SQL serverless pour interroger les données Delta Lake dans les tables de catalogue définies dans le metastore Spark.
Supprimer des ressources Azure
Si vous avez terminé d’explorer Azure Synapse Analytics, vous devez supprimer les ressources que vous avez créées pour éviter des coûts Azure inutiles.
Fermez l’onglet du navigateur Synapse Studio et revenez au portail Azure.
Sur le portail Azure, dans la page d’accueil, sélectionnez Groupes de ressources.
Sélectionnez le groupe de ressources dp203-xxxxxxx pour votre espace de travail Synapse Analytics (et non le groupe de ressources géré) et vérifiez qu’il contient l’espace de travail Synapse, le compte de stockage et le pool Spark de votre espace de travail.
En haut de la page Vue d’ensemble de votre groupe de ressources, sélectionnez Supprimer le groupe de ressources.
Entrez le nom du groupe de ressources dp203-xxxxxxx pour confirmer que vous souhaitez le supprimer, puis sélectionnez Supprimer.
Au bout de quelques minutes, votre groupe de ressources d’espace de travail Azure Synapse et le groupe de ressources d’espace de travail managé qui lui est associé seront supprimés.