azure
Support formation Microsoft Azure
azure
Support formation Microsoft Azure
Labs 203 | Azure Date Engineer

Lab 15 | Ingérer des données en temps réel avec Azure Stream Analytique et Azure Synapse Analytique

Gonzague Ducos
Les solutions d’analytique de données incluent souvent l’obligation d’ingérer et de traiter des flux de données. Le traitement des flux diffère du traitement par lots en ce que les flux sont généralement illimités - en d’autres termes, ce sont des sources continues de données qui doivent être traitées perpétuellement plutôt qu’à des intervalles fixes.
Azure Stream Analytique fournit un service cloud que vous pouvez utiliser pour définir une requête qui fonctionne sur un flux de données à partir d’une source de streaming, telle qu’Azure Event Hubs ou Azure IoT Hub. Vous pouvez utiliser une requête Azure Stream Analytique pour ingérer le flux de données directement dans un magasin de données à des fins d’analyse plus approfondie, ou pour filtrer, agréger et résumer les données en fonction de fenêtres temporelles.
Dans cet exercice, vous allez utiliser Azure Stream Analytique pour traiter un flux de données de commande client, telles que celles générées à partir d’une application de vente au détail en ligne. Les données de commande sont envoyées à Azure Event Hubs, à partir duquel vos travaux Azure Stream Analytique lisent les données et les ingèrent dans Azure Synapse Analytique.
Cet exercice devrait durer environ 45 minutes.

Avant de commencer

Vous aurez besoin d’un dans lequel vous disposez d’un accès de niveau administratif.

Approvisionner des ressources Azure

Dans cet exercice, vous aurez besoin d’un espace de travail Azure Synapse Analytique avec accès au stockage du lac de données et d’un pool SQL dédié. Vous aurez également besoin d’un espace de noms Azure Event Hubs auquel les données de commande de streaming peuvent être envoyées.
Vous allez utiliser une combinaison d’un script PowerShell et d’un modèle ARM pour provisionner ces ressources.
Connectez-vous au à l’adresse https://portal.azure.com.
Utilisez le bouton [>_] à droite de la barre de recherche en haut de la page pour créer un Cloud Shell dans le portail Azure, en sélectionnant un environnement PowerShell et en créant un stockage si vous y êtes invité. Cloud Shell fournit une interface de ligne de commande dans un volet situé au bas du portail Azure, comme illustré ici : ​
Remarque : Si vous avez déjà créé un Cloud Shell qui utilise un environnement Bash, utilisez le menu déroulant en haut à gauche du volet Cloud Shell pour le remplacer par PowerShell.
Notez que vous pouvez redimensionner la coque du nuage en faisant glisser la barre de séparation en haut du volet ou en utilisant les icônes , et X en haut à droite du volet pour réduire, agrandir et fermer le volet. Pour plus d’informations sur l’utilisation d’Azure Cloud Shell, consultez la .
Dans le volet PowerShell, entrez les commandes suivantes pour cloner le référentiel contenant cet exercice :
rm -r dp-203 -f
git clone https://github.com/MicrosoftLearning/dp-203-azure-data-engineer dp-203

Une fois le référentiel cloné, entrez les commandes suivantes pour accéder au dossier de cet exercice et exécutez le script setup.ps1 qu’il contient :
cd dp-203/Allfiles/labs/18
./setup.ps1

Si vous y êtes invité, choisissez l’abonnement que vous souhaitez utiliser (cela ne se produira que si vous avez accès à plusieurs abonnements Azure).
Lorsque vous y êtes invité, entrez un mot de passe approprié à définir pour votre pool SQL Azure Synapse.
Remarque : N’oubliez pas ce mot de passe !
Attendez que le script soit terminé - cela prend généralement environ 15 minutes, mais dans certains cas, cela peut prendre plus de temps. Pendant que vous attendez, consultez l’article dans la documentation Azure Stream Analytique.

Ingérer des données de streaming dans un pool SQL dédié

Commençons par ingérer un flux de données directement dans une table d’un pool SQL dédié Azure Synapse Analytique.

Afficher la table source et base de données de streaming

Une fois l’exécution du script d’installation terminée, réduisez le volet Cloud Shell (vous y reviendrez ultérieurement). Ensuite, dans le portail Azure, accédez au groupe de ressources dp203-xxxxxxx qu’il a créé et notez que ce groupe de ressources contient un espace de travail Azure Synapse, un compte de stockage pour votre lac de données, un pool SQL dédié et un espace de noms Event Hubs.
Sélectionnez votre espace de travail Synapse, puis dans sa page Vue d’ensemble, dans la carte Ouvrir Synapse Studio, sélectionnez Ouvrir pour ouvrir Synapse Studio dans un nouvel onglet du navigateur. Synapse Studio est une interface Web que vous pouvez utiliser pour travailler avec votre espace de travail Synapse Analytique.
Sur le côté gauche de Synapse Studio, utilisez l’icône ›› pour développer le menu, ce qui révèle les différentes pages de Synapse Studio que vous utiliserez pour gérer les ressources et effectuer des tâches d’analytique de données.
Sur la page Gérer, dans la section Pools SQL, sélectionnez la ligne du pool SQL dédié sql xxxxxxx, puis utilisez son icône pour la réactiver.
Pendant que vous attendez que le pool SQL démarre, revenez à l’onglet du navigateur contenant le portail Azure et rouvrez le volet Cloud Shell.
Dans le volet Cloud Shell, entrez la commande suivante pour exécuter une application cliente qui envoie 100 commandes simulées à Azure Event Hubs :
code
node ~/dp-203/Allfiles/labs/18/orderclient

Observez les données de la commande au fur et à mesure de leur envoi - chaque commande se compose d’un ID de produit et d’une quantité.
Une fois l’application client de commande terminée, réduisez le volet Cloud Shell et revenez à l’onglet du navigateur Synapse Studio.
Dans Synapse Studio, sur la page Gérer, assurez-vous que l’état En ligne de votre pool SQL dédié est Online, puis basculez vers la page de données et, dans le volet Espace de travail, développez Base de données SQL, votre pool SQL sql xxxxxxx et Tables pour afficher le dbo. Table FactOrder.
Dans le menu ... pour le dbo. Table FactOrder, sélectionnez Nouveau script SQL > Sélectionnez les 100 premières lignes et examinez les résultats. Notez que la table comprend des colonnes pour OrderDateTime, ProductID et Quantity, mais qu’il n’y a actuellement aucune ligne de données.

Créer un travail Azure Stream Analytique pour ingérer des données de commande

Revenez à l’onglet du navigateur contenant le portail Azure et notez la région dans laquelle votre groupe de ressources dp203-xxxxxxx a été approvisionné : vous allez créer votre travail Stream Analytique dans la même région.
Sur la page d’accueil, sélectionnez + Créer une ressource et recherchez Stream Analytics job. Créez ensuite un travail Stream Analytique avec les propriétés suivantes :
Notions de base:
Abonnement : votre abonnement Azure
Groupe de ressources : sélectionnez le groupe de ressources dp203-xxxxxxx existant.
Nom: ingest-orders
Région : sélectionnez la même région que celle dans laquelle votre espace de travail Synapse Analytique est provisionné.
Environnement d’hébergement : Cloud
Unités de streaming : 1
Stockage:
Ajouter un compte de stockage : sélectionné
Abonnement : votre abonnement Azure
Comptes de stockage : sélectionnez le compte de stockage datalake xxxxxxx
Mode d’authentification : Chaîne de connexion
Sécuriser les données privées dans le compte de stockage : sélectionné
Étiquettes:
Aucun
Attendez la fin du déploiement, puis accédez à la ressource de travail Stream Analytique déployée.

Créer une entrée pour le flux de données d’événement

Sur la page de présentation des ordres d’acquisition, sélectionnez la page Entrées. Utilisez le menu Ajouter une entrée pour ajouter une entrée Event Hub avec les propriétés suivantes :
Alias d’entrée: orders
Sélectionnez Event Hub dans vos abonnements : Sélectionné
Abonnement : votre abonnement Azure
Espace de noms Event Hub : sélectionnez l’espace de noms events xxxxxxx Event Hubs
Nom du hub d’événements : sélectionnez le hub d’événementseventhub xxxxxxx existant.
Groupe de consommateurs Event Hub : sélectionnez Utiliser l’existant, puis sélectionnez le groupe de consommateurs $Default
Mode d’authentification : Créer une identité managée affectée par le système
Clé de partition : laissez vide
Format de sérialisation des événements : JSON
Codage : UTF-8
Enregistrez l’entrée et patientez pendant sa création. Vous verrez plusieurs notifications. Attendez une notification de test de connexion réussi.

Créer une sortie pour la table SQL

Consultez la page Sorties du travail Stream Analytique des ordres d’ingestion. Utilisez ensuite le menu Ajouter une sortie pour ajouter une sortie Azure Synapse Analytique avec les propriétés suivantes :
Alias de sortie: FactOrder
Sélectionnez Azure Synapse Analytique dans vos abonnements : Sélectionné
Abonnement : votre abonnement Azure
Base de données : sélectionnez la base de données sqlxxxxxxx (synapsexxxxxxx)
Mode d’authentification : authentification SQL Server
Nom d’utilisateur : SQLUser
Mot de passe : mot de passe que vous avez spécifié pour votre pool SQL lors de l’exécution du script d’installation
Table: FactOrder
Enregistrez la sortie et patientez pendant sa création. Vous verrez plusieurs notifications. Attendez une notification de test de connexion réussi.

Créer une requête pour ingérer le flux d’événements

Consultez la page Requête pour le travail Stream Analytique d’ingestion. Attendez ensuite quelques instants jusqu’à ce que l’aperçu de l’entrée s’affiche (en fonction des événements de commande client précédemment capturés dans le hub d’événements).
Notez que les données d’entrée incluent les champs ProductID et Quantity dans les messages envoyés par l’application cliente, ainsi que des champs Event Hubs supplémentaires, y compris le champ EventProcessedUtcTime qui indique quand l’événement a été ajouté au hub d’événements.
Modifiez la requête par défaut comme suit :
code
SELECT
EventProcessedUtcTime AS OrderDateTime,
ProductID,
Quantity
INTO
[FactOrder]
FROM
[orders]

Notez que cette requête prend des champs de l’entrée (hub d’événements) et les écrit directement dans la sortie (table SQL).
Enregistrez la requête.

Exécuter la tâche de streaming pour ingérer les données de commande

Consultez la page Vue d’ensemble du travail Stream Analytique des ordres d’ingestion et, sous l’onglet Propriétés, passez en revue les entrées, la requête, les sorties et les fonctions du travail. Si le nombre d’entrées et de sorties est égal à 0, utilisez le bouton ↻ Actualiser sur la page Vue d’ensemble pour afficher l’entrée des commandes et la sortie de FactTable.
Sélectionnez le bouton ▷ Démarrer et lancez la tâche de streaming maintenant. Attendez d’être informé que la tâche de diffusion en continu a démarré avec succès.
Rouvrez le volet Cloud Shell et réexécutez la commande suivante pour soumettre 100 commandes supplémentaires.
node ~/dp-203/Allfiles/labs/18/orderclient

Pendant que l’application client de commande est en cours d’exécution, basculez vers l’onglet du navigateur Synapse Studio et affichez la requête que vous avez exécutée précédemment pour sélectionner les 100 premières lignes du dbo. Table FactOrder.
Utilisez le bouton ▷ Exécuter pour réexécuter la requête et vérifier que la table contient maintenant les données d’ordre du flux d’événements (si ce n’est pas le cas, attendez une minute et réexécutez la requête). Le travail Stream Analytique envoie toutes les nouvelles données d’événement dans la table tant que le travail est en cours d’exécution et que les événements d’ordre sont envoyés au hub d’événements.
Sur la page Gérer, suspendez le pool SQL dédié sql xxxxxxx (pour éviter les frais Azure inutiles).
Revenez à l’onglet du navigateur contenant le portail Azure et réduisez le volet Cloud Shell. Utilisez ensuite le 🗆 bouton Arrêter pour arrêter le travail Stream Analytique et attendez la notification indiquant que le travail Stream Analytique s’est arrêté avec succès.

