Руководство. Запуск сквозного конвейера аналитики Lakehouse
В этом руководстве показано, как настроить сквозной аналитический конвейер для lakehouse на платформе Azure Databricks.
Внимание
В этом руководстве используются интерактивные записные книжки для выполнения распространенных задач извлечения, преобразования и загрузки в Python в кластерах с поддержкой каталога Unity. Если вы не используете Unity Catalog, см. статью Выполнение первой ETL-выгрузки в Azure Databricks.
Задачи в этом руководстве
К концу этой статьи вы почувствуете себя уверенно.
- Запуск вычислительного кластера с поддержкой каталога Unity.
- Создание записной книжки Databricks.
- Запись и чтение данных из внешнего расположения каталога Unity.
- Настройка инкрементальной загрузки данных в таблицу каталога Unity с использованием Auto Loader.
- Выполнение ячеек записной книжки для обработки, запроса и предварительного просмотра данных.
- Планирование записной книжки в качестве задания Databricks.
- Запрос к таблицам каталога Unity из Databricks SQL
Azure Databricks предоставляет набор готовых к использованию средств, позволяющих специалистам по обработке и анализу данных быстро разрабатывать и развертывать конвейеры извлечения, преобразования и загрузки (ETL). Каталог Unity позволяет администраторам данных настраивать и защищать учетные данные хранения, внешние расположения и объекты базы данных для пользователей по всей организации. Databricks SQL позволяет аналитикам выполнять SQL-запросы к тем же таблицам, которые используются в рабочих нагрузках ETL, что позволяет выполнять бизнес-аналитику в реальном времени в большом масштабе.
Вы также можете использовать DLT для создания конвейеров ETL. Databricks создал DLT для снижения сложности создания, развертывания и обслуживания рабочих конвейеров ETL. См. руководство : запуск вашего первого конвейера DLT.
Требования
Примечание.
Если у вас нет прав управления кластером, вы по-прежнему можете выполнить большинство описанных ниже действий при наличии у вас доступа к кластеру.
Шаг 1. Создание кластера
Чтобы выполнить исследовательский анализ данных и инжиниринг данных, создайте кластер для предоставления вычислительных ресурсов, необходимых для выполнения команд.
- Щелкните "
Вычисления" на боковой панели.
- Нажмите кнопку
"Создать" на боковой панели и выберите "Кластер". Откроется страница нового кластера или вычислений.
- Укажите уникальное имя для кластера.
- В разделе производительность выберите переключатель (радиокнопка) один узел.
- В разделе Advanced, установите режим доступа на Ручной, затем выберите Выделенный.
- В отдельный пользователь или группавыберите имя пользователя.
- Выберите требуемую версию среды выполнения Databricks, 11.1 или более поздней, чтобы использовать каталог Unity.
- Нажмите кнопку "Создать вычисления" , чтобы создать кластер.
Дополнительные сведения о кластерах Databricks см. в статье "Вычисления".
Шаг 2. Создание записной книжки Databricks
Чтобы создать записную книжку в рабочей области, нажмите кнопку "Создать" на боковой панели и нажмите кнопку "Записная книжка". Пустая записная книжка открывается в рабочей области.
Дополнительные сведения о создании записных книжек и управлении ими см. в статье Управление записными книжками.
Шаг 3. Запись и чтение данных из внешнего расположения, управляемого каталогом Unity
В Databricks рекомендуется использовать Автозагрузчик для добавочного приема данных. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище.
Используйте каталог Unity для управления безопасным доступом к внешним расположениям. Пользователи или прикладные программные субъекты с разрешениями READ FILES
для внешнего расположения могут использовать Auto Loader для приема данных.
Обычно данные поступают во внешнее расположение в результате записи из других систем. В этой демонстрации можно имитировать поступление данных, записывая JSON-файлы во внешнее хранилище.
Скопируйте приведенный ниже код в ячейку записной книжки. Замените строковое значение catalog
на имя каталога с разрешениями CREATE CATALOG
и USE CATALOG
. Замените строковое значение external_location
на путь к внешнему ресурсу, имеющему разрешения READ FILES
, WRITE FILES
и CREATE EXTERNAL TABLE
.
Внешние расположения можно определить как весь контейнер хранилища, но часто они указывают на каталог, вложенный в контейнер.
Правильный формат пути к внешнему расположению: "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
При выполнении этой ячейки должна быть напечатана строка длиной 12 байтов, затем выведена строка "Hello world!" и показаны все базы данных, имеющиеся в предоставленном каталоге. Если вы не можете запустить эту ячейку, убедитесь, что вы находитесь в рабочей области с включенной поддержкой каталога Unity, и запросите соответствующие разрешения у администратора рабочей области для завершения работы с этим учебником.
Приведенный ниже код Python использует ваш адрес электронной почты для создания уникальной базы данных в указанном каталоге и уникального хранилища в предоставленном внешнем расположении. При выполнении этой ячейки будут удалены все данные, связанные с этим руководством, что позволяет выполнять этот пример индемпотентно. Класс создается и определяется, который вы будете использовать для имитации поступления пакетов данных из подключенной системы в ваше исходное внешнее расположение.
Скопируйте этот код в новую ячейку в записной книжке и выполните его, чтобы настроить среду.
Примечание.
Переменные, определенные в этом коде, должны позволить безопасно выполнять его без риска конфликта с существующими ресурсами рабочей области или другими пользователями. Ограниченные разрешения сети или хранилища приведут к ошибкам при выполнении этого кода; обратитесь к администратору рабочей области, чтобы решить проблему этих ограничений.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
Теперь вы можете загрузить пакет данных, скопировав следующий код в ячейку и выполнив его. Эту ячейку можно выполнить вручную до 60 раз, чтобы активировать поступление новых данных.
RawData.land_batch()
Шаг 4. Настройка автозагрузчика для приема данных в каталог Unity
Databricks рекомендует хранить данные с использованием формата Delta Lake. Delta Lake — это уровень хранения с открытым исходным кодом, который обеспечивает транзакции ACID и развитие озера данных. Delta Lake — это формат по умолчанию для таблиц, созданных в Databricks.
Чтобы настроить автозагрузчик для приема данных в таблицу каталога Unity, скопируйте следующий код в пустую ячейку записной книжки:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Дополнительные сведения об автозагрузчике см. в статье Автозагрузчик.
Дополнительные сведения о структурированной потоковой передаче с помощью каталога Unity см. в разделе Использование каталога Unity с структурированной потоковой передачей.
Шаг 5. Обработка данных и взаимодействие с ними
Ноутбуки выполняют логику поочерёдно, ячейка за ячейкой. Выполните следующие действия, чтобы применить логику в вашей ячейке:
Чтобы запустить ячейку, выполненную на предыдущем шаге, выберите ячейку и нажмите клавиши SHIFT+ВВОД.
Чтобы запросить только что созданную таблицу, скопируйте следующий код в пустую ячейку, а затем нажмите клавиши SHIFT+ВВОД, чтобы выполнить код в этой ячейке.
df = spark.read.table(table)
Чтобы просмотреть данные в только что созданной таблице, скопируйте следующий код в пустую ячейку, а затем нажмите клавиши SHIFT+ВВОД, чтобы выполнить код в этой ячейке.
display(df)
Дополнительные сведения об интерактивных параметрах визуализации данных см. в разделе "Визуализации" в записных книжках Databricks.
Шаг 6. Планирование задания
Записные книжки Databricks можно запускать в качестве производственных сценариев, добавив их в качестве задачи в задание Databricks. В этом шаге вы создадите новое задание, которое можно активировать вручную.
Чтобы запланировать записную книжку в качестве задачи, выполните следующие действия.
- Нажмите Расписание справа от строки заголовка.
- Введите уникальное Имя задания.
- Выберите Вручную.
- В раскрывающемся списке Кластер выберите кластер, созданный на шаге 1.
- Нажмите кнопку Создать.
- В открывшемся окне нажмите Запустить сейчас.
- Чтобы просмотреть результаты выполнения задания, щелкните на иконку
рядом с меткой времени последнего выполнения.
Дополнительные сведения о заданиях см. в разделе Что такое задания?.
Шаг 7. Запрос таблицы из Databricks SQL
Любой пользователь с USE CATALOG
разрешением на текущий каталог, USE SCHEMA
разрешение на текущую схему и SELECT
разрешения на таблицу могут запрашивать содержимое таблицы из предпочтительного API Databricks.
Для выполнения запросов в Databricks SQL требуется доступ к работающему хранилищу SQL.
Таблица, созданная ранее в этом руководстве, называется target_table
. Вы можете выполнить запрос, используя каталог, предоставленный в первой ячейке, и базу данных с шаблоном e2e_lakehouse_<your-username>
. Обозреватель каталогов можно использовать для поиска созданных объектов данных.
Дополнительные интеграции
Дополнительные сведения об интеграции и средствах для инжиниринга данных с помощью Azure Databricks: