Déployer Azure Databricks avec Terraform

Déployer une infrastructure cloud est un projet qui peut (et doit !) se penser comme du code. En effet, une rigueur dans le nommage des ressources est particulièrement importante et nous aurons certainement plusieurs environnements à créer (développement, qualification, production…). Nous allons donc éviter des tâches répétitives et sécuriser ce processus. En cas de perte d’une partie ou de tout un environnement, il sera également simple de le recréer à l’identique.

Nous allons pour cela utiliser Terraform qui présente l’avantage d’être commun à différents fournisseurs de cloud.

Azure Databricks ne fait pas exception à cette démarche et nous allons illustrer ici le déploiement d’un espace de travail au travers d’un script Terraform.

Installer Terraform dans un environnement Windows

  • Télécharger la dernière version de Terraform via le lien suivant : Terraform Versions | HashiCorp Releases
  • Créer un dossier : “.terraform” dans le répertoire : C:\Users\username\.terraform
  • Dézipper le fichier téléchargé et le déplacer dans le dossier “.terraform”
  • Ajouter le chemin: C:\Users\username\.terraform dans la variable d’environnement Path
  • Depuis un terminal, tester la commande : terraform –version

Ecrire le fichier main.tf

Le début du fichier va référencer la version de Terraform utilisée.

terraform {
  required_providers {
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "=2.47.0"
      skip_credentials_validation = false
    }
  }
}

Pour autoriser le script à prendre la main sur notre souscription Azure, un principal de service sera nécessaire et celui-ci doit disposer des droits de niveau gestion des ressources sur la souscription. Dans un premier temps, une authentification par Azure CLI est aussi envisageable. Les différents modes sont décrits dans la documentation de Terraform.

Depuis un environnement local, nous écrivons la valeur des variables sous la forme nomVar = “valeur” dans un fichier au nom réservé terraform.tfvars.

Les variables sont ensuite déclarées et appelées dans le code à l’aide de la syntaxe suivante :

# VARIABLES
variable "subscription_id" {}
variable "client_id" {}
variable "client_secret" {}
variable "tenant_id" {}

# Configure the Microsoft Azure Provider
provider "azurerm" {
  features {}
  subscription_id = var.subscription_id
  client_id       = var.client_id
  client_secret   = var.client_secret
  tenant_id       = var.tenant_id
}

Un groupe de ressource sera nécessaire et il se crée avec la syntaxe suivante :

resource "azurerm_resource_group" "rg" {
  name     = "rg-analytics"  
  location = "West Europe"

  tags = {
        environment = "dev"
        project = "unified analytics"
    }
}

Passons maintenant à la création de la ressource Databricks en elle-même, nous ferons référence à l’alias du groupe de ressources défini dans Terraform (ici, “rg”).

resource "azurerm_databricks_workspace" "dbx" {
  name                = "dbx-workspace"
  resource_group_name = azurerm_resource_group.rg.name
  location            = azurerm_resource_group.rg.location
  sku                 = "standard"

  tags = {
    environment = "dev"
    project = "unified analytics"
  }
}

La valeur définie dans “sku” correspond à la licence choisie parmi les valeurs standard, premium ou trial. Cette dernière permet de tester la licence Premium pendant 14 jours.

La création d’une ressource Databricks entraine la création conjointe d’un groupe de ressources managé contenant un compte de stockage, un réseau virtuel et un groupe de sécurité réseau.

Si nous avons besoin d’un cluster interactif pour utiliser cette ressource (attention, pourquoi ne pas passer par des automated clusters lancés par Azure Data Factory comme expliqué dans cet article ?).

Les propriétés utilisées sont celles visibles dans la description au format JSON du cluster, depuis l’espace de travail Databricks.

resource "databricks_cluster" "cluster" {
  cluster_name  = "myInteractiveCluster"
  spark_version = "7.3.x-scala2.12"
  node_type_id  = "Standard_F4s"
  num_workers = 2
  autotermination_minutes = 60
  library {
    pypi {
      package = "azureml-core"
    }
  }
}

Lors de la création de ce cluster, nous installons automatiquement la librairie azureml-core, qui correspond au SDK pilotant les ressources d’Azure Machine Learning, souvent évoqué sur ce blog.

Quelques commandes Terraform

Pour initialiser le répertoire contenant les différents fichiers, nous lançons la commande suivante :

terraform init

Un sous-répertoire .terraform se crée alors automatiquement.

Pour analyser le script Terraform et visualiser les éléments qui seront ajoutés ou modifiés :

terraform plan

Par défaut et sans préciser l’argument -var-file, le fichier de secrets terraform.tfvars est utilisé.

Pour appliquer ces transformations :

terraform apply

Une validation sera demandée.

Pour supprimer les ressources définies dans le main.tf :

terraform destroy

A nouveau (et heureusement !), une validation est requise.

Voici donc nos premiers pas sous Terraform et la simplicité d’utilisation vous fera vite oublier les cases à remplir et à cocher de l’interface utilisateur du portail Azure !

Il ne reste plus qu’à intégrer ce script dans un pipeline d’intégration continue, par exemple sous Azure DevOps.

Exploiter l’API Azure Databricks en PowerShell

L’API d’Azure Databricks permet de réaliser de nombreuses actions au moyen de commandes émises au travers d’une URL, de type GET ou POST. La documentation complète est disponible sur le site de Microsoft ou bien sur celui de Databricks.

Un premier exemple prend la forme ci-dessous et permet d’obtenir des informations détaillées sur un cluster :

GET https://<databricks-instance>/api/2.0/clusters/get?cluster_id=<cluster-id>

Les identifiants nécessaires de l’espace de travail et du cluster Databricks peuvent être obtenues en se rendant sur la page Web du cluster.

Bien sûr, Databricks ne se limite pas à des clusters, il faut des notebooks contenant du code et ceux-ci sont pilotés par des jobs. Pour imaginer un scénario paramétrable, nous définissons des widgets dans le notebook, ce qui permettra de passer les valeurs de ces paramètres aux jobs.

parquet_file = dbutils.widgets.get("pParquetFile")
limit = int(dbutils.widgets.get("pLimit")

La définition du job se fait dans l’interface dédiée et les paramètres peuvent y être déclarés. Il faut noter ici l’identifiant du job, nous en aurons besoin par la suite.

Sauf à planifier le job, ces étapes resteront manuelles et les valeurs des paramètres seront à préciser à chaque exécution.

En intégrant différentes instructions dans un script PowerShell, nous pouvons élaborer le scénario suivant :

Le code correspondant est détaillé ci-dessous :

$param1 = "abc"
$param2 = 123
$limitRows = 100

$dbxPAT = "dapixxxxxxxxxxxxxxxxxxxxx"
$dbxBearerToken = "Bearer " + $dbxPAT
$dbxWorkspaceURL = "https://adb-00000000000000000.0.azuredatabricks.net/"
$clusterID = "0000-000000-xxxxx000"
$jobID = 42
$localPath = "C:\temp\"
$filePath = $localPath+$fileName

$headers = New-Object "System.Collections.Generic.Dictionary[[String],[String]]"
$headers.Add("Authorization", $dbxBearerToken)

 Write-Output "Cluster status"
 $URL = $dbxWorkspaceURL + "api/2.0/clusters/get?cluster_id="+$($clusterID)
 $responseCluster = Invoke-RestMethod $URL -Method 'GET' -Headers $headers
 $clusterState = $responseCluster.state
 Write-Output "Cluster state :" $clusterState

 If($clusterState -ne "RUNNING")
 {
     Write-Output "Start cluster"
     $URL = $dbxWorkspaceURL + "api/2.0/clusters/start"
     $body = @{"cluster_id"=$clusterID} | ConvertTo-Json
     $responseStart = Invoke-RestMethod $URL -Method 'POST' -Headers $headers -Body $body
     $clusterState = $responseStart.state
 
    While($clusterState -ne "RUNNING")
  {
     Write-Output "Please wait one minute more...the cluster is starting"
     Start-Sleep -Seconds 60
     $URL = $dbxWorkspaceURL + "api/2.0/clusters/get?cluster_id="+$($clusterID)
     $responseStart = Invoke-RestMethod $URL -Method 'GET' -Headers $headers
     $clusterState = $responseStart.state
     Write-Output $clusterState
  }
 }

Write-Output "Run job"
$body = @{
   "job_id"= $jobID
   "notebook_params"= @{"P1"= $param1 ;"P2"= $param2 ;"limit"= $limitRows }
 } | ConvertTo-Json
$URL = $dbxWorkspaceURL + "api/2.0/jobs/run-now"
$responseJob = Invoke-RestMethod $URL -Method 'POST' -Headers $headers -Body $body
$runId = $responseJob.run_id
Write-Output "run ID : "$runId

$URL = $dbxWorkspaceURL + "api/2.0/jobs/runs/get-output?run_id="+$($runId)
Write-Output "Run status and Get output"
$runStatus = "PENDING"
Do
{
 Write-Output "Please wait one minute more...the job is running"
 Start-Sleep -Seconds 60
 $responseRun = Invoke-RestMethod $URL -Method 'GET' -Headers $headers
 $responseun | ConvertTo-Json
 $runStatus = $responseRun.metadata.state.life_cycle_state
 Write-Output $runStatus
 }
Until($runStatus -eq "TERMINATED")

Write-Output "Check run result state"
$runResultState = $responseRun.metadata.state.result_state
Write-Output $runResultState

If($runResultState -ne "SUCCESS")
{
    exit;
}

Write-Output "Export result to file"
$result = $responseRun.notebook_output.result
$isTruncated = $responseRun.notebook_output.truncated
if($isTruncated -eq "False")
{
    $result | Out-File -FilePath $filePath -Force
    Write-Output "Export done"
}

Dans ce code, nous nous appuyons sur la fonction Invoke-RestMethod suivie de l’URL de l’API Databricks. La réponse sera ensuite exploitée pour continuer le programme.

L’instruction api/2.0/jobs/runs/get-output?run_id= permet de retourner un texte passé en paramètre de la commande Databricks qui viendra conclure le notebook (aucune autre cellule ne sera ensuite exécutée) :

dbutils.notebook.exit(textObject)

Le contenu de la variable textObject se retrouve alors au niveau metadata.state.result_state du résultat de l’instruction. La sortie ne peut dépasser un volume de plus de 5Mo. Nous pouvons vérifier que le résultat n’est pas tronqué à l’aide de la valeur de l’élément notebook_output.truncated à false.

En mettant en œuvre ce code au sein d’une ressource comme Azure Function (les paramètres définis au début du code intégrant alors la route de la fonction), nous avons obtenu une “meta API” paramétrable, restituant un résultat sous forme d’export de données !

Utiliser Databricks CLI

Jusqu’à présent, nous avons l’habitude d’utiliser un espace de travail Azure Databricks en nous connectant au portail, l’authentification étant réalisée par un couple login / password déclaré dans l’annuaire Azure Active Directory.

Dans un but d’automatisation des tâches, il est intéressant de se pencher sur les commandes en lignes ou CLI. Celles-ci sont documentées sur le site de Databricks. Les commandes peuvent être vues comme une surcouche de l’API REST de Databricks.

Installation du CLI Databricks

Un environnement Python est nécessaire. Nous pouvons ensuite lancer le téléchargement du package dédié avec la commande ci-dessous, depuis un terminal.

pip install databricks-cli

Nous vérifions dans la foulée que l’affichage de l’aide d’une commande, sur laquelle nous reviendrons plus tard, est fonctionnel.

databricks fs -h

La version installée peut être retrouvée par la commande :

databricks --version

C’est un produit qui évolue rapidement et il conviendra de le mettre à jour fréquemment, afin de bénéficier d’un maximum de fonctionnalités.

Authentification

Nous allons utiliser un Personal Access Token pour nous authentifier auprès du service. Ce jeton de sécurité est obtenu sur le portail Databricks, dans le menu User Settings.

Attention à bien noter (dans un coffre-fort électronique !) le jeton obtenu, il ne sera plus possible d’afficher sa valeur par la suite.

Nous démarrons la configuration par la commande :

databricks configure --token

Deux valeurs seront attendues : l’URL de l’espace de travail, puis le token précédemment généré. L’URL pour une ressource Azure est dorénavant de la forme https://adb-XXXXXXXXXXXXXXXX.XX.azuredatabricks.net/.

Un fichier local s’écrit alors et contient les informations renseignées.

Vérifions maintenant que l’authentification est effective en lançant une commande interagissant avec l’espace de travail :

databricks workspace ls

Si nous obtenons un message d’erreur Error: b’Bad Request’, la cause peut être un mauvais enregistrement du token dans le fichier de configuration. En ouvrant celui-ci dans un éditeur de texte, nous visualisons le résultat suivant :

Remplacer les caractères SYN par la valeur du token permettra de résoudre ce problème mais demande de stocker ce secret de manière non sécurisée.

Nous pouvons maintenant lister le contenu de l’espace de travail.

Copier un fichier depuis le FileStore

Le FileStore est une zone de stockage spécifique (un dossier) du DataBricks File System (DBFS). Celui-ci nous permet de réaliser des échanges avec l’extérieur : copie de fichiers vers ou depuis le DBFS. Une documentation complète est disponible ici.

A l’aide des commandes du CLI, nous identifions un fichier que nous pouvons recopier localement.

Comme le CLI est une surcouche de l’API REST de Databricks, il est intéressant de retrouver la commande initiale émise vers l’espace de travail. Nous pouvons l’obtenir en ajoutant –debug après la commande de copie.

Le premier appel vérifie le statut du fichier. Il est possible d’exécuter la requête dans Postman (coller un token dans la partie Authorization).

Nous trouvons ici la taille du fichier. Il faut savoir que le contenu du fichier est converti en base 64 et nous allons ensuite mieux comprendre pourquoi.

Un second appel à l’API se fait avec la méthode read et deux options &offset=0&length=1048576. Si le fichier dépasse 1 méga-octet, celui-ci est découpé en morceaux (chunks) et plusieurs appels seront nécessaires. La commande reconstitue toutefois le fichier et le décode automatiquement. Tout cela est donc transparent pour l’utilisateur !

Ajuster le niveau d’une Azure SQL DB avant et après un traitement Data Factory

Nous avons déjà évoqué sur ce blog le driver JDBC permettant d’écrire depuis un notebook Azure Databricks dans une base de données managée ou encore comment utiliser Polybase pour pérenniser une table du métastore Databricks dans Azure SQL DWH (aujourd’hui Synapse Analytics).

Cette action d’écriture demande souvent un niveau de ressource suffisamment élevé, tout comme l’import de données réalisé dans un dataset Power BI. Le reste du temps, une puissance plus faible peut être tout à fait acceptable. Et comme le niveau de facturation est lié au niveau de la ressource, pouvoir maîtriser par code, et non manuellement depuis l’interface, le scape up ou scale down de sa ressource peut s’avérer très intéressant dans une démarche d’optimisation des coûts (FinOps).

Nous nous plaçons ici dans un scénario de pipeline de transformation de données réalisé par Azure Databricks puis d’import dans un dataset Power BI. L’actualisation par API REST de ce dernier est détaillé dans cet article.

Au démarrage de notre pipeline, nous souhaitons augmenter la puissance (scale up) et ceci peut se faire avec des instructions PowerShell documentées ici.

Il est possible d’exécuter un script PowerShell depuis Azure Data Factory à l’aide de ce qu’on appelle une “custom activity“. Celle-ci s’appuie sur un service lié de type Azure Batch. Et ce service n’est pas gratuit, voire même relativement cher pour ce que l’on souhaite réaliser.

Nous optons donc pour la programmation d’une Azure Function, qui supporte le langage PowerShell ! Nous choisissons le type de fonction Trigger HTTP.

Dans Visual Studio Code, nous obtenons l’arborescence ci-dessous.

Le code principal de la fonction doit se trouver dans le fichier run.ps1.

Un premier point important consistera à renseigner les dépendances de librairies PowerShell dans le fichier requirements.psd1. Une combinaison fonctionnelle est la suivante :

# This file enables modules to be automatically managed by the Functions service
# See https://aka.ms/functionsmanageddependency for additional information.
#
@{
    'Az.Accounts' = '2.1.2'
    'Az.Sql' = '2.11.1'
 }

Second élément indispensable pour l’intégration dans une activité Web de Data Factory : le retour de la fonction doit obligatoirement être un objet JSON. Nous terminons donc la fonction par le code ci-dessous :

$body = @{name = 'Scale done !'} | ConvertTo-Json

#Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
     StatusCode = [HttpStatusCode]::OK
     Body = $body
})

Une fois la fonction publiée sur Azure, nous pouvons définir le pipeline complet.

La fonction Azure étant pensée de manière paramétrable, nous pouvons donner le niveau de ressource comme un élément de l’URL appelée par la méthode POST. Nous utilisons dans l’exemple ci-dessous le niveau “S” suivi d’un chiffre.

Ce niveau modifiera automatiquement le configuration de la ressource Azure SQL DB, en mode de tarification basé sur les DTUs (et non les vCores).

L’activité de traitement de la donnée, ici portée par Azure Databricks, réalise le démarrage d’un cluster de type interactif. Lorsque le traitement est terminé, le cluster peut s’éteindre automatiquement si la fonctionnalité “terminate after…” a été mise en place. Mais il faut ici attendre que l’import du dataset Power BI soit terminé ! Nous gérons donc la fin du cluster Databricks au moyen de l’API dédiée, dans une activité Web.

La syntaxe utilisée est la suivante :

POST https://adb-XXXXXXXXXXXXXXXX.X.azuredatabricks.net/api/2.0/clusters/delete

Les informations nécessaires seront : l’URL du workspace, un personal token d’authentification et l’identifiant du cluster.

Dans un optique d’optimisation des coûts, nous positionnons comme dernière activité du pipeline le scale down de la base de données.

Nous obtenons ainsi une chaîne complète optimisée sur l’enchainement des tâches et sur les coûts engendrés par les services PaaS.

Connecteur Power BI dédié à Azure Databricks

Jusqu’à présent, nous utilisions le connecteur Spark générique comme présenté dans cet article. Le seul mode d’authentification possible consistait à utiliser un jeton personnel (personal access token).

Nous pouvons nous connecter à des tables créées dans des databases du metastore du cluster Databricks et cela implique que le cluster soit démarré afin que la connexion soit possible.

Ce sont donc des informations au niveau cluster dont nous aurons besoin pour nous connecter. Ce paragraphe détaille les éléments attendus que sont le hostname, le port (443 par défaut) et le HTTP path.

Un connecteur dédié est apparu en préversion publique depuis octobre 2020 et présenté sur cette page de la documentation officielle Microsoft.

Nous remarquons que les deux modes import et DirectQuery sont disponibles, le second étant bien sûr conditionné par le statut démarré permanent du cluster.

Un paramètre est ici très important : le batch size. Il s’agit de la taille des “paquets” de lignes qui seront extraits du cluster. Nous reviendrons sur ce paramètre dans la section liée à la performance.

Nous disposons ensuite de trois modes de connexion, dont le mode “classique” par Personal Access Token mais également l’authentification au travers de l’annuaire Azure Active Directory (AAD).

C’est ce dernier que nous utiliserons ici.

Nous obtenons alors l’accès au metastore du cluster, afin de sélectionner une ou plusieurs tables.

Il est alors possible de charger directement les données dans une table du modèle ou bien d’ajouter des transformations dans la fenêtre Power Query. Pour autant, l’intérêt du cluster Spark est bien de réaliser toutes les transformations de données avant de créer une table “nettoyée”.

Pour l’actualisation planifiée du rapport dans le service Power BI, nous choisissons le mode d’authentification OAuth2.

Le niveau de confidentialité Organizational exige que la source de données Azure Databricks fasse partie de l’abonnement Azure sur lequel est défini l’annuaire AAD.

Afin de ne pas lier un compte personnel à un jeu de données Power BI, il sera préférable d’utiliser un compte de service. A l’heure actuelle (novembre 2020), la connexion au travers d’un principal de service ou d’une identité managée n’est pas réalisable.

Qu’attendre des performances ?

Afin de tester ce connecteur, nous chargeons comme table du cluster le fichier des Demandes de Valeur Foncière de 2019, soit 400Mo pour environ 3 millions de lignes.

La configuration du cluster est également à prendre en compte puisqu’elle déterminera la capacité à lire la donnée stockée dans la table. Nous débutons avec la configuration ci-dessous, et une version 2.4.5 de Spark.

En réglant le batch size à 100000 puis 200000 lignes, nous passons de 4 minutes à 2’30. L’augmentation de taille n’apporte alors plus de gain significatif.

A l’inverse, une taille de batch à 10000 serait dramatique : l’actualisation du jeu de données depuis Power BI prend alors plus de 11 minutes ! Si l’on ne précise pas le paramètre, le temps d’actualisation est correcte : 3’30.

Changeons maintenant le runtime du cluster pour une version 7.2 s’appuyant sur Spark 3. Sur la base d’un batch size de 200000 lignes, il n’y a pas de gain de temps de chargement.

Changeons enfin la configuration du driver : celui se base maintenant sur une VM Standard_F8s de 16Go de RAM et 8 cœurs. Sur ce jeu de données relativement petit pour un contexte Spark, pas d’amélioration du temps de chargement. La même observation se répète en changeant cette fois-ci la configuration des workers.

Sans avoir pu le tester, il semble important, à l’évidence, que les ressources Azure Databricks et Power BI soient situées dans la même région.

Pour conclure cette partie de tests, sachez que le temps d’actualisation avec le connecteur Spark générique est d’environ 5 minutes (batch size de 200000 lignes), un léger gain est donc obtenu.

Peut-on faire de l’actualisation incrémentielle ?

Par défaut, l’actualisation d’un jeu de données annule et remplace toutes les données. Il est toutefois possible de mettre en place une approche sur un champ de type datetime pour n’actualiser qu’une plage de dates définie. Sur cet écran, nous souhaitons conserver 2 années d’historique et ne mettre à jour que les 12 derniers mois.

Un message d’alerte que le mécanisme ne sera effectif que la requête M est “pliable” (traduction approximative du concept de query folding), ce qui signifie que le moteur d’exécution de la requête (ici, le moteur Spark) doit pouvoir interpréter la requête dans un langage natif, comme le SQL. Concrètement, les paramètres de dates deviennent des conditions “WHERE” dans la requête. D’un point de vue du stockage de données, celles-ci sont partitionnées selon la granularité de dates utilisée dans la paramétrage (ici, le mois).

Afin de vérifier si toutes les partitions ou non sont actualisées, il faut utiliser un espace de travail Power BI de capacité Premium et se connecter selon le processus détaillé dans cette page.

Ensuite, depuis SQL Server Management Studio, nous pouvons visualiser les partitions et l’heure de traitement.

Le jeu de données ne couvre que l’année 2019, ce qui explique les nombres de lignes à 0 à partir de 2020. Les partitions antérieures à décembre 2019 n’ont pas été rafraichies, ce qui correspond bien au comportement souhaité. En revanche, il faut se méfier du temps total que peut prendre une telle approche car elle multiplie les requêtes auprès du cluster, partition par partition.

Utiliser un automated cluster “single node”

Ne le cachons pas, Spark déploie toute sa puissance lorsque les volumes de données sont significatifs. Pour autant, devant la simplicité d’utilisation d’Azure Databricks, il est tentant de centraliser tous les traitements de données dans des notebooks lancés sur un cluster.

Et un cluster, par définition, est constitué de plusieurs machines, a minima un driver et un worker qui échangeront des informations. C’est toute la puissance de cette architecture qui distribue les traitements sur un nombre de workers éventuellement automatiquement “scalable”.

Mais dans le cas de traitements simples, une seule machine virtuelle (bien dimensionnée) serait suffisante et éviterait en plus des échanges réseaux qui pourraient même perturber le traitement. En étant pragmatique, ce sont aussi des coûts divisés au moins par deux !

Une solution existe maintenant : le mode de cluster “Single Node“.

Cette fonctionnalité est aujourd’hui (octobre 2020) en préversion publique et décrite sur le site de Databricks.

On observe ce paramétrage dans la version JSON de la définition du cluster, au niveau de la configuration du cluster.

{
"num_workers": 0,
"cluster_name": "MySingleNodeCluster",
"spark_version": "7.0.x-scala2.12",
"spark_conf": {
"spark.master": "local[*]",
"spark.databricks.cluster.profile": "singleNode"
},
"node_type_id": "Standard_DS3_v2",
"ssh_public_keys": [],
"custom_tags": {
"ResourceClass": "SingleNode"
},
"spark_env_vars": {
"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
},
"autotermination_minutes": 120,
"enable_elastic_disk": true,
"cluster_source": "UI",
"init_scripts": []
}

Imaginons maintenant que vous souhaitiez lancer vos traitements avec la logique “automated cluster“, s’appuyant éventuellement sur un pool de machines virtuelles. Cela est tout à fait possible depuis l’interface de jobs de Databricks mais dans une architecture data Azure plus large, on s’appuyera souvent sur le rôle d’ordonnanceur de Azure Data Factory.

Dans Azure Data Factory, nous définissons un service lié Databricks. Dans l’interface graphique, nous ne trouvons pas le mode SingleNode mais la configuration Spark peut être précisée.

Cela semble suffisant mais nous pouvons aussi préciser le nombre de workers à 0, même si un message d’alerte apparaît alors. Celui-ci n’est pas bloquant.

Afin de contrôler que ce sont bien des “automated single node clusters” qui se sont exécutés, vous pouvez consulter les logs de l’activité Data Factory où se trouve un lien pointant vers l’exécution du notebook et la configuration de clsuter associée.

Databricks vers Azure SQL DB avec SQL Spark Connector

Dans un précédent article, je décrivais le connecteur JDBC permettant de lire ou écrire entre un cluster Databricks et une base Azure SQL.

Un nouveau connecteur est maintenant disponible et merci à Benjamin CHOURAKI qui me l’a signalé sur LinkedIn :

https://github.com/microsoft/sql-spark-connector

La documentation indique qu’il faut, dans le cas d’un cluster Databricks, redéfinir le driver, ce qui se fait au niveau de la configuration Spark du cluster.

Nous éditions pour cela le cluster afin d’ajouter la ligne suivante dans les options avancées :

connectionProperties = { "Driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver" }

Le JAR du connecteur est téléchargeable à la page des releases du projet : https://github.com/microsoft/sql-spark-connector/releases

Il suffit ensuite de l’installer au niveau du cluster.

La librairie est maintenant installée.

Il est alors possible d’utiliser le driver au travers d’un code pyspark, dans un notebook. Nous commençons par définir tous les éléments de connexion.

hostname = "<servername>.database.windows.net"
server_name = "jdbc:sqlserver://{0}".format(hostname)

database_name = "<databasename>"
url = server_name + ";" + "databaseName=" + database_name + ";"
print(url)

table_name = "<tablename>"
username = "<username>"
password = dbutils.secrets.get(scope='', key='<passwordScopeName')

Notez au passage l’appel au secret scope de Databricks, lui-même synchronisé avec une ressource Azure Key Vault.

Le code suivant permet de définir la structure d’un Spark Dataframe qui sera utilisé en insertion (mode append) ou bien en « annule et remplace » (mode overwrite). Il n’est bien sûr pas possible de traiter directement un pandas dataframe, nous en réalisons donc une conversion.

