Text Analytics avec le concepteur (Designer) dans Azure Machine Learning

Les citizen data scientists disposent d’un outil graphique dans le portail Azure Machine Learning nommé Concepteur (ou Designer en anglais) permettant de réaliser des pipelines de Machine Learning, pour l’apprentissage comme pour l’inférence (c’est-à-dire le fonctionnement prédictif).

Depuis la première version de l’outil, anciennement nommé Azure Machine Learning Studio, et dit maintenant “classique”, des modules pour l’analyse textuelle étaient déjà présents mais l’offre s’est aujourd’hui, en ce début 2021, agrandie.

L’objectif de cet article est de faire un survol des méthodes disponibles, dans cette approche “low code“, pour un jeu de données contenant un champ de “texte libre”, et dans le but de réaliser une méthode d’apprentissage supervisé, et plus précisément de classification.

En prérequis, rappelons les notions indispensables avant de s’attaquer à notre problématique :

– disposer d’une ressource de calcul dite “compute cluster” qui servira à exécuter les différentes itérations du pipeline (succession de modules)

– créer une nouvelle expérience, qui permettra de suivre le résultat des exécutions ou runs (outputs, logs, artefacts, métriques…)

– connaître la structure classique d’un pipeline d’entrainement supervisé (s’inspirer des exemples disponibles, en particulier “Text Classification – Wikipedia SP 500 Dataset” qui est décrit sur ce site GitHub.

Vue d’ensemble des fonctionnalités de Microsoft Machine Learning Studio (classique)
Image reprise de la documentation officielle de Microsoft et téléchargeable ici : https://download.microsoft.com/download/C/4/6/C4606116-522F-428A-BE04-B6D3213E9E52/ml_studio_overview_v1.1.pdf

Nous allons nous concentrer ici sur deux phases qui auront leurs spécificités pour le traitement de données textuelles :

– le preprocessing

– le choix du modèle d’apprentissage pour une classification

Preprocessing des données textuelles

Nous allons utiliser ici le module Preprocess text qui réalisera de nombreuses actions de nettoyage et de préparation de nos données textuelles, en une seule étape.

Ce module est très puissant et basé sur la librairie de référence SpaCy mais ne supporte pour l’instant malheureusement que la langue anglaise. Nous pourrons contourner cela par un script R ou Python pour le cas de corpus en français. Mais à l’exception de la lemmatisation, les opérations pourront toutefois s’appliquer à un corpus d’une autre langue que l’anglais.

La case à cocher “Expand verb contractions” est toutefois spécifique à des formulations anglaises comme don’t, isn’t, I’ve, you’ll, etc.

Détaillons maintenant les autres options, avec quelques exemples simples si nécessaire.

– Retrait des mots outils (stop words) : a, about, all, any, etc.

Cette opération est paramétrable au moyen de la seconde entrée du module. Nous pouvons donc utiliser un fichier, d’une colonne, contenant les mots en français. Ce site propose une liste de mots outils dans de nombreuses langues.

Lemmatisation (remplacement du mot par sa forme canonique qui est “l’entrée du dictionnaire”, c’est-à-dire, l’infinitif ou le masculin singulier lorsqu’il existe) : la phrase “The babies are walking on their feet” donnera “The baby be walk on their foot” puis sans mots outils “baby walk foot“.

– Détection des phrases (séparées alors par |||, à choisir si l’on souhaite un traitement spécifique par phrase plutôt que par ligne du jeu de données)

– Normalisation en minuscules (sinon les mots sont considérés comme différents)

Suppression des nombres, caractères spéciaux (non alphanumériques), caractères dupliqués plus de deux fois, adresses email de la forme <string>@<string>, URLs reconnues par les préfixes httphttpsftpwww

– Remplacement des backslashes \ en slashes / : l’usage ne sera pas fréquent, on passera plutôt par la suppression de ces caractères spéciaux.

Séparation des tokens (ici, les mots) sur la base de caractères spéciaux comme & (pour l’esperluette, on préfèrera à nouveau la citer dans les caractères spéciaux à supprimer) ou - : un “pense-bête” deviendra “penser bête” suite à séparation et lemmatisation en français.

En pratique, il est important de maîtriser l’ordre dans lequel ces opérations s’appliquent. Ainsi, on réalisera plutôt les opérations de nettoyage (suppressions, minuscules, mots outils…) avant les opérations de lemmatisation.

Importer la librairie NLTK et les mots outils en français

Nous disposons d’un module Python permettant d’exécuter un script, sur un ou deux dataframes en entrée de ce module. Le code suivant nous permettra d’importer la librairie NLTK et d’utiliser la liste des mots outils ou bien la lemmatisation en français.

import pandas as pd

def azureml_main(dataframe1 = None, dataframe2 = None):
    import importlib.util
    package_name = 'nltk'
    spec = importlib.util.find_spec(package_name)
    logging.debug(spec)
    if spec is None:
        import os
        os.system(f"pip install nltk")

    import nltk
    from nltk.corpus import stopwords
    stopwords = set(stopwords.words('french'))

    def preprocess(text):
        clean_data = []
        for x in (text[:]):
            print(x)
            new_text = re.sub('<.*?>', '', x)   # remove HTML tags
            new_text = re.sub(r'[^\w\s]', '', new_text) # remove punc.
            new_text = re.sub(r'\d+','', new_text)# remove numbers
            new_text = new_text.lower() # lower case, .upper() for upper          
            if new_text != '':
                clean_data.append(new_text)
        return clean_data    

    dataframe1['preprocess'] = preprocess(dataframe1['Text'])

    dataframe1['preprocess_without_stop'] = dataframe1['preprocess'].apply(lambda x: [word for word in x if word not in stopwords])

    #On peut observer la liste des mots outils de NLTK dans la seconde sortie
    dataframe2 = pd.DataFrame(list(stopwords))

    return dataframe1, dataframe2,

Apprentissage supervisé sur données textuelles

Dans le groupe Text Analytics, nous disposons des modules ci-dessous.

Vowpall Wabbit

Nous écartons d’emblée l’approche Vowpal Wabbit, framework de Machine / Online Learning initialement développé par la société Yahoo !. Celui-ci nécessite en effet d’obtenir en entrée des données préprocessées mais surtout présentées dans un format spécifique. Vous pouvez toutefois vous référer à ce blog si vous souhaitez préparer vos données en ce sens. Il faudra ensuite utiliser des lignes de commandes au sein du module Train Vowpal Wabbit Model.

Toutefois, nous allons pouvoir remplacer ce framework par une approche relativement similaire réalisée par le module Feature Hashing.

Feature Hashing

Cette méthode consiste à créer une table de hash, c’est-à-dire de valeurs numériques, construites à partir du dictionnaire des mots.

Nous prendrons comme entrée le texte préprocessé. Nous avons alors deux paramètres à renseigner dans la boîte de dialogue du module.

Une taille de 10 bits correspondra à 2^10, soit 1024, nouvelles colonnes dans le jeu de données en sortie. Ces colonnes contiendront le poids de la feature dans le texte. Une valeur de 10 se montre généralement suffisante et attention à l’explosion de la taille du jeu de données (2^20 correspondrait à 1048576 colonnes !).

Un N-gram est une suite de n mots consécutifs, considérée comme une seule unité. Dans la phrase “le ciel est bleu”, nous avons quatre unigrams, trois bigrams (“le ciel”, “ciel est”, “est bleu”) et deux trigrams (“le ciel est”, “ciel est bleu”). Le paramètre attendu est la valeur maximale, ainsi si nous choisissons la valeur 3, nous prenons en compte les unigrams, bigrams et trigrams.

Nous obtenons le résultat suivant.

Seules les colonnes numériques seront ensuite soumises à l’entrainement du modèle.

Extract N-gram feature

Nous retrouvons ici la notion de N-gram (suite de n mots consécutifs, considérée comme une seule unité). Les paramètres disponibles nous permettent de spécifier :

– la taille maximale des N-grams considérés dans le dictionnaire. Ce paramètre conditionnera la taille du jeu de données, qui peut devenir très grand.

– les longueurs minimale et maximale des mots, en nombre de lettres

– la fréquence minimale de présence d’un N-gram dans le document (ensemble des lignes du jeu de données)

– le pourcentage maximum de présence d’un N-gram par ligne sur l’ensemble des lignes. La valeur 1 (100%) correspond à retirer un N-gram présent dans chaque ligne, et donc considéré comme un bruit qu’il faut supprimer.

– la métrique de pondération calculée par ligne, pour chaque feature créée.

La métrique TF-IDF est un standard de la discipline est correspond au ratio de la fréquence d’un N-gram dans la ligne (TF) par le log d’un autre ratio dit”inversé” : le nombre de lignes du jeu de données sur la fréquence du terme dans le jeu de données. Les valeurs seront normalisées selon une norme L2 si la case correspondante est cochée dans la boîte de dialogue.

En créant le vocabulaire, nous obtenons un jeu de données en sortie tel que représenté ci-dessous. Celui-ci contient la liste des N-grams, ainsi que les métriques DF et IDF associées.

Un pronom est ici identifié dans le bigram.

C’est ce vocabulaire établi sur le jeu d’entrainement qui devra saisir à la phase de validation sur les nouvelles données. Il ne faudrait pas créer un nouveau vocabulaire sur la seconde partie des données ! Mais les métriques doivent être calculées. Nous utilisons donc la combinaison ci-dessous des modules.

Le paramètre “Vocabulary mode” est alors positionné sur la valeur ReadOnly.

Convert word to vector

Nous retrouvons ici une méthode dite de plongement lexical (word embedding) qui a connu un fort engouement ces dernières années, et plus communément appelée word2vec. Il s’agit d’utiliser des modèles pré-entrainés sur des milliards de documents, issus par exemple de Wikipedia ou de Google News.

A ce jour (janvier 2021), il ne semble pas y avoir de module permettant de tirer parti du vocabulaire obtenu en sortie. Nous privilégierons donc une approche par script Python, au moyen de librairies comme Gensim (voir le site officiel).

Latent Dirichlet Allocation

Nous sortons ici du cadre supervisé pour obtenir une méthode non supervisée et donc dédiée à rassembler des textes similaires, dans un nombre prédéfini (“Number of topics to model”) de catégories, sans que celles-ci ne soient explicitement définies.

Comme nous disposons déjà des catégories, cette méthode ne se prête pas à l’objectif mais pourrait être intéressante en amont, dans un but exploratoire (nos catégories a priori ne sont peut-être pas si pertinentes…).

Un mode d’options avancées donnera la main sur tous les hyperparamètres du module.

Chaque ligne est évaluée sur les nombre prédéfini de sujets (topics).

A partir de la sortie, il sera nécessaire de réaliser une lecture et une interprétation “humaine” sur les sur les groupes afin de les nommer. Dans la feature matrix topic, nous pouvons visualiser les N-grams les plus contributeurs de chaque sujet.

Publier le pipeline d’entrainement

Publier un pipeline permet de bénéficier de l’ordonnancement de celui-ci, par exemple avec un outil comme Azure Data Factory. Nous obtiendrons ainsi un identifiant (ID) qui devra être renseignée dans l’activité.

Les différents paramètres sélectionnés dans les modules peuvent être ajoutés comme paramètres du pipeline et ainsi renseignés lors de nouveaux lancements.

Déploiement du pipeline d’inférence

Une fois le pipeline d’entrainement soumis et correctement exécuté (tous les modules sont en vert), nous pouvons choisir entre un service Web prédictif, soit en real-time (prévision par valeur), soit en batch (prévision par lot).

Si nous avons comparé plusieurs modèles au sein du pipeline d’entrainement, il faut cliquer sur le module “Train model” correspond à celui que l’on veut déployer (en pratique, celui ayant les meilleures métriques d’évaluation).

Le pipeline graphique se simplifie alors et se voit enrichi d’une entrée et d’une sortie pour le service Web qui sera créé.

Attention, pour aller plus loin, il est nécessaire de supprimer le module d’évaluation ou sinon, vous rencontrerez le message d’erreur suivant.

Un nouveau run du pipeline doit être soumis avant de pouvoir cliquer sur le bouton de déploiement du service web.

Le déploiement se fait au choix, sur une ressource ACI (plutôt pour le test) ou AKS (plutôt pour la production).

En conclusion, même s’il paraît difficile d’aller jusqu’en production avec cet outil, il reste néanmoins très rapide pour mettre en œuvre différentes méthodes reconnues dans le Traitement Automatique du Langage (TAL) et pourra apporter des résultats pertinents dans des cas relativement simples de classification.

Poser du Parquet dans le cloud Azure ? Oui !

En quelques années de l’ère “big data”, le format de fichier orienté colonne Parquet s’est imposé comme l’un des standards du stockage sur les lacs de données (data lake) grâce à ses performances de compression, sa gestion des partitions et son intégration avec des frameworks distribués comme Spark. Il s’agit d’un projet porté par la fondation Apache qui héberge la documentation officielle.

Avant d’obtenir un fichier Parquet, le format initial est bien souvent un format texte classique structuré comme du CSV ou semi-structuré comme le JSON. Nous sommes également dans le scénario de réalisation d’un couche “clean” au sein d’un data lake, à partir de nombreux fichiers de la couche “raw“, nécessitant d’être agrégés et optimisés pour des outils d’analyse.

Dans cet article, nous allons dérouler un cas pratique allant de la constitution de ces fichiers Parquet jusqu’à leur utilisation dans un rapport Power BI.

Partons d’une année de fichiers zip issus de vélos en location de la ville de New York, soit un total de 828Mo, atteignant 3,45Go une fois décompressés (CSV).

Nous disposons ainsi en tout de 19 506 857 lignes pour l’année 2020.

Avec un peu de script Python, nous pouvons décompresser automatiquement ces fichiers.

import os
import zipfile

def unzip_files(path_to_zip_file) -> None:
    directory_to_extract_to = "datasets"
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(directory_to_extract_to)

for file in os.listdir("zip"):
    print(f"Unzipping : {file}")
    unzip_files("zip/"+file)

Ensuite, depuis un environnement Spark comme Azure Databricks, nous pouvons lire tous les CSV, qui auront été déposés au préalable sur un compte de stockage Azure.

file_location = "/mnt/nycitibike/datasets/*"
file_type = "csv"
df = spark.read.format(file_type).option("inferSchema", "true").option("header","true").load(file_location)
display(df)

Notez qu’il faudra renommer les colonnes comportant un ou plusieurs espaces, interdits en entêtes dans le format Parquet, ce que nous faisons avec le script ci-dessous.

df = df \
.withColumnRenamed("start station id","start_station_id") \
.withColumnRenamed("start station name","start_station_name") \
.withColumnRenamed("start station latitude","start_station_latitude") \
.withColumnRenamed("start station longitude","start_station_longitude") \
.withColumnRenamed("end station id","end_station_id") \
.withColumnRenamed("end station name","end_station_name") \
.withColumnRenamed("end station latitude","end_station_latitude") \
.withColumnRenamed("end station longitude","end_station_longitude") \
.withColumnRenamed("birth year","birth_year")

L’enregistrement au format Parquet est alors possible, toujours à l’aide de l’API PySpark.

df.write.format("parquet").mode("overwrite").save("/mnt/nycitibike/dbx_parquet")

Les données sont automatiquement partitionnées (il est possible d’avoir la main sur ce mécanisme, comme nous le verrons plus tard) et quelques fichiers spécifiques à Databricks sont ajoutés :

– ficher “commited” qui liste les différentes parties

– fichier vide “started”

– fichier vide _SUCCESS

Ces fichiers qui tracent la transaction réalisées vont nous gêner pour une usage dans Power BI. Nous pourrions éviter leur génération en définissant les options suivantes dans notre script Spark comme l’indiquent ces échanges sur le forum Databricks.

Il est possible de prendre la main sur le nombre de fichiers obtenus, ainsi que sur la clé qui aidera au partitionnement. Nous ajoutons deux fonctions que sont repartition() et partitionBy() dans la syntaxe d’écriture du Parquet.

df.repartition(1).write.partitionBy("YearMonth").format("parquet").mode('overwrite').save("/mnt/nycitibike/dbx_parquet")

Dans Power BI Desktop, nous disposons d’un type de source Parquet, à partir du menu “Obtenir les données”.

Ce connecteur attend le chemin d’un fichier Parquet et non d’un répertoire contenant les partitions sous forme de fichiers distincts. Nous découvrons ainsi la fonction du langage M qui réalise la lecture du fichier : Parquet.Document().

= Parquet.Document(File.Contents("C:\Users\PaulPeton\Documents\Python Scripts\Parquet\my_to_parquet.parquet"))

Cette fonction est documentée depuis juin 2020 mais elle n’a pas fait l’objet d’une grande communication jusqu’à présent (janvier 2021) et la roadmap Power BI indique un (nouveau ?) connecteur Parquet pour mars 2021.

Pour lire le contenu d’un dossier, nous pouvons sans problème appliquer la technique de connexion à un dossier sur un compte de stockage Azure, puis lire chaque fichier à l’aide du bouton “expand” de la colonne “Content” contenant tous les binaries. Cette manipulation crée automatique une fonction d’import contenant le code suivant.

= (Paramètre1) => let
Source = Parquet.Document(Paramètre1)
in
Source

Attention, pour réaliser cette opération, il faut au préalable filtrer les fichiers ne correspondant pas à des partitions (commit, started, SUCCESS pour un dossier Parquet généré par Databricks), qui débutent tous par un caractère underscore.

= Table.SelectRows(Source, each not Text.StartsWith([Name], "_"))

Les données se chargent alors en mettant bout à bout toutes les partitions !

Il est maintenant tentant d’essayer une actualisation incrémentielle de notre dataset, sur une période d’un mois. Pour autant, celle-ci n’est pas garantie comme nous l’indique le message d’alerte dans la pop-up de paramétrage du rafraichissement.

Une traduction approximative annonce que la requête “ne peut être pliée”, il s’agit ici du concept de query folding : la requête ne pourrait appliquer un filtre (de dates) en amont du chargement complet des données. Ceci signifie tout simplement de le mécanisme incrémentiel ne se réaliserait pas.

Afin de tester ce fonctionnement, nous publions le rapport sur un espace de travail Premium du service Power BI. Il sera alors possible d’ouvrir une connexion à l’espace à partir du client lourd SQL Server Management Studio.

Au premier chargement, nous n’aurons qu’une seule partition.

Mais en actualisant le dataset depuis le service, les partitions apparaissent.

Nous observons bien qu’une nouvelle actualisation du jeu de données n’affecte pas toutes les partitions. C’est gagné !

En ajoutant des fichiers supplémentaires, nous observons aussi que le paramètre de stockage de lignes (ici, sur un an) fonctionne et cela permet donc de créer un dataset actualisé sur une période glissante.

Pour terminer, précisons que la requête réalisée ici peut tout à fait être exécutée au sein d’un dataflow Power BI et servir ainsi à la création de différents rapports.

[EDIT du 23/01/2021 : cette approche fonctionne aussi avec le format Delta Lake créé depuis Azure Databricks, et c’est tant mieux car ce format apporte beaucoup d’avantages, qui seront sûrement abordés dans un prochain article.]

Ajuster le niveau d’une Azure SQL DB avant et après un traitement Data Factory

Nous avons déjà évoqué sur ce blog le driver JDBC permettant d’écrire depuis un notebook Azure Databricks dans une base de données managée ou encore comment utiliser Polybase pour pérenniser une table du métastore Databricks dans Azure SQL DWH (aujourd’hui Synapse Analytics).

Cette action d’écriture demande souvent un niveau de ressource suffisamment élevé, tout comme l’import de données réalisé dans un dataset Power BI. Le reste du temps, une puissance plus faible peut être tout à fait acceptable. Et comme le niveau de facturation est lié au niveau de la ressource, pouvoir maîtriser par code, et non manuellement depuis l’interface, le scape up ou scale down de sa ressource peut s’avérer très intéressant dans une démarche d’optimisation des coûts (FinOps).

Nous nous plaçons ici dans un scénario de pipeline de transformation de données réalisé par Azure Databricks puis d’import dans un dataset Power BI. L’actualisation par API REST de ce dernier est détaillé dans cet article.

Au démarrage de notre pipeline, nous souhaitons augmenter la puissance (scale up) et ceci peut se faire avec des instructions PowerShell documentées ici.

Il est possible d’exécuter un script PowerShell depuis Azure Data Factory à l’aide de ce qu’on appelle une “custom activity“. Celle-ci s’appuie sur un service lié de type Azure Batch. Et ce service n’est pas gratuit, voire même relativement cher pour ce que l’on souhaite réaliser.

Nous optons donc pour la programmation d’une Azure Function, qui supporte le langage PowerShell ! Nous choisissons le type de fonction Trigger HTTP.

Dans Visual Studio Code, nous obtenons l’arborescence ci-dessous.

Le code principal de la fonction doit se trouver dans le fichier run.ps1.

Un premier point important consistera à renseigner les dépendances de librairies PowerShell dans le fichier requirements.psd1. Une combinaison fonctionnelle est la suivante :

# This file enables modules to be automatically managed by the Functions service
# See https://aka.ms/functionsmanageddependency for additional information.
#
@{
    'Az.Accounts' = '2.1.2'
    'Az.Sql' = '2.11.1'
 }

Second élément indispensable pour l’intégration dans une activité Web de Data Factory : le retour de la fonction doit obligatoirement être un objet JSON. Nous terminons donc la fonction par le code ci-dessous :

$body = @{name = 'Scale done !'} | ConvertTo-Json

#Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
     StatusCode = [HttpStatusCode]::OK
     Body = $body
})

Une fois la fonction publiée sur Azure, nous pouvons définir le pipeline complet.

La fonction Azure étant pensée de manière paramétrable, nous pouvons donner le niveau de ressource comme un élément de l’URL appelée par la méthode POST. Nous utilisons dans l’exemple ci-dessous le niveau “S” suivi d’un chiffre.

Ce niveau modifiera automatiquement le configuration de la ressource Azure SQL DB, en mode de tarification basé sur les DTUs (et non les vCores).

L’activité de traitement de la donnée, ici portée par Azure Databricks, réalise le démarrage d’un cluster de type interactif. Lorsque le traitement est terminé, le cluster peut s’éteindre automatiquement si la fonctionnalité “terminate after…” a été mise en place. Mais il faut ici attendre que l’import du dataset Power BI soit terminé ! Nous gérons donc la fin du cluster Databricks au moyen de l’API dédiée, dans une activité Web.

La syntaxe utilisée est la suivante :

POST https://adb-XXXXXXXXXXXXXXXX.X.azuredatabricks.net/api/2.0/clusters/delete

Les informations nécessaires seront : l’URL du workspace, un personal token d’authentification et l’identifiant du cluster.

Dans un optique d’optimisation des coûts, nous positionnons comme dernière activité du pipeline le scale down de la base de données.

Nous obtenons ainsi une chaîne complète optimisée sur l’enchainement des tâches et sur les coûts engendrés par les services PaaS.

Azure Data Factory pilote les actualisations Power BI

L’actualisation planifiée dans le service Power BI se fait à heure (ou demi-heure) fixe et ne peut pas être conditionnée par un événement antérieur. Le reporting est pourtant bien souvent la dernière étape de toute une chaîne de transformation de la donnée. Nous cherchons donc une solution de trigger (déclencheur) pouvant rejoindre le flux complet sous Azure.

La Power Platform propose des pistes à l’aide de Power Automate (Flow) mais nous souhaitons ici rester dans le monde Azure. Un outil comme Logic Apps dispose de telles fonctionnalités. Mais dans une approche ETL ou ELT, c’est Azure Data Factory (ADF) qui est bien souvent au centre du pilotage des traitements. Nous allons donc exploiter l’API REST de Power BI au travers d’activités Web au sein d’un pipeline ADF.

Nous visons un fonctionnement de la sorte (exemple intégrant un notebook Azure Databricks réalisant le pré-traitement de la donnée) :

Sur le schéma ci-dessous, des activités peuvent précéder la série d’activités qui réaliseront les instructions d’API afin d’actualiser un dataflow Power BI.

De nombreux articles de blog ont déjà décrit ce fonctionnement et je vous conseille en particulier cet article de Dave Ruijter et le repository associé qui vous permettra de charger un template complet réalisant l’actualisation d’un dataset.

Les différentes étapes de ce pipeline sont :

– l’obtention d’un token d’authentification grâce à une application définie dans l’annuaire Azure Active Directory (AAD). Le client secret de cette application est au préalable récupéré dans un coffre-fort Azure Key Vault (AKV).

– la commande d’actualisation du dataflow

– une boucle d’attente “Until” demandant à intervalles réguliers (activité Wait) le statut de la dernière actualisation. Ce statut est stocké dans une variable.

– dans l’activité “If condition”, la branche True lancera l’actualisation d’un dataset Power BI basé sur le dataflow

Afin de lever une erreur dans la branche False, nous utilisons l’astuce de définir une variable réalisant une division par 0. Cette division interdite fera échouer l’activité et donc le pipeline complet.

En reprenant une logique similaire, nous ajoutons une boucle d’attente de l’actualisation du dataset et une nouvelle activité “If Condition” permettant de lever une erreur en cas d’échec de l’actualisation.

Il n’est pas possible de placer ces deux dernières briques dans la branche True du “If Condition”, c’est ici une limite d’ADF mais elle ne se révèle pas bloquante.

L’API non documenté des dataflows Power BI

Nous allons préciser ici des éléments ne figurant pas dans la documentation officielle de l’API REST de Power BI.

En préambule, nous devons avoir déclarer une application (app registration) disposant des droits Dataflow.ReadWrite.All. Ce point peut être contrôler dans l’annuaire AAD et l’API lèvera une erreur 401 si les droits sont uniquement au niveau Dataflow.Read.All.

L’actualisation d’un dataflow se fait par la commande POST suivante :

POST https://api.powerbi.com/v1.0/myorg/groups/{groupId}/dataflows/{dataflowId}/refreshes

Les identifiants demandés (la notion de group correspondant historiquement à celle d’espace de travail ou workspace) se retrouvent facilement dans les URLs du service Power BI.

Lors de l’actualisation, le statut de l’opération passera à “InProgress” (contre “Unknown” pour un dataset) puis “Success” (contre “Completed”).

L’instruction permettant d’obtenir le dernier statut d’actualisation est une commande GET :

GET https://api.powerbi.com/v1.0/myorg/groups/{groupId}/dataflows/{dataflowId}/transactions?$top=1

Notez le mot-clé “transactions” à l’inverse de “top” pour un dataset en paramètre de cette instruction.

Un grand merci à mon confrère Joël CREST pour son aide sur ce sujet. Je vous encourage à consulter son blog et sa chaîne YouTube.

Utiliser (encore) des notebooks sous Azure

Au 15 janvier 2021, le service Azure Notebooks disparaît complètement. Nous cherchons donc une alternative en version SaaS, toujours sur Azure et celle-ci se trouve dans le portail Azure Machine Learning.

Un article d’introduction à ce service est disponible dans les archives de ce blog.

Quelques nouveautés sont disponibles et annoncées à l’ouverture d’un premier notebook. Nous reviendrons sur ces nouveautés dans la suite de cet article.

Nous pouvons alors nommer un nouveau fichier, à l’extension classique .ipynb.

Nous retrouvons ce qui caractérise les notebooks : des cellules de code, suivies par les sorties des différentes instructions, ou bien des cellules de type formaté en Markdown.

Il n’est pourtant pas possible d’exécuter du code immédiatement, il faut au préalable relier ce notebook à une instance de calcul, déjà définie dans le portail (menu “Compute”).

Dans le menu dédié aux ressources de calcul, nous retrouvons des raccourcis vers l’utilisation d’une interface Jupyter, JupyterLab ou encore RStudio.

La nécessité d’une instance de calcul rend ce service payant, et facturé au temps d’utilisation de la machine virtuelle définie. Il sera très important de ne pas oublier d’éteindre la machine après utilisation, il n’existe à ce jour aucun système natif d’extinction planifiée !

Les utilisateurs férus de Visual Studio Code (VSC) peuvent travailler à distance avec l’environnement de calcul d’Azure Machine Learning. En installant conjointement les extensions Python et Jupyter, il était déjà possible de travailler avec un notebook (fichier reconnu par son extension .ipynb) dans l’IDE de Microsoft. En ajoutant l’extension Azure Machine Learning, nous pouvons nous connecter à la ressource Azure.

Les “objets” de l’espace Azure ML sont alors visibles depuis le menu Azure de VSC.

Il est possible de réaliser des actions à distance sur les instances de calcul.

Par défaut, un notebook dans VSC utilise le serveur Jupyter local. Mais il est possible de désigner un serveur à distance (“remote“) en cliquant sur le bouton en haut à droite indiquant “Jupyter Server : local”.

Nous obtenons la boîte de dialogues ci-dessous.

La succession de choix proposés permettra d’associer le notebook à une instance de calcul Azure ML.

Une composante indispensable de l’approche par le code est le gestion du versionnig, par un service compatible avec Git (GitHub, GitLab, Bitbucket, Azure DevOps, etc.), comme décrit sur ce lien.

Nous allons bénéficier ici de l’affichage en terminal présentés parmi les nouvelles (janvier 2021) fonctionnalités disponibles.

Nous devons tout d’abord créer une clé SSH au moyen de la commande ci-dessous :

ssh-keygen -t rsa -b 4096 -C "your_email@example.com"

La clé SSH doit être enregistrée dans le chemin /home/azureuser/.ssh qui est spécifique à l’instance de calcul (valider par Entrée le prompt proposé après la commande précédente). Il est alors possible de saisir une passphrase, facultative mais fortement conseillée. Un fichier id_rsa.pub est ainsi créé et son contenu peut être copié (ctrl + inser).

Nous utiliserons dans cet article Azure DevOps, où nous allons pouvoir saisir cette clé publique.

De retour dans le terminal, nous pouvons lancer la commande de test :

ssh -T git@ssh.dev.azure.com

Le résultat attendu est le suivant :

Depuis le repository Azure DevOps, nous obtenons la syntaxe SSH qui viendra compléter une instruction git remote, lancée depuis le terminal.

Si cela n’a pas fait, dans le terminal et depuis le chemin voulu, saisir la commande git init pour initialiser le répertoire. Définir également l’utilisateur qui réalisera les instructions de commit :

git config --global user.email "you@example.com"
git config --global user.name "Your Name"

La commande git remote add origin peut alors être exécutée avec succès.

Il pourra être nécessaire de passer la commande git add . pour ajouter l’intégralité du contenu dans ce qui doit être géré par git, puis un git commit.

De même, le repository Azure doit avoir été initialisé, par exemple avec un fichier readme.md et disposer ainsi d’une branche (main ou master).

git push -u origin --all

Enfin, un rappel des commandes Git indispensables est donné ici : Git · GitHub

Une autre nouvelle intéressante consiste à “nettoyer” le notebook avant de le publier. C’est en effet un reproche souvent fait aux notebooks : ceux-ci mélangent code, sorties et commentaires et ne sont pas optimaux pour les tests unitaires et le déploiement dans un environnement de production.

La fonctionnalité “Gather” (documentation officielle) sur une cellule permet de créer un autre notebook qui conservera uniquement les cellules de code dépendant de la cellule sélectionnée.

Déploiement continu avec Azure Synapse Analytics – partie 1

S’il était bien une fonctionnalité attendue pour la nouvelle (2020) version d’Azure Synapse Analytics, c’était la gestion du versionning et du déploiement des scripts créés dans l’outil.

Depuis la disponibilité générale (fin novembre 2020), nous trouvons un menu Source control dans la fenêtre Manage.

Nous pouvons définir la Git configuration, c’est-à-dire l’outil qui servira à l’enregistrement des développements et à leur versionning. Nous pouvons choisir entre Azure DevOps ou GitHub.

Nous choisissons ici Azure DevOps mais la partie GitHub est documentée sur ce lien officiel.

Nous devons disposer au préalable d’un projet DevOps et d’un repository dont l’URL devra être renseignée dans Synapse.

The Dev Ops repository link must start with http or https and be in DevOps format. For example: https://account.visualstudio.com/project/_git/repository

Dès lors, le dépôt se remplie avec les premiers éléments faisant partie de la ressource Synapse Analytics.

Dans le studio Synapse, nous disposons de plusieurs actions :

La validation permet de contrôler le développement réalisé et des erreurs seront remontées le cas échéant.

Le bouton Commit permet de “sauvegarder” le développement réalisé sur la branche active. Il est recommandé de ne pas développer directement sur la branche master.

Le bouton Publish de l’interface Synapse permet de créer ou mettre à jour la branche workspace_publish.

Il est donc logique de les utiliser successivement, dans cet ordre. L’action Publish réalisera une sauvegarde si nécessaire. A noter que les sorties (outputs) des cellules des notebooks sont effacées au moment de la publication. C’est une bonne pratique pour le versionning mais peut déstabiliser l’utilisateur souhaitant conserver l’affichage de ses résultats sans relancer le notebook complet.

Au fur et à mesure de l’utilisation de Synapse Analytics, de nouveaux répertoires vont se créer sur la branche collaborative.

Manquent ici triggers, credentials et Spark job definiition

Dans la branche workspace_publish, nous retrouvons deux fichiers JSON dans une logique complètement similaire à celle d’Azure Data Factory, permettant un déploiement de type “template ARM“.

Le fichier TemplateParametersForWorkspace.json contient en particulier le nom des informations de connexion (nom du workspace, URL, etc.) et les mots de passe, secrets ou token, sont effacés. Il faudra écraser ou renseigner ces valeurs lors du déploiement dans un autre environnement.

Dans une seconde partie, nous développerons les approches possibles pour le déploiement continue à l’aide d’Azure Pipelines.

Les activités Data Factory dans le linéage Azure Purview

Avec la sortie fin 2020 d’Azure Purview, beaucoup de démonstrations ont mis en avant la capacité à scanner un tenant Power BI et à présenter le linéage entre les rapports et les données sources.

Ce principe, fort utile, est aussi applicable aux activités Azure Data Factory. Considérons une souscription Azure au sein de laquelle a été créé un service Azure Purview (je vous conseille cette vidéo de Joël CREST pour démarrer votre projet Purview).

Nous nous rendons dans le Management Center sur le portail Purview, au niveau des external connections.

Une permission doit être au préalable accordée, l’utilisateur doit disposer d’un des rôles suivants sur le service Purview : contributor, owner, reader ou User Access Administrator (UAA). Ceci se fait au niveau de l’Access Control (IAM). Le droit doit être “direct” et non hérité d’un niveau supérieur.

Un bouton +New apparaît alors en haut de la fenêtre et permet d’ajouter de désigner la souscription et la ressource Data Factory visée.

Bien vérifier alors que le statut “connected” apparaît au bout de la ligne.

Revenons ensuite dans la partie Sources d’Azure Purview.

Ici, il faut déclarer une première collection dans laquelle nous définirons les sources des données, qui sont les linked services d’Azure Data Factory.

Pour l’exemple mis ici en œuvre, nous déclarons un compte de stockage Azure Blob et une base de données managée Azure SQL DB. Pour cette dernière, il faut au préalable déclarer l’identité managée (MSI) de Purview en tant que user de la base de données. Cette opération doit être réalisée par un administrateur de la base, présent dans Azure Active Directory. La syntaxe SQL est la suivante :

CREATE USER [nomDuServicePurview] FROM EXTERNAL PROVIDER
GO
EXEC sp_addrolemember 'db_owner', [nomDuServicePurview]
GO

Nous lançons ensuite un scan de ces deux sources.

Voici l’activité Data Factory que nous souhaitons retrouver :

Elle réalise la copie d’un fichier CSV dans une table de la base de données, préalablement vidée par une procédure stockée.

Attention, les activités et dont le linéage associés ne seront visibles dans Purview qu’après une exécution, même en debug, de l’activité !

Il suffit de rechercher le nom de l’activité dans la barre de recherche pour vérifier que celle-ci est maintenant bien disponible.

Le linéage est porté par la destination, ici la table de la base de données.

Un lien est disponible pour ouvrir directement cette activité dans le service Azure Data Factory.

Au fil des exécutions des activités, le champ de la recherche dans Purview va ainsi s’étendre pour couvrir l’ensemble des traitements.

Une bonne pratique semble d’ores et déjà s’imposer : seuls les Data Factory de production sont à déclarer dans Azure Purview. La limite est à ce jour de 10 services ADF.