Kurz: Vytvorenie, trénovanie a vyhodnotenie modelu upliftu
Tento kurz predstavuje komplexný príklad pracovného postupu synapse Data Science v službe Microsoft Fabric. Naučíte sa vytvárať, trénovať a hodnotiť modely upliftingu a používať techniky modelovania povznesenia.
Predpoklady
Získajte predplatné služby Microsoft Fabric . Alebo si zaregistrujte bezplatnú skúšobnú služby Microsoft Fabric.
Prihláste sa do služby Microsoft Fabric.
Pomocou prepínača skúseností v ľavej dolnej časti domovskej stránky sa prepnete na službu Fabric.
- Znalosť poznámkových blokov služby Microsoft Fabric
- Lakehouse pre tento notebook, na ukladanie údajov pre tento príklad. Ďalšie informácie nájdete Pridanie jazera do notebooku
Sledovanie v poznámkovom bloke
Môžete si vyskúšať tieto kroky v poznámkovom bloke jedným z dvoch spôsobov:
- Otvorte a spustite vstavaný poznámkový blok.
- Nahrajte poznámkový blok z GitHubu.
Otvorenie vstavaného poznámkového bloku
Tento kurz sprevádza ukážkový modelovanie notebooku Povznesenie.
Ak chcete otvoriť vzorový poznámkový blok pre tento kurz, postupujte podľa pokynov v téme Príprava systému na kurzy dátovej vedy.
Uistite sa, že pripojiť lakehouse na notebook, ako začnete bežať kód.
Importovanie notebooku z GitHubu
Tento kurz sprevádza AIsample - Uplift Modeling.ipynb notebook.
Ak chcete otvoriť sprievodný poznámkový blok pre tento kurz, postupujte podľa pokynov v téme Príprava systému na kurzy dátových viedna import notebooku do pracovného priestoru.
Ak by ste radšej skopírovali a prilepiť kód z tejto stránky, môžete vytvoriť nový poznámkový blok.
Uistite sa, že pripojiť lakehouse k notebooku, ako začnete bežať kód.
Krok č. 1: Načítanie údajov
Množina údajov
Criteo AI Lab vytvoril množinu údajov. Táto množina údajov má 13 miliónov riadkov. Každý riadok predstavuje jedného používateľa. Každý riadok má 12 funkcií, indikátor spracovania a dve binárne označenia, ktoré zahŕňajú návštevu a konverziu.
- f0 – f11: odporúčané hodnoty (husté, plávajúce hodnoty)
- liečba: či bol používateľ náhodne cieľový na liečbu (napríklad reklama) (1 = liečba, 0 = kontrola)
- konverzie: či došlo k konverzii (napríklad ku kúpe) pre používateľa (binárne označenie)
- navštívte: či došlo k konverzii (napríklad ku nákupu) používateľa (binárneho označenia)
Citát
- Domovská stránka množiny údajov: https://ailab.criteo.com/criteo-uplift-prediction-dataset/
Množina údajov použitá v tomto poznámkovom bloku vyžaduje toto citácie bibTex:
@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}
Tip
Definovaním nasledujúcich parametrov môžete tento poznámkový blok jednoducho použiť v rôznych množinách údajov.
IS_CUSTOM_DATA = False # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"
# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"
EXPERIMENT_NAME = "aisample-upliftmodelling" # MLflow experiment name
Import knižníc
Pred spracovaním musíte importovať požadované knižnice Spark a SynapseML. Musíte tiež importovať knižnicu vizualizácií údajov , napríklad knižnicu vizualizácií údajov v jazyku Python Seaborn. Knižnica vizualizácie údajov poskytuje rozhranie vysokej úrovne na vytváranie vizuálnych prostriedkov v údajových prvkoch a poliach. Ďalšie informácie o Spark, synapseMLa Seaborn.
import os
import gzip
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns
%matplotlib inline
from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics
import mlflow
Stiahnutie množiny údajov a nahratie do služby lakehouse
Tento kód stiahne verejne dostupnú verziu množiny údajov a potom tento zdroj údajov uloží v úložisku Fabric lakehouse.
Dôležitý
Pred spustením nezabudnite Pridať lakehouse do notebooku. Ak to neurobíte, bude to mať za následok chybu.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
download_file = "criteo-research-uplift-v2.1.csv.gz"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{DATA_FILE}"):
r = requests.get(f"{remote_url}", timeout=30)
with open(f"{download_path}/{download_file}", "wb") as f:
f.write(r.content)
with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
fout.write(fin.read())
print("Downloaded demo data files into lakehouse.")
Spustite nahrávanie modulu runtime tohto poznámkového bloku.
# Record the notebook running time
import time
ts = time.time()
Nastavenie sledovania experimentov toku MLflow
Ak chcete rozšíriť možnosti zapisovania do denníka toku MLflow, automatické označovanie automaticky zaznamenáva hodnoty vstupných parametrov a výstupné metriky modelu strojového učenia počas jeho trénovania. Tieto informácie sa potom prihlásia do pracovného priestoru, kde môžu rozhrania API toku MLflow alebo príslušný experiment v pracovnom priestore získať prístup a vizualizovať ich. Ďalšie informácie o automatickom označovaní nájdete v tomto zdroji.
# Set up the MLflow experiment
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True) # Disable MLflow autologging
Nota
Ak chcete zakázať automatické označovanie služby Microsoft Fabric v relácii poznámkového bloku, zavolajte mlflow.autolog()
a nastavte disable=True
.
Čítať údaje z jazera
Prečítajte si nespracované údaje zo sekcie lakehouse Files a pridajte ďalšie stĺpce pre rôzne časti dátumov. Rovnaké informácie sa používajú na vytvorenie rozdelených delta tabuliek.
raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()
Krok č. 2: Prieskumná analýza údajov
Pomocou príkazu display
môžete zobraziť štatistiky vysokej úrovne množiny údajov. Môžete tiež zobraziť zobrazenia grafu na jednoduchú vizualizáciu podmnožín množiny údajov.
display(raw_df.limit(20))
Skontrolujte percento používateľov, ktorí navštevujú, percento používateľov, ktorí konvertujú, a percento návštevníkov, ktorí konvertujú.
raw_df.select(
F.mean("visit").alias("Percentage of users that visit"),
F.mean("conversion").alias("Percentage of users that convert"),
(F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()
Analýza naznačuje, že 4,9% používateľov z ošetrovateľskej skupiny - používateľov, ktorí dostali liečbu, alebo reklamy - navštívilo online obchod. Iba 3,8% používateľov z kontrolnej skupiny - používatelia, ktorí nikdy nedostali liečbu, alebo nikdy neboli ponúkaní alebo vystavení reklame - urobili to isté. Okrem toho 0,31% všetkých používateľov zo skupiny spracovania skonvertovaných alebo vykonaných nákup - zatiaľ čo len 0,19% používateľov z ovládacieho skupiny tak urobilo. V dôsledku toho miera konverzie návštevníkov, ktorí vykonali nákup, ktorí boli tiež členmi skupiny liečby, je 6,36%, v porovnaní s iba 5,07%** pre užívateľov kontrolnej skupiny. Na základe týchto výsledkov môže liečba potenciálne zvýšiť mieru návštevnosti približne o 1%a mieru konverzie návštevníkov približne o 1,3%. Liečba vedie k významnému zlepšeniu.
Krok č. 3: Definovanie modelu na trénovaie
Príprava tréningu a testovanie množín údajov
Na tomto raw_df
údajového rámca vojdete transformátor Featurize, ktorý extrahuje funkcie zo zadaných vstupných stĺpcov a vytvorí výstup týchto funkcií do nového stĺpca s názvom features
.
Výsledný údajový rámec sa uloží do nového prvku DataFrame s názvom df
.
transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())
# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()
Príprava množín údajov na spracovanie a kontrolu
Po vytvorení tréningových a testovacích množín údajov musíte tiež sformovať ošetrovacie a ovládacie množiny údajov, aby ste mohli trénovať modely strojového učenia na meranie vzostupu.
# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")
Teraz, keď ste si pripravili údaje, môžete pokračovať a trénovať model pomocou lightGBM.
Modelovanie povzdvihnutie: T-Learner s LightGBM
Metasúdaje sú množinou algoritmov, ktoré sú postavené na algoritmoch strojového učenia, ako sú LightGBM, Xgboost atď. Pomáhajú odhadnúť priemerný účinok podmieneného spracovania alebo cate. T-študent je meta-študent, ktorý nepoužíva jeden model. Namiesto toho T-learner používa jeden model na každú premennú liečby. Preto sa vyvíjajú dva modely a na metasúdaje odkazujeme ako na T-študenta. T-študent používa viacero modelov strojového učenia na prekonanie problému úplného zahadzovania liečby tým, že núti študenta, aby sa najprv na to rozdelil.
mlflow.autolog(exclusive=False)
classifier = (
LightGBMClassifier(dataTransferMode="bulk")
.setFeaturesCol("features") # Set the column name for features
.setNumLeaves(10) # Set the number of leaves in each decision tree
.setNumIterations(100) # Set the number of boosting iterations
.setObjective("binary") # Set the objective function for binary classification
.setLabelCol(LABEL_COLUMN) # Set the column name for the label
)
# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")
# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
treatment_run_id = treatment_run.info.run_id # Get the ID of the treatment run
treatment_model = classifier.fit(treatment_train_df) # Fit the classifier on the treatment training data
# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
control_run_id = control_run.info.run_id # Get the ID of the control run
control_model = classifier.fit(control_train_df) # Fit the classifier on the control training data
Použitie testovacej množiny údajov na vytvorenie predpovede
Tu môžete použiť treatment_model
a control_model
, ktoré boli definované predtým, na transformáciu test_df
testovacej množiny údajov. Potom vypočítate predpokladané zvýšenie. Predpovedanú zvýšenie definujete ako rozdiel medzi predpovedaným výsledkom liečby a predpovedaným výsledkom kontroly. Čím väčšia je táto predpovedaná zvýšenie rozdiel, tým väčšia je účinnosť liečby (napríklad reklama) na jednotlivca alebo podskupiny.
getPred = F.udf(lambda v: float(v[1]), FloatType())
# Cache the resulting DataFrame for easier access
test_pred_df = (
test_df.mlTransform(treatment_model)
.withColumn("treatment_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.mlTransform(control_model)
.withColumn("control_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
.select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
.cache()
)
# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))
Vykonať vyhodnocovanie modelu
Vzhľadom k tomu, skutočné zvýšenie nie je možné pozorovať pre každého jednotlivca, je potrebné merať zvýšenie nad skupinu jednotlivcov. Môžete použiť uplift Krivka, ktorá vykreslí skutočné, kumulatívne zvýšenie v celej populácii.
Os x predstavuje pomer populácie vybranej na liečbu. Hodnota 0 naznačuje, žiadna liečba skupiny - nikto nie je vystavený, alebo ponúkané, liečba. Hodnota 1 naznačuje plnú liečebnú skupinu - každý je vystavený alebo ponúkanej liečbe. Os y zobrazuje meranie zvýšenie. Cieľom je nájsť veľkosť liečby skupiny, alebo percento populácie, ktorá by bola ponúknutá alebo vystavená liečbe (napríklad reklama). Týmto prístupom sa optimalizuje cieľový výber s cieľom optimalizovať výsledok.
Najprv zoraďujte poradie testovacieho prvku DataFrame podľa predpovedaného upliftu. Predpovedaný vzostup je rozdiel medzi predpovedaným výsledkom liečby a predpovedaným výsledkom kontroly.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
V ďalšom kroku vypočítajte kumulatívne percento návštev v oboch skupinách liečby aj ovládacích skupín.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Nakoniec v každom percente vypočítate zvýšenie skupiny ako rozdiel medzi kumulatívnym percentom návštev medzi liečbou a kontrolnými skupinami.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Teraz vykreslte krivku vzostupu pre predpoveď testovacej množiny údajov. Pred vykreslením je potrebné skonvertovať údajový rámec PySpark na údajový rámec Pandas.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
Os x predstavuje pomer populácie vybranej na liečbu. Hodnota 0 naznačuje, žiadna liečba skupiny - nikto nie je vystavený, alebo ponúkané, liečba. Hodnota 1 naznačuje plnú liečebnú skupinu - každý je vystavený alebo ponúkanej liečbe. Os y zobrazuje meranie zvýšenie. Cieľom je nájsť veľkosť liečby skupiny, alebo percento populácie, ktorá by bola ponúknutá alebo vystavená liečbe (napríklad reklama). Týmto prístupom sa optimalizuje cieľový výber s cieľom optimalizovať výsledok.
Najprv zoraďujte poradie testovacieho prvku DataFrame podľa predpovedaného upliftu. Predpovedaný vzostup je rozdiel medzi predpovedaným výsledkom liečby a predpovedaným výsledkom kontroly.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
V ďalšom kroku vypočítajte kumulatívne percento návštev v oboch skupinách liečby aj ovládacích skupín.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Nakoniec v každom percente vypočítate zvýšenie skupiny ako rozdiel medzi kumulatívnym percentom návštev medzi liečbou a kontrolnými skupinami.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Teraz vykreslte krivku vzostupu pre predpoveď testovacej množiny údajov. Pred vykreslením je potrebné skonvertovať údajový rámec PySpark na údajový rámec Pandas.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
Analýza a vzostup krivka ako ukazujú, že top 20% populácie, ako zaradil podľa predpovede, by mal veľký zisk, ak dostali liečbu. To znamená, že prvých 20% populácie predstavuje skupinu podliehajúci jednotlivým údajom. Preto môžete nastaviť cutoff skóre pre požadovanú veľkosť liečby skupiny na 20%, identifikovať cieľový výber zákazníkov pre najväčší vplyv.
cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
"pred_uplift"
]
print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
{"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)
Krok č. 4: Registrácia konečného modelu strojového učenia
MLflow môžete použiť na sledovanie a zaznamenanie všetkých experimentov pre skupiny spracovania aj ovládacích prvkov. Toto sledovanie a zapisovanie do denníka zahŕňajú zodpovedajúce parametre, metriky a modely. Tieto informácie sa zapíšu do denníka pod názvom experimentu v pracovnom priestore na neskoršie použitie.
# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")
control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")
mlflow.end_run()
Zobrazenie experimentov:
- Na ľavom paneli vyberte pracovný priestor.
- Nájdite a vyberte názov experimentu, v tomto prípade aisample-upliftmodelling.
Krok č. 5: Uloženie výsledkov predpovede
Microsoft Fabric ponúka funkciu PREDICT – škálovateľnú funkciu, ktorá podporuje dávkové bodovanie v akomkoľvek výpočtovom nástroji. Umožňuje zákazníkom používať modely strojového učenia. Používatelia môžu vytvárať dávkové predpovede priamo z poznámkového bloku alebo zo stránky položiek pre konkrétny model. Ak sa chcete dozvedieť viac o službe PREDICT, navštívte tento zdroj a zistite, ako používať funkciu PREDICT v službe Microsoft Fabric.
# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")
# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")