Du rôle des pipelines dans Azure ML et son SDK

Si vous êtes habitués à manipuler le package Python scikit learn, la notion de pipeline vous est sans doute familière. Cet objet permet d’enchainer des étapes (“steps“) comme la préparation de données (normalisation, réduction de dimensions, etc.), l’entrainement puis l’évaluation d’un modèle. La documentation officielle du package donne ainsi cet exemple.

>>> from sklearn.svm import SVC
>>> from sklearn.preprocessing import StandardScaler
>>> from sklearn.datasets import make_classification
>>> from sklearn.model_selection import train_test_split
>>> from sklearn.pipeline import Pipeline
>>> X, y = make_classification(random_state=0)
>>> X_train, X_test, y_train, y_test = train_test_split(X, y,
...                                                     random_state=0)
>>> pipe = Pipeline([('scaler', StandardScaler()), ('svc', SVC())])
>>> # The pipeline can be used as any other estimator
>>> # and avoids leaking the test set into the train set
>>> pipe.fit(X_train, y_train)
Pipeline(steps=[('scaler', StandardScaler()), ('svc', SVC())])
>>> pipe.score(X_test, y_test)
0.88

Le SDK Python azureml que nous avons déjà évoqué sur ce blog dispose également d’un concept de pipeline. Celui-ci ne doit pas être confondu avec les pipelines pouvant être définis dans l’interface visuelle du concepteur (“designer“). Les pipelines en tant que tels sont visibles dans un menu dédié.

(NDA : on serait en droit d’espérer à termes une fusion de ces éléments, c’est-à-dire d’observer le pipeline dans l’interface du concepteur, voire de retrouver le pipeline créé par le code dans les objets manipulables visuellement).

Les avantages des pipelines

La manipulation des pipelines, en particulier des entrées-sorties, ne sera pas triviale, alors nous allons insister sur tous les bénéfices d’un tel objet. Au delà de la représentation visuelle toujours plus parlante, le principal avantage sera la planification du pipeline. Celle-ci peut se faire de trois manières. Le code présenté ici est disponible dans ce dépôt GitHub.

Objet Scheduler

L’interface du studio Azure Machine Learning ne présente pas d’ordonnanceur visuel des notebooks. Pour autant, cette notion d’ordonnancement existe bien et se manipule au travers du SDK. Une fois publié, un pipeline dispose d’un identifiant. A partir de cet id, un objet Schedule peut être défini, et se déclenchera selon une récurrence déclarée au moyen de l’objet ScheduleRecurrence.

A sa création, l’ordonnancement est activé. Il sera possible de le désactiver à partir de son identifiant (à ne pas confondre avec l’identifiant du pipeline).

Les points négatifs de cet approche sont le manque de visibilité sur les ordonnancements définis (il est nécessaire de lancer la commande Schedule.list) et le fait que d’autres activités non définies dans des scripts présents sur dans l’espace de travail Azure Machine Learning.

Pipeline Azure DevOps

Encore un pipeline à ne pas confondre avec le pipeline du SDK azureml ! Nous parlons ici des pipelines de release d’Azure DevOps. En recherchant le terme “azureml” dans l’assistant (volet de droite), nous trouvons trois tâches, dont une permettant de lancer un pipeline Azure ML désigné à nouveau par son identifiant.

Un pipeline de release peut ensuite être ordonnancé au moyen des standards d’écriture du fichier YAML.

Activité Azure Data Factory

Nous disposons de trois activités distinctes dans le groupe “Machine Learning”. Les deux premières concernent l’ancien Azure Machine Learning studio, aujourd’hui déprécié. Concentrons-nous sur la troisième activité qui permet d’exécuter un pipeline Azure ML.

Pour remplir les différents paramètres, nous devons tout d’abord fournir un service lié (linked service) de type Azure Machine Learning (catégorie “compute“).

Nous privilégions l’authentification par identité managée, celle-ci se voyant attribuer un rôle de contributeur sur la ressource Azure Machine Learning.

Seconde information obligatoire : l’ID du pipeline publié. Un menu déroulant nous permettra de le choisir. Nous constatons ici que cette information sera particulièrement sensible lorsque nous devrons re-publier le pipeline. Il faudra donc limiter au maximum cette action, car elle engendrera une modification dans les paramètres de l’activité Data Factory. Une problématique similaire se posera dans le cas d’un déploiement automatique entre environnements (par exemple, de dev à prod) avec des ID de pipelines différents.

Et maintenant, comment réaliser un pipeline

Nous allons maintenant plonger dans le SDK Python azureml pour décortiquer les étapes de création d’un pipeline.

Il nous faut pour base un script Python qui contiendra le code à exécuter par une étape (step) du pipeline. Nous ne travaillerons ici qu’avec des étapes de PythonStepScript mais il en existe d’autres types, référencés dans la documentation officielle. Nous prendrons la bonne habitude de faire figurer dans ces scripts les lignes de code suivantes :

from azureml.core.run import Run

run = Run.get_context()
exp = run.experiment
ws = run.experiment.workspace

Celles-ci permettront de retrouver les éléments du “niveau supérieur”, c’est-à-dire l’expérience, son exécution ainsi que l’espace de travail.

Ensuite, nous pourrons travailler sur les entrées et sorties de chaque étape. Cette gestion des entrées-sorties nécessitera un article à part entière sur ce blog.

