Du rôle des pipelines dans Azure ML et son SDK

Si vous êtes habitués à manipuler le package Python scikit learn, la notion de pipeline vous est sans doute familière. Cet objet permet d’enchainer des étapes (“steps“) comme la préparation de données (normalisation, réduction de dimensions, etc.), l’entrainement puis l’évaluation d’un modèle. La documentation officielle du package donne ainsi cet exemple.

>>> from sklearn.svm import SVC
>>> from sklearn.preprocessing import StandardScaler
>>> from sklearn.datasets import make_classification
>>> from sklearn.model_selection import train_test_split
>>> from sklearn.pipeline import Pipeline
>>> X, y = make_classification(random_state=0)
>>> X_train, X_test, y_train, y_test = train_test_split(X, y,
...                                                     random_state=0)
>>> pipe = Pipeline([('scaler', StandardScaler()), ('svc', SVC())])
>>> # The pipeline can be used as any other estimator
>>> # and avoids leaking the test set into the train set
>>> pipe.fit(X_train, y_train)
Pipeline(steps=[('scaler', StandardScaler()), ('svc', SVC())])
>>> pipe.score(X_test, y_test)
0.88

Le SDK Python azureml que nous avons déjà évoqué sur ce blog dispose également d’un concept de pipeline. Celui-ci ne doit pas être confondu avec les pipelines pouvant être définis dans l’interface visuelle du concepteur (“designer“). Les pipelines en tant que tels sont visibles dans un menu dédié.

(NDA : on serait en droit d’espérer à termes une fusion de ces éléments, c’est-à-dire d’observer le pipeline dans l’interface du concepteur, voire de retrouver le pipeline créé par le code dans les objets manipulables visuellement).

Les avantages des pipelines

La manipulation des pipelines, en particulier des entrées-sorties, ne sera pas triviale, alors nous allons insister sur tous les bénéfices d’un tel objet. Au delà de la représentation visuelle toujours plus parlante, le principal avantage sera la planification du pipeline. Celle-ci peut se faire de trois manières. Le code présenté ici est disponible dans ce dépôt GitHub.

Objet Scheduler

L’interface du studio Azure Machine Learning ne présente pas d’ordonnanceur visuel des notebooks. Pour autant, cette notion d’ordonnancement existe bien et se manipule au travers du SDK. Une fois publié, un pipeline dispose d’un identifiant. A partir de cet id, un objet Schedule peut être défini, et se déclenchera selon une récurrence déclarée au moyen de l’objet ScheduleRecurrence.

A sa création, l’ordonnancement est activé. Il sera possible de le désactiver à partir de son identifiant (à ne pas confondre avec l’identifiant du pipeline).

Les points négatifs de cet approche sont le manque de visibilité sur les ordonnancements définis (il est nécessaire de lancer la commande Schedule.list) et le fait que d’autres activités non définies dans des scripts présents sur dans l’espace de travail Azure Machine Learning.

Pipeline Azure DevOps

Encore un pipeline à ne pas confondre avec le pipeline du SDK azureml ! Nous parlons ici des pipelines de release d’Azure DevOps. En recherchant le terme “azureml” dans l’assistant (volet de droite), nous trouvons trois tâches, dont une permettant de lancer un pipeline Azure ML désigné à nouveau par son identifiant.

Un pipeline de release peut ensuite être ordonnancé au moyen des standards d’écriture du fichier YAML.

Activité Azure Data Factory

Nous disposons de trois activités distinctes dans le groupe “Machine Learning”. Les deux premières concernent l’ancien Azure Machine Learning studio, aujourd’hui déprécié. Concentrons-nous sur la troisième activité qui permet d’exécuter un pipeline Azure ML.

Pour remplir les différents paramètres, nous devons tout d’abord fournir un service lié (linked service) de type Azure Machine Learning (catégorie “compute“).

Nous privilégions l’authentification par identité managée, celle-ci se voyant attribuer un rôle de contributeur sur la ressource Azure Machine Learning.

Seconde information obligatoire : l’ID du pipeline publié. Un menu déroulant nous permettra de le choisir. Nous constatons ici que cette information sera particulièrement sensible lorsque nous devrons re-publier le pipeline. Il faudra donc limiter au maximum cette action, car elle engendrera une modification dans les paramètres de l’activité Data Factory. Une problématique similaire se posera dans le cas d’un déploiement automatique entre environnements (par exemple, de dev à prod) avec des ID de pipelines différents.

