azure
Support formation Microsoft Azure
azure
Support formation Microsoft Azure
Azure Synapse Analytics

icon picker
Extraits de codes

Gonzague Ducos

Magic dans Notebook

%%pyspark

PySpark et PySpark SQL

La documentation PySpark se trouve aussi dans la documentation PySpark SQL.

Lire des fichiers

df = spark.read.load('abfss://files@datalakeXXXXXXX.dfs.core.windows.net/folder/folder/file.csv', format='csv'
## If header exists uncomment line below
, header=True
)
Autre code possible : *.csv. /**/*.csv
from pyspark.sql.types import *
from pyspark.sql.functions import *

orderSchema = StructType([
StructField("SalesOrderNumber", StringType()),
StructField("SalesOrderLineNumber", IntegerType()),
StructField("OrderDate", DateType()),
StructField("CustomerName", StringType()),
StructField("Email", StringType()),
StructField("Item", StringType()),
StructField("Quantity", IntegerType()),
StructField("UnitPrice", FloatType()),
StructField("Tax", FloatType())
])

df = spark.read.load('abfss://files@datalakexxxxxxx.dfs.core.windows.net/folder/folder/*.csv', format='csv', schema=orderSchema)
display(df.limit(100))
header=True n’est donc pas nécessaire.

Ecrire des fichiers

delta_table_path = "/delta/products-delta"
df.write.mode("overwrite").format("delta").save(delta_table_path)
Aide save :

Manipuler une frame

L’objet dataframe dans Spark est similaire à un dataframe Pandas dans Python.
# Affiche le schéma
df.printSchema()

# Version complète
customers = df.select("CustomerName", "Email")

# Version abrégée
customers = df['CustomerName', 'Email']

# Regroupe les colonnes groubBy et sum() toutes les autres colonnes (ici sum(Quantity)
productSales = df.select("Item", "Quantity").groupBy("Item").sum()

# Utilise la fonction Year
yearlySales = df.select(year("OrderDate").alias("Year")).groupBy("Year").count().orderBy("Year")

# Sélection avec where
customers = df.select("CustomerName", "Email").where(df['Item']=='Road-250 Red, 52')

from pyspark.sql.functions import split, col

# Lit des CSV d'un dossier
order_details = spark.read.csv('/data/*.csv', header=True, inferSchema=True)

# Création des champs FirstName et LastName
transformed_df = order_details.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

# Supprime le champ CustomerName
transformed_df = transformed_df.drop("CustomerName")

# Sauvegarde les modifications
transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')


#### Partitionnement

# Création colonne Year et Month
dated_df = transformed_df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))

# Partitionne sur Année et Mois (création de dossier Year=AAAA puis Month=MMMM
dated_df.write.partitionBy("Year","Month").mode("overwrite").parquet("/partitioned_data")

# Interroge une partition
orders_2020 = spark.read.parquet('/partitioned_data/Year=2020/Month=*')
display(orders_2020.limit(5))

Bibliothèque spark.sql

## Crée une vue temporaire
df.createOrReplaceTempView("salesorders")
## Interroge la vue et la stock dans un dataframe
spark_df = spark.sql("SELECT * FROM salesorders")

##Interrogation dans
%%sql
SELECT YEAR(OrderDate) AS OrderYear,
SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue
FROM salesorders
GROUP BY YEAR(OrderDate)
ORDER BY OrderYear;

Want to print your doc?
This is not the way.
Try clicking the ⋯ next to your doc name or using a keyboard shortcut (
CtrlP
) instead.