Bagikan melalui


Membangun model pembelajaran mesin dengan Apache Spark MLlib

Dalam artikel ini, Anda mempelajari cara menggunakan Apache Spark MLlib untuk membuat aplikasi pembelajaran mesin yang menangani analisis prediktif sederhana pada himpunan data terbuka Azure. Spark menyediakan pustaka pembelajaran mesin bawaan. Contoh ini menggunakan klasifikasi melalui regresi logistik.

Pustaka SparkML dan MLlib Spark inti menyediakan banyak utilitas yang berguna untuk tugas pembelajaran mesin. Utilitas ini cocok untuk:

  • Klasifikasi
  • Pengklusteran
  • Pengujian hipotesis dan penghitungan statistik sampel
  • Regresi
  • Penguraian nilai tunggal (SVD) dan analisis komponen utama (PCA)
  • Pemodelan topik

Memahami klasifikasi dan regresi logistik

Klasifikasi, tugas pembelajaran mesin populer, melibatkan pengurutan data input ke dalam kategori. Algoritma klasifikasi harus mencari tahu cara menetapkan label ke data input yang disediakan. Misalnya, algoritma pembelajaran mesin dapat menerima informasi stok sebagai input, dan membagi stok menjadi dua kategori: saham yang harus Anda jual dan saham yang harus Anda simpan.

Algoritma regresi logistik berguna untuk klasifikasi. API regresi logistik Spark berguna untuk klasifikasi biner data input ke dalam salah satu dari dua grup. Untuk informasi selengkapnya tentang regresi logistik, lihat Wikipedia.

Regresi logistik menghasilkan fungsi logistik yang dapat memprediksi probabilitas bahwa vektor input termasuk dalam satu grup atau yang lain.

Contoh analisis prediktif data taksi NYC

Pertama, instal azureml-opendatasets. Data tersedia melalui sumber daya Azure Open Datasets . Subset himpunan data ini menghosting informasi tentang perjalanan taksi kuning, termasuk waktu mulai, waktu akhir, lokasi mulai, lokasi akhir, biaya perjalanan, dan atribut lainnya.

%pip install azureml-opendatasets

Sisa artikel ini bergantung pada Apache Spark untuk terlebih dahulu melakukan beberapa analisis pada data tip perjalanan taksi NYC dan kemudian mengembangkan model untuk memprediksi apakah perjalanan tertentu menyertakan tip atau tidak.

Buat model pembelajaran mesin Apache Spark

  1. Buat buku catatan PySpark. Untuk informasi selengkapnya, kunjungi Membuat buku catatan.

  2. Impor tipe yang diperlukan untuk buku catatan ini.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Kami akan menggunakan MLflow untuk melacak eksperimen pembelajaran mesin dan eksekusi yang sesuai. Jika Microsoft Fabric Autologging diaktifkan, metrik dan parameter yang sesuai akan diambil secara otomatis.

    import mlflow
    

Menyusun DataFrame input

Contoh ini memuat data ke dalam dataframe Pandas, lalu mengonversinya menjadi dataframe Apache Spark. Dalam format itu, kita dapat menerapkan operasi Apache Spark lainnya untuk membersihkan dan memfilter himpunan data.

  1. Tempelkan baris ini ke dalam sel baru, dan jalankan untuk membuat Spark DataFrame. Langkah ini mengambil data melalui Open Datasets API. Kita dapat memfilter data ini ke bawah untuk memeriksa jendela data tertentu. Contoh kode menggunakan start_date dan end_date untuk menerapkan filter yang mengembalikan satu bulan data.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Kode ini mengurangi himpunan data menjadi sekitar 10.000 baris. Untuk mempercepat pengembangan dan pelatihan, kode mengambil sampel himpunan data kami untuk saat ini.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Kami ingin melihat data kami menggunakan perintah bawaan display() . Dengan perintah ini, kita dapat dengan mudah melihat sampel data, atau secara grafis menjelajahi tren dalam data.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Menyiapkan data

Persiapan data adalah langkah penting dalam proses pembelajaran mesin. Ini melibatkan pembersihan, transformasi, dan organisasi data mentah, untuk membuatnya cocok untuk analisis dan pemodelan. Dalam sampel kode ini, Anda melakukan beberapa langkah persiapan data:

  • Memfilter himpunan data untuk menghapus outlier dan nilai yang salah
  • Menghapus kolom yang tidak diperlukan untuk pelatihan model
  • Membuat kolom baru dari data mentah
  • Membuat label untuk menentukan apakah perjalanan Taksi tertentu melibatkan tip atau tidak
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Selanjutnya, buat yang kedua melewati data untuk menambahkan fitur akhir.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Membuat model regresi logistik

Tugas akhir mengonversi data berlabel menjadi format yang dapat ditangani oleh regresi logistik. Input ke algoritma regresi logistik harus memiliki struktur pasangan vektor label/fitur, di mana vektor fitur adalah vektor angka yang mewakili titik input.

Berdasarkan persyaratan tugas akhir, kita harus mengonversi kolom kategoris menjadi angka. Secara khusus, kita harus mengonversi trafficTimeBins kolom dan weekdayString menjadi representasi bilangan bulat. Kami memiliki banyak opsi yang tersedia untuk menangani persyaratan ini. Contoh ini melibatkan OneHotEncoder pendekatan:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Tindakan ini menghasilkan DataFrame baru dengan semua kolom dalam format yang tepat untuk melatih model.

Melatih model regresi logistik

Tugas pertama membagi himpunan data menjadi set pelatihan, dan set pengujian atau validasi.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Setelah kita memiliki dua DataFrames, kita harus membuat rumus model dan menjalankannya terhadap DataFrame pelatihan. Kemudian kita dapat memvalidasi terhadap dataFrame pengujian. Bereksperimenlah dengan versi rumus model yang berbeda untuk melihat efek kombinasi yang berbeda.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Output sel:

Area under ROC = 0.9749430523917996

Membuat representasi visual dari prediksi

Kita sekarang dapat membangun visualisasi akhir untuk menginterpretasikan hasil model. Kurva ROC tentu dapat menyajikan hasilnya.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Grafik yang menunjukkan kurva ROC untuk regresi logistik dalam model tip.