Connecteur Power BI dédié à Azure Databricks

Jusqu’à présent, nous utilisions le connecteur Spark générique comme présenté dans cet article. Le seul mode d’authentification possible consistait à utiliser un jeton personnel (personal access token).

Nous pouvons nous connecter à des tables créées dans des databases du metastore du cluster Databricks et cela implique que le cluster soit démarré afin que la connexion soit possible.

Ce sont donc des informations au niveau cluster dont nous aurons besoin pour nous connecter. Ce paragraphe détaille les éléments attendus que sont le hostname, le port (443 par défaut) et le HTTP path.

Un connecteur dédié est apparu en préversion publique depuis octobre 2020 et présenté sur cette page de la documentation officielle Microsoft.

Nous remarquons que les deux modes import et DirectQuery sont disponibles, le second étant bien sûr conditionné par le statut démarré permanent du cluster.

Un paramètre est ici très important : le batch size. Il s’agit de la taille des “paquets” de lignes qui seront extraits du cluster. Nous reviendrons sur ce paramètre dans la section liée à la performance.

Nous disposons ensuite de trois modes de connexion, dont le mode “classique” par Personal Access Token mais également l’authentification au travers de l’annuaire Azure Active Directory (AAD).

C’est ce dernier que nous utiliserons ici.

Nous obtenons alors l’accès au metastore du cluster, afin de sélectionner une ou plusieurs tables.

Il est alors possible de charger directement les données dans une table du modèle ou bien d’ajouter des transformations dans la fenêtre Power Query. Pour autant, l’intérêt du cluster Spark est bien de réaliser toutes les transformations de données avant de créer une table “nettoyée”.

Pour l’actualisation planifiée du rapport dans le service Power BI, nous choisissons le mode d’authentification OAuth2.

Le niveau de confidentialité Organizational exige que la source de données Azure Databricks fasse partie de l’abonnement Azure sur lequel est défini l’annuaire AAD.

Afin de ne pas lier un compte personnel à un jeu de données Power BI, il sera préférable d’utiliser un compte de service. A l’heure actuelle (novembre 2020), la connexion au travers d’un principal de service ou d’une identité managée n’est pas réalisable.

Qu’attendre des performances ?

Afin de tester ce connecteur, nous chargeons comme table du cluster le fichier des Demandes de Valeur Foncière de 2019, soit 400Mo pour environ 3 millions de lignes.

La configuration du cluster est également à prendre en compte puisqu’elle déterminera la capacité à lire la donnée stockée dans la table. Nous débutons avec la configuration ci-dessous, et une version 2.4.5 de Spark.

En réglant le batch size à 100000 puis 200000 lignes, nous passons de 4 minutes à 2’30. L’augmentation de taille n’apporte alors plus de gain significatif.

A l’inverse, une taille de batch à 10000 serait dramatique : l’actualisation du jeu de données depuis Power BI prend alors plus de 11 minutes ! Si l’on ne précise pas le paramètre, le temps d’actualisation est correcte : 3’30.

Changeons maintenant le runtime du cluster pour une version 7.2 s’appuyant sur Spark 3. Sur la base d’un batch size de 200000 lignes, il n’y a pas de gain de temps de chargement.

Changeons enfin la configuration du driver : celui se base maintenant sur une VM Standard_F8s de 16Go de RAM et 8 cœurs. Sur ce jeu de données relativement petit pour un contexte Spark, pas d’amélioration du temps de chargement. La même observation se répète en changeant cette fois-ci la configuration des workers.

Sans avoir pu le tester, il semble important, à l’évidence, que les ressources Azure Databricks et Power BI soient situées dans la même région.

Pour conclure cette partie de tests, sachez que le temps d’actualisation avec le connecteur Spark générique est d’environ 5 minutes (batch size de 200000 lignes), un léger gain est donc obtenu.

Peut-on faire de l’actualisation incrémentielle ?

Par défaut, l’actualisation d’un jeu de données annule et remplace toutes les données. Il est toutefois possible de mettre en place une approche sur un champ de type datetime pour n’actualiser qu’une plage de dates définie. Sur cet écran, nous souhaitons conserver 2 années d’historique et ne mettre à jour que les 12 derniers mois.

Un message d’alerte que le mécanisme ne sera effectif que la requête M est “pliable” (traduction approximative du concept de query folding), ce qui signifie que le moteur d’exécution de la requête (ici, le moteur Spark) doit pouvoir interpréter la requête dans un langage natif, comme le SQL. Concrètement, les paramètres de dates deviennent des conditions “WHERE” dans la requête. D’un point de vue du stockage de données, celles-ci sont partitionnées selon la granularité de dates utilisée dans la paramétrage (ici, le mois).

Afin de vérifier si toutes les partitions ou non sont actualisées, il faut utiliser un espace de travail Power BI de capacité Premium et se connecter selon le processus détaillé dans cette page.

Ensuite, depuis SQL Server Management Studio, nous pouvons visualiser les partitions et l’heure de traitement.

