البرنامج التعليمي: تحميل البيانات وتحويلها باستخدام Apache Spark DataFrames
يوضح لك هذا البرنامج التعليمي كيفية تحميل البيانات وتحويلها باستخدام Apache Spark Python (PySpark) DataFrame API وApache Spark Scala DataFrame API وSparkR SparkDataFrame API في Azure Databricks.
بنهاية هذا البرنامج التعليمي، سوف تفهم ما هو DataFrame وتكون على دراية بالمهام التالية:
Python
- تعريف المتغيرات ونسخ البيانات العامة إلى وحدة تخزين كتالوج Unity
- إنشاء DataFrame باستخدام Python
- تحميل البيانات في DataFrame من ملف CSV
- عرض إطار بيانات والتفاعل معه
- حفظ DataFrame
- تشغيل استعلامات SQL في PySpark
راجع أيضا مرجع Apache Spark PySpark API.
Scala
- تعريف المتغيرات ونسخ البيانات العامة إلى وحدة تخزين كتالوج Unity
- إنشاء DataFrame باستخدام Scala
- تحميل البيانات في DataFrame من ملف CSV
- عرض DataFrame والتفاعل معه
- حفظ DataFrame
- تشغيل استعلامات SQL في Apache Spark
راجع أيضا مرجع Apache Spark Scala API.
R
- تعريف المتغيرات ونسخ البيانات العامة إلى وحدة تخزين كتالوج Unity
- إنشاء SparkR SparkDataFrames
- تحميل البيانات في DataFrame من ملف CSV
- عرض إطار بيانات والتفاعل معه
- حفظ DataFrame
- تشغيل استعلامات SQL في SparkR
راجع أيضا مرجع Apache SparkR API.
ما هو DataFrame؟
DataFrame هو بنية بيانات ثنائية الأبعاد تحمل أعمدة من أنواع مختلفة محتملة. يمكنك التفكير في DataFrame مثل جدول بيانات أو جدول SQL أو قاموس كائنات السلسلة. توفر Apache Spark DataFrames مجموعة غنية من الوظائف (تحديد الأعمدة، والتصفية، والانضمام، والتجاميع) التي تسمح لك بحل مشكلات تحليل البيانات الشائعة بكفاءة.
Apache Spark DataFrames هي تجريد مبني على مجموعات البيانات الموزعة المرنة (RDDs). يستخدم Spark DataFrames وSpark SQL محرك تخطيط وتحسين موحد، مما يسمح لك بالحصول على أداء متطابق تقريبا عبر جميع اللغات المدعومة على Azure Databricks (Python وSQL وSca وR).
المتطلبات
لإكمال البرنامج التعليمي التالي، يجب أن تفي بالمتطلبات التالية:
لاستخدام الأمثلة في هذا البرنامج التعليمي، يجب تمكين كتالوج Unity لمساحة العمل الخاصة بك.
تستخدم الأمثلة في هذا البرنامج التعليمي وحدة تخزين كتالوج Unity لتخزين بيانات العينة. لاستخدام هذه الأمثلة، قم بإنشاء وحدة تخزين واستخدم أسماء كتالوج وحدة التخزين والمخطط ووحدات التخزين لتعيين مسار وحدة التخزين المستخدمة من قبل الأمثلة.
يجب أن يكون لديك الأذونات التالية في كتالوج Unity:
READ VOLUME
وWRITE VOLUME
، أوALL PRIVILEGES
وحدة التخزين المستخدمة لهذا البرنامج التعليمي.USE SCHEMA
أوALL PRIVILEGES
للمخطط المستخدم لهذا البرنامج التعليمي.USE CATALOG
أوALL PRIVILEGES
للكتالوج المستخدم لهذا البرنامج التعليمي.
لتعيين هذه الأذونات، راجع امتيازات مسؤول Databricks أو كتالوج Unity والعناصر القابلة للتأمين.
تلميح
للحصول على دفتر ملاحظات مكتمل لهذه المقالة، راجع دفاتر ملاحظات البرنامج التعليمي DataFrame.
الخطوة 1: تحديد المتغيرات وتحميل ملف CSV
تحدد هذه الخطوة المتغيرات للاستخدام في هذا البرنامج التعليمي ثم تقوم بتحميل ملف CSV يحتوي على بيانات اسم الطفل من health.data.ny.gov إلى وحدة تخزين كتالوج Unity.
افتح دفتر ملاحظات جديدا بالنقر فوق الأيقونة
. لمعرفة كيفية التنقل في دفاتر ملاحظات Azure Databricks، راجع واجهة دفتر ملاحظات Databricks وعناصر التحكم.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. استبدل
<catalog-name>
و<schema-name>
و<volume-name>
بأسماء الكتالوج والمخطط ووحدات التخزين لوحدة تخزين كتالوج Unity. استبدل<table_name>
باسم جدول من اختيارك. ستقوم بتحميل بيانات اسم الطفل في هذا الجدول لاحقا في هذا البرنامج التعليمي.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
اضغط
Shift+Enter
لتشغيل الخلية وإنشاء خلية فارغة جديدة.انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. تنسخ هذه التعليمة البرمجية
rows.csv
الملف من health.data.ny.gov إلى وحدة تخزين كتالوج Unity باستخدام الأمر Databricks dbutuils .Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
الخطوة 2: إنشاء DataFrame
تنشئ هذه الخطوة DataFrame باسم df1
مع بيانات الاختبار ثم تعرض محتوياته.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. تنشئ هذه التعليمة البرمجية DataFrame مع بيانات الاختبار، ثم تعرض محتويات ومخطط DataFrame.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
الخطوة 3: تحميل البيانات في DataFrame من ملف CSV
تنشئ هذه الخطوة DataFrame باسم df_csv
من ملف CSV الذي قمت بتحميله مسبقا في وحدة تخزين كتالوج Unity. انظر spark.read.csv.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. تقوم هذه التعليمة البرمجية بتحميل بيانات اسم الطفل في DataFrame
df_csv
من ملف CSV ثم تعرض محتويات DataFrame.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
يمكنك تحميل البيانات من العديد من تنسيقات الملفات المدعومة.
الخطوة 4: عرض إطار البيانات والتفاعل معه
اعرض أسماء طفلك وتفاعل معها باستخدام DataFrames باستخدام الطرق التالية.
طباعة مخطط DataFrame
تعرف على كيفية عرض مخطط Apache Spark DataFrame. يستخدم Apache Spark مخطط المصطلح للإشارة إلى أسماء وأنواع البيانات للأعمدة في DataFrame.
إشعار
يستخدم Azure Databricks أيضا مخطط المصطلح لوصف مجموعة من الجداول المسجلة في كتالوج.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تعرض هذه التعليمة البرمجية مخطط DataFrames الخاص بك مع
.printSchema()
أسلوب لعرض مخططات DataFrames - للتحضير لتوحيد إطاري البيانات.Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
إعادة تسمية العمود في DataFrame
تعرف على كيفية إعادة تسمية عمود في DataFrame.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تعيد هذه التعليمة البرمجية تسمية عمود في
df1_csv
DataFrame لمطابقة العمود المعني فيdf1
DataFrame. تستخدم هذه التعليمة البرمجية أسلوب Apache SparkwithColumnRenamed()
.Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
دمج DataFrames
تعرف على كيفية إنشاء DataFrame جديد يضيف صفوف DataFrame إلى آخر.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark
union()
لدمج محتويات DataFramedf
الأول مع DataFramedf_csv
الذي يحتوي على بيانات أسماء الأطفال المحملة من ملف CSV.Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
تصفية الصفوف في DataFrame
اكتشف أسماء الأطفال الأكثر شيوعا في مجموعة البيانات الخاصة بك عن طريق تصفية الصفوف، باستخدام Apache Spark .filter()
أو .where()
الأساليب. استخدم التصفية لتحديد مجموعة فرعية من الصفوف لإرجاعها أو تعديلها في DataFrame. لا يوجد فرق في الأداء أو بناء الجملة، كما هو ملاحظ في الأمثلة التالية.
استخدام أسلوب .filter()
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark
.filter()
لعرض تلك الصفوف في DataFrame بعدد يزيد عن 50.Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
استخدام أسلوب .where()
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark
.where()
لعرض تلك الصفوف في DataFrame بعدد يزيد عن 50.Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
تحديد أعمدة من DataFrame وترتيبها حسب التردد
تعرف على تكرار اسم الطفل باستخدام select()
الأسلوب لتحديد الأعمدة من DataFrame لإرجاعها. استخدم Apache Spark orderby
والوظائف desc
لترتيب النتائج.
توفر الوحدة النمطية pyspark.sql ل Apache Spark الدعم لوظائف SQL. من بين هذه الوظائف التي نستخدمها في هذا البرنامج التعليمي هي وظائف Apache Spark orderBy()
و desc()
و expr()
. يمكنك تمكين استخدام هذه الدالات عن طريق استيرادها إلى جلسة العمل الخاصة بك حسب الحاجة.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستورد هذه التعليمة البرمجية الدالة
desc()
ثم تستخدم أسلوب Apache Sparkselect()
وApache SparkorderBy()
والوظائفdesc()
لعرض الأسماء الأكثر شيوعا وعددها بترتيب تنازلي.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
إنشاء مجموعة فرعية DataFrame
تعرف على كيفية إنشاء مجموعة فرعية DataFrame من DataFrame موجود.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark
filter
لإنشاء DataFrame جديد يقيد البيانات حسب السنة والعدد والجنس. يستخدم أسلوب Apache Sparkselect()
للحد من الأعمدة. كما يستخدم Apache SparkorderBy()
والوظائفdesc()
لفرز DataFrame الجديد حسب العدد.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
الخطوة 5: حفظ DataFrame
تعرف على كيفية حفظ DataFrame. يمكنك إما حفظ DataFrame إلى جدول أو كتابة DataFrame إلى ملف أو ملفات متعددة.
حفظ DataFrame في جدول
يستخدم Azure Databricks تنسيق Delta Lake لكافة الجداول بشكل افتراضي. لحفظ DataFrame الخاص بك، يجب أن يكون لديك CREATE
امتيازات الجدول على الكتالوج والمخطط.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تحفظ هذه التعليمة البرمجية محتويات DataFrame في جدول باستخدام المتغير الذي حددته في بداية هذا البرنامج التعليمي.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
تعمل معظم تطبيقات Apache Spark على مجموعات بيانات كبيرة وبطريقة موزعة. يكتب Apache Spark دليلا من الملفات بدلا من ملف واحد. يقسم Delta Lake مجلدات وملفات Parquet. يمكن للعديد من أنظمة البيانات قراءة هذه الدلائل من الملفات. توصي Azure Databricks باستخدام الجداول عبر مسارات الملفات لمعظم التطبيقات.
حفظ DataFrame إلى ملفات JSON
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تحفظ هذه التعليمة البرمجية DataFrame إلى دليل ملفات JSON.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
قراءة DataFrame من ملف JSON
تعرف على كيفية استخدام أسلوب Apache Spark spark.read.format()
لقراءة بيانات JSON من دليل إلى DataFrame.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تعرض هذه التعليمة البرمجية ملفات JSON التي حفظتها في المثال السابق.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
مهام إضافية: تشغيل استعلامات SQL في PySpark وSc scala وR
توفر Apache Spark DataFrames الخيارات التالية لدمج SQL مع PySpark وSc scala وR. يمكنك تشغيل التعليمات البرمجية التالية في نفس دفتر الملاحظات الذي قمت بإنشائه لهذا البرنامج التعليمي.
تحديد عمود كتعلام SQL
تعرف على كيفية استخدام أسلوب Apache Spark selectExpr()
. هذا هو متغير من select()
الأسلوب الذي يقبل تعبيرات SQL وإرجاع DataFrame محدث. يسمح لك هذا الأسلوب باستخدام تعبير SQL، مثل upper
.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark
selectExpr()
وتعبير SQLupper
لتحويل عمود سلسلة إلى أحرف كبيرة (وإعادة تسمية العمود).Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
استخدم expr()
لاستخدام بناء جملة SQL لعمود
تعرف على كيفية استيراد واستخدام دالة Apache Spark expr()
لاستخدام بناء جملة SQL في أي مكان يتم فيه تحديد عمود.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستورد هذه التعليمة البرمجية الدالة
expr()
ثم تستخدم دالة Apache Sparkexpr()
وتعبير SQLlower
لتحويل عمود سلسلة إلى حالة صغيرة (وإعادة تسمية العمود).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
تشغيل استعلام SQL عشوائي باستخدام الدالة spark.sql()
تعرف على كيفية استخدام وظيفة Apache Spark spark.sql()
لتشغيل استعلامات SQL العشوائية.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية دالة Apache Spark
spark.sql()
للاستعلام عن جدول SQL باستخدام بناء جملة SQL.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.
دفاتر ملاحظات البرنامج التعليمي DataFrame
تتضمن دفاتر الملاحظات التالية أمثلة على الاستعلامات من هذا البرنامج التعليمي.