Lire les informations d’état Structured Streaming
Dans Databricks Runtime 14.3 LTS et versions ultérieures, sur un calcul configuré avec un mode d’accès dédié ou sans isolation, vous pouvez utiliser des opérations DataFrame ou des fonctions table-valeur SQL pour interroger les données et métadonnées d'état de Structured Streaming. Vous pouvez utiliser ces fonctions pour observer les informations d’état des requêtes avec état Structured Streaming, qui peuvent être utiles pour la supervision et le débogage.
Vous devez disposer d’un accès en lecture au chemin de point de contrôle d’une requête de diffusion en continu pour interroger les métadonnées ou les données d’état. Les fonctions décrites dans cet article fournissent un accès en lecture seule aux métadonnées et aux données d’état. Vous pouvez uniquement utiliser la sémantique de lecture par lots pour interroger les informations d’état.
Remarque
Vous ne pouvez pas interroger les informations d’état pour les pipelines Delta Live Tables, les tables de diffusion en continu ou les vues matérialisées. Vous ne pouvez pas interroger les informations d'état en utilisant l'informatique sans serveur ou avec des paramètres calculés selon le mode d'accès standard.
Lire le magasin d’état Structured Streaming
Vous pouvez lire les informations de magasin d’état pour les requêtes Structured Streaming exécutées dans n’importe quel Databricks Runtime pris en charge. Utilisez la syntaxe suivante :
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Les configurations facultatives suivantes sont prises en charge :
Option | Type | Valeur par défaut | Description |
---|---|---|---|
batchId |
Long | ID du dernier lot | Représente le lot cible à partir duquel effectuer la lecture. Spécifiez cette option pour interroger les informations d’état pour un état antérieur de la requête. Le lot doit être validé, mais pas encore nettoyé. |
operatorId |
Long | 0 | Représente l’opérateur cible à partir duquel effectuer la lecture. Cette option est utilisée quand la requête utilise plusieurs opérateurs avec état. |
storeName |
Chaîne | « PAR DÉFAUT » | Représente le nom du magasin d’états cible à lire. Cette option est utilisée lorsque l’opérateur avec état utilise plusieurs instances de magasin d’états.
storeName ou joinSide doit être spécifié pour une jointure flux-flux, mais pas les deux. |
joinSide |
Chaîne (« gauche » ou « droite ») | Représente le côté cible à partir duquel effectuer la lecture. Cette option est utilisée quand des utilisateurs souhaitent lire l’état à partir d’une jointure flux-flux. |
Les données retournées ont le schéma suivant :
Colonne | Type | Description |
---|---|---|
key |
Struct (autre type dérivé de la clé d’état) | La clé d’un enregistrement d’opérateur avec état dans le point de contrôle d’état. |
value |
Struct (autre type dérivé de la valeur d’état) | La valeur d’un enregistrement d’opérateur avec état dans le point de contrôle d’état. |
partition_id |
Entier | La partition du point de contrôle d’état qui contient l’enregistrement d’opérateur avec état. |
Lire les métadonnées d’état Structured Streaming
Important
Vous devez exécuter des requêtes de diffusion en continu sur Databricks Runtime 14.2 ou version ultérieure pour enregistrer les métadonnées d’état. Les fichiers de métadonnées d’état ne cassent pas la compatibilité descendante. Si vous choisissez d’exécuter une requête de diffusion en continu sur Databricks Runtime 14.1 ou version antérieure, les fichiers de métadonnées d’état existants sont ignorés et aucun nouveau fichier de métadonnées d’état n’est écrit.
Vous pouvez lire les informations de métadonnées d’état pour les requêtes Structured Streaming exécutées dans Databricks Runtime 14.2 ou version ultérieure. Utilisez la syntaxe suivante :
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Les données retournées ont le schéma suivant :
Colonne | Type | Description |
---|---|---|
operatorId |
Entier | L’ID entier de l’opérateur de diffusion en continu avec état. |
operatorName |
Entier | Nom de l’opérateur de diffusion en continu avec état. |
stateStoreName |
Chaîne | Nom du magasin d’état de l’opérateur. |
numPartitions |
Entier | Nombre de partitions du magasin d’état. |
minBatchId |
Long | L’ID de lot minimal disponible pour l’état d’interrogation. |
maxBatchId |
Long | L’ID de lot maximal disponible pour l’état d’interrogation. |
Remarque
Les valeurs d’ID de lot fournies par minBatchId
et maxBatchId
reflètent l’état au moment où le point de contrôle a été écrit. Les anciens lots sont nettoyés automatiquement avec l’exécution de micro-lots. Par conséquent, la valeur fournie ici n’est pas garantie d’être toujours disponible.