Le jeu de données ne couvre que l’année 2019, ce qui explique les nombres de lignes à 0 à partir de 2020. Les partitions antérieures à décembre 2019 n’ont pas été rafraichies, ce qui correspond bien au comportement souhaité. En revanche, il faut se méfier du temps total que peut prendre une telle approche car elle multiplie les requêtes auprès du cluster, partition par partition.

Utiliser un automated cluster “single node”

Ne le cachons pas, Spark déploie toute sa puissance lorsque les volumes de données sont significatifs. Pour autant, devant la simplicité d’utilisation d’Azure Databricks, il est tentant de centraliser tous les traitements de données dans des notebooks lancés sur un cluster.

Et un cluster, par définition, est constitué de plusieurs machines, a minima un driver et un worker qui échangeront des informations. C’est toute la puissance de cette architecture qui distribue les traitements sur un nombre de workers éventuellement automatiquement “scalable”.

Mais dans le cas de traitements simples, une seule machine virtuelle (bien dimensionnée) serait suffisante et éviterait en plus des échanges réseaux qui pourraient même perturber le traitement. En étant pragmatique, ce sont aussi des coûts divisés au moins par deux !

Une solution existe maintenant : le mode de cluster “Single Node“.

Cette fonctionnalité est aujourd’hui (octobre 2020) en préversion publique et décrite sur le site de Databricks.

On observe ce paramétrage dans la version JSON de la définition du cluster, au niveau de la configuration du cluster.

{
"num_workers": 0,
"cluster_name": "MySingleNodeCluster",
"spark_version": "7.0.x-scala2.12",
"spark_conf": {
"spark.master": "local[*]",
"spark.databricks.cluster.profile": "singleNode"
},
"node_type_id": "Standard_DS3_v2",
"ssh_public_keys": [],
"custom_tags": {
"ResourceClass": "SingleNode"
},
"spark_env_vars": {
"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
},
"autotermination_minutes": 120,
"enable_elastic_disk": true,
"cluster_source": "UI",
"init_scripts": []
}

Imaginons maintenant que vous souhaitiez lancer vos traitements avec la logique “automated cluster“, s’appuyant éventuellement sur un pool de machines virtuelles. Cela est tout à fait possible depuis l’interface de jobs de Databricks mais dans une architecture data Azure plus large, on s’appuyera souvent sur le rôle d’ordonnanceur de Azure Data Factory.

Dans Azure Data Factory, nous définissons un service lié Databricks. Dans l’interface graphique, nous ne trouvons pas le mode SingleNode mais la configuration Spark peut être précisée.

Cela semble suffisant mais nous pouvons aussi préciser le nombre de workers à 0, même si un message d’alerte apparaît alors. Celui-ci n’est pas bloquant.

Afin de contrôler que ce sont bien des “automated single node clusters” qui se sont exécutés, vous pouvez consulter les logs de l’activité Data Factory où se trouve un lien pointant vers l’exécution du notebook et la configuration de clsuter associée.

Databricks vers Azure SQL DB avec SQL Spark Connector

Dans un précédent article, je décrivais le connecteur JDBC permettant de lire ou écrire entre un cluster Databricks et une base Azure SQL.

Un nouveau connecteur est maintenant disponible et merci à Benjamin CHOURAKI qui me l’a signalé sur LinkedIn :

https://github.com/microsoft/sql-spark-connector

La documentation indique qu’il faut, dans le cas d’un cluster Databricks, redéfinir le driver, ce qui se fait au niveau de la configuration Spark du cluster.

Nous éditions pour cela le cluster afin d’ajouter la ligne suivante dans les options avancées :

connectionProperties = { "Driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver" }

Le JAR du connecteur est téléchargeable à la page des releases du projet : https://github.com/microsoft/sql-spark-connector/releases

Il suffit ensuite de l’installer au niveau du cluster.

La librairie est maintenant installée.

Il est alors possible d’utiliser le driver au travers d’un code pyspark, dans un notebook. Nous commençons par définir tous les éléments de connexion.

hostname = "<servername>.database.windows.net"
server_name = "jdbc:sqlserver://{0}".format(hostname)

database_name = "<databasename>"
url = server_name + ";" + "databaseName=" + database_name + ";"
print(url)

table_name = "<tablename>"
username = "<username>"
password = dbutils.secrets.get(scope='', key='<passwordScopeName')

Notez au passage l’appel au secret scope de Databricks, lui-même synchronisé avec une ressource Azure Key Vault.

Le code suivant permet de définir la structure d’un Spark Dataframe qui sera utilisé en insertion (mode append) ou bien en « annule et remplace » (mode overwrite). Il n’est bien sûr pas possible de traiter directement un pandas dataframe, nous en réalisons donc une conversion.

from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import StructType
from pyspark.sql.types import *

schema = StructType([
StructField("champ_date",TimestampType(),True),
StructField("champ_texte",StringType(),False),
StructField("champ_num",IntegerType(),True)
])

# conversion si le dataframe est un pandas dataframe
sparkdf = spark.createDataFrame(df, schema)

try:
sparkdf.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
.option("reliabilityLevel", "BEST_EFFORT") \
.option("tableLock", "true") \
.save()
except ValueError as error :
print("Connector write failed", error)

