Rozhraní API funkcí pandas
Rozhraní API funkcí pandas umožňují přímo použít nativní funkci Pythonu, která přebírá a odesílá instance pandas do datového rámce PySpark. Podobně jako uživatelem definované funkce pandas, rozhraní API funkcí také používají Apache Arrow k přenosu dat a knihovny pandas pro práci s daty; V rozhraních API funkcí knihovny pandas jsou však volitelné rady typu Python.
Existují tři typy rozhraní API funkcí pandas:
- Seskupené mapy
- Mapa
- Kogroupovaná mapa
Rozhraní API funkcí pandas využívají stejnou interní logiku, jakou používá provádění UDF pandas. Sdílejí charakteristiky, jako jsou PyArrow, podporované typy SQL a konfigurace.
Další informace najdete v blogovém příspěvku Nové uživatelské definované funkce Pandas a typové pokyny Pythonu v nadcházející verzi Apache Spark 3.0.
Seskupené mapy
Seskupená data přetvoříte pomocí groupBy().applyInPandas()
a implementujete vzorec „split-apply-combine“. Rozdělení, aplikace a kombinace se skládá ze tří kroků:
- Rozdělte data do skupin pomocí
DataFrame.groupBy
. - Použijte funkci na každou skupinu. Vstup i výstup funkce jsou
pandas.DataFrame
. Vstupní data obsahují všechny řádky a sloupce pro každou skupinu. - Zkombinujte výsledky do nového
DataFrame
.
Pokud chcete použít groupBy().applyInPandas()
, musíte definovat následující:
- Funkce Pythonu, která definuje výpočet pro každou skupinu
- Objekt
StructType
nebo řetězec, který definuje schéma výstupníhoDataFrame
Popisky sloupců vrácených pandas.DataFrame
musí buď odpovídat názvům polí v definovaném výstupním schématu, pokud jsou zadány jako řetězce, nebo musí odpovídat datovým typům polí podle pozice, například celočíselné indexy. Viz pandas.DataFrame pro to, jak označit sloupce při vytváření pandas.DataFrame
.
Všechna data pro skupinu se načtou do paměti před tím, než se funkce použije. To může vést k výjimkám typu nedostatek paměti, zejména pokud jsou velikosti skupin zkreslené. Konfigurace pro maxRecordsPerBatch není použita pro skupiny a je na vás, aby se seskupovaná data vešla do dostupné paměti.
Následující příklad ukazuje, jak pomocí groupby().apply()
odečíst průměr od každé hodnoty ve skupině.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Pro podrobnosti o použití viz pyspark.sql.GroupedData.applyInPandas.
Mapa
Operace mapování s instancemi pandas provádíte DataFrame.mapInPandas()
, aby bylo možné transformovat iterátor pandas.DataFrame
na jiný iterátor pandas.DataFrame
, který představuje aktuální datový rámec PySpark a vrátí výsledek jako datový rámec PySpark.
Základní funkce přebírá a vypíše iterátor pandas.DataFrame
. Může vrátit výstup libovolné délky na rozdíl od některých uživatelsky definovaných funkcí pandas, jako je řady na řady.
Následující příklad ukazuje, jak používat mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Podrobné pokyny pro použití naleznete v části pyspark.sql.DataFrame.mapInPandas.
Kogroupovaná mapa
U operací mapování se spoluskupenými instancemi pandas použijte DataFrame.groupby().cogroup().applyInPandas()
ke spoluskupení dvou PySpark DataFrame
pomocí společného klíče a pak aplikujte funkci Pythonu na každou spoluskupinu, jak je znázorněno.
- Prohazujte data tak, aby se skupiny jednotlivých datových rámců, které sdílejí klíč, společně seskupily.
- Aplikujte funkci na každou skupinu. Vstup funkce jsou dva
pandas.DataFrame
(s volitelnou n-ticí představující klíč). Výstupem funkce jepandas.DataFrame
. - Sloučte
pandas.DataFrame
ze všech skupin do nového PySparkDataFrame
.
Pokud chcete použít groupBy().cogroup().applyInPandas()
, musíte definovat následující:
- Funkce Pythonu, která definuje výpočet pro každou spoluskupinu.
- Objekt
StructType
nebo řetězec, který definuje schéma výstupního PySparkDataFrame
.
Popisky sloupců vrácených pandas.DataFrame
musí buď odpovídat názvům polí v definovaném výstupním schématu, pokud jsou zadány jako řetězce, nebo musí odpovídat datovým typům polí podle pozice, například celočíselné indexy. Viz pandas.DataFrame pro to, jak označit sloupce při vytváření pandas.DataFrame
.
Všechna data pro spoluskupinu se načtou do paměti před tím, než se funkce použije. To může vést k výjimkám typu nedostatek paměti, zejména pokud jsou velikosti skupin zkreslené. Konfigurace pro maxRecordsPerBatch se nepoužije a je na vás, abyste zajistili, že spoluskupovaná data zapadnou do dostupné paměti.
Následující příklad ukazuje, jak pomocí groupby().cogroup().applyInPandas()
provést asof join
mezi dvěma datovými sadami.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Pro podrobné informace o využití se podívejte na pyspark.sql.PandasCogroupedOps.applyInPandas.