azure
Support formation Microsoft Azure
azure
Support formation Microsoft Azure
Labs 203 | Azure Date Engineer

Lab 05 | Analyser les données d’un lac de données avec Spark

Gonzague Ducos
Apache Spark est un moteur open source pour le traitement distribué des données, et est largement utilisé pour explorer, traiter et analyser d’énormes volumes de données dans le stockage de lacs de données. Spark est disponible en tant qu’option de traitement dans de nombreux produits de plateforme de données, notamment Azure HDInsight, Azure Databricks et Azure Synapse Analytics sur la plateforme cloud Microsoft Azure. L’un des avantages de Spark est la prise en charge d’un large éventail de langages de programmation, notamment Java, Scala, Python et SQL ; faisant de Spark une solution très flexible pour les charges de travail de traitement des données, y compris le nettoyage et la manipulation des données, l’analyse statistique et l’apprentissage automatique, ainsi que l’analyse et la visualisation des données.
Cet atelier prendra environ 45 minutes.

Avant de commencer

Vous aurez besoin d’un dans lequel vous disposez d’un accès de niveau administratif.

Approvisionner un espace de travail Azure Synapse Analytics

Vous aurez besoin d’un espace de travail Azure Synapse Analytics avec accès au stockage du lac de données et d’un pool Apache Spark que vous pouvez utiliser pour interroger et traiter des fichiers dans le lac de données.
Dans cet exercice, vous allez utiliser une combinaison d’un script PowerShell et d’un modèle ARM pour provisionner un espace de travail Azure Synapse Analytics.
Connectez-vous au à l’adresse https://portal.azure.com.
Utilisez le bouton [>_] à droite de la barre de recherche en haut de la page pour créer un Cloud Shell dans le portail Azure, en sélectionnant un environnement PowerShell et en créant un stockage si vous y êtes invité. Cloud Shell fournit une interface de ligne de commande dans un volet situé au bas du portail Azure, comme illustré ici : ​
image.png
Remarque : Si vous avez déjà créé un Cloud Shell qui utilise un environnement Bash, utilisez le menu déroulant en haut à gauche du volet Cloud Shell pour le remplacer par PowerShell.
Notez que vous pouvez redimensionner le shell cloud en faisant glisser la barre de séparation en haut du volet ou en utilisant les icônes , et X en haut à droite du volet pour réduire, agrandir et fermer le volet. Pour plus d’informations sur l’utilisation d’Azure Cloud Shell, consultez la .
Dans le volet PowerShell, entrez les commandes suivantes pour cloner ce référentiel :
rm -r dp-203 -f
git clone https://github.com/MicrosoftLearning/DP-500-Azure-Data-Analyst dp-203
Une fois le référentiel cloné, entrez les commandes suivantes pour accéder au dossier de cet atelier et exécuter le script setup.ps1 qu’il contient :
cd dp-203/Allfiles/05
./setup.ps1
Si vous y êtes invité, choisissez l’abonnement que vous souhaitez utiliser (cela ne se produira que si vous avez accès à plusieurs abonnements Azure).
Lorsque vous y êtes invité, entrez un mot de passe approprié à définir pour votre pool SQL Azure Synapse. ​Remarque : N’oubliez pas ce mot de passe !
Attendez que le script soit terminé - cela prend généralement environ 10 minutes, mais dans certains cas, cela peut prendre plus de temps. Pendant que vous attendez, consultez l’article dans la documentation Azure Synapse Analytics.

Interroger des données dans des fichiers

Le script provisionne un espace de travail Azure Synapse Analytics et un compte de stockage Azure pour héberger le lac de données, puis charge certains fichiers de données dans le lac de données.

Afficher les fichiers dans le lac de données

Une fois le script terminé, dans le portail Azure, accédez au groupe de ressources dp203-xxxxxxx qu’il a créé, puis sélectionnez votre espace de travail Synapse.
Dans la page Vue d’ensemble de votre espace de travail Synapse, dans la carte Ouvrir Synapse Studio, sélectionnez Ouvrir pour ouvrir Synapse Studio dans un nouvel onglet du navigateur. Connectez-vous si vous y êtes invité.
Sur le côté gauche de Synapse Studio, utilisez l’icône ›› pour développer le menu, ce qui révèle les différentes pages de Synapse Studio que vous utiliserez pour gérer les ressources et effectuer des tâches d’analyse de données.
Sur la page Gérer, sélectionnez l’onglet Pools Apache Spark et notez qu’un pool Spark portant un nom similaire à sparkxxxxxxx a été provisionné dans l’espace de travail. Par la suite, vous utiliserez ce pool Spark pour charger et analyser des données à partir de fichiers dans le stockage du lac de données de l’espace de travail.
Dans la page Données, affichez l’onglet Lié et vérifiez que votre espace de travail inclut un lien vers votre compte de stockage Azure Data Lake Storage Gen2, qui doit avoir un nom similaire à synapsexxxxxxx (Principal - datalakexxxxxxx).
Développez votre compte de stockage et vérifiez qu’il contient un conteneur de système de fichiers nommé files.
Sélectionnez le conteneur files et notez qu’il contient les dossiers nommés sales et synapse. Le dossier synapse est utilisé par Azure Synapse, et le dossier sales contient les fichiers de données que vous allez interroger.
Ouvrez le dossier sales et le dossier des commandes qu’il contient, et observez que le dossier des commandes contient des fichiers .csv pour trois ans de données de vente.
Cliquez avec le bouton droit sur l’un des fichiers et sélectionnez Aperçu pour afficher les données qu’il contient. Notez que les fichiers ne contiennent pas de ligne d’en-tête, vous pouvez donc désélectionner l’option d’affichage des en-têtes de colonne.

Utiliser Spark pour explorer les données

Sélectionnez l’un des fichiers dans le dossier orders, puis dans la liste Nouveau bloc-notes de la barre d’outils, sélectionnez Charger dans DataFrame. Une trame de données est une structure dans Spark qui représente un jeu de données tabulaires.
Dans le nouvel onglet Notebook 1 qui s’ouvre, dans la liste Attacher à, sélectionnez votre pool Spark (sparkxxxxxxx). Utilisez ensuite le bouton ▷ Exécuter tout pour exécuter toutes les cellules du carnet (il n’y en a actuellement qu’une seule !).
Étant donné que c’est la première fois que vous exécutez du code Spark dans cette session, le pool Spark doit être démarré. Cela signifie que la première exécution de la session peut prendre quelques minutes. Les exécutions suivantes seront plus rapides.
Pendant que vous attendez l’initialisation de la session Spark, passez en revue le code qui a été généré ; qui ressemble à ceci :
%%pyspark
df = spark.read.load('abfss://files@datalakexxxxxxx.dfs.core.windows.net/sales/orders/2019.csv', format='csv'
## If header exists uncomment line below
##, header=True
)
display(df.limit(10))
Une fois l’exécution du code terminée, examinez la sortie sous la cellule dans le bloc-notes. Il affiche les dix premières lignes du fichier que vous avez sélectionné, avec des noms de colonnes automatiques sous la forme _c0, _c1, _c2, etc.
Modifiez le code pour que la fonction spark.read.load lise les données de tous les fichiers CSV du dossier et que la fonction display affiche les 100 premières lignes. Votre code doit ressembler à ceci (datalakexxxxxxx correspondant au nom de votre magasin de lac de données) :
%%pyspark
df = spark.read.load('abfss://files@datalakexxxxxxx.dfs.core.windows.net/sales/orders/*.csv', format='csv'
)
display(df.limit(100))
Utilisez le bouton à gauche de la cellule de code pour exécuter uniquement cette cellule et examiner les résultats.
La trame de données inclut désormais les données de tous les fichiers, mais les noms de colonne ne sont pas utiles. Spark utilise une approche « schéma à la lecture » pour essayer de déterminer les types de données appropriés pour les colonnes en fonction des données qu’elles contiennent, et si une ligne d’en-tête est présente dans un fichier texte, elle peut être utilisée pour identifier les noms de colonnes (en spécifiant un paramètre header=True dans la fonction load). Vous pouvez également définir un schéma explicite pour la trame de données.
Modifiez le code comme suit (en remplaçant datalakexxxxxxx) afin de définir un schéma explicite pour le dataframe qui inclut les noms de colonnes et les types de données. Réexécutez le code dans la cellule.
%%pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import *

