Exploiter le pool Spark d’Azure Synapse Analytics

Nous avons vu dans cet article d’introduction la dualité de Synapse Analytics entre SQL et Spark. Nous développons ici les aspects liés à Apache Spark, framework Open Source de calcul distribué, et recommandé pour le traitement de la Big Data (volume mais aussi vélocité et variété).

Apache Spark pool

L’exploration des données dans un notebook va requérir l’utilisation d’un pool Apache Spark pour exécuter les commandes, comme l’annonce le message d’alerte ci-dessous.

Please select a Spark pool to attach before running cell!

Nous naviguons dans le menu Manage pour créer un nouveau pool ou retrouver les pools existants.

Un pool correspond à un cluster (grappe) de plusieurs nodes (nœuds) pour lequel nous définissons le nombre de nœuds et leur “taille” (size) correspondant aux propriétés RAM, CPU des machines virtuelles.

La facturation de ce service dépend bien évidemment de ces deux paramètres, qui pourront être modifiés une fois le pool créé.

Je vous recommande de n’activer que l’autoscaling que si les charges de travail sont très variables, dans un sens comme dans l’autre. Débutez sur un petit nombre de nœuds que vous augmenterez au fur et à mesure, en vous assurant que le code écrit est bien en capacité de se distribuer sur les différents nœuds.

Dans le paramétrage additionnel, nous allons trouver la version principale du framework Spark et les différentes versions des langages associés.

Sur cette boîte de dialogues, nous retrouvons aussi la configuration de pause automatique, enclenchée par défaut et paramétrée pour un arrêt au bout d’une inactivité (aucun job en exécution) de 15 minutes.

Langages disponibles

Dans le notebook, nous pourrons utiliser plusieurs langages.

Il est même possible de changer de langage selon les cellules du notebook, à l’aide des commandes magiques comme %%sql, %%csharp, etc. mais cela ne doit pas être une pratique à pérenniser dans le cadre de travaux de production.

Au lancement de la première commande Spark, une nouvelle session débute et cela peut prendre quelques minutes.

La librairie mssparkutils

Un des premiers besoins va être de communiquer avec les services du workspace comme les bases SQL et les systèmes de fichiers de type data lake. Pour accéder à des services liés comme un data lake, nous allons nous appuyer sur une librairie pré-installée : mssparkutils pour Microsoft Spark Utilities. La documentation officielle est disponible sur ce lien. Cet outil est très proche de dbutils de Databricks.

Nous allons nous concentrer en particulier sur les commandes liées au file system (fs) : list, copy, move, rm, put…

Nous commençons par “monter” le data lake par défaut afin d’accéder aux fichiers. Nous utilisons pour cela le modèle de code ci-dessous.

mssparkutils.fs.mount( 
    "abfss://<containername>@<accountname>.dfs.core.windows.net", 
    "</mountname>", 
    {"linkedService":<linkedServiceName>} 
)

A noter que le nom attribué au point de montage doit débuter par un caractère /.

La commande mounts() permet de vérifier les points de montage existants et si besoin, unmount(<mountname>) s’utiliserait pour “démonter” ce point.

Attention, si nous utilisons la syntaxe seule du point de montage pour faire une lecture, nous obtenons une erreur de type PathNotFound. En effet, une première différence apparait ici pour les utilisateurs habitués au dbutils de Databricks.

Nous devons constituer un chemin débutant par le mot clé synfs, suivant de l’identifiant du job, qui se trouve être une variable d’environnement.

Prenez soin de variabiliser ces éléments car le job id changera à chaque démarrage d’une nouvelle session !

Les points de montage ne sont malheureusement pas pérennes ! A chaque nouvelle session, il sera nécessaire de relance la commande mount.

Le résultat de la commande fs est une liste et les différentes propriétés des fichiers peuvent être isolées par du code :

for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Les commandes Python de la librairie os peuvent également utiliser ce point de montage. Attention, la syntaxe du path change légèrement avec la disparition du symbole : et l’ajout d’un / avant synfs.

Réalisons maintenant quelques tâches courantes.

Lister et compter tous les fichiers des sous-répertoires

Une simple recherche Google nous montre que la première problématique avec cette librairie mssparkutils est la recherche récursive dans les répertoires !

Heureusement, la recherche nous amène rapidement à découvrir cet excellent repository GitHub : Recursively listing Data Lake files with `display` implemented · GitHub (donnez-lui une étoile :)).

L’exécution imbriquée des deux fonctions deep_ls et convertfiles2df permet d’obtenir un dataframe Pandas avec la liste des fichiers, leur path et la taille en octets.

Nous affichons une synthèse à l’aide du print ci-dessous.

print(f"{df.shape[0]} files for a total size of {df['size'].sum()} bytes")
Afficher les premières lignes d’un fichier .csv ou .parquet

La commande mssparkutils.fs.head() répond à ce besoin.

Toutefois, il est difficile d’interpréter les sauts de ligne et de faire par exemple la différence entre les noms de colonne et les valeurs.

Nous allons nous faire aider ici par la génération automatique de syntaxe, en clic droit sur un nom de fichier ou de dossier.

Nous savons que la première ligne constitue les entêtes du fichier.

La commande df.printSchema() va nous montrer les types de colonnes. Venant d’un fichier .csv, il n’est pas étonnant de ne retrouver que des chaines de caractères (string).

En appliquant l’option inferSchema = True, nous pouvons demander au moteur de déterminer le type le plus probable de chaque colonne.

Il est important de rappeler que le format de fichier .parquet conserve les types de colonnes, nous privilégierons donc ce format pour le stockage de données dite “raffinées”.

Utiliser des packages supplémentaires

Les packages disponibles par défaut ne sont généralement pas suffisants pour traiter tous les aspects d’un projet de Data Engineering ou de Data Science. Nous allons donc charger de nouvelles librairies au moyen d’un fichier local requirements.txt.

Le fichier liste tout simplement les packages, avec, idéalement, leur numéro de version.

great-expectations==0.15.44
xgboost==1.7.3

Par défaut, l’installation de packages pour une session n’est pas activée.

Difficile de réaliser une vérification de l’installation autrement qu’en réalisant une commande d’import du package voulu.

Convertir le contenu d’une table SQL en un dataframe

Nous pouvons établir une connexion JDBC à toute base de données mais ici, nous allons de nouveau profiter, pour un scénario en lecture seule, générer le code Spark dans un notebook.

La méthode spark.read.synapsesql() est mise à profit pour créer un Spark Dataframe en mémoire. Attention, il s’agit d’une commande Scala.

Cette ressource de formation donne également un exemple d’écriture d’un dataframe vers la base de données.

Pour utiliser au mieux Spark et l’API pyspark, je vous recommande la certification Apache Spark developer associate, décrite dans cet article.

Voici un schéma récapitulatif des commandes disponibles avec la librairie MSSparkUtilities.

Découvrir toutes les fonctionnalités d’Azure Synapse Analytics

Azure Synapse Analytics (ASA) a remplacé depuis quelques temps Azure SQL Data Warehouse. Mais au delà d’une base SQL de type MPP (“massive parallel processing“), nous avons maintenant accès à des fonctionnalités offrant une vision “bout en bout” des projets data.

Une fois la ressource Azure provisionnée, nous pouvons lancer le Synapse Studio, dans une fenêtre de navigateur Web.

Le studio présente un menu latéral qui permettra de naviguer entre les différents outils composant Azure Synapse Analytics.

Dans ce premier article introductif, nous allons nous focaliser sur les trois menus Data, Integrate et Develop. Nous traiterons par la suite les aspects de Management et de Monitoring. Nous terminerons cette série sur les bonnes pratiques collaboratives : versioning et intégration / déploiement continus.

Data: le stockage

Si les bases de données (BDD) sont présentes dans notre quotidien depuis plusieurs décennies, le Data Lake (qui reste fondamentalement un système de fichiers…) reste plus récent mais est porté par la dynamique autour des formats de fichiers optimisés pour l’analytique (Parquet, Delta, Iceberg…) et autorisant les transactions (insert, update, delete), opérations qui restaient jusque-là la prérogative des BDD.

Ces deux approches vont se retrouver au sein des services internes à ASA ou en lien (“linked“).

Workspace

De manière “intégrée” dans l’espace de travail (workspace) Synapse, nous retrouvons les bases de données SQL MPP (ex SQL DWH).

Tous les objets classiques d’une BDD sont disponibles (tables, vues, procédures stockées…) et nous ferons un focus sur la notion de tables externes.

