azure
Support formation Microsoft Azure
azure
Support formation Microsoft Azure
DP-203

icon picker
Utiliser Delta Lake dans Azure Synapse Analytics

Gonzague Ducos
Delta Lake est une couche de stockage open source qui ajoute la sémantique d’une base de données relationnelle au traitement du lac de données basé sur Spark.
Est pris en charge dans les pools Spark Azure Synapse Analytics pour le code PySpark, Scala et .NET.
Avantages :
Des tables relationnelles qui prennent en charge l’interrogation et la modification des données : opérations CRUD.
Prise en charge des transactions ACID :
Atomicité : transactions terminées sous forme d’une unité de travail unique
Cohérence : les transactions laissent la base de données dans un état cohérent
Isolation : les transactions in-process ne peuvent pas interférer entre elles
Durabilité : lorsqu’une transaction se termine, les modifications apportées sont persistantes Delta Lake apporte cette même prise en charge transactionnelle à Spark en implémentant un journal des transactions et en appliquant une isolation sérialisable pour les opérations simultanées (garantit que même si les transactions s'exécutent en parallèle, le résultat est le même que si elles avaient été exécutées une par une, sans aucune concurrence)
Contrôle de version des données et voyage dans le temps : comme toutes les transactions sont enregistrées dans le journal des transactions, vous pouvez suivre plusieurs versions de chaque ligne de table et même utiliser la fonctionnalité de voyage dans le temps pour récupérer une version précédente d’une ligne dans une requête.
Prise en charge des données de traitement par lots et de diffusion en continu : Spark inclut la prise en charge native des données de diffusion en continu via l’API Spark Structured Streaming. Les tables Delta Lake peuvent être utilisées en tant que récepteurs (destinations) et sources pour la diffusion en continu de données.
Formats standard et interopérabilité :
Données sous-jacentes des tables Delta Lake sont stockées au format Parquet, qui est couramment utilisé dans les pipelines d’ingestion de lac de données.
Utiliser le pool SQL serverless dans Azure Synapse Analytics pour interroger des tables Delta Lake dans SQL.

Créer des tables Delta Lake

Delta Lake repose sur les tables, qui fournissent une abstraction du stockage relationnel sur les fichiers d’un lac de données.

Création d’une table Delta Lake à partir d’un dataframe

Méthode la plus simple pour créer une table Delta Lake: enregistrer un dataframe au format delta, en spécifiant un chemin d’accès où les fichiers de données et les informations de métadonnées associées pour la table doivent être stockés.
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)
# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
L’emplacement du chemin que vous avez spécifié inclut :
des fichiers Parquet pour les données (quel que soit le format du fichier source que vous avez chargé dans le dataframe)
un dossier _delta_log contenant le journal des transactions pour la table
info
Le journal des transactions enregistre toutes les modifications de données apportées à la table.
En journalisant chaque modification, la cohérence transactionnelle peut être appliquée et les informations de contrôle de version de la table peuvent être conservées.
Remplacer une table Delta Lake existante par le contenu d’un dataframe :
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
Ajouter des lignes d’un dataframe à une table existante :
new_rows_df.write.format("delta").mode("append").save(delta_table_path)

Apport de mises à jour conditionnelles

CRUD plutôt que remplacer :
from delta.tables import *
from pyspark.sql.functions import *
# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })

Interrogation de la version précédente d’une table

Le journal des transactions enregistre les modifications apportées à la table, en notant le timestamp et le numéro de version pour chaque transaction.
Afficher les versions précédentes de la table = une fonctionnalité appelée voyage dans le temps.
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
Spécifier un timestamp :
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)

Créer des tables de catalogue

2 types de tables :
Instances de table Delta Lake créées à partir de dataframes et modifiées via l’API Delta Lake
Tables Delta Lake en tant que tables de catalogue dans le metastore Hive pour votre pool Spark et les utiliser à l’aide de SQL
2 types de tables de catalogue :
Une table managée est définie sans emplacement spécifié et les fichiers de données sont stockés dans le stockage utilisé par le metastore. La suppression de la table supprime ses métadonnées du catalogue et le dossier dans lequel ses fichiers de données sont stockés.
Une table externe est définie pour un emplacement de fichier personnalisé, où les données de la table sont stockées. Les métadonnées de la table sont définies dans le catalogue Spark. La suppression de la table supprime les métadonnées du catalogue, mais n’affecte pas les fichiers de données

Création de tables de catalogue

1/3 Création d’une table de catalogue à partir d’un dataframe

# Save a dataframe as a managed table
df.write.format("delta").saveAsTable("MyManagedTable")
## specify a path option to save as an external table
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")

2/3 Création d’une table de catalogue à l’aide de SQL

spark.sql("CREATE TABLE MyExternalTable USING DELTA LOCATION '/mydata'")
%%sql
CREATE TABLE MyExternalTable
USING DELTA
LOCATION '/mydata'
Le cas échéant : CREATE TABLE IF NOT EXISTS ou CREATE OR REPLACE TABLE.

Définition du schéma de table

Tables créées sans schéma explicite. Si depuis un dataframe, schéma hérité du dataframe. Si création table externe, schéma hérité de tous les actuellement stockés dans l’emplacement de la table. Mais si emplacement vide dans les 2 cas, définir un schéma de table spécifiant nom, type et nullabilité :
%%sql
CREATE TABLE ManagedSalesOrders
(
Orderid INT NOT NULL,
OrderDate TIMESTAMP NOT NULL,
CustomerName STRING,
SalesTotal FLOAT NOT NULL
)
USING DELTA

3/3 Utilisation de l’API DeltaTableBuilder

Fait partie de l’API Delta Lake.
from delta.tables import *
DeltaTable.create(spark) \
.tableName("default.ManagedProducts") \
.addColumn("Productid", "INT") \
.addColumn("ProductName", "STRING") \
.addColumn("Category", "STRING") \
.addColumn("Price", "FLOAT") \
.execute()
Le cas échéant : createIfNotExists ou createOrReplace.

Utilisation des tables de catalogue

Utiliser ces tables comme n’importe quelle tables de n’importe quelle base de données relationnelle SQL.
%%sql
SELECT orderid, salestotal
FROM ManagedSalesOrders

Utiliser Delta Lake avec des données de diffusion en continu

Spark Structured Streaming

Solution classique :
lecture constante d’un flux de données à partir d’une source,
traitement facultatif pour :
Want to print your doc?
This is not the way.
Try clicking the ⋯ next to your doc name or using a keyboard shortcut (
CtrlP
) instead.