Exemple de “graph” de sortie du pipeline (experiment)

Nous recréons ainsi, par le code (et au prix de nombreux efforts !), un objet similaire au pipeline obtenu dans le Designer.

from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep

step1 = PythonScriptStep(
    script_name="step1.py",
    compute_target=compute_target,
    source_directory="scripts",
    allow_reuse=True
)


step2 = PythonScriptStep(
    script_name="step1.py",
    compute_target=compute_target,
    source_directory="scripts",
    allow_reuse=True
)

steps = [ step1, step2 ]

pipeline = Pipeline(workspace=ws, steps=steps)
pipeline.validate()

Nous pouvons maintenant soumettre le pipeline, dont la logique des étapes aura préalablement été validée par l’instruction .validate().

pipeline_run = experiment.submit(pipeline)
pipeline_run.wait_for_completion()

Mais cela ne nous permettrait pas de réutiliser ce pipeline dans les scénarios évoqués ci-dessus, nous allons donc le publier avec l’instruction .publish().

published_pipeline = pipeline.publish(
name=published_pipeline_name,
description="ceci est un pipeline à deux étapes"
)
published_pipeline

Ceci nous permet de connaître l’identifiant du pipeline (masqué dans la copie d’écran ci-dessous).

Il ne reste plus qu’à soumettre le pipeline pour l’exécuter même si nous utiliserons vraisemblablement d’autres méthodes comme l’appel par Azure Data Factory ou l’emploi de l’API REST.

published_pipeline.submit(ws, published_pipeline_name)

Les logs de l’exécution seront accessibles dans le menu Pipelines du portail, qui nous redirigera vers le menu Experiments. N’oubliez pas d’activer l’affichage des “child runs” pour visualiser les traces de l’exécution de chacune des étapes.

Voici enfin un exemple de code qui permettra de lancer ce pipeline par requête HTTP.

from azureml.core.authentication import InteractiveLoginAuthentication
import requests

auth = InteractiveLoginAuthentication()
aad_token = auth.get_authentication_header()

rest_endpoint1 = published_pipeline.endpoint

print("You can perform HTTP POST on URL {} to trigger this pipeline".format(rest_endpoint1))

response = requests.post(rest_endpoint1, 
                         headers=aad_token, 
                         json={"ExperimentName": "My_Pipeline_batch",
                               "RunSource": "SDK"}
                        )

try:
    response.raise_for_status()
except Exception:    
    raise Exception('Received bad response from the endpoint: {}\n'
                    'Response Code: {}\n'
                    'Headers: {}\n'
                    'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content))

run_id = response.json().get('Id')
print('Submitted pipeline run: ', run_id)

Attention, une réponse “correcte” à cet appel HTTP sera de confirmer le bon lancement du pipeline. Mais rien ne vous garantit que son exécution se fera avec succès jusqu’au bout ! Il faudra pour cela se pencher sur la remontée de logs (et d’alertes !) à l’aide d’un outil comme Azure Monitor.

Repos Databricks & GitHub actions, better together !

Ou en bon français, comment faire coopérer la nouvelle (mars 2021) fonctionnalité Repos de Databricks avec une logique d’intégration continue développée dans GitHub ?

Revenons tout d’abord sur le point à l’origine de la nécessité d’un tel mécanisme (une présentation plus détaillée des Repos Databricks a été faite et mise à jour sur ce blog). Dans une architecture dite “moderne” sur Azure, nous utilisons Azure Data Factory pour piloter les traitements de données et lancer conjointement des notebooks Databricks. Cela nécessite de donner le chemin du notebook sur l’espace de travail déclaré en tant que service lié.

Nous disposons de trois entrées pour définir le “notebook path“.

Si nous choisissons “Repos”, nous allons donner un chemin contenant le nom du développeur !

Les développements livrés en production doivent bien sûr être totalement indépendants d’une telle information. Un contournement consisterait à utiliser un sous-répertoire Shared dans la partie Repos mais cela reviendrait à perdre le rattachement du développement à son auteur.

Nous allons donc mettre en place un mécanisme permettant aux développeurs d’utiliser leur propre répertoire puis de versionner les développements dans un espace tiers, par exemple un repository GitHub (une démarche exploitant les repositories Azure DevOps serait tout à fait similaire).

Voici le processus suivi lors d’un développement :

  • le développeur A crée une nouvelle branche sur laquelle il réalise ses développements
  • il “commit & push” ensuite son travail
  • une Pull Request (PR) peut alors être soumise dans le but de fusionner le travail avec la branche principale (master ou main)
  • le développeur B vient alors valider la PR et le merge se lance sur la branche principale

C’est alors que se déclenche notre processus d’intégration continue (CI) qui réalise une copie des notebooks de la branche principale dans le répertoire /Shared de l’espace de travail Databricks.

Nous nous assurons ainsi d’avoir toujours la dernière “bonne” version des notebooks, sur un chemin qui sera identique entre les environnements. En effet, dans un second temps, un processus de déploiement continu (CD) viendra copier ces notebooks sur les autre environnements (qualification, production).

Comment réaliser une GitHub Action d’intégration continue

Nous nous plaçons dans le repository servant à versionner les notebooks. Le menu Actions est accessible dans la barre supérieure.

Puis, nous cliquons sur le lien “set up a workflow yourself“.

Un modèle de pipeline YAML est alors disponible et nous allons l’adapter.

Détaillons maintenant le code final, étape par étape.

# This is a basic workflow to help you get started with Actions

name: CI

# Controls when the workflow will run
on:
  # Triggers the workflow on push or pull request events but only for the master branch
  pull_request:
    branches: [ master ]
    paths: ['**.py']

  # Allows you to run this workflow manually from the Actions tab
  workflow_dispatch:

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
  # This workflow contains a single job called "build"
  build:
    # The type of runner that the job will run on
    runs-on: ubuntu-latest
    env:
      DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
      DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}

    # Steps represent a sequence of tasks that will be executed as part of the job
    steps:
      # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
      - name: Checkout
        uses: actions/checkout@v2
      
      - name: Setup Python
        uses: actions/setup-python@v2.2.2
        with:
          python-version: '3.8.10'
          architecture: 'x64'
      
      # Install pip
      - name: Install pip
        run: |
          python -m pip install --upgrade pip

      # Install databricks CLI with pip
      - name: Install databricks CLI
        run: python -m pip install --upgrade databricks-cli

      # Runs
      - name: Run a multi-line script
        run: |
          echo Test python version
          python --version
          echo Databricks CLI version
          databricks --version
          
      # Import notebook to Shared
      - name: Import local github directory to Shared
        run: databricks workspace import_dir --overwrite . /Shared
          
      # List Shared content
      - name: List workspace with databricks CLI
        run: databricks workspace list --absolute --long --id /Shared