Les différentes options, décrites dans cette documentation, permettent d’optimiser la requête en écriture. Ainsi, pour l’écriture d’environ 100000 lignes, l’option tableLock à True permet de passer 1,62m à 1,29m.

D’autres exemples d’utilisation du connecteur sont disponibles ici : https://github.com/microsoft/sql-spark-connector/tree/master/samples

Utiliser le driver jdbc depuis Azure Databricks

Dans une architecture cloud Azure, la ressource de “compute” Databricks va bien souvent être utilisée pour transformer la donnée brute en donnée dite nettoyée ou enrichie. Cette donnée peut bien sûr être stockée sur un Data Lake, par exemple dans un format Parquet (nous y reviendrons en fin d’article) mais les outils d’exploration et de visualisation de données comme Microsoft Power BI présentent de nombreux avantages à s’appuyer sur une base de données relationnelle (actualisation incrémentielle, DirectQuery…).

Nous partirons ainsi de l’architecture Azure ci-dessous :

Architecture Azure hybride pour des projets data de visualisation et de prévision

Nous lançons tout d’abord un notebook Python où nous définissons la chaîne de connexion. Il sera bien sûr très judicieux d’utiliser ici le secret scope de Databricks pour stocker toutes ces informations.

Il s’agit maintenant d’écrire un jeu de données nettoyées et travaillées en mémoire sous forme de Spark dataframe dans une table de la base de données. Cette opération se fait tout simplement au moyen de la méthode write associée aux informations de connexion : URL JDBC et propriétés de connexion.

Le paramètre de mode permet de choisir entre un “annule et remplace” de la table au moyen de la valeur overwrite ou une insertion à l’aide du mot clé append.

Il n’y a donc ici pas de mode prévu pour la suppression ou la mise à jour. Il faudra penser ce scénario de manière différente et peut-être au travers du format de fichier Delta, basé sur le format Parquet et sur lequel existent des méthodes delete et upsert. Pour autant, ce fichier restera en dehors de la base de données.

La méthode read de Spark est également possible et se fait en soumettant une requête SQL au travers du driver JDBC. Nous utilisons ici la syntaxe SQL propre à la base de données, ici le Transac-SQL de Microsoft.

L’alias de table sur la requête est indispensable pour être interpréter par le paramètre table de la méthode read.

Pour aller un peu plus loin dans l’exploitation de ce driver JDBC, nous pouvons créer une table dans le métastore du cluster, copie d’une table de la base de données.

Il est alors possible de créer des interactions en Spark SQL entre des vues créées à partir de dataframes Spark (ou Pandas en les convertissant au préalable) et la table du métastore. Ce scénario ne réalise qu’une lecture des données de la base et des opérations d’écriture sur cette table ne seront bien sûr pas répercutées sur la base de données.

Nous avons ici utilisé le driver JDBC de manière simple avec une ressource de type SQL Database. Vous retrouverez ici une autre manière de procéder au travers de Polybase pour Azure SQL Datawarehouse. Ce service Azure étant maintenant renommé Azure Synapse Analytics et disposant de nouvelles fonctionnalités, de prochains articles décriront les modes d’interaction entre fichiers, dataframes et tables. En attendant, je vous recommande cet épisode du podcast Big Data Hebdo autour de Synapse.

Ajouter des paramètres pour le déploiement du template ARM Azure Data Factory

La longueur du titre de cet article laisse présager du niveau de précision dans lequel nous allons nous lancer ! Je vais donc rapidement situer le contexte faisant appel à un tel besoin.

Nous cherchons à déployer de manière automatisée un environnement de développement Azure Data Factory sur l’environnement de production. Nous utilisons pour cela un pipeline de release Azure DevOps. Le processus DevOps pour ADF est expliqué dans ce livre blanc Microsoft ou dans cet excellent article de Florian Eiden.

Pour résumer les grandes lignes du fonctionnement, nous remplaçons dans un fichier de paramètres au format JSON les valeurs des chaînes de connexion de l’environnement de développement par celles de production. Ceci se fait au moyen de la case « override template parameters ».

Valeurs manquantes pour le service lié Databricks

La chose se complique lorsque nous utilisons un service lié de calcul de type Azure Databricks puisque les paramètres de ce service n’apparaissent pas par défaut dans le fichier de paramétrage ! Nous avons en particulier besoin de remplacer :

  • La Databricks Workspace URL
  • Le Secret name (où se trouve enregistré un Personal Access Token correspondant à l’espace de travail souhaité)

En effet, un choix arbitraire a été fait par Microsoft pour présenter uniquement certains paramètres mais un grand nombre de valeurs existent et elles sont visibles en éditant le fichier disponible dans le menu : Parametrization template.

Ce fichier au format JSON recense l’intégralité des paramètres disponibles par défaut, ainsi que leur visibilité au moyen d’une propriété qui n’est pas simple à interpréter :

= (signe égal) permet de conserver la valeur actuelle en tant que valeur par défaut pour le paramètre

– (signe moins) permet de ne pas conserver la valeur par défaut pour le paramètre

Le symbole | (pipe) permet de réaliser le lien avec une valeur stockée dans le coffre-fort Azure Key Vault.

Nous allons maintenant rechercher le nom des paramètres manquants. Pour cela, nous passons par la définition du service lié Databricks au format JSON.

Nous obtenons ainsi le nom exact des paramètres (ne pas se fier à l’interface graphique).

L’URL de l’espace de travail se nomme ainsi domain, le secret du Key Vault se désigne par secretName et ses deux propriétés dépendent du niveau TypeProperties et du sous-niveau accessToken pour le secret.

Nous pouvons donc maintenant citer ces propriétés dans le JSON des paramètres, en respectant l’arborescence des propriétés.

La définition des paramètres dans la branche Master

Arrive ici la partie peut-être la moins documentée (jusqu’à présent !). Il faut comprendre que ce fichier JSON doit figurer à la racine de la branche Master du dépôt contenant la synchronisation du Data Factory avec un gestionnaire de version comme GitHub ou un repo Azure DevOps, sous le nom arm-template-parameters-definition.json.

Il ne faut donc pas utiliser la branche spécifique à Data Factory qui se nomme adf_publish mais nous irons ensuite vérifier que les deux fichiers JSON qu’elle contient ont bien été modifiés en conséquence.

Pour lancer cette modification, nous devons tout d’abord faire les deux actions suivantes :

  • Refresh
  • Publish

Le volet latéral du Publish (pending changes) doit s’afficher même si aucune modification n’est visible.

Nous pouvons enfin vérifier que les paramètres sont disponibles dans le fichier ARMTemplateParametersForFactory.json de la branche adf_publish.

Modifier la valeur des paramètres pendant la release

Il ne reste qu’à utiliser ces noms de paramètres dans le pipeline de release Azure DevOps. Les nouvelles valeurs peuvent être définies comme des variables du pipeline pour plus de commodité.

Cet article a pu être écrit grâce à la documentation officielle de Microsoft ainsi que d’autres articles de blog, en anglais, que leurs auteurs en soient ici remerciés :

https://medium.com/@sanajitghosh/manage-ci-cd-pipelines-using-azure-devops-azure-data-factory-azure-databricks-8239a9ceef3

https://www.modern-dataengineering.com/post/how-to-add-custom-parameters-to-data-factory-templates

https://medium.com/@patrickpicard_50914/azure-data-factory-modifying-arm-template-parameters-53b11f38bced

Relancer un notebook Databricks en cas d’échec

Le code utilisé dans un notebook peut échouer pour une raison autre qu’un erreur de développement : fichier absent, API ne répondant pas, etc. Il peut donc être pertinent de relancer automatiquement un traitement en cas d’échec.

La documentation Databricks fournit un exemple de fonction, en Python ou en Scala, qui réalise ce mécanisme. Le code est bien sûr basé sur la fonction dbutils.notebook.run déjà présentée dans un précédent post.

Voici le code en Python, où un nombre d’essais maximum de 3 est paramétré par défaut :

 # Errors in workflows thrown a WorkflowException. 
def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
   num_retries = 0
   while True:
     try:
       return dbutils.notebook.run(notebook, timeout, args)
     except Exception as e:
       if num_retries > max_retries:
         raise e
       else:
         print "Retrying error", e
         num_retries += 1

Et voici ce que l’on obtient à l’exécution. Attention à bien préciser le chemin relatif du notebook ainsi piloté, si celui-ci n’est pas situé au même niveau que le ce “master notebook”.

Attention à ne pas abuser de ce processus ! Il est essentiel de comprendre la nature des erreurs rencontrées et d’y apporter des réponses au travers du code.

Utiliser les variables d’environnement pour faciliter le déploiement continu des notebooks Databricks

Une fois l’infrastructure définie autour d’un cluster Databricks, les notebooks sont les éléments qui vont évoluer au gré des développements. Il faut bien sûr a minima définir deux environnements : l’un de développement, l’autre de production. Nous verrons ainsi plusieurs astuces et bonnes pratiques permettant de réaliser le processus du déploiement continu des notebooks.

Nous avons pu voir dans de précédents articles :

  • Comment définir un point de montage vers un compte de stockage Azure
  • Comment versionner les notebooks Databricks par exemple sous GitHub

Nous allons utiliser ici la notion de variable d’environnement, propre au cluster Spark.

Le schéma ci-dessous illustre le mécanisme DevOps qui sera mis en place.

Mais pour l’instant, focalisons-nous sur le chemin menant vers les données. Nous utilisons ici deux environnements identiques d’un point de vue de l’architecture, dont une vision simplifiée est donnée sur le schéma ci-dessous :

L’accès au point de montage défini sur le compte de stockage Azure se fait par exemple au moyen des commandes Databricks dbutils :

dbutils.fs.ls('/mnt/dev/mysfilesystem/')

Les variables d’environnement sont quant à elles définies au niveau d’un cluster. Elles seront donc accessibles de n’importe quel notebook attaché au cluster. Nous les trouvons en dépliant le menu des options avancées, onglet Spark.

