Associer Azure Databricks et Azure Machine Learning

Si les deux outils ont en partie des fonctionnalités qui se recouvrent, avec quelques différences (voir cet article), il est tout à fait possible de tirer profit du meilleur de chacun d’eux et de les associer dans une architecture data sur Azure.

Le scénario générique d’utilisation sera alors l’industrialisation de modèles d’apprentissage (machine learning) entrainés sur des données nécessitant la puissance de Spark (grande volumétrie voire streaming).

Nous allons détailler ici comment Databricks va interagir avec les services d’Azure Machine Learning au moyen du SDK azureml-core.

A l’inverse, il sera possible de lancer, depuis Azure ML, des traitements qui s’appuieront sur la ressource de calcul Databricks déclarée en tant que attached compute.

Le SDK azureml sous Databricks

Nous commencerons par installer le package azureml-sdk[databricks] au niveau du cluster interactif (d’autres approches d’installation sont possibles, en particulier pour les automated clusters). Le cluster doit être démarré pour pouvoir effectuer l’installation.

Nous pouvons vérifier la version installée en lançant le code suivant dans un notebook :

import azureml.core
print("Azure ML SDK Version: ", azureml.core.VERSION)

Un décalage peut exister avec la version du SDK non spécifique à Databricks. Des dépendances peuvent également être demandées pour certaines parties du SDK comme l’annonce l’avertissement reçu à l’exécution de la commande précendente.

Failure while loading azureml_run_type_providers. Failed to load entrypoint automl = azureml.train.automl.run:AutoMLRun._from_run_dto with exception (cryptography 3.1.1 (/databricks/python3/lib/python3.8/site-packages), Requirement.parse('cryptography<4.0.0,>=3.3.1; extra == "crypto"'), {'PyJWT'}).

L’étape suivante consistera à se connecter à l’espace de travail d’Azure Machine Learning depuis le script, ce qui se fait par le code ci-dessous :

from azureml.core import Workspace

ws = Workspace.from_config()

Plusieurs possibilités s’ouvrent à nous pour réaliser ce lien. La première consiste à établir explicitement ce lien dans le portail Azure, depuis la ressource Databricks.

Mais cette approche ne permet d’avoir qu’un seul espace lié, celui-ci n’étant plus modifiable au travers de l’interface Azure (mais peut-être en ligne de commandes ?). La configuration étant donnée par un fichier config.json, téléchargeable depuis la page du service Azure ML, nous pouvons placer ce fichier sur le DataBricks File System (DBFS), et donner son chemin (au format “File API format”) dans le code .

from azureml.core import Workspace

ws = Workspace.from_config(path='/dbfs/FileStore/azureml/config.json')

Une authentification “interactive” sera alors demandée : il faut saisir le code donné dans l’URL https://microsoft.com/devicelogin puis entrer ses login et mot de passe Azure.

Pour une authentification non interactive, nous utiliserons un principal de service.

Nous pouvons maintenant interagir avec les ressources du portail Azure Machine Learning et en particulier, exécuter notre code au sein d’une expérience.

from azureml.core import experiment
experiment = Experiment(workspace=ws, name="ExperimentFromDBXs")

Après l’entrainement d’un modèle, idéalement réalisé grâce à Spark, nous pouvons enregistrer celui-ci sur le portail.

L’instruction run.log() permet de conserver les métriques d’évaluation.

A noter que les widgets ne peuvent pas s’afficher dans les notebooks Databricks.

Databricks comme attached compute

Comme pour toute ressource de calcul, nous commençons par déclarer le cluster Databricks dans le portail Azure Machine Learning.

Un jeton d’accès (access token) est attendu pour réaliser l’authentification. Nous prendrons soin de le générer grâce à un principal de service pour éviter qu’il ne soit attaché à une personne (“Personal Access Token”).

La ressource est maintenant correctement déclarée.

Nous allons l’exploiter au travers du SDK Python azureml dans lequel nous disposons de deux objets liés à Databricks.

La documentation présente la classe qui permettrait de créer cette ressource au travers du code :

DatabricksCompute(workspace, name)

Nous trouvons aussi dans la documentation l’objet qui crée une étape de pipeline exécutée ensuite au sein d’une expérience :

DatabricksStep(name, inputs=None, outputs=None, existing_cluster_id=None, spark_version=None, node_type=None, instance_pool_id=None, num_workers=None, min_workers=None, max_workers=None, spark_env_variables=None, spark_conf=None, init_scripts=None, cluster_log_dbfs_path=None, notebook_path=None, notebook_params=None, python_script_path=None, python_script_params=None, main_class_name=None, jar_params=None, python_script_name=None, source_directory=None, hash_paths=None, run_name=None, timeout_seconds=None, runconfig=None, maven_libraries=None, pypi_libraries=None, egg_libraries=None, jar_libraries=None, rcran_libraries=None, compute_target=None, allow_reuse=True, version=None)

Il est possible de lancer un script Python qui sera exécuté sur le cluster Databricks mais l’usage le plus intéressant est sans doute l’exécution d’un notebook.

from azureml.core.compute import DatabricksCompute
from azureml.pipeline.steps import DatabricksStep

databricks_compute = DatabricksCompute(workspace=ws, name="my-dbx-cluster")

dbNbStep = DatabricksStep( name="DBNotebookInWS",
 num_workers=1,
 notebook_path=notebook_path,
 run_name='DB_Notebook_demo',
 compute_target=databricks_compute,
 allow_reuse=True
)

Nous pourrons alors suivre les logs du cluster dans l’expérience.

Je vous recommande ce notebook pour découvrir tous les usages de DatabricksStep, comme par exemple l’exécution d’un JAR.

En conclusion

La souplesse des services managés et la séparation stockage / calcul autorisent aujourd’hui à penser les architectures data avec les services qui sont les plus utiles au moment souhaité, quitte à avoir des redondances de fonctionnalités. Nous veillerons à préserver au maximum l’indépendance de certaines parties du code (a minima la préparation de données et l’entrainement) vis à vis des plateformes propriétaires. Il sera alors possible d’envisager des alternatives chez d’autres fournisseurs cloud ou bien encore dans le monde de l’Open Source.

Différencier Azure Databricks et Azure Machine Learning

Des machines virtuelles, du code R ou Python, des accès au Data Lake, du versionning et de l’exposition de modèles de Machine Learning, les points communs entre Azure Databricks et le service Azure Machine Learning sont nombreux ! Mais nous allons détailler dans cet article les différences entre ces deux produits, qui vous permettront de choisir le meilleur outil selon vos cas d’usage. N’oublions pas que ce sont aussi des outils complémentaires mais cet aspect sera traité dans un autre article.

