Aracılığıyla paylaş


pandas işlev API'leri

pandas işlev API'leri, pandas örneklerini alıp PySpark DataFrame'e veren bir Python yerel işlevini doğrudan uygulamanızı sağlar. pandas kullanıcı tanımlı işlevlerine benzer şekilde işlev API'leri de verileri aktarmak için Apache Arrow ve verilerle çalışmak için pandas kullanır; ancak Python türü ipuçları pandas işlev API'lerinde isteğe bağlıdır.

Pandas işlev API'lerinin üç türü vardır:

  • Gruplandırılmış harita
  • Harita
  • Birlikte gruplandırılmış eşleme

pandas işlev API'leri, pandas UDF yürütmesinin kullandığı iç mantığı kullanır. PyArrow, desteklenen SQL türleri ve yapılandırmalar gibi özellikleri paylaşırlar.

Daha fazla bilgi için, Apache Spark 3.0'ın Yaklaşan Sürümü'ndeki Yeni Pandas UDF'leri ve Python Türü İpuçları hakkında blog gönderisine bakın.

Gruplandırılmış harita

"Split-apply-combine" desenini uygulamak için groupBy().applyInPandas() kullanarak gruplandırılmış verilerinizi dönüştürebilirsiniz. "Split-apply-combine" üç adımdan oluşur:

  • DataFrame.groupBykullanarak verileri gruplara bölün.
  • Her gruba bir işlev uygulayın. Fonksiyonun hem girişi hem de çıkışı pandas.DataFrame. Giriş verileri, her grup için tüm satırları ve columns'ı içerir.
  • Yeni bir DataFrameiçin sonuçları birleştirin.

groupBy().applyInPandas()kullanmak için aşağıdakileri tanımlamanız gerekir:

  • Her grup için hesaplamayı tanımlayan bir Python işlevi
  • Bir StructType nesnesi veya çıkış DataFrame'sinin schema'ini tanımlayan bir dize

Döndürülen pandas.DataFrame'in column etiketleri, eğer dizeler olarak belirtilmişse tanımlı çıktı schema'nin alan adlarıyla eşleşmeli ya da dizeler olarak belirtilmemişse (örneğin, tamsayı dizinler), konumlarına göre alan veri türleriyle eşleşmelidir. Bkz. pandas. pandas.DataFrameoluştururken columns etiketlemek için DataFrame.

Bir grubun tüm verileri, işlev uygulanmadan önce belleğe yüklenir. Bu, özellikle grup boyutları dengesizse bellek yetersiz özel durumlarına yol açabilir. maxRecordsPerBatch yapılandırması gruplara uygulanmaz ve gruplandırılmış verilerin kullanılabilir belleğe sığdığından emin olmak size bağlıdır.

Aşağıdaki örnekte, gruptaki her değerden ortalamayı çıkarmak için groupby().apply() nasıl kullanılacağı gösterilmektedir.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Ayrıntılı kullanım için bkz. pyspark.sql.GroupedData.applyInPandas.

Harita

DataFrame.mapInPandas() yineleyicisini geçerli PySpark DataFrame'i temsil eden ve sonucu PySpark DataFrame olarak döndüren başka bir pandas.DataFrame yineleyicisine dönüştürmek için pandas.DataFrame pandas örnekleriyle eşleme işlemleri gerçekleştirirsiniz.

Temel alınan işlev pandas.DataFrameyineleyicisini alır ve çıktı olarak verir. Seriden Seriye gibi bazı pandas UDF'lerinin aksine rastgele uzunlukta çıkış döndürebilir.

Aşağıdaki örnekte mapInPandas()nasıl kullanılacağı gösterilmektedir:

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Ayrıntılı kullanım için bkz. pyspark.sql.DataFrame.mapInPandas.

Birlikte gruplandırılmış eşleme

Pandas örnekleriyle birlikte gruplandırılmış eşleme işlemleri için, iki PySpark DataFrame.groupby().cogroup().applyInPandas()ortak bir anahtarla birlikte gruplandırmak için DataFrame kullanın ve ardından gösterildiği gibi her bir cogroup'a bir Python işlevi uygulayın:

  • Bir anahtarı paylaşan her DataFrame grubunun birlikte gruplandırılması için verileri karıştırın.
  • Her bir cogroup'a bir işlev uygulayın. İşlevin girişi iki pandas.DataFrame (anahtarı temsil eden isteğe bağlı bir tanımlama grubu ile). İşlevin çıkışı bir pandas.DataFrame'dur.
  • Tüm gruplardan gelen pandas.DataFrame'ları yeni bir PySpark DataFrameiçinde birleştirin.

groupBy().cogroup().applyInPandas()kullanmak için aşağıdakileri tanımlamanız gerekir:

  • Her bir cogroup için hesaplamayı tanımlayan bir Python işlevi.
  • PySpark DataFrameçıktısının schema tanımlayan bir StructType nesnesi veya dizesi.

Döndürülen pandas.DataFrame'in column etiketleri, eğer dize olarak belirtilmişse, tanımlı çıktı schema'nin alan adları ile eşleşmeli, eğer dizeler değilse, konumsal olarak alan veri türleriyle (örneğin, tamsayı dizinleri) eşleşmelidir. Bkz. pandas. pandas.DataFrameoluştururken columns etiketlemek için DataFrame.

İşlev uygulanmadan önce bir ortak grubun tüm verileri belleğe yüklenir. Bu, özellikle grup boyutları dengesizse bellek yetersiz özel durumlarına yol açabilir. maxRecordsPerBatch yapılandırması uygulanmaz ve birlikte gruplandırılmış verilerin kullanılabilir belleğe sığmasını sağlamak size bağlıdır.

Aşağıdaki örnekte, iki veri kümesi arasında bir groupby().cogroup().applyInPandas() gerçekleştirmek için asof join nasıl kullanılacağı gösterilmektedir.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Ayrıntılı kullanım için bkz. pyspark.sql.PandasCogroupedOps.applyInPandas.