Udostępnij za pośrednictwem


Bazy danych zapytań używające sterownika JDBC

Usługa Azure Databricks obsługuje nawiązywanie połączeń z zewnętrznymi bazami danych przy użyciu protokołu JDBC. Ten artykuł zawiera podstawową składnię do konfigurowania i używania tych connections oraz przedstawia przykłady w językach Python, SQL i Scala.

Ważne

Konfiguracje opisane w tym artykule są eksperymentalne. Funkcje eksperymentalne są udostępniane zgodnie z oczekiwaniami i nie są obsługiwane przez usługę Databricks za pośrednictwem pomocy technicznej klienta. Aby get pełnej obsługi federacji zapytań, należy zamiast tego użyć Lakehouse Federation, co umożliwia użytkownikom usługi Azure Databricks korzystanie z narzędzi do zarządzania Catalog składnią i danymi aparatu Unity.

Program Partner Connect zapewnia zoptymalizowane integracje na potrzeby synchronizowania danych z wieloma zewnętrznymi źródłami danych. Zobacz Co to jest program Databricks Partner Connect?.

Ważne

Przykłady w tym artykule nie zawierają nazw użytkowników i haseł w adresach URL JDBC. Databricks zaleca używanie sekretów do przechowywania Twojej bazy danych credentials. Na przykład:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Aby odwołać się do wpisów tajnych usługi Databricks w języku SQL, należy skonfigurować właściwość konfiguracji platformy Spark podczas inicjowania klastra.

Aby zapoznać się z pełnym przykładem zarządzania wpisami tajnymi, zobacz Samouczek: tworzenie i używanie wpisu tajnego usługi Databricks.

Odczytywanie danych za pomocą JDBC

Należy skonfigurować wiele ustawień do odczytywania danych przy użyciu JDBC. Należy pamiętać, że każda baza danych używa innego formatu dla elementu <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark automatycznie odczytuje schema z bazy danych table i mapuje ich typy z powrotem na typy Spark SQL.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Zapytania można uruchamiać względem tego tableJDBC:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Zapisywanie danych za pomocą JDBC

Zapis danych do tables za pomocą JDBC wymaga konfiguracji podobnych do tych przy odczycie. Zobacz poniższy przykład:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Domyślne zachowanie próbuje utworzyć nowy table i zgłasza błąd, jeśli table o tej nazwie już istnieje.

Dane można dołączyć do istniejącego table przy użyciu następującej składni:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Istniejące table można zastąpić przy użyciu następującej składni:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Sterowanie równoległością zapytań JDBC

Domyślnie sterownik JDBC wysyła zapytanie do źródłowej bazy danych tylko z jednym wątkiem. Aby zwiększyć wydajność operacji odczytu, należy określić szereg opcji kontrolowania liczby równoczesnych zapytań usługi Azure Databricks do bazy danych. W przypadku małych klastrów ustawienie numPartitions opcji równej liczbie rdzeni funkcji wykonawczej w klastrze gwarantuje, że wszystkie węzły wykonują zapytania dotyczące danych równolegle.

Ostrzeżenie

Ustawienie numPartitions na dużą wartość w dużym klastrze może spowodować ujemną wydajność zdalnej bazy danych, ponieważ zbyt wiele równoczesnych zapytań może przeciążyć usługę. Jest to szczególnie kłopotliwe w przypadku baz danych aplikacji. Należy uważać, aby ustawić tę wartość powyżej 50.

Uwaga

Przyspiesz zapytania, wybierając column z indeksem obliczonym w bazie danych źródłowej dla partitionColumn.

W poniższym przykładzie kodu pokazano konfigurowanie równoległości dla klastra z ośmioma rdzeniami:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Uwaga

Usługa Azure Databricks obsługuje wszystkie opcje platformy Apache Spark do konfigurowania JDBC.

Podczas zapisywania w bazach danych przy użyciu JDBC platforma Apache Spark używa liczby partycji w pamięci do kontrolowania równoległości. Dane można ponownie partycjonować przed zapisaniem w celu kontrolowania równoległości. Unikaj dużej liczby partycji w dużych klastrach, aby uniknąć przeciążenia zdalnej bazy danych. W poniższym przykładzie pokazano ponowne partycjonowanie do ośmiu partycji przed zapisaniem:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Wypychanie zapytania do aparatu bazy danych

Możesz wypchnąć całe zapytanie do bazy danych i zwrócić tylko wynik. Parametr table identyfikuje JDBC table, który ma być odczytany. Możesz użyć dowolnego elementu prawidłowego w klauzuli zapytania FROM SQL.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Kontrolowanie liczby wierszy pobranych na zapytanie

Sterowniki JDBC mają fetchSize parametr, który kontroluje liczbę wierszy pobranych w czasie z zdalnej bazy danych.

Ustawienie Result
Za mało Duże opóźnienie ze względu na wiele rund (kilka wierszy zwracanych na zapytanie)
Zbyt wysoka Błąd braku pamięci (za dużo danych zwracanych w jednym zapytaniu)

Optymalna wartość jest zależna od obciążenia. Zagadnienia obejmują:

  • Ile columns jest zwracanych przez zapytanie?
  • Jakie typy danych są zwracane?
  • Jak długie są ciągi zwracane w każdej column?

Systemy mogą mieć bardzo małe wartości domyślne i korzystać z dostrajania. Na przykład: wartość domyślna fetchSize oracle to 10. Zwiększenie go do 100 zmniejsza liczbę całkowitych zapytań, które muszą być wykonywane przez współczynnik 10. Wyniki JDBC są ruchem sieciowym, dlatego unikaj bardzo dużych liczb, ale optymalne values mogą znajdować się w tysiącach dla wielu zestawów danych.

fetchSize Użyj opcji , jak w poniższym przykładzie:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()