Dataïku DSS et Azure Synapse Analytics : connexion à la base de données

Nous avons présenté dans un précédent article la plateforme de Data Science de Dataïku et son installation sur une VM dans Azure. Nous avons également déjà abordé la connexion à un compte de stockage Azure, afin de lire des fichiers de données ou d’écrire les différents résultats obtenus.

Dans un Système d’Information non centré autour du Data Lake, certaines données particulièrement intéressantes pour le Machine Learning peuvent être stockées dans les tables d’une base de données, souvent orientée “analytique”. Le Data Warehouse d’Azure, Synapse Analytics, est disponible dans les connexions prévues par Dataïku DSS. Nous allons détailler ici les actions à réaliser pour définir la connexion à la base de données. Un grand merci à mon collègue Benjamin BENITO qui a réalisé les opérations techniques décrites ci-dessous.

Dans les paramètres de Dataïku DSS, nous pouvons trouver la liste des différentes bases de données SQL compatibles avec le Studio de Data Science.

Nous choisissons Azure Synapse. Il faut noter, que dans une exploitation plus large des capacités de Synapse Analytics, il sera aussi possible d’utiliser cette ressource comme moteur de calcul Spark. Nous verrons ceci dans un prochain article.

Dedicated SQL Pool

Une base de données de type “dedicated SQL pool” aura été préalablement créée du côté de Synapse Analytics.

Pour la démonstration, nous disposons d’une table à une colonne, contenant une valeur.

Nous pouvons maintenant renseigner les différents éléments attendus pour la définition de la connexion. Commençons par cocher la case “Use custom JDBC URL”.

L’URL JDBC se trouve sur le portail Azure, à la page de la ressource Dedicated SQL Pool, onglet “connection strings“. Nous utiliserons ici la première version, utilisant la connexion SQL.

L’URL JDBC se détaille comme ci-dessous.

jdbc:sqlserver://<synapse-name>.sql.azuresynapse.net:1433;database=<db-name>;user=<admin-login>@<synapse-name>;password={your_password_here};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;

Il faudra remplacer ici le texte {your_password_here} par le mot de passe du compte administrateur.

Ces informations, ainsi que le port ouvert (1433) pourront également être précisées dans les cases disponibles. Il ne reste plus alors qu’à tester la connexion à l’aide du bouton situé en bas à gauche de l’écran. Je vous recommande de créer la connexion (bouton “create”) au préalable afin de sauvegarder les informations saisies.

Il est alors fort probable que vous soyez confronté.e.s à l’erreur ci-dessous.

Cette erreur est due à l’absence de driver SQL Server sur la VM où est installé Dataïku DSS. L’URL suivante détaille cette erreur.

ERR_SQL_CANNOT_LOAD_DRIVER: Failed to load database driver — Dataiku DSS 9.0 documentation

Nous allons donc nous connecter à cette machine virtuelle Linux pour effectuer manuellement l’installation du driver attendu. Nous devons disposer pour cela d’un login / password ou d’une clé SSH.

Voici les différentes commandes à jouer successivement.

wget -O /tmp/sqljdbc_10.2.1.0_enu.zip https://download.microsoft.com/download/4/d/5/4d5a79be-35f8-48d4-a984-473747362f99/sqljdbc_10.2.1.0_enu.zip
unzip /tmp/sqljdbc_10.2.1.0_enu.zip -d /tmp/
sudo cp /tmp/sqljdbc_10.2\enu/mssql-jdbc-10.2.1.jre8.jar /home/dataiku/dss/lib/jdbc/

sudo su dataiku
dataiku/dss/bin/dss stop
dataiku/dss/bin/dss start

Les premières commandes réalisent le téléchargement du binaire nécessaire, le dézippe puis le copie à l’endroit attendu. Un restart de DSS est ensuite nécessaire.

La connexion est maintenant établie !

Nous pouvons utiliser le lien “Import tables to datasets” pour obtenir des données stockées dans le Data Warehouse.

Serverless SQL Pool

Pour utiliser le mode serverless de Synapse Analytics, nous devrions nous orienter vers un autre type d’authentification. En effet, ce mode correspond à la création de méta-données sur des fichiers présents dans un Data Lake mais sans matérialisation dans le Data Warehouse. Des droits sous-jacents (sur les fichiers du Data Lake) sont donc nécessaires et il n’est pas possible d’attribuer de tels droits au user connu uniquement de la base de données.

Le message d’erreur rencontré sera alors :

Failed to read data from table, caused by: SQLServerException: External table 'dbo.<external_table>' is not accessible because location does not exist or it is used by another process.

La documentation officielle de Dataïku donne toutes les étapes permettant d’utiliser l’authentification au travers d’Azure Active Directory.

Nous avons besoin d’un Principal de Service, c’est-à-dire d’une identité dans Azure. Nous relevons tout d’abord l’application ID (ou client ID) qu’il sera nécessaire de renseigner dans Dataïku DSS.

Dans le menu “Endpoints”, nous notons l’information de l’OAuth 2.0 token endpoint (v1).

Dans le menu “API permisssions”, nous devons ajouter la permission Azure SQL Database.

Choisir ensuite “Delegated permissions” puis cocher la case user_impersonation.

Nous terminons le paramétrage de cette identité en lui associant un secret. Notez ici bien la valeur du secret, que vous ne pourrez plus retrouver par la suite.

Toujours dans le portail Azure, il faut donner les droits de lecture sur le Data Lake au principal de service. Cela se fait au moyen de l’ajout d’un RBAC de type “Blob storage data reader”.

Sinon, une erreur comme ci-dessous se produira lors de l’aperçu de la table ou de la vue :

Failed to read data from table, caused by: SQLServerException: File 'https://adlsdataiku.dfs.core.windows.net/dataiku/input/diamonds.csv' cannot be opened because it does not exist or it is used by another process.

Nous reprenons maintenant la définition d’une nouvelle connexion Synapse Analytics depuis le Data Science Studio.

En cochant la case “Use custom JDBC URL”, il faut donner une Connection URL de la forme suivante :

jdbc:sqlserver://<synapse-name>-ondemand.sql.azuresynapse.net:1433;database=master;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30

Nous précisons le port (1433 par défaut) et le nom de la Database.

Pour la partie liée à l’Azure AD, il faut cocher la case “Login with Azure OAuth” puis coller les informations préalablement collectées :

  • STS URL (OAuth 2.0 token endpoint (v1))
  • Client id (application id)
  • Client secret (la valeur du secret et non le secret id)

Deux modes de “credentials” sont possibles : Global ou Per user. Ce second mode signifie que c’est le login utilisé dans DSS qui est repris pour s’authentifier auprès de la base Synapse. Il ne sera alors pas nécessaire de renseigner le client secret.

Depuis Synapse Analytics, dans une nouvelle requête SQL, nous allons déclarer le principal de service (SPN) comme un utilisateur de la base de données. Veillez à bien être connecté à “Built-in”, c’est-à-dire la base serverless.

Voici le code à exécuter, où le nom du SPN doit être mis à jour. Il est nécessaire d’attribuer des droits de type “BULK” puis “SELECT” sur les objets qui seront autorisés à Dataïku DSS.

CREATE USER [dataikuspn] FROM  EXTERNAL PROVIDER  WITH DEFAULT_SCHEMA=[dbo];
GO
GRANT ADMINISTER DATABASE BULK OPERATIONS TO dataikuspn;
GO
GRANT SELECT ON OBJECT::dbo.V_diamonds TO [dataikuspn];
GO

Il est maintenant possible d’ouvrir la liste des tables à importer depuis l’interface de DSS.

Ce second scénario s’avère intéressant pour limiter les coûts d’utilisation de Synapse Analytics (coût à la quantité de données lues) et facilite l’exploitation des données du Data Lake qui auront pu être réorganisées sous forme de vues SQL plus lisibles des utilisateurs de Dataïku.

Poser du Parquet dans le cloud Azure ? Oui !

En quelques années de l’ère “big data”, le format de fichier orienté colonne Parquet s’est imposé comme l’un des standards du stockage sur les lacs de données (data lake) grâce à ses performances de compression, sa gestion des partitions et son intégration avec des frameworks distribués comme Spark. Il s’agit d’un projet porté par la fondation Apache qui héberge la documentation officielle.

Avant d’obtenir un fichier Parquet, le format initial est bien souvent un format texte classique structuré comme du CSV ou semi-structuré comme le JSON. Nous sommes également dans le scénario de réalisation d’un couche “clean” au sein d’un data lake, à partir de nombreux fichiers de la couche “raw“, nécessitant d’être agrégés et optimisés pour des outils d’analyse.

Dans cet article, nous allons dérouler un cas pratique allant de la constitution de ces fichiers Parquet jusqu’à leur utilisation dans un rapport Power BI.

Partons d’une année de fichiers zip issus de vélos en location de la ville de New York, soit un total de 828Mo, atteignant 3,45Go une fois décompressés (CSV).

Nous disposons ainsi en tout de 19 506 857 lignes pour l’année 2020.

Avec un peu de script Python, nous pouvons décompresser automatiquement ces fichiers.

import os
import zipfile

def unzip_files(path_to_zip_file) -> None:
    directory_to_extract_to = "datasets"
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(directory_to_extract_to)

for file in os.listdir("zip"):
    print(f"Unzipping : {file}")
    unzip_files("zip/"+file)

Ensuite, depuis un environnement Spark comme Azure Databricks, nous pouvons lire tous les CSV, qui auront été déposés au préalable sur un compte de stockage Azure.

file_location = "/mnt/nycitibike/datasets/*"
file_type = "csv"
df = spark.read.format(file_type).option("inferSchema", "true").option("header","true").load(file_location)
display(df)

Notez qu’il faudra renommer les colonnes comportant un ou plusieurs espaces, interdits en entêtes dans le format Parquet, ce que nous faisons avec le script ci-dessous.

df = df \
.withColumnRenamed("start station id","start_station_id") \
.withColumnRenamed("start station name","start_station_name") \
.withColumnRenamed("start station latitude","start_station_latitude") \
.withColumnRenamed("start station longitude","start_station_longitude") \
.withColumnRenamed("end station id","end_station_id") \
.withColumnRenamed("end station name","end_station_name") \
.withColumnRenamed("end station latitude","end_station_latitude") \
.withColumnRenamed("end station longitude","end_station_longitude") \
.withColumnRenamed("birth year","birth_year")

L’enregistrement au format Parquet est alors possible, toujours à l’aide de l’API PySpark.

df.write.format("parquet").mode("overwrite").save("/mnt/nycitibike/dbx_parquet")

Les données sont automatiquement partitionnées (il est possible d’avoir la main sur ce mécanisme, comme nous le verrons plus tard) et quelques fichiers spécifiques à Databricks sont ajoutés :

– ficher “commited” qui liste les différentes parties

– fichier vide “started”

– fichier vide _SUCCESS

Ces fichiers qui tracent la transaction réalisées vont nous gêner pour une usage dans Power BI. Nous pourrions éviter leur génération en définissant les options suivantes dans notre script Spark comme l’indiquent ces échanges sur le forum Databricks.

Il est possible de prendre la main sur le nombre de fichiers obtenus, ainsi que sur la clé qui aidera au partitionnement. Nous ajoutons deux fonctions que sont repartition() et partitionBy() dans la syntaxe d’écriture du Parquet.

df.repartition(1).write.partitionBy("YearMonth").format("parquet").mode('overwrite').save("/mnt/nycitibike/dbx_parquet")

Dans Power BI Desktop, nous disposons d’un type de source Parquet, à partir du menu “Obtenir les données”.

Ce connecteur attend le chemin d’un fichier Parquet et non d’un répertoire contenant les partitions sous forme de fichiers distincts. Nous découvrons ainsi la fonction du langage M qui réalise la lecture du fichier : Parquet.Document().

= Parquet.Document(File.Contents("C:\Users\PaulPeton\Documents\Python Scripts\Parquet\my_to_parquet.parquet"))

Cette fonction est documentée depuis juin 2020 mais elle n’a pas fait l’objet d’une grande communication jusqu’à présent (janvier 2021) et la roadmap Power BI indique un (nouveau ?) connecteur Parquet pour mars 2021.

Pour lire le contenu d’un dossier, nous pouvons sans problème appliquer la technique de connexion à un dossier sur un compte de stockage Azure, puis lire chaque fichier à l’aide du bouton “expand” de la colonne “Content” contenant tous les binaries. Cette manipulation crée automatique une fonction d’import contenant le code suivant.

= (Paramètre1) => let
Source = Parquet.Document(Paramètre1)
in
Source

Attention, pour réaliser cette opération, il faut au préalable filtrer les fichiers ne correspondant pas à des partitions (commit, started, SUCCESS pour un dossier Parquet généré par Databricks), qui débutent tous par un caractère underscore.

= Table.SelectRows(Source, each not Text.StartsWith([Name], "_"))

Les données se chargent alors en mettant bout à bout toutes les partitions !

Il est maintenant tentant d’essayer une actualisation incrémentielle de notre dataset, sur une période d’un mois. Pour autant, celle-ci n’est pas garantie comme nous l’indique le message d’alerte dans la pop-up de paramétrage du rafraichissement.

Une traduction approximative annonce que la requête “ne peut être pliée”, il s’agit ici du concept de query folding : la requête ne pourrait appliquer un filtre (de dates) en amont du chargement complet des données. Ceci signifie tout simplement de le mécanisme incrémentiel ne se réaliserait pas.

Afin de tester ce fonctionnement, nous publions le rapport sur un espace de travail Premium du service Power BI. Il sera alors possible d’ouvrir une connexion à l’espace à partir du client lourd SQL Server Management Studio.

Au premier chargement, nous n’aurons qu’une seule partition.

Mais en actualisant le dataset depuis le service, les partitions apparaissent.

Nous observons bien qu’une nouvelle actualisation du jeu de données n’affecte pas toutes les partitions. C’est gagné !

En ajoutant des fichiers supplémentaires, nous observons aussi que le paramètre de stockage de lignes (ici, sur un an) fonctionne et cela permet donc de créer un dataset actualisé sur une période glissante.

Pour terminer, précisons que la requête réalisée ici peut tout à fait être exécutée au sein d’un dataflow Power BI et servir ainsi à la création de différents rapports.

[EDIT du 23/01/2021 : cette approche fonctionne aussi avec le format Delta Lake créé depuis Azure Databricks, et c’est tant mieux car ce format apporte beaucoup d’avantages, qui seront sûrement abordés dans un prochain article.]

Déploiement continu avec Azure Synapse Analytics – partie 1

S’il était bien une fonctionnalité attendue pour la nouvelle (2020) version d’Azure Synapse Analytics, c’était la gestion du versionning et du déploiement des scripts créés dans l’outil.

Depuis la disponibilité générale (fin novembre 2020), nous trouvons un menu Source control dans la fenêtre Manage.

Nous pouvons définir la Git configuration, c’est-à-dire l’outil qui servira à l’enregistrement des développements et à leur versionning. Nous pouvons choisir entre Azure DevOps ou GitHub.

Nous choisissons ici Azure DevOps mais la partie GitHub est documentée sur ce lien officiel.

Nous devons disposer au préalable d’un projet DevOps et d’un repository dont l’URL devra être renseignée dans Synapse.

The Dev Ops repository link must start with http or https and be in DevOps format. For example: https://account.visualstudio.com/project/_git/repository

Dès lors, le dépôt se remplie avec les premiers éléments faisant partie de la ressource Synapse Analytics.

Dans le studio Synapse, nous disposons de plusieurs actions :

La validation permet de contrôler le développement réalisé et des erreurs seront remontées le cas échéant.

Le bouton Commit permet de “sauvegarder” le développement réalisé sur la branche active. Il est recommandé de ne pas développer directement sur la branche master.

Le bouton Publish de l’interface Synapse permet de créer ou mettre à jour la branche workspace_publish.

Il est donc logique de les utiliser successivement, dans cet ordre. L’action Publish réalisera une sauvegarde si nécessaire. A noter que les sorties (outputs) des cellules des notebooks sont effacées au moment de la publication. C’est une bonne pratique pour le versionning mais peut déstabiliser l’utilisateur souhaitant conserver l’affichage de ses résultats sans relancer le notebook complet.

Au fur et à mesure de l’utilisation de Synapse Analytics, de nouveaux répertoires vont se créer sur la branche collaborative.

Manquent ici triggers, credentials et Spark job definiition

Dans la branche workspace_publish, nous retrouvons deux fichiers JSON dans une logique complètement similaire à celle d’Azure Data Factory, permettant un déploiement de type “template ARM“.

Le fichier TemplateParametersForWorkspace.json contient en particulier le nom des informations de connexion (nom du workspace, URL, etc.) et les mots de passe, secrets ou token, sont effacés. Il faudra écraser ou renseigner ces valeurs lors du déploiement dans un autre environnement.

Dans une seconde partie, nous développerons les approches possibles pour le déploiement continue à l’aide d’Azure Pipelines.