Synthétiser les données de streaming dans un lac de données

Jusqu’à présent, vous avez vu comment utiliser une tâche Stream Analytique pour ingérer des messages provenant d’une source de diffusion en continu dans une table SQL. Voyons maintenant comment utiliser Azure Stream Analytique pour agréger des données sur des fenêtres temporelles, dans ce cas, pour calculer la quantité totale de chaque produit vendu toutes les 5 secondes. Nous allons également explorer comment utiliser un autre type de sortie pour le travail en écrivant les résultats au format CSV dans un magasin d’objets blob de lac de données.

Créer un travail Azure Stream Analytique pour agréger les données de commande

Dans le portail Azure, sur la page d’accueil, sélectionnez + Créer une ressource et recherchez Stream Analytics job. Créez ensuite un travail Stream Analytique avec les propriétés suivantes :
Notions de base:
Abonnement : votre abonnement Azure
Groupe de ressources : sélectionnez le groupe de ressources dp203-xxxxxxx existant.
Nom: aggregate-orders
Région : sélectionnez la même région que celle dans laquelle votre espace de travail Synapse Analytique est provisionné.
Environnement d’hébergement : Cloud
Unités de streaming : 1
Stockage:
Ajouter un compte de stockage : sélectionné
Abonnement : votre abonnement Azure
Comptes de stockage : sélectionnez le compte de stockage datalake xxxxxxx
Mode d’authentification : Chaîne de connexion
Sécuriser les données privées dans le compte de stockage : sélectionné
Étiquettes:
Aucun
Attendez la fin du déploiement, puis accédez à la ressource de travail Stream Analytique déployée.

Création d’une entrée pour les données brutes de commande

Sur la page de présentation des ordres agrégés, sélectionnez la page Entrées. Utilisez le menu Ajouter une entrée pour ajouter une entrée Event Hub avec les propriétés suivantes :
Alias d’entrée: orders
Sélectionnez Event Hub dans vos abonnements : Sélectionné
Abonnement : votre abonnement Azure
Espace de noms Event Hub : sélectionnez l’espace de noms events xxxxxxx Event Hubs
Nom du hub d’événements : sélectionnez le hub d’événementseventhub xxxxxxx existant.
Groupe de consommateurs Event Hub : sélectionnez le groupe de consommateurs $Default existant
Mode d’authentification : Créer une identité managée affectée par le système
Clé de partition : laissez vide
Format de sérialisation des événements : JSON
Codage : UTF-8
Enregistrez l’entrée et patientez pendant sa création. Vous verrez plusieurs notifications. Attendez une notification de test de connexion réussi.

Créer une sortie pour le magasin de lac de données

Consultez la page Sorties pour le travail Stream Analytique des ordres agrégés. Utilisez ensuite le menu Ajouter une sortie pour ajouter une sortie de stockage Blob/ADLS Gen2 avec les propriétés suivantes :
Alias de sortie: datalake
Sélectionnez Sélectionner Stockage Blob/ADLS Gen2 à partir de vos abonnements à partir de vos abonnements : Sélectionné
Abonnement : votre abonnement Azure
Compte de stockage : sélectionnez le compte de stockage datalake xxxxxxx
Conteneur : sélectionnez Utiliser existant, puis dans la liste, sélectionnez le conteneur de fichiers
Mode d’authentification : Chaîne de connexion
Format de sérialisation des événements : CSV - Virgule (,)
Codage : UTF-8
Mode d’écriture : Ajouter, au fur et à mesure que les résultats arrivent
Modèle de chemin: {date}
Format de date : AAAA/MM/JJ
Format de l’heure : Sans objet
Nombre minimum de rangées : 20
Temps maximum : 0 Heures, 1 minutes, 0 secondes
Enregistrez la sortie et patientez pendant sa création. Vous verrez plusieurs notifications. Attendez une notification de test de connexion réussi.

Créer une requête pour agréger les données d’événement

Consultez la page Requête pour le travail Stream Analytique d’ordres agrégés.
Modifiez la requête par défaut comme suit :
SELECT
DateAdd(second,-5,System.TimeStamp) AS StartTime,
System.TimeStamp AS EndTime,
ProductID,
SUM(Quantity) AS Orders
INTO
[datalake]
FROM
[orders] TIMESTAMP BY EventProcessedUtcTime
GROUP BY ProductID, TumblingWindow(second, 5)
HAVING COUNT(*) > 1

Notez que cette requête utilise System.Timestamp (basé sur le champ EventProcessedUtcTime) pour définir le début et la fin de chaque fenêtre de basculement de 5 secondes (séquentielle non chevauchante) dans laquelle la quantité totale de chaque ID de produit est calculée.
Enregistrez la requête.

Exécuter la tâche de streaming pour agréger les données de commande

Affichez la page Vue d’ensemble du travail Stream Analytique d’ordres agrégés et, sous l’onglet Propriétés, passez en revue les entrées, la requête, les sorties et les fonctions du travail. Si le nombre d’entrées et de sorties est égal à 0, utilisez le bouton ↻ Actualiser sur la page Vue d’ensemble pour afficher l’entrée des commandes et la sortie du datalake.
Sélectionnez le bouton ▷ Démarrer et lancez la tâche de streaming maintenant. Attendez d’être informé que la tâche de diffusion en continu a démarré avec succès.
Rouvrez le volet Cloud Shell et réexécutez la commande suivante pour soumettre 100 commandes supplémentaires :
node ~/dp-203/Allfiles/labs/18/orderclient

Une fois l’application de commande terminée, réduisez le volet Cloud Shell. Basculez ensuite vers l’onglet du navigateur Synapse Studio et, dans la page Données, sous l’onglet Lié, développez Azure Data Lake Storage Gen2 > synapsexxxxxxx (principal - datalakexxxxxxx) et sélectionnez le conteneur de fichiers (principal).
Si le conteneur de fichiers est vide, attendez environ une minute, puis utilisez le ↻ Actualiser pour actualiser la vue. À terme, un dossier nommé pour l’année en cours devrait s’afficher. Celui-ci contient à son tour des dossiers pour le mois et le jour.
Sélectionnez le dossier de l’année et, dans le menu Nouveau script SQL, sélectionnez Sélectionner les 100 premières lignes. Définissez ensuite le type de fichier sur Format texte et appliquez les paramètres.
Dans le volet de requête qui s’ouvre, modifiez la requête pour ajouter un HEADER_ROW = TRUE comme indiqué ici :
SELECT
TOP 100 *
FROM
OPENROWSET(
BULK 'https://datalakexxxxxxx.dfs.core.windows.net/files/2023/**',
FORMAT = 'CSV',
PARSER_VERSION = '2.0',
HEADER_ROW = TRUE
) AS [result]

Utilisez le bouton ▷ Exécuter pour exécuter la requête SQL et afficher les résultats, qui indiquent la quantité de chaque produit commandé par périodes de cinq secondes.
Revenez à l’onglet du navigateur contenant le portail Azure et utilisez le 🗆 bouton Arrêter pour arrêter le travail Stream Analytique et attendre la notification indiquant que le travail Stream Analytique s’est arrêté avec succès.

Supprimer des ressources Azure

Si vous avez terminé d’explorer Azure Stream Analytique, vous devez supprimer les ressources que vous avez créées pour éviter des coûts Azure inutiles.
Fermez l’onglet du navigateur Azure Synapse Studio et revenez au portail Azure.
Sur le portail Azure, dans la page Accueil, sélectionnez Groupes de ressources.
Sélectionnez le groupe de ressources dp203-xxxxxxx contenant vos ressources Azure Synapse, Event Hubs et Stream Analytique (et non le groupe de ressources managé).
En haut de la page Vue d’ensemble de votre groupe de ressources, sélectionnez Supprimer le groupe de ressources.
Entrez le nom du groupe de ressources dp203-xxxxxxx pour confirmer que vous souhaitez le supprimer, puis sélectionnez Supprimer.
Au bout de quelques minutes, les ressources créées dans cet exercice seront supprimées.
Want to print your doc?
This is not the way.
Try clicking the ⋯ next to your doc name or using a keyboard shortcut (
CtrlP
) instead.