Mengkueri database menggunakan JDBC
Azure Databricks mendukung menyambungkan ke database eksternal menggunakan JDBC. Artikel ini menyediakan sintaks dasar untuk mengonfigurasi dan menggunakan koneksi ini dengan contoh di Python, SQL, dan Scala.
Penting
Konfigurasi yang dijelaskan dalam artikel ini bersifat Eksperimental. Fitur eksperimental disediakan apa adanya dan tidak didukung oleh Databricks melalui dukungan teknis pelanggan. Untuk mendapatkan dukungan federasi kueri penuh, Anda harus menggunakan Lakehouse Federation, yang memungkinkan pengguna Azure Databricks Anda memanfaatkan sintaks Katalog Unity dan alat tata kelola data.
Partner Connect menyediakan integrasi yang dioptimalkan untuk menyinkronkan data dengan banyak sumber data eksternal. Lihat Apa itu Databricks Partner Connect?.
Penting
Contoh dalam artikel ini tidak menyertakan nama pengguna dan kata sandi di URL JDBC. Databricks merekomendasikan penggunaan rahasia untuk menyimpan kredensial database Anda. Contohnya:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Untuk mengacu pada rahasia Databricks dengan SQL, Anda harus mengonfigurasi properti konfigurasi Spark selama inisialisasi kluster.
Untuk contoh lengkap manajemen rahasia, lihat Tutorial: Membuat dan menggunakan rahasia dari Databricks.
Anda harus mengonfigurasi sejumlah pengaturan untuk membaca data menggunakan JDBC. Perhatikan bahwa setiap database menggunakan format yang berbeda untuk <jdbc-url>
.
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark secara otomatis membaca skema dari tabel database dan memetakan jenisnya kembali ke jenis Spark SQL.
employees_table.printSchema
DESCRIBE employees_table_vw
employees_table.printSchema
Anda dapat menjalankan kueri terhadap tabel JDBC ini:
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Menyimpan data ke tabel dengan JDBC menggunakan konfigurasi serupa untuk membaca. Lihat contoh berikut:
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Perilaku default mencoba membuat tabel baru dan melemparkan kesalahan jika tabel dengan nama tersebut sudah ada.
Anda dapat menambahkan data ke tabel yang sudah ada menggunakan sintaks berikut:
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Anda bisa menimpa tabel yang sudah ada menggunakan sintaks berikut:
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Secara default, driver JDBC meminta database sumber hanya dengan satu utas. Untuk meningkatkan performa bacaan, Anda perlu menentukan sejumlah opsi untuk mengontrol berapa banyak kueri simultan yang dibuat Azure Databricks ke database Anda. Untuk kluster kecil, mengatur opsi numPartitions
sama dengan jumlah inti pengolah di kluster Anda memastikan bahwa semua simpul melakukan kueri data secara paralel.
Peringatan
Mengatur numPartitions
ke nilai tinggi pada kluster besar dapat mengakibatkan performa negatif untuk database jarak jauh, karena terlalu banyak kueri simultan yang mungkin membanjiri layanan. Ini sangat merepotkan untuk database aplikasi. Waspadalah terhadap pengaturan nilai ini di atas 50.
Catatan
Percepat kueri dengan memilih kolom dengan indeks yang dihitung dalam database sumber untuk partitionColumn
.
Contoh kode berikut menunjukkan konfigurasi paralelisme untuk kluster dengan delapan inti:
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Catatan
Azure Databricks mendukung semua opsi Apache Spark untuk mengonfigurasi JDBC.
Saat menulis ke database menggunakan JDBC, Apache Spark menggunakan jumlah partisi dalam memori untuk mengontrol paralelisme. Anda dapat mempartisi ulang data sebelum menulis untuk mengontrol paralelisme. Hindari jumlah partisi yang tinggi pada kluster besar untuk menghindari kewalahan database jarak jauh Anda. Contoh berikut menunjukkan repartisi ke delapan partisi sebelum proses penulisan.
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Anda dapat mendelegasikan seluruh kueri ke basis data dan hanya menerima hasilnya. Parameter table
mengidentifikasi tabel JDBC untuk dibaca. Anda dapat menggunakan apa pun yang valid dalam klausa FROM
kueri SQL.
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Driver JDBC memiliki fetchSize
parameter yang mengontrol jumlah baris yang diambil pada satu waktu dari database jarak jauh.
Pengaturan | Hasil |
---|---|
Terlalu rendah | Latensi tinggi karena banyak perputaran data (beberapa baris yang dikembalikan untuk setiap kueri) |
Terlalu tinggi | Kesalahan kehabisan memori (terlalu banyak data yang dikembalikan dalam satu kueri) |
Nilai optimal tergantung pada beban kerja. Pertimbangannya meliputi:
- Berapa banyak kolom yang dikembalikan oleh kueri?
- Jenis data apa yang dikembalikan?
- Berapa lama string di setiap kolom dikembalikan?
Sistem mungkin menerapkan pengaturan awal yang sangat kecil dan dapat diuntungkan dari penyetelan. Misalnya: Default fetchSize
Oracle adalah 10. Meningkatkannya menjadi 100 mengurangi jumlah total kueri yang perlu dijalankan dengan faktor 10. Hasil JDBC adalah lalu lintas jaringan, jadi hindari jumlah yang sangat besar, tetapi nilai optimal mungkin dalam ribuan untuk banyak himpunan data.
fetchSize
Gunakan opsi , seperti dalam contoh berikut:
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()