Aracılığıyla paylaş


Kullanıcı tanımlı skaler işlevler - Python

Bu makalede Python kullanıcı tanımlı işlev (UDF) örnekleri yer alır. UDF'lerin nasıl kaydedileceklerini, UDF'lerin nasıl çağrılacaklarını gösterir ve Spark SQL'de alt ifadelerin değerlendirme sırası hakkında uyarılar sağlar.

Databricks Runtime 14.0 ve üzerinde, skaler değerler yerine tüm ilişkileri döndüren işlevleri kaydetmek için Python kullanıcı tanımlı tablo işlevlerini (UDF) kullanabilirsiniz. bkz. Python kullanıcı tanımlı tablo işlevleri (UDF).

Not

Databricks Runtime 12.2 LTS ve altında, Python UDF'leri ve Pandas UDF'leri, paylaşılan erişim modunu kullanan Unity Kataloğu hesaplamasında desteklenmez. Skaler Python UDF'leri ve Pandas UDF'leri tüm erişim modları için Databricks Runtime 13.3 LTS ve üzerinde desteklenir.

Databricks Runtime 13.3 LTS ve üzerinde SQL söz dizimlerini kullanarak skaler Python UDF'lerini Unity Kataloğu'na kaydedebilirsiniz. Bkz. Unity Kataloğundaki Kullanıcı Tanımlı Fonksiyonlar (UDF).

İşlevi UDF olarak kaydetme

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

İsteğe bağlı olarak UDF'nizin dönüş türünü ayarlayabilirsiniz. Varsayılan dönüş türü şeklindedir StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Spark SQL'de UDF'yi çağırma

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

DataFrame'lerle UDF kullanma

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Alternatif olarak, ek açıklama söz dizimini kullanarak aynı UDF'yi bildirebilirsiniz:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Değerlendirme sırası ve null denetimi

Spark SQL (SQL ve DataFrame ile Veri Kümesi API'si dahil) alt ifadelerin değerlendirilme sırasını garanti etmez. Özellikle, bir işlecin veya işlevin girişleri mutlaka soldan sağa veya başka bir sabit sırada değerlendirilmez. Örneğin, mantıksal AND ve OR ifadelerde soldan sağa "kısa devre" semantiği yoktur.

Bu nedenle, boole ifadelerinin yan etkilerine veya değerlendirme sırasına ve ve WHERE yan tümcelerinin sırasına HAVING güvenmek tehlikelidir, çünkü bu ifadeler ve yan tümceler sorgu iyileştirme ve planlama sırasında yeniden sıralanabilir. Özellikle, bir UDF null denetim için SQL'de kısa devre semantiği kullanıyorsa, UDF'yi çağırmadan önce null denetimin gerçekleşeceğinin garantisi yoktur. Örneğin,

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Bu WHERE yan tümce, null değerleri filtreledikten sonra UDF'nin çağrılacağı garanti strlen etmez.

Doğru null denetimi gerçekleştirmek için aşağıdakilerden birini yapmanızı öneririz:

  • UDF'nin kendisini null algılayan hale getirin ve UDF'nin içinde null denetimi yapın
  • Null denetimi yapmak ve koşullu dalda UDF'yi çağırmak için veya IF ifadelerini kullanın CASE WHEN
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Sınırlamalar

PySpark UDF'leri için aşağıdaki sınırlamalar geçerlidir:

  • Modülü içeri aktarma kısıtlamaları: Paylaşılan kümelerdeki ve sunucusuz işlemdeki PySpark UDF'leri, Databricks Runtime 14.2 ve altındaki modülleri içeri aktarmak için Git klasörlerine, çalışma alanı dosyalarına veya Unity Katalog Birimlerine erişemez.

  • Yayın değişkenleri: Paylaşılan kümelerdeki pyspark UDF'leri ve sunucusuz işlem yayın değişkenlerini desteklemez.

  • Bellek sınırı: Sunucusuz işlemdeki PySpark UDF'leri, PySpark UDF başına 1 GB bellek sınırına sahiptir. Bu sınırın aşılması şu hatayla sonuçlanır: [UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.