Par convention, nous utilisons une casse majuscule pour le nom des variables.

Attention à ne pas mettre d’espace autour du signe « = ». Les guillemets ne sont en revanche pas indispensables autour de la valeur de la variable.

Un redémarrage du cluster sera alors nécessaire, suite à la modification des variables d’environnement.

Maintenant, différentes commandes, dans les langages supportés, vont nous permettre d’accéder aux variables définies. Pour la compatibilité dans un même notebook, les lignes de scripts seront ici précédées du langage dans lequel elles sont écrites, vous pourrez ainsi copier ce code tel quel dans n’importe quel notebook Databricks.

Liste des variables d’environnement :

%sh printenv

Valeur de la variable en Shell :

%sh echo $MOUNT_PATH

Valeur de la variable d’environnement en Python :

%python

import os

key = 'MOUNT_PATH'
value = os.getenv(key)

print("Value of 'MOUNT_PATH' environment variable :", value)

A noter que la commande getenv() peut être remplacée par environ.get() issue également de la librairie os. Les différences entre les deux sont traitées dans cette question sur StackOverFlow.

Valeur de la variable d’environnement en Scala :

%scala
sys.env("MOUNT_PATH")

Il est donc maintenant possible d’utiliser ces variables dans les chaînes d’accès au système de fichier, avec un code qui réagira alors en fonction de l’environnement !

%python
dbutils.fs.ls(os.getenv('MOUNT_PATH') + '/mysfilesystem/')

La définition des variables d’environnement est également réalisable si vous utilisez un “automated cluster“, c’est-à-dire un cluster créé à la volée lors du lancement d’un job planifié.

La configuration du cluster est disponible en cliquant sur le bouton “edit”.

Pérenniser une table Azure Databricks dans SQL DWH

Le proverbe bien connu “diviser pour mieux régner” a sa déclinaison dans le monde de la Data et des services managés : “séparer le traitement du stockage”. Par cela, il faut comprendre que l’utilisation de deux services différents pour ces deux tâches est particulièrement intéressant.

En effet, le stockage se doit d’être permanent et toujours accessible, en tenant compte de différents degrés de “chaleur”. En revanche, la puissance de calcul n’est nécessaire que pendant les traitements et il faudra pouvoir faire évoluer cette puissance selon le besoin. Par exemple, un entrainement de modèle prédictif, opération qui peut être très coûteuse, bénéficiera de l’élasticité d’un service managé comme Azure Databricks mais ne sera peut-être pas réalisé quotidiennement.

Nous allons détailler ici comment pérenniser les données issues d’un traitement de préparation réalisé sur le cluster managé Spark. La solution de stockage choisie ici est Azure SQL Data Warehouse.

Créer une ressource Azure SQL DWH

Il est tout d’abord nécessaire de disposer d’une ressource Azure de serveur de bases de données.

La documentation officielle d’Azure Databricks recommande de cocher la case “Allow Azure services to access server”.

Nous sélectionnons maintenant la ressource Azure SQL Data Warehouse dans la catégorie Databases.

Le Data Warehouse est associé à un groupe de ressources et à un serveur de base de données (ici, créé simultanément). Le niveau de performance choisi va déterminer le coût associé à une heure de service de l’entrepôt.

Cette ressource se montra particulièrement efficace dans le cadre d’une connexion vers un outil de dashboarding comme Power BI et autorise le mode direct query, qui pourra se révéler pertinent dans des modèles de données composites, mêlant import et connexion directe.

Une clé de chiffrement pour la base étant obligatoire, il sera nécessaire de créer une database master key au travers d’une nouvelle requête sur la base de données. Cela peut se faire par exemple dans le client SQL Server Management Studio ou sur le portail Azure par le query editor actuellement en préversion.

--Creates the database master key
CREATE MASTER KEY ENCRYPTION BY PASSWORD = "yourStr0ngPa$$W0rd"

Faire communiquer Azure Databricks et SQL DWH

Afin de bien paramétrer la communication, il faut tout d’abord comprendre comment fonctionne le mécanisme. La subtilité à bien saisir est l’importance d’un troisième élément qui est le compte de stockage Azure utilisé comme zone temporaire et sollicité par le composant PolyBase.

Schéma extrait de la documentation officielle Databricks
Source : https://docs.databricks.com/data/data-sources/azure/sql-data-warehouse.html

Vérifions tout d’abord que le connecteur SQL DWH est présent sur le runtime Databricks associé au cluster Spark au moyen de la commande Scala ci-dessous.

La commande ne doit pas renvoyer d’erreur “ClassNotFoundException”.

Dans une cellule d’un notebook, nous déclarons toutes les variables nécessaires à la bonne communication entre les différentes briques, en particulier le compte de stockage Azure (Blob Storage ou Data Lake Storage gen2).

Le code ci-dessous est donné pour un notebook Python mais sa variante en Scala s’obtiendra facilement en ajoutant le mot-clé var au devant de chaque déclaration de variable.

