pandas işlev API'leri
pandas işlev API'leri, pandas örneklerini alıp PySpark DataFrame'e veren bir Python yerel işlevini doğrudan uygulamanızı sağlar. pandas kullanıcı tanımlı
Pandas işlev API'lerinin üç türü vardır:
- Gruplandırılmış harita
- Harita
- Birlikte gruplandırılmış eşleme
pandas işlev API'leri, pandas UDF yürütmesinin kullandığı iç mantığı kullanır. PyArrow, desteklenen SQL türleri ve yapılandırmalar gibi özellikleri paylaşırlar.
Daha fazla bilgi için, Apache Spark 3.0'ın Yaklaşan Sürümü'ndeki Yeni Pandas UDF'leri ve Python Türü İpuçları hakkında blog gönderisine bakın.
Gruplandırılmış harita
"Split-apply-combine" desenini uygulamak için groupBy().applyInPandas()
kullanarak gruplandırılmış verilerinizi dönüştürebilirsiniz. "Split-apply-combine" üç adımdan oluşur:
-
DataFrame.groupBy
kullanarak verileri gruplara bölün. - Her gruba bir işlev uygulayın. Fonksiyonun hem girişi hem de çıkışı
pandas.DataFrame
. Giriş verileri, her grup için tüm satırları ve columns'ı içerir. - Yeni bir
DataFrame
için sonuçları birleştirin.
groupBy().applyInPandas()
kullanmak için aşağıdakileri tanımlamanız gerekir:
- Her grup için hesaplamayı tanımlayan bir Python işlevi
- Bir
StructType
nesnesi veya çıkışDataFrame
'sinin schema'ini tanımlayan bir dize
Döndürülen pandas.DataFrame
'in column etiketleri, eğer dizeler olarak belirtilmişse tanımlı çıktı schema'nin alan adlarıyla eşleşmeli ya da dizeler olarak belirtilmemişse (örneğin, tamsayı dizinler), konumlarına göre alan veri türleriyle eşleşmelidir. Bkz. pandas. pandas.DataFrame
oluştururken columns etiketlemek için DataFrame.
Bir grubun tüm verileri, işlev uygulanmadan önce belleğe yüklenir. Bu, özellikle grup boyutları dengesizse bellek yetersiz özel durumlarına yol açabilir. maxRecordsPerBatch
Aşağıdaki örnekte, gruptaki her değerden ortalamayı çıkarmak için groupby().apply()
nasıl kullanılacağı gösterilmektedir.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Ayrıntılı kullanım için bkz. pyspark.sql.GroupedData.applyInPandas.
Harita
DataFrame.mapInPandas()
yineleyicisini geçerli PySpark DataFrame'i temsil eden ve sonucu PySpark DataFrame olarak döndüren başka bir pandas.DataFrame
yineleyicisine dönüştürmek için pandas.DataFrame
pandas örnekleriyle eşleme işlemleri gerçekleştirirsiniz.
Temel alınan işlev pandas.DataFrame
yineleyicisini alır ve çıktı olarak verir. Seriden Seriye gibi bazı pandas UDF'lerinin aksine rastgele uzunlukta çıkış döndürebilir.
Aşağıdaki örnekte mapInPandas()
nasıl kullanılacağı gösterilmektedir:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Ayrıntılı kullanım için bkz. pyspark.sql.DataFrame.mapInPandas.
Birlikte gruplandırılmış eşleme
Pandas örnekleriyle birlikte gruplandırılmış eşleme işlemleri için, iki PySpark DataFrame.groupby().cogroup().applyInPandas()
ortak bir anahtarla birlikte gruplandırmak için DataFrame
kullanın ve ardından gösterildiği gibi her bir cogroup'a bir Python işlevi uygulayın:
- Bir anahtarı paylaşan her DataFrame grubunun birlikte gruplandırılması için verileri karıştırın.
- Her bir cogroup'a bir işlev uygulayın. İşlevin girişi iki
pandas.DataFrame
(anahtarı temsil eden isteğe bağlı bir tanımlama grubu ile). İşlevin çıkışı birpandas.DataFrame
'dur. - Tüm gruplardan gelen
pandas.DataFrame
'ları yeni bir PySparkDataFrame
içinde birleştirin.
groupBy().cogroup().applyInPandas()
kullanmak için aşağıdakileri tanımlamanız gerekir:
- Her bir cogroup için hesaplamayı tanımlayan bir Python işlevi.
- PySpark
DataFrame
çıktısının schema tanımlayan birStructType
nesnesi veya dizesi.
Döndürülen pandas.DataFrame
'in column etiketleri, eğer dize olarak belirtilmişse, tanımlı çıktı schema'nin alan adları ile eşleşmeli, eğer dizeler değilse, konumsal olarak alan veri türleriyle (örneğin, tamsayı dizinleri) eşleşmelidir. Bkz. pandas. pandas.DataFrame
oluştururken columns etiketlemek için DataFrame.
İşlev uygulanmadan önce bir ortak grubun tüm verileri belleğe yüklenir. Bu, özellikle grup boyutları dengesizse bellek yetersiz özel durumlarına yol açabilir. maxRecordsPerBatch
Aşağıdaki örnekte, iki veri kümesi arasında bir groupby().cogroup().applyInPandas()
gerçekleştirmek için asof join
nasıl kullanılacağı gösterilmektedir.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Ayrıntılı kullanım için bkz. pyspark.sql.PandasCogroupedOps.applyInPandas.