azure
Support formation Microsoft Azure
azure
Support formation Microsoft Azure
Composants

icon picker
Delta Lake

Gonzague Ducos
Delta Lake est un projet open source visant à créer une couche de stockage de données transactionnelles au-dessus d’un lac de données.
Ajoute la prise en charge de la sémantique relationnelle pour les opérations de données par lots et en continu,
Permet la création d’une architecture Lakehouse dans laquelle Apache Spark peut être utilisé pour traiter et interroger des données dans des tables basées sur des fichiers sous-jacents dans le lac de données.

Créer des tables Delta

from delta.tables import *
from pyspark.sql.functions import *

# Crée un Dataframe avec un fichier CSV
df = spark.read.load('abfss://files@datalakeXXXXXXX.dfs.core.windows.net/products/products.csv', format='csv', header=True)

# Variable du chemin vers les données
delta_table_path = "/delta/products-delta"

# Les données sont visibles dans le dossier au format parquet
df.write.format("delta").save(delta_table_path)

# Crée un objet deltaTable avec les fichiers parquet
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Modifie la table, un 2e fichier parquet est créé
deltaTable.update(
condition = "ProductID == 771",
set = { "ListPrice": "ListPrice * 0.5" })

# Affiche la table Delta modifiée
deltaTable.toDF().show(10)

# Charge dans un DataFrame le parquet avec la modification
new_df = spark.read.format("delta").load(delta_table_path)
new_df.show(10)

# Charge la version d'origine (0) des données
new_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
new_df.show(10)

# Charge les 10 dernières versions de la table
deltaTable.history(10).show(20, False, True)

Créer des tables de catalogue

Les tables delta chargent des données à partir du dossier contenant les fichiers parquet sur lesquels la table est basée.
Les tables de catalogue encapsulent les données et fournissent une entité de table nommée que vous pouvez référencer dans le code SQL.
Spark prend en charge deux types de tables de catalogue pour delta lake :
Tables externes définies par le chemin d’accès aux fichiers Parquet contenant les données de la table.
Tables gérées, qui sont définies dans le metastore Hive pour le pool Spark.

Table externe

# Crée une base de données
spark.sql("CREATE DATABASE AdventureWorks")
# Crée une table basée sur le fichier parquet
spark.sql("CREATE TABLE AdventureWorks.ProductsExternal USING DELTA LOCATION '{0}'".format(delta_table_path))
# Affiche une description de la table
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsExternal").show(truncate=False)
La propriété Location indique les dossiers de stockage des fichiers parquets présents : abfss://files@datalakeXXXXXXX.dfs.core.windows.net/delta/products-delta
Affiche le contenu de la table :
%%sql
USE AdventureWorks;
SELECT * FROM ProductsExternal;
image.png
La suppression de la table (DROP TABLE) conserve les fichiers sources.

Table gérée

La table est stockée dans un metastore situé : files > synapse > workspaces > synapseXXXXXXX > wharehouse > basededonnees.db > nomtable > *.parquet.
# Crée une table gérée à partir d'une Dataframe
df.write.format("delta").saveAsTable("AdventureWorks.ProductsManaged")
order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')
# Affiche la description de la table
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsManaged").show(truncate=False)
image.png
Affiche la table au format SQL :
%%sql
USE AdventureWorks;
SELECT * FROM ProductsManaged;
La suppression de la table (DROP TABLE) supprime les fichiers sources du metastore.

Autres (Lab 7)

Les fichiers de données sont également abstraits par une table dans le metastore, vous pouvez interroger les données directement à l'aide de SQL
# Création d'une table virtuelle liée
order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')

# Interrogation des fichiers via du SQL
sql_transform = spark.sql("SELECT *, YEAR(OrderDate) AS Year, MONTH(OrderDate) AS Month FROM sales_orders")

# On peut aussi partitionner la table
sql_transform.write.partitionBy("Year","Month").saveAsTable('transformed_orders', format='parquet', mode='overwrite', path='/transformed_orders_table')

SELECT * FROM transformed_orders WHERE Year = 2021 AND Month = 1
Avec l’argument Path, ne crée pas une base de données.
Want to print your doc?
This is not the way.
Try clicking the ⋯ next to your doc name or using a keyboard shortcut (
CtrlP
) instead.