storage_account_name = "nomDuBlobStorage"
blobStorage = storage_account_name+".blob.core.windows.net" blobContainer = "nomDuConteneur"
blobAccessKey =  "MauvaiseIdéeDeCopierIciUneClé000PensezAuSecretScope!"

tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
 acntInfo = "fs.azure.account.key."+ blobStorage

dwServer = "nomDuServeur"+".database.windows.net"
dwDatabase = "nomDuDWH" 
dwUser = "nomDeLUtilisateur"
dwPass = "ToujoursUneMauvaiseIdéePensezVraimentAuSecretScope"
dwJdbcPort =  "1433"

dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"

sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass

Attention à faire cette étape proprement au moyen d’un secret scope !

Des travaux de data preparation nous ont permis de réaliser un DataFrame propre contenant des données plus exploitables. Une sauvegarde du DataFrame est réalisée sous forme de table sur le cluster mais celle-ci ne sera accessible que lorsque le service Azure Databricks est démarré (et donc facturé).

Nous allons donc réaliser une copie des données sur Azure SQL Data Warehouse.

Grâce aux actions préalables, il est maintenant possible de lancer les commandes load ou write pour communiquer avec Azure SQL Data Warehouse. Dans la commande ci-dessous, nous créons une nouvelle table dans la base de données à partir d’un DataFrame en mémoire du cluster.

dwTable = "nomDeLaNouvelleTableDuDWH"

df.write.format("com.databricks.spark.sqldw") \
    .option("url", sqlDwUrlSmall) \
    .option("forwardSparkAzureStorageCredentials", "true") \
    .option("dbTable",dwTable) \
    .option("tempDir",tempDir) \
    .save()
Les logs du cluster montrent que la communication s’effectue bien avec le Data Warehouse.

Automatiser les traitements

Nous avons donc réalisé ici la partie cruciale de la chaîne de la data, en séparant traitement et stockage des résultats. Pour rendre cette architecture encore plus efficace, il sera nécessaire de planifier le traitement de préparation des données . Plusieurs solutions sont ici disponibles :

  • l’ordonnanceur d’Azure Databricks, couplé à une logique d’enchaînement de notebooks
  • l’utilisation d’un pipeline Azure Data Factory et d’une activité Databricks

Ces deux approches sont décrites dans cet article.

Dupliquer les données, bonne idée ?

A cette question, nous pourrons formuler la traditionnelle réponse du consultant : “ça dépend”.

Rappelons qu’il faut bien évaluer les usages et les coûts d’une telle architecture. Quel est le public qui a besoin de cette donnée préparée ? Plutôt des data analysts au sein de Power BI ? Plutôt des data scientists dans un cluster “bac à sable” ?

Azure SQL Data Warehouse offre une puissance d’accès pour des usages analytique mais il faut mesurer son coût si la base reste accessible en continu. A l’inverse, les tables matérialisées sur le cluster retirent une brique de l’architecture (et de la facture !) mais les performances du connecteur Spark pour Power BI ne me semblent pas aujourd’hui suffisantes pour des volumes de données importants.

Une fois de plus, la bonne architecture cloud Data sera celle qui répondra le mieux aux besoins, dans un cadre gouverné et dont le coût et la performance seront supervisés de près.

[EDIT 13 novembre 2019 : Microsoft a annoncé lors de l’Ignite d’Orlando qu’Azure SQL Data Warehouse évoluait et devenait au passage Azure Synapse Analytics. Nous suivrons de près cette évolution.]

D’Azure Databricks à Power BI : quel(s) chemin(s) ?

Si vous avez suivi mes derniers articles sur ce blog, vous aurez deviné que je suis plus que convaincu de l’intérêt de mettre le service de clusters managés Databricks au sein d’une architecture cloud data.

Si l’on met de côté l’exploitation des données par des algorithmes de Data Science, il sera toujours très intéressant de visualiser et d’explorer la donnée dans un outil d’analyse dynamique comme Power BI. Et cela tombe bien, il existe un connecteur (générique) Spark !

Connecter Power BI Desktop à une table du cluster Databricks

Voici comment procéder pour charger les données d’un cluster dans un modèle Power BI.

Tout d’abord, il faudra installer sur le poste exécutant Power BI Desktop le driver Spark ODBC. Celui-ci peut être téléchargé au travers d’un lien reçu par mail suite à l’inscription sur ce formulaire. L’installation ne révèle aucune difficulté : next, next, next…

Passons ensuite sur l’interface de notre espace de travail Azure Databricks. Nous démarrons le cluster et il sera possible d’y trouver une information importante qu’est l’URL JDBC.

Cette URL va permettre de construire le chemin du serveur attendu dans la boîte de dialogue sous la forme générique suivante :

  https://<region>.azuredatabricks.net:443/sql/protocolv1/o/0123456789012345/01234-012345-xxxxxxx 

Il faut donc ici remplacer <region> par le nom de la région Azure où se trouve la ressource Databricks, par exemple : westus. A la suite du port 443, on copiera la partie de l’URL JDBC allant de sql au point-virgule suivant.