Et maintenant, comment réaliser un pipeline

Nous allons maintenant plonger dans le SDK Python azureml pour décortiquer les étapes de création d’un pipeline.

Il nous faut pour base un script Python qui contiendra le code à exécuter par une étape (step) du pipeline. Nous ne travaillerons ici qu’avec des étapes de PythonStepScript mais il en existe d’autres types, référencés dans la documentation officielle. Nous prendrons la bonne habitude de faire figurer dans ces scripts les lignes de code suivantes :

from azureml.core.run import Run

run = Run.get_context()
exp = run.experiment
ws = run.experiment.workspace

Celles-ci permettront de retrouver les éléments du “niveau supérieur”, c’est-à-dire l’expérience, son exécution ainsi que l’espace de travail.

Ensuite, nous pourrons travailler sur les entrées et sorties de chaque étape. Cette gestion des entrées-sorties nécessitera un article à part entière sur ce blog.

Exemple de “graph” de sortie du pipeline (experiment)

Nous recréons ainsi, par le code (et au prix de nombreux efforts !), un objet similaire au pipeline obtenu dans le Designer.

from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep

step1 = PythonScriptStep(
    script_name="step1.py",
    compute_target=compute_target,
    source_directory="scripts",
    allow_reuse=True
)


step2 = PythonScriptStep(
    script_name="step1.py",
    compute_target=compute_target,
    source_directory="scripts",
    allow_reuse=True
)

steps = [ step1, step2 ]

pipeline = Pipeline(workspace=ws, steps=steps)
pipeline.validate()

Nous pouvons maintenant soumettre le pipeline, dont la logique des étapes aura préalablement été validée par l’instruction .validate().

pipeline_run = experiment.submit(pipeline)
pipeline_run.wait_for_completion()

Mais cela ne nous permettrait pas de réutiliser ce pipeline dans les scénarios évoqués ci-dessus, nous allons donc le publier avec l’instruction .publish().

published_pipeline = pipeline.publish(
name=published_pipeline_name,
description="ceci est un pipeline à deux étapes"
)
published_pipeline

Ceci nous permet de connaître l’identifiant du pipeline (masqué dans la copie d’écran ci-dessous).

Il ne reste plus qu’à soumettre le pipeline pour l’exécuter même si nous utiliserons vraisemblablement d’autres méthodes comme l’appel par Azure Data Factory ou l’emploi de l’API REST.

published_pipeline.submit(ws, published_pipeline_name)

Les logs de l’exécution seront accessibles dans le menu Pipelines du portail, qui nous redirigera vers le menu Experiments. N’oubliez pas d’activer l’affichage des “child runs” pour visualiser les traces de l’exécution de chacune des étapes.

Voici enfin un exemple de code qui permettra de lancer ce pipeline par requête HTTP.

from azureml.core.authentication import InteractiveLoginAuthentication
import requests

auth = InteractiveLoginAuthentication()
aad_token = auth.get_authentication_header()

rest_endpoint1 = published_pipeline.endpoint

print("You can perform HTTP POST on URL {} to trigger this pipeline".format(rest_endpoint1))

response = requests.post(rest_endpoint1, 
                         headers=aad_token, 
                         json={"ExperimentName": "My_Pipeline_batch",
                               "RunSource": "SDK"}
                        )

try:
    response.raise_for_status()
except Exception:    
    raise Exception('Received bad response from the endpoint: {}\n'
                    'Response Code: {}\n'
                    'Headers: {}\n'
                    'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content))

run_id = response.json().get('Id')
print('Submitted pipeline run: ', run_id)

Attention, une réponse “correcte” à cet appel HTTP sera de confirmer le bon lancement du pipeline. Mais rien ne vous garantit que son exécution se fera avec succès jusqu’au bout ! Il faudra pour cela se pencher sur la remontée de logs (et d’alertes !) à l’aide d’un outil comme Azure Monitor.

Job d’entrainement de modèle : nouvelle expérience guidée

Avec un peu de pratique des services Azure Machine Learning, vous vous apercevrez qu’un mode de développement idéal peut se dérouler comme suit :

  • travailler dans un IDE (par exemple, Visual Studio Code) depuis votre poste
  • exécuter et tester le code localement
  • ajouter des interactions au service Azure ML : charger un dataset, enregistrer un modèle, logguer des métriques d’évaluation…
  • exécuter le code à distance et à l’échelle des données complètes sur une ressource de calcul (compute cluster)
  • éventuellement, intégrer ce code au sein d’un objet pipeline qui sera planifiable (scheduling)

Nous pouvons alors écrire un canevas global, basé sur le SDK azureml, qui prendra en entrée le script Python réalisant des traitements ou l’entrainement d’un modèle.

Dans une optique de simplification de ce processus, nous découvrons (septembre 2021) une fonctionnalité dans la home du portail Azure Machine Learning : “train a model“.

Vous pouvez d’ailleurs vous tenir informé.e.s des nouveautés d’Azure Machine Learning sur ce lien officiel.

Nous allons retrouver cette nouveauté dans le menu déroulant “Create new“.

Cliquons sur “Job (preview)” pour ouvrir la fenêtre ci-dessous.

Comme pour toute autre tâche, nous devons choisir une ressource de calcul : compute cluster, compute instance ou Kubernetes.

A la deuxième étape, nous choisissons un environnement d’exécution parmi les environnements prédéfinis, un environnement custom créé préalablement ou bien une image Docker stockée dans un Container Registry.

Nous choisissons ici un environnement disposant du SDK azureml, de la librairie Scikit-Learn et quelques packages supplémentaires.

Troisième étape, nous allons soumettre un script Python contenant l’entrainement d’un modèle. Nous choisissons comme exemple un script présent dans les “samples” de code, au niveau du menu notebooks.

Pour soumettre le fichier .py, nous pouvons réaliser un upload depuis le poste local ou pointer vers un compte de stockage Azure.

Pensez à donner un nouveau nom d’expérience à la place de “Default”.

Une ligne de commande va lancer le script sur l’environnement défini. Nous écrivons tout simplement l’instruction python suivie du nom du fichier .py.

Nous pourrions nous arrêter ici pour faire marcher cette démonstration mais jetons tout de même un œil sur les options disponibles au bas de cet écran.

Il est possible d’ajouter un ou plusieurs inputs au script :

  • un dataset enregistré sur Azure ML
  • un autre fichier local (par exemple, un autre script Python contenant des dépendances)
  • un chemin vers le blob storage ou le file share par défaut

Nous retrouvons donc des actions qui demanderaient l’usage du SDK azureml à l’intérieur du script initial.

L’écran Review résume enfin les paramètres choisis. Attention, le nom du job ne pourra plus être utilisé !

*

Une fois le job lancé, nous pourrons suivre son exécution dans le menu Experiment. Les logs indiquent ci-dessous les sorties prévues dans le code et des modèles au format pickle sont disponibles dans le répertoire outputs.

En synthèse, Microsoft semble nous mettre sur la voie d’une utilisation plus poussée de l’UI (interface utilisateur) au détriment du SDK azureml qui représente une marche supplémentaire dans l’industrialisation du Machine Learning.

Tout l’intérêt de cette fonctionnalité résidera dans le fait de pointer sur un dataset important (déclaré dans Azure ML) et de réaliser les calculs avec une ressource puissance (plus puissante que notre laptop !).

Il manque peut-être à ce jour une capacité à planifier voire ordonnancer les jobs mais des nouveautés seront sans doute bientôt annoncées.

Dans une optique d’industrialisation, il sera toujours plus intéressant de disposer du code “de bout en bout” afin d’en gérer le versioning, la répétabilité ou encore le déploiement entre workspaces mais nous gagnerons ici un temps précieux dans la phase cruciale de preuve de valeur des algorithmes.

Forecasting des séries temporelles avec la librairie fbprophet

En 2017, les équipes de recherche de Facebook publiaient ce papier qui introduira la librairie fbprophet, disponible en R et en Python (pas de jaloux !). Cet outil peut être rangé dans la catégorie des modèles additifs généraux car il décompose une série temporelle de la sorte :

y(t) = g(t) + s(t) + h(t) + e(t)

avec respectivement :

  • g(t) : la tendance (linéaire ou logistique)
  • s(s) : une ou plusieurs composantes saisonnières (annuelle, hebdomadaire ou quotidienne)
  • h(t) : l’effet des vacances ou de jours spécifiques qui pourront être paramétrés
  • e(t) : l’erreur, bruit aléatoire qui mesure l’écart entre le modèle et les données réelles

Si l’on retient le terme “additif”, il est bien sûr possible de modifier le modèle pour le rendre “multiplicatif” (observez les crêtes de votre série temporelles, si celles-ci forment un cône, le modèle est sans doute multiplicatif).

La grande force de ce type de modèle tient dans sa capacité à être interprété ainsi que dans la clarté des représentations graphiques de la décomposition. Il sera particulièrement adapté à des phénomènes comme… la fréquentation d’un réseau social mais aussi des mesures économiques fortement soumises à des saisonnalités et aux périodes de vacances d’un pays. Il est si simple à mettre en œuvre qu’il gagnera à être comparé à des modèles plus classiques comme SARIMA (nous en reparlerons en toute fin d’article).

Installation de la librairie

Pour ne pas “polluer” notre installation locale avec de nouveaux packages et leurs dépendances, nous créons au préalable un environnement virtuel, à l’aide du package pyenv pour Windows (voir ce GitHub). Ne pas oublier d’ajouter les variables d’environnement et de redémarrer votre terminal ou IDE pour terminer l’installation.

pyenv install 3.8.10

pyenv shell 3.8.10

Nous pouvons alors installer les librairies suivantes :

pip install pystan==2.19.1.1

pip install fbprophet

Pystan est une librairie pour l’inférence bayésienne. Si cette installation ne fonctionne pas (les dépendances sont parfois capricieuses…), vous pouvez utiliser un prompt Anaconda et tenter la commande suivante :

conda install -c conda-forge fbprophet

Vérifiez enfin la bonne installation en choisissant l’interpréteur voulu (ici, conda) dans Visual Studio Code.

Lancez ensuite Python dans le terminal et testez un import de la librairie.

Mise en pratique

Déroulons maintenant un exemple simple, de bout en bout, en réalisant quelques tentatives d’optimisation du modèle. RTE France met à disposition les données énergétiques “eco2mix” dont la profondeur d’historique et le niveau de détail sera intéressant pour évaluer notre outil. J’ai pris connaissance de ce jeu de données dans cet excellent article du blog de Publicis Sapient.

Une seule contrainte dans la façon dont les données doivent être soumises à Prophet : les colonnes de temps et de mesure quantitative doivent être respectivement nommées “ds” et “y”.

Une granularité plus fine que le jour peut être utilisée dans le champ datetime. C’est alors qu’il sera pertinent d’activer le daily effect qui sera visualisable graphiquement.

La régularité est un élément fondamental pour la modélisation d’une série temporelle, que viennent perturber les années bissextiles. Nous pouvons décider de supprimer les 29 février, par exemple avec le code ci-dessous :

df= df.loc[~(df['ds'].dt.month.eq(2) & df['ds'].dt.day.eq(29))]

Attention, nous dégradons alors inévitablement la régularité des semaines…

Création du modèle et évaluation

Lançons tout d’abord un modèle simple, sur l’intégralité de l’historique. Dans l’extrait de code ci-dessous, df représente un pandas dataframe regroupant les deux colonnes nommées ds et y.

De nombreux paramètres sont définis par défaut : additivité, pas de saisonnalité journalière, détection automatique des change points (changement de tendance), etc.

m = Prophet(daily_seasonality=False)
m.add_country_holidays(country_name='FR')
m.fit(df)

Ici, nous ajoutons au modèle les jours fériés français qui deviendront autant d’indicateurs dans notre modèle additif.

m.train_holiday_names

Comme l’indique cette discussion, il ne semble pas possible d’ajouter plusieurs pays à l’aide de cette méthode.

Nous pouvons également ajouter nos propres dates si nous considérons que des événements (répétitifs sur la saisonnalité attendue) ont un impact sur le phénomène observé. Nous pourrions par exemple identifier les journées les plus froides de l’année qui engendrent certainement une hausse de consommation électrique. Mais serons-nous en capacité de les prédire par la suite ? Mieux vaut se limiter à des événements connus à l’avance tels que des compétitions sportives (lors de la finale de la Coupe du Monde, tout le monde allume la télé !).

Les séries temporelles seraient si simples si toutes les tendances étaient linéaires ! Mais dans la vraie vie (et encore plus en période de pandémie…), les tendances fluctuent et il est indispensable que notre modèle les comprenne. Prophet identifie automatiquement les “change points” et on les visualise ainsi, en rouge, à l’aide du code ci-dessous.

from prophet.plot import add_changepoints_to_plot

fig = m.plot(forecast)
a = add_changepoints_to_plot(fig.gca(), m, forecast)

Nous pouvons aussi les définir arbitrairement. Pourquoi pas en donnant les dates des changements de saison (été et hiver) ?

m = Prophet(daily_seasonality=False,
changepoints=['2013-12-21', '2014-06-21', '2014-12-21', '2015-06-21', '2015-12-21', '2016-06-21', '2016-12-21', '2017-06-21'],
changepoint_prior_scale=1)

Le paramètre changepoint_prior_scale indique à quel point le modèle doit respecter notre liste (ou sa détection automatique). C’est une valeur entre 0 et 1. Avec 1, nos change points sont bien retenus, comme l’atteste ce graphique.

Forecast

Pour calculer une prévision, nous allons avoir besoin d’un nouveau dataframe contenant une colonne de type datetime, toujours nommée “ds”, que nous soumettrons à la méthode .predict(), appliquée au modèle.

future = pd.date_range(start="2020-01-01",end="2021-12-31")
future = pd.DataFrame(future, columns=['ds'])

Tout comme pour la méthode .fit(), la syntaxe est ainsi tout à fait similaire à celle du package scikit-learn.

forecast = m.predict(future)

Une autre méthode pour créer la plage de forecast consiste à utiliser la fonction ci-dessous.

future = m.make_future_dataframe(periods=365)

Cette fonction génère automatiquement une plage de dates couvrant l’historique complet auquel s’ajoute la période définie en paramètre. Ceci a pour avantage de nous permettre de comparer les prévisions avec les données réelles qui ont été utilisées pour le modèle, ce que nous allons observer dans les sorties graphiques.

Sorties graphiques

Pour afficher la superposition des historiques et des prévisions, un simple plot suffit !

m.plot(forecast)

Nous pouvons ensuite obtenir les différents graphiques de décomposition.

m.plot_components(forecast)

D’autres graphiques interactifs sont disponibles conjointement avec la librairie plotly.

Cross validation

Nous allons maintenant évaluer la qualité de la prévision à l’aide des métriques d’évaluation traditionnelles que sont MSE, RMSE, MAE, MAPE, etc. et d’une méthode de validation croisée. Ici encore, tout est intégré dans une fonction ! Analysons le code ci-dessous.

df_cv = cross_validation(m, initial='730 days', period='365 days', horizon='365 days')
df_p = performance_metrics(df_cv, rolling_window=1)

Nous retrouvons le modèle préalablement entraîné (m). Un seul paramètre est obligatoire, celui de l’horizon de prévision. Mais nous pouvons également préciser la période initiale d’entrainement (par défaut, 3 horizons, et en effet avec ces méthodes dites “à court termes”, ne vous aventurez pas à prédire au delà d’un tiers de votre historique !). Enfin, le paramètre period indique la taille des “découpes” faites dans le jeu de données pour établir de nouvelles prévisions (par défaut, un demi horizon). Voici un exemple de sortie de la commande lancée sur un cluster Databricks.

A partir d’un entrainement sur les années 2013 et 2014, la validation croisée a effectué 4 prévisions sur les années 2015 à 2018.


Nous obtenons alors les métriques d’évaluation, en moyenne sur le nombre de “folds“.

L’argument rolling_window indique le pourcentage de données considérées dans la prévision (1 équivaut à 100%).

Nous pouvons enfin rechercher les meilleurs hyperparamètres pour notre modèle.

Utiliser une approche par hyperparameter tuning implique de lancer un nombre important d’entrainements et d’évaluations. C’est ici que nous tirerons profit d’un cluster de machines, en précisant le paramètre parallel=’processes’ dans la fonction cross_validation(). L’exemple de code donné sur le site officiel sera simple à adapter.

Et maintenant, Kats !

La R&D de Facebook ne s’est pas arrêtée là ! En 2021, une nouvelle librairie est mise à disposition : Kats. Celle-ci a pour vocation de simplifier les tâches des Data Scientists autour de l’analyse et de la modélisation des séries temporelles.

Ici, pas de nommage particulier des colonnes, mais nous transformerons le dataframe en objet spécifique à Kats : TimeSeriesData().

Une première fonctionnalité consiste à déterminer automatiquement 65 features de la série temporelle, c’est-à-dire des caractéristiques de cette série (moyenne, variance, entropie, etc.) qui pourront être par la suite intégrées à des modèles de Machine Learning ou à une approche par régression de la modélisation de la série temporelle.

De nombreuses fonctionnalités s’orientent autour de la détection : seasonalities, outlier, change point, and slow trend changes

Enfin, Kats intègre un grand nombre de modèles (ARIMA, HW, stlf… et l’inévitable Prophet !). Bref, c’est le couteau suisse rêvé de tout.e Data Scientist qui s’attaque à un sujet de séries temporelles.