azure
Support formation Microsoft Azure
azure
Support formation Microsoft Azure
DP-203

icon picker
Utiliser Delta Lake dans Azure Synapse Analytics

Gonzague Ducos
Delta Lake est une couche de stockage open source qui ajoute la sémantique d’une base de données relationnelle au traitement du lac de données basé sur Spark.
Est pris en charge dans les pools Spark Azure Synapse Analytics pour le code PySpark, Scala et .NET.
Avantages :
Des tables relationnelles qui prennent en charge l’interrogation et la modification des données : opérations CRUD.
Prise en charge des transactions ACID :
Atomicité : transactions terminées sous forme d’une unité de travail unique
Cohérence : les transactions laissent la base de données dans un état cohérent
Isolation : les transactions in-process ne peuvent pas interférer entre elles
Durabilité : lorsqu’une transaction se termine, les modifications apportées sont persistantes Delta Lake apporte cette même prise en charge transactionnelle à Spark en implémentant un journal des transactions et en appliquant une isolation sérialisable pour les opérations simultanées (garantit que même si les transactions s'exécutent en parallèle, le résultat est le même que si elles avaient été exécutées une par une, sans aucune concurrence)
Contrôle de version des données et voyage dans le temps : comme toutes les transactions sont enregistrées dans le journal des transactions, vous pouvez suivre plusieurs versions de chaque ligne de table et même utiliser la fonctionnalité de voyage dans le temps pour récupérer une version précédente d’une ligne dans une requête.
Prise en charge des données de traitement par lots et de diffusion en continu : Spark inclut la prise en charge native des données de diffusion en continu via l’API Spark Structured Streaming. Les tables Delta Lake peuvent être utilisées en tant que récepteurs (destinations) et sources pour la diffusion en continu de données.
Formats standard et interopérabilité :
Données sous-jacentes des tables Delta Lake sont stockées au format Parquet, qui est couramment utilisé dans les pipelines d’ingestion de lac de données.
Utiliser le pool SQL serverless dans Azure Synapse Analytics pour interroger des tables Delta Lake dans SQL.

Créer des tables Delta Lake

Delta Lake repose sur les tables, qui fournissent une abstraction du stockage relationnel sur les fichiers d’un lac de données.

Création d’une table Delta Lake à partir d’un dataframe

Méthode la plus simple pour créer une table Delta Lake: enregistrer un dataframe au format delta, en spécifiant un chemin d’accès où les fichiers de données et les informations de métadonnées associées pour la table doivent être stockés.
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)
# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
L’emplacement du chemin que vous avez spécifié inclut :
des fichiers Parquet pour les données (quel que soit le format du fichier source que vous avez chargé dans le dataframe)
un dossier _delta_log contenant le journal des transactions pour la table
info
Le journal des transactions enregistre toutes les modifications de données apportées à la table.
En journalisant chaque modification, la cohérence transactionnelle peut être appliquée et les informations de contrôle de version de la table peuvent être conservées.
Remplacer une table Delta Lake existante par le contenu d’un dataframe :
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
Ajouter des lignes d’un dataframe à une table existante :
new_rows_df.write.format("delta").mode("append").save(delta_table_path)

Apport de mises à jour conditionnelles

CRUD plutôt que remplacer :
from delta.tables import *
from pyspark.sql.functions import *
# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })

Interrogation de la version précédente d’une table

Le journal des transactions enregistre les modifications apportées à la table, en notant le timestamp et le numéro de version pour chaque transaction.
Afficher les versions précédentes de la table = une fonctionnalité appelée voyage dans le temps.
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
Spécifier un timestamp :
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)

Créer des tables de catalogue

2 types de tables :
Instances de table Delta Lake créées à partir de dataframes et modifiées via l’API Delta Lake
Tables Delta Lake en tant que tables de catalogue dans le metastore Hive pour votre pool Spark et les utiliser à l’aide de SQL
2 types de tables de catalogue :
Une table managée est définie sans emplacement spécifié et les fichiers de données sont stockés dans le stockage utilisé par le metastore. La suppression de la table supprime ses métadonnées du catalogue et le dossier dans lequel ses fichiers de données sont stockés.
Une table externe est définie pour un emplacement de fichier personnalisé, où les données de la table sont stockées. Les métadonnées de la table sont définies dans le catalogue Spark. La suppression de la table supprime les métadonnées du catalogue, mais n’affecte pas les fichiers de données

Création de tables de catalogue

1/3 Création d’une table de catalogue à partir d’un dataframe

# Save a dataframe as a managed table
df.write.format("delta").saveAsTable("MyManagedTable")
## specify a path option to save as an external table
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")

2/3 Création d’une table de catalogue à l’aide de SQL

spark.sql("CREATE TABLE MyExternalTable USING DELTA LOCATION '/mydata'")
%%sql
CREATE TABLE MyExternalTable
USING DELTA
LOCATION '/mydata'
Le cas échéant : CREATE TABLE IF NOT EXISTS ou CREATE OR REPLACE TABLE.

Définition du schéma de table

Tables créées sans schéma explicite. Si depuis un dataframe, schéma hérité du dataframe. Si création table externe, schéma hérité de tous les actuellement stockés dans l’emplacement de la table. Mais si emplacement vide dans les 2 cas, définir un schéma de table spécifiant nom, type et nullabilité :
%%sql
CREATE TABLE ManagedSalesOrders
(
Orderid INT NOT NULL,
OrderDate TIMESTAMP NOT NULL,
CustomerName STRING,
SalesTotal FLOAT NOT NULL
)
USING DELTA

3/3 Utilisation de l’API DeltaTableBuilder

Fait partie de l’API Delta Lake.
from delta.tables import *
DeltaTable.create(spark) \
.tableName("default.ManagedProducts") \
.addColumn("Productid", "INT") \
.addColumn("ProductName", "STRING") \
.addColumn("Category", "STRING") \
.addColumn("Price", "FLOAT") \
.execute()
Le cas échéant : createIfNotExists ou createOrReplace.

Utilisation des tables de catalogue

Utiliser ces tables comme n’importe quelle tables de n’importe quelle base de données relationnelle SQL.
%%sql
SELECT orderid, salestotal
FROM ManagedSalesOrders

Utiliser Delta Lake avec des données de diffusion en continu

Spark Structured Streaming

Solution classique :
lecture constante d’un flux de données à partir d’une source,
traitement facultatif pour :
sélectionner des champs spécifiques,
l’agrégation
regroupement de valeurs,
ou la manipulation des données
écriture des résultats dans un récepteur

Utilisation d’une table Delta Lake comme source de diffusion en continu

Spark Structured Streaming est une API basée sur un dataframe sans limite dans lequel les données de diffusion en continu sont capturées pour traitement.
Peut lire des données à partir de nombreux types de sources de diffusion en continu :
ports réseau,
services de répartiteur de messages en temps réel comme Azure Event Hubs ou Kafka
emplacements du système de fichiers

Diffusion en continu avec des tables Delta Lake

Utiliser une table Delta Lake comme source ou récepteur pour Spark Structured Streaming

Utilisation d’une table Delta Lake comme source de diffusion en continu

from pyspark.sql.types import *
from pyspark.sql.functions import *
# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.load("/delta/internetorders")
# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
.outputMode("append") \
.format("console") \
.start()
Utiliser l’API Spark Structured Streaming pour les traiter ensuite.

Utilisation d’une table Delta Lake comme récepteur de diffusion en continu

Flux de données lu à partir de fichiers JSON sous la forme {"device":"Dev1","status":"ok"}
Le flux d’entrée est un dataframe sans limite, qui est ensuite écrit au format delta dans un emplacement de dossier pour une table Delta Lake :
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)
L’option checkpointLocation est utilisée pour écrire un fichier de point de contrôle qui suit l’état du traitement de flux. Ce fichier vous permet de récupérer à partir d’une défaillance au point où le traitement de flux a été arrêté.
Une fois le processus de diffusion en continu démarré, vous pouvez interroger la table Delta Lake dans laquelle la sortie de diffusion en continu est écrite pour afficher les données les plus récentes
%%sql
CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';
SELECT device, status
FROM DeviceTable;
Arrêter le flux :
delta_stream.stop()

Utiliser Delta Lake dans un pool SQL

Interrogation de fichiers de format delta avec OPENROWSET

Exécuter cette requête dans un pool SQL serverless :
SELECT *
FROM
OPENROWSET(
BULK 'https://mystore.dfs.core.windows.net/files/delta/mytable/',
FORMAT = 'DELTA'
) AS deltadata
Créer une base de données et ajouter une source de données qui encapsule l’emplacement des fichiers de données Delta Lake :
CREATE DATABASE MyDB
COLLATE Latin1_General_100_BIN2_UTF8;
GO;
USE MyDB;
GO
CREATE EXTERNAL DATA SOURCE DeltaLakeStore
WITH
(
LOCATION = 'https://mystore.dfs.core.windows.net/files/delta/'
);
GO
SELECT TOP 10 *
FROM OPENROWSET(
BULK 'mytable',
DATA_SOURCE = 'DeltaLakeStore',
FORMAT = 'DELTA'
) as deltadata;
Lors de l’utilisation de données Delta Lake, stockées au format Parquet, il est généralement préférable de créer une base de données avec un classement basé sur UTF-8 afin de garantir la compatibilité des chaînes.

Interrogation de tables de catalogue

Le pool SQL serverless dans Azure Synapse Analytics a un accès partagé aux bases de données dans le metastore Spark.
Vous pouvez donc interroger des tables de catalogue créées à l’aide de Spark SQL.
Dans l’exemple suivant, une requête SQL dans un pool SQL serverless interroge une table de catalogue qui contient des données Delta Lake :
-- By default, Spark catalog tables are created in a database named "default"
-- If you created another database using Spark SQL, you can use it here
USE default;
SELECT * FROM MyDeltaTable;


Want to print your doc?
This is not the way.
Try clicking the ⋯ next to your doc name or using a keyboard shortcut (
CtrlP
) instead.