La ressource SQL dite “dédiée” apparaît également dans le groupe de ressources Azure. C’est une ressource dont la performance et l’espace de stockage disponible dépendront du niveau choisi (entre DW100c et DW30000c) et qui pourra être arrêtée ou démarrée. Sa facturation sera fonction de ces éléments.

Les tables externes : un fichier “vu” en SQL

Grâce aux tables externes, nous allons pouvoir interroger un système de fichier (Blob Storage, Data Lake) à l’aide d’instructions SQL. Le langage SQL étant si répandu dans le monde professionnel de la data, il s’agit là d’une fonctionnalité particulièrement intéressante.

Création d’une table externe (script automatiquement généré)

Un article dédié sera nécessaire pour décrire les cas d’usage des tables externes, différencier tables externes et vues, ainsi que noter les différences entre les deux pools SQL (dédié et serverless).

La nouveauté (2022) : le lake database

Nous avons vu qu’il est désormais facile d’interroger un fichier comme si l’on utilisait une table (plutôt une vue non matérialisée) d’une base de données. Une difficulté va tout de même se présenter lorsqu’il s’agira de bien appréhender le dictionnaire des données (tous les champs disponibles, associés à leur type respectif) ainsi que les relations entre les différents fichiers (au sens des cardinalités : un à plusieurs, plusieurs à plusieurs, etc.).

Le Lake Database vient proposer une solution à cette problématique, en proposant de mieux visualiser toutes les métadonnées, tout en conservant un stockage de type fichier. De manière sous-jacente, ce sont les ressources SQL Pool serverless ou Spark Pool qui seront utilisées.

Nous créons pour cela une base de type lake, en définissant le container de stockage des données (input folder) ainsi que le format à utiliser (.csv ou .parquet, Delta à venir).

Nous pouvons maintenant utiliser l’interface visuelle pour définir des tables:

  • personnalisées (custom), c’est-à-dire en renseignant chaque champ manuellement
  • depuis des modèles (template) proposés
  • depuis des fichiers du Data Lake

Visitons la galerie de modèles, ceux-ci couvrent de nombreux domaines fonctionnels.

Par domaine, nous disposons d’un grand nombre de schémas de tables, pour lesquelles les clés primaires et étrangères sont déjà déterminées. Nous obtenons donc déjà des relations dans le modèle en cours de création.

Les tables (vides pour l’instant) sont maintenant visibles dans le menu latéral de navigation.

Il sera alors nécessaire de remplir les tables, soit par des instructions SQL INSERT, soit au moyen de l’outil de mapping présenté dans la documentation officielle.

Un bon cas d’usage consiste à mapper les champs du Dataverse utilisé par les produits de la Power Platform. En effet, nous découvrirons ci-dessous la fonctionnalité Synapse Link qui permettra une synchronisation des données en quasi temps réel.

Linked

Les ressources liées sont par définition extérieures à la ressource Synapse Analytics. On utilisera ici principalement un ou plusieurs Data Lake Azure (de génération 2 uniquement) dont les différents file systems seront visibles.

Les notions bronze, silver et god correspondent à l’approche dite “medallion”.

La liste des services externes est présentée ci-dessous.

Il n’est étonnant de retrouver le stockage Blob car le Data Lake de génération 2 n’est autre qu’un Blob Storage augmenté par un “hierarchical namespace“. Nous retrouvons également deux APIs de la base Cosmos DB : l’API native pour le NoSQL et l’API pour MongoDB. Nous écartons donc les APIs PostgreSQL, Cassandra ou Gremlin.

Les integration datasets listeront toutes les sources ou destinations (sink) des pipelines de données qui seront abordés dans le prochain paragraphe. Ainsi, pour une source de type file system, nous disposerons des formats de fichiers suivants:

Integrate : les pipelines “no code”

L’intégration de données consiste à faire entrer dans un environnement de destination des données issues d’une ou plusieurs sources, et ce, à des intervalles de temps réguliers ou sur la détection d’événements (par l’exemple, l’arrivée de fichiers dans un data lake déclenche leur intégration en base de données).

Pipeline

Nous voici dans un univers bien connu des utilisateur.ices d’Azure Data Factory, même si la parité des fonctions n’est pas encore totalement établie (janvier 2023).

Un pipeline est composé d’une ou plusieurs activités, s’exécutant en parallèle ou bien s’enchaînant selon des règles définies.

L’activité de base est la copie de données (copy data) et nous retrouvons également trois activités permettant d’ordonnancer d’autres éléments propres à ASA : les jobs Spark et les procédures stockées SQL.

Synapse Link Connection

Azure Synapse Link est un outil permettant un transfert rapide de données vers une base SQL dédiée (SQL dedicated pool) depuis des sources comme :

  • Azure SQL DB
  • SQL Server 2022 (on premises)
  • Azure Cosmos DB (APIs NoSQL, Cosmos DB & Gremlin)
  • Dataverse

L’objectif est ici de réaliser, “near real time“, des travaux analytiques depuis les données enregistrées dans des bases dont l’usage est plutôt transactionnel. Il n’est plus nécessaire ici de déployer des pipelines ou jobs d’intégration.

Copy Data Tool

Il s’agit sans doute de la manière la plus simple pour copier des données d’une source à une destination (sink).

Ici, il n’est pas envisageable d’ajouter des transformations plus poussées lors de la copie. Nous retrouvons toutefois les propriétés de déclenchement :

  • unique
  • planifié
  • sur des fenêtres glissantes (tumbling windows)

Develop : l’approche “full code”

Les différents scripts compatibles avec les éléments de ASA sont présentés ci-dessous.

Les data flows pourraient sembler être un intrus dans cette liste car c’est bien une interface de type “no code” qui va permettre de les développer. La justification tient sans doute dans le fait que les data flows sont convertis en langage Spark lors de leur exécution.

De même, les jobs Apache Spark fonctionnent à partir de fichiers .JAR qui ne seront pas développés directement dans le studio ASA.

Les scripts SQL travaillent sur les bases de données de type SQL Pools (serverless ou dedicated).

Les scripts KQL (KustoQL) travaillent uniquement sur les bases de données Azure Data Explorer.

Les notebooks s’exécutent grâce à un Pool Spark et peuvent interagir avec les données, qu’elles soient sous forme de fichiers ou intégrées dans les bases de données de type SQL (et non KQL), à l’aide d’un connecteur JDBC.

Nous devons tout d’abord instancier un pool Apache Spark, tout comme nous avons dû créer un pool SQL dédié. Seul le pool SQL serverless est présent par défaut dans le workspace ASA.

Le pool est une grappe (cluster) de plusieurs machines virtuelles (nodes), puisque Spark est un framework distribué.

La version de Spark est modifiable, elle conditionne les versions des langages sous-jacents (Python, Scala, Java, .NET, R…)

La propriété “automatic pausing” est intéressante afin de réduire les coûts d’utilisation mais nous verrons, dans un prochain article, qu’elle implique une gestion fine des sessions Spark.

Premier exemple “de bout en bout”

Prenons l’exemple d’un fichier .csv “brut” qui a été uploadé dans le container Bronze du Data Lake.

Notre premier objectif sera de stoker le fichier dans un format optimisé comme Parquet dans le container Silver. Nous avons pour cela plusieurs possibilités.

Lire avec OPENROWSET

Un clic droit sur un fichier ou un répertoire propose les options suivantes.

Voici le code automatiquement généré par un “Select TOP 100 rows”.

Il est nécessaire d’ajouter l’option HEADER_ROW = TRUE afin de considérer la première ligne du fichier comme l’en-tête. Les autres bulk options sont données dans cette documentation officielle.

Pipeline de copie de CSV vers Parquet

Il s’agit ici de réaliser une copie conforme de la source mais nous allons utiliser pour cela l’outil “copy data tool” car l’objectif est de changer de format de fichier. Nous définissons donc une activité de copie dans un pipeline de type Azure Data Factory.

Le mapping permet de définir le type de données contenues dans chaque champ. C’est en effet une propriété des fichiers .parquet que de pouvoir stocker cette information.

Pipeline de copie du fichier Parquet dans une table

Posons l’objectif de matérialiser les données au sein de la BDD. Nous allons donc utiliser le pool SQL dédié. En effet, dans sa version serverless, nous ne faisons que créer des métadonnées facilitant l’interprétation des fichiers comme des vues.

Nous faisons de nouveau appel à une activité de copie dans un pipeline de type Azure Data Factory. Nous allons laisser le soin à l’activité de copie de créer directement le schéma de table attendu. Pour cela, nous choisissons le mode “Bulk insert” et l’option “Auto create table”.

Nous pouvons maintenant visualiser la table de destination dans le pool dédié et l’interroger au moyen du langage SQL.

En guise de conclusion, et avant de rentrer plus en détails dans certains aspects de cette ressource Azure, nous avons montré ici la polyvalence de l’outil pour les scénarios analytiques :

  • à partir de fichiers de données ou de tables contenues dans une base de données
  • avec la simplicité de la syntaxe SQL directement sur le contenu des fichiers
  • dans une démarche no code ou full code, selon les compétences des utilisateurs.

Réviser la certification Associate Developer for Apache Spark

Si vous parcourez régulièrement ce blog, vous avez dû tomber sur un article traitant de Azure Databricks. Au cœur de ce produit, tourne le moteur Open Source Apache Spark. Ce moteur a révolutionné le monde de la Big Data, par sa capacité à réaliser des calculs distribués, en mémoire, sur des clusters de machines (souvent virtuelles et bien souvent dans le cloud).

Que l’on soit Data Engineer ou Data Scientist, cet outil est aujourd’hui (2022) incontournable et une certification apporte une reconnaissance de votre expertise dans le domaine. La première certification proposée par Databricks permet d’obtenir le titre Certified Associate Developer for Apache Spark. Cette certification demande de répondre en 2h à 60 questions fermées, et d’obtenir un minimum de 70% de bonnes réponses. Voici un exemple de questions posées à l’examen, fourni par l’éditeur.

Environ un quart des questions traitent des mécanismes techniques du moteur (je vous recommande cette vidéo de Advancing Analytics) et le reste sont des questions de code. Pour celles-ci, nous aurons à disposition la documentation officielle de Spark au travers de cette page web. Entrainez-vous à faire des recherches dans cette page, vous aurez besoin d’aller vite le jour de l’examen !

Attention, la recherche ne semble pas possible dans l’interface de l’examen !

EDIT suite au passage de la certification avec Webassessor :

Au cours de l’examen, l’écran sera séparé en deux : la question à gauche, la documentation à droite. Mauvaise surprise (sous Edge uniquement ?) : impossible de faire une recherche dans la documentation, pas de raccourci ctrl+F non plus. Mon astuce est donc d’utiliser la zone de prise de notes, noter les noms de fonctions sur lesquelles vous avez un doute, puis parcourir la documentation en suivant l’ordre alphabétique.

Je vous encourage à tester également la Spark UI au travers de ce simulateur pour bien comprendre l’exécution concrète des différentes syntaxes de code et connaître les grandes lignes de cette interface. Ce sera également une bonne façon de hiérarchiser les différents niveaux d’exécution job / stage / task.

Deux éléments sont importants pour bien appréhender les questions de code. Tout d’abord, il faudra choisir la réponse “correcte” ou bien la seule “incorrecte” par une liste de cinq propositions. Prenez soin de bien déterminer à quel type de question vous êtes en train de répondre. Ensuite, les différentes modalités de réponse “joueront sur les mots” c’est-à-dire que des variations de code vous seront proposées mais 4 syntaxes sur 5 ne seront pas correctes (dans le cas d’une question de type “correct” !). Il est donc important de bien avoir en tête les confusions classiques de syntaxe.

Nous allons décrire ci-dessous plusieurs hésitations possibles entre des syntaxes PySpark (certaines syntaxes n’existant d’ailleurs qu’un Python, souvent en lien avec les DataFrames Pandas).

Bonne chance à vous pour l’examen !

show() versus display()

La fonction show() permet d’afficher un résultat (c’est donc une action au sens Spark).

df.show(truncate=False)

La fonction display() n’existe que dans Databrick, version commerciale construite autour de Spark.

withColumn() versus withColumnRenamed()

withColumn() permet d’ajouter une nouvelle colonne dans un Spark DataFrame. Notons ici l’emploi de la fonction col() pour appeler les colonnes du DataFrame dans la formule de la nouvelle colonne. Pour une constante, nous utiliserons la fonction lit().

 df.WithColumn("nouveau nom", col("ancien nom") )

Les fonctions lit() et col() auront été préalablement importées.

from pyspark.sql.functions import col, lit

withColumnRenamed() permet de renommer une colonne. C’est donc la syntaxe la plus adaptée par rapport au workaround proposé ci-dessous.

df.WithColumnRenamed("ancien nom", "nouveau nom")

Le piège est donc ici dans l’ordre des paramètres et en particulier dans la position du nom de la colonne créée.

printSchema() versus describe()

Après la lecture d’une source de données, nous souhaitons vérifier le schéma du DataFrame. La fonction printSchema() est la bonne fonction pour réaliser cet objectif.

describe() versus summary()

Les fonctions describe() et summary() permettent d’obtenir les statistiques descriptives sur un DataFrame. La fonction summary() ajoute les 1e, 2e et 3e quartiles.

La différence se situe sur le rôle du paramètre dans chacune des deux fonctions.

Variantes de spark.read

Nous allons voir ici deux écritures possibles pour un même résultat.

Syntaxe 1 :

df = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.option("header", first_row_is_header) \
.option("sep", delimiter) \
.load(file_location)

Syntaxe 2 :

df = spark.read \
.option("inferSchema", infer_schema) \
.option("header", first_row_is_header) \
.option("sep", delimiter) \
.csv(file_location)

Syntaxe 3 :

df = spark.read \
.load(file_location, format="csv", inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)

En compléments, allez voir les options de sauvegarde d’un Spark DataFrame.

collect() versus select()

Il faut ici bien maîtriser les concepts de transformations et d’actions, propres au moteur Spark.

La fonction collect() est une action qui retourne tous les éléments du Spark DataFrame sous forme de tableau (array) au nœud Driver.

La fonction select() est une transformation qui projette, c’est-à-dire conserve une colonne ou liste de colonnes. Comme il s’agit d’une transformation, il est nécessaire d’y ajouter une action (par exemple show) pour forcer l’évaluation du résultat.

La liste des colonnes donnée en paramètre de select() peut s’écrire de deux façons :

df.select("Pregnancies", "Outcome").show()
df.select(["Pregnancies", "Outcome"]).show()

La première version est plus “légère” mais la seconde s’adapte mieux à un passage de variable de type liste déjà définie.

Attention, la fonction drop() n’autorise pas l’usage d’un objet de type liste.

distinct() versus unique()

La fonction distinct() renvoie les lignes distinctes, sur la base des colonnes du Spark DataFrame ou de la sélection faite au préalable.

df.select('column1').distinct().collect()

La fonction unique() ne s’applique qu’à un Pandas DataFrame !

join() versus merge()

Il s’agit ici de fusionner deux DataFrames. Les variantes d’écriture se font au niveau de l’expression de la jointure.

df_inner = df1.join(df2, on=['id'], how='inner')
df_inner = df1.join(df2, df1.id == df2.id, how='inner’)

Le troisième paramètre “how” définissant le type de jointure est facultatif, la valeur par défaut étant alors “inner” (jointure interne).

La fonction merge() ne s’applique qu’à un Pandas DataFrame !

union() versus unionByName() versus concat()

La fonction union() ajoute les lignes d’un DataFrame, donné en paramètre, à un autre DataFrame. L’ajout se fait alors sur la base des indices des colonnes.

La fonction unionByName() s’utilise pour ajouter des DataFrames sur la base des noms de colonnes et non de leur position.

La fonction Spark concat() n’a rien à voir avec ce type d’opération puisqu’elle réalise une concaténation de colonnes (astuce utile pour l’optimisation de la mémoire !).

df.select(concat(df.s, df.d).alias('sd')).collect()

Il ne faut pas confondre avec la fonction concat() appliquée sur des Pandas DataFrames, qui réalise quant à elle une fusion de ces jeux de données.

agg() versus groupBy()

Utilisée seule, la fonction agg() agrège un Spark DataFrame sans colonne de regroupement.

df.agg({'Glucose' : 'max'}).collect()

Il s’agit toutefois d’un “sucre syntaxique” de l’expression ci-dessous.

df.groupBy().agg({'Glucose' : 'max'}).collect()

La fonction groupBy() réalise un groupe sur la colonne ou liste de colonne spécifiée puis évalue les agrégats proposés ensuite.

df.groupBy('Outcome').max('Glucose').show()
df.groupBy('Outcome').agg({'Glucose' : 'max', 'Insulin' : 'max'}).show()
df.groupBy(['Outcome', 'Pregnancies']).agg({'Glucose' : 'max', 'Insulin' : 'max'}).show()

cast() versus astype()

La fonction cast() permet de modifier le type d’une colonne. Elle accepte plusieurs variantes de syntaxe, dans l’expression d’un type attendu.

from pyspark.sql.types import IntegerType

df.withColumn("age",df.age.cast(IntegerType()))
df.withColumn("age",df.age.cast('int'))
df.withColumn("age",df.age.cast('integer'))

La fonction astype() est un alias de la fonction cast().

filter() versus where()

La fonction where() est un alias de la fonction filter(). Différentes syntaxes sont possibles pour exprimer la condition booléenne.

df.filter(df.Pregnancies <= 10).show()
df.filter("Pregnancies <= 10").show()
from operator import ge, le, gt, lt

df.filter(lt(df.Pregnancies, lit(10))).show()

contains() versus isin()

Voici deux exemples illustrant les utilisations de ces deux fonctions, elles-mêmes utilisées dans des logiques de filtre sur un DataFrame.

df.filter(df.Pregnancies.contains('0')).collect()
Attention, nous obtenons les valeurs numériques 0 et 10 !
df[df.Pregnancies.isin([1, 2, 3])].collect()

La différence revient donc à chercher un contenu dans une colonne (contains()) ou à vérifier que le contenu d’une colonne appartient à une liste (isin()).

A noter, la fonction array_contains() permet d’étendre la logique de la fonction contains() sur les valeurs d’un tableau, contenu dans une colonne d’un Spark DataFrame.

filtered_df=df.where(array_contains(col("SparkAPI"),"pyspark"))

size() versus count()

La fonction size() renvoie le nombre d’éléments d’un tableau (array) alors que la fonction count() renvoie le nombre de ligne d’un Spark DataFrame.

Null versus NaN

La valeur Null correspond à une donnée manquante, c’est-à-dire une donnée qui n’existe pas.

df.where(col("a").isNull())
df.where(col("a").isNotNull())

La valeur NaN, pour Not a Number, est le résultat d’une opération mathématique dont le résultat ne fait pas sens (par exemple, une division par 0).

from pyspark.sql.functions import isnan


df.where(isnan(col("a")))

na.drop() versus dropna()

Ces deux fonctions sont des alias l’une de l’autre.

Elles disposent d’un paramètre thres qui permet la suppression des lignes ayant un nombre de valeurs non-nulles inférieures à ce seuil.

dropDuplicates() versus drop_duplicates()

Ces deux fonctions sont des alias l’une de l’autre. Elles permettent de retirer les valeurs en doublon dans un DataFrame.

take() vs head() vs first() / tail() vs limit()

Visualiser les données d’un Spark DataFrame est fondamental ! Mais nous ne pouvons pas tout afficher, il faut alors réduire le nombre de lignes. Il existe pour cela de nombreuses fonctions et quelques subtilités entre toutes celles-ci.

Les fonctions take() et head() sont des alias, la seconde se donnant une syntaxe identique à celle utilisée pour les Pandas DataFrames. Elles prennent en argument le nombre de lignes (rows) à renvoyer sous forme d’une liste.

C’est le type d’objet renvoyé qui diffère dans la fonction limit(), celle-ci produisant un Spark DataFrame.

Les fonctions first() et tail() renvoient respectivement la première et la dernière ligne du DataFrame. Attention, la seconde attend un nombre de lignes en paramètre.

sort() versus orderBy()

La fonction orderBy() est un alias de la fonction sort().

Toutefois, il semble que celle-ci engendre la collecte de toutes les données sur un seul exécuteur, ce qui a l’avantage de fiabiliser le tri mais engendre un temps de calcul plus long, voire un risque de saturation mémoire.

Voici quelques exemples de syntaxe alternatives avec la fonction sort(). A noter que le sens de tri par défaut est ascendant.

df.sort(asc("value"))

df.sort(asc(col("value")))

df.sort(df.value)

df.sort(df.value.asc())

cache() versus persist()

La fonction cache() stocke un maximum de partitions possibles du DataFrame en mémoire et le reste sur disque. Ceci correspond au niveau de stockage dit MEMORY_AND_DISK, et ce comportement n’est pas modifiable.

A l’inverse, et même si son comportement par défaut est similaire, la fonction persist() permet d’utiliser les autres niveaux de stockage que sont MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, etc.

repartition() versus coalesce()

< A VENIR >

repartition(1) versus coalesce(1)

< A VENIR >

map() versus foreach()

< A VENIR >

from_timestamp versus unix_timestamp()

< A VENIR >

explode() versus split()

< A VENIR >