Vytvorenie modelu strojového učenia pomocou apache Spark MLlib
V tomto článku sa dozviete, ako používať Apache Spark MLlib na vytvorenie aplikácie strojového učenia, ktorá spracováva jednoduchú prediktívnu analýzu otvorenej množiny údajov Azure. Spark poskytuje vstavané knižnice strojového učenia. V tomto príklade sa používa klasifikácia prostredníctvom logistickej regresie.
Základné knižnice SparkML a MLlib Spark poskytujú mnoho pomôcok, ktoré sú užitočné pri úlohách strojového učenia. Tieto pomôcky sú vhodné pre:
- Klasifikácia
- Clustering
- Testovanie hypotézy a výpočet štatistiky ukážky
- Regresia
- Dekompozícia hodnôt v jednotnom čísle (SVD) a analýza hlavných súčastí (PCA)
- Modelovanie tém
Vysvetlenie klasifikácie a logistickej regresie
Klasifikácia, populárna úloha strojového učenia, zahŕňa zoradenie vstupných údajov do kategórií. Algoritmus klasifikácie by mal zistiť, ako priradiť označenia k zadaným vstupným údajom. Napríklad algoritmus strojového učenia môže prijať informácie o zásobách ako vstup a rozdeliť akcie do dvoch kategórií: akcie, ktoré by ste mali predať a akcie, ktoré by ste si mali ponechať.
Algoritmus logistickej regresie je užitočný pri klasifikácii. Rozhranie Spark logistická regresia API je užitočné pre binárnu klasifikáciu vstupných údajov do jednej z dvoch skupín. Ďalšie informácie o logistickej regresii nájdete na Wikipédii.
Logistická regresia produkuje logistickú funkciu , ktorá dokáže predpovedať pravdepodobnosť, že vstupná vektor patrí do jednej alebo druhej skupiny.
Prediktívna analýza príklad nyc taxi dáta
Najprv nainštalujte .azureml-opendatasets
Údaje sú k dispozícii prostredníctvom prostriedku Azure Open Datasets . Táto podmnožina údajov hosťuje informácie o žltých cestách taxi, vrátane časov spustenia, koncových časov, miest začatia, koncových miest, nákladov na cestu a ďalších atribútov.
%pip install azureml-opendatasets
Zvyšok tohto článku sa spolieha na Apache Spark najprv vykonať nejakú analýzu na NYC taxi-výlet tip dáta a potom vyvinúť model predpovedať, či konkrétny výlet zahŕňa tip, alebo nie.
Vytvorenie modelu strojového učenia platformy Apache Spark
Vytvorte poznámkový blok v PySparku. Ďalšie informácie nájdete v téme Vytvorenie poznámkového bloku.
Importujte typy požadované pre tento poznámkový blok.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
Na sledovanie experimentov v strojovom učení a zodpovedajúcich spustení použijeme MLflow . Ak je automatické označovanie služby Microsoft Fabric povolené, automaticky sa zaznamenajú príslušné metriky a parametre.
import mlflow
Vytvorenie vstupného prvku DataFrame
Tento príklad načíta údaje do údajového rámca Pandas a potom ich skonvertuje na údajový rámec Apache Spark. V tomto formáte môžeme na vyčistenie a filtrovanie množiny údajov použiť ďalšie operácie služby Apache Spark.
Prilepte tieto riadky do novej bunky a spustite ich, aby ste vytvorili údajový rámec služby Spark. Tento krok načíta údaje prostredníctvom rozhrania API Otvorených množín údajov. Tieto údaje môžeme vyfiltrovať a preskúmať konkrétne okno údajov. Príklad kódu používa
start_date
aend_date
používa filter, ktorý vráti jeden mesiac údajov.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
Tento kód zníži množinu údajov na približne 10 000 riadkov. S cieľom urýchliť vývoj a trénovanie sa vzorky kódu nateraz ukážu do našej množiny údajov.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
Na naše údaje sa chceme pozrieť pomocou vstavaného
display()
príkazu. Pomocou tohto príkazu môžeme jednoducho zobraziť ukážku údajov alebo graficky preskúmať trendy v údajoch.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Príprava údajov
Príprava údajov je kľúčovým krokom v procese strojového učenia. Zahŕňa čistenie, transformáciu a usporiadanie nespracovaných údajov, aby boli vhodné na analýzu a modelovanie. V tejto ukážke kódu vykonáte niekoľko krokov prípravy údajov:
- Filtrovanie množiny údajov na odstránenie odchýlok a nesprávnych hodnôt
- Odstránenie stĺpcov, ktoré nie sú potrebné na trénovaie modelu
- Vytvorenie nových stĺpcov zo nespracovaných údajov
- Generovať označenie na určenie, či danú cestu taxi zahŕňa tip
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Potom vytvorte druhý prechod cez údaje a pridajte konečné funkcie.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Vytvorenie logistického regresného modelu
Konečná úloha skonvertuje označené údaje do formátu, ktorý dokáže spracovať logistická regresia. Vstup do logistického regresného algoritmu musí mať štruktúru párov označenie/vektory funkcie, kde vektor funkcie je vektorom čísel, ktoré predstavujú vstupný bod.
Na základe požiadaviek na poslednú úlohu musíme konvertovať kategorické stĺpce na čísla. Konkrétne musíme konvertovať stĺpce a weekdayString
na trafficTimeBins
celočíselné vyjadrenia. Na spracovanie tejto požiadavky máme k dispozícii veľa možností. Tento príklad zahŕňa OneHotEncoder
prístup:
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Výsledkom tejto akcie je nový údajový rámec so všetkými stĺpcami v správnom formáte, ktorý umožňuje trénovať model.
Trénovanie logistického regresného modelu
Prvá úloha rozdelí množinu údajov na tréningovú množinu a testovaciu alebo overenú množinu.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Keď budeme mať dva údajové rámce, musíme vytvoriť vzorec modelu a spustiť ho na trénovaní prvku DataFrame. Potom sa môžeme overiť pomocou testovacieho údajového rámca. Experimentujte s rôznymi verziami vzorca modelu, aby ste videli účinky rôznych kombinácií.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
Výstupy bunky:
Area under ROC = 0.9749430523917996
Vytvorenie vizuálneho vyjadrenia predpovede
Teraz môžeme vytvoriť konečnú vizualizáciu na interpretáciu výsledkov modelu. Krivka ROC môže určite predstavovať výsledok.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Súvisiaci obsah
- Používanie ukážok umelej inteligencie na vytvorenie modelov strojového učenia: Používanie ukážok umelej inteligencie
- Sledovanie spustení strojového učenia pomocou experimentov: experimenty strojového učenia