Recept: Azure AI služby - Multivariate Detekcia anomálií
Tento recept ukazuje, ako môžete používať služby SynapseML a Azure AI na Apache Spark na multivariátov detekciu anomálií. Multivariálna detekcia anomálií umožňuje detekciu anomálií v mnohých premenných alebo časovom rade pri zohľadnení všetkých medzi korelácií a závislostí medzi rôznymi premennými. V tomto scenári použijeme synapseML na trénovanie modelu na multivariate detekciu anomálií pomocou služieb Azure AI. Potom na model použijeme na odvodenie multivariálnych anomálií v množine údajov obsahujúcej syntetické merania z troch senzorov IoT.
Dôležité
Od 20. septembra 2023 nebudete môcť vytvárať nové zdroje detektora anomálií. Služba Anomaly Detector sa vyradí 1. októbra 2026.
Ďalšie informácie o detektore anomálií Azure AI nájdete na tejto stránke dokumentácie.
Požiadavky
- Predplatné služby Azure – vytvorte si ho zdarma
- Pripojte svoj notebook k jazeru. Na ľavej strane vyberte položku Pridať a pridajte existujúci lakehouse alebo vytvorte jazero.
Inštalácia
Pomocou pokynov na vytvorenie Anomaly Detector
zdroja pomocou portálu Azure alebo alternatívne môžete na vytvorenie tohto zdroja použiť aj Azure CLI.
Po nastavení Anomaly Detector
môžete preskúmať metódy spracovania údajov rôznych formulárov. Katalóg služieb v službe Azure AI poskytuje niekoľko možností: Vision, Speech, Language, Web search, Decision, Translation a Document Intelligence.
Vytvorenie zdroja detektora anomálií
- Na portáli Azure vyberte položku Vytvoriť vo svojej skupine zdrojov a potom zadajte text Anomaly Detector (Detektor anomálií). Vyberte zdroj Detektor anomálií.
- Pomenujte prostriedok a v ideálnom prípade použite rovnakú oblasť ako zvyšok skupiny prostriedkov. Použite predvolené možnosti pre ostatné a potom vyberte položku Skontrolovať a vytvoriť.
- Po vytvorení zdroja Anomaly Detector ho otvorte a vyberte
Keys and Endpoints
panel na ľavej navigačnej table. Skopírujte kľúč zdroja Anomaly Detector do premennejANOMALY_API_KEY
prostredia alebo ho uložte do premennejanomalyKey
.
Vytvorenie prostriedku konta úložiska
Ak chcete uložiť priebežné údaje, musíte si vytvoriť konto úložiska objektu Blob platformy Azure. V rámci tohto konta úložiska vytvorte kontajner na uloženie prechodných údajov. Poznačte si názov kontajnera a skopírujte reťazec pripojenia do tohto kontajnera. Premennú a BLOB_CONNECTION_STRING
premennú prostredia budete potrebovať neskôrcontainerName
.
Zadanie kľúčov služby
Začnime nastavením premenných prostredia pre naše kľúče služby. Ďalšia bunka nastaví ANOMALY_API_KEY
premenné prostredia a BLOB_CONNECTION_STRING
na základe hodnôt uložených v službe Azure Key Vault. Ak ste spustili tento kurz vo vlastnom prostredí, skôr ako budete pokračovať, uistite sa, že ste tieto premenné prostredia nastavili.
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Teraz umožňuje čítať ANOMALY_API_KEY
premenné prostredia a BLOB_CONNECTION_STRING
nastaviť containerName
premenné a location
.
# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
"wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"
Najprv sa pripojíme k nášmu kontu úložiska, aby tu detektor anomálií mohol ušetriť priebežné výsledky:
spark.sparkContext._jsc.hadoopConfiguration().set(
f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)
Importujme všetky potrebné moduly.
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.cognitive import *
Teraz si prejdime naše vzorové údaje do údajového rámca Spark.
df = (
spark.read.format("csv")
.option("header", "true")
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)
df = (
df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)
# Let's inspect the dataframe:
df.show(5)
Teraz môžeme vytvoriť estimator
objekt, ktorý sa používa na trénovaie nášho modelu. Určujeme časy začiatku a konca tréningových údajov. Zadáme tiež vstupné stĺpce, ktoré sa majú použiť, a názov stĺpca, ktorý obsahuje časové pečiatky. Nakoniec určíme počet údajových bodov, ktoré sa použijú v posuvné okne detekcie anomálií, a nastavíme reťazec pripojenia na konto ukladacieho priestoru objektu Blob platformy Azure.
trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
estimator = (
FitMultivariateAnomaly()
.setSubscriptionKey(anomalyKey)
.setLocation(location)
.setStartTime(trainingStartTime)
.setEndTime(trainingEndTime)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
)
Teraz, keď sme vytvorili zostavu estimator
, prispôsobme ju údajom:
model = estimator.fit(df)
```parameter
Once the training is done, we can now use the model for inference. The code in the next cell specifies the start and end times for the data we would like to detect the anomalies in.
```python
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"
result = (
model.setStartTime(inferenceStartTime)
.setEndTime(inferenceEndTime)
.setOutputCol("results")
.setErrorCol("errors")
.setInputCols(inputColumns)
.setTimestampCol(timestampColumn)
.transform(df)
)
result.show(5)
Keď sme volali .show(5)
do predchádzajúcej bunky, zobrazilo sa v nej prvých päť riadkov v údajovom rámci. Výsledky boli všetky null
, pretože neboli vnútri okna inferencie.
Ak chcete zobraziť výsledky iba pre odvodené údaje, umožňuje vybrať potrebné stĺpce. Potom môžeme zoradiť riadky v údajovom rámci podľa vzostupného poradia a filtrovať výsledok tak, aby zobrazoval iba riadky, ktoré sú v rozsahu okna inferencie. V našom prípade inferenceEndTime
je rovnaký ako posledný riadok v údajovom prvku, takže to môžeme ignorovať.
Nakoniec, aby ste mohli lepšie vykresliť výsledky, umožňuje skonvertovať údajový rámec Spark na údajový rámec Pandas.
rdf = (
result.select(
"timestamp",
*inputColumns,
"results.contributors",
"results.isAnomaly",
"results.severity"
)
.orderBy("timestamp", ascending=True)
.filter(col("timestamp") >= lit(inferenceStartTime))
.toPandas()
)
rdf
contributors
Naformátujte stĺpec, ktorý uchováva skóre príspevku z každého senzora do zistených anomálií. Ďalšia bunka naformátuje tieto údaje a rozdelí skóre príspevku každého senzora do svojho vlastného stĺpca.
def parse(x):
if type(x) is list:
return dict([item[::-1] for item in x])
else:
return {"series_0": 0, "series_1": 0, "series_2": 0}
rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
[rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf
Výborne! Teraz máme skóre príspevku senzorov 1, 2 a 3 v series_0
stĺpcoch , series_1
a series_2
.
Spustením ďalšej bunky vykreslíte výsledky. Parameter minSeverity
určuje minimálnu závažnosť anomálií, ktoré sa majú vykresliť.
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
rdf["timestamp"],
rdf["sensor_1"],
color="tab:orange",
linestyle="solid",
linewidth=2,
label="sensor_1",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_2"],
color="tab:green",
linestyle="solid",
linewidth=2,
label="sensor_2",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_3"],
color="tab:blue",
linestyle="solid",
linewidth=2,
label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)
plt.legend()
plt.title(
"A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
rdf["timestamp"],
rdf["severity"],
color="black",
linestyle="solid",
linewidth=2,
label="Severity score",
)
plt.plot(
rdf["timestamp"],
[minSeverity] * len(rdf["severity"]),
color="red",
linestyle="dotted",
linewidth=1,
label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
rdf["timestamp"],
rdf["series_1"],
width=2,
color="tab:green",
label="sensor_2",
bottom=rdf["series_0"],
)
plt.bar(
rdf["timestamp"],
rdf["series_2"],
width=2,
color="tab:blue",
label="sensor_3",
bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
Na pozemkoch sa zobrazujú nespracované údaje zo senzorov (vo vnútri okna inferencie) v oranžovej, zelenej a modrej. Červené zvislé čiary na prvom obrázku zobrazujú zistené anomálie, ktoré majú závažnosť väčšiu alebo rovnajúcu sa minSeverity
hodnote .
Druhý vykreslený graf zobrazuje skóre závažnosti všetkých zistených anomálií s prahovou hodnotou uvedenou minSeverity
v bodkovanej červenej čiare.
Nakoniec posledné vykreslenia ukazuje príspevok údajov z každého senzora k zisteným anomáliám. Pomáha nám diagnostikovať a pochopiť najpravdepodobne najväčšiu príčinu každej anomálie.