from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import StructType
from pyspark.sql.types import *

schema = StructType([
StructField("champ_date",TimestampType(),True),
StructField("champ_texte",StringType(),False),
StructField("champ_num",IntegerType(),True)
])

# conversion si le dataframe est un pandas dataframe
sparkdf = spark.createDataFrame(df, schema)

try:
sparkdf.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
.option("reliabilityLevel", "BEST_EFFORT") \
.option("tableLock", "true") \
.save()
except ValueError as error :
print("Connector write failed", error)

Les différentes options, décrites dans cette documentation, permettent d’optimiser la requête en écriture. Ainsi, pour l’écriture d’environ 100000 lignes, l’option tableLock à True permet de passer 1,62m à 1,29m.

D’autres exemples d’utilisation du connecteur sont disponibles ici : https://github.com/microsoft/sql-spark-connector/tree/master/samples

Utiliser le driver jdbc depuis Azure Databricks

Dans une architecture cloud Azure, la ressource de “compute” Databricks va bien souvent être utilisée pour transformer la donnée brute en donnée dite nettoyée ou enrichie. Cette donnée peut bien sûr être stockée sur un Data Lake, par exemple dans un format Parquet (nous y reviendrons en fin d’article) mais les outils d’exploration et de visualisation de données comme Microsoft Power BI présentent de nombreux avantages à s’appuyer sur une base de données relationnelle (actualisation incrémentielle, DirectQuery…).

Nous partirons ainsi de l’architecture Azure ci-dessous :

Architecture Azure hybride pour des projets data de visualisation et de prévision

Nous lançons tout d’abord un notebook Python où nous définissons la chaîne de connexion. Il sera bien sûr très judicieux d’utiliser ici le secret scope de Databricks pour stocker toutes ces informations.

Il s’agit maintenant d’écrire un jeu de données nettoyées et travaillées en mémoire sous forme de Spark dataframe dans une table de la base de données. Cette opération se fait tout simplement au moyen de la méthode write associée aux informations de connexion : URL JDBC et propriétés de connexion.

Le paramètre de mode permet de choisir entre un “annule et remplace” de la table au moyen de la valeur overwrite ou une insertion à l’aide du mot clé append.

Il n’y a donc ici pas de mode prévu pour la suppression ou la mise à jour. Il faudra penser ce scénario de manière différente et peut-être au travers du format de fichier Delta, basé sur le format Parquet et sur lequel existent des méthodes delete et upsert. Pour autant, ce fichier restera en dehors de la base de données.

La méthode read de Spark est également possible et se fait en soumettant une requête SQL au travers du driver JDBC. Nous utilisons ici la syntaxe SQL propre à la base de données, ici le Transac-SQL de Microsoft.

L’alias de table sur la requête est indispensable pour être interpréter par le paramètre table de la méthode read.

Pour aller un peu plus loin dans l’exploitation de ce driver JDBC, nous pouvons créer une table dans le métastore du cluster, copie d’une table de la base de données.

Il est alors possible de créer des interactions en Spark SQL entre des vues créées à partir de dataframes Spark (ou Pandas en les convertissant au préalable) et la table du métastore. Ce scénario ne réalise qu’une lecture des données de la base et des opérations d’écriture sur cette table ne seront bien sûr pas répercutées sur la base de données.

Nous avons ici utilisé le driver JDBC de manière simple avec une ressource de type SQL Database. Vous retrouverez ici une autre manière de procéder au travers de Polybase pour Azure SQL Datawarehouse. Ce service Azure étant maintenant renommé Azure Synapse Analytics et disposant de nouvelles fonctionnalités, de prochains articles décriront les modes d’interaction entre fichiers, dataframes et tables. En attendant, je vous recommande cet épisode du podcast Big Data Hebdo autour de Synapse.

Ajouter des paramètres pour le déploiement du template ARM Azure Data Factory

La longueur du titre de cet article laisse présager du niveau de précision dans lequel nous allons nous lancer ! Je vais donc rapidement situer le contexte faisant appel à un tel besoin.

Nous cherchons à déployer de manière automatisée un environnement de développement Azure Data Factory sur l’environnement de production. Nous utilisons pour cela un pipeline de release Azure DevOps. Le processus DevOps pour ADF est expliqué dans ce livre blanc Microsoft ou dans cet excellent article de Florian Eiden.

Pour résumer les grandes lignes du fonctionnement, nous remplaçons dans un fichier de paramètres au format JSON les valeurs des chaînes de connexion de l’environnement de développement par celles de production. Ceci se fait au moyen de la case « override template parameters ».

Valeurs manquantes pour le service lié Databricks

La chose se complique lorsque nous utilisons un service lié de calcul de type Azure Databricks puisque les paramètres de ce service n’apparaissent pas par défaut dans le fichier de paramétrage ! Nous avons en particulier besoin de remplacer :

  • La Databricks Workspace URL
  • Le Secret name (où se trouve enregistré un Personal Access Token correspondant à l’espace de travail souhaité)

En effet, un choix arbitraire a été fait par Microsoft pour présenter uniquement certains paramètres mais un grand nombre de valeurs existent et elles sont visibles en éditant le fichier disponible dans le menu : Parametrization template.

Ce fichier au format JSON recense l’intégralité des paramètres disponibles par défaut, ainsi que leur visibilité au moyen d’une propriété qui n’est pas simple à interpréter :

= (signe égal) permet de conserver la valeur actuelle en tant que valeur par défaut pour le paramètre

– (signe moins) permet de ne pas conserver la valeur par défaut pour le paramètre

Le symbole | (pipe) permet de réaliser le lien avec une valeur stockée dans le coffre-fort Azure Key Vault.

Nous allons maintenant rechercher le nom des paramètres manquants. Pour cela, nous passons par la définition du service lié Databricks au format JSON.

Nous obtenons ainsi le nom exact des paramètres (ne pas se fier à l’interface graphique).

L’URL de l’espace de travail se nomme ainsi domain, le secret du Key Vault se désigne par secretName et ses deux propriétés dépendent du niveau TypeProperties et du sous-niveau accessToken pour le secret.

Nous pouvons donc maintenant citer ces propriétés dans le JSON des paramètres, en respectant l’arborescence des propriétés.

La définition des paramètres dans la branche Master

Arrive ici la partie peut-être la moins documentée (jusqu’à présent !). Il faut comprendre que ce fichier JSON doit figurer à la racine de la branche Master du dépôt contenant la synchronisation du Data Factory avec un gestionnaire de version comme GitHub ou un repo Azure DevOps, sous le nom arm-template-parameters-definition.json.

Il ne faut donc pas utiliser la branche spécifique à Data Factory qui se nomme adf_publish mais nous irons ensuite vérifier que les deux fichiers JSON qu’elle contient ont bien été modifiés en conséquence.

Pour lancer cette modification, nous devons tout d’abord faire les deux actions suivantes :

  • Refresh
  • Publish

Le volet latéral du Publish (pending changes) doit s’afficher même si aucune modification n’est visible.

Nous pouvons enfin vérifier que les paramètres sont disponibles dans le fichier ARMTemplateParametersForFactory.json de la branche adf_publish.

Modifier la valeur des paramètres pendant la release

Il ne reste qu’à utiliser ces noms de paramètres dans le pipeline de release Azure DevOps. Les nouvelles valeurs peuvent être définies comme des variables du pipeline pour plus de commodité.

Cet article a pu être écrit grâce à la documentation officielle de Microsoft ainsi que d’autres articles de blog, en anglais, que leurs auteurs en soient ici remerciés :

https://medium.com/@sanajitghosh/manage-ci-cd-pipelines-using-azure-devops-azure-data-factory-azure-databricks-8239a9ceef3

https://www.modern-dataengineering.com/post/how-to-add-custom-parameters-to-data-factory-templates

https://medium.com/@patrickpicard_50914/azure-data-factory-modifying-arm-template-parameters-53b11f38bced

Relancer un notebook Databricks en cas d’échec

Le code utilisé dans un notebook peut échouer pour une raison autre qu’un erreur de développement : fichier absent, API ne répondant pas, etc. Il peut donc être pertinent de relancer automatiquement un traitement en cas d’échec.

La documentation Databricks fournit un exemple de fonction, en Python ou en Scala, qui réalise ce mécanisme. Le code est bien sûr basé sur la fonction dbutils.notebook.run déjà présentée dans un précédent post.

Voici le code en Python, où un nombre d’essais maximum de 3 est paramétré par défaut :

 # Errors in workflows thrown a WorkflowException. 
def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
   num_retries = 0
   while True:
     try:
       return dbutils.notebook.run(notebook, timeout, args)
     except Exception as e:
       if num_retries > max_retries:
         raise e
       else:
         print "Retrying error", e
         num_retries += 1

Et voici ce que l’on obtient à l’exécution. Attention à bien préciser le chemin relatif du notebook ainsi piloté, si celui-ci n’est pas situé au même niveau que le ce “master notebook”.

Attention à ne pas abuser de ce processus ! Il est essentiel de comprendre la nature des erreurs rencontrées et d’y apporter des réponses au travers du code.