مشاركة عبر


أساسيات PySpark

تتناول هذه المقالة أمثلة بسيطة لتوضيح استخدام PySpark. يفترض أنك تفهم مفاهيم Apache Spark الأساسية وتشغل الأوامر في دفتر ملاحظات Azure Databricks المتصل بالحوسبة. يمكنك إنشاء DataFrames باستخدام بيانات نموذجية، وإجراء تحويلات أساسية بما في ذلك عمليات الصفوف والأعمدة على هذه البيانات، ودمج DataFrames متعددة وتجميع هذه البيانات، وتصور هذه البيانات، ثم حفظها في جدول أو ملف.

تحميل البيانات

تستخدم بعض الأمثلة في هذه المقالة نموذج البيانات المقدمة من Databricks لتوضيح استخدام DataFrames لتحميل البيانات وتحويلها وحفظها. إذا كنت ترغب في استخدام بياناتك الخاصة التي لم تكن موجودة بعد في Databricks، يمكنك تحميلها أولا وإنشاء DataFrame منها. راجع إنشاء جدول أو تعديله باستخدام تحميل الملفات وتحميل الملفات إلى وحدة تخزين كتالوج Unity.

حول بيانات عينة Databricks

يوفر Databricks بيانات نموذجية في الكتالوج samples وفي /databricks-datasets الدليل.

  • للوصول إلى البيانات النموذجية في الكتالوج samples ، استخدم التنسيق samples.<schema-name>.<table-name>. تستخدم هذه المقالة جداول في samples.tpch المخطط، الذي يحتوي على بيانات من عمل خيالي. customer يحتوي الجدول على معلومات حول العملاء، orders ويحتوي على معلومات حول الطلبات التي وضعها هؤلاء العملاء.
  • استخدم dbutils.fs.ls لاستكشاف البيانات في /databricks-datasets. استخدم Spark SQL أو DataFrames للاستعلام عن البيانات في هذا الموقع باستخدام مسارات الملفات. لمعرفة المزيد حول بيانات العينة المقدمة من Databricks، راجع نماذج مجموعات البيانات.

استيراد أنواع البيانات

تتطلب العديد من عمليات PySpark استخدام وظائف SQL أو التفاعل مع أنواع Spark الأصلية. يمكنك إما استيراد الوظائف والأنواع التي تحتاج إليها مباشرة فقط، أو يمكنك استيراد الوحدة النمطية بأكملها.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

نظرا لأن بعض الدالات المستوردة قد تتجاوز وظائف Python المضمنة، يختار بعض المستخدمين استيراد هذه الوحدات النمطية باستخدام اسم مستعار. تظهر الأمثلة التالية اسما مستعارا شائعا مستخدما في أمثلة التعليمات البرمجية Apache Spark:

import pyspark.sql.types as T
import pyspark.sql.functions as F

للحصول على قائمة شاملة وأنواع البيانات، راجع أنواع بيانات Spark.

للحصول على قائمة شاملة بوظائف PySpark SQL، راجع وظائف Spark.

إنشاء DataFrame

هناك عدة طرق لإنشاء DataFrame. عادة ما تقوم بتعريف DataFrame مقابل مصدر بيانات مثل جدول أو مجموعة من الملفات. ثم كما هو موضح في قسم المفاهيم الأساسية Apache Spark، استخدم إجراء، مثل display، لتشغيل التحويلات لتنفيذها. الأسلوب display إخراج DataFrames.

إنشاء DataFrame بقيم محددة

لإنشاء DataFrame بقيم محددة createDataFrame ، استخدم الأسلوب ، حيث يتم التعبير عن الصفوف كقائمة من المجموعات:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

لاحظ في الإخراج أنه يتم استنتاج أنواع بيانات أعمدة df_children تلقائيا. يمكنك بدلا من ذلك تحديد الأنواع عن طريق إضافة مخطط. يتم تعريف المخططات باستخدام StructType الذي يتكون من StructFields الذي يحدد الاسم ونوع البيانات وعلامة منطقية تشير إلى ما إذا كانت تحتوي على قيمة خالية أم لا. يجب استيراد أنواع البيانات من pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

إنشاء DataFrame من جدول في كتالوج Unity

لإنشاء DataFrame من جدول في كتالوج Unity، استخدم table الأسلوب الذي يحدد الجدول باستخدام التنسيق <catalog-name>.<schema-name>.<table-name>. انقر فوق كتالوج على شريط التنقل الأيمن لاستخدام مستكشف الكتالوج للانتقال إلى الجدول. انقر فوقه، ثم حدد نسخ مسار الجدول لإدراج مسار الجدول في دفتر الملاحظات.

يقوم المثال التالي بتحميل الجدول samples.tpch.customer، ولكن يمكنك بدلا من ذلك توفير المسار إلى الجدول الخاص بك.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

إنشاء DataFrame من ملف تم تحميله

لإنشاء DataFrame من ملف قمت بتحميله إلى وحدات تخزين كتالوج Unity، استخدم الخاصية read . يقوم هذا الأسلوب بإرجاع DataFrameReader، والذي يمكنك استخدامه بعد ذلك لقراءة التنسيق المناسب. انقر فوق خيار الكتالوج على الشريط الجانبي الصغير على اليسار واستخدم مستعرض الكتالوج لتحديد موقع الملف. حدده، ثم انقر فوق نسخ مسار ملف وحدة التخزين.

يقرأ المثال أدناه من *.csv ملف، ولكنه DataFrameReader يدعم تحميل الملفات في العديد من التنسيقات الأخرى. راجع أساليب DataFrameReader.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

لمزيد من المعلومات حول وحدات تخزين كتالوج Unity، راجع ما هي وحدات تخزين كتالوج Unity؟.

إنشاء DataFrame من استجابة JSON

لإنشاء DataFrame من حمولة استجابة JSON التي تم إرجاعها بواسطة واجهة برمجة تطبيقات REST، استخدم حزمة Python requests للاستعلام عن الاستجابة وتحليلها. يجب استيراد الحزمة لاستخدامها. يستخدم هذا المثال بيانات من قاعدة بيانات تطبيق المخدرات التابعة لإدارة الغذاء والدواء في الولايات المتحدة.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

للحصول على معلومات حول العمل مع JSON والبيانات الأخرى شبه المنظمة على Databricks، راجع بيانات النموذج شبه المنظمة.

تحديد حقل أو كائن JSON

لتحديد حقل أو كائن معين من JSON المحول، استخدم [] رمز . على سبيل المثال، لتحديد products الحقل الذي هو نفسه صفيف من المنتجات:

display(df_drugs.select(df_drugs["products"]))

يمكنك أيضا ربط استدعاءات الأسلوب معا لاجتياز حقول متعددة. على سبيل المثال، لإخراج اسم العلامة التجارية للمنتج الأول في تطبيق المخدرات:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

إنشاء DataFrame من ملف

لتوضيح إنشاء DataFrame من ملف، يقوم هذا المثال بتحميل بيانات CSV في /databricks-datasets الدليل.

للانتقال إلى نماذج مجموعات البيانات، يمكنك استخدام أوامر نظام ملفات Databricks Utilties . يستخدم dbutils المثال التالي لسرد مجموعات البيانات المتوفرة في /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

بدلا من ذلك، يمكنك استخدام %fs للوصول إلى أوامر نظام ملفات Databricks CLI، كما هو موضح في المثال التالي:

%fs ls '/databricks-datasets'

لإنشاء DataFrame من ملف أو دليل من الملفات، حدد المسار في load الأسلوب :

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

تحويل البيانات باستخدام DataFrames

تسهل DataFrames تحويل البيانات باستخدام أساليب مضمنة لفرز البيانات وتصفيتها وتجميعها. لا يتم تحديد العديد من التحويلات كأساليب على DataFrames، ولكن بدلا من ذلك يتم توفيرها في الحزمة spark.sql.functions . راجع Databricks Spark SQL Functions.

عمليات العمود

يوفر Spark العديد من عمليات الأعمدة الأساسية:

تلميح

لإخراج كافة الأعمدة في DataFrame، استخدم columns، على سبيل المثال df_customer.columns.

تحديد الأعمدة

يمكنك تحديد أعمدة معينة باستخدام select و col. الدالة col pyspark.sql.functions في النموذج الفرعي.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

يمكنك أيضا الرجوع إلى عمود يستخدم expr تعبيرا معرفا كسلسلة:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

يمكنك أيضا استخدام selectExpr، الذي يقبل تعبيرات SQL:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

لتحديد الأعمدة باستخدام سلسلة حرفية، قم بما يلي:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

لتحديد عمود من DataFrame معين بشكل صريح، يمكنك استخدام [] عامل التشغيل أو . عامل التشغيل. .(لا يمكن استخدام عامل التشغيل لتحديد الأعمدة التي تبدأ بعدد صحيح أو الأعمدة التي تحتوي على مسافة أو حرف خاص.) يمكن أن يكون هذا مفيدا بشكل خاص عند الانضمام إلى DataFrames حيث يكون لبعض الأعمدة نفس الاسم.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

إنشاء أعمدة

لإنشاء عمود جديد، استخدم withColumn الأسلوب . ينشئ المثال التالي عمودا جديدا يحتوي على قيمة منطقية استنادا إلى ما إذا كان رصيد c_acctbal حساب العميل يتجاوز 1000:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

إعادة تسمية الأعمدة

لإعادة تسمية عمود، استخدم withColumnRenamed الأسلوب الذي يقبل أسماء الأعمدة الموجودة والجديدة:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

يعد alias الأسلوب مفيدا بشكل خاص عندما تريد إعادة تسمية الأعمدة كجزء من التجميعات:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

أنواع أعمدة التحويل

في بعض الحالات، قد تحتاج إلى تغيير نوع البيانات لعمود واحد أو أكثر في DataFrame. للقيام بذلك، استخدم cast الأسلوب للتحويل بين أنواع بيانات العمود. يوضح المثال التالي كيفية تحويل عمود من عدد صحيح إلى نوع سلسلة، باستخدام col الأسلوب للإشارة إلى عمود:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

إزالة الأعمدة

لإزالة الأعمدة، يمكنك حذف الأعمدة أثناء التحديد أو select(*) except يمكنك استخدام drop الأسلوب :

df_customer_flag_renamed.drop("balance_flag_renamed")

يمكنك أيضا إسقاط أعمدة متعددة في وقت واحد:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

عمليات الصف

يوفر Spark العديد من عمليات الصف الأساسية:

تصفية الصفوف

لتصفية الصفوف، استخدم filter الأسلوب أو where على DataFrame لإرجاع صفوف معينة فقط. لتعريف عمود للتصفية عليه، استخدم col الأسلوب أو التعبير الذي يتم تقييمه إلى عمود.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

للتصفية حسب شروط متعددة، استخدم عوامل التشغيل المنطقية. على سبيل المثال، & وتمكينك | AND من و OR الشروط، على التوالي. يقوم المثال التالي بتصفية الصفوف حيث c_nationkey يساوي 20 و c_acctbal أكبر من 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

إزالة الصفوف المكررة

لإلغاء تكرار الصفوف، استخدم distinct، الذي يقوم بإرجاع الصفوف الفريدة فقط.

df_unique = df_customer.distinct()

معالجة القيم الخالية

لمعالجة القيم الخالية، قم بإسقاط الصفوف التي تحتوي على قيم خالية باستخدام na.drop الأسلوب . يتيح لك هذا الأسلوب تحديد ما إذا كنت تريد إسقاط صفوف تحتوي على any قيم خالية أو all قيم خالية.

