Bazy danych zapytań używające sterownika JDBC
Usługa Azure Databricks obsługuje nawiązywanie połączeń z zewnętrznymi bazami danych przy użyciu protokołu JDBC. Ten artykuł zawiera podstawową składnię do konfigurowania i używania tych connections oraz przedstawia przykłady w językach Python, SQL i Scala.
Ważne
Konfiguracje opisane w tym artykule są eksperymentalne. Funkcje eksperymentalne są udostępniane zgodnie z oczekiwaniami i nie są obsługiwane przez usługę Databricks za pośrednictwem pomocy technicznej klienta. Aby get pełnej obsługi federacji zapytań, należy zamiast tego użyć Lakehouse Federation, co umożliwia użytkownikom usługi Azure Databricks korzystanie z narzędzi do zarządzania Catalog składnią i danymi aparatu Unity.
Program Partner Connect zapewnia zoptymalizowane integracje na potrzeby synchronizowania danych z wieloma zewnętrznymi źródłami danych. Zobacz Co to jest program Databricks Partner Connect?.
Ważne
Przykłady w tym artykule nie zawierają nazw użytkowników i haseł w adresach URL JDBC. Databricks zaleca używanie sekretów do przechowywania Twojej bazy danych credentials. Na przykład:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Aby odwołać się do wpisów tajnych usługi Databricks w języku SQL, należy skonfigurować właściwość konfiguracji platformy Spark podczas inicjowania klastra.
Aby zapoznać się z pełnym przykładem zarządzania wpisami tajnymi, zobacz Samouczek: tworzenie i używanie wpisu tajnego usługi Databricks.
Odczytywanie danych za pomocą JDBC
Należy skonfigurować wiele ustawień do odczytywania danych przy użyciu JDBC. Należy pamiętać, że każda baza danych używa innego formatu dla elementu <jdbc-url>
.
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark automatycznie odczytuje schema z bazy danych table i mapuje ich typy z powrotem na typy Spark SQL.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
Zapytania można uruchamiać względem tego tableJDBC:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Zapisywanie danych za pomocą JDBC
Zapis danych do tables za pomocą JDBC wymaga konfiguracji podobnych do tych przy odczycie. Zobacz poniższy przykład:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Domyślne zachowanie próbuje utworzyć nowy table i zgłasza błąd, jeśli table o tej nazwie już istnieje.
Dane można dołączyć do istniejącego table przy użyciu następującej składni:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Istniejące table można zastąpić przy użyciu następującej składni:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Sterowanie równoległością zapytań JDBC
Domyślnie sterownik JDBC wysyła zapytanie do źródłowej bazy danych tylko z jednym wątkiem. Aby zwiększyć wydajność operacji odczytu, należy określić szereg opcji kontrolowania liczby równoczesnych zapytań usługi Azure Databricks do bazy danych. W przypadku małych klastrów ustawienie numPartitions
opcji równej liczbie rdzeni funkcji wykonawczej w klastrze gwarantuje, że wszystkie węzły wykonują zapytania dotyczące danych równolegle.
Ostrzeżenie
Ustawienie numPartitions
na dużą wartość w dużym klastrze może spowodować ujemną wydajność zdalnej bazy danych, ponieważ zbyt wiele równoczesnych zapytań może przeciążyć usługę. Jest to szczególnie kłopotliwe w przypadku baz danych aplikacji. Należy uważać, aby ustawić tę wartość powyżej 50.
Uwaga
Przyspiesz zapytania, wybierając column z indeksem obliczonym w bazie danych źródłowej dla partitionColumn
.
W poniższym przykładzie kodu pokazano konfigurowanie równoległości dla klastra z ośmioma rdzeniami:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Uwaga
Usługa Azure Databricks obsługuje wszystkie opcje platformy Apache Spark do konfigurowania JDBC.
Podczas zapisywania w bazach danych przy użyciu JDBC platforma Apache Spark używa liczby partycji w pamięci do kontrolowania równoległości. Dane można ponownie partycjonować przed zapisaniem w celu kontrolowania równoległości. Unikaj dużej liczby partycji w dużych klastrach, aby uniknąć przeciążenia zdalnej bazy danych. W poniższym przykładzie pokazano ponowne partycjonowanie do ośmiu partycji przed zapisaniem:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Wypychanie zapytania do aparatu bazy danych
Możesz wypchnąć całe zapytanie do bazy danych i zwrócić tylko wynik. Parametr table
identyfikuje JDBC table, który ma być odczytany. Możesz użyć dowolnego elementu prawidłowego w klauzuli zapytania FROM
SQL.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Kontrolowanie liczby wierszy pobranych na zapytanie
Sterowniki JDBC mają fetchSize
parametr, który kontroluje liczbę wierszy pobranych w czasie z zdalnej bazy danych.
Ustawienie | Result |
---|---|
Za mało | Duże opóźnienie ze względu na wiele rund (kilka wierszy zwracanych na zapytanie) |
Zbyt wysoka | Błąd braku pamięci (za dużo danych zwracanych w jednym zapytaniu) |
Optymalna wartość jest zależna od obciążenia. Zagadnienia obejmują:
- Ile columns jest zwracanych przez zapytanie?
- Jakie typy danych są zwracane?
- Jak długie są ciągi zwracane w każdej column?
Systemy mogą mieć bardzo małe wartości domyślne i korzystać z dostrajania. Na przykład: wartość domyślna fetchSize
oracle to 10. Zwiększenie go do 100 zmniejsza liczbę całkowitych zapytań, które muszą być wykonywane przez współczynnik 10. Wyniki JDBC są ruchem sieciowym, dlatego unikaj bardzo dużych liczb, ale optymalne values mogą znajdować się w tysiącach dla wielu zestawów danych.
fetchSize
Użyj opcji , jak w poniższym przykładzie:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()