Skalární funkce definované uživatelem – Python
Tento článek obsahuje příklady uživatelem definované funkce Pythonu (UDF). Ukazuje, jak zaregistrovat funkce definované uživatelem, jak vyvolat funkce definované uživatelem a poskytuje upozornění na pořadí vyhodnocení dílčích výrazů ve Spark SQL.
V Databricks Runtime 14.0 a novějších můžete pomocí uživatelem definovaných tabulkových funkcí Pythonu (UDTFs) zaregistrovat funkce, které místo skalárních hodnot vracejí celé relace. Viz uživatelem definované funkce tabulek v Pythonu (UDTFs).
Poznámka:
Ve službě Databricks Runtime 12.2 LTS a níže nejsou u výpočetních prostředků katalogu Unity, které používají režim sdíleného přístupu, podporované uživatelem definované uživatelem Pythonu a uživatelem pandas. Skalární uživatelem definované funkce Pythonu a uživatelem definované funkce Pandas jsou podporovány ve službě Databricks Runtime 13.3 LTS a vyšší pro všechny režimy přístupu.
Ve službě Databricks Runtime 13.3 LTS a novějších můžete pomocí syntaxe SQL zaregistrovat skalární uživatelem definované uživatelem Pythonu do katalogu Unity. Viz uživatelem definované funkce (UDF) v katalogu Unity.
Registrace funkce jako funkce definované uživatelem
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Volitelně můžete nastavit návratový typ definovaného uživatelem. Výchozí návratový typ je StringType
.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Volání funkce definovaná uživatelem ve Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Použití UDF s datovými rámci
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Alternativně můžete deklarovat stejnou funkci definovanou uživatelem pomocí syntaxe poznámek:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Kontrola pořadí vyhodnocení a hodnoty null
Spark SQL (včetně SQL a rozhraní DATAFrame and Dataset API) nezaručuje pořadí vyhodnocení dílčích výrazů. Zejména vstupy operátoru nebo funkce nejsou nutně vyhodnoceny zleva doprava nebo v jiném pevném pořadí. Například logické AND
výrazy OR
nemají sémantiku "zkratování" zleva doprava.
Proto je nebezpečné spoléhat se na vedlejší účinky nebo pořadí vyhodnocení logických výrazů a pořadí WHERE
HAVING
a pořadí klauzulí, protože tyto výrazy a klauzule je možné změnit pořadí během optimalizace a plánování dotazů. Konkrétně platí, že pokud UDF spoléhá na sémantiku zkratování v SQL pro kontrolu hodnoty null, neexistuje žádná záruka, že se kontrola hodnoty null provede před vyvoláním funkce definované uživatelem. Příklad:
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Tato WHERE
klauzule nezaručuje, že se funkce definovaná uživatelem strlen
bude volat po vyfiltrování hodnot null.
Pokud chcete provést správnou kontrolu hodnoty null, doporučujeme provést některou z následujících akcí:
- Nastavení samotné funkce definovaná uživatelem s podporou hodnoty null a provedení kontroly hodnoty null uvnitř samotného uživatelem definovaného uživatelem
- Použití
IF
neboCASE WHEN
výrazy k ověření hodnoty null a vyvolání funkce definovaná uživatelem v podmíněné větvi
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Omezení
Následující omezení platí pro PySpark UDFs:
Omezení importu modulu : uživatelem definované funkce PySpark ve sdílených clusterech a bezserverových výpočetních prostředcích nemohou přistupovat ke složkám Git, souborům pracovních prostorů nebo svazkům katalogu Unity pro import modulů ve službě Databricks Runtime 14.2 a starší.
Proměnné pro vysílání: UDF PySpark v sdílených clusterech a bezserverové výpočetní prostředky nepodporují proměnné pro vysílání.
limit paměti: funkce PySpark UDF na bezserverové výpočetní prostředky mají limit paměti 1 GB na UDF PySpark. Překročení tohoto limitu způsobí následující chybu:
[UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.