مشاركة عبر


الاستعلام عن قواعد البيانات باستخدام JDBC

يدعم Azure Databricks الاتصال بقواعد البيانات الخارجية باستخدام JDBC. توفر هذه المقالة بناء الجملة الأساسي لتكوين هذه الاتصالات واستخدامها مع أمثلة في Python وSQL وSc scala.

هام

التكوينات الموضحة في هذه المقالة تجريبية. يتم توفير الميزات التجريبية كما هي ولا تدعمها Databricks من خلال الدعم التقني للعملاء. للحصول على دعم اتحاد الاستعلام الكامل، يجب عليك بدلا من ذلك استخدام Lakehouse Federation، والذي يمكن مستخدمي Azure Databricks من الاستفادة من بناء جملة كتالوج Unity وأدوات إدارة البيانات.

يوفر Partner Connect عمليات تكامل محسنة لمزامنة البيانات مع العديد من مصادر البيانات الخارجية. راجع ما هو اتصال شريك Databricks؟.

هام

لا تتضمن الأمثلة الواردة في هذه المقالة أسماء المستخدمين وكلمات المرور في عناوين URL ل JDBC. توصي Databricks باستخدام البيانات السرية لتخزين بيانات اعتماد قاعدة البيانات الخاصة بك. على سبيل المثال:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

للإشارة إلى أسرار Databricks باستخدام SQL، يجب تكوين خاصية تكوين Spark أثناء استخدام نظام المجموعة.

للحصول على مثال كامل لإدارة البيانات السرية، راجع البرنامج التعليمي: إنشاء واستخدام البيانات السرية Databricks.

قراءة البيانات باستخدام JDBC

يجب تكوين عدد من الإعدادات لقراءة البيانات باستخدام JDBC. لاحظ أن كل قاعدة بيانات تستخدم تنسيقا مختلفا ل <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

يقرأ Spark المخطط تلقائيا من جدول قاعدة البيانات ويعيد تعيين أنواعه إلى أنواع Spark SQL.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

يمكنك تشغيل الاستعلامات مقابل جدول JDBC هذا:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

كتابة البيانات باستخدام JDBC

يستخدم حفظ البيانات في الجداول باستخدام JDBC تكوينات مماثلة للقراءة. راجع المثال التالي:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

يحاول السلوك الافتراضي إنشاء جدول جديد ويطرح خطأ إذا كان هناك جدول بهذا الاسم موجود بالفعل.

يمكنك إلحاق البيانات بجدول موجود باستخدام بناء الجملة التالي:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

يمكنك الكتابة فوق جدول موجود باستخدام بناء الجملة التالي:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

التحكم في التوازي في استعلامات JDBC

بشكل افتراضي، يستعلم برنامج تشغيل JDBC عن قاعدة البيانات المصدر مع مؤشر ترابط واحد فقط. لتحسين أداء القراءات، تحتاج إلى تحديد عدد من الخيارات للتحكم في عدد الاستعلامات المتزامنة التي يقوم بها Azure Databricks إلى قاعدة البيانات الخاصة بك. بالنسبة للمجموعات الصغيرة، يضمن تعيين numPartitions الخيار يساوي عدد الذاكرات الأساسية للمنفذ في مجموعتك أن جميع العقد تستعلم عن البيانات بالتوازي.

تحذير

يمكن أن يؤدي الإعداد numPartitions إلى قيمة عالية على مجموعة كبيرة إلى أداء سلبي لقاعدة البيانات البعيدة، حيث قد يطغى عدد كبير جدا من الاستعلامات المتزامنة على الخدمة. هذا أمر مزعج بشكل خاص لقواعد بيانات التطبيق. كن حذرا من تعيين هذه القيمة فوق 50.

إشعار

تسريع الاستعلامات عن طريق تحديد عمود مع فهرس محسوب في قاعدة البيانات المصدر ل partitionColumn.

يوضح مثال التعليمات البرمجية التالي تكوين التوازي لنظام مجموعة مع ثمانية ذاكرات أساسية:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

إشعار

يدعم Azure Databricks جميع خيارات Apache Spark لتكوين JDBC.

عند الكتابة إلى قواعد البيانات باستخدام JDBC، يستخدم Apache Spark عدد الأقسام في الذاكرة للتحكم في التوازي. يمكنك إعادة تقسيم البيانات قبل الكتابة للتحكم في التوازي. تجنب العدد الكبير من الأقسام على مجموعات كبيرة لتجنب إرباك قاعدة البيانات البعيدة. يوضح المثال التالي إعادة تقسيم إلى ثمانية أقسام قبل الكتابة:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

دفع استعلام لأسفل إلى محرك قاعدة البيانات

يمكنك دفع استعلام كامل إلى قاعدة البيانات وإرجاع النتيجة فقط. table تحدد المعلمة جدول JDBC للقراءة. يمكنك استخدام أي شيء صالح في عبارة استعلام FROM SQL.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

التحكم في عدد الصفوف التي تم جلبها لكل استعلام

تحتوي برامج تشغيل JDBC على معلمة fetchSize تتحكم في عدد الصفوف التي تم جلبها في كل مرة من قاعدة البيانات البعيدة.

الإعدادات نتيجة
منخفض جدا زمن انتقال عال بسبب العديد من الجولات (عدد قليل من الصفوف التي تم إرجاعها لكل استعلام)
مرتفع جدا خطأ نفاد الذاكرة (تم إرجاع الكثير من البيانات في استعلام واحد)

القيمة المثلى هي اعتماد حمل العمل. وتشمل الاعتبارات ما يلي:

  • كم عدد الأعمدة التي يتم إرجاعها بواسطة الاستعلام؟
  • ما هي أنواع البيانات التي يتم إرجاعها؟
  • كم من الوقت يتم إرجاع السلاسل في كل عمود؟

قد يكون للأنظمة افتراضية صغيرة جدا وتستفيد من الضبط. على سبيل المثال: القيمة الافتراضية fetchSize ل Oracle هي 10. تؤدي زيادته إلى 100 إلى تقليل عدد الاستعلامات الإجمالية التي تحتاج إلى تنفيذها بواسطة عامل 10. نتائج JDBC هي حركة مرور الشبكة، لذا تجنب أعدادا كبيرة جدا، ولكن قد تكون القيم المثلى بالآلاف للعديد من مجموعات البيانات.

fetchSize استخدم الخيار ، كما في المثال التالي:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()