لإسقاط أي قيم خالية، استخدم أحد الأمثلة التالية.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

إذا كنت تريد بدلا من ذلك تصفية الصفوف التي تحتوي على كافة القيم الخالية فقط، فاستخدم ما يلي:

df_customer_no_nulls = df_customer.na.drop("all")

يمكنك تطبيق هذا لمجموعة فرعية من الأعمدة عن طريق تحديد هذا، كما هو موضح أدناه:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

لتعبئة القيم المفقودة fill ، استخدم الأسلوب . يمكنك اختيار تطبيق هذا على جميع الأعمدة أو مجموعة فرعية من الأعمدة. في المثال أدناه، تتم تعبئة أرصدة الحساب التي لها قيمة خالية لرصيد c_acctbal الحساب الخاص بها ب 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

لاستبدال السلاسل بقيم أخرى، استخدم replace الأسلوب . في المثال أدناه، يتم استبدال أي سلاسل عناوين فارغة بالكلمة UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

إلحاق الصفوف

لإلحاق الصفوف، تحتاج إلى استخدام union الأسلوب لإنشاء DataFrame جديد. في المثال التالي، تم إنشاء DataFrame df_that_one_customer مسبقا وتم df_filtered_customer دمجه، والذي يقوم بإرجاع DataFrame مع ثلاثة عملاء:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

إشعار

يمكنك أيضا دمج DataFrames بكتابتها في جدول ثم إلحاق صفوف جديدة. بالنسبة لأحمال عمل الإنتاج، يمكن أن تقلل المعالجة التزايدية لمصادر البيانات إلى جدول مستهدف بشكل كبير من زمن الانتقال وتكاليف الحساب مع زيادة حجم البيانات. راجع استيعاب البيانات في مستودع Databricks.

فرز الصفوف

هام

يمكن أن يكون الفرز مكلفا على نطاق واسع، وإذا قمت بتخزين البيانات التي تم فرزها وإعادة تحميل البيانات باستخدام Spark، فلن يكون الترتيب مضمونا. تأكد من أنك مقصود في استخدامك للفرز.

لفرز الصفوف حسب عمود واحد أو أكثر، استخدم sort الأسلوب أو orderBy . بشكل افتراضي، يتم فرز هذه الأساليب بترتيب تصاعدي:

df_customer.orderBy(col("c_acctbal"))

للتصفية بترتيب تنازلي، استخدم desc:

df_customer.sort(col("c_custkey").desc())

يوضح المثال التالي كيفية الفرز على عمودين:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

للحد من عدد الصفوف التي يجب إرجاعها بمجرد فرز DataFrame، استخدم limit الأسلوب . يعرض المثال التالي أهم 10 النتائج فقط:

display(df_sorted.limit(10))

الانضمام إلى DataFrames

للانضمام إلى اثنين أو أكثر من DataFrames، استخدم join الأسلوب . يمكنك تحديد الطريقة التي تريد ربط DataFrames بها في how المعلمات (نوع الصلة) و on (على أي أعمدة يجب أن تستند إلى الصلة). تتضمن أنواع الصلة الشائعة ما يلي:

  • inner: هذا هو نوع الصلة الافتراضي، الذي يقوم بإرجاع DataFrame الذي يحتفظ فقط بالصفوف حيث يوجد تطابق للمعلمة on عبر DataFrames.
  • left: يحافظ هذا على كافة صفوف DataFrame المحددة الأولى والصفوف فقط من DataFrame المحددة الثانية التي لها تطابق مع الأول.
  • outer: تحتفظ الصلة الخارجية بجميع الصفوف من كلا DataFrames بغض النظر عن التطابق.

للحصول على معلومات مفصلة حول الصلات، راجع العمل مع الصلات على Azure Databricks. للحصول على قائمة الصلات المدعومة في PySpark، راجع عمليات ربط DataFrame.

