Exploiter le pool Spark d’Azure Synapse Analytics

Nous avons vu dans cet article d’introduction la dualité de Synapse Analytics entre SQL et Spark. Nous développons ici les aspects liés à Apache Spark, framework Open Source de calcul distribué, et recommandé pour le traitement de la Big Data (volume mais aussi vélocité et variété).

Apache Spark pool

L’exploration des données dans un notebook va requérir l’utilisation d’un pool Apache Spark pour exécuter les commandes, comme l’annonce le message d’alerte ci-dessous.

Please select a Spark pool to attach before running cell!

Nous naviguons dans le menu Manage pour créer un nouveau pool ou retrouver les pools existants.

Un pool correspond à un cluster (grappe) de plusieurs nodes (nœuds) pour lequel nous définissons le nombre de nœuds et leur “taille” (size) correspondant aux propriétés RAM, CPU des machines virtuelles.

La facturation de ce service dépend bien évidemment de ces deux paramètres, qui pourront être modifiés une fois le pool créé.

Je vous recommande de n’activer que l’autoscaling que si les charges de travail sont très variables, dans un sens comme dans l’autre. Débutez sur un petit nombre de nœuds que vous augmenterez au fur et à mesure, en vous assurant que le code écrit est bien en capacité de se distribuer sur les différents nœuds.

Dans le paramétrage additionnel, nous allons trouver la version principale du framework Spark et les différentes versions des langages associés.

Sur cette boîte de dialogues, nous retrouvons aussi la configuration de pause automatique, enclenchée par défaut et paramétrée pour un arrêt au bout d’une inactivité (aucun job en exécution) de 15 minutes.

Langages disponibles

Dans le notebook, nous pourrons utiliser plusieurs langages.

Il est même possible de changer de langage selon les cellules du notebook, à l’aide des commandes magiques comme %%sql, %%csharp, etc. mais cela ne doit pas être une pratique à pérenniser dans le cadre de travaux de production.

Au lancement de la première commande Spark, une nouvelle session débute et cela peut prendre quelques minutes.

La librairie mssparkutils

Un des premiers besoins va être de communiquer avec les services du workspace comme les bases SQL et les systèmes de fichiers de type data lake. Pour accéder à des services liés comme un data lake, nous allons nous appuyer sur une librairie pré-installée : mssparkutils pour Microsoft Spark Utilities. La documentation officielle est disponible sur ce lien. Cet outil est très proche de dbutils de Databricks.

Nous allons nous concentrer en particulier sur les commandes liées au file system (fs) : list, copy, move, rm, put…

Nous commençons par “monter” le data lake par défaut afin d’accéder aux fichiers. Nous utilisons pour cela le modèle de code ci-dessous.

mssparkutils.fs.mount( 
    "abfss://<containername>@<accountname>.dfs.core.windows.net", 
    "</mountname>", 
    {"linkedService":<linkedServiceName>} 
)

A noter que le nom attribué au point de montage doit débuter par un caractère /.

La commande mounts() permet de vérifier les points de montage existants et si besoin, unmount(<mountname>) s’utiliserait pour “démonter” ce point.

Attention, si nous utilisons la syntaxe seule du point de montage pour faire une lecture, nous obtenons une erreur de type PathNotFound. En effet, une première différence apparait ici pour les utilisateurs habitués au dbutils de Databricks.

Nous devons constituer un chemin débutant par le mot clé synfs, suivant de l’identifiant du job, qui se trouve être une variable d’environnement.

Prenez soin de variabiliser ces éléments car le job id changera à chaque démarrage d’une nouvelle session !

Les points de montage ne sont malheureusement pas pérennes ! A chaque nouvelle session, il sera nécessaire de relance la commande mount.

Le résultat de la commande fs est une liste et les différentes propriétés des fichiers peuvent être isolées par du code :

for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Les commandes Python de la librairie os peuvent également utiliser ce point de montage. Attention, la syntaxe du path change légèrement avec la disparition du symbole : et l’ajout d’un / avant synfs.

Réalisons maintenant quelques tâches courantes.

Lister et compter tous les fichiers des sous-répertoires

Une simple recherche Google nous montre que la première problématique avec cette librairie mssparkutils est la recherche récursive dans les répertoires !

Heureusement, la recherche nous amène rapidement à découvrir cet excellent repository GitHub : Recursively listing Data Lake files with `display` implemented · GitHub (donnez-lui une étoile :)).

L’exécution imbriquée des deux fonctions deep_ls et convertfiles2df permet d’obtenir un dataframe Pandas avec la liste des fichiers, leur path et la taille en octets.

Nous affichons une synthèse à l’aide du print ci-dessous.

print(f"{df.shape[0]} files for a total size of {df['size'].sum()} bytes")
Afficher les premières lignes d’un fichier .csv ou .parquet

La commande mssparkutils.fs.head() répond à ce besoin.

Toutefois, il est difficile d’interpréter les sauts de ligne et de faire par exemple la différence entre les noms de colonne et les valeurs.

Nous allons nous faire aider ici par la génération automatique de syntaxe, en clic droit sur un nom de fichier ou de dossier.

Nous savons que la première ligne constitue les entêtes du fichier.

La commande df.printSchema() va nous montrer les types de colonnes. Venant d’un fichier .csv, il n’est pas étonnant de ne retrouver que des chaines de caractères (string).

En appliquant l’option inferSchema = True, nous pouvons demander au moteur de déterminer le type le plus probable de chaque colonne.

Il est important de rappeler que le format de fichier .parquet conserve les types de colonnes, nous privilégierons donc ce format pour le stockage de données dite “raffinées”.

Utiliser des packages supplémentaires

Les packages disponibles par défaut ne sont généralement pas suffisants pour traiter tous les aspects d’un projet de Data Engineering ou de Data Science. Nous allons donc charger de nouvelles librairies au moyen d’un fichier local requirements.txt.

Le fichier liste tout simplement les packages, avec, idéalement, leur numéro de version.

great-expectations==0.15.44
xgboost==1.7.3

Par défaut, l’installation de packages pour une session n’est pas activée.

Difficile de réaliser une vérification de l’installation autrement qu’en réalisant une commande d’import du package voulu.

Convertir le contenu d’une table SQL en un dataframe

Nous pouvons établir une connexion JDBC à toute base de données mais ici, nous allons de nouveau profiter, pour un scénario en lecture seule, générer le code Spark dans un notebook.

La méthode spark.read.synapsesql() est mise à profit pour créer un Spark Dataframe en mémoire. Attention, il s’agit d’une commande Scala.

Cette ressource de formation donne également un exemple d’écriture d’un dataframe vers la base de données.

Pour utiliser au mieux Spark et l’API pyspark, je vous recommande la certification Apache Spark developer associate, décrite dans cet article.

Voici un schéma récapitulatif des commandes disponibles avec la librairie MSSparkUtilities.

Découvrir toutes les fonctionnalités d’Azure Synapse Analytics

Azure Synapse Analytics (ASA) a remplacé depuis quelques temps Azure SQL Data Warehouse. Mais au delà d’une base SQL de type MPP (“massive parallel processing“), nous avons maintenant accès à des fonctionnalités offrant une vision “bout en bout” des projets data.

Une fois la ressource Azure provisionnée, nous pouvons lancer le Synapse Studio, dans une fenêtre de navigateur Web.

Le studio présente un menu latéral qui permettra de naviguer entre les différents outils composant Azure Synapse Analytics.

Dans ce premier article introductif, nous allons nous focaliser sur les trois menus Data, Integrate et Develop. Nous traiterons par la suite les aspects de Management et de Monitoring. Nous terminerons cette série sur les bonnes pratiques collaboratives : versioning et intégration / déploiement continus.

Data: le stockage

Si les bases de données (BDD) sont présentes dans notre quotidien depuis plusieurs décennies, le Data Lake (qui reste fondamentalement un système de fichiers…) reste plus récent mais est porté par la dynamique autour des formats de fichiers optimisés pour l’analytique (Parquet, Delta, Iceberg…) et autorisant les transactions (insert, update, delete), opérations qui restaient jusque-là la prérogative des BDD.

Ces deux approches vont se retrouver au sein des services internes à ASA ou en lien (“linked“).

Workspace

De manière “intégrée” dans l’espace de travail (workspace) Synapse, nous retrouvons les bases de données SQL MPP (ex SQL DWH).

Tous les objets classiques d’une BDD sont disponibles (tables, vues, procédures stockées…) et nous ferons un focus sur la notion de tables externes.

La ressource SQL dite “dédiée” apparaît également dans le groupe de ressources Azure. C’est une ressource dont la performance et l’espace de stockage disponible dépendront du niveau choisi (entre DW100c et DW30000c) et qui pourra être arrêtée ou démarrée. Sa facturation sera fonction de ces éléments.

Les tables externes : un fichier “vu” en SQL

Grâce aux tables externes, nous allons pouvoir interroger un système de fichier (Blob Storage, Data Lake) à l’aide d’instructions SQL. Le langage SQL étant si répandu dans le monde professionnel de la data, il s’agit là d’une fonctionnalité particulièrement intéressante.

Création d’une table externe (script automatiquement généré)

Un article dédié sera nécessaire pour décrire les cas d’usage des tables externes, différencier tables externes et vues, ainsi que noter les différences entre les deux pools SQL (dédié et serverless).

La nouveauté (2022) : le lake database

Nous avons vu qu’il est désormais facile d’interroger un fichier comme si l’on utilisait une table (plutôt une vue non matérialisée) d’une base de données. Une difficulté va tout de même se présenter lorsqu’il s’agira de bien appréhender le dictionnaire des données (tous les champs disponibles, associés à leur type respectif) ainsi que les relations entre les différents fichiers (au sens des cardinalités : un à plusieurs, plusieurs à plusieurs, etc.).

Le Lake Database vient proposer une solution à cette problématique, en proposant de mieux visualiser toutes les métadonnées, tout en conservant un stockage de type fichier. De manière sous-jacente, ce sont les ressources SQL Pool serverless ou Spark Pool qui seront utilisées.

Nous créons pour cela une base de type lake, en définissant le container de stockage des données (input folder) ainsi que le format à utiliser (.csv ou .parquet, Delta à venir).

Nous pouvons maintenant utiliser l’interface visuelle pour définir des tables:

  • personnalisées (custom), c’est-à-dire en renseignant chaque champ manuellement
  • depuis des modèles (template) proposés
  • depuis des fichiers du Data Lake

Visitons la galerie de modèles, ceux-ci couvrent de nombreux domaines fonctionnels.

Par domaine, nous disposons d’un grand nombre de schémas de tables, pour lesquelles les clés primaires et étrangères sont déjà déterminées. Nous obtenons donc déjà des relations dans le modèle en cours de création.

Les tables (vides pour l’instant) sont maintenant visibles dans le menu latéral de navigation.

Il sera alors nécessaire de remplir les tables, soit par des instructions SQL INSERT, soit au moyen de l’outil de mapping présenté dans la documentation officielle.

Un bon cas d’usage consiste à mapper les champs du Dataverse utilisé par les produits de la Power Platform. En effet, nous découvrirons ci-dessous la fonctionnalité Synapse Link qui permettra une synchronisation des données en quasi temps réel.

Linked

Les ressources liées sont par définition extérieures à la ressource Synapse Analytics. On utilisera ici principalement un ou plusieurs Data Lake Azure (de génération 2 uniquement) dont les différents file systems seront visibles.

Les notions bronze, silver et god correspondent à l’approche dite “medallion”.

La liste des services externes est présentée ci-dessous.

Il n’est étonnant de retrouver le stockage Blob car le Data Lake de génération 2 n’est autre qu’un Blob Storage augmenté par un “hierarchical namespace“. Nous retrouvons également deux APIs de la base Cosmos DB : l’API native pour le NoSQL et l’API pour MongoDB. Nous écartons donc les APIs PostgreSQL, Cassandra ou Gremlin.

Les integration datasets listeront toutes les sources ou destinations (sink) des pipelines de données qui seront abordés dans le prochain paragraphe. Ainsi, pour une source de type file system, nous disposerons des formats de fichiers suivants:

Integrate : les pipelines “no code”

L’intégration de données consiste à faire entrer dans un environnement de destination des données issues d’une ou plusieurs sources, et ce, à des intervalles de temps réguliers ou sur la détection d’événements (par l’exemple, l’arrivée de fichiers dans un data lake déclenche leur intégration en base de données).

Pipeline

Nous voici dans un univers bien connu des utilisateur.ices d’Azure Data Factory, même si la parité des fonctions n’est pas encore totalement établie (janvier 2023).

Un pipeline est composé d’une ou plusieurs activités, s’exécutant en parallèle ou bien s’enchaînant selon des règles définies.

L’activité de base est la copie de données (copy data) et nous retrouvons également trois activités permettant d’ordonnancer d’autres éléments propres à ASA : les jobs Spark et les procédures stockées SQL.

Synapse Link Connection

Azure Synapse Link est un outil permettant un transfert rapide de données vers une base SQL dédiée (SQL dedicated pool) depuis des sources comme :

  • Azure SQL DB
  • SQL Server 2022 (on premises)
  • Azure Cosmos DB (APIs NoSQL, Cosmos DB & Gremlin)
  • Dataverse

L’objectif est ici de réaliser, “near real time“, des travaux analytiques depuis les données enregistrées dans des bases dont l’usage est plutôt transactionnel. Il n’est plus nécessaire ici de déployer des pipelines ou jobs d’intégration.

Copy Data Tool

Il s’agit sans doute de la manière la plus simple pour copier des données d’une source à une destination (sink).

Ici, il n’est pas envisageable d’ajouter des transformations plus poussées lors de la copie. Nous retrouvons toutefois les propriétés de déclenchement :

  • unique
  • planifié
  • sur des fenêtres glissantes (tumbling windows)

Develop : l’approche “full code”

Les différents scripts compatibles avec les éléments de ASA sont présentés ci-dessous.

Les data flows pourraient sembler être un intrus dans cette liste car c’est bien une interface de type “no code” qui va permettre de les développer. La justification tient sans doute dans le fait que les data flows sont convertis en langage Spark lors de leur exécution.

De même, les jobs Apache Spark fonctionnent à partir de fichiers .JAR qui ne seront pas développés directement dans le studio ASA.

Les scripts SQL travaillent sur les bases de données de type SQL Pools (serverless ou dedicated).

Les scripts KQL (KustoQL) travaillent uniquement sur les bases de données Azure Data Explorer.

Les notebooks s’exécutent grâce à un Pool Spark et peuvent interagir avec les données, qu’elles soient sous forme de fichiers ou intégrées dans les bases de données de type SQL (et non KQL), à l’aide d’un connecteur JDBC.

Nous devons tout d’abord instancier un pool Apache Spark, tout comme nous avons dû créer un pool SQL dédié. Seul le pool SQL serverless est présent par défaut dans le workspace ASA.

Le pool est une grappe (cluster) de plusieurs machines virtuelles (nodes), puisque Spark est un framework distribué.

La version de Spark est modifiable, elle conditionne les versions des langages sous-jacents (Python, Scala, Java, .NET, R…)

La propriété “automatic pausing” est intéressante afin de réduire les coûts d’utilisation mais nous verrons, dans un prochain article, qu’elle implique une gestion fine des sessions Spark.

Premier exemple “de bout en bout”

Prenons l’exemple d’un fichier .csv “brut” qui a été uploadé dans le container Bronze du Data Lake.

Notre premier objectif sera de stoker le fichier dans un format optimisé comme Parquet dans le container Silver. Nous avons pour cela plusieurs possibilités.

Lire avec OPENROWSET

Un clic droit sur un fichier ou un répertoire propose les options suivantes.

Voici le code automatiquement généré par un “Select TOP 100 rows”.

Il est nécessaire d’ajouter l’option HEADER_ROW = TRUE afin de considérer la première ligne du fichier comme l’en-tête. Les autres bulk options sont données dans cette documentation officielle.

Pipeline de copie de CSV vers Parquet

Il s’agit ici de réaliser une copie conforme de la source mais nous allons utiliser pour cela l’outil “copy data tool” car l’objectif est de changer de format de fichier. Nous définissons donc une activité de copie dans un pipeline de type Azure Data Factory.

Le mapping permet de définir le type de données contenues dans chaque champ. C’est en effet une propriété des fichiers .parquet que de pouvoir stocker cette information.

Pipeline de copie du fichier Parquet dans une table

Posons l’objectif de matérialiser les données au sein de la BDD. Nous allons donc utiliser le pool SQL dédié. En effet, dans sa version serverless, nous ne faisons que créer des métadonnées facilitant l’interprétation des fichiers comme des vues.

Nous faisons de nouveau appel à une activité de copie dans un pipeline de type Azure Data Factory. Nous allons laisser le soin à l’activité de copie de créer directement le schéma de table attendu. Pour cela, nous choisissons le mode “Bulk insert” et l’option “Auto create table”.

Nous pouvons maintenant visualiser la table de destination dans le pool dédié et l’interroger au moyen du langage SQL.

En guise de conclusion, et avant de rentrer plus en détails dans certains aspects de cette ressource Azure, nous avons montré ici la polyvalence de l’outil pour les scénarios analytiques :

  • à partir de fichiers de données ou de tables contenues dans une base de données
  • avec la simplicité de la syntaxe SQL directement sur le contenu des fichiers
  • dans une démarche no code ou full code, selon les compétences des utilisateurs.

Dataïku DSS et Azure Synapse Analytics : connexion à la base de données

Nous avons présenté dans un précédent article la plateforme de Data Science de Dataïku et son installation sur une VM dans Azure. Nous avons également déjà abordé la connexion à un compte de stockage Azure, afin de lire des fichiers de données ou d’écrire les différents résultats obtenus.

Dans un Système d’Information non centré autour du Data Lake, certaines données particulièrement intéressantes pour le Machine Learning peuvent être stockées dans les tables d’une base de données, souvent orientée “analytique”. Le Data Warehouse d’Azure, Synapse Analytics, est disponible dans les connexions prévues par Dataïku DSS. Nous allons détailler ici les actions à réaliser pour définir la connexion à la base de données. Un grand merci à mon collègue Benjamin BENITO qui a réalisé les opérations techniques décrites ci-dessous.

Dans les paramètres de Dataïku DSS, nous pouvons trouver la liste des différentes bases de données SQL compatibles avec le Studio de Data Science.

Nous choisissons Azure Synapse. Il faut noter, que dans une exploitation plus large des capacités de Synapse Analytics, il sera aussi possible d’utiliser cette ressource comme moteur de calcul Spark. Nous verrons ceci dans un prochain article.

Dedicated SQL Pool

Une base de données de type “dedicated SQL pool” aura été préalablement créée du côté de Synapse Analytics.

Pour la démonstration, nous disposons d’une table à une colonne, contenant une valeur.

Nous pouvons maintenant renseigner les différents éléments attendus pour la définition de la connexion. Commençons par cocher la case “Use custom JDBC URL”.

L’URL JDBC se trouve sur le portail Azure, à la page de la ressource Dedicated SQL Pool, onglet “connection strings“. Nous utiliserons ici la première version, utilisant la connexion SQL.

L’URL JDBC se détaille comme ci-dessous.

jdbc:sqlserver://<synapse-name>.sql.azuresynapse.net:1433;database=<db-name>;user=<admin-login>@<synapse-name>;password={your_password_here};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;

Il faudra remplacer ici le texte {your_password_here} par le mot de passe du compte administrateur.

Ces informations, ainsi que le port ouvert (1433) pourront également être précisées dans les cases disponibles. Il ne reste plus alors qu’à tester la connexion à l’aide du bouton situé en bas à gauche de l’écran. Je vous recommande de créer la connexion (bouton “create”) au préalable afin de sauvegarder les informations saisies.

Il est alors fort probable que vous soyez confronté.e.s à l’erreur ci-dessous.

Cette erreur est due à l’absence de driver SQL Server sur la VM où est installé Dataïku DSS. L’URL suivante détaille cette erreur.

ERR_SQL_CANNOT_LOAD_DRIVER: Failed to load database driver — Dataiku DSS 9.0 documentation

Nous allons donc nous connecter à cette machine virtuelle Linux pour effectuer manuellement l’installation du driver attendu. Nous devons disposer pour cela d’un login / password ou d’une clé SSH.

Voici les différentes commandes à jouer successivement.

wget -O /tmp/sqljdbc_10.2.1.0_enu.zip https://download.microsoft.com/download/4/d/5/4d5a79be-35f8-48d4-a984-473747362f99/sqljdbc_10.2.1.0_enu.zip
unzip /tmp/sqljdbc_10.2.1.0_enu.zip -d /tmp/
sudo cp /tmp/sqljdbc_10.2\enu/mssql-jdbc-10.2.1.jre8.jar /home/dataiku/dss/lib/jdbc/

sudo su dataiku
dataiku/dss/bin/dss stop
dataiku/dss/bin/dss start

Les premières commandes réalisent le téléchargement du binaire nécessaire, le dézippe puis le copie à l’endroit attendu. Un restart de DSS est ensuite nécessaire.

La connexion est maintenant établie !

Nous pouvons utiliser le lien “Import tables to datasets” pour obtenir des données stockées dans le Data Warehouse.

Serverless SQL Pool

Pour utiliser le mode serverless de Synapse Analytics, nous devrions nous orienter vers un autre type d’authentification. En effet, ce mode correspond à la création de méta-données sur des fichiers présents dans un Data Lake mais sans matérialisation dans le Data Warehouse. Des droits sous-jacents (sur les fichiers du Data Lake) sont donc nécessaires et il n’est pas possible d’attribuer de tels droits au user connu uniquement de la base de données.

Le message d’erreur rencontré sera alors :

Failed to read data from table, caused by: SQLServerException: External table 'dbo.<external_table>' is not accessible because location does not exist or it is used by another process.

La documentation officielle de Dataïku donne toutes les étapes permettant d’utiliser l’authentification au travers d’Azure Active Directory.

Nous avons besoin d’un Principal de Service, c’est-à-dire d’une identité dans Azure. Nous relevons tout d’abord l’application ID (ou client ID) qu’il sera nécessaire de renseigner dans Dataïku DSS.

Dans le menu “Endpoints”, nous notons l’information de l’OAuth 2.0 token endpoint (v1).

Dans le menu “API permisssions”, nous devons ajouter la permission Azure SQL Database.

Choisir ensuite “Delegated permissions” puis cocher la case user_impersonation.

Nous terminons le paramétrage de cette identité en lui associant un secret. Notez ici bien la valeur du secret, que vous ne pourrez plus retrouver par la suite.

Toujours dans le portail Azure, il faut donner les droits de lecture sur le Data Lake au principal de service. Cela se fait au moyen de l’ajout d’un RBAC de type “Blob storage data reader”.

Sinon, une erreur comme ci-dessous se produira lors de l’aperçu de la table ou de la vue :

Failed to read data from table, caused by: SQLServerException: File 'https://adlsdataiku.dfs.core.windows.net/dataiku/input/diamonds.csv' cannot be opened because it does not exist or it is used by another process.

Nous reprenons maintenant la définition d’une nouvelle connexion Synapse Analytics depuis le Data Science Studio.

En cochant la case “Use custom JDBC URL”, il faut donner une Connection URL de la forme suivante :

jdbc:sqlserver://<synapse-name>-ondemand.sql.azuresynapse.net:1433;database=master;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30

Nous précisons le port (1433 par défaut) et le nom de la Database.

Pour la partie liée à l’Azure AD, il faut cocher la case “Login with Azure OAuth” puis coller les informations préalablement collectées :

  • STS URL (OAuth 2.0 token endpoint (v1))
  • Client id (application id)
  • Client secret (la valeur du secret et non le secret id)

Deux modes de “credentials” sont possibles : Global ou Per user. Ce second mode signifie que c’est le login utilisé dans DSS qui est repris pour s’authentifier auprès de la base Synapse. Il ne sera alors pas nécessaire de renseigner le client secret.

Depuis Synapse Analytics, dans une nouvelle requête SQL, nous allons déclarer le principal de service (SPN) comme un utilisateur de la base de données. Veillez à bien être connecté à “Built-in”, c’est-à-dire la base serverless.

Voici le code à exécuter, où le nom du SPN doit être mis à jour. Il est nécessaire d’attribuer des droits de type “BULK” puis “SELECT” sur les objets qui seront autorisés à Dataïku DSS.

CREATE USER [dataikuspn] FROM  EXTERNAL PROVIDER  WITH DEFAULT_SCHEMA=[dbo];
GO
GRANT ADMINISTER DATABASE BULK OPERATIONS TO dataikuspn;
GO
GRANT SELECT ON OBJECT::dbo.V_diamonds TO [dataikuspn];
GO

Il est maintenant possible d’ouvrir la liste des tables à importer depuis l’interface de DSS.

Ce second scénario s’avère intéressant pour limiter les coûts d’utilisation de Synapse Analytics (coût à la quantité de données lues) et facilite l’exploitation des données du Data Lake qui auront pu être réorganisées sous forme de vues SQL plus lisibles des utilisateurs de Dataïku.

Poser du Parquet dans le cloud Azure ? Oui !

En quelques années de l’ère “big data”, le format de fichier orienté colonne Parquet s’est imposé comme l’un des standards du stockage sur les lacs de données (data lake) grâce à ses performances de compression, sa gestion des partitions et son intégration avec des frameworks distribués comme Spark. Il s’agit d’un projet porté par la fondation Apache qui héberge la documentation officielle.

Avant d’obtenir un fichier Parquet, le format initial est bien souvent un format texte classique structuré comme du CSV ou semi-structuré comme le JSON. Nous sommes également dans le scénario de réalisation d’un couche “clean” au sein d’un data lake, à partir de nombreux fichiers de la couche “raw“, nécessitant d’être agrégés et optimisés pour des outils d’analyse.

Dans cet article, nous allons dérouler un cas pratique allant de la constitution de ces fichiers Parquet jusqu’à leur utilisation dans un rapport Power BI.

Partons d’une année de fichiers zip issus de vélos en location de la ville de New York, soit un total de 828Mo, atteignant 3,45Go une fois décompressés (CSV).

Nous disposons ainsi en tout de 19 506 857 lignes pour l’année 2020.

Avec un peu de script Python, nous pouvons décompresser automatiquement ces fichiers.

import os
import zipfile

def unzip_files(path_to_zip_file) -> None:
    directory_to_extract_to = "datasets"
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(directory_to_extract_to)

for file in os.listdir("zip"):
    print(f"Unzipping : {file}")
    unzip_files("zip/"+file)

Ensuite, depuis un environnement Spark comme Azure Databricks, nous pouvons lire tous les CSV, qui auront été déposés au préalable sur un compte de stockage Azure.

file_location = "/mnt/nycitibike/datasets/*"
file_type = "csv"
df = spark.read.format(file_type).option("inferSchema", "true").option("header","true").load(file_location)
display(df)

Notez qu’il faudra renommer les colonnes comportant un ou plusieurs espaces, interdits en entêtes dans le format Parquet, ce que nous faisons avec le script ci-dessous.

df = df \
.withColumnRenamed("start station id","start_station_id") \
.withColumnRenamed("start station name","start_station_name") \
.withColumnRenamed("start station latitude","start_station_latitude") \
.withColumnRenamed("start station longitude","start_station_longitude") \
.withColumnRenamed("end station id","end_station_id") \
.withColumnRenamed("end station name","end_station_name") \
.withColumnRenamed("end station latitude","end_station_latitude") \
.withColumnRenamed("end station longitude","end_station_longitude") \
.withColumnRenamed("birth year","birth_year")

L’enregistrement au format Parquet est alors possible, toujours à l’aide de l’API PySpark.

df.write.format("parquet").mode("overwrite").save("/mnt/nycitibike/dbx_parquet")

Les données sont automatiquement partitionnées (il est possible d’avoir la main sur ce mécanisme, comme nous le verrons plus tard) et quelques fichiers spécifiques à Databricks sont ajoutés :

– ficher “commited” qui liste les différentes parties

– fichier vide “started”

– fichier vide _SUCCESS

Ces fichiers qui tracent la transaction réalisées vont nous gêner pour une usage dans Power BI. Nous pourrions éviter leur génération en définissant les options suivantes dans notre script Spark comme l’indiquent ces échanges sur le forum Databricks.

Il est possible de prendre la main sur le nombre de fichiers obtenus, ainsi que sur la clé qui aidera au partitionnement. Nous ajoutons deux fonctions que sont repartition() et partitionBy() dans la syntaxe d’écriture du Parquet.

df.repartition(1).write.partitionBy("YearMonth").format("parquet").mode('overwrite').save("/mnt/nycitibike/dbx_parquet")

Dans Power BI Desktop, nous disposons d’un type de source Parquet, à partir du menu “Obtenir les données”.

Ce connecteur attend le chemin d’un fichier Parquet et non d’un répertoire contenant les partitions sous forme de fichiers distincts. Nous découvrons ainsi la fonction du langage M qui réalise la lecture du fichier : Parquet.Document().

= Parquet.Document(File.Contents("C:\Users\PaulPeton\Documents\Python Scripts\Parquet\my_to_parquet.parquet"))

Cette fonction est documentée depuis juin 2020 mais elle n’a pas fait l’objet d’une grande communication jusqu’à présent (janvier 2021) et la roadmap Power BI indique un (nouveau ?) connecteur Parquet pour mars 2021.

Pour lire le contenu d’un dossier, nous pouvons sans problème appliquer la technique de connexion à un dossier sur un compte de stockage Azure, puis lire chaque fichier à l’aide du bouton “expand” de la colonne “Content” contenant tous les binaries. Cette manipulation crée automatique une fonction d’import contenant le code suivant.

= (Paramètre1) => let
Source = Parquet.Document(Paramètre1)
in
Source

Attention, pour réaliser cette opération, il faut au préalable filtrer les fichiers ne correspondant pas à des partitions (commit, started, SUCCESS pour un dossier Parquet généré par Databricks), qui débutent tous par un caractère underscore.

= Table.SelectRows(Source, each not Text.StartsWith([Name], "_"))

Les données se chargent alors en mettant bout à bout toutes les partitions !

Il est maintenant tentant d’essayer une actualisation incrémentielle de notre dataset, sur une période d’un mois. Pour autant, celle-ci n’est pas garantie comme nous l’indique le message d’alerte dans la pop-up de paramétrage du rafraichissement.

Une traduction approximative annonce que la requête “ne peut être pliée”, il s’agit ici du concept de query folding : la requête ne pourrait appliquer un filtre (de dates) en amont du chargement complet des données. Ceci signifie tout simplement de le mécanisme incrémentiel ne se réaliserait pas.

Afin de tester ce fonctionnement, nous publions le rapport sur un espace de travail Premium du service Power BI. Il sera alors possible d’ouvrir une connexion à l’espace à partir du client lourd SQL Server Management Studio.

Au premier chargement, nous n’aurons qu’une seule partition.

Mais en actualisant le dataset depuis le service, les partitions apparaissent.

Nous observons bien qu’une nouvelle actualisation du jeu de données n’affecte pas toutes les partitions. C’est gagné !

En ajoutant des fichiers supplémentaires, nous observons aussi que le paramètre de stockage de lignes (ici, sur un an) fonctionne et cela permet donc de créer un dataset actualisé sur une période glissante.

Pour terminer, précisons que la requête réalisée ici peut tout à fait être exécutée au sein d’un dataflow Power BI et servir ainsi à la création de différents rapports.

[EDIT du 23/01/2021 : cette approche fonctionne aussi avec le format Delta Lake créé depuis Azure Databricks, et c’est tant mieux car ce format apporte beaucoup d’avantages, qui seront sûrement abordés dans un prochain article.]

Déploiement continu avec Azure Synapse Analytics – partie 1

S’il était bien une fonctionnalité attendue pour la nouvelle (2020) version d’Azure Synapse Analytics, c’était la gestion du versionning et du déploiement des scripts créés dans l’outil.

Depuis la disponibilité générale (fin novembre 2020), nous trouvons un menu Source control dans la fenêtre Manage.

Nous pouvons définir la Git configuration, c’est-à-dire l’outil qui servira à l’enregistrement des développements et à leur versionning. Nous pouvons choisir entre Azure DevOps ou GitHub.

Nous choisissons ici Azure DevOps mais la partie GitHub est documentée sur ce lien officiel.

Nous devons disposer au préalable d’un projet DevOps et d’un repository dont l’URL devra être renseignée dans Synapse.

The Dev Ops repository link must start with http or https and be in DevOps format. For example: https://account.visualstudio.com/project/_git/repository

Dès lors, le dépôt se remplie avec les premiers éléments faisant partie de la ressource Synapse Analytics.

Dans le studio Synapse, nous disposons de plusieurs actions :

La validation permet de contrôler le développement réalisé et des erreurs seront remontées le cas échéant.

Le bouton Commit permet de “sauvegarder” le développement réalisé sur la branche active. Il est recommandé de ne pas développer directement sur la branche master.

Le bouton Publish de l’interface Synapse permet de créer ou mettre à jour la branche workspace_publish.

Il est donc logique de les utiliser successivement, dans cet ordre. L’action Publish réalisera une sauvegarde si nécessaire. A noter que les sorties (outputs) des cellules des notebooks sont effacées au moment de la publication. C’est une bonne pratique pour le versionning mais peut déstabiliser l’utilisateur souhaitant conserver l’affichage de ses résultats sans relancer le notebook complet.

Au fur et à mesure de l’utilisation de Synapse Analytics, de nouveaux répertoires vont se créer sur la branche collaborative.

Manquent ici triggers, credentials et Spark job definiition

Dans la branche workspace_publish, nous retrouvons deux fichiers JSON dans une logique complètement similaire à celle d’Azure Data Factory, permettant un déploiement de type “template ARM“.

Le fichier TemplateParametersForWorkspace.json contient en particulier le nom des informations de connexion (nom du workspace, URL, etc.) et les mots de passe, secrets ou token, sont effacés. Il faudra écraser ou renseigner ces valeurs lors du déploiement dans un autre environnement.

Dans une seconde partie, nous développerons les approches possibles pour le déploiement continue à l’aide d’Azure Pipelines.