fungsi API pandas
API fungsi pandas memungkinkan Anda untuk langsung menerapkan fungsi bawaan Python yang menerima dan menghasilkan instans pandas ke PySpark DataFrame. Mirip dengan fungsi panda yang ditentukan pengguna, API fungsi juga menggunakan Apache Arrow untuk mentransfer data dan panda untuk bekerja dengan data; namun, petunjuk jenis Python bersifat opsional dalam API fungsi panda.
Ada tiga jenis API fungsi panda:
- Peta yang dikelompokkan
- Peta
- Peta yang dikogrupkan
API fungsi pandas memanfaatkan logika internal yang sama seperti yang digunakan oleh eksekusi UDF pandas. Mereka berbagi karakteristik seperti PyArrow, jenis SQL yang didukung, dan konfigurasi.
Untuk informasi selengkapnya, lihat posting blog UDF Panda Baru dan Petunjuk Jenis Python dalam Rilis Apache Spark 3.0mendatang.
Peta yang dikelompokkan
Anda mengubah data yang dikelompokkan menggunakan groupBy().applyInPandas()
untuk menerapkan pola "split-apply-combine". Split-apply-combine terdiri dari tiga langkah:
- Pisahkan data menjadi grup dengan menggunakan
DataFrame.groupBy
. - Terapkan fungsi pada setiap grup. Input dan output dari fungsi adalah keduanya
pandas.DataFrame
. Data input berisi semua baris dan kolom untuk setiap grup. - Gabungkan hasilnya ke dalam
DataFrame
baru .
Untuk menggunakan groupBy().applyInPandas()
, Anda harus menentukan hal berikut:
- Fungsi Python yang menentukan komputasi untuk setiap grup
- Objek
StructType
atau string yang menentukan skema outputDataFrame
Label kolom dari pandas.DataFrame
yang dikembalikan harus cocok dengan nama bidang dalam skema output yang ditentukan jika ditentukan sebagai string, atau cocok dengan jenis data bidang berdasarkan posisi jika bukan string, misalnya, indeks bilangan bulat. Periksa pandas.DataFrame untuk cara memberi label kolom saat membuat pandas.DataFrame
.
Semua data untuk grup dimuat ke dalam memori sebelum fungsi diterapkan. Ini dapat menyebabkan kesalahan memori habis, terutama jika ukuran grup tidak merata. Konfigurasi untuk maxRecordsPerBatch
Contoh berikut menunjukkan cara menggunakan groupby().apply()
untuk mengurangi rata-rata dari setiap nilai dalam grup.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Untuk penggunaan terperinci, lihat pyspark.sql.GroupedData.applyInPandas.
Peta
Anda melakukan operasi peta pada instance panda dengan DataFrame.mapInPandas()
untuk mengubah iterator pandas.DataFrame
menjadi iterator lain pandas.DataFrame
yang mewakili PySpark DataFrame saat ini dan mengembalikan hasilnya sebagai PySpark DataFrame.
Fungsi dasar memproses dan mengeluarkan iterator dari pandas.DataFrame
. Ini dapat mengembalikan output dengan panjang sembarang, berbeda dengan beberapa UDF pandas seperti Series ke Series.
Contoh berikut menunjukkan cara menggunakan mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Untuk penggunaan terperinci, lihat pyspark.sql.DataFrame.mapInPandas.
Peta yang dikogrupkan
Untuk operasi peta yang dikogrupkan dengan instans pandas, gunakan DataFrame.groupby().cogroup().applyInPandas()
untuk mengkogrupkan dua DataFrame
PySpark dengan kunci bersama dan kemudian menerapkan fungsi Python ke setiap cogroup seperti yang diperlihatkan.
- Kocok data sehingga grup dari setiap DataFrame yang berbagi kunci dikelompokkan bersama.
- Terapkan fungsi ke setiap cogroup. Input dari fungsi adalah dua
pandas.DataFrame
(dengan tuplet opsional yang mewakili kunci). Output fungsi adalahpandas.DataFrame
. - Gabungkan
pandas.DataFrame
dari semua grup ke dalam objek PySpark baruDataFrame
.
Untuk menggunakan groupBy().cogroup().applyInPandas()
, Anda harus menentukan hal berikut:
- Fungsi Python yang menentukan komputasi untuk setiap cogroup.
- Objek
StructType
atau string yang menentukan skema output PySparkDataFrame
.
Label kolom dari pandas.DataFrame
yang dikembalikan harus cocok dengan nama bidang dalam skema output yang ditentukan jika ditentukan sebagai string, atau cocok dengan jenis data bidang berdasarkan posisi jika bukan string, misalnya, indeks bilangan bulat. Lihat pandas.DataFrame tentang cara memberi label kolom saat membuat pandas.DataFrame
.
Semua data untuk grup bersama dimuat ke dalam memori sebelum fungsi diterapkan. Ini dapat menyebabkan pengecualian karena kehabisan memori, terutama jika ukuran grup tidak seimbang. Konfigurasi untuk maxRecordsPerBatch tidak diterapkan dan terserah Anda untuk memastikan bahwa data yang dikogrupkan sesuai dengan memori yang tersedia.
Contoh berikut menunjukkan cara menggunakan groupby().cogroup().applyInPandas()
untuk melakukan asof join
antara dua himpunan data.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Untuk penggunaan terperinci, lihat pyspark.sql.PandasCogroupedOps.applyInPandas.