Le principe est d’utiliser une machine virtuelle munie d’un système d’exploitation Ubuntu (‘latest’ pour obtenir la dernière version disponible), d’y installer un environnement Python dans la version souhaitée (ici, 3.8.10) puis le CLI de Databricks.

Pour connecter ce dernier à notre espace de travail, nous définissons au préalable deux variables d’environnement :

  • DATABRICKS_HOST qui contient l’URL de notre espace de travail
  • DATABRICKS_TOKEN qui contient un jeton d’authentification

Pour ne pas faire figurer ces valeurs dans le fichier YAML (qui est lui-même versionné dans le repository), nous définirons au préalable deux secrets à l’aide du menu Settings.

Nous choisissons ici une portée des secrets au niveau repository.

L’instruction uses: actions/checkout@v2 garantit la disponibilité des fichiers archivés dans le repository (ceux-ci sont copiés sur la machine virtuelle) et le point (.) indique simplement ce chemin dans l’instruction du CLI import_dir.

Ainsi, dès la fusion d’une Pull Request avec la branche principale, le code se lance automatiquement et nous pouvons vérifier sa bonne exécution dans le menu “View runs”.

Une telle approche sera bien sûr complétée idéalement par une stratégie de tests portant sur le contenu des notebooks ou mieux encore, sur du code packagé qui y sera utilisé (voir cet article).

Poser du Parquet dans le cloud Azure ? Oui !

En quelques années de l’ère “big data”, le format de fichier orienté colonne Parquet s’est imposé comme l’un des standards du stockage sur les lacs de données (data lake) grâce à ses performances de compression, sa gestion des partitions et son intégration avec des frameworks distribués comme Spark. Il s’agit d’un projet porté par la fondation Apache qui héberge la documentation officielle.

Avant d’obtenir un fichier Parquet, le format initial est bien souvent un format texte classique structuré comme du CSV ou semi-structuré comme le JSON. Nous sommes également dans le scénario de réalisation d’un couche “clean” au sein d’un data lake, à partir de nombreux fichiers de la couche “raw“, nécessitant d’être agrégés et optimisés pour des outils d’analyse.

Dans cet article, nous allons dérouler un cas pratique allant de la constitution de ces fichiers Parquet jusqu’à leur utilisation dans un rapport Power BI.

Partons d’une année de fichiers zip issus de vélos en location de la ville de New York, soit un total de 828Mo, atteignant 3,45Go une fois décompressés (CSV).

Nous disposons ainsi en tout de 19 506 857 lignes pour l’année 2020.

Avec un peu de script Python, nous pouvons décompresser automatiquement ces fichiers.

import os
import zipfile

def unzip_files(path_to_zip_file) -> None:
    directory_to_extract_to = "datasets"
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(directory_to_extract_to)

for file in os.listdir("zip"):
    print(f"Unzipping : {file}")
    unzip_files("zip/"+file)

Ensuite, depuis un environnement Spark comme Azure Databricks, nous pouvons lire tous les CSV, qui auront été déposés au préalable sur un compte de stockage Azure.

file_location = "/mnt/nycitibike/datasets/*"
file_type = "csv"
df = spark.read.format(file_type).option("inferSchema", "true").option("header","true").load(file_location)
display(df)

Notez qu’il faudra renommer les colonnes comportant un ou plusieurs espaces, interdits en entêtes dans le format Parquet, ce que nous faisons avec le script ci-dessous.

df = df \
.withColumnRenamed("start station id","start_station_id") \
.withColumnRenamed("start station name","start_station_name") \
.withColumnRenamed("start station latitude","start_station_latitude") \
.withColumnRenamed("start station longitude","start_station_longitude") \
.withColumnRenamed("end station id","end_station_id") \
.withColumnRenamed("end station name","end_station_name") \
.withColumnRenamed("end station latitude","end_station_latitude") \
.withColumnRenamed("end station longitude","end_station_longitude") \
.withColumnRenamed("birth year","birth_year")

L’enregistrement au format Parquet est alors possible, toujours à l’aide de l’API PySpark.

df.write.format("parquet").mode("overwrite").save("/mnt/nycitibike/dbx_parquet")

Les données sont automatiquement partitionnées (il est possible d’avoir la main sur ce mécanisme, comme nous le verrons plus tard) et quelques fichiers spécifiques à Databricks sont ajoutés :

– ficher “commited” qui liste les différentes parties

– fichier vide “started”

– fichier vide _SUCCESS

Ces fichiers qui tracent la transaction réalisées vont nous gêner pour une usage dans Power BI. Nous pourrions éviter leur génération en définissant les options suivantes dans notre script Spark comme l’indiquent ces échanges sur le forum Databricks.

Il est possible de prendre la main sur le nombre de fichiers obtenus, ainsi que sur la clé qui aidera au partitionnement. Nous ajoutons deux fonctions que sont repartition() et partitionBy() dans la syntaxe d’écriture du Parquet.

df.repartition(1).write.partitionBy("YearMonth").format("parquet").mode('overwrite').save("/mnt/nycitibike/dbx_parquet")

Dans Power BI Desktop, nous disposons d’un type de source Parquet, à partir du menu “Obtenir les données”.

Ce connecteur attend le chemin d’un fichier Parquet et non d’un répertoire contenant les partitions sous forme de fichiers distincts. Nous découvrons ainsi la fonction du langage M qui réalise la lecture du fichier : Parquet.Document().

= Parquet.Document(File.Contents("C:\Users\PaulPeton\Documents\Python Scripts\Parquet\my_to_parquet.parquet"))

Cette fonction est documentée depuis juin 2020 mais elle n’a pas fait l’objet d’une grande communication jusqu’à présent (janvier 2021) et la roadmap Power BI indique un (nouveau ?) connecteur Parquet pour mars 2021.

Pour lire le contenu d’un dossier, nous pouvons sans problème appliquer la technique de connexion à un dossier sur un compte de stockage Azure, puis lire chaque fichier à l’aide du bouton “expand” de la colonne “Content” contenant tous les binaries. Cette manipulation crée automatique une fonction d’import contenant le code suivant.

= (Paramètre1) => let
Source = Parquet.Document(Paramètre1)
in
Source

Attention, pour réaliser cette opération, il faut au préalable filtrer les fichiers ne correspondant pas à des partitions (commit, started, SUCCESS pour un dossier Parquet généré par Databricks), qui débutent tous par un caractère underscore.

= Table.SelectRows(Source, each not Text.StartsWith([Name], "_"))

Les données se chargent alors en mettant bout à bout toutes les partitions !

Il est maintenant tentant d’essayer une actualisation incrémentielle de notre dataset, sur une période d’un mois. Pour autant, celle-ci n’est pas garantie comme nous l’indique le message d’alerte dans la pop-up de paramétrage du rafraichissement.

Une traduction approximative annonce que la requête “ne peut être pliée”, il s’agit ici du concept de query folding : la requête ne pourrait appliquer un filtre (de dates) en amont du chargement complet des données. Ceci signifie tout simplement de le mécanisme incrémentiel ne se réaliserait pas.

Afin de tester ce fonctionnement, nous publions le rapport sur un espace de travail Premium du service Power BI. Il sera alors possible d’ouvrir une connexion à l’espace à partir du client lourd SQL Server Management Studio.

Au premier chargement, nous n’aurons qu’une seule partition.

Mais en actualisant le dataset depuis le service, les partitions apparaissent.

Nous observons bien qu’une nouvelle actualisation du jeu de données n’affecte pas toutes les partitions. C’est gagné !

En ajoutant des fichiers supplémentaires, nous observons aussi que le paramètre de stockage de lignes (ici, sur un an) fonctionne et cela permet donc de créer un dataset actualisé sur une période glissante.

Pour terminer, précisons que la requête réalisée ici peut tout à fait être exécutée au sein d’un dataflow Power BI et servir ainsi à la création de différents rapports.

[EDIT du 23/01/2021 : cette approche fonctionne aussi avec le format Delta Lake créé depuis Azure Databricks, et c’est tant mieux car ce format apporte beaucoup d’avantages, qui seront sûrement abordés dans un prochain article.]

Ajuster le niveau d’une Azure SQL DB avant et après un traitement Data Factory

Nous avons déjà évoqué sur ce blog le driver JDBC permettant d’écrire depuis un notebook Azure Databricks dans une base de données managée ou encore comment utiliser Polybase pour pérenniser une table du métastore Databricks dans Azure SQL DWH (aujourd’hui Synapse Analytics).

Cette action d’écriture demande souvent un niveau de ressource suffisamment élevé, tout comme l’import de données réalisé dans un dataset Power BI. Le reste du temps, une puissance plus faible peut être tout à fait acceptable. Et comme le niveau de facturation est lié au niveau de la ressource, pouvoir maîtriser par code, et non manuellement depuis l’interface, le scape up ou scale down de sa ressource peut s’avérer très intéressant dans une démarche d’optimisation des coûts (FinOps).

Nous nous plaçons ici dans un scénario de pipeline de transformation de données réalisé par Azure Databricks puis d’import dans un dataset Power BI. L’actualisation par API REST de ce dernier est détaillé dans cet article.

Au démarrage de notre pipeline, nous souhaitons augmenter la puissance (scale up) et ceci peut se faire avec des instructions PowerShell documentées ici.

Il est possible d’exécuter un script PowerShell depuis Azure Data Factory à l’aide de ce qu’on appelle une “custom activity“. Celle-ci s’appuie sur un service lié de type Azure Batch. Et ce service n’est pas gratuit, voire même relativement cher pour ce que l’on souhaite réaliser.

Nous optons donc pour la programmation d’une Azure Function, qui supporte le langage PowerShell ! Nous choisissons le type de fonction Trigger HTTP.

Dans Visual Studio Code, nous obtenons l’arborescence ci-dessous.

Le code principal de la fonction doit se trouver dans le fichier run.ps1.

Un premier point important consistera à renseigner les dépendances de librairies PowerShell dans le fichier requirements.psd1. Une combinaison fonctionnelle est la suivante :

# This file enables modules to be automatically managed by the Functions service
# See https://aka.ms/functionsmanageddependency for additional information.
#
@{
    'Az.Accounts' = '2.1.2'
    'Az.Sql' = '2.11.1'
 }

Second élément indispensable pour l’intégration dans une activité Web de Data Factory : le retour de la fonction doit obligatoirement être un objet JSON. Nous terminons donc la fonction par le code ci-dessous :

$body = @{name = 'Scale done !'} | ConvertTo-Json

#Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
     StatusCode = [HttpStatusCode]::OK
     Body = $body
})

Une fois la fonction publiée sur Azure, nous pouvons définir le pipeline complet.

La fonction Azure étant pensée de manière paramétrable, nous pouvons donner le niveau de ressource comme un élément de l’URL appelée par la méthode POST. Nous utilisons dans l’exemple ci-dessous le niveau “S” suivi d’un chiffre.

Ce niveau modifiera automatiquement le configuration de la ressource Azure SQL DB, en mode de tarification basé sur les DTUs (et non les vCores).

L’activité de traitement de la donnée, ici portée par Azure Databricks, réalise le démarrage d’un cluster de type interactif. Lorsque le traitement est terminé, le cluster peut s’éteindre automatiquement si la fonctionnalité “terminate after…” a été mise en place. Mais il faut ici attendre que l’import du dataset Power BI soit terminé ! Nous gérons donc la fin du cluster Databricks au moyen de l’API dédiée, dans une activité Web.

La syntaxe utilisée est la suivante :

POST https://adb-XXXXXXXXXXXXXXXX.X.azuredatabricks.net/api/2.0/clusters/delete

Les informations nécessaires seront : l’URL du workspace, un personal token d’authentification et l’identifiant du cluster.

Dans un optique d’optimisation des coûts, nous positionnons comme dernière activité du pipeline le scale down de la base de données.

Nous obtenons ainsi une chaîne complète optimisée sur l’enchainement des tâches et sur les coûts engendrés par les services PaaS.

Azure Data Factory pilote les actualisations Power BI

L’actualisation planifiée dans le service Power BI se fait à heure (ou demi-heure) fixe et ne peut pas être conditionnée par un événement antérieur. Le reporting est pourtant bien souvent la dernière étape de toute une chaîne de transformation de la donnée. Nous cherchons donc une solution de trigger (déclencheur) pouvant rejoindre le flux complet sous Azure.

La Power Platform propose des pistes à l’aide de Power Automate (Flow) mais nous souhaitons ici rester dans le monde Azure. Un outil comme Logic Apps dispose de telles fonctionnalités. Mais dans une approche ETL ou ELT, c’est Azure Data Factory (ADF) qui est bien souvent au centre du pilotage des traitements. Nous allons donc exploiter l’API REST de Power BI au travers d’activités Web au sein d’un pipeline ADF.

Nous visons un fonctionnement de la sorte (exemple intégrant un notebook Azure Databricks réalisant le pré-traitement de la donnée) :

Sur le schéma ci-dessous, des activités peuvent précéder la série d’activités qui réaliseront les instructions d’API afin d’actualiser un dataflow Power BI.

De nombreux articles de blog ont déjà décrit ce fonctionnement et je vous conseille en particulier cet article de Dave Ruijter et le repository associé qui vous permettra de charger un template complet réalisant l’actualisation d’un dataset.

Les différentes étapes de ce pipeline sont :

– l’obtention d’un token d’authentification grâce à une application définie dans l’annuaire Azure Active Directory (AAD). Le client secret de cette application est au préalable récupéré dans un coffre-fort Azure Key Vault (AKV).

– la commande d’actualisation du dataflow

– une boucle d’attente “Until” demandant à intervalles réguliers (activité Wait) le statut de la dernière actualisation. Ce statut est stocké dans une variable.

– dans l’activité “If condition”, la branche True lancera l’actualisation d’un dataset Power BI basé sur le dataflow

Afin de lever une erreur dans la branche False, nous utilisons l’astuce de définir une variable réalisant une division par 0. Cette division interdite fera échouer l’activité et donc le pipeline complet.

En reprenant une logique similaire, nous ajoutons une boucle d’attente de l’actualisation du dataset et une nouvelle activité “If Condition” permettant de lever une erreur en cas d’échec de l’actualisation.

Il n’est pas possible de placer ces deux dernières briques dans la branche True du “If Condition”, c’est ici une limite d’ADF mais elle ne se révèle pas bloquante.

L’API non documenté des dataflows Power BI

Nous allons préciser ici des éléments ne figurant pas dans la documentation officielle de l’API REST de Power BI.

En préambule, nous devons avoir déclarer une application (app registration) disposant des droits Dataflow.ReadWrite.All. Ce point peut être contrôler dans l’annuaire AAD et l’API lèvera une erreur 401 si les droits sont uniquement au niveau Dataflow.Read.All.

L’actualisation d’un dataflow se fait par la commande POST suivante :

POST https://api.powerbi.com/v1.0/myorg/groups/{groupId}/dataflows/{dataflowId}/refreshes

Les identifiants demandés (la notion de group correspondant historiquement à celle d’espace de travail ou workspace) se retrouvent facilement dans les URLs du service Power BI.

Lors de l’actualisation, le statut de l’opération passera à “InProgress” (contre “Unknown” pour un dataset) puis “Success” (contre “Completed”).

L’instruction permettant d’obtenir le dernier statut d’actualisation est une commande GET :

GET https://api.powerbi.com/v1.0/myorg/groups/{groupId}/dataflows/{dataflowId}/transactions?$top=1

Notez le mot-clé “transactions” à l’inverse de “top” pour un dataset en paramètre de cette instruction.

Un grand merci à mon confrère Joël CREST pour son aide sur ce sujet. Je vous encourage à consulter son blog et sa chaîne YouTube.

Les activités Data Factory dans le linéage Azure Purview

Avec la sortie fin 2020 d’Azure Purview, beaucoup de démonstrations ont mis en avant la capacité à scanner un tenant Power BI et à présenter le linéage entre les rapports et les données sources.

Ce principe, fort utile, est aussi applicable aux activités Azure Data Factory. Considérons une souscription Azure au sein de laquelle a été créé un service Azure Purview (je vous conseille cette vidéo de Joël CREST pour démarrer votre projet Purview).

Nous nous rendons dans le Management Center sur le portail Purview, au niveau des external connections.

Une permission doit être au préalable accordée, l’utilisateur doit disposer d’un des rôles suivants sur le service Purview : contributor, owner, reader ou User Access Administrator (UAA). Ceci se fait au niveau de l’Access Control (IAM). Le droit doit être “direct” et non hérité d’un niveau supérieur.

Un bouton +New apparaît alors en haut de la fenêtre et permet d’ajouter de désigner la souscription et la ressource Data Factory visée.

Bien vérifier alors que le statut “connected” apparaît au bout de la ligne.

Revenons ensuite dans la partie Sources d’Azure Purview.

Ici, il faut déclarer une première collection dans laquelle nous définirons les sources des données, qui sont les linked services d’Azure Data Factory.

Pour l’exemple mis ici en œuvre, nous déclarons un compte de stockage Azure Blob et une base de données managée Azure SQL DB. Pour cette dernière, il faut au préalable déclarer l’identité managée (MSI) de Purview en tant que user de la base de données. Cette opération doit être réalisée par un administrateur de la base, présent dans Azure Active Directory. La syntaxe SQL est la suivante :

CREATE USER [nomDuServicePurview] FROM EXTERNAL PROVIDER
GO
EXEC sp_addrolemember 'db_owner', [nomDuServicePurview]
GO

Nous lançons ensuite un scan de ces deux sources.

Voici l’activité Data Factory que nous souhaitons retrouver :

Elle réalise la copie d’un fichier CSV dans une table de la base de données, préalablement vidée par une procédure stockée.

Attention, les activités et dont le linéage associés ne seront visibles dans Purview qu’après une exécution, même en debug, de l’activité !

Il suffit de rechercher le nom de l’activité dans la barre de recherche pour vérifier que celle-ci est maintenant bien disponible.

Le linéage est porté par la destination, ici la table de la base de données.

Un lien est disponible pour ouvrir directement cette activité dans le service Azure Data Factory.

Au fil des exécutions des activités, le champ de la recherche dans Purview va ainsi s’étendre pour couvrir l’ensemble des traitements.

Une bonne pratique semble d’ores et déjà s’imposer : seuls les Data Factory de production sont à déclarer dans Azure Purview. La limite est à ce jour de 10 services ADF.

Utiliser un automated cluster “single node”

Ne le cachons pas, Spark déploie toute sa puissance lorsque les volumes de données sont significatifs. Pour autant, devant la simplicité d’utilisation d’Azure Databricks, il est tentant de centraliser tous les traitements de données dans des notebooks lancés sur un cluster.

Et un cluster, par définition, est constitué de plusieurs machines, a minima un driver et un worker qui échangeront des informations. C’est toute la puissance de cette architecture qui distribue les traitements sur un nombre de workers éventuellement automatiquement “scalable”.

Mais dans le cas de traitements simples, une seule machine virtuelle (bien dimensionnée) serait suffisante et éviterait en plus des échanges réseaux qui pourraient même perturber le traitement. En étant pragmatique, ce sont aussi des coûts divisés au moins par deux !

Une solution existe maintenant : le mode de cluster “Single Node“.

Cette fonctionnalité est aujourd’hui (octobre 2020) en préversion publique et décrite sur le site de Databricks.

On observe ce paramétrage dans la version JSON de la définition du cluster, au niveau de la configuration du cluster.

{
"num_workers": 0,
"cluster_name": "MySingleNodeCluster",
"spark_version": "7.0.x-scala2.12",
"spark_conf": {
"spark.master": "local[*]",
"spark.databricks.cluster.profile": "singleNode"
},
"node_type_id": "Standard_DS3_v2",
"ssh_public_keys": [],
"custom_tags": {
"ResourceClass": "SingleNode"
},
"spark_env_vars": {
"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
},
"autotermination_minutes": 120,
"enable_elastic_disk": true,
"cluster_source": "UI",
"init_scripts": []
}

Imaginons maintenant que vous souhaitiez lancer vos traitements avec la logique “automated cluster“, s’appuyant éventuellement sur un pool de machines virtuelles. Cela est tout à fait possible depuis l’interface de jobs de Databricks mais dans une architecture data Azure plus large, on s’appuyera souvent sur le rôle d’ordonnanceur de Azure Data Factory.

Dans Azure Data Factory, nous définissons un service lié Databricks. Dans l’interface graphique, nous ne trouvons pas le mode SingleNode mais la configuration Spark peut être précisée.

Cela semble suffisant mais nous pouvons aussi préciser le nombre de workers à 0, même si un message d’alerte apparaît alors. Celui-ci n’est pas bloquant.

Afin de contrôler que ce sont bien des “automated single node clusters” qui se sont exécutés, vous pouvez consulter les logs de l’activité Data Factory où se trouve un lien pointant vers l’exécution du notebook et la configuration de clsuter associée.

Ajouter des paramètres pour le déploiement du template ARM Azure Data Factory

La longueur du titre de cet article laisse présager du niveau de précision dans lequel nous allons nous lancer ! Je vais donc rapidement situer le contexte faisant appel à un tel besoin.

Nous cherchons à déployer de manière automatisée un environnement de développement Azure Data Factory sur l’environnement de production. Nous utilisons pour cela un pipeline de release Azure DevOps. Le processus DevOps pour ADF est expliqué dans ce livre blanc Microsoft ou dans cet excellent article de Florian Eiden.

Pour résumer les grandes lignes du fonctionnement, nous remplaçons dans un fichier de paramètres au format JSON les valeurs des chaînes de connexion de l’environnement de développement par celles de production. Ceci se fait au moyen de la case « override template parameters ».

Valeurs manquantes pour le service lié Databricks

La chose se complique lorsque nous utilisons un service lié de calcul de type Azure Databricks puisque les paramètres de ce service n’apparaissent pas par défaut dans le fichier de paramétrage ! Nous avons en particulier besoin de remplacer :

  • La Databricks Workspace URL
  • Le Secret name (où se trouve enregistré un Personal Access Token correspondant à l’espace de travail souhaité)

En effet, un choix arbitraire a été fait par Microsoft pour présenter uniquement certains paramètres mais un grand nombre de valeurs existent et elles sont visibles en éditant le fichier disponible dans le menu : Parametrization template.

Ce fichier au format JSON recense l’intégralité des paramètres disponibles par défaut, ainsi que leur visibilité au moyen d’une propriété qui n’est pas simple à interpréter :

= (signe égal) permet de conserver la valeur actuelle en tant que valeur par défaut pour le paramètre

– (signe moins) permet de ne pas conserver la valeur par défaut pour le paramètre

Le symbole | (pipe) permet de réaliser le lien avec une valeur stockée dans le coffre-fort Azure Key Vault.

Nous allons maintenant rechercher le nom des paramètres manquants. Pour cela, nous passons par la définition du service lié Databricks au format JSON.

Nous obtenons ainsi le nom exact des paramètres (ne pas se fier à l’interface graphique).

L’URL de l’espace de travail se nomme ainsi domain, le secret du Key Vault se désigne par secretName et ses deux propriétés dépendent du niveau TypeProperties et du sous-niveau accessToken pour le secret.

Nous pouvons donc maintenant citer ces propriétés dans le JSON des paramètres, en respectant l’arborescence des propriétés.

La définition des paramètres dans la branche Master

Arrive ici la partie peut-être la moins documentée (jusqu’à présent !). Il faut comprendre que ce fichier JSON doit figurer à la racine de la branche Master du dépôt contenant la synchronisation du Data Factory avec un gestionnaire de version comme GitHub ou un repo Azure DevOps, sous le nom arm-template-parameters-definition.json.

Il ne faut donc pas utiliser la branche spécifique à Data Factory qui se nomme adf_publish mais nous irons ensuite vérifier que les deux fichiers JSON qu’elle contient ont bien été modifiés en conséquence.

Pour lancer cette modification, nous devons tout d’abord faire les deux actions suivantes :

  • Refresh
  • Publish

Le volet latéral du Publish (pending changes) doit s’afficher même si aucune modification n’est visible.

Nous pouvons enfin vérifier que les paramètres sont disponibles dans le fichier ARMTemplateParametersForFactory.json de la branche adf_publish.

Modifier la valeur des paramètres pendant la release

