Apa itu fungsi yang ditentukan pengguna (UDF)?
Fungsi yang ditentukan pengguna (UDF) memungkinkan Anda menggunakan kembali dan berbagi kode yang memperluas fungsionalitas bawaan di Azure Databricks. Gunakan UDF untuk melakukan tugas tertentu, seperti perhitungan kompleks, transformasi, atau manipulasi data kustom.
Catatan
Pada kluster dengan mode akses bersama, UDF skalar Python didukung di Databricks Runtime 13.3 LTS ke atas, sementara UDF Scala didukung di Databricks Runtime 14.2 ke atas.
UDF skalar Python dapat didaftarkan di Unity Catalog menggunakan sintaks SQL dalam Databricks Runtime 13.3 LTS ke atas. Lihat Fungsi yang didefinisikan oleh pengguna (UDF) di Unity Catalog.
Kapan Anda harus menggunakan UDF?
Gunakan UDF untuk logika yang sulit diekspresikan dengan fungsi Apache Spark bawaan. Fungsi Apache Spark bawaan dioptimalkan untuk pemrosesan terdistribusi dan umumnya menawarkan performa yang lebih baik dalam skala besar. Untuk informasi selengkapnya, lihat Fungsi.
Databricks merekomendasikan UDF untuk kueri ad hoc, pembersihan data manual, analisis data eksploratif, dan operasi pada himpunan data kecil hingga menengah. Kasus penggunaan umum untuk UDF termasuk enkripsi dan dekripsi data, hashing, penguraian JSON, dan validasi.
Gunakan metode Apache Spark untuk operasi pada himpunan data yang sangat besar dan beban kerja apa pun yang berjalan secara teratur atau terus menerus, termasuk pekerjaan ETL dan operasi streaming.
UDF terdaftar dan lingkup sesi
UDF yang dibuat menggunakan SQL terdaftar di Unity Catalog dan memiliki izin terkait, sedangkan UDF yang dibuat dalam buku catatan Anda berbasis sesi dan dilingkup ke SparkSession saat ini.
Anda dapat menentukan dan mengakses UDF berbasis sesi menggunakan bahasa apa pun yang didukung oleh Azure Databricks. UDF bisa skalar atau non-skalar.
Catatan
Saat ini hanya UDF skalar SQL dan Python yang terdaftar di Unity Catalog yang tersedia di DBSQL.
UDF skalar
UDF skalar beroperasi pada satu baris dan mengembalikan satu nilai untuk setiap baris. Contoh berikut menggunakan UDF skalar untuk menghitung panjang setiap nama dalam kolom name
dan menambahkan nilai di kolom baru name_length
:
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
Untuk menerapkan ini dalam notebook Databricks menggunakan PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
Untuk informasi selengkapnya, lihat fungsi yang ditentukan pengguna (UDF) di Katalog Unity dan fungsi skalar yang ditentukan pengguna - Python.
Fungsi agregat yang ditentukan pengguna (UDAFs)
Fungsi agregat yang ditentukan pengguna (UDAF) beroperasi pada beberapa baris dan mengembalikan satu hasil agregat. Dalam contoh berikut, UDAF didefinisikan bahwa mengagregasi skor.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Lihat fungsi panda yang ditentukan pengguna untuk fungsi agregat yang ditentukan pengguna dan Python - Scala.
Fungsi tabel yang ditentukan pengguna Python (UDTF)
Penting
Fitur ini ada di Pratinjau Publik.
Catatan
UDTF Python tersedia di Databricks Runtime 14.3 LTS ke atas.
Fungsi tabel yang ditentukan pengguna (UDTF) Python dapat mengembalikan beberapa baris dan kolom untuk setiap baris input. Dalam contoh berikut, setiap nilai dalam kolom skor sesuai dengan daftar kategori. UDTF didefinisikan untuk membagi daftar yang dipisahkan koma menjadi beberapa baris. Lihat fungsi tabel yang didefinisikan pengguna (UDTF) Python
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="score: int, categories: string, name: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: float, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
# Apply the UDTF
result_df = df.select(ScoreCategoriesUDTF(df.score, df.categories, df.name))
display(result_df)
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
Pertimbangan performa
- Fungsi bawaan dan UDF SQL adalah opsi paling efisien yang tersedia.
- UDF Scala umumnya lebih cepat saat dijalankan dalam Java Virtual Machine (JVM) dan menghindari overhead memindahkan data masuk dan keluar dari JVM.
- UDF Python dan Pandas UDF cenderung lebih lambat daripada UDF Scala karena memerlukan data untuk diserialisasikan dan dipindahkan dari JVM ke penerjemah Python. UDF Panda hingga 100x lebih cepat daripada UDF Python karena menggunakan Apache Arrow untuk mengurangi biaya serialisasi.