يقوم المثال التالي بإرجاع DataFrame واحد حيث يتم ربط كل صف من orders DataFrame بالصف المقابل من customers DataFrame. يتم استخدام صلة داخلية، حيث أن التوقع هو أن كل طلب يتوافق مع عميل واحد بالضبط.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

للانضمام بشروط متعددة، استخدم عوامل التشغيل المنطقية مثل & و | لتحديد AND و OR، على التوالي. يضيف المثال التالي شرطا إضافيا، التصفية إلى الصفوف التي تحتوي o_totalprice على أكبر من 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

تجميع البيانات

لتجميع البيانات في DataFrame، على GROUP BY غرار في SQL، استخدم groupBy الأسلوب لتحديد الأعمدة المراد تجميعها حسب والطريقة agg لتحديد التجميعات. استيراد التجميعات الشائعة بما في ذلك avgو summaxو و من min pyspark.sql.functions. يوضح المثال التالي متوسط رصيد العملاء حسب قطاع السوق:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

بعض التجميعات هي إجراءات، ما يعني أنها تقوم بتشغيل الحسابات. في هذه الحالة، لا تحتاج إلى استخدام إجراءات أخرى لإخراج النتائج.

لحساب الصفوف في DataFrame، استخدم count الأسلوب :

df_customer.count()

المكالمات التسلسلية

الأساليب التي تحول DataFrames ترجع DataFrames، ولا يعمل Spark على التحويلات حتى يتم استدعاء الإجراءات. يعني هذا التقييم البطيء أنه يمكنك تسلسل أساليب متعددة للراحة وقابلية القراءة. يوضح المثال التالي كيفية سلسلة التصفية والتجميع وترتيبها:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

تصور DataFrame الخاص بك

لتصور DataFrame في دفتر ملاحظات، انقر فوق العلامة + الموجودة بجانب الجدول في الجزء العلوي الأيمن من DataFrame، ثم حدد Visualization لإضافة مخطط واحد أو أكثر استنادا إلى DataFrame الخاص بك. للحصول على تفاصيل حول المرئيات، راجع المرئيات في دفاتر ملاحظات Databricks.

display(df_order)

لتنفيذ مرئيات إضافية، توصي Databricks باستخدام Pandas API ل Spark. .pandas_api() يسمح لك بالصب إلى واجهة برمجة تطبيقات pandas المقابلة ل Spark DataFrame. لمزيد من المعلومات، راجع Pandas API على Spark.

حفظ البيانات

بمجرد تحويل بياناتك، يمكنك حفظها باستخدام الأساليب DataFrameWriter . يمكن العثور على قائمة كاملة بهذه الأساليب في DataFrameWriter. توضح الأقسام التالية كيفية حفظ DataFrame كجدول ومجموعة من ملفات البيانات.

حفظ DataFrame كجدول

لحفظ DataFrame كجدول في كتالوج Unity، استخدم write.saveAsTable الأسلوب وحدد المسار بالتنسيق <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

كتابة DataFrame الخاص بك ك CSV

لكتابة DataFrame الخاص *.csv بك للتنسيق، استخدم write.csv الأسلوب ، مع تحديد التنسيق والخيارات. بشكل افتراضي إذا كانت البيانات موجودة في المسار المحدد تفشل عملية الكتابة. يمكنك تحديد أحد الأوضاع التالية لاتخاذ إجراء مختلف:

  • overwrite الكتابة فوق كافة البيانات الموجودة في المسار الهدف مع محتويات DataFrame.
  • append إلحاق محتويات DataFrame بالبيانات في المسار الهدف.
  • ignore يفشل بصمت في الكتابة إذا كانت البيانات موجودة في المسار الهدف.

يوضح المثال التالي الكتابة فوق البيانات باستخدام محتويات DataFrame كملفات CSV:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

الخطوات التالية

للاستفادة من المزيد من قدرات Spark على Databricks، راجع: