Utiliser Databricks CLI

Jusqu’à présent, nous avons l’habitude d’utiliser un espace de travail Azure Databricks en nous connectant au portail, l’authentification étant réalisée par un couple login / password déclaré dans l’annuaire Azure Active Directory.

Dans un but d’automatisation des tâches, il est intéressant de se pencher sur les commandes en lignes ou CLI. Celles-ci sont documentées sur le site de Databricks. Les commandes peuvent être vues comme une surcouche de l’API REST de Databricks.

Installation du CLI Databricks

Un environnement Python est nécessaire. Nous pouvons ensuite lancer le téléchargement du package dédié avec la commande ci-dessous, depuis un terminal.

pip install databricks-cli

Nous vérifions dans la foulée que l’affichage de l’aide d’une commande, sur laquelle nous reviendrons plus tard, est fonctionnel.

databricks fs -h

La version installée peut être retrouvée par la commande :

databricks --version

C’est un produit qui évolue rapidement et il conviendra de le mettre à jour fréquemment, afin de bénéficier d’un maximum de fonctionnalités.

Authentification

Nous allons utiliser un Personal Access Token pour nous authentifier auprès du service. Ce jeton de sécurité est obtenu sur le portail Databricks, dans le menu User Settings.

Attention à bien noter (dans un coffre-fort électronique !) le jeton obtenu, il ne sera plus possible d’afficher sa valeur par la suite.

Nous démarrons la configuration par la commande :

databricks configure --token

Deux valeurs seront attendues : l’URL de l’espace de travail, puis le token précédemment généré. L’URL pour une ressource Azure est dorénavant de la forme https://adb-XXXXXXXXXXXXXXXX.XX.azuredatabricks.net/.

Un fichier local s’écrit alors et contient les informations renseignées.

Vérifions maintenant que l’authentification est effective en lançant une commande interagissant avec l’espace de travail :

databricks workspace ls

Si nous obtenons un message d’erreur Error: b’Bad Request’, la cause peut être un mauvais enregistrement du token dans le fichier de configuration. En ouvrant celui-ci dans un éditeur de texte, nous visualisons le résultat suivant :

Remplacer les caractères SYN par la valeur du token permettra de résoudre ce problème mais demande de stocker ce secret de manière non sécurisée.

Nous pouvons maintenant lister le contenu de l’espace de travail.

Copier un fichier depuis le FileStore

Le FileStore est une zone de stockage spécifique (un dossier) du DataBricks File System (DBFS). Celui-ci nous permet de réaliser des échanges avec l’extérieur : copie de fichiers vers ou depuis le DBFS. Une documentation complète est disponible ici.

A l’aide des commandes du CLI, nous identifions un fichier que nous pouvons recopier localement.

Comme le CLI est une surcouche de l’API REST de Databricks, il est intéressant de retrouver la commande initiale émise vers l’espace de travail. Nous pouvons l’obtenir en ajoutant –debug après la commande de copie.

Le premier appel vérifie le statut du fichier. Il est possible d’exécuter la requête dans Postman (coller un token dans la partie Authorization).

Nous trouvons ici la taille du fichier. Il faut savoir que le contenu du fichier est converti en base 64 et nous allons ensuite mieux comprendre pourquoi.

Un second appel à l’API se fait avec la méthode read et deux options &offset=0&length=1048576. Si le fichier dépasse 1 méga-octet, celui-ci est découpé en morceaux (chunks) et plusieurs appels seront nécessaires. La commande reconstitue toutefois le fichier et le décode automatiquement. Tout cela est donc transparent pour l’utilisateur !

Premiers pas avec l’API Text Analytics sous Postman

Les services cognitifs Azure permettent de bénéficier d’algorithmes déjà entrainés pour répondre à des analyses de type « cognitives », c’est-à-dire simulant le fonctionnement du cerveau humain (raisonnement) et des sens comme la vue ou l’ouïe.

Plusieurs domaines sont couverts par ces services :

  • La décision
  • Le langage
  • La parole (« speech »)
  • La vision

Dans les domaines du texte et du langage, nous bénéficions, outre la traduction, de 4 principales fonctionnalités ;

  • La détection de la langue
  • La reconnaissance d’entités nommées
  • L’extraction de phrases clés
  • L’analyse de sentiments

Mais avant de nous lancer, il nous faut créer une ressource Azure qui fournira une clé d’authentification auprès de l’API. Cette ressource est aujourd’hui (janvier 2021) commune à la plupart des APIs des services cognitifs Azure, ce qui simplifie son usage. On veillera toutefois, dans un environnement de production, à segmenter les ressources selon les différents usages. QnA Maker, Speech Services, Translator et Custom Vision ne sont actuellement disponibles que sous forme d’API individuelles.

Nous pouvons bénéficier de la tarification Standard S0, décrite sur ce lien.

Cette tarification se base sur des tranches de 1000 enregistrements de texte, avec un tarif dégressif selon des paliers d’enregistrements.

A la page de la ressource, nous trouvons deux informations qui seront ensuite indispensables : point de terminaison ou endpoint (celui-ci dépend de la région Azure préalablement sélectionnée) et deux clés d’authentification. Disposer de deux clés permet d’assurer une rotation lors du renouvellement des clés.

De nombreux langages de programmation permettent d’appeler les APIs des services cognitifs (C#, Python, node.js, Go, Ruby…) mais pour simplifier le propos et aller directement à l’essentiel, nous utiliserons ici le client Postman, téléchargeable sur ce lien, qui permettra d’interroger le point de terminaison et de bien détailler le rôle de chaque élément.

La page de documentation la plus utile sera certainement celle-ci. Nous y trouvons les différents endpoints régionaux, et dans le menu de gauche, les quatre actions possibles à partir du service cognitif Text Analytics.

Interrogation sous Postman

Le premier élément nécessaire sera l’URL de l’API, de la forme :

https://{endpoint}/text/analytics/vX.X/languages[?showStats]

Le endpoint, visible dans le portail Azure, s’écrit :

<nom-de-la -ressource>.cognitiveservices.azure.com

Nous adaptons cette URL à notre région, en retirant le paramètres ?showStats et en positionnant la boîte de dialogue de Postman sur POST. La version de l’API doit être précisée. A ce jour, nous utilisons la version 2.1 ou 3.0 voire la préversion 3.1 qui apporte de nouvelles fonctionnalités. Celles-ci sont détaillées dans cette documentation officielle.

Deux informations d’entêtes sont requises, sous forme de couple clé-valeur et nous les renseignons dans le menu Headers :

  • Le type de contenu
  • La clé d’authentification

En basculant sur le contenu brut (bouton « bulk edit » de l’interface), nous obtenons le script suivant :

Content-Type:application/json
Ocp-Apim-Subscription-Key:XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

Pour renseigner le corps de la requête (« Body »), il est également possible de basculer sur l’affichage « raw », pour un contenu de type JSON.

Le schéma attendu nous est à nouveau donné par la page de documentation de l’API.

Pour la détection du langage, chaque document est attendu, au sein d’une liste marquée par des crochets, sous la forme de trois éléments :

  • countryHint (facultatif)
  • id
  • text

Pour une seule phrase, le contenu minimal de ce corps est le suivant :

{documents:[{"id":0, "text": "dans quelle langue cette phrase est-elle écrite ?"}]}

La réponse obtenue, de statut 200 OK, prend alors la forme ci-dessous :

Le score donne un indicateur de confiance, entre 0 et 1, de la prévision réalisée. En pratique, un doute sera émis dès que le score sera inférieur à 1.

L’information countryHint peut être renseignée par un code sur deux lettres représentant le pays d’où est issu le texte, ce qui permet de retirer d’éventuelles ambiguités lorsqu’un même mot est utilisé dans plusieurs langues. Le champ peut être soit omis, soit renseigné comme une chaîne vide countryHint = “”.

Lorsqu’il n’est pas possible d’identifier un langage, la réponse otenue sera le mot « (Unknown) ».

Le schéma ci-dessous résume les principales syntaxes d’interrogation de l’API Text Analytics.

L’API Translator

Dans la foulée de la reconnaissance de la langue, il est logique de rechercher à traduire le texte. Nous appelons ici une autre API : Translator, réalisant ci-dessous une traduction en français.

https://api.cognitive.microsofttranslator.com/translate?api-version=3.0&to=fr

Le body de la requête s’écrit de la sorte :

[{"Text":"Azure Cognitive Services are cloud-based services with REST APIs and client library SDKs available
to help you build cognitive intelligence into your applications.
You can add cognitive features to your applications without having artificial intelligence (AI) or data science skills."}]

Nous obtenons la sortie suivante.

Si besoin, la langue d’origine peut être précisée et ce n’est pas le mécanisme d’auto-détection qui est mis en œuvre.

https://api.cognitive.microsofttranslator.com/translate?api-version=3.0&from=en&to=fr

La documentation détaillée de l’API est disponible sur ce GitHub et un extrait est illustré ci-dessous.

Un niveau de tarification supérieur à S0 est nécessaire, sinon le message suivant apparaît :

The Translate Operation under Translator Text API v3.0 is not supported with the current subscription key
and pricing tier CognitiveServices.S0.

Les méthodes l’API Text Analytics

Voyons maintenant les trois autres méthodes disponibles avec l’API Text Analytics.

Celles-ci ne seront pas toujours disponibles en français, ou dans d’autres langues que l’anglais. La vérification peut être faite, en fonction de la version de l’API, sur cette page.

L’analyse de sentiments évalue un texte dans sa totalité en se basant sur le poids des mots utilisés, pour attribuer trois scores : positif, neutre, négatif. Un sentiment global est donné sous forme de texte : positive, mixed, negative ou neutral si aucun des trois cas précédents n’est vérifié.

https:// <nom-de-la-ressouce>.cognitiveservices.azure.com/text/analytics/v3.1-preview.3/sentiment

Pour obtenir l’exploration des opinions, uniquement en anglais à ce jour, nous ajoutons à la fin de l’URL : ?opinionMining=true

{documents:[{"language": "en", "id":0, "text": "Very welcome and advice from the guests. Very clean accommodation. We recommend it!"}]}

Nous obtenons alors, lorsqu’une relation de type opinion est détectée, un groupe d’informations supplémentaires intitulé « opinions », disposant de deux scores, positif et négatif.

L’extraction de phrases clés permet d’identifier des éléments (mots ou groupes de mots) les plus importants au sein d’un texte (phrase ou ensemble de phrases). On pourrait voir cela comme une extension d’un preprocessing visant à retirer les mots outils (« stop words »).

https://< nom-de-la-ressouce>.cognitiveservices.azure.com/text/analytics/v3.0/keyPhrases

Nous obtenons le résultat suivant avec la première phrase du roman de Marcel Proust, A la Recherche du Temps Perdu.

La reconnaissance d’entités nommées se fait avec la syntaxe :

recognition/general

Nous retrouvons certains éléments ressortant dans les phrases clés, pour lesquels une catégorie et sous-catégorie sont données, associées à un score de confiance. A ce jour (février 2021), c’est la version 2.1 de l’API qui est utilisée en langue française, même si la version 3.0 est indiquée dans l’URL.

La liaison d’entités (non disponible pour l’instant en français) ajoute une information supplémentaire à la reconnaissance générale d’entité en spécifiant l’URL d’une page Wikipedia dédiée à cette entité.

https://<nom-de-la -ressource>.cognitiveservices.azure.com/text/analytics/v3.0/entities/linking

Les Informations d’identification personnelle (PII) sont des patterns correspondant à différents éléments : numéro de téléphone, adresse e-mail, adresse postale, numéro de passeport.

https://<nom-de-la -ressource>.cognitiveservices.azure.com/text/analytics/ v3.1-preview.3/entities/recognition/pii

Nous remarquons que la sortie donne un élément redactedText qui remplace les éléments reconnus comme informations personnelles par des étoiles (*).

Un domaine (optionnel) peut être précisé pour obtenir les informations (personnelles) médicales mais il n’existe à ce jour pas de documentation explicitant les éléments pouvant être identifiés.

https://<nom-de-la -ressource>.cognitiveservices.azure.com/text/analytics/v3.1-preview.3/entities/recognition/pii?domain=phi

Text Analytics avec le concepteur (Designer) dans Azure Machine Learning

Les citizen data scientists disposent d’un outil graphique dans le portail Azure Machine Learning nommé Concepteur (ou Designer en anglais) permettant de réaliser des pipelines de Machine Learning, pour l’apprentissage comme pour l’inférence (c’est-à-dire le fonctionnement prédictif).

Depuis la première version de l’outil, anciennement nommé Azure Machine Learning Studio, et dit maintenant “classique”, des modules pour l’analyse textuelle étaient déjà présents mais l’offre s’est aujourd’hui, en ce début 2021, agrandie.

L’objectif de cet article est de faire un survol des méthodes disponibles, dans cette approche “low code“, pour un jeu de données contenant un champ de “texte libre”, et dans le but de réaliser une méthode d’apprentissage supervisé, et plus précisément de classification.

En prérequis, rappelons les notions indispensables avant de s’attaquer à notre problématique :

– disposer d’une ressource de calcul dite “compute cluster” qui servira à exécuter les différentes itérations du pipeline (succession de modules)

– créer une nouvelle expérience, qui permettra de suivre le résultat des exécutions ou runs (outputs, logs, artefacts, métriques…)

– connaître la structure classique d’un pipeline d’entrainement supervisé (s’inspirer des exemples disponibles, en particulier “Text Classification – Wikipedia SP 500 Dataset” qui est décrit sur ce site GitHub.

Vue d’ensemble des fonctionnalités de Microsoft Machine Learning Studio (classique)
Image reprise de la documentation officielle de Microsoft et téléchargeable ici : https://download.microsoft.com/download/C/4/6/C4606116-522F-428A-BE04-B6D3213E9E52/ml_studio_overview_v1.1.pdf

Nous allons nous concentrer ici sur deux phases qui auront leurs spécificités pour le traitement de données textuelles :

– le preprocessing

– le choix du modèle d’apprentissage pour une classification

Preprocessing des données textuelles

Nous allons utiliser ici le module Preprocess text qui réalisera de nombreuses actions de nettoyage et de préparation de nos données textuelles, en une seule étape.

Ce module est très puissant et basé sur la librairie de référence SpaCy mais ne supporte pour l’instant malheureusement que la langue anglaise. Nous pourrons contourner cela par un script R ou Python pour le cas de corpus en français. Mais à l’exception de la lemmatisation, les opérations pourront toutefois s’appliquer à un corpus d’une autre langue que l’anglais.

La case à cocher “Expand verb contractions” est toutefois spécifique à des formulations anglaises comme don’t, isn’t, I’ve, you’ll, etc.

Détaillons maintenant les autres options, avec quelques exemples simples si nécessaire.

– Retrait des mots outils (stop words) : a, about, all, any, etc.

Cette opération est paramétrable au moyen de la seconde entrée du module. Nous pouvons donc utiliser un fichier, d’une colonne, contenant les mots en français. Ce site propose une liste de mots outils dans de nombreuses langues.

Lemmatisation (remplacement du mot par sa forme canonique qui est “l’entrée du dictionnaire”, c’est-à-dire, l’infinitif ou le masculin singulier lorsqu’il existe) : la phrase “The babies are walking on their feet” donnera “The baby be walk on their foot” puis sans mots outils “baby walk foot“.

– Détection des phrases (séparées alors par |||, à choisir si l’on souhaite un traitement spécifique par phrase plutôt que par ligne du jeu de données)

– Normalisation en minuscules (sinon les mots sont considérés comme différents)

Suppression des nombres, caractères spéciaux (non alphanumériques), caractères dupliqués plus de deux fois, adresses email de la forme <string>@<string>, URLs reconnues par les préfixes httphttpsftpwww

– Remplacement des backslashes \ en slashes / : l’usage ne sera pas fréquent, on passera plutôt par la suppression de ces caractères spéciaux.

Séparation des tokens (ici, les mots) sur la base de caractères spéciaux comme & (pour l’esperluette, on préfèrera à nouveau la citer dans les caractères spéciaux à supprimer) ou - : un “pense-bête” deviendra “penser bête” suite à séparation et lemmatisation en français.

En pratique, il est important de maîtriser l’ordre dans lequel ces opérations s’appliquent. Ainsi, on réalisera plutôt les opérations de nettoyage (suppressions, minuscules, mots outils…) avant les opérations de lemmatisation.

Importer la librairie NLTK et les mots outils en français

Nous disposons d’un module Python permettant d’exécuter un script, sur un ou deux dataframes en entrée de ce module. Le code suivant nous permettra d’importer la librairie NLTK et d’utiliser la liste des mots outils ou bien la lemmatisation en français.

import pandas as pd

def azureml_main(dataframe1 = None, dataframe2 = None):
    import importlib.util
    package_name = 'nltk'
    spec = importlib.util.find_spec(package_name)
    logging.debug(spec)
    if spec is None:
        import os
        os.system(f"pip install nltk")

    import nltk
    from nltk.corpus import stopwords
    stopwords = set(stopwords.words('french'))

    def preprocess(text):
        clean_data = []
        for x in (text[:]):
            print(x)
            new_text = re.sub('<.*?>', '', x)   # remove HTML tags
            new_text = re.sub(r'[^\w\s]', '', new_text) # remove punc.
            new_text = re.sub(r'\d+','', new_text)# remove numbers
            new_text = new_text.lower() # lower case, .upper() for upper          
            if new_text != '':
                clean_data.append(new_text)
        return clean_data    

    dataframe1['preprocess'] = preprocess(dataframe1['Text'])

    dataframe1['preprocess_without_stop'] = dataframe1['preprocess'].apply(lambda x: [word for word in x if word not in stopwords])

    #On peut observer la liste des mots outils de NLTK dans la seconde sortie
    dataframe2 = pd.DataFrame(list(stopwords))

    return dataframe1, dataframe2,

Apprentissage supervisé sur données textuelles

Dans le groupe Text Analytics, nous disposons des modules ci-dessous.

Vowpall Wabbit

Nous écartons d’emblée l’approche Vowpal Wabbit, framework de Machine / Online Learning initialement développé par la société Yahoo !. Celui-ci nécessite en effet d’obtenir en entrée des données préprocessées mais surtout présentées dans un format spécifique. Vous pouvez toutefois vous référer à ce blog si vous souhaitez préparer vos données en ce sens. Il faudra ensuite utiliser des lignes de commandes au sein du module Train Vowpal Wabbit Model.

Toutefois, nous allons pouvoir remplacer ce framework par une approche relativement similaire réalisée par le module Feature Hashing.

Feature Hashing

Cette méthode consiste à créer une table de hash, c’est-à-dire de valeurs numériques, construites à partir du dictionnaire des mots.

Nous prendrons comme entrée le texte préprocessé. Nous avons alors deux paramètres à renseigner dans la boîte de dialogue du module.

Une taille de 10 bits correspondra à 2^10, soit 1024, nouvelles colonnes dans le jeu de données en sortie. Ces colonnes contiendront le poids de la feature dans le texte. Une valeur de 10 se montre généralement suffisante et attention à l’explosion de la taille du jeu de données (2^20 correspondrait à 1048576 colonnes !).

Un N-gram est une suite de n mots consécutifs, considérée comme une seule unité. Dans la phrase “le ciel est bleu”, nous avons quatre unigrams, trois bigrams (“le ciel”, “ciel est”, “est bleu”) et deux trigrams (“le ciel est”, “ciel est bleu”). Le paramètre attendu est la valeur maximale, ainsi si nous choisissons la valeur 3, nous prenons en compte les unigrams, bigrams et trigrams.

Nous obtenons le résultat suivant.

Seules les colonnes numériques seront ensuite soumises à l’entrainement du modèle.

Extract N-gram feature

Nous retrouvons ici la notion de N-gram (suite de n mots consécutifs, considérée comme une seule unité). Les paramètres disponibles nous permettent de spécifier :

– la taille maximale des N-grams considérés dans le dictionnaire. Ce paramètre conditionnera la taille du jeu de données, qui peut devenir très grand.

– les longueurs minimale et maximale des mots, en nombre de lettres

– la fréquence minimale de présence d’un N-gram dans le document (ensemble des lignes du jeu de données)

– le pourcentage maximum de présence d’un N-gram par ligne sur l’ensemble des lignes. La valeur 1 (100%) correspond à retirer un N-gram présent dans chaque ligne, et donc considéré comme un bruit qu’il faut supprimer.

– la métrique de pondération calculée par ligne, pour chaque feature créée.

La métrique TF-IDF est un standard de la discipline est correspond au ratio de la fréquence d’un N-gram dans la ligne (TF) par le log d’un autre ratio dit”inversé” : le nombre de lignes du jeu de données sur la fréquence du terme dans le jeu de données. Les valeurs seront normalisées selon une norme L2 si la case correspondante est cochée dans la boîte de dialogue.

En créant le vocabulaire, nous obtenons un jeu de données en sortie tel que représenté ci-dessous. Celui-ci contient la liste des N-grams, ainsi que les métriques DF et IDF associées.

Un pronom est ici identifié dans le bigram.

C’est ce vocabulaire établi sur le jeu d’entrainement qui devra saisir à la phase de validation sur les nouvelles données. Il ne faudrait pas créer un nouveau vocabulaire sur la seconde partie des données ! Mais les métriques doivent être calculées. Nous utilisons donc la combinaison ci-dessous des modules.

Le paramètre “Vocabulary mode” est alors positionné sur la valeur ReadOnly.

Convert word to vector

Nous retrouvons ici une méthode dite de plongement lexical (word embedding) qui a connu un fort engouement ces dernières années, et plus communément appelée word2vec. Il s’agit d’utiliser des modèles pré-entrainés sur des milliards de documents, issus par exemple de Wikipedia ou de Google News.

A ce jour (janvier 2021), il ne semble pas y avoir de module permettant de tirer parti du vocabulaire obtenu en sortie. Nous privilégierons donc une approche par script Python, au moyen de librairies comme Gensim (voir le site officiel).

Latent Dirichlet Allocation

Nous sortons ici du cadre supervisé pour obtenir une méthode non supervisée et donc dédiée à rassembler des textes similaires, dans un nombre prédéfini (“Number of topics to model”) de catégories, sans que celles-ci ne soient explicitement définies.

Comme nous disposons déjà des catégories, cette méthode ne se prête pas à l’objectif mais pourrait être intéressante en amont, dans un but exploratoire (nos catégories a priori ne sont peut-être pas si pertinentes…).

Un mode d’options avancées donnera la main sur tous les hyperparamètres du module.

Chaque ligne est évaluée sur les nombre prédéfini de sujets (topics).

A partir de la sortie, il sera nécessaire de réaliser une lecture et une interprétation “humaine” sur les sur les groupes afin de les nommer. Dans la feature matrix topic, nous pouvons visualiser les N-grams les plus contributeurs de chaque sujet.

Publier le pipeline d’entrainement

Publier un pipeline permet de bénéficier de l’ordonnancement de celui-ci, par exemple avec un outil comme Azure Data Factory. Nous obtiendrons ainsi un identifiant (ID) qui devra être renseignée dans l’activité.

Les différents paramètres sélectionnés dans les modules peuvent être ajoutés comme paramètres du pipeline et ainsi renseignés lors de nouveaux lancements.

Déploiement du pipeline d’inférence

Une fois le pipeline d’entrainement soumis et correctement exécuté (tous les modules sont en vert), nous pouvons choisir entre un service Web prédictif, soit en real-time (prévision par valeur), soit en batch (prévision par lot).

Si nous avons comparé plusieurs modèles au sein du pipeline d’entrainement, il faut cliquer sur le module “Train model” correspond à celui que l’on veut déployer (en pratique, celui ayant les meilleures métriques d’évaluation).

Le pipeline graphique se simplifie alors et se voit enrichi d’une entrée et d’une sortie pour le service Web qui sera créé.

Attention, pour aller plus loin, il est nécessaire de supprimer le module d’évaluation ou sinon, vous rencontrerez le message d’erreur suivant.

Un nouveau run du pipeline doit être soumis avant de pouvoir cliquer sur le bouton de déploiement du service web.

Le déploiement se fait au choix, sur une ressource ACI (plutôt pour le test) ou AKS (plutôt pour la production).

En conclusion, même s’il paraît difficile d’aller jusqu’en production avec cet outil, il reste néanmoins très rapide pour mettre en œuvre différentes méthodes reconnues dans le Traitement Automatique du Langage (TAL) et pourra apporter des résultats pertinents dans des cas relativement simples de classification.

Poser du Parquet dans le cloud Azure ? Oui !

En quelques années de l’ère “big data”, le format de fichier orienté colonne Parquet s’est imposé comme l’un des standards du stockage sur les lacs de données (data lake) grâce à ses performances de compression, sa gestion des partitions et son intégration avec des frameworks distribués comme Spark. Il s’agit d’un projet porté par la fondation Apache qui héberge la documentation officielle.

Avant d’obtenir un fichier Parquet, le format initial est bien souvent un format texte classique structuré comme du CSV ou semi-structuré comme le JSON. Nous sommes également dans le scénario de réalisation d’un couche “clean” au sein d’un data lake, à partir de nombreux fichiers de la couche “raw“, nécessitant d’être agrégés et optimisés pour des outils d’analyse.

Dans cet article, nous allons dérouler un cas pratique allant de la constitution de ces fichiers Parquet jusqu’à leur utilisation dans un rapport Power BI.

Partons d’une année de fichiers zip issus de vélos en location de la ville de New York, soit un total de 828Mo, atteignant 3,45Go une fois décompressés (CSV).

Nous disposons ainsi en tout de 19 506 857 lignes pour l’année 2020.

Avec un peu de script Python, nous pouvons décompresser automatiquement ces fichiers.

import os
import zipfile

def unzip_files(path_to_zip_file) -> None:
    directory_to_extract_to = "datasets"
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(directory_to_extract_to)

for file in os.listdir("zip"):
    print(f"Unzipping : {file}")
    unzip_files("zip/"+file)

Ensuite, depuis un environnement Spark comme Azure Databricks, nous pouvons lire tous les CSV, qui auront été déposés au préalable sur un compte de stockage Azure.

file_location = "/mnt/nycitibike/datasets/*"
file_type = "csv"
df = spark.read.format(file_type).option("inferSchema", "true").option("header","true").load(file_location)
display(df)

Notez qu’il faudra renommer les colonnes comportant un ou plusieurs espaces, interdits en entêtes dans le format Parquet, ce que nous faisons avec le script ci-dessous.

df = df \
.withColumnRenamed("start station id","start_station_id") \
.withColumnRenamed("start station name","start_station_name") \
.withColumnRenamed("start station latitude","start_station_latitude") \
.withColumnRenamed("start station longitude","start_station_longitude") \
.withColumnRenamed("end station id","end_station_id") \
.withColumnRenamed("end station name","end_station_name") \
.withColumnRenamed("end station latitude","end_station_latitude") \
.withColumnRenamed("end station longitude","end_station_longitude") \
.withColumnRenamed("birth year","birth_year")

L’enregistrement au format Parquet est alors possible, toujours à l’aide de l’API PySpark.

df.write.format("parquet").mode("overwrite").save("/mnt/nycitibike/dbx_parquet")

Les données sont automatiquement partitionnées (il est possible d’avoir la main sur ce mécanisme, comme nous le verrons plus tard) et quelques fichiers spécifiques à Databricks sont ajoutés :

– ficher “commited” qui liste les différentes parties

– fichier vide “started”

– fichier vide _SUCCESS

Ces fichiers qui tracent la transaction réalisées vont nous gêner pour une usage dans Power BI. Nous pourrions éviter leur génération en définissant les options suivantes dans notre script Spark comme l’indiquent ces échanges sur le forum Databricks.

Il est possible de prendre la main sur le nombre de fichiers obtenus, ainsi que sur la clé qui aidera au partitionnement. Nous ajoutons deux fonctions que sont repartition() et partitionBy() dans la syntaxe d’écriture du Parquet.

df.repartition(1).write.partitionBy("YearMonth").format("parquet").mode('overwrite').save("/mnt/nycitibike/dbx_parquet")

Dans Power BI Desktop, nous disposons d’un type de source Parquet, à partir du menu “Obtenir les données”.

Ce connecteur attend le chemin d’un fichier Parquet et non d’un répertoire contenant les partitions sous forme de fichiers distincts. Nous découvrons ainsi la fonction du langage M qui réalise la lecture du fichier : Parquet.Document().

= Parquet.Document(File.Contents("C:\Users\PaulPeton\Documents\Python Scripts\Parquet\my_to_parquet.parquet"))

Cette fonction est documentée depuis juin 2020 mais elle n’a pas fait l’objet d’une grande communication jusqu’à présent (janvier 2021) et la roadmap Power BI indique un (nouveau ?) connecteur Parquet pour mars 2021.

Pour lire le contenu d’un dossier, nous pouvons sans problème appliquer la technique de connexion à un dossier sur un compte de stockage Azure, puis lire chaque fichier à l’aide du bouton “expand” de la colonne “Content” contenant tous les binaries. Cette manipulation crée automatique une fonction d’import contenant le code suivant.

= (Paramètre1) => let
Source = Parquet.Document(Paramètre1)
in
Source

Attention, pour réaliser cette opération, il faut au préalable filtrer les fichiers ne correspondant pas à des partitions (commit, started, SUCCESS pour un dossier Parquet généré par Databricks), qui débutent tous par un caractère underscore.

= Table.SelectRows(Source, each not Text.StartsWith([Name], "_"))

Les données se chargent alors en mettant bout à bout toutes les partitions !

Il est maintenant tentant d’essayer une actualisation incrémentielle de notre dataset, sur une période d’un mois. Pour autant, celle-ci n’est pas garantie comme nous l’indique le message d’alerte dans la pop-up de paramétrage du rafraichissement.

Une traduction approximative annonce que la requête “ne peut être pliée”, il s’agit ici du concept de query folding : la requête ne pourrait appliquer un filtre (de dates) en amont du chargement complet des données. Ceci signifie tout simplement de le mécanisme incrémentiel ne se réaliserait pas.

Afin de tester ce fonctionnement, nous publions le rapport sur un espace de travail Premium du service Power BI. Il sera alors possible d’ouvrir une connexion à l’espace à partir du client lourd SQL Server Management Studio.

Au premier chargement, nous n’aurons qu’une seule partition.

Mais en actualisant le dataset depuis le service, les partitions apparaissent.

Nous observons bien qu’une nouvelle actualisation du jeu de données n’affecte pas toutes les partitions. C’est gagné !

En ajoutant des fichiers supplémentaires, nous observons aussi que le paramètre de stockage de lignes (ici, sur un an) fonctionne et cela permet donc de créer un dataset actualisé sur une période glissante.

Pour terminer, précisons que la requête réalisée ici peut tout à fait être exécutée au sein d’un dataflow Power BI et servir ainsi à la création de différents rapports.

[EDIT du 23/01/2021 : cette approche fonctionne aussi avec le format Delta Lake créé depuis Azure Databricks, et c’est tant mieux car ce format apporte beaucoup d’avantages, qui seront sûrement abordés dans un prochain article.]

Ajuster le niveau d’une Azure SQL DB avant et après un traitement Data Factory

Nous avons déjà évoqué sur ce blog le driver JDBC permettant d’écrire depuis un notebook Azure Databricks dans une base de données managée ou encore comment utiliser Polybase pour pérenniser une table du métastore Databricks dans Azure SQL DWH (aujourd’hui Synapse Analytics).

Cette action d’écriture demande souvent un niveau de ressource suffisamment élevé, tout comme l’import de données réalisé dans un dataset Power BI. Le reste du temps, une puissance plus faible peut être tout à fait acceptable. Et comme le niveau de facturation est lié au niveau de la ressource, pouvoir maîtriser par code, et non manuellement depuis l’interface, le scape up ou scale down de sa ressource peut s’avérer très intéressant dans une démarche d’optimisation des coûts (FinOps).

Nous nous plaçons ici dans un scénario de pipeline de transformation de données réalisé par Azure Databricks puis d’import dans un dataset Power BI. L’actualisation par API REST de ce dernier est détaillé dans cet article.

Au démarrage de notre pipeline, nous souhaitons augmenter la puissance (scale up) et ceci peut se faire avec des instructions PowerShell documentées ici.

Il est possible d’exécuter un script PowerShell depuis Azure Data Factory à l’aide de ce qu’on appelle une “custom activity“. Celle-ci s’appuie sur un service lié de type Azure Batch. Et ce service n’est pas gratuit, voire même relativement cher pour ce que l’on souhaite réaliser.

Nous optons donc pour la programmation d’une Azure Function, qui supporte le langage PowerShell ! Nous choisissons le type de fonction Trigger HTTP.

Dans Visual Studio Code, nous obtenons l’arborescence ci-dessous.

Le code principal de la fonction doit se trouver dans le fichier run.ps1.

Un premier point important consistera à renseigner les dépendances de librairies PowerShell dans le fichier requirements.psd1. Une combinaison fonctionnelle est la suivante :

# This file enables modules to be automatically managed by the Functions service
# See https://aka.ms/functionsmanageddependency for additional information.
#
@{
    'Az.Accounts' = '2.1.2'
    'Az.Sql' = '2.11.1'
 }

Second élément indispensable pour l’intégration dans une activité Web de Data Factory : le retour de la fonction doit obligatoirement être un objet JSON. Nous terminons donc la fonction par le code ci-dessous :

$body = @{name = 'Scale done !'} | ConvertTo-Json

#Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
     StatusCode = [HttpStatusCode]::OK
     Body = $body
})

Une fois la fonction publiée sur Azure, nous pouvons définir le pipeline complet.

La fonction Azure étant pensée de manière paramétrable, nous pouvons donner le niveau de ressource comme un élément de l’URL appelée par la méthode POST. Nous utilisons dans l’exemple ci-dessous le niveau “S” suivi d’un chiffre.

Ce niveau modifiera automatiquement le configuration de la ressource Azure SQL DB, en mode de tarification basé sur les DTUs (et non les vCores).

L’activité de traitement de la donnée, ici portée par Azure Databricks, réalise le démarrage d’un cluster de type interactif. Lorsque le traitement est terminé, le cluster peut s’éteindre automatiquement si la fonctionnalité “terminate after…” a été mise en place. Mais il faut ici attendre que l’import du dataset Power BI soit terminé ! Nous gérons donc la fin du cluster Databricks au moyen de l’API dédiée, dans une activité Web.

La syntaxe utilisée est la suivante :

POST https://adb-XXXXXXXXXXXXXXXX.X.azuredatabricks.net/api/2.0/clusters/delete

Les informations nécessaires seront : l’URL du workspace, un personal token d’authentification et l’identifiant du cluster.

Dans un optique d’optimisation des coûts, nous positionnons comme dernière activité du pipeline le scale down de la base de données.

Nous obtenons ainsi une chaîne complète optimisée sur l’enchainement des tâches et sur les coûts engendrés par les services PaaS.

Azure Data Factory pilote les actualisations Power BI

L’actualisation planifiée dans le service Power BI se fait à heure (ou demi-heure) fixe et ne peut pas être conditionnée par un événement antérieur. Le reporting est pourtant bien souvent la dernière étape de toute une chaîne de transformation de la donnée. Nous cherchons donc une solution de trigger (déclencheur) pouvant rejoindre le flux complet sous Azure.

La Power Platform propose des pistes à l’aide de Power Automate (Flow) mais nous souhaitons ici rester dans le monde Azure. Un outil comme Logic Apps dispose de telles fonctionnalités. Mais dans une approche ETL ou ELT, c’est Azure Data Factory (ADF) qui est bien souvent au centre du pilotage des traitements. Nous allons donc exploiter l’API REST de Power BI au travers d’activités Web au sein d’un pipeline ADF.

Nous visons un fonctionnement de la sorte (exemple intégrant un notebook Azure Databricks réalisant le pré-traitement de la donnée) :

Sur le schéma ci-dessous, des activités peuvent précéder la série d’activités qui réaliseront les instructions d’API afin d’actualiser un dataflow Power BI.

De nombreux articles de blog ont déjà décrit ce fonctionnement et je vous conseille en particulier cet article de Dave Ruijter et le repository associé qui vous permettra de charger un template complet réalisant l’actualisation d’un dataset.

Les différentes étapes de ce pipeline sont :

– l’obtention d’un token d’authentification grâce à une application définie dans l’annuaire Azure Active Directory (AAD). Le client secret de cette application est au préalable récupéré dans un coffre-fort Azure Key Vault (AKV).

– la commande d’actualisation du dataflow

– une boucle d’attente “Until” demandant à intervalles réguliers (activité Wait) le statut de la dernière actualisation. Ce statut est stocké dans une variable.

– dans l’activité “If condition”, la branche True lancera l’actualisation d’un dataset Power BI basé sur le dataflow

Afin de lever une erreur dans la branche False, nous utilisons l’astuce de définir une variable réalisant une division par 0. Cette division interdite fera échouer l’activité et donc le pipeline complet.

En reprenant une logique similaire, nous ajoutons une boucle d’attente de l’actualisation du dataset et une nouvelle activité “If Condition” permettant de lever une erreur en cas d’échec de l’actualisation.

Il n’est pas possible de placer ces deux dernières briques dans la branche True du “If Condition”, c’est ici une limite d’ADF mais elle ne se révèle pas bloquante.

L’API non documenté des dataflows Power BI

Nous allons préciser ici des éléments ne figurant pas dans la documentation officielle de l’API REST de Power BI.

En préambule, nous devons avoir déclarer une application (app registration) disposant des droits Dataflow.ReadWrite.All. Ce point peut être contrôler dans l’annuaire AAD et l’API lèvera une erreur 401 si les droits sont uniquement au niveau Dataflow.Read.All.

L’actualisation d’un dataflow se fait par la commande POST suivante :

POST https://api.powerbi.com/v1.0/myorg/groups/{groupId}/dataflows/{dataflowId}/refreshes

Les identifiants demandés (la notion de group correspondant historiquement à celle d’espace de travail ou workspace) se retrouvent facilement dans les URLs du service Power BI.

Lors de l’actualisation, le statut de l’opération passera à “InProgress” (contre “Unknown” pour un dataset) puis “Success” (contre “Completed”).

L’instruction permettant d’obtenir le dernier statut d’actualisation est une commande GET :

GET https://api.powerbi.com/v1.0/myorg/groups/{groupId}/dataflows/{dataflowId}/transactions?$top=1

Notez le mot-clé “transactions” à l’inverse de “top” pour un dataset en paramètre de cette instruction.

Un grand merci à mon confrère Joël CREST pour son aide sur ce sujet. Je vous encourage à consulter son blog et sa chaîne YouTube.

Utiliser (encore) des notebooks sous Azure

Au 15 janvier 2021, le service Azure Notebooks disparaît complètement. Nous cherchons donc une alternative en version SaaS, toujours sur Azure et celle-ci se trouve dans le portail Azure Machine Learning.

Un article d’introduction à ce service est disponible dans les archives de ce blog.

Quelques nouveautés sont disponibles et annoncées à l’ouverture d’un premier notebook. Nous reviendrons sur ces nouveautés dans la suite de cet article.

Nous pouvons alors nommer un nouveau fichier, à l’extension classique .ipynb.

Nous retrouvons ce qui caractérise les notebooks : des cellules de code, suivies par les sorties des différentes instructions, ou bien des cellules de type formaté en Markdown.

Il n’est pourtant pas possible d’exécuter du code immédiatement, il faut au préalable relier ce notebook à une instance de calcul, déjà définie dans le portail (menu “Compute”).

Dans le menu dédié aux ressources de calcul, nous retrouvons des raccourcis vers l’utilisation d’une interface Jupyter, JupyterLab ou encore RStudio.

La nécessité d’une instance de calcul rend ce service payant, et facturé au temps d’utilisation de la machine virtuelle définie. Il sera très important de ne pas oublier d’éteindre la machine après utilisation, il n’existe à ce jour aucun système natif d’extinction planifiée !

Les utilisateurs férus de Visual Studio Code (VSC) peuvent travailler à distance avec l’environnement de calcul d’Azure Machine Learning. En installant conjointement les extensions Python et Jupyter, il était déjà possible de travailler avec un notebook (fichier reconnu par son extension .ipynb) dans l’IDE de Microsoft. En ajoutant l’extension Azure Machine Learning, nous pouvons nous connecter à la ressource Azure.

Les “objets” de l’espace Azure ML sont alors visibles depuis le menu Azure de VSC.

Il est possible de réaliser des actions à distance sur les instances de calcul.

Par défaut, un notebook dans VSC utilise le serveur Jupyter local. Mais il est possible de désigner un serveur à distance (“remote“) en cliquant sur le bouton en haut à droite indiquant “Jupyter Server : local”.

Nous obtenons la boîte de dialogues ci-dessous.

La succession de choix proposés permettra d’associer le notebook à une instance de calcul Azure ML.

Une composante indispensable de l’approche par le code est le gestion du versionnig, par un service compatible avec Git (GitHub, GitLab, Bitbucket, Azure DevOps, etc.), comme décrit sur ce lien.

Nous allons bénéficier ici de l’affichage en terminal présentés parmi les nouvelles (janvier 2021) fonctionnalités disponibles.

Nous devons tout d’abord créer une clé SSH au moyen de la commande ci-dessous :

ssh-keygen -t rsa -b 4096 -C "your_email@example.com"

La clé SSH doit être enregistrée dans le chemin /home/azureuser/.ssh qui est spécifique à l’instance de calcul (valider par Entrée le prompt proposé après la commande précédente). Il est alors possible de saisir une passphrase, facultative mais fortement conseillée. Un fichier id_rsa.pub est ainsi créé et son contenu peut être copié (ctrl + inser).

Nous utiliserons dans cet article Azure DevOps, où nous allons pouvoir saisir cette clé publique.

De retour dans le terminal, nous pouvons lancer la commande de test :

ssh -T git@ssh.dev.azure.com

Le résultat attendu est le suivant :

Depuis le repository Azure DevOps, nous obtenons la syntaxe SSH qui viendra compléter une instruction git remote, lancée depuis le terminal.

Si cela n’a pas fait, dans le terminal et depuis le chemin voulu, saisir la commande git init pour initialiser le répertoire. Définir également l’utilisateur qui réalisera les instructions de commit :

git config --global user.email "you@example.com"
git config --global user.name "Your Name"

La commande git remote add origin peut alors être exécutée avec succès.

Il pourra être nécessaire de passer la commande git add . pour ajouter l’intégralité du contenu dans ce qui doit être géré par git, puis un git commit.

De même, le repository Azure doit avoir été initialisé, par exemple avec un fichier readme.md et disposer ainsi d’une branche (main ou master).

git push -u origin --all

Enfin, un rappel des commandes Git indispensables est donné ici : Git · GitHub

Une autre nouvelle intéressante consiste à “nettoyer” le notebook avant de le publier. C’est en effet un reproche souvent fait aux notebooks : ceux-ci mélangent code, sorties et commentaires et ne sont pas optimaux pour les tests unitaires et le déploiement dans un environnement de production.

La fonctionnalité “Gather” (documentation officielle) sur une cellule permet de créer un autre notebook qui conservera uniquement les cellules de code dépendant de la cellule sélectionnée.

Déploiement continu avec Azure Synapse Analytics – partie 1

S’il était bien une fonctionnalité attendue pour la nouvelle (2020) version d’Azure Synapse Analytics, c’était la gestion du versionning et du déploiement des scripts créés dans l’outil.

Depuis la disponibilité générale (fin novembre 2020), nous trouvons un menu Source control dans la fenêtre Manage.

Nous pouvons définir la Git configuration, c’est-à-dire l’outil qui servira à l’enregistrement des développements et à leur versionning. Nous pouvons choisir entre Azure DevOps ou GitHub.

Nous choisissons ici Azure DevOps mais la partie GitHub est documentée sur ce lien officiel.

Nous devons disposer au préalable d’un projet DevOps et d’un repository dont l’URL devra être renseignée dans Synapse.

The Dev Ops repository link must start with http or https and be in DevOps format. For example: https://account.visualstudio.com/project/_git/repository

Dès lors, le dépôt se remplie avec les premiers éléments faisant partie de la ressource Synapse Analytics.

Dans le studio Synapse, nous disposons de plusieurs actions :

La validation permet de contrôler le développement réalisé et des erreurs seront remontées le cas échéant.

Le bouton Commit permet de “sauvegarder” le développement réalisé sur la branche active. Il est recommandé de ne pas développer directement sur la branche master.

Le bouton Publish de l’interface Synapse permet de créer ou mettre à jour la branche workspace_publish.

Il est donc logique de les utiliser successivement, dans cet ordre. L’action Publish réalisera une sauvegarde si nécessaire. A noter que les sorties (outputs) des cellules des notebooks sont effacées au moment de la publication. C’est une bonne pratique pour le versionning mais peut déstabiliser l’utilisateur souhaitant conserver l’affichage de ses résultats sans relancer le notebook complet.

Au fur et à mesure de l’utilisation de Synapse Analytics, de nouveaux répertoires vont se créer sur la branche collaborative.

Manquent ici triggers, credentials et Spark job definiition

Dans la branche workspace_publish, nous retrouvons deux fichiers JSON dans une logique complètement similaire à celle d’Azure Data Factory, permettant un déploiement de type “template ARM“.

Le fichier TemplateParametersForWorkspace.json contient en particulier le nom des informations de connexion (nom du workspace, URL, etc.) et les mots de passe, secrets ou token, sont effacés. Il faudra écraser ou renseigner ces valeurs lors du déploiement dans un autre environnement.

Dans une seconde partie, nous développerons les approches possibles pour le déploiement continue à l’aide d’Azure Pipelines.

Les activités Data Factory dans le linéage Azure Purview

Avec la sortie fin 2020 d’Azure Purview, beaucoup de démonstrations ont mis en avant la capacité à scanner un tenant Power BI et à présenter le linéage entre les rapports et les données sources.

Ce principe, fort utile, est aussi applicable aux activités Azure Data Factory. Considérons une souscription Azure au sein de laquelle a été créé un service Azure Purview (je vous conseille cette vidéo de Joël CREST pour démarrer votre projet Purview).

Nous nous rendons dans le Management Center sur le portail Purview, au niveau des external connections.

Une permission doit être au préalable accordée, l’utilisateur doit disposer d’un des rôles suivants sur le service Purview : contributor, owner, reader ou User Access Administrator (UAA). Ceci se fait au niveau de l’Access Control (IAM). Le droit doit être “direct” et non hérité d’un niveau supérieur.

Un bouton +New apparaît alors en haut de la fenêtre et permet d’ajouter de désigner la souscription et la ressource Data Factory visée.

Bien vérifier alors que le statut “connected” apparaît au bout de la ligne.

Revenons ensuite dans la partie Sources d’Azure Purview.

Ici, il faut déclarer une première collection dans laquelle nous définirons les sources des données, qui sont les linked services d’Azure Data Factory.

Pour l’exemple mis ici en œuvre, nous déclarons un compte de stockage Azure Blob et une base de données managée Azure SQL DB. Pour cette dernière, il faut au préalable déclarer l’identité managée (MSI) de Purview en tant que user de la base de données. Cette opération doit être réalisée par un administrateur de la base, présent dans Azure Active Directory. La syntaxe SQL est la suivante :

CREATE USER [nomDuServicePurview] FROM EXTERNAL PROVIDER
GO
EXEC sp_addrolemember 'db_owner', [nomDuServicePurview]
GO

Nous lançons ensuite un scan de ces deux sources.

Voici l’activité Data Factory que nous souhaitons retrouver :

Elle réalise la copie d’un fichier CSV dans une table de la base de données, préalablement vidée par une procédure stockée.

Attention, les activités et dont le linéage associés ne seront visibles dans Purview qu’après une exécution, même en debug, de l’activité !

Il suffit de rechercher le nom de l’activité dans la barre de recherche pour vérifier que celle-ci est maintenant bien disponible.

Le linéage est porté par la destination, ici la table de la base de données.

Un lien est disponible pour ouvrir directement cette activité dans le service Azure Data Factory.

Au fil des exécutions des activités, le champ de la recherche dans Purview va ainsi s’étendre pour couvrir l’ensemble des traitements.

Une bonne pratique semble d’ores et déjà s’imposer : seuls les Data Factory de production sont à déclarer dans Azure Purview. La limite est à ce jour de 10 services ADF.