أساسيات PySpark
تتناول هذه المقالة أمثلة بسيطة لتوضيح استخدام PySpark. يفترض أنك تفهم مفاهيم Apache Spark الأساسية وتشغل الأوامر في دفتر ملاحظات Azure Databricks المتصل بالحوسبة. يمكنك إنشاء DataFrames باستخدام بيانات نموذجية، وإجراء تحويلات أساسية بما في ذلك عمليات الصفوف والأعمدة على هذه البيانات، ودمج DataFrames متعددة وتجميع هذه البيانات، وتصور هذه البيانات، ثم حفظها في جدول أو ملف.
تحميل البيانات
تستخدم بعض الأمثلة في هذه المقالة نموذج البيانات المقدمة من Databricks لتوضيح استخدام DataFrames لتحميل البيانات وتحويلها وحفظها. إذا كنت ترغب في استخدام بياناتك الخاصة التي لم تكن موجودة بعد في Databricks، يمكنك تحميلها أولا وإنشاء DataFrame منها. راجع إنشاء جدول أو تعديله باستخدام تحميل الملفات وتحميل الملفات إلى وحدة تخزين كتالوج Unity.
حول بيانات عينة Databricks
يوفر Databricks بيانات نموذجية في الكتالوج samples
وفي /databricks-datasets
الدليل.
- للوصول إلى البيانات النموذجية في الكتالوج
samples
، استخدم التنسيقsamples.<schema-name>.<table-name>
. تستخدم هذه المقالة جداول فيsamples.tpch
المخطط، الذي يحتوي على بيانات من عمل خيالي.customer
يحتوي الجدول على معلومات حول العملاء،orders
ويحتوي على معلومات حول الطلبات التي وضعها هؤلاء العملاء. - استخدم
dbutils.fs.ls
لاستكشاف البيانات في/databricks-datasets
. استخدم Spark SQL أو DataFrames للاستعلام عن البيانات في هذا الموقع باستخدام مسارات الملفات. لمعرفة المزيد حول بيانات العينة المقدمة من Databricks، راجع نماذج مجموعات البيانات.
استيراد أنواع البيانات
تتطلب العديد من عمليات PySpark استخدام وظائف SQL أو التفاعل مع أنواع Spark الأصلية. يمكنك إما استيراد الوظائف والأنواع التي تحتاج إليها مباشرة فقط، أو يمكنك استيراد الوحدة النمطية بأكملها.
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
نظرا لأن بعض الدالات المستوردة قد تتجاوز وظائف Python المضمنة، يختار بعض المستخدمين استيراد هذه الوحدات النمطية باستخدام اسم مستعار. تظهر الأمثلة التالية اسما مستعارا شائعا مستخدما في أمثلة التعليمات البرمجية Apache Spark:
import pyspark.sql.types as T
import pyspark.sql.functions as F
للحصول على قائمة شاملة وأنواع البيانات، راجع أنواع بيانات Spark.
للحصول على قائمة شاملة بوظائف PySpark SQL، راجع وظائف Spark.
إنشاء DataFrame
هناك عدة طرق لإنشاء DataFrame. عادة ما تقوم بتعريف DataFrame مقابل مصدر بيانات مثل جدول أو مجموعة من الملفات. ثم كما هو موضح في قسم المفاهيم الأساسية Apache Spark، استخدم إجراء، مثل display
، لتشغيل التحويلات لتنفيذها. الأسلوب display
إخراج DataFrames.
إنشاء DataFrame بقيم محددة
لإنشاء DataFrame بقيم محددة createDataFrame
، استخدم الأسلوب ، حيث يتم التعبير عن الصفوف كقائمة من المجموعات:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
لاحظ في الإخراج أنه يتم استنتاج أنواع بيانات أعمدة df_children
تلقائيا. يمكنك بدلا من ذلك تحديد الأنواع عن طريق إضافة مخطط. يتم تعريف المخططات باستخدام StructType
الذي يتكون من StructFields
الذي يحدد الاسم ونوع البيانات وعلامة منطقية تشير إلى ما إذا كانت تحتوي على قيمة خالية أم لا. يجب استيراد أنواع البيانات من pyspark.sql.types
.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
إنشاء DataFrame من جدول في كتالوج Unity
لإنشاء DataFrame من جدول في كتالوج Unity، استخدم table
الأسلوب الذي يحدد الجدول باستخدام التنسيق <catalog-name>.<schema-name>.<table-name>
. انقر فوق كتالوج على شريط التنقل الأيمن لاستخدام مستكشف الكتالوج للانتقال إلى الجدول. انقر فوقه، ثم حدد نسخ مسار الجدول لإدراج مسار الجدول في دفتر الملاحظات.
يقوم المثال التالي بتحميل الجدول samples.tpch.customer
، ولكن يمكنك بدلا من ذلك توفير المسار إلى الجدول الخاص بك.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
إنشاء DataFrame من ملف تم تحميله
لإنشاء DataFrame من ملف قمت بتحميله إلى وحدات تخزين كتالوج Unity، استخدم الخاصية read
. يقوم هذا الأسلوب بإرجاع DataFrameReader
، والذي يمكنك استخدامه بعد ذلك لقراءة التنسيق المناسب. انقر فوق خيار الكتالوج على الشريط الجانبي الصغير على اليسار واستخدم مستعرض الكتالوج لتحديد موقع الملف. حدده، ثم انقر فوق نسخ مسار ملف وحدة التخزين.
يقرأ المثال أدناه من *.csv
ملف، ولكنه DataFrameReader
يدعم تحميل الملفات في العديد من التنسيقات الأخرى. راجع أساليب DataFrameReader.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
لمزيد من المعلومات حول وحدات تخزين كتالوج Unity، راجع ما هي وحدات تخزين كتالوج Unity؟.
إنشاء DataFrame من استجابة JSON
لإنشاء DataFrame من حمولة استجابة JSON التي تم إرجاعها بواسطة واجهة برمجة تطبيقات REST، استخدم حزمة Python requests
للاستعلام عن الاستجابة وتحليلها. يجب استيراد الحزمة لاستخدامها. يستخدم هذا المثال بيانات من قاعدة بيانات تطبيق المخدرات التابعة لإدارة الغذاء والدواء في الولايات المتحدة.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
للحصول على معلومات حول العمل مع JSON والبيانات الأخرى شبه المنظمة على Databricks، راجع بيانات النموذج شبه المنظمة.
تحديد حقل أو كائن JSON
لتحديد حقل أو كائن معين من JSON المحول، استخدم []
رمز . على سبيل المثال، لتحديد products
الحقل الذي هو نفسه صفيف من المنتجات:
display(df_drugs.select(df_drugs["products"]))
يمكنك أيضا ربط استدعاءات الأسلوب معا لاجتياز حقول متعددة. على سبيل المثال، لإخراج اسم العلامة التجارية للمنتج الأول في تطبيق المخدرات:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
إنشاء DataFrame من ملف
لتوضيح إنشاء DataFrame من ملف، يقوم هذا المثال بتحميل بيانات CSV في /databricks-datasets
الدليل.
للانتقال إلى نماذج مجموعات البيانات، يمكنك استخدام أوامر نظام ملفات Databricks Utilties . يستخدم dbutils
المثال التالي لسرد مجموعات البيانات المتوفرة في /databricks-datasets
:
display(dbutils.fs.ls('/databricks-datasets'))
بدلا من ذلك، يمكنك استخدام %fs
للوصول إلى أوامر نظام ملفات Databricks CLI، كما هو موضح في المثال التالي:
%fs ls '/databricks-datasets'
لإنشاء DataFrame من ملف أو دليل من الملفات، حدد المسار في load
الأسلوب :
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
تحويل البيانات باستخدام DataFrames
تسهل DataFrames تحويل البيانات باستخدام أساليب مضمنة لفرز البيانات وتصفيتها وتجميعها. لا يتم تحديد العديد من التحويلات كأساليب على DataFrames، ولكن بدلا من ذلك يتم توفيرها في الحزمة spark.sql.functions
. راجع Databricks Spark SQL Functions.
عمليات العمود
يوفر Spark العديد من عمليات الأعمدة الأساسية:
تلميح
لإخراج كافة الأعمدة في DataFrame، استخدم columns
، على سبيل المثال df_customer.columns
.
تحديد الأعمدة
يمكنك تحديد أعمدة معينة باستخدام select
و col
. الدالة col
pyspark.sql.functions
في النموذج الفرعي.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
يمكنك أيضا الرجوع إلى عمود يستخدم expr
تعبيرا معرفا كسلسلة:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
يمكنك أيضا استخدام selectExpr
، الذي يقبل تعبيرات SQL:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
لتحديد الأعمدة باستخدام سلسلة حرفية، قم بما يلي:
df_customer.select(
"c_custkey",
"c_acctbal"
)
لتحديد عمود من DataFrame معين بشكل صريح، يمكنك استخدام []
عامل التشغيل أو .
عامل التشغيل. .
(لا يمكن استخدام عامل التشغيل لتحديد الأعمدة التي تبدأ بعدد صحيح أو الأعمدة التي تحتوي على مسافة أو حرف خاص.) يمكن أن يكون هذا مفيدا بشكل خاص عند الانضمام إلى DataFrames حيث يكون لبعض الأعمدة نفس الاسم.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
إنشاء أعمدة
لإنشاء عمود جديد، استخدم withColumn
الأسلوب . ينشئ المثال التالي عمودا جديدا يحتوي على قيمة منطقية استنادا إلى ما إذا كان رصيد c_acctbal
حساب العميل يتجاوز 1000
:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
إعادة تسمية الأعمدة
لإعادة تسمية عمود، استخدم withColumnRenamed
الأسلوب الذي يقبل أسماء الأعمدة الموجودة والجديدة:
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
يعد alias
الأسلوب مفيدا بشكل خاص عندما تريد إعادة تسمية الأعمدة كجزء من التجميعات:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
أنواع أعمدة التحويل
في بعض الحالات، قد تحتاج إلى تغيير نوع البيانات لعمود واحد أو أكثر في DataFrame. للقيام بذلك، استخدم cast
الأسلوب للتحويل بين أنواع بيانات العمود. يوضح المثال التالي كيفية تحويل عمود من عدد صحيح إلى نوع سلسلة، باستخدام col
الأسلوب للإشارة إلى عمود:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
إزالة الأعمدة
لإزالة الأعمدة، يمكنك حذف الأعمدة أثناء التحديد أو select(*) except
يمكنك استخدام drop
الأسلوب :
df_customer_flag_renamed.drop("balance_flag_renamed")
يمكنك أيضا إسقاط أعمدة متعددة في وقت واحد:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
عمليات الصف
يوفر Spark العديد من عمليات الصف الأساسية:
تصفية الصفوف
لتصفية الصفوف، استخدم filter
الأسلوب أو where
على DataFrame لإرجاع صفوف معينة فقط. لتعريف عمود للتصفية عليه، استخدم col
الأسلوب أو التعبير الذي يتم تقييمه إلى عمود.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
للتصفية حسب شروط متعددة، استخدم عوامل التشغيل المنطقية. على سبيل المثال، &
وتمكينك |
AND
من و OR
الشروط، على التوالي. يقوم المثال التالي بتصفية الصفوف حيث c_nationkey
يساوي 20
و c_acctbal
أكبر من 1000
.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
إزالة الصفوف المكررة
لإلغاء تكرار الصفوف، استخدم distinct
، الذي يقوم بإرجاع الصفوف الفريدة فقط.
df_unique = df_customer.distinct()
معالجة القيم الخالية
لمعالجة القيم الخالية، قم بإسقاط الصفوف التي تحتوي على قيم خالية باستخدام na.drop
الأسلوب . يتيح لك هذا الأسلوب تحديد ما إذا كنت تريد إسقاط صفوف تحتوي على any
قيم خالية أو all
قيم خالية.
لإسقاط أي قيم خالية، استخدم أحد الأمثلة التالية.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
إذا كنت تريد بدلا من ذلك تصفية الصفوف التي تحتوي على كافة القيم الخالية فقط، فاستخدم ما يلي:
df_customer_no_nulls = df_customer.na.drop("all")
يمكنك تطبيق هذا لمجموعة فرعية من الأعمدة عن طريق تحديد هذا، كما هو موضح أدناه:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
لتعبئة القيم المفقودة fill
، استخدم الأسلوب . يمكنك اختيار تطبيق هذا على جميع الأعمدة أو مجموعة فرعية من الأعمدة. في المثال أدناه، تتم تعبئة أرصدة الحساب التي لها قيمة خالية لرصيد c_acctbal
الحساب الخاص بها ب 0
.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
لاستبدال السلاسل بقيم أخرى، استخدم replace
الأسلوب . في المثال أدناه، يتم استبدال أي سلاسل عناوين فارغة بالكلمة UNKNOWN
:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
إلحاق الصفوف
لإلحاق الصفوف، تحتاج إلى استخدام union
الأسلوب لإنشاء DataFrame جديد. في المثال التالي، تم إنشاء DataFrame df_that_one_customer
مسبقا وتم df_filtered_customer
دمجه، والذي يقوم بإرجاع DataFrame مع ثلاثة عملاء:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
إشعار
يمكنك أيضا دمج DataFrames بكتابتها في جدول ثم إلحاق صفوف جديدة. بالنسبة لأحمال عمل الإنتاج، يمكن أن تقلل المعالجة التزايدية لمصادر البيانات إلى جدول مستهدف بشكل كبير من زمن الانتقال وتكاليف الحساب مع زيادة حجم البيانات. راجع استيعاب البيانات في مستودع Databricks.
فرز الصفوف
هام
يمكن أن يكون الفرز مكلفا على نطاق واسع، وإذا قمت بتخزين البيانات التي تم فرزها وإعادة تحميل البيانات باستخدام Spark، فلن يكون الترتيب مضمونا. تأكد من أنك مقصود في استخدامك للفرز.
لفرز الصفوف حسب عمود واحد أو أكثر، استخدم sort
الأسلوب أو orderBy
. بشكل افتراضي، يتم فرز هذه الأساليب بترتيب تصاعدي:
df_customer.orderBy(col("c_acctbal"))
للتصفية بترتيب تنازلي، استخدم desc
:
df_customer.sort(col("c_custkey").desc())
يوضح المثال التالي كيفية الفرز على عمودين:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
للحد من عدد الصفوف التي يجب إرجاعها بمجرد فرز DataFrame، استخدم limit
الأسلوب . يعرض المثال التالي أهم 10
النتائج فقط:
display(df_sorted.limit(10))
الانضمام إلى DataFrames
للانضمام إلى اثنين أو أكثر من DataFrames، استخدم join
الأسلوب . يمكنك تحديد الطريقة التي تريد ربط DataFrames بها في how
المعلمات (نوع الصلة) و on
(على أي أعمدة يجب أن تستند إلى الصلة). تتضمن أنواع الصلة الشائعة ما يلي:
inner
: هذا هو نوع الصلة الافتراضي، الذي يقوم بإرجاع DataFrame الذي يحتفظ فقط بالصفوف حيث يوجد تطابق للمعلمةon
عبر DataFrames.left
: يحافظ هذا على كافة صفوف DataFrame المحددة الأولى والصفوف فقط من DataFrame المحددة الثانية التي لها تطابق مع الأول.outer
: تحتفظ الصلة الخارجية بجميع الصفوف من كلا DataFrames بغض النظر عن التطابق.
للحصول على معلومات مفصلة حول الصلات، راجع العمل مع الصلات على Azure Databricks. للحصول على قائمة الصلات المدعومة في PySpark، راجع عمليات ربط DataFrame.
يقوم المثال التالي بإرجاع DataFrame واحد حيث يتم ربط كل صف من orders
DataFrame بالصف المقابل من customers
DataFrame. يتم استخدام صلة داخلية، حيث أن التوقع هو أن كل طلب يتوافق مع عميل واحد بالضبط.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
للانضمام بشروط متعددة، استخدم عوامل التشغيل المنطقية مثل &
و |
لتحديد AND
و OR
، على التوالي. يضيف المثال التالي شرطا إضافيا، التصفية إلى الصفوف التي تحتوي o_totalprice
على أكبر من 500,000
:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
تجميع البيانات
لتجميع البيانات في DataFrame، على GROUP BY
غرار في SQL، استخدم groupBy
الأسلوب لتحديد الأعمدة المراد تجميعها حسب والطريقة agg
لتحديد التجميعات. استيراد التجميعات الشائعة بما في ذلك avg
و sum
max
و و من min
pyspark.sql.functions
. يوضح المثال التالي متوسط رصيد العملاء حسب قطاع السوق:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
بعض التجميعات هي إجراءات، ما يعني أنها تقوم بتشغيل الحسابات. في هذه الحالة، لا تحتاج إلى استخدام إجراءات أخرى لإخراج النتائج.
لحساب الصفوف في DataFrame، استخدم count
الأسلوب :
df_customer.count()
المكالمات التسلسلية
الأساليب التي تحول DataFrames ترجع DataFrames، ولا يعمل Spark على التحويلات حتى يتم استدعاء الإجراءات. يعني هذا التقييم البطيء أنه يمكنك تسلسل أساليب متعددة للراحة وقابلية القراءة. يوضح المثال التالي كيفية سلسلة التصفية والتجميع وترتيبها:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
تصور DataFrame الخاص بك
لتصور DataFrame في دفتر ملاحظات، انقر فوق العلامة + الموجودة بجانب الجدول في الجزء العلوي الأيمن من DataFrame، ثم حدد Visualization لإضافة مخطط واحد أو أكثر استنادا إلى DataFrame الخاص بك. للحصول على تفاصيل حول المرئيات، راجع المرئيات في دفاتر ملاحظات Databricks.
display(df_order)
لتنفيذ مرئيات إضافية، توصي Databricks باستخدام Pandas API ل Spark. .pandas_api()
يسمح لك بالصب إلى واجهة برمجة تطبيقات pandas المقابلة ل Spark DataFrame. لمزيد من المعلومات، راجع Pandas API على Spark.
حفظ البيانات
بمجرد تحويل بياناتك، يمكنك حفظها باستخدام الأساليب DataFrameWriter
. يمكن العثور على قائمة كاملة بهذه الأساليب في DataFrameWriter. توضح الأقسام التالية كيفية حفظ DataFrame كجدول ومجموعة من ملفات البيانات.
حفظ DataFrame كجدول
لحفظ DataFrame كجدول في كتالوج Unity، استخدم write.saveAsTable
الأسلوب وحدد المسار بالتنسيق <catalog-name>.<schema-name>.<table-name>
.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
كتابة DataFrame الخاص بك ك CSV
لكتابة DataFrame الخاص *.csv
بك للتنسيق، استخدم write.csv
الأسلوب ، مع تحديد التنسيق والخيارات. بشكل افتراضي إذا كانت البيانات موجودة في المسار المحدد تفشل عملية الكتابة. يمكنك تحديد أحد الأوضاع التالية لاتخاذ إجراء مختلف:
overwrite
الكتابة فوق كافة البيانات الموجودة في المسار الهدف مع محتويات DataFrame.append
إلحاق محتويات DataFrame بالبيانات في المسار الهدف.ignore
يفشل بصمت في الكتابة إذا كانت البيانات موجودة في المسار الهدف.
يوضح المثال التالي الكتابة فوق البيانات باستخدام محتويات DataFrame كملفات CSV:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
الخطوات التالية
للاستفادة من المزيد من قدرات Spark على Databricks، راجع: