Sdílet prostřednictvím


Rozhraní API funkcí pandas

Rozhraní API funkcí pandas umožňují přímo použít nativní funkci Pythonu, která přebírá a odesílá instance pandas do datového rámce PySpark. Podobně jako uživatelem definované funkce pandas, rozhraní API funkcí také používají Apache Arrow k přenosu dat a knihovny pandas pro práci s daty; V rozhraních API funkcí knihovny pandas jsou však volitelné rady typu Python.

Existují tři typy rozhraní API funkcí pandas:

  • Seskupené mapy
  • Mapa
  • Kogroupovaná mapa

Rozhraní API funkcí pandas využívají stejnou interní logiku, jakou používá provádění UDF pandas. Sdílejí charakteristiky, jako jsou PyArrow, podporované typy SQL a konfigurace.

Další informace najdete v blogovém příspěvku Nové uživatelské definované funkce Pandas a typové pokyny Pythonu v nadcházející verzi Apache Spark 3.0.

Seskupené mapy

Seskupená data přetvoříte pomocí groupBy().applyInPandas() a implementujete vzorec „split-apply-combine“. Rozdělení, aplikace a kombinace se skládá ze tří kroků:

  • Rozdělte data do skupin pomocí DataFrame.groupBy.
  • Použijte funkci na každou skupinu. Vstup i výstup funkce jsou pandas.DataFrame. Vstupní data obsahují všechny řádky a sloupce pro každou skupinu.
  • Zkombinujte výsledky do nového DataFrame.

Pokud chcete použít groupBy().applyInPandas(), musíte definovat následující:

  • Funkce Pythonu, která definuje výpočet pro každou skupinu
  • Objekt StructType nebo řetězec, který definuje schéma výstupního DataFrame

Popisky sloupců vrácených pandas.DataFrame musí buď odpovídat názvům polí v definovaném výstupním schématu, pokud jsou zadány jako řetězce, nebo musí odpovídat datovým typům polí podle pozice, například celočíselné indexy. Viz pandas.DataFrame pro to, jak označit sloupce při vytváření pandas.DataFrame.

Všechna data pro skupinu se načtou do paměti před tím, než se funkce použije. To může vést k výjimkám typu nedostatek paměti, zejména pokud jsou velikosti skupin zkreslené. Konfigurace pro maxRecordsPerBatch není použita pro skupiny a je na vás, aby se seskupovaná data vešla do dostupné paměti.

Následující příklad ukazuje, jak pomocí groupby().apply() odečíst průměr od každé hodnoty ve skupině.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Pro podrobnosti o použití viz pyspark.sql.GroupedData.applyInPandas.

Mapa

Operace mapování s instancemi pandas provádíte DataFrame.mapInPandas(), aby bylo možné transformovat iterátor pandas.DataFrame na jiný iterátor pandas.DataFrame, který představuje aktuální datový rámec PySpark a vrátí výsledek jako datový rámec PySpark.

Základní funkce přebírá a vypíše iterátor pandas.DataFrame. Může vrátit výstup libovolné délky na rozdíl od některých uživatelsky definovaných funkcí pandas, jako je řady na řady.

Následující příklad ukazuje, jak používat mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Podrobné pokyny pro použití naleznete v části pyspark.sql.DataFrame.mapInPandas.

Kogroupovaná mapa

U operací mapování se spoluskupenými instancemi pandas použijte DataFrame.groupby().cogroup().applyInPandas() ke spoluskupení dvou PySpark DataFramepomocí společného klíče a pak aplikujte funkci Pythonu na každou spoluskupinu, jak je znázorněno.

  • Prohazujte data tak, aby se skupiny jednotlivých datových rámců, které sdílejí klíč, společně seskupily.
  • Aplikujte funkci na každou skupinu. Vstup funkce jsou dva pandas.DataFrame (s volitelnou n-ticí představující klíč). Výstupem funkce je pandas.DataFrame.
  • Sloučte pandas.DataFrameze všech skupin do nového PySpark DataFrame.

Pokud chcete použít groupBy().cogroup().applyInPandas(), musíte definovat následující:

  • Funkce Pythonu, která definuje výpočet pro každou spoluskupinu.
  • Objekt StructType nebo řetězec, který definuje schéma výstupního PySpark DataFrame.

Popisky sloupců vrácených pandas.DataFrame musí buď odpovídat názvům polí v definovaném výstupním schématu, pokud jsou zadány jako řetězce, nebo musí odpovídat datovým typům polí podle pozice, například celočíselné indexy. Viz pandas.DataFrame pro to, jak označit sloupce při vytváření pandas.DataFrame.

Všechna data pro spoluskupinu se načtou do paměti před tím, než se funkce použije. To může vést k výjimkám typu nedostatek paměti, zejména pokud jsou velikosti skupin zkreslené. Konfigurace pro maxRecordsPerBatch se nepoužije a je na vás, abyste zajistili, že spoluskupovaná data zapadnou do dostupné paměti.

Následující příklad ukazuje, jak pomocí groupby().cogroup().applyInPandas() provést asof join mezi dvěma datovými sadami.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Pro podrobné informace o využití se podívejte na pyspark.sql.PandasCogroupedOps.applyInPandas.