Bagikan melalui


fungsi API pandas

API fungsi pandas memungkinkan Anda untuk langsung menerapkan fungsi bawaan Python yang menerima dan menghasilkan instans pandas ke PySpark DataFrame. Mirip dengan fungsi panda yang ditentukan pengguna, API fungsi juga menggunakan Apache Arrow untuk mentransfer data dan panda untuk bekerja dengan data; namun, petunjuk jenis Python bersifat opsional dalam API fungsi panda.

Ada tiga jenis API fungsi panda:

  • Peta yang dikelompokkan
  • Peta
  • Peta yang dikogrupkan

API fungsi pandas memanfaatkan logika internal yang sama seperti yang digunakan oleh eksekusi UDF pandas. Mereka berbagi karakteristik seperti PyArrow, jenis SQL yang didukung, dan konfigurasi.

Untuk informasi selengkapnya, lihat posting blog UDF Panda Baru dan Petunjuk Jenis Python dalam Rilis Apache Spark 3.0mendatang.

Peta yang dikelompokkan

Anda mengubah data yang dikelompokkan menggunakan groupBy().applyInPandas() untuk menerapkan pola "split-apply-combine". Split-apply-combine terdiri dari tiga langkah:

  • Pisahkan data menjadi grup dengan menggunakan DataFrame.groupBy.
  • Terapkan fungsi pada setiap grup. Input dan output dari fungsi adalah keduanya pandas.DataFrame. Data input berisi semua baris dan kolom untuk setiap grup.
  • Gabungkan hasilnya ke dalam DataFramebaru .

Untuk menggunakan groupBy().applyInPandas(), Anda harus menentukan hal berikut:

  • Fungsi Python yang menentukan komputasi untuk setiap grup
  • Objek StructType atau string yang menentukan skema output DataFrame

Label kolom dari pandas.DataFrame yang dikembalikan harus cocok dengan nama bidang dalam skema output yang ditentukan jika ditentukan sebagai string, atau cocok dengan jenis data bidang berdasarkan posisi jika bukan string, misalnya, indeks bilangan bulat. Periksa pandas.DataFrame untuk cara memberi label kolom saat membuat pandas.DataFrame.

Semua data untuk grup dimuat ke dalam memori sebelum fungsi diterapkan. Ini dapat menyebabkan kesalahan memori habis, terutama jika ukuran grup tidak merata. Konfigurasi untuk maxRecordsPerBatch tidak diterapkan pada grup dan terserah Anda untuk memastikan bahwa data yang dikelompokkan sesuai dengan memori yang tersedia.

Contoh berikut menunjukkan cara menggunakan groupby().apply() untuk mengurangi rata-rata dari setiap nilai dalam grup.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Untuk penggunaan terperinci, lihat pyspark.sql.GroupedData.applyInPandas.

Peta

Anda melakukan operasi peta pada instance panda dengan DataFrame.mapInPandas() untuk mengubah iterator pandas.DataFrame menjadi iterator lain pandas.DataFrame yang mewakili PySpark DataFrame saat ini dan mengembalikan hasilnya sebagai PySpark DataFrame.

Fungsi dasar memproses dan mengeluarkan iterator dari pandas.DataFrame. Ini dapat mengembalikan output dengan panjang sembarang, berbeda dengan beberapa UDF pandas seperti Series ke Series.

Contoh berikut menunjukkan cara menggunakan mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Untuk penggunaan terperinci, lihat pyspark.sql.DataFrame.mapInPandas.

Peta yang dikogrupkan

Untuk operasi peta yang dikogrupkan dengan instans pandas, gunakan DataFrame.groupby().cogroup().applyInPandas() untuk mengkogrupkan dua DataFramePySpark dengan kunci bersama dan kemudian menerapkan fungsi Python ke setiap cogroup seperti yang diperlihatkan.

  • Kocok data sehingga grup dari setiap DataFrame yang berbagi kunci dikelompokkan bersama.
  • Terapkan fungsi ke setiap cogroup. Input dari fungsi adalah dua pandas.DataFrame (dengan tuplet opsional yang mewakili kunci). Output fungsi adalah pandas.DataFrame.
  • Gabungkan pandas.DataFramedari semua grup ke dalam objek PySpark baru DataFrame.

Untuk menggunakan groupBy().cogroup().applyInPandas(), Anda harus menentukan hal berikut:

  • Fungsi Python yang menentukan komputasi untuk setiap cogroup.
  • Objek StructType atau string yang menentukan skema output PySpark DataFrame.

Label kolom dari pandas.DataFrame yang dikembalikan harus cocok dengan nama bidang dalam skema output yang ditentukan jika ditentukan sebagai string, atau cocok dengan jenis data bidang berdasarkan posisi jika bukan string, misalnya, indeks bilangan bulat. Lihat pandas.DataFrame tentang cara memberi label kolom saat membuat pandas.DataFrame.

Semua data untuk grup bersama dimuat ke dalam memori sebelum fungsi diterapkan. Ini dapat menyebabkan pengecualian karena kehabisan memori, terutama jika ukuran grup tidak seimbang. Konfigurasi untuk maxRecordsPerBatch tidak diterapkan dan terserah Anda untuk memastikan bahwa data yang dikogrupkan sesuai dengan memori yang tersedia.

Contoh berikut menunjukkan cara menggunakan groupby().cogroup().applyInPandas() untuk melakukan asof join antara dua himpunan data.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Untuk penggunaan terperinci, lihat pyspark.sql.PandasCogroupedOps.applyInPandas.