from pyspark.sql.functions import split, col
# Load every CSV in the folder into one DataFrame, using the first row as
# column names and letting Spark infer the column types.
raw_orders = spark.read.csv('/data/*.csv', header=True, inferSchema=True)

# Split CustomerName on the space into separate FirstName / LastName columns,
# then drop the original combined column.
name_parts = split(col("CustomerName"), " ")
transformed_df = (
    raw_orders
    .withColumn("FirstName", name_parts.getItem(0))
    .withColumn("LastName", name_parts.getItem(1))
    .drop("CustomerName")
)

# Persist the reshaped data as Parquet, replacing any previous output.
transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')
#### Partitioning
# BUG FIX: year() and month() were called below without ever being imported
# (the top of the file imports only split and col), which raises NameError at
# runtime. Import them here so this section is self-contained.
from pyspark.sql.functions import year, month

# Derive Year and Month columns from OrderDate to partition on.
dated_df = transformed_df.withColumn("Year", year(col("OrderDate"))) \
                         .withColumn("Month", month(col("OrderDate")))

# Write partitioned by Year then Month — creates a Year=YYYY/Month=M folder
# hierarchy under /partitioned_data, replacing any previous output.
dated_df.write.partitionBy("Year", "Month").mode("overwrite").parquet("/partitioned_data")

# Read back only the 2020 partitions (all months); the partition-path filter
# lets Spark prune everything else.
orders_2020 = spark.read.parquet('/partitioned_data/Year=2020/Month=*')
display(orders_2020.limit(5))