orderSchema = StructType([
StructField("SalesOrderNumber", StringType()),
StructField("SalesOrderLineNumber", IntegerType()),
StructField("OrderDate", DateType()),
StructField("CustomerName", StringType()),
StructField("Email", StringType()),
StructField("Item", StringType()),
StructField("Quantity", IntegerType()),
StructField("UnitPrice", FloatType()),
StructField("Tax", FloatType())
])

df = spark.read.load('abfss://files@datalakexxxxxxx.dfs.core.windows.net/sales/orders/*.csv', format='csv', schema=orderSchema)
display(df.limit(100))
Sous les résultats, utilisez le bouton + Code pour ajouter une nouvelle cellule de code au bloc-notes. Ensuite, dans la nouvelle cellule, ajoutez le code suivant pour afficher le schéma de la trame de données :
df.printSchema()
Exécutez la nouvelle cellule et vérifiez que le schéma de trame de données correspond au orderSchema que vous avez défini. La fonction printSchema peut être utile lors de l’utilisation d’une trame de données avec un schéma déduit automatiquement.

Analyser des données dans un dataframe

L’objet dataframe dans Spark est similaire à un dataframe Pandas dans Python et comprend un large éventail de fonctions que vous pouvez utiliser pour manipuler, filtrer, regrouper et analyser les données qu’il contient.

Filtrer une trame de données

Ajoutez une nouvelle cellule de code au bloc-notes et entrez-y le code suivant :
customers = df['CustomerName', 'Email']
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())
Exécutez la nouvelle cellule de code et examinez les résultats. Respectez les détails suivants :
Lorsque vous effectuez une opération sur une trame de données, le résultat est une nouvelle trame de données (dans ce cas, une nouvelle trame de données customers est créée en sélectionnant un sous-ensemble spécifique de colonnes à partir de la trame de données df).
Les trames de données fournissent des fonctions telles que count et distinct qui peuvent être utilisées pour résumer et filtrer les données qu’elles contiennent.
La syntaxe dataframe['Field1', 'Field2', ...] est une façon abrégée de définir un sous-ensemble de colonne. Vous pouvez également utiliser la méthode select, de sorte que la première ligne du code ci-dessus puisse être écrite comme suit : customers = df.select("CustomerName", "Email")
Modifiez le code comme suit :
customers = df.select("CustomerName", "Email").where(df['Item']=='Road-250 Red, 52')
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())
Exécutez le code modifié pour afficher les clients qui ont acheté le produit Road-250 Red, 52. Notez que vous pouvez « chaîner » plusieurs fonctions de sorte que la sortie d’une fonction devienne l’entrée de la suivante - dans ce cas, la trame de données créée par la méthode select est la trame de données source de la méthode where utilisée pour appliquer des critères de filtrage.

Agréger et regrouper des données dans une trame de données

Ajoutez une nouvelle cellule de code au bloc-notes et entrez-y le code suivant :
productSales = df.select("Item", "Quantity").groupBy("Item").sum()
display(productSales)
Exécutez la cellule de code que vous avez ajoutée et notez que les résultats affichent la somme des quantités commandées regroupées par produit. La méthode groupBy regroupe les lignes par Item, et la fonction d’agrégation sum suivante est appliquée à toutes les colonnes numériques restantes (dans ce cas, Quantity).
Ajoutez une autre nouvelle cellule de code au bloc-notes et entrez-y le code suivant :
yearlySales = df.select(year("OrderDate").alias("Year")).groupBy("Year").count().orderBy("Year")
display(yearlySales)
Exécutez la cellule de code que vous avez ajoutée et notez que les résultats indiquent le nombre de commandes client par an. Notez que la méthode select inclut une fonction SQL year pour extraire le composant year du champ OrderDate, puis une méthode alias est utilisée pour attribuer un nom de columm à la valeur d’année extraite. Les données sont ensuite regroupées par la colonne dérivée Année et le nombre de lignes dans chaque groupe est calculé avant que la méthode orderBy ne soit utilisée pour trier la trame de données résultante.

Interroger des données à l’aide de Spark SQL

Comme vous l’avez vu, les méthodes natives de l’objet dataframe vous permettent d’interroger et d’analyser les données de manière assez efficace. Cependant, de nombreux analystes de données sont plus à l’aise avec la syntaxe SQL. Spark SQL est une API de langage SQL dans Spark que vous pouvez utiliser pour exécuter des instructions SQL, ou même conserver des données dans des tables relationnelles.

Utiliser Spark SQL dans le code PySpark

Le langage par défaut dans les notebooks Azure Synapse Studio est PySpark, qui est un environnement d’exécution Python basé sur Spark. Dans ce runtime, vous pouvez utiliser la bibliothèque spark.sql pour incorporer la syntaxe SQL Spark dans votre code Python et utiliser des constructions SQL telles que des tables et des vues.
Ajoutez une nouvelle cellule de code au bloc-notes et entrez-y le code suivant :
df.createOrReplaceTempView("salesorders")
spark_df = spark.sql("SELECT * FROM salesorders")
display(spark_df)
Exécutez la cellule et examinez les résultats. Observez que :
Le code conserve les données dans la trame de données df sous la forme d’une vue temporaire nommée salesorders. Spark SQL prend en charge l’utilisation de vues temporaires ou de tables persistantes comme sources pour les requêtes SQL.
La méthode spark.sql est ensuite utilisée pour exécuter une requête SQL sur la vue salesorders.
Les résultats de la requête sont stockés dans une trame de données.

Exécuter du code SQL dans une cellule

Bien qu’il soit utile de pouvoir intégrer des instructions SQL dans une cellule contenant du code PySpark, les analystes de données souhaitent souvent travailler directement en SQL.
Ajoutez une nouvelle cellule de code au bloc-notes et entrez-y le code suivant :
%%sql
SELECT YEAR(OrderDate) AS OrderYear,
SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue
FROM salesorders
GROUP BY YEAR(OrderDate)
ORDER BY OrderYear;
Exécutez la cellule et examinez les résultats. Observez que :
Le %%sql au début de la cellule (appelée magie) indique que le runtime du langage SQL Spark doit être utilisé pour exécuter le code dans cette cellule au lieu de PySpark.
Le code SQL fait référence à la vue salesorder que vous avez créée précédemment à l’aide de PySpark.
La sortie de la requête SQL s’affiche automatiquement en tant que résultat sous la cellule.
Remarque : Pour plus d’informations sur Spark SQL et les dataframes, consultez la .

Visualiser les données avec Spark

Une image vaut proverbialement mille mots, et un graphique vaut souvent mieux qu’un millier de lignes de données. Bien que les notebooks d’Azure Synapse Analytics incluent une vue graphique intégrée pour les données affichées à partir d’une trame de données ou d’une requête SQL Spark, elle n’est pas conçue pour la création de graphiques complets. Cependant, vous pouvez utiliser des bibliothèques graphiques Python telles que matplotlib et seaborn pour créer des graphiques à partir de données dans des dataframes.

Afficher les résultats sous forme de graphique