Seconde étape à l’intérieur de l’interface Databricks, nous créons maintenant un jeton d’accès pour l’application Power BI à partir des Users settings.

  Attention à bien copier la valeur affichée, il ne sera plus possible de la revoir !

 Revenons à Power BI. Dans la boîte de dialogue de connexion, coller l’URL construite dans la case Server, choisir le Protocol HTTP.

 En mode import, l’avantage sera de pouvoir continuer à travailler sans que le cluster soit démarré. Mais il faudra attendre un bon moment pour que le chargement de données se fasse dans Power BI. En effet, si l’on utilise un cluster Spark, c’est que bien souvent les volumes de données sont importants…

 En mode direct query, chaque évaluation de visuel dans la page de rapport établira une requête vers le cluster, qui bien évidemment devra être actif.

 Le user name est tout simplement le mot token. Coller ici le jeton généré depuis Azure Databricks.

 Nous accédons maintenant à toutes les tables ou vues du cluster ! N’insistez pas trop pour obtenir un aperçu, cette fonctionnalité semble peiner à répondre mais l’important est bien d’obtenir les données dans l’éditeur de requêtes.

Voici le code obtenu dans l’éditeur avancé. Nous retrouvons une logique classique de source et de navigation dans un élément de la source, ici une table. Le schéma de la table est respecté, il n’est pas nécessaire de typer à nouveau les champs dans Power Query.

Connecter un Dataflow à Azure Databricks

Les Dataflows de Power BI (à ne surtout pas confondre avec les data flows de Azure Data Factory !) sont une nouveauté du service Power BI qui vient de connaître beaucoup d’évolutions.

Pour l’expliquer simplement, on peut dire que Dataflow correspond à la version en ligne de Power Query, avec donc une capacité de traitement issue du cloud (partagée ou dédié dans le cadre d’une licence Premium) et la possibilité de partager le résultat des requêtes (appelées entités) à des créateurs de nouveaux rapports. Contrairement à un jeu de données partagé (shared dataset), il est possible de croiser plusieurs entités dataflows au sein d’un même modèle.

Les dataflows sont enfin le support des techniques de Machine Learning dans Power BI mais nous parlerons de tout cela une prochaine fois !

Début novembre 2019, de nouvelles sources de données sont disponibles dont la source Spark. Nous allons donc tenter de reproduire la démarche réalisée dans Power BI Desktop.

Nous retrouvons les mêmes paramètres de connexion, à savoir :

  • Server
  • Protocol (http)
  • Pas besoin de Gateway, les données sont déjà dans Azure
  • Username : token
  • Password : le jeton généré (on vous avait prévenu de conserver sa valeur 😊)

Il faut ensuite choisir la table ou la vue souhaitée.

Petite différence, les types de données ne sont pas conservés, il faut donc exécuter une commande « Detect data type » sur toutes les colonnes.

Rappelons enfin qu’un dataflow n’est pas chargé tant qu’il n’est pas rafraîchi une premier fois. Cliquer ici sur Refresh now.

Un rafraichissement pour aussi être planifié mais il faudra bien s’assurer que le cluster Databricks soit démarré pour que la connexion puisse se faire.

Une fois le dataflow créé, il est accessible de manière pérenne aux développeurs qui travaillent dans Power BI Desktop et qui ont accès à l’espace de travail Power BI où a été créé le dataflow.

Nous vérifions ici dans l’aperçu que les champs sont maintenant bien typés.

En conclusion

Nous avons ici utilisé le connecteur Spark et celui-ci a nous permis, à partir de Power BI ou des dataflows du service Power BI, de nous connecter aux tables vues au travers du cluster Databricks.

Il s’agit là d’un connecteur générique et celui-ci n’est sans doute pas optimisé pour travailler la source Azure Databricks mais notons que le mode direct query est tout de même disponible.

Cette approche montrera rapidement ces limites quand les volumétries de données exploseront. Il sera alors nécessaire de réfléchir à une solution de stockage des données entre le cluster et Power BI comme Azure SQL DB ou Azure SQL DWH (bientôt Azure Synapse Analytics ?), portées ensuite éventuellement par un cube Azure Analysis Services qui exécutera les calculs nécessaires aux indicateurs présentés dans Power BI.

Toutefois, la faisabilité de cette connexion permettra de mener rapidement une preuve de concept jusqu’à la représentation visuelle des données. A la contrainte d’avoir le cluster démarré pour charger les données, on répondra par leur écriture au sein d’un dataflow (qui est techniquement un stockage parquet dans un Azure Data Lake Storage gen2 !). Attention, les dataflows ont leurs limites : ils ne peuvent être utilisés qu’au sein d’un seul espace de travail Power BI, sauf à disposer d’une licence Premium qui permettra de lier ce dataflow à cinq espaces de travail.

Quel langage pour la préparation de données sur Azure Databricks ?

Azure Databricks se positionne comme une plateforme unifiée pour le traitement de la donnée et en effet, le service de clusters managés permet d’aborder des problématiques comme le traitement batch, mais aussi le streaming, voire l’exposition de données à un outil de visualisation comme Power BI.

La capacité de mise à l’échelle (scalabilité ici horizontale) d’Azure Databricks  est également un argument de poids pour établir cette solution dans un contexte de forte volumétrie. Par forte volumétrie, nous entendons ici le fait qu’un tableau de données, ou les opérations qui permettraient d’y parvenir,  dépasse la mémoire disponible sur une seule machine. La solution sera donc de distribuer la donnée ou les traitements. Mais attention, il va falloir bien choisir ses armes avant de se lancer dans un premier notebook !

Le choix des armes

A la création d’un notebook sur Azure Databricks, quatre langages sont disponibles.

Sauf à ne vouloir faire que du SQL, je vous déconseille de prendre ce type de notebook puisqu’il sera très simple d’exécuter une commande SQL dans une cellule à l’aide la commande magique %sql.

Si vous venez de la Data Science « traditionnelle » (rien de péjoratif, c’est d’ailleurs mon parcours, comprendre ici l’approche statistique antérieure à l’ère de la Big Data), vous serez attirés par le langage R. Mais vous savez sûrement que R est un langage, de naissance, single-threaded et peu apte à paralléliser les traitements. Microsoft a pourtant racheté la solution RevoScaleR de Revolution Analytics et l’a intégrée en particulier à SQL Server. Dans un contexte Spark, on peut s’orienter vers l’API SparkR mais celle-ci ne semble pas soulever un très gros engouement (contredisez-moi dans les commentaires) dans la communauté.

Python remporte aujourd’hui une plus forte adhésion, en particulier auprès des profils venant du monde du développement. Attention, coder un traitement de données en Python ne vous apportera aucune garantie d’amélioration par rapport au même code R ! Ici, plusieurs choix se présentent à nouveau.

Une première piste sera d’exploiter la librairie Dask qui permet de distribuer les traitements.

Une deuxième possibilité est de remplacer les instructions de la librairie pandas par la librairie développée par Databricks : koalas. Je détaillerai sûrement l’intérêt de cette librairie dans un prochain article.

Le troisième choix possible sera à mon sens le meilleur : l’API pyspark. Cette API va vous permettre :

  • de créer des objets Spark DataFrames
  • d’enregistrer ces DataFrames sous formes de tables (locales ou globales dans Databricks)
  • d’exécuter des requêtes SQL sur ces tables
  • de lire des fichiers au format parquet

Les API Spark permettent d’écrire dans un langage de plus haut niveau et plus accessible pour les développeurs (data scientists ou data engineers) qui sera ensuite traduit pour son exécution dans le langage natif du cluster.

Les différents codes possibles

Je vous propose ci-dessous un comparatif des approches pandas et pyspark pour un même traitement de données. Le scénario est de charger les données de la base open data des accidents corporels, soit deux dossiers de fichiers csv : les usagers et les caractéristiques des accidents. Une fois les fichiers chargés, une jointure sera réalisée entre les deux sources sur la base d’une clé commune.

L’objectif est ici d’établir un parallèle entre les syntaxes et non d’évaluer les performances. La librairie pandas n’est d’ailleurs pas la plus optimale pour charger des fichiers csv et on mettra à profit les méthodes plus classiques de lecture de fichiers.

A l’import, création d’un dataframe

# pandas
df = pd.read_csv(filename)

# pyspark
df = spark.read.format("csv") \
.option("header", "true").option("inferSchema", "true").load("filename")  

Avec pandas, il pourra être nécessaire de caster certaines colonnes dates suite à l’import.

La lecture en Spark autorise l’emploi de caractères génériques ( ? ou *) pour lire et concaténer automatiquement plusieurs fichiers. Les archives .zip contenant par exemple un fichier .csv peuvent être chargées sans décompression préalable, ce qui ne sera pas le cas avec pandas.

Fusion de deux dataframes

# pandas
df = pd.merge(df1, df2, on='key', how='inner')

# pyspark
df = df1.join( df2, 'key', how='inner') 

L’avantage de Databricks sera de pouvoir persister certains dataframes sous forme de vues (« tables locales ») ou de tables (« tables globales).  En particulier, le format delta pourra être mis à profit. Dès lors, il devient très simple de travailler en Spark SQL pur, toujours en débutant la cellule par la commande magique %sql.

Vérification du dataframe

# pandas
df.shape
df.info

# pyspark
df.count()
len(df.columns)
df.printSchema

Pandas se montre ici à son avantage avec des résultats plus simples à obtenir pour connaître les dimensions du dataframe (shape) ou des informations sur le schéma et les valeurs manquantes.

En conclusion

Lorsqu’on pose à certains érudits musicaux la sempiternelle question « Stones ou Beatles », il n’est rare d’entendre la réponse suivante : « ni l’un, ni l’autre, mais les Kinks ! »

A la question non moins sempiternelle « R ou Python », je répondrai donc… Scala ! C’est en effet le langage natif de Spark et donc le plus proche du moteur d’exécution.

Malgré tout, le choix se fera aussi en fonction de critères plus pragmatiques comme la puissance nécessaire au traitement, le coût de travailler sur l’exhaustivité des données (pourquoi par un échantillon représentatif ?) ou la maîtrise des différents langages par les personnes en charge des développements.