from delta.tables import *
from pyspark.sql.functions import *
# Crée un Dataframe avec un fichier CSV
df = spark.read.load('abfss://files@datalakeXXXXXXX.dfs.core.windows.net/products/products.csv', format='csv', header=True)
# Variable du chemin vers les données
delta_table_path = "/delta/products-delta"
# Les données sont visibles dans le dossier au format parquet
df.write.format("delta").save(delta_table_path)
# Crée un objet deltaTable avec les fichiers parquet
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Modifie la table, un 2e fichier parquet est créé
deltaTable.update(
condition = "ProductID == 771",
set = { "ListPrice": "ListPrice * 0.5" })
# Affiche la table Delta modifiée
deltaTable.toDF().show(10)
# Charge dans un DataFrame le parquet avec la modification
new_df = spark.read.format("delta").load(delta_table_path)
new_df.show(10)
# Charge la version d'origine (0) des données
new_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
new_df.show(10)
# Charge les 10 dernières versions de la table
deltaTable.history(10).show(20, False, True)