تنسيق وظائف Azure Databricks باستخدام Apache Airflow
توضح هذه المقالة دعم Apache Airflow لتنسيق مسارات البيانات باستخدام Azure Databricks، وتحتوي على إرشادات لتثبيت وتكوين Airflow محليا، وتوفر مثالا لنشر وتشغيل سير عمل Azure Databricks مع Airflow.
تزامن الوظيفة في مسار بيانات
غالبا ما يتطلب تطوير ونشر مسار معالجة البيانات إدارة التبعيات المعقدة بين المهام. على سبيل المثال، قد يقرأ المسار البيانات من مصدر، وتنظيف البيانات، وتحويل البيانات التي تم تنظيفها، وكتابة البيانات المحولة إلى هدف. تحتاج أيضا إلى دعم لاختبار الأخطاء وجدولتها واستكشاف الأخطاء وإصلاحها عند تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية.
تعالج أنظمة سير العمل هذه التحديات من خلال السماح لك بتحديد التبعيات بين المهام، والجدول الزمني عند تشغيل المسارات، ومراقبة مهام سير العمل. Apache Airflow هو حل مصدر مفتوح لإدارة وجدولة مسارات البيانات. يمثل تدفق الهواء خطوط أنابيب البيانات كرسوم بيانية دورةية موجهة (DAGs) للعمليات. يمكنك تحديد سير عمل في ملف Python، ويدير Airflow الجدولة والتنفيذ. يتيح لك اتصال Airflow Azure Databricks الاستفادة من محرك Spark المحسن الذي تقدمه Azure Databricks مع ميزات الجدولة ل Airflow.
المتطلبات
- يتطلب التكامل بين Airflow وAzure Databricks الإصدار 2.5.0 من Airflow والإصدارات الأحدث. يتم اختبار الأمثلة في هذه المقالة مع Airflow الإصدار 2.6.1.
- يتطلب تدفق الهواء Python 3.8 أو 3.9 أو 3.10 أو 3.11. يتم اختبار الأمثلة في هذه المقالة باستخدام Python 3.8.
- تتطلب الإرشادات الواردة في هذه المقالة لتثبيت وتشغيل Airflow pipenv لإنشاء بيئة Python الظاهرية.
مشغلو تدفق الهواء ل Databricks
يتكون Airflow DAG من مهام، حيث تقوم كل مهمة بتشغيل مشغل تدفق الهواء. يتم تنفيذ مشغلي تدفق الهواء الذين يدعمون التكامل مع Databricks في موفر Databricks.
يتضمن موفر Databricks عوامل تشغيل لتشغيل عدد من المهام مقابل مساحة عمل Azure Databricks، بما في ذلك استيراد البيانات إلى جدول، وتشغيل استعلامات SQL، والعمل مع مجلدات Databricks Git.
يقوم موفر Databricks بتنفيذ عاملي تشغيل لتشغيل المهام:
- يتطلب DatabricksRunNowOperator مهمة Azure Databricks موجودة ويستخدم طلب POST /api/2.1/jobs/run-now API لتشغيل تشغيل. توصي Databricks باستخدام
DatabricksRunNowOperator
لأنه يقلل من تكرار تعريفات الوظائف، ويمكن العثور على تشغيل الوظائف التي تم تشغيلها مع عامل التشغيل هذا في واجهة مستخدم الوظائف. - لا يتطلب DatabricksSubmitRunOperator وجود وظيفة في Azure Databricks ويستخدم طلب POST /api/2.1/jobs/runs/submit API لإرسال مواصفات المهمة وتشغيل تشغيل.
لإنشاء مهمة Azure Databricks جديدة أو إعادة تعيين مهمة موجودة، يقوم موفر Databricks بتنفيذ DatabricksCreateJobsOperator. DatabricksCreateJobsOperator
يستخدم POST /api/2.1/jobs/create وPOST /api/2.1/jobs/reset API requests. يمكنك استخدام DatabricksCreateJobsOperator
مع DatabricksRunNowOperator
لإنشاء وظيفة وتشغيلها.
إشعار
يتطلب استخدام عوامل تشغيل Databricks لتشغيل مهمة توفير بيانات اعتماد في تكوين اتصال Databricks. راجع إنشاء رمز مميز للوصول الشخصي ل Azure Databricks ل Airflow.
يكتب مشغلو Databricks Airflow عنوان URL لصفحة تشغيل المهمة إلى سجلات Airflow كل polling_period_seconds
(الافتراضي هو 30 ثانية). لمزيد من المعلومات، راجع صفحة حزمة apache-airflow-providers-databricks على موقع Airflow على الويب.
تثبيت تكامل Airflow Azure Databricks محليا
لتثبيت Airflow وموفر Databricks محليا للاختبار والتطوير، استخدم الخطوات التالية. للحصول على خيارات تثبيت Airflow الأخرى، بما في ذلك إنشاء تثبيت إنتاج، راجع التثبيت في وثائق Airflow.
افتح محطة طرفية ثم قم بتشغيل الأوامر التالية:
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>
استبدل <firstname>
و <lastname>
و <email>
باسم المستخدم والبريد الإلكتروني. ستتم مطالبتك بإدخال كلمة مرور للمستخدم المسؤول. تأكد من حفظ كلمة المرور هذه لأنها مطلوبة لتسجيل الدخول إلى واجهة مستخدم Airflow.
ينفذ هذا البرنامج النصي الخطوات التالية:
- إنشاء دليل باسم
airflow
والتغييرات في هذا الدليل. - يستخدم
pipenv
لإنشاء بيئة Python الظاهرية وإنشاءها. توصي Databricks باستخدام بيئة Python الظاهرية لعزل إصدارات الحزمة وتبعيات التعليمات البرمجية لتلك البيئة. يساعد هذا العزل على تقليل عدم تطابق إصدار الحزمة غير المتوقع وتضارب تبعية التعليمات البرمجية. - تهيئة متغير بيئة يسمى
AIRFLOW_HOME
تعيين إلى مسارairflow
الدليل. - تثبيت Airflow وحزم موفر Airflow Databricks.
airflow/dags
إنشاء دليل. يستخدمdags
Airflow الدليل لتخزين تعريفات DAG.- تهيئة قاعدة بيانات SQLite التي يستخدمها Airflow لتعقب بيانات التعريف. في توزيع تدفق الهواء للإنتاج، يمكنك تكوين Airflow مع قاعدة بيانات قياسية. تتم تهيئة قاعدة بيانات SQLite والتكوين الافتراضي لنشر Airflow في
airflow
الدليل. - إنشاء مستخدم مسؤول ل Airflow.
تلميح
لتأكيد تثبيت موفر Databricks، قم بتشغيل الأمر التالي في دليل تثبيت Airflow:
airflow providers list
بدء تشغيل خادم ويب Airflow والمجدول
مطلوب خادم ويب Airflow لعرض واجهة مستخدم Airflow. لبدء تشغيل خادم الويب، افتح محطة طرفية في دليل تثبيت Airflow وقم بتشغيل الأوامر التالية:
إشعار
إذا فشل خادم ويب Airflow في البدء بسبب تعارض في المنفذ، يمكنك تغيير المنفذ الافتراضي في تكوين Airflow.
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver
المجدول هو مكون Airflow الذي يقوم بجدولة DAGs. لبدء المجدول، افتح محطة طرفية جديدة في دليل تثبيت Airflow وقم بتشغيل الأوامر التالية:
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
اختبار تثبيت Airflow
للتحقق من تثبيت Airflow، يمكنك تشغيل أحد أمثلة DAGs المضمنة في Airflow:
- في نافذة المستعرض، افتح
http://localhost:8080/home
. سجل الدخول إلى واجهة مستخدم Airflow باستخدام اسم المستخدم وكلمة المرور التي أنشأتها عند تثبيت Airflow. تظهر صفحة Airflow DAGs . - انقر فوق مفتاح التبديل Pause/Unpause DAG لإلغاء استخدام أحد أمثلة DAGs، على سبيل المثال،
example_python_operator
. - قم بتشغيل مثال DAG بالنقر فوق الزر Trigger DAG .
- انقر فوق اسم DAG لعرض التفاصيل، بما في ذلك حالة تشغيل DAG.
إنشاء رمز مميز للوصول الشخصي ل Azure Databricks ل Airflow
يتصل Airflow ب Databricks باستخدام رمز الوصول الشخصي Azure Databricks (PAT). لإنشاء PAT، اتبع الخطوات الواردة في الرموز المميزة للوصول الشخصي Azure Databricks لمستخدمي مساحة العمل.
إشعار
كأفضل ممارسة أمان، عند المصادقة باستخدام الأدوات والأنظمة والبرامج النصية والتطبيقات التلقائية، توصي Databricks باستخدام رموز الوصول الشخصية التي تنتمي إلى كيانات الخدمة بدلا من مستخدمي مساحة العمل. لإنشاء رموز مميزة لكيانات الخدمة، راجع إدارة الرموز المميزة لكيان الخدمة.
يمكنك أيضا المصادقة على Azure Databricks باستخدام رمز مميز لمعرف Microsoft Entra. راجع اتصال Databricks في وثائق Airflow.
تكوين اتصال Azure Databricks
يحتوي تثبيت Airflow على اتصال افتراضي ل Azure Databricks. لتحديث الاتصال للاتصال بمساحة العمل باستخدام رمز الوصول الشخصي الذي أنشأته أعلاه:
- في نافذة المستعرض، افتح
http://localhost:8080/connection/list/
. إذا تمت مطالبتك بتسجيل الدخول، أدخل اسم المستخدم وكلمة المرور للمسؤول. - ضمن معرف Conn، حدد موقع databricks_default وانقر فوق الزر تحرير السجل .
- استبدل القيمة في حقل المضيف باسم مثيل مساحة العمل لنشر Azure Databricks، على سبيل المثال،
https://adb-123456789.cloud.databricks.com
. - في حقل كلمة المرور ، أدخل رمز الوصول الشخصي الخاص بك في Azure Databricks.
- انقر فوق حفظ.
إذا كنت تستخدم رمزا مميزا لمعرف Microsoft Entra، فشاهد اتصال Databricks في وثائق Airflow للحصول على معلومات حول تكوين المصادقة.
مثال: إنشاء Airflow DAG لتشغيل وظيفة Azure Databricks
يوضح المثال التالي كيفية إنشاء نشر Airflow بسيط يعمل على جهازك المحلي وينشر مثالا على DAG لتشغيل عمليات التشغيل في Azure Databricks. في هذا المثال، سوف:
- إنشاء دفتر ملاحظات جديد وإضافة تعليمة برمجية لطباعة ترحيب استنادا إلى معلمة مكونة.
- إنشاء مهمة Azure Databricks مع مهمة واحدة تقوم بتشغيل دفتر الملاحظات.
- تكوين اتصال Airflow بمساحة عمل Azure Databricks.
- إنشاء Airflow DAG لتشغيل مهمة دفتر الملاحظات. يمكنك تعريف DAG في برنامج نصي Python باستخدام
DatabricksRunNowOperator
. - استخدم واجهة مستخدم Airflow لتشغيل DAG وعرض حالة التشغيل.
إنشاء دفتر ملاحظات
يستخدم هذا المثال دفتر ملاحظات يحتوي على خليتين:
- تحتوي الخلية الأولى على عنصر واجهة مستخدم نص Databricks Utilities يحدد متغيرا يسمى
greeting
معينا إلى القيمةworld
الافتراضية . - تطبع الخلية الثانية قيمة
greeting
المتغير مسبوقا بhello
.
لإنشاء دفتر الملاحظات:
انتقل إلى مساحة عمل Azure Databricks، وانقر فوق
جديد في الشريط الجانبي، وحدد دفتر الملاحظات.
امنح دفتر ملاحظاتك اسما، مثل Hello Airflow، وتأكد من تعيين اللغة الافتراضية إلى Python.
انسخ التعليمة البرمجية ل Python التالية والصقها في الخلية الأولى من دفتر الملاحظات.
dbutils.widgets.text("greeting", "world", "Greeting") greeting = dbutils.widgets.get("greeting")
أضف خلية جديدة أسفل الخلية الأولى وانسخ التعليمة البرمجية ل Python التالية والصقها في الخلية الجديدة:
print("hello {}".format(greeting))
قم بإنشاء وظيفة
انقر فوق
مهام سير العمل في الشريط الجانبي.
انقر فوق
.
تظهر علامة التبويب المهام مع مربع الحوار إنشاء مهمة.
استبدل إضافة اسم لمهمتك... باسم وظيفتك.
في حقل اسم المهمة، أدخل اسما للمهمة، على سبيل المثال، greeting-task.
في القائمة المنسدلة النوع ، حدد دفتر الملاحظات.
في القائمة المنسدلة Source ، حدد Workspace.
انقر فوق مربع النص المسار واستخدم مستعرض الملفات للعثور على دفتر الملاحظات الذي أنشأته، وانقر فوق اسم دفتر الملاحظات، ثم انقر فوق تأكيد.
انقر فوق Add ضمن Parameters. في حقل المفتاح ، أدخل
greeting
. في حقل القيمة ، أدخلAirflow user
.انقر فوق إنشاء مهمة.
في لوحة Job details ، انسخ قيمة Job ID . هذه القيمة مطلوبة لتشغيل المهمة من Airflow.
تشغيل المهمة
لاختبار وظيفتك الجديدة في واجهة مستخدم وظائف Azure Databricks، انقر في الزاوية العلوية اليسرى. عند اكتمال التشغيل، يمكنك التحقق من الإخراج عن طريق عرض تفاصيل تشغيل الوظيفة.
إنشاء Airflow DAG جديد
يمكنك تعريف Airflow DAG في ملف Python. لإنشاء DAG لتشغيل مهمة دفتر الملاحظات المثال:
في محرر نص أو IDE، أنشئ ملفا جديدا باسم
databricks_dag.py
بالمحتويات التالية:from airflow import DAG from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow' } with DAG('databricks_dag', start_date = days_ago(2), schedule_interval = None, default_args = default_args ) as dag: opr_run_now = DatabricksRunNowOperator( task_id = 'run_now', databricks_conn_id = 'databricks_default', job_id = JOB_ID )
استبدل
JOB_ID
بقيمة معرف المهمة المحفوظة سابقا.احفظ الملف في
airflow/dags
الدليل. يقوم Airflow تلقائيا بقراءة ملفات DAG المخزنة فيairflow/dags/
.
تثبيت وتحقق من DAG في Airflow
لتشغيل DAG والتحقق منها في واجهة مستخدم Airflow:
- في نافذة المستعرض، افتح
http://localhost:8080/home
. تظهر شاشة Airflow DAGs . - حدد موقع
databricks_dag
زر التبديل إيقاف مؤقت/إلغاء استخدام DAG وانقر فوقه لإلغاء إيقاف DAG. - قم بتشغيل DAG بالنقر فوق الزر Trigger DAG .
- انقر فوق تشغيل في العمود Run لعرض حالة التشغيل وتفاصيله.