Zdieľať cez


Recept: Azure AI služby - Multivariate Detekcia anomálií

Tento recept ukazuje, ako môžete používať služby SynapseML a Azure AI na Apache Spark na multivariátov detekciu anomálií. Multivariálna detekcia anomálií umožňuje detekciu anomálií v mnohých premenných alebo časovom rade pri zohľadnení všetkých medzi korelácií a závislostí medzi rôznymi premennými. V tomto scenári použijeme synapseML na trénovanie modelu na multivariate detekciu anomálií pomocou služieb Azure AI. Potom na model použijeme na odvodenie multivariálnych anomálií v množine údajov obsahujúcej syntetické merania z troch senzorov IoT.

Dôležité

Od 20. septembra 2023 nebudete môcť vytvárať nové zdroje detektora anomálií. Služba Anomaly Detector sa vyradí 1. októbra 2026.

Ďalšie informácie o detektore anomálií Azure AI nájdete na tejto stránke dokumentácie.

Požiadavky

  • Predplatné služby Azure – vytvorte si ho zdarma
  • Pripojte svoj notebook k jazeru. Na ľavej strane vyberte položku Pridať a pridajte existujúci lakehouse alebo vytvorte jazero.

Inštalácia

Pomocou pokynov na vytvorenie Anomaly Detector zdroja pomocou portálu Azure alebo alternatívne môžete na vytvorenie tohto zdroja použiť aj Azure CLI.

Po nastavení Anomaly Detectormôžete preskúmať metódy spracovania údajov rôznych formulárov. Katalóg služieb v službe Azure AI poskytuje niekoľko možností: Vision, Speech, Language, Web search, Decision, Translation a Document Intelligence.

Vytvorenie zdroja detektora anomálií

  • Na portáli Azure vyberte položku Vytvoriť vo svojej skupine zdrojov a potom zadajte text Anomaly Detector (Detektor anomálií). Vyberte zdroj Detektor anomálií.
  • Pomenujte prostriedok a v ideálnom prípade použite rovnakú oblasť ako zvyšok skupiny prostriedkov. Použite predvolené možnosti pre ostatné a potom vyberte položku Skontrolovať a vytvoriť.
  • Po vytvorení zdroja Anomaly Detector ho otvorte a vyberte Keys and Endpoints panel na ľavej navigačnej table. Skopírujte kľúč zdroja Anomaly Detector do premennej ANOMALY_API_KEY prostredia alebo ho uložte do premennej anomalyKey .

Vytvorenie prostriedku konta úložiska

Ak chcete uložiť priebežné údaje, musíte si vytvoriť konto úložiska objektu Blob platformy Azure. V rámci tohto konta úložiska vytvorte kontajner na uloženie prechodných údajov. Poznačte si názov kontajnera a skopírujte reťazec pripojenia do tohto kontajnera. Premennú a BLOB_CONNECTION_STRING premennú prostredia budete potrebovať neskôrcontainerName.

Zadanie kľúčov služby

Začnime nastavením premenných prostredia pre naše kľúče služby. Ďalšia bunka nastaví ANOMALY_API_KEY premenné prostredia a BLOB_CONNECTION_STRING na základe hodnôt uložených v službe Azure Key Vault. Ak ste spustili tento kurz vo vlastnom prostredí, skôr ako budete pokračovať, uistite sa, že ste tieto premenné prostredia nastavili.

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Teraz umožňuje čítať ANOMALY_API_KEY premenné prostredia a BLOB_CONNECTION_STRING nastaviť containerName premenné a location .

# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

Najprv sa pripojíme k nášmu kontu úložiska, aby tu detektor anomálií mohol ušetriť priebežné výsledky:

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

Importujme všetky potrebné moduly.

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.cognitive import *

Teraz si prejdime naše vzorové údaje do údajového rámca Spark.

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

Teraz môžeme vytvoriť estimator objekt, ktorý sa používa na trénovaie nášho modelu. Určujeme časy začiatku a konca tréningových údajov. Zadáme tiež vstupné stĺpce, ktoré sa majú použiť, a názov stĺpca, ktorý obsahuje časové pečiatky. Nakoniec určíme počet údajových bodov, ktoré sa použijú v posuvné okne detekcie anomálií, a nastavíme reťazec pripojenia na konto ukladacieho priestoru objektu Blob platformy Azure.

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

Teraz, keď sme vytvorili zostavu estimator, prispôsobme ju údajom:

model = estimator.fit(df)
```parameter

Once the training is done, we can now use the model for inference. The code in the next cell specifies the start and end times for the data we would like to detect the anomalies in.

```python
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

Keď sme volali .show(5) do predchádzajúcej bunky, zobrazilo sa v nej prvých päť riadkov v údajovom rámci. Výsledky boli všetky null , pretože neboli vnútri okna inferencie.

Ak chcete zobraziť výsledky iba pre odvodené údaje, umožňuje vybrať potrebné stĺpce. Potom môžeme zoradiť riadky v údajovom rámci podľa vzostupného poradia a filtrovať výsledok tak, aby zobrazoval iba riadky, ktoré sú v rozsahu okna inferencie. V našom prípade inferenceEndTime je rovnaký ako posledný riadok v údajovom prvku, takže to môžeme ignorovať.

Nakoniec, aby ste mohli lepšie vykresliť výsledky, umožňuje skonvertovať údajový rámec Spark na údajový rámec Pandas.

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

contributors Naformátujte stĺpec, ktorý uchováva skóre príspevku z každého senzora do zistených anomálií. Ďalšia bunka naformátuje tieto údaje a rozdelí skóre príspevku každého senzora do svojho vlastného stĺpca.

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}

rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

Výborne! Teraz máme skóre príspevku senzorov 1, 2 a 3 v series_0stĺpcoch , series_1a series_2 .

Spustením ďalšej bunky vykreslíte výsledky. Parameter minSeverity určuje minimálnu závažnosť anomálií, ktoré sa majú vykresliť.

minSeverity = 0.1

####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

Snímka obrazovky zobrazujúca vykreslenie výsledkov viacnásobnej detekcie anomálií.

Na pozemkoch sa zobrazujú nespracované údaje zo senzorov (vo vnútri okna inferencie) v oranžovej, zelenej a modrej. Červené zvislé čiary na prvom obrázku zobrazujú zistené anomálie, ktoré majú závažnosť väčšiu alebo rovnajúcu sa minSeverityhodnote .

Druhý vykreslený graf zobrazuje skóre závažnosti všetkých zistených anomálií s prahovou hodnotou uvedenou minSeverity v bodkovanej červenej čiare.

Nakoniec posledné vykreslenia ukazuje príspevok údajov z každého senzora k zisteným anomáliám. Pomáha nám diagnostikovať a pochopiť najpravdepodobne najväčšiu príčinu každej anomálie.