pandas függvény API-k
A pandas függvény API-k lehetővé teszik, hogy közvetlenül alkalmazzon egy Python natív függvényt, amely pandas-példányokat vesz fel és ad ki egy PySpark DataFrame-hez. A pandas felhasználó által definiált függvényekhez, a függvény API-k Apache Arrow is használják az adatok átvitelére, a pandas pedig az adatokkal való együttműködésre; A Python-típusmutatók azonban nem kötelezőek a pandas függvény API-jaiban.
A pandas függvény API-jainak három típusa létezik:
- Csoportosított térkép
- Térkép
- Csoportosított térkép
A pandas függvény API-k ugyanazt a belső logikát használják, amelyet a pandas UDF-végrehajtás használ. Olyan jellemzőkkel rendelkeznek, mint a PyArrow, a támogatott SQL-típusok és a konfigurációk.
További információt az Apache Spark 3.0 közelgő kiadásában New Pandas UDFs és Python Type Hints című blogbejegyzésbentalál.
Csoportosított térkép
A csoportosított adatokat a "split-apply-combine" minta megvalósítása érdekében a groupBy().applyInPandas()
segítségével alakítja át. A split-apply-combine három lépésből áll:
- Ossza fel az adatokat csoportokra a
DataFrame.groupBy
használatával. - Alkalmazz egy függvényt minden csoportra. A függvény bemenete és kimenete egyaránt
pandas.DataFrame
. A bemeneti adatok tartalmazzák az összes sort és az egyes csoportokhoz tartozó columns értéket. - Egyesítse az eredményeket egy új
DataFrame
.
A groupBy().applyInPandas()
használatához a következőket kell megadnia:
- Egy Python-függvény, amely meghatározza az egyes csoportok számítását
- A
StructType
objektum vagy egy sztring, amely aDataFrame
kimenet schema-ját határozza meg.
A visszaadott pandas.DataFrame
column címkéinek meg kell egyezniük a meghatározott kimeneti schema mezőnevekkel, ha stringként vannak megadva, vagy pozíció szerint kell egyezniük az adattípusokkal, ha nem stringekről, például egész számindexekről van szó. Lásd pandas. A DataFrame a columnspandas.DataFrame
létrehozásakor történő címkézéséhez.
Egy csoport összes adata betöltődik a memóriába a függvény alkalmazása előtt. Ez a memória kivételek kiváltásához vezethet, különösen akkor, ha a csoportméretek el vannak tolódva. A maxRecordsPerBatch konfigurációja nincs alkalmazva a csoportokra, és Önnek kell gondoskodnia arról, hogy a csoportosított adatok elférjenek a rendelkezésre álló memóriában.
Az alábbi példa bemutatja, hogyan vonhatja ki a középértéket a csoport minden egyes értékéből a groupby().apply()
használatával.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
A részletes használatért lásd: pyspark.sql.GroupedData.applyInPandas.
Térkép
A pandas-példányokkal végzett térképműveleteket a DataFrame.mapInPandas()
hajtja végre annak érdekében, hogy a pandas.DataFrame
iterátorát a pandas.DataFrame
egy másik iterátorává alakítsa, amely az aktuális PySpark DataFrame-et jelöli, és az eredményt PySpark DataFrame-ként adja vissza.
A mögöttes függvény a pandas.DataFrame
iterátorát veszi fel és adja ki. Tetszőleges hosszúságú kimenetet adhat vissza, ellentétben bizonyos pandas UDF-ekkel, mint például a sorozatról sorozatra típusú.
Az alábbi példa a mapInPandas()
használatát mutatja be:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
A részletes használatért lásd: pyspark.sql.DataFrame.mapInPandas.
Csoportosított térkép
Pandas példányokkal végzett cogrouping térképműveletek esetén használja a DataFrame.groupby().cogroup().applyInPandas()
-t két PySpark DataFrame
csoportosítására közös kulcs alapján, majd alkalmazzon egy Python-függvényt az egyes csoportokra az alábbi módon.
- Rendezd át az adatokat úgy, hogy azok az adatkeret csoportok, amelyek kulcsot osztanak meg, együtt legyenek társítva.
- Függvény alkalmazása minden kogroupra. A függvény bemenete két
pandas.DataFrame
, és opcionálisan egy kulcsot ábrázoló tömb lehet. A függvény kimenete egypandas.DataFrame
. - Egyesítse az összes csoport
pandas.DataFrame
-ját egy új PySparkDataFrame
-be.
A groupBy().cogroup().applyInPandas()
használatához a következőket kell megadnia:
- Egy Python-függvény, amely meghatározza az egyes csoportok számítását.
- Egy
StructType
objektum vagy egy sztring, amely meghatározza a PySpark-kimenetDataFrame
schema-jét.
A visszaadott pandas.DataFrame
column címkéinek meg kell egyezniük a megadott kimeneti schema mezőnevekkel, ha sztringként vannak megadva, vagy a mező adattípusait pozíció szerint kell illeszteniük - például egész szám indexekkel -, ha nem sztringekről van szó. Lásd a pandas.DataFrame címkézésének módját a columnspandas.DataFrame
létrehozásakor.
A csoport összes adata betöltődik a memóriába a függvény alkalmazása előtt. Ez memória-kivételhez vezethet, különösen akkor, ha a csoportméretek el vannak tolódva. A maxRecordsPerBatch konfigurációja nincs alkalmazva, és Önnek kell gondoskodnia arról, hogy a csoportosított adatok elférjenek a rendelkezésre álló memóriában.
Az alábbi példa bemutatja, hogyan hajthat végre groupby().cogroup().applyInPandas()
két adathalmaz között asof join
.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
A részletes használatért lásd: pyspark.sql.PandasCogroupedOps.applyInPandas.