S’il fallait résumer en quelques mots chacun des deux produits, voici comment l’on pourrait les présenter :

  • Databricks est un cluster Spark managé dans le cloud Azure (mais aussi AWS et GCP), dédié à exécuter du code de manière distribuée. Autour de cette fonctionnalité principale, s’intègrent des produits comme MLFlow pour le versionning et serving des modèles de ML ou encore SQL Analytics et Redash pour la visualisation de données.
  • Azure Machine Learning est un portail (dit “studio”) regroupant toutes les briques d’un projet de ML, allant de l’import des données depuis des sources Azure à l’exposition (web service) et monitoring de modèles, en passant par l’entrainement de ces modèles sur différentes solutions de calcul.

Nous allons pointer les différences entre ces deux services sur les thèmes indiqués dans le schéma ci-dessous, qui sont au cœur d’un workflow de Data Science.

Les sources de données

Les sources possibles pour Azure ML sont toutes des services managés du cloud de Microsoft, de type systèmes de fichiers ou bases de données.

Les sources pour Azure Databricks sont représentées ci-dessous :

Le périmètre est ici plus large et intègre des produits classiques des architectures Big Data dans les domaines NoSQL, temps réel ou search.

La connexion à des ressources de type Azure SQL DB ou DWH se fait par un pilote générique JDBC, comme cela a été présenté ici préalablement. C’est une approche plus générique mais qui ne pas bénéficier d’optimisations propres à la communication de services Microsoft entre eux. C’est un point qui mériterait d’être testé, dans des conditions de sécurisation des échanges réseaux (VNET).

L’un des projets Open Source très présent dans Databricks est Delta Lake : un format de fichier “amélioré” par rapport au format Parquet classique, puisqu’il s’accompagne d’une capacité à traiter des transactions ACID et d’un historique au format JSON des logs transactionnels. Ce sont des opérations comme des delete, insert, update et upsert qui seront possibles ! La fonctionnalité de time travel permet de retrouver l’état des données à une date ou sur une version donnée. Nous sommes donc dans une logique “schema-on-write” d’habitude propres aux systèmes de gestion de bases de données. Est-ce déjà la fin annoncée de ces derniers ? Certainement pas !

Voici la syntaxe permettant d’écrire un dataframe Spark au format delta.

(df.write.format("delta")
.mode("append"|"overwrite")
.partitionBy("date") # optional
.option("mergeSchema", "true") # option - evolve schema
.saveAsTable("events") | .save("/path/to/delta_table")

Delta Lake est un produit à part entière, alors peut-on l’utiliser depuis Azure ML ? Rappelons qu’il faut un contexte Spark et c’est ici toute la difficulté pour Azure ML qui doit, pour cela, se baser sur une ressource tierce comme Azure Synapse Analytics ou… Azure Databricks !

Les scripts et leur exécution (calcul)

Azure ML disposent de deux SDK, Python et R, mais il ne sera pas possible d’utiliser de langage Java ou Scala. Au delà de la création d’un dataset sur une source Azure SQL DB, le SQL ne pourra être utilisé qu’au travers de packages R ou Python.

Le périmètre est plus large pour Databricks dont les notebooks exécuteront Python, R, Scala ou du SQL. Ce choix se fait à la création du notebook mais n’est pas rédhibitoire car les commandes magiques permettent de changer de langage dans une cellule : %python%r%scala, %sql.

Même si la plupart des Data Scientists sont plus enclins à développer en R ou Python, et dans les déclinaisons SparkR et PySpark, Scala reste le langage natif de Spark et certainement pour l’instant le plus efficace car compilé avant de s’exécuter.

Pour utiliser Scala avec Azure ML, nous attendrons la fonctionnalité de “linked service” vers Azure Synapse Analytics.

L’exécution d’un script dans Azure ML peut se faire une instance de calcul (la VM qui exécute également le serveur de notebooks ou RStudio Server) ou sur un cluster de machines virtuelles.

Databricks est prévu “by design” pour les exécutions distribuées sur un cluster mais il est possible d’utiliser un “cluster single node“, c’est-à-dire une machine unique.

Le versionning des modèles

Commençons par le produit Open Source présent dans les deux services : MLFlow.

La même équipe se trouvant à l’origine de Spark et de MLFlow, nous disposons naturellement dans Databricks d’une intégration très simple et fluide. Le package est même installé par défaut sur les runtimes de type ML.

Les éléments nécessaires à l’évaluation et à la reproductibilité du modèle sont simples à enregistrer à l’aide de commandes comme mlflow.log_param, mlflow.log_metric, etc.

Azure ML permet donc également d’activer une URI MLFlow mais qui ne sera valable qu’une heure.

Vous risquez aussi le message d’erreur suivant :

WARNING mlflow.models: Logging model metadata to the tracking server has failed, possibly due older server version.

Il faut à mon sens voir le portail Azure ML comme principalement un outil d’affichage des logs d’exécution (runs) des scripts lancés au sein d’expériences. A nouveau, quelques commandes simples permettent d’enregistrer le binaire d’un modèle (artifact) et les informations associées : model.register, run.log, etc.

Pour remplacer la commande mlflow.sklearn.save_model, nous utiliserons dans Azure ML une déclinaison de l’exemple suivant :

from azureml.core.model import Model

Model.register(workspace=ws,
               model_name="diabetes_regression_model",
               model_path="test_diabetes/model.pkl",
               model_framework=Model.Framework.SCIKITLEARN,  # Framework used to create the model.
               model_framework_version='0.20.3',             # Version of scikit-learn used to create the model.
               tags={'alpha': alpha, 'l1_ratio': l1_ratio},
               description="Linear regression model to predict diabetes"
              )

Il est indispensable d’effectuer cette opération pour retrouver l’artefact en dehors du menu “Output & logs” de l’exécution.

L’exposition des modèles

Nous sommes ici dans le domaine de prédilection d’Azure ML qui permet une approche par l’interface ou au travers du SDK pour piloter deux ressources Azure : Container Instance et Kubernetes Services.

Mais Databricks n’est pas en reste pour l’exposition et celle-ci s’appuiera naturellement sur MLFlow. Nous commençons par enregistrer le modèle retenu.

Ensuite, depuis le menu Model, nous pouvons activer le serving, qui se fera sur une ressource de type single-node.

Dès que la ressource (une machine virtuelle) est active, il est possible d’interroger l’API.

Ordonnancement

Dans une architecture Azure PaaS, c’est Azure Data Factory (ADF) qui est la solution toute désignée pour ordonnancer les traitements. Nous y disposons d’un module pour chacun des deux services. La différence se situe sur les éléments pouvant être appelés par ADF. Seuls les objets “Pipeline” issus d’Azure ML sont utilisables et ces objets ne sont pas simples à développer.

Les deux premiers modules ML concernent l’ancienne version du studio.

Côté Databricks, ce sont des scripts Python, des notebooks ou bien des Jar qui sont exécutables au travers de l’interface.

Les deux services ont également leur propre outil de scheduling, pilotable au travers de l’interface pour Databricks ou bien grâce à leur API.

A l’aide du SDK Azure ML, nous pourrons mettre en place un trigger à l’aide du code ci-dessous :

Versionning du code et cycle CI/CD

Databricks permet de définir un fournisseur Git parmi les suivants :

Chaque notebook doit ensuite être lié au repository et il est possible de faire un commit sur une branche déjà créée (une pull request devant ensuite être faite depuis le fournisseur Git).

Récemment (mars 2021), une nouvelle approche est disponible et fournit plus de fonctionnalités.

Dans les notebooks d’Azure ML, aucun lien n’est fait avec un gestionnaire de version. Il faut donc ruser et passer par un IDE local comme Visual Studio Code qui pilotera les ressources de calcul à distance. J’ai détaillé ce fonctionnement dans cet article.

Passons maintenant à la problématique du déploiement entre deux environnements (par exemple, développement et production).

Lorsque le code se trouve versionné sur un dépôt Azure Repos, il est très simple de lancer de manière automatique un pipeline d’intégration qui réalisera des tests automatisés. Databricks s’appuiera sur une machine virtuelle munie du module databricks-connect qui permet d’exécuter du code à distance sur l’environnement Spark.

Le schéma ci-dessous résume l’isolation des environnements avec une architecture simple reposant sur un stockage Azure Data Lake et un espace de travail Databricks.

Des outils de release de la Market Place vont permettre de déplacer automatiquement des notebooks de l’environnement de développement vers celui de production.

En revanche, utiliser des ressources Azure ML par environnement est une question qui n’est pas simple à trancher. En effet, le portail est finalement une sorte d’outil “DevOps” (disons plutôt MLOps) en lui-même. Nous y trouverons ainsi les versions des datasets et des modèles, ainsi que les logs d’exécution permettant un suivi de production. Mais en pratique, et par manque d’outils de nettoyage (des expériences, des exécutions…), l’usage pour le développement va “polluer” le portail. Il sera donc indispensable de se donner des pratiques de gouvernance et de limiter les droits aux différents utilisateurs, par le biais des custom roles, comme décrit dans ce billet.

Databricks peut être utilisé comme “attached compute”, environnement d’exécution des scripts Spark.

Le Machine Learning ne se marie pas forcément bien avec l’isolation des environnements : en effet, pour démarrer le projet, il faut des données qui soient de qualité et suffisamment représentatives de la réalité. Ce n’est pas le cas des environnements de développement qui sont incomplets voire erronés. Un accès (en lecture) à la production depuis l’environnement de développement pourrait être à envisager.

Coût et FinOps

Suite au retrait de la licence Entreprise d’Azure ML, le coût d’utilisation ne correspond qu’au temps où sont utilisées les ressources de calcul. A cela s’ajoutent les services qui accompagnent Azure ML : compte de stockage, Key Vault, app insights et surtout container registry. Il faut penser à purger régulièrement ce dernier car un container registry ne donne que 10Go de stockage gratuit.

Il est tout à fait possible d’utiliser les mêmes types de VMs sur les deux outils. Pour autant, le coût d’une VM est “surchargé” par la licence Databricks, exprimée en DBU. Cette licence se décline elle-même selon le type d’espace de travail (standard ou premium) et le type de cluster (interactif ou automated).

Une piste supplémentaire est d’utiliser des “spot instances” moins chères, donnant une nouvelle opportunité d’optimisation des coûts (au détriment de la rapidité d’exécution), comme présenté dans cette vidéo.

Quelques points inclassables

En plus des outils “no code” (Concepteur et AutomatedML), Azure ML dispose d’un module de labellisation d’images, assisté par une approche d’active learning (au bout d’un nombre suffisant d’images taggés, les tags peuvent être automatiquement proposés).

Databricks s’est pourvu d’un nouvel outil “SQL Analytics” qui permet, à l’aide de requêtes SQL, de préparer des vues à destination d’un outil de reporting comme Microsoft Power BI.

En licence Premium, le service Power BI donne accès aux modèles enregistrés sur Azure ML pour que ceux-ci soient appliqués lors de la phase de préparation de données (Power Query).

Alors, lequel choisir ?

En conclusion, il faudra bien évaluer les prérequis des projets (par exemple, l’utilisation de Delta Lake) qui s’exécuteront sur l’architecture Azure pour choisir le bon outil, mais il faut aussi bien comprendre qu’il n’y a pas un surcoût trop important à les utiliser tous les deux et nous verrons surtout dans un prochain article qu’ils peuvent se montrer très complémentaires !

Wheel Python et Azure feed : centralisez et distribuez vos packages

Assurer la qualité d’un développement Python passe par le fait de packager du code (des classes, des fonctions…) dans des modules que l’on pourra ensuite simplement installer dans un nouvel environnement (avec un classique pip install...) et importer dans des scripts à l’aide des syntaxes habituelles que sont import package ou from package import function.

Créer le package Wheel

C’est bien sûr la toute première étape une fois que notre code a été écrit. Et bien écrit, c’est-à-dire en respectant par exemple la norme PEP8 (nous en reparlerons) et en intégrant des docstrings dans les fonctions.

Nous aurons besoin de quelques librairies, dont bien évidemment wheel et nous profiterons d’un environnement virtuel pour les installer (commandes Windows ci-dessous pour créer, activer et configurer cet environnement).

python -m venv env
env\Scripts\activate.bat
python -m pip install -U setuptools wheel twine

Notre exemple se basera sur du code simple générant un pandas dataframe avec des nombres aléatoires. Notons au passage (pour les puristes :)) que ce code poserait problème dès que nb_col dépasserait la valeur 26.

import pandas as pd
import numpy as np
import string


def generate_df(nb_col, nb_row):
     """
         Generate a pandas DataFrame
         with nb_col columns and nb_row rows
     """
     alphabet_string = string.ascii_uppercase
     columns_string = alphabet_string[:nb_col]
     columns_list = list(columns_string)
     df = pd.DataFrame(np.random.randint(0, 100, size=(nb_row, nb_col)), columns=columns_list)
 return df

Le script, nommé ici my_function.py, doit se trouver dans une arborescence de fichiers définie comme suit :

my_package
├── LICENSE
├── README.md
├── my_pkg
│   └── __init__.py
│   └── my_function.py
├── setup.py
└── tests

Le fichier __init__.py est tout simplement un fichier vide, seul le nom est obligatoire. Ce fichier doit être dans le répertoire dédié aux fichiers développés (sous-répertoire du répertoire principal). Il peut toutefois contenir également des fonctions qui seront chargées à l’appel du module. Pour réaliser des tests simples, voici ce qu’il contient.

def function_init():
     print('Successfully imported Init.py')


 def print_hello_iam(name):
     print(f'Hello, I am {name}')

Nous complétons avec un fichier d’informations README au format Markdown, un fichier texte contenant la licence (ci-dessous) et un répertoire, éventuellement vide dans un premier temps, qui contiendra des tests.

Copyright (c) 2018 The Python Packaging Authority
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:

 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.

 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 SOFTWARE.

Le dernier élément nécessaire est le fichier setup.py dont un modèle de contenu peut être retrouver sur cette page. Il faudra en particulier y préciser le nom souhaité pour le package.

Pour générer l’archive de distribution, nous lançons la commande suivante, au niveau du répertoire contenant le fichier setup.py :

python setup.py sdist bdist_wheel

Un répertoire dist est alors créé et contient deux fichiers : le fichier .whl et une archive .tar.gz.

Le nom du package wheel est normalisé de la sorte :

{dist}-{version}(-{build})?-{python}-{abi}-{platform}.whl

EDIT : vérifiez également que le dossier finissant par “egg-info” contient bien les fichiers suivants.

Le fichier top_level.txt contient en particulier le nom qui servira à appeler le package dans les syntaxes du type from package import …

Réalisons tout de suite un premier test pour valider que notre package est bien construit. Au niveau du dossier contenant le fichier .whl, nous lançons dans un terminal la commande suivante, depuis le répertoire dist :

python -m pip install my_pkg_byPaulPETON-0.0.1-py3-none-any.whl

Puis dans un prompt Python, nous vérifions que l’import des nos méthodes est bien reconnu :

from my_pkg.__init__ import *
from my_pkg.my_function import *

Tester le package depuis un cluster Azure Databricks

Nous allons maintenant installer manuellement le fichier .whl disponible dans le répertoire dist sur un cluster Databricks. Nous commençons par le télécharger sur un cluster interactif démarré.

Dès lors, les fonctions deviennent disponibles dans un notebook.

Notez au passage le “dark mode” des notebooks Databricks 🙂

Charger le package sur un feed Azure DevOps

Nous n’allons bien sûr pas utiliser le fichier .whl localement, celui-ci doit être hébergé sur une plateforme accessible de tous les développeurs de l’équipe.

Tout comme le code est placé dans un dépôt (repository) d’un gestionnaire de versions comme Azure DevOps, nous allons placer le package, ici nommé artefact, dans un feed, accessible aux personnes autorisées.

Nous réalisons tout d’abord la création du feed. Il sera ici public, donc accessible au travers d’Internet. Pour une pratique en entreprise, un projet privé est bien évidemment recommandé.

Le feed est maintenant bien actif.

Attention, il est préférable de vérifier vos options de facturation (dans le menu “Organization Settings”). Un compte gratuit présentera des limites de taille pour l’usage des artefacts stockés dans le feed.

Vérifions également le stockage associé aux artefacts (ici au niveau projet, puisque c’est le périmètre qui a été défini).

Il s’agit ensuite d’automatiser la création du package wheel par un pipeline d’intégration continue. Nous démarrons à partir d’un “starter pipeline”.

Le code complet de ce pipeline, enregistré automatiquement dans un fichier azure-pipelines.yml, ajouté au repository, est disponible par exemple sur ce GitHub (veillez à adapter si besoin la version de Python attendue ainsi que le nom de la branche – main pour master – si vous souhaitez un lancement automatique de la pipeline à chaque commit).

Ce script YAML reprend l’exécution du fichier setup.py pour créer l’artefact dans un conteneur dédié.

Nous avons ensuite besoin d’un pipeline de release qui déposera l’artefact dans le feed qui servira de point de distribution.

A partir d’un modèle vide de pipeline de release (“empty job“), nous attachons le résultat du pipeline de build réalisé précédemment.

Nous ajoutons trois activités qui sont :

  • Python twine upload authenticate
  • Command line pour l’installation de Twine
  • Command line pour la publication de l’artefact

L’étape 1 s’authentifie auprès du feed, dont il faut saisir le nom.

L’étape 2 réalise l’installation du package Twine par la commande pip install.

L’étape 3 utilise Twine pour télécharger le package.

Voici la ligne de code nécessaire :

twine upload -r <feed_name> --config-file $(PYPIRC_PATH) d:\a\r1\a\_<project_name>\dist\dist*

Cette étape est sans doute la plus délicate. En cas d’erreur pour localiser ce répertoire, je vous conseille de regarder les logs de la première étape du pipeline de build afin de visualiser l’artefact dans son arborescence.

Utiliser le package depuis un nouveau script

Nous allons avoir besoin d’une nouvelle librairie : artifacts-keyring..

pip install artifacts-keyring

Ensuite, nous pouvons faire appel à la commande classique “pip install” pointant vers notre feed.

pip install packageName --index-url https://pkgs.dev.azure.com/<organization_name>/<project_name>/_packaging/<feed_name>/pypi/simple/

Le “project_name” est facultatif si le feed est déclaré au niveau de l’organisation. Il sera possible de remplacer par l’option –index-url par –extra-index-url comme indiqué dans cet article mais ceci est à réaliser seulement si l’on utilise la méthode proposée dans le portail Azure DevOps : la création d’un fichier pip.ini.

En cas de project privé, une authentification sera alors demandée, de manière interactive, au travers d’un navigateur.

Ce mode n’est bien sûr pas envisageable pour une installation devant s’exécuter de manière autonome. Nous allons donc générer un Personal Access Token (PAT) qui permettra de nous authentifier. Ce jeton s’obtient dans Azure DevOps.

Nous donnons les droits au niveau “packaging“.

Dans l’environnement d’exécution, une variable d’environnement sera nécessaire pour désactiver l’authentification interactive et tenir compte du PAT. Cela se fait par exemple dans Windows par le menu “Modifier les variables d’environnement système” ou dans un dockerfile avec la ligne ci-dessous.

VAR ARTIFACTS_KEYRING_NONINTERACTIVE_MODE=true

Il sera alors possible d’appeler le package avec la syntaxe suivante :

pip install packageName --extra-index-url=https://<PAT>@pkgs.dev.azure.com/<organization_name>/<project_name>/_packaging/<feed_name>/pypi/simple/

Ca y est ! Le package est maintenant installé et nous pouvons nous appuyer sur les fonctions qu’il contient.

from my_pkg.my_function import *

En conclusion

Nous avons mis en place ici une approche dédiée à l’industrialisation d’un développement, accompagné d’un processus CI/CD. Cela demande un investissement en temps et en prise en main de cette procédure mais c’est une garantie de stabilité et de non régression sur le livrable en production.

Déployer Azure Databricks avec Terraform

Déployer une infrastructure cloud est un projet qui peut (et doit !) se penser comme du code. En effet, une rigueur dans le nommage des ressources est particulièrement importante et nous aurons certainement plusieurs environnements à créer (développement, qualification, production…). Nous allons donc éviter des tâches répétitives et sécuriser ce processus. En cas de perte d’une partie ou de tout un environnement, il sera également simple de le recréer à l’identique.

Nous allons pour cela utiliser Terraform qui présente l’avantage d’être commun à différents fournisseurs de cloud.

Azure Databricks ne fait pas exception à cette démarche et nous allons illustrer ici le déploiement d’un espace de travail au travers d’un script Terraform.

Installer Terraform dans un environnement Windows

  • Télécharger la dernière version de Terraform via le lien suivant : Terraform Versions | HashiCorp Releases
  • Créer un dossier : “.terraform” dans le répertoire : C:\Users\username\.terraform
  • Dézipper le fichier téléchargé et le déplacer dans le dossier “.terraform”
  • Ajouter le chemin: C:\Users\username\.terraform dans la variable d’environnement Path
  • Depuis un terminal, tester la commande : terraform –version

Ecrire le fichier main.tf

Le début du fichier va référencer la version de Terraform utilisée.

terraform {
  required_providers {
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "=2.47.0"
      skip_credentials_validation = false
    }
  }
}

Pour autoriser le script à prendre la main sur notre souscription Azure, un principal de service sera nécessaire et celui-ci doit disposer des droits de niveau gestion des ressources sur la souscription. Dans un premier temps, une authentification par Azure CLI est aussi envisageable. Les différents modes sont décrits dans la documentation de Terraform.

Depuis un environnement local, nous écrivons la valeur des variables sous la forme nomVar = “valeur” dans un fichier au nom réservé terraform.tfvars.

Les variables sont ensuite déclarées et appelées dans le code à l’aide de la syntaxe suivante :

# VARIABLES
variable "subscription_id" {}
variable "client_id" {}
variable "client_secret" {}
variable "tenant_id" {}

# Configure the Microsoft Azure Provider
provider "azurerm" {
  features {}
  subscription_id = var.subscription_id
  client_id       = var.client_id
  client_secret   = var.client_secret
  tenant_id       = var.tenant_id
}

Un groupe de ressource sera nécessaire et il se crée avec la syntaxe suivante :

resource "azurerm_resource_group" "rg" {
  name     = "rg-analytics"  
  location = "West Europe"

  tags = {
        environment = "dev"
        project = "unified analytics"
    }
}

Passons maintenant à la création de la ressource Databricks en elle-même, nous ferons référence à l’alias du groupe de ressources défini dans Terraform (ici, “rg”).

resource "azurerm_databricks_workspace" "dbx" {
  name                = "dbx-workspace"
  resource_group_name = azurerm_resource_group.rg.name
  location            = azurerm_resource_group.rg.location
  sku                 = "standard"

  tags = {
    environment = "dev"
    project = "unified analytics"
  }
}

La valeur définie dans “sku” correspond à la licence choisie parmi les valeurs standard, premium ou trial. Cette dernière permet de tester la licence Premium pendant 14 jours.

La création d’une ressource Databricks entraine la création conjointe d’un groupe de ressources managé contenant un compte de stockage, un réseau virtuel et un groupe de sécurité réseau.

Si nous avons besoin d’un cluster interactif pour utiliser cette ressource (attention, pourquoi ne pas passer par des automated clusters lancés par Azure Data Factory comme expliqué dans cet article ?).

Les propriétés utilisées sont celles visibles dans la description au format JSON du cluster, depuis l’espace de travail Databricks.

resource "databricks_cluster" "cluster" {
  cluster_name  = "myInteractiveCluster"
  spark_version = "7.3.x-scala2.12"
  node_type_id  = "Standard_F4s"
  num_workers = 2
  autotermination_minutes = 60
  library {
    pypi {
      package = "azureml-core"
    }
  }
}

Lors de la création de ce cluster, nous installons automatiquement la librairie azureml-core, qui correspond au SDK pilotant les ressources d’Azure Machine Learning, souvent évoqué sur ce blog.

Quelques commandes Terraform

Pour initialiser le répertoire contenant les différents fichiers, nous lançons la commande suivante :

terraform init

Un sous-répertoire .terraform se crée alors automatiquement.

Pour analyser le script Terraform et visualiser les éléments qui seront ajoutés ou modifiés :

terraform plan

Par défaut et sans préciser l’argument -var-file, le fichier de secrets terraform.tfvars est utilisé.

Pour appliquer ces transformations :

terraform apply

Une validation sera demandée.

Pour supprimer les ressources définies dans le main.tf :

terraform destroy

A nouveau (et heureusement !), une validation est requise.

Voici donc nos premiers pas sous Terraform et la simplicité d’utilisation vous fera vite oublier les cases à remplir et à cocher de l’interface utilisateur du portail Azure !

Il ne reste plus qu’à intégrer ce script dans un pipeline d’intégration continue, par exemple sous Azure DevOps.

Exploiter l’API Azure Databricks en PowerShell

L’API d’Azure Databricks permet de réaliser de nombreuses actions au moyen de commandes émises au travers d’une URL, de type GET ou POST. La documentation complète est disponible sur le site de Microsoft ou bien sur celui de Databricks.

Un premier exemple prend la forme ci-dessous et permet d’obtenir des informations détaillées sur un cluster :

GET https://<databricks-instance>/api/2.0/clusters/get?cluster_id=<cluster-id>

Les identifiants nécessaires de l’espace de travail et du cluster Databricks peuvent être obtenues en se rendant sur la page Web du cluster.

Bien sûr, Databricks ne se limite pas à des clusters, il faut des notebooks contenant du code et ceux-ci sont pilotés par des jobs. Pour imaginer un scénario paramétrable, nous définissons des widgets dans le notebook, ce qui permettra de passer les valeurs de ces paramètres aux jobs.

parquet_file = dbutils.widgets.get("pParquetFile")
limit = int(dbutils.widgets.get("pLimit")

La définition du job se fait dans l’interface dédiée et les paramètres peuvent y être déclarés. Il faut noter ici l’identifiant du job, nous en aurons besoin par la suite.

Sauf à planifier le job, ces étapes resteront manuelles et les valeurs des paramètres seront à préciser à chaque exécution.

En intégrant différentes instructions dans un script PowerShell, nous pouvons élaborer le scénario suivant :

Le code correspondant est détaillé ci-dessous :

$param1 = "abc"
$param2 = 123
$limitRows = 100

$dbxPAT = "dapixxxxxxxxxxxxxxxxxxxxx"
$dbxBearerToken = "Bearer " + $dbxPAT
$dbxWorkspaceURL = "https://adb-00000000000000000.0.azuredatabricks.net/"
$clusterID = "0000-000000-xxxxx000"
$jobID = 42
$localPath = "C:\temp\"
$filePath = $localPath+$fileName

$headers = New-Object "System.Collections.Generic.Dictionary[[String],[String]]"
$headers.Add("Authorization", $dbxBearerToken)

 Write-Output "Cluster status"
 $URL = $dbxWorkspaceURL + "api/2.0/clusters/get?cluster_id="+$($clusterID)
 $responseCluster = Invoke-RestMethod $URL -Method 'GET' -Headers $headers
 $clusterState = $responseCluster.state
 Write-Output "Cluster state :" $clusterState

 If($clusterState -ne "RUNNING")
 {
     Write-Output "Start cluster"
     $URL = $dbxWorkspaceURL + "api/2.0/clusters/start"
     $body = @{"cluster_id"=$clusterID} | ConvertTo-Json
     $responseStart = Invoke-RestMethod $URL -Method 'POST' -Headers $headers -Body $body
     $clusterState = $responseStart.state
 
    While($clusterState -ne "RUNNING")
  {
     Write-Output "Please wait one minute more...the cluster is starting"
     Start-Sleep -Seconds 60
     $URL = $dbxWorkspaceURL + "api/2.0/clusters/get?cluster_id="+$($clusterID)
     $responseStart = Invoke-RestMethod $URL -Method 'GET' -Headers $headers
     $clusterState = $responseStart.state
     Write-Output $clusterState
  }
 }

Write-Output "Run job"
$body = @{
   "job_id"= $jobID
   "notebook_params"= @{"P1"= $param1 ;"P2"= $param2 ;"limit"= $limitRows }
 } | ConvertTo-Json
$URL = $dbxWorkspaceURL + "api/2.0/jobs/run-now"
$responseJob = Invoke-RestMethod $URL -Method 'POST' -Headers $headers -Body $body
$runId = $responseJob.run_id
Write-Output "run ID : "$runId

$URL = $dbxWorkspaceURL + "api/2.0/jobs/runs/get-output?run_id="+$($runId)
Write-Output "Run status and Get output"
$runStatus = "PENDING"
Do
{
 Write-Output "Please wait one minute more...the job is running"
 Start-Sleep -Seconds 60
 $responseRun = Invoke-RestMethod $URL -Method 'GET' -Headers $headers
 $responseun | ConvertTo-Json
 $runStatus = $responseRun.metadata.state.life_cycle_state
 Write-Output $runStatus
 }
Until($runStatus -eq "TERMINATED")

Write-Output "Check run result state"
$runResultState = $responseRun.metadata.state.result_state
Write-Output $runResultState

If($runResultState -ne "SUCCESS")
{
    exit;
}

Write-Output "Export result to file"
$result = $responseRun.notebook_output.result
$isTruncated = $responseRun.notebook_output.truncated
if($isTruncated -eq "False")
{
    $result | Out-File -FilePath $filePath -Force
    Write-Output "Export done"
}

Dans ce code, nous nous appuyons sur la fonction Invoke-RestMethod suivie de l’URL de l’API Databricks. La réponse sera ensuite exploitée pour continuer le programme.

L’instruction api/2.0/jobs/runs/get-output?run_id= permet de retourner un texte passé en paramètre de la commande Databricks qui viendra conclure le notebook (aucune autre cellule ne sera ensuite exécutée) :

dbutils.notebook.exit(textObject)

Le contenu de la variable textObject se retrouve alors au niveau metadata.state.result_state du résultat de l’instruction. La sortie ne peut dépasser un volume de plus de 5Mo. Nous pouvons vérifier que le résultat n’est pas tronqué à l’aide de la valeur de l’élément notebook_output.truncated à false.

En mettant en œuvre ce code au sein d’une ressource comme Azure Function (les paramètres définis au début du code intégrant alors la route de la fonction), nous avons obtenu une “meta API” paramétrable, restituant un résultat sous forme d’export de données !

Utiliser Databricks CLI

Jusqu’à présent, nous avons l’habitude d’utiliser un espace de travail Azure Databricks en nous connectant au portail, l’authentification étant réalisée par un couple login / password déclaré dans l’annuaire Azure Active Directory.

Dans un but d’automatisation des tâches, il est intéressant de se pencher sur les commandes en lignes ou CLI. Celles-ci sont documentées sur le site de Databricks. Les commandes peuvent être vues comme une surcouche de l’API REST de Databricks.

Installation du CLI Databricks

Un environnement Python est nécessaire. Nous pouvons ensuite lancer le téléchargement du package dédié avec la commande ci-dessous, depuis un terminal.

pip install databricks-cli

Nous vérifions dans la foulée que l’affichage de l’aide d’une commande, sur laquelle nous reviendrons plus tard, est fonctionnel.

databricks fs -h

La version installée peut être retrouvée par la commande :

databricks --version

C’est un produit qui évolue rapidement et il conviendra de le mettre à jour fréquemment, afin de bénéficier d’un maximum de fonctionnalités.

Authentification

Nous allons utiliser un Personal Access Token pour nous authentifier auprès du service. Ce jeton de sécurité est obtenu sur le portail Databricks, dans le menu User Settings.

Attention à bien noter (dans un coffre-fort électronique !) le jeton obtenu, il ne sera plus possible d’afficher sa valeur par la suite.

Nous démarrons la configuration par la commande :

databricks configure --token

Deux valeurs seront attendues : l’URL de l’espace de travail, puis le token précédemment généré. L’URL pour une ressource Azure est dorénavant de la forme https://adb-XXXXXXXXXXXXXXXX.XX.azuredatabricks.net/.

Un fichier local s’écrit alors et contient les informations renseignées.

Vérifions maintenant que l’authentification est effective en lançant une commande interagissant avec l’espace de travail :

databricks workspace ls

Si nous obtenons un message d’erreur Error: b’Bad Request’, la cause peut être un mauvais enregistrement du token dans le fichier de configuration. En ouvrant celui-ci dans un éditeur de texte, nous visualisons le résultat suivant :

Remplacer les caractères SYN par la valeur du token permettra de résoudre ce problème mais demande de stocker ce secret de manière non sécurisée.

Nous pouvons maintenant lister le contenu de l’espace de travail.

Copier un fichier depuis le FileStore

Le FileStore est une zone de stockage spécifique (un dossier) du DataBricks File System (DBFS). Celui-ci nous permet de réaliser des échanges avec l’extérieur : copie de fichiers vers ou depuis le DBFS. Une documentation complète est disponible ici.

A l’aide des commandes du CLI, nous identifions un fichier que nous pouvons recopier localement.

Comme le CLI est une surcouche de l’API REST de Databricks, il est intéressant de retrouver la commande initiale émise vers l’espace de travail. Nous pouvons l’obtenir en ajoutant –debug après la commande de copie.

Le premier appel vérifie le statut du fichier. Il est possible d’exécuter la requête dans Postman (coller un token dans la partie Authorization).

Nous trouvons ici la taille du fichier. Il faut savoir que le contenu du fichier est converti en base 64 et nous allons ensuite mieux comprendre pourquoi.

Un second appel à l’API se fait avec la méthode read et deux options &offset=0&length=1048576. Si le fichier dépasse 1 méga-octet, celui-ci est découpé en morceaux (chunks) et plusieurs appels seront nécessaires. La commande reconstitue toutefois le fichier et le décode automatiquement. Tout cela est donc transparent pour l’utilisateur !

Ajuster le niveau d’une Azure SQL DB avant et après un traitement Data Factory

Nous avons déjà évoqué sur ce blog le driver JDBC permettant d’écrire depuis un notebook Azure Databricks dans une base de données managée ou encore comment utiliser Polybase pour pérenniser une table du métastore Databricks dans Azure SQL DWH (aujourd’hui Synapse Analytics).

Cette action d’écriture demande souvent un niveau de ressource suffisamment élevé, tout comme l’import de données réalisé dans un dataset Power BI. Le reste du temps, une puissance plus faible peut être tout à fait acceptable. Et comme le niveau de facturation est lié au niveau de la ressource, pouvoir maîtriser par code, et non manuellement depuis l’interface, le scape up ou scale down de sa ressource peut s’avérer très intéressant dans une démarche d’optimisation des coûts (FinOps).

Nous nous plaçons ici dans un scénario de pipeline de transformation de données réalisé par Azure Databricks puis d’import dans un dataset Power BI. L’actualisation par API REST de ce dernier est détaillé dans cet article.

Au démarrage de notre pipeline, nous souhaitons augmenter la puissance (scale up) et ceci peut se faire avec des instructions PowerShell documentées ici.

Il est possible d’exécuter un script PowerShell depuis Azure Data Factory à l’aide de ce qu’on appelle une “custom activity“. Celle-ci s’appuie sur un service lié de type Azure Batch. Et ce service n’est pas gratuit, voire même relativement cher pour ce que l’on souhaite réaliser.

Nous optons donc pour la programmation d’une Azure Function, qui supporte le langage PowerShell ! Nous choisissons le type de fonction Trigger HTTP.

Dans Visual Studio Code, nous obtenons l’arborescence ci-dessous.

Le code principal de la fonction doit se trouver dans le fichier run.ps1.

Un premier point important consistera à renseigner les dépendances de librairies PowerShell dans le fichier requirements.psd1. Une combinaison fonctionnelle est la suivante :

# This file enables modules to be automatically managed by the Functions service
# See https://aka.ms/functionsmanageddependency for additional information.
#
@{
    'Az.Accounts' = '2.1.2'
    'Az.Sql' = '2.11.1'
 }

Second élément indispensable pour l’intégration dans une activité Web de Data Factory : le retour de la fonction doit obligatoirement être un objet JSON. Nous terminons donc la fonction par le code ci-dessous :

$body = @{name = 'Scale done !'} | ConvertTo-Json

#Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
     StatusCode = [HttpStatusCode]::OK
     Body = $body
})

Une fois la fonction publiée sur Azure, nous pouvons définir le pipeline complet.

La fonction Azure étant pensée de manière paramétrable, nous pouvons donner le niveau de ressource comme un élément de l’URL appelée par la méthode POST. Nous utilisons dans l’exemple ci-dessous le niveau “S” suivi d’un chiffre.

Ce niveau modifiera automatiquement le configuration de la ressource Azure SQL DB, en mode de tarification basé sur les DTUs (et non les vCores).

L’activité de traitement de la donnée, ici portée par Azure Databricks, réalise le démarrage d’un cluster de type interactif. Lorsque le traitement est terminé, le cluster peut s’éteindre automatiquement si la fonctionnalité “terminate after…” a été mise en place. Mais il faut ici attendre que l’import du dataset Power BI soit terminé ! Nous gérons donc la fin du cluster Databricks au moyen de l’API dédiée, dans une activité Web.

La syntaxe utilisée est la suivante :

POST https://adb-XXXXXXXXXXXXXXXX.X.azuredatabricks.net/api/2.0/clusters/delete

Les informations nécessaires seront : l’URL du workspace, un personal token d’authentification et l’identifiant du cluster.

Dans un optique d’optimisation des coûts, nous positionnons comme dernière activité du pipeline le scale down de la base de données.

Nous obtenons ainsi une chaîne complète optimisée sur l’enchainement des tâches et sur les coûts engendrés par les services PaaS.

Connecteur Power BI dédié à Azure Databricks

Jusqu’à présent, nous utilisions le connecteur Spark générique comme présenté dans cet article. Le seul mode d’authentification possible consistait à utiliser un jeton personnel (personal access token).

Nous pouvons nous connecter à des tables créées dans des databases du metastore du cluster Databricks et cela implique que le cluster soit démarré afin que la connexion soit possible.

Ce sont donc des informations au niveau cluster dont nous aurons besoin pour nous connecter. Ce paragraphe détaille les éléments attendus que sont le hostname, le port (443 par défaut) et le HTTP path.

Un connecteur dédié est apparu en préversion publique depuis octobre 2020 et présenté sur cette page de la documentation officielle Microsoft.

Nous remarquons que les deux modes import et DirectQuery sont disponibles, le second étant bien sûr conditionné par le statut démarré permanent du cluster.

Un paramètre est ici très important : le batch size. Il s’agit de la taille des “paquets” de lignes qui seront extraits du cluster. Nous reviendrons sur ce paramètre dans la section liée à la performance.

Nous disposons ensuite de trois modes de connexion, dont le mode “classique” par Personal Access Token mais également l’authentification au travers de l’annuaire Azure Active Directory (AAD).

C’est ce dernier que nous utiliserons ici.

Nous obtenons alors l’accès au metastore du cluster, afin de sélectionner une ou plusieurs tables.

Il est alors possible de charger directement les données dans une table du modèle ou bien d’ajouter des transformations dans la fenêtre Power Query. Pour autant, l’intérêt du cluster Spark est bien de réaliser toutes les transformations de données avant de créer une table “nettoyée”.

Pour l’actualisation planifiée du rapport dans le service Power BI, nous choisissons le mode d’authentification OAuth2.

Le niveau de confidentialité Organizational exige que la source de données Azure Databricks fasse partie de l’abonnement Azure sur lequel est défini l’annuaire AAD.

Afin de ne pas lier un compte personnel à un jeu de données Power BI, il sera préférable d’utiliser un compte de service. A l’heure actuelle (novembre 2020), la connexion au travers d’un principal de service ou d’une identité managée n’est pas réalisable.

Qu’attendre des performances ?

Afin de tester ce connecteur, nous chargeons comme table du cluster le fichier des Demandes de Valeur Foncière de 2019, soit 400Mo pour environ 3 millions de lignes.

La configuration du cluster est également à prendre en compte puisqu’elle déterminera la capacité à lire la donnée stockée dans la table. Nous débutons avec la configuration ci-dessous, et une version 2.4.5 de Spark.

En réglant le batch size à 100000 puis 200000 lignes, nous passons de 4 minutes à 2’30. L’augmentation de taille n’apporte alors plus de gain significatif.

A l’inverse, une taille de batch à 10000 serait dramatique : l’actualisation du jeu de données depuis Power BI prend alors plus de 11 minutes ! Si l’on ne précise pas le paramètre, le temps d’actualisation est correcte : 3’30.

Changeons maintenant le runtime du cluster pour une version 7.2 s’appuyant sur Spark 3. Sur la base d’un batch size de 200000 lignes, il n’y a pas de gain de temps de chargement.

Changeons enfin la configuration du driver : celui se base maintenant sur une VM Standard_F8s de 16Go de RAM et 8 cœurs. Sur ce jeu de données relativement petit pour un contexte Spark, pas d’amélioration du temps de chargement. La même observation se répète en changeant cette fois-ci la configuration des workers.

Sans avoir pu le tester, il semble important, à l’évidence, que les ressources Azure Databricks et Power BI soient situées dans la même région.

Pour conclure cette partie de tests, sachez que le temps d’actualisation avec le connecteur Spark générique est d’environ 5 minutes (batch size de 200000 lignes), un léger gain est donc obtenu.

Peut-on faire de l’actualisation incrémentielle ?

Par défaut, l’actualisation d’un jeu de données annule et remplace toutes les données. Il est toutefois possible de mettre en place une approche sur un champ de type datetime pour n’actualiser qu’une plage de dates définie. Sur cet écran, nous souhaitons conserver 2 années d’historique et ne mettre à jour que les 12 derniers mois.

Un message d’alerte que le mécanisme ne sera effectif que la requête M est “pliable” (traduction approximative du concept de query folding), ce qui signifie que le moteur d’exécution de la requête (ici, le moteur Spark) doit pouvoir interpréter la requête dans un langage natif, comme le SQL. Concrètement, les paramètres de dates deviennent des conditions “WHERE” dans la requête. D’un point de vue du stockage de données, celles-ci sont partitionnées selon la granularité de dates utilisée dans la paramétrage (ici, le mois).

Afin de vérifier si toutes les partitions ou non sont actualisées, il faut utiliser un espace de travail Power BI de capacité Premium et se connecter selon le processus détaillé dans cette page.

Ensuite, depuis SQL Server Management Studio, nous pouvons visualiser les partitions et l’heure de traitement.

Le jeu de données ne couvre que l’année 2019, ce qui explique les nombres de lignes à 0 à partir de 2020. Les partitions antérieures à décembre 2019 n’ont pas été rafraichies, ce qui correspond bien au comportement souhaité. En revanche, il faut se méfier du temps total que peut prendre une telle approche car elle multiplie les requêtes auprès du cluster, partition par partition.

Utiliser un automated cluster “single node”

Ne le cachons pas, Spark déploie toute sa puissance lorsque les volumes de données sont significatifs. Pour autant, devant la simplicité d’utilisation d’Azure Databricks, il est tentant de centraliser tous les traitements de données dans des notebooks lancés sur un cluster.

Et un cluster, par définition, est constitué de plusieurs machines, a minima un driver et un worker qui échangeront des informations. C’est toute la puissance de cette architecture qui distribue les traitements sur un nombre de workers éventuellement automatiquement “scalable”.

Mais dans le cas de traitements simples, une seule machine virtuelle (bien dimensionnée) serait suffisante et éviterait en plus des échanges réseaux qui pourraient même perturber le traitement. En étant pragmatique, ce sont aussi des coûts divisés au moins par deux !

Une solution existe maintenant : le mode de cluster “Single Node“.

Cette fonctionnalité est aujourd’hui (octobre 2020) en préversion publique et décrite sur le site de Databricks.

On observe ce paramétrage dans la version JSON de la définition du cluster, au niveau de la configuration du cluster.

{
"num_workers": 0,
"cluster_name": "MySingleNodeCluster",
"spark_version": "7.0.x-scala2.12",
"spark_conf": {
"spark.master": "local[*]",
"spark.databricks.cluster.profile": "singleNode"
},
"node_type_id": "Standard_DS3_v2",
"ssh_public_keys": [],
"custom_tags": {
"ResourceClass": "SingleNode"
},
"spark_env_vars": {
"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
},
"autotermination_minutes": 120,
"enable_elastic_disk": true,
"cluster_source": "UI",
"init_scripts": []
}

Imaginons maintenant que vous souhaitiez lancer vos traitements avec la logique “automated cluster“, s’appuyant éventuellement sur un pool de machines virtuelles. Cela est tout à fait possible depuis l’interface de jobs de Databricks mais dans une architecture data Azure plus large, on s’appuyera souvent sur le rôle d’ordonnanceur de Azure Data Factory.

Dans Azure Data Factory, nous définissons un service lié Databricks. Dans l’interface graphique, nous ne trouvons pas le mode SingleNode mais la configuration Spark peut être précisée.

Cela semble suffisant mais nous pouvons aussi préciser le nombre de workers à 0, même si un message d’alerte apparaît alors. Celui-ci n’est pas bloquant.

Afin de contrôler que ce sont bien des “automated single node clusters” qui se sont exécutés, vous pouvez consulter les logs de l’activité Data Factory où se trouve un lien pointant vers l’exécution du notebook et la configuration de clsuter associée.

Databricks vers Azure SQL DB avec SQL Spark Connector

Dans un précédent article, je décrivais le connecteur JDBC permettant de lire ou écrire entre un cluster Databricks et une base Azure SQL.

Un nouveau connecteur est maintenant disponible et merci à Benjamin CHOURAKI qui me l’a signalé sur LinkedIn :

https://github.com/microsoft/sql-spark-connector

La documentation indique qu’il faut, dans le cas d’un cluster Databricks, redéfinir le driver, ce qui se fait au niveau de la configuration Spark du cluster.

Nous éditions pour cela le cluster afin d’ajouter la ligne suivante dans les options avancées :

connectionProperties = { "Driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver" }

Le JAR du connecteur est téléchargeable à la page des releases du projet : https://github.com/microsoft/sql-spark-connector/releases

Il suffit ensuite de l’installer au niveau du cluster.

La librairie est maintenant installée.

Il est alors possible d’utiliser le driver au travers d’un code pyspark, dans un notebook. Nous commençons par définir tous les éléments de connexion.

hostname = "<servername>.database.windows.net"
server_name = "jdbc:sqlserver://{0}".format(hostname)

database_name = "<databasename>"
url = server_name + ";" + "databaseName=" + database_name + ";"
print(url)

table_name = "<tablename>"
username = "<username>"
password = dbutils.secrets.get(scope='', key='<passwordScopeName')

Notez au passage l’appel au secret scope de Databricks, lui-même synchronisé avec une ressource Azure Key Vault.

Le code suivant permet de définir la structure d’un Spark Dataframe qui sera utilisé en insertion (mode append) ou bien en « annule et remplace » (mode overwrite). Il n’est bien sûr pas possible de traiter directement un pandas dataframe, nous en réalisons donc une conversion.

from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import StructType
from pyspark.sql.types import *

schema = StructType([
StructField("champ_date",TimestampType(),True),
StructField("champ_texte",StringType(),False),
StructField("champ_num",IntegerType(),True)
])

# conversion si le dataframe est un pandas dataframe
sparkdf = spark.createDataFrame(df, schema)

try:
sparkdf.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
.option("reliabilityLevel", "BEST_EFFORT") \
.option("tableLock", "true") \
.save()
except ValueError as error :
print("Connector write failed", error)

Les différentes options, décrites dans cette documentation, permettent d’optimiser la requête en écriture. Ainsi, pour l’écriture d’environ 100000 lignes, l’option tableLock à True permet de passer 1,62m à 1,29m.

D’autres exemples d’utilisation du connecteur sont disponibles ici : https://github.com/microsoft/sql-spark-connector/tree/master/samples