Il ne reste qu’à utiliser ces noms de paramètres dans le pipeline de release Azure DevOps. Les nouvelles valeurs peuvent être définies comme des variables du pipeline pour plus de commodité.

Cet article a pu être écrit grâce à la documentation officielle de Microsoft ainsi que d’autres articles de blog, en anglais, que leurs auteurs en soient ici remerciés :

https://medium.com/@sanajitghosh/manage-ci-cd-pipelines-using-azure-devops-azure-data-factory-azure-databricks-8239a9ceef3

https://www.modern-dataengineering.com/post/how-to-add-custom-parameters-to-data-factory-templates

https://medium.com/@patrickpicard_50914/azure-data-factory-modifying-arm-template-parameters-53b11f38bced

Relancer un pipeline Azure Data Factory

Azure Data Factory est à la fois un ordonnanceur de traitements, un ETL ou un ELT selon la manière dont on pense les transformations et le chargement dans la destination (« sink » dans le vocable d’ADF). Il est muni d’une fenêtre de monitoring permettant de superviser l’exécution de pipelines au travers de triggers (déclencheurs en bon français). Et il ne sera pas rare (le plus rare possible, on vous le souhaite !) de rencontrer les icônes rouges de l’échec du traitement dans cette fenêtre.

Pipeline run : status failed

Dans cet article, nous allons explorer différentes méthodes de déclenchement ou relance d’un pipeline Azure Data Factory.

Relancer manuellement

Depuis le monitoring des pipelines, nous disposons de plusieurs boutons comme le bouton « rerun » au niveau de l’exécution globale du trigger.

Lorsqu’il existe une ou plusieurs activités dans le pipeline, il suffit d’aller sur le détail de l’exécution du pipeline pour choisir une des activités (cliquer dessus) et la relancer (« rerun from failed activity »). Ce second mode est particulièrement intéressant lorsqu’une chaîne de traitement a déjà réalisé des étapes importantes et ne nécessite pas d’être reprise depuis le début. Les gains de temps de traitement seront sans doute conséquents.

Ces deux stratégies sont bien sûr manuelles et nécessitent de venir observer la console de supervision. Nous allons maintenant explorer des stratégies plus automatisées.

Démarrage ou relance automatique

Une approche classique de déclenchement des pipelines consiste à définir un jour et une heure de déclenchement. S’il existe des dépendances aux activités, on prendra soin d’inclure différentes activités au sein du même pipeline et de les relier grâce aux différentes sorties disponibles (« add activity on »).

Il existe quatre types de conditions :

  • Success
  • Failure
  • Completion : exécution de l’activité suivante en cas de succès ou d’échec de l’activité
  • Skipped : exécution de l’activité suivante uniquement si l’activité n’a pas été exécutée

Ce blog vous permettra de rentrer plus en détails dans le fonctionnement de ces conditions.

Mais certains événements comme la présence d’un fichier, de lignes dans une table ou encore la réponse à une requête HTTP peuvent être des conditions de déclenchement attendues.

Par le paramétrage d’une activité

La manière sans doute la plus simple et basique de relancer une activité consiste à utilise le paramétrage de l’activité elle-même. Pour cela, dans la section General du module :

  • Renseigner un nombre maximum de Retry
  • Renseigner un intervalle en secondes entre chaque essai

Remarque : le time out par défaut est à 7 jours, il peut être diminuer.

Il faut toutefois anticiper ici un nombre maximum fini de relances, ce que l’on n’est pas forcément en capacité d’anticiper.

A la première exécution avec succès, les retry ne sont bien sûr plus pris en compte.

Les activités de type for each ou execute pipeline ne disposent pas de ce paramétrage.

Avec le composant Until

La documentation du composant Until est disponible sur ce lien et définit son fonctionnement de la sorte :

The Until activity provides the same functionality that a do-until looping structure provides in programming languages. It executes a set of activities in a loop until the condition associated with the activity evaluates to true. You can specify a timeout value for the until activity in Data Factory.

Nous allons créer ainsi le scénario suivant : sur présence d’un fichier testé par le composant Until, nous déclenchons une activité de copie.

Une variable nommée FileExists, de type chaîne de texte (string), est définie au niveau du pipeline.

A l’intérieur du composant Until, nous allons travailler avec deux activités que sont Get Medatada et Wait.

L’activité Get Metadata permet d’obtenir des informations sur un dataset préalablement défini, comme un checksum de type MD5, le nom, le type ou l’existence d’un item.

Le composant suivant Set variable n’est pas indispensable mais il nous permet d’illustrer ici la manière de conserver dans une variable une partie de l’information obtenue par l’activité Get Metadata.

La valeur de la variable est définie de la sorte :

@string(activity('Get Metadata from Source').output.exists)

On utilisera la fenêtre d’ajout de contenu dynamique pour obtenir directement certaines parties de cette expression.

Enfin, une activité Wait est déclenchée pour laisser un laps de temps s’écouler avant de tester à nouveau la présence du fichier.

Le composant If exists permet de ne pas jouer l’activité Wait lorsque le fichier attendu est détecté. Il n’est pas nécessaire de définir la partie True de ce composant.

L’activité Until peut enfin paramétrée à l’aide de l’expression suivante :

@equals(variables(‘FileExists’),’True’)

Voici enfin une démonstration du fonctionnement complet de ce pipeline.