Identifiez les principales fonctionnalités d’Apache Spark. Configurez un pool Spark dans Azure Synapse Analytics. Exécutez du code pour charger, analyser et visualiser des données dans un notebook Spark. Introduction
Apache Spark est un framework de traitement parallèle open source pour le traitement et l’analytique à grande échelle des données.
Extrêmement populaire dans les scénarios de traitement de « Big Data »
Disponible dans plusieurs implémentations de plateforme, notamment :
Découvrir Apache Spark
Comment Spark fonctionne
Pools Spark dans Azure Synapse Analytics
Un cluster est implémenté en tant que pool Spark, qui fournit un runtime pour les opérations Spark. Création dans ou : nombre de nœud dans le pool, fixe ou mis à l’échelle automatiquement version du runtime Spark à utiliser dans le pool, pour Python, Java et autres. Azure Synapse Analytics sont serverless : ils démarrent à la demande et s’arrêtent lorsqu’ils sont inactifs.
Utiliser Spark dans Azure Synapse Analytics
Nombreux types d’application :
code dans des scripts Python ou Scala code Java compilé en tant qu’archive Java (JAR) 2 usages :
Travaux de traitement par lots ou en streaming pour ingérer, nettoyer et transformer des données, souvent exécutées dans le cadre d’un pipeline automatisé Sessions d’analytique interactive pour explorer, analyser et visualiser des données Bien qu’ils soient généralement utilisés de manière interactive, les notebooks peuvent être inclus dans des pipelines automatisés et s’exécuter en tant que script sans assistance.
Exécution de code Spark dans des notebooks
combiner du code avec des notes Markdown s’apparente à l’expérience des notebooks Jupyter Accès aux données à partir d’un pool Synapse Spark
Dans l’espace de travail : lac de données basé sur le compte de stockage principal pool SQL dédié ou serverless Une base de données Azure SQL ou SQL Server (avec le connecteur pour SQL Server) En tant que service lié : un lac de données basé sur le stockage une base de données analytique Azure Cosmos DB (et configurée avec Azure Synapse Link pour Cosmos DB) une base de données Azure Data Explorer Kusto un metastore Hive externe Utilisation la plus courante : utiliser des données dans un lac de données, où vous pouvez lire et écrire des fichiers dans plusieurs formats couramment utilisés, notamment du texte délimité, Parquet, Avro et autres
Analyser les données avec Spark
Avec Spark, plusieurs langages possibles PySpark par défaut, version optimisée Spark de Python et couramment utilisée par les scientifiques et analystes données en raison de sa forte prise en charge de la manipulation et de la visualisation des données. Scala : langage dérivé de Java qui peut être utilisé de manière interactive. SQL : variante du langage SQL couramment utilisé, inclus dans la bibliothèque Spark SQL pour travailler avec des structures de données relationnelles. Exploration de données avec des dataframes
Structure de données appelée Jeu de données distribué résilient (RDD, resilient distributed dataset). Mais la structure de données la plus couramment utilisée pour utiliser des données structurées dans Spark est le dataframe.
Le code en Python :
%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv', format='csv', header=True)
display(df.limit(10))
La ligne %%pyspark au début est appelée magic.
Le même code en Scala :
%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))
Chargement des données dans un dataframe
%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Spécification d’un schéma de dataframe
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Filtrage et regroupement des dataframes
Utiliser les méthodes de la classe Dataframe pour filtrer, trier, regrouper et manipuler les données qu’elle contient :
pricelist_df = df.select("ProductID", "ListPrice")
Autre syntaxe de sélection de colonne :
pricelist_df = df["ProductID", "ListPrice"]
Chainer les méthodes :
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Regroupement :
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Utilisation d’expressions SQL dans Spark
L’API Dataframe fait partie d’une bibliothèque Spark appelée Spark SQL, qui permet aux analystes de données d’utiliser des expressions SQL pour interroger et manipuler des données
Création d’objets de base de données dans le catalogue Spark
Le catalogue Spark est un metastore pour les objets de données relationnelles tels que les vues et les tables. Le runtime Spark peut utiliser le catalogue pour intégrer de façon fluide le code écrit dans n’importe quel langage pris en charge par Spark avec des expressions SQL qui peuvent être plus naturelles pour certains analystes de données ou développeurs.
Créer une vue temporaire est la méthode la plus simple pour rendre les données d’un dataframe disponibles pour pouvoir les interroger dans le catalogue Spark
df.createOrReplaceTempView("products")
spark.catalog.createTable : crée une table vide. Sont des structures de métadonnées qui stockent leurs données sous-jacentes dans l’emplacement de stockage associé au catalogue. La suppression d’une table supprime également ses données sous-jacentes.
saveAsTable : enregistrer un dataframe en tant que table.
spark.catalog.createExternalTable : crée une table externe.
Utilisation de l’API Spark SQL pour interroger des données
Interroge la table products en tant que dataframe.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Utilisation du code SQL
Dans un notebook, utiliser la commande magic %%sql pour exécuter le code SQL qui interroge les objets du catalogue :
%%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category