Megosztás a következőn keresztül:


JDBC-t használó adatbázisok lekérdezése

Az Azure Databricks támogatja a külső adatbázisokhoz való csatlakozást A JDBC használatával. Ez a cikk az connections konfigurálásának és használatának alapvető szintaxisát ismerteti a Python, az SQL és a Scala példáival.

Fontos

A cikkben ismertetett konfigurációk kísérleti jellegűek. A kísérleti funkciókat a Databricks jelenleg is biztosítja, és a Databricks nem támogatja az ügyfél technikai támogatásával. A lekérdezési összevonás teljes támogatásának get ehelyett Lakehouse Federationkell használnia, amely lehetővé teszi, hogy az Azure Databricks-felhasználók kihasználhassák a Unity Catalog szintaxisát és adatszabályozási eszközeit.

A Partner Connect optimalizált integrációkat biztosít az adatok számos külső adatforrással való szinkronizálásához. Lásd : Mi az a Databricks Partner Connect?.

Fontos

A cikkben szereplő példák nem tartalmaznak felhasználóneveket és jelszavakat a JDBC URL-címeiben. A Databricks titkos kulcsok használatát javasolja az adatbázis credentialstárolásához. Példa:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

A Databricks-titkos kódok SQL-hez való hivatkozásához konfigurálnia kell egy Spark-konfigurációs tulajdonságot a fürt initilizálása során.

A titkos kódok kezelésére vonatkozó teljes példáért tekintse meg az oktatóanyagot: Databricks-titkos kód létrehozása és használata.

Adatok olvasása a JDBC-vel

Több beállítást kell konfigurálnia az adatok JDBC használatával való olvasásához. Vegye figyelembe, hogy minden adatbázis más formátumot használ a <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

A Spark automatikusan beolvassa a schema-t az table adatbázisból, és a típusokat a Spark SQL-típusokra képezi le.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Lekérdezéseket futtathat ezen a JDBC tablekapcsolaton:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Adatok írása A JDBC használatával

Az adatok a tables-ra JDBC-vel való mentése hasonló beállításokat alkalmaz, mint az olvasásnál. Lásd a következő példát:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Az alapértelmezett viselkedés új table próbál létrehozni, és hibát jelez, ha már létezik ilyen nevű table.

Meglévő table-hoz az alábbi szintaxissal fűzhet hozzá adatokat:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Egy meglévő table az alábbi szintaxissal írhatja felül:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Párhuzamosság szabályozása JDBC-lekérdezésekhez

Alapértelmezés szerint a JDBC-illesztő csak egyetlen szálal lekérdezi a forrásadatbázist. Az olvasási teljesítmény javítása érdekében számos lehetőséget kell megadnia annak szabályozásához, hogy az Azure Databricks hány egyidejű lekérdezést végez az adatbázison. Kis fürtök esetén a numPartitions fürt végrehajtó magjainak számával egyenlő beállítással biztosítható, hogy az összes csomópont párhuzamosan kérdezi le az adatokat.

Figyelmeztetés

Ha nagy méretű fürtön állít be numPartitions magas értéket, az negatív teljesítményt eredményezhet a távoli adatbázis számára, mivel túl sok egyidejű lekérdezés túlterhelheti a szolgáltatást. Ez különösen zavaró az alkalmazás-adatbázisok esetében. Ügyeljen arra, hogy ezt az értéket 50 fölé állítsa.

Feljegyzés

A lekérdezések felgyorsítása a partitionColumnforrásadatbázisában kiszámított indexet tartalmazó column kiválasztásával.

Az alábbi példakód bemutatja, hogyan konfigurálható a párhuzamosság egy nyolc maggal rendelkező fürthöz:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Feljegyzés

Az Azure Databricks támogatja az Összes Apache Spark-beállítást a JDBC konfigurálásához.

Amikor JDBC-vel ír adatbázisba, az Apache Spark a memóriában lévő partíciók számát használja a párhuzamosság szabályozásához. A párhuzamosság szabályozásához írás előtt újrapartitálhatja az adatokat. A távoli adatbázis túlterhelésének elkerülése érdekében kerülje a nagy méretű fürtök nagy számú partícióját. Az alábbi példa nyolc partícióra való újraparticionálást mutat be írás előtt:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Lekérdezés leküldése az adatbázismotorba

Leküldhet egy teljes lekérdezést az adatbázisba, és csak az eredményt adja vissza. A table paraméter azonosítja az olvasni kívánt JDBC-table. Az SQL-lekérdezési FROM záradékban érvényes bármit használhat.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Lekérdezésenként lekért sorok számának szabályozása

A JDBC-illesztőprogramok olyan fetchSize paraméterrel rendelkeznek, amely a távoli adatbázisból egyszerre beolvasott sorok számát szabályozza.

Beállítás Eredmény
Túl alacsony Nagy késés sok kerekítés miatt (lekérdezésenként néhány sor visszaadva)
Túl magas Memóriahiba (túl sok adat egy lekérdezésben)

Az optimális érték a számítási feladatoktól függ. Megfontolandó szempontok:

  • Hány columns ad vissza a lekérdezés?
  • Milyen adattípusokat ad vissza?
  • Milyen hosszúak a karakterláncok, amelyeket az egyes column visszaad?

Előfordulhat, hogy a rendszerek alapértelmezett értéke nagyon kicsi, és a finomhangolás is előnyös lehet. Például: Az Oracle alapértelmezett értéke fetchSize 10. Ha 100-ra növeli azt, az összes lekérdezés számát csökkenti, amelyet 10-es tényezővel kell végrehajtani. A JDBC-eredmények hálózati forgalomnak minősülnek, ezért kerülje a nagyon nagy számokat, de az optimális values sok adathalmaz esetében ezres nagyságrendben lehetnek.

Használja a fetchSize lehetőséget, ahogyan az alábbi példában is látható:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()