الاستعلام عن قواعد البيانات باستخدام JDBC
يدعم Azure Databricks الاتصال بقواعد البيانات الخارجية باستخدام JDBC. توفر هذه المقالة بناء الجملة الأساسي لتكوين هذه الاتصالات واستخدامها مع أمثلة في Python وSQL وSc scala.
هام
التكوينات الموضحة في هذه المقالة تجريبية. يتم توفير الميزات التجريبية كما هي ولا تدعمها Databricks من خلال الدعم التقني للعملاء. للحصول على دعم اتحاد الاستعلام الكامل، يجب عليك بدلا من ذلك استخدام Lakehouse Federation، والذي يمكن مستخدمي Azure Databricks من الاستفادة من بناء جملة كتالوج Unity وأدوات إدارة البيانات.
يوفر Partner Connect عمليات تكامل محسنة لمزامنة البيانات مع العديد من مصادر البيانات الخارجية. راجع ما هو اتصال شريك Databricks؟.
هام
لا تتضمن الأمثلة الواردة في هذه المقالة أسماء المستخدمين وكلمات المرور في عناوين URL ل JDBC. توصي Databricks باستخدام البيانات السرية لتخزين بيانات اعتماد قاعدة البيانات الخاصة بك. على سبيل المثال:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
للإشارة إلى أسرار Databricks باستخدام SQL، يجب تكوين خاصية تكوين Spark أثناء استخدام نظام المجموعة.
للحصول على مثال كامل لإدارة البيانات السرية، راجع البرنامج التعليمي: إنشاء واستخدام البيانات السرية Databricks.
قراءة البيانات باستخدام JDBC
يجب تكوين عدد من الإعدادات لقراءة البيانات باستخدام JDBC. لاحظ أن كل قاعدة بيانات تستخدم تنسيقا مختلفا ل <jdbc-url>
.
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
يقرأ Spark المخطط تلقائيا من جدول قاعدة البيانات ويعيد تعيين أنواعه إلى أنواع Spark SQL.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
يمكنك تشغيل الاستعلامات مقابل جدول JDBC هذا:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
كتابة البيانات باستخدام JDBC
يستخدم حفظ البيانات في الجداول باستخدام JDBC تكوينات مماثلة للقراءة. راجع المثال التالي:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
يحاول السلوك الافتراضي إنشاء جدول جديد ويطرح خطأ إذا كان هناك جدول بهذا الاسم موجود بالفعل.
يمكنك إلحاق البيانات بجدول موجود باستخدام بناء الجملة التالي:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
يمكنك الكتابة فوق جدول موجود باستخدام بناء الجملة التالي:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
التحكم في التوازي في استعلامات JDBC
بشكل افتراضي، يستعلم برنامج تشغيل JDBC عن قاعدة البيانات المصدر مع مؤشر ترابط واحد فقط. لتحسين أداء القراءات، تحتاج إلى تحديد عدد من الخيارات للتحكم في عدد الاستعلامات المتزامنة التي يقوم بها Azure Databricks إلى قاعدة البيانات الخاصة بك. بالنسبة للمجموعات الصغيرة، يضمن تعيين numPartitions
الخيار يساوي عدد الذاكرات الأساسية للمنفذ في مجموعتك أن جميع العقد تستعلم عن البيانات بالتوازي.
تحذير
يمكن أن يؤدي الإعداد numPartitions
إلى قيمة عالية على مجموعة كبيرة إلى أداء سلبي لقاعدة البيانات البعيدة، حيث قد يطغى عدد كبير جدا من الاستعلامات المتزامنة على الخدمة. هذا أمر مزعج بشكل خاص لقواعد بيانات التطبيق. كن حذرا من تعيين هذه القيمة فوق 50.
إشعار
تسريع الاستعلامات عن طريق تحديد عمود مع فهرس محسوب في قاعدة البيانات المصدر ل partitionColumn
.
يوضح مثال التعليمات البرمجية التالي تكوين التوازي لنظام مجموعة مع ثمانية ذاكرات أساسية:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
إشعار
يدعم Azure Databricks جميع خيارات Apache Spark لتكوين JDBC.
عند الكتابة إلى قواعد البيانات باستخدام JDBC، يستخدم Apache Spark عدد الأقسام في الذاكرة للتحكم في التوازي. يمكنك إعادة تقسيم البيانات قبل الكتابة للتحكم في التوازي. تجنب العدد الكبير من الأقسام على مجموعات كبيرة لتجنب إرباك قاعدة البيانات البعيدة. يوضح المثال التالي إعادة تقسيم إلى ثمانية أقسام قبل الكتابة:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
دفع استعلام لأسفل إلى محرك قاعدة البيانات
يمكنك دفع استعلام كامل إلى قاعدة البيانات وإرجاع النتيجة فقط. table
تحدد المعلمة جدول JDBC للقراءة. يمكنك استخدام أي شيء صالح في عبارة استعلام FROM
SQL.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
التحكم في عدد الصفوف التي تم جلبها لكل استعلام
تحتوي برامج تشغيل JDBC على معلمة fetchSize
تتحكم في عدد الصفوف التي تم جلبها في كل مرة من قاعدة البيانات البعيدة.
الإعدادات | نتيجة |
---|---|
منخفض جدا | زمن انتقال عال بسبب العديد من الجولات (عدد قليل من الصفوف التي تم إرجاعها لكل استعلام) |
مرتفع جدا | خطأ نفاد الذاكرة (تم إرجاع الكثير من البيانات في استعلام واحد) |
القيمة المثلى هي اعتماد حمل العمل. وتشمل الاعتبارات ما يلي:
- كم عدد الأعمدة التي يتم إرجاعها بواسطة الاستعلام؟
- ما هي أنواع البيانات التي يتم إرجاعها؟
- كم من الوقت يتم إرجاع السلاسل في كل عمود؟
قد يكون للأنظمة افتراضية صغيرة جدا وتستفيد من الضبط. على سبيل المثال: القيمة الافتراضية fetchSize
ل Oracle هي 10. تؤدي زيادته إلى 100 إلى تقليل عدد الاستعلامات الإجمالية التي تحتاج إلى تنفيذها بواسطة عامل 10. نتائج JDBC هي حركة مرور الشبكة، لذا تجنب أعدادا كبيرة جدا، ولكن قد تكون القيم المثلى بالآلاف للعديد من مجموعات البيانات.
fetchSize
استخدم الخيار ، كما في المثال التالي:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()