Ajoutez une nouvelle cellule de code au bloc-notes et entrez-y le code suivant :
%%sql
SELECT * FROM salesorders
Exécutez le code et observez qu’il renvoie les données de la vue salesorder que vous avez créée précédemment.
Dans la section des résultats sous la cellule, remplacez l’option Afficher par Tableau.
Utilisez le bouton Afficher les options en haut à droite du graphique pour afficher le volet d’options du graphique. Définissez ensuite les options comme suit et sélectionnez Appliquer:
Type de graphique : Graphique à barres
Clé : Item
Valeurs : Quantité
Groupe de séries : laisser vide
Agrégation : Somme
Empilé : Non sélectionné
Vérifiez que le graphique ressemble à ceci : ​
notebook-chart.png

Démarrer avec matplotlib

Ajoutez une nouvelle cellule de code au bloc-notes et entrez-y le code suivant :
sqlQuery = "SELECT CAST(YEAR(OrderDate) AS CHAR(4)) AS OrderYear, \
SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue \
FROM salesorders \
GROUP BY CAST(YEAR(OrderDate) AS CHAR(4)) \
ORDER BY OrderYear"
df_spark = spark.sql(sqlQuery)
df_spark.show()
Exécutez le code et observez qu’il retourne une trame de données Spark contenant le chiffre d’affaires annuel.
Pour visualiser les données sous forme de graphique, nous allons commencer par utiliser la bibliothèque Python matplotlib. Cette bibliothèque est la bibliothèque de traçage de base sur laquelle beaucoup d’autres sont basées, et offre une grande flexibilité dans la création de graphiques.
Ajoutez une nouvelle cellule de code au bloc-notes et ajoutez-y le code suivant :
from matplotlib import pyplot as plt

# matplotlib requires a Pandas dataframe, not a Spark one
df_sales = df_spark.toPandas()

# Create a bar plot of revenue by year
plt.bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'])

# Display the plot
plt.show()
Exécutez la cellule et examinez les résultats, qui consistent en un histogramme avec le revenu brut total pour chaque année. Notez les caractéristiques suivantes du code utilisé pour produire ce graphique :
La bibliothèque matplotlib nécessite une trame de données Pandas, vous devez donc convertir la trame de données Spark renvoyée par la requête SQL Spark dans ce format.
Au cœur de la bibliothèque matplotlib se trouve l’objet pyplot. C’est la base de la plupart des fonctionnalités de traçage.
Les paramètres par défaut permettent d’obtenir un graphique utilisable, mais il est possible de le personnaliser.
Modifiez le code pour tracer le graphique comme suit :
# Clear the plot area
plt.clf()

# Create a bar plot of revenue by year
plt.bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange')

# Customize the chart
plt.title('Revenue by Year')
plt.xlabel('Year')
plt.ylabel('Revenue')
plt.grid(color='#95a5a6', linestyle='--', linewidth=2, axis='y', alpha=0.7)
plt.xticks(rotation=45)

# Show the figure
plt.show()
Réexécutez la cellule de code et affichez les résultats. Le graphique comprend maintenant un peu plus d’informations.
Une parcelle est techniquement contenue par une figure. Dans les exemples précédents, la figure a été créée implicitement pour vous ; Mais vous pouvez le créer explicitement.
Modifiez le code pour tracer le graphique comme suit :
# Clear the plot area
plt.clf()

# Create a Figure
fig = plt.figure(figsize=(8,3))

# Create a bar plot of revenue by year
plt.bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange')

# Customize the chart
plt.title('Revenue by Year')
plt.xlabel('Year')
plt.ylabel('Revenue')
plt.grid(color='#95a5a6', linestyle='--', linewidth=2, axis='y', alpha=0.7)
Want to print your doc?
This is not the way.
Try clicking the ⋯ next to your doc name or using a keyboard shortcut (
CtrlP
) instead.