Поделиться через


Руководство. Запуск сквозного конвейера аналитики Lakehouse

В этом руководстве показано, как настроить сквозной аналитический конвейер для lakehouse на платформе Azure Databricks.

Внимание

В этом руководстве используются интерактивные записные книжки для выполнения распространенных задач извлечения, преобразования и загрузки в Python в кластерах с поддержкой каталога Unity. Если вы не используете Unity Catalog, см. статью Выполнение первой ETL-выгрузки в Azure Databricks.

Задачи в этом руководстве

К концу этой статьи вы почувствуете себя уверенно.

  1. Запуск вычислительного кластера с поддержкой каталога Unity.
  2. Создание записной книжки Databricks.
  3. Запись и чтение данных из внешнего расположения каталога Unity.
  4. Настройка инкрементальной загрузки данных в таблицу каталога Unity с использованием Auto Loader.
  5. Выполнение ячеек записной книжки для обработки, запроса и предварительного просмотра данных.
  6. Планирование записной книжки в качестве задания Databricks.
  7. Запрос к таблицам каталога Unity из Databricks SQL

Azure Databricks предоставляет набор готовых к использованию средств, позволяющих специалистам по обработке и анализу данных быстро разрабатывать и развертывать конвейеры извлечения, преобразования и загрузки (ETL). Каталог Unity позволяет администраторам данных настраивать и защищать учетные данные хранения, внешние расположения и объекты базы данных для пользователей по всей организации. Databricks SQL позволяет аналитикам выполнять SQL-запросы к тем же таблицам, которые используются в рабочих нагрузках ETL, что позволяет выполнять бизнес-аналитику в реальном времени в большом масштабе.

Вы также можете использовать DLT для создания конвейеров ETL. Databricks создал DLT для снижения сложности создания, развертывания и обслуживания рабочих конвейеров ETL. См. руководство : запуск вашего первого конвейера DLT.

Требования

Примечание.

Если у вас нет прав управления кластером, вы по-прежнему можете выполнить большинство описанных ниже действий при наличии у вас доступа к кластеру.

Шаг 1. Создание кластера

Чтобы выполнить исследовательский анализ данных и инжиниринг данных, создайте кластер для предоставления вычислительных ресурсов, необходимых для выполнения команд.

  1. Щелкните "Значок вычисленийВычисления" на боковой панели.
  2. Нажмите кнопку Значок "Создать" на боковой панели и выберите "Кластер". Откроется страница нового кластера или вычислений.
  3. Укажите уникальное имя для кластера.
  4. В разделе производительность выберите переключатель (радиокнопка) один узел.
  5. В разделе Advanced, установите режим доступа на Ручной, затем выберите Выделенный.
  6. В отдельный пользователь или группавыберите имя пользователя.
  7. Выберите требуемую версию среды выполнения Databricks, 11.1 или более поздней, чтобы использовать каталог Unity.
  8. Нажмите кнопку "Создать вычисления" , чтобы создать кластер.

Дополнительные сведения о кластерах Databricks см. в статье "Вычисления".

Шаг 2. Создание записной книжки Databricks

Чтобы создать записную книжку в рабочей области, нажмите кнопку Значок "Создать" на боковой панели и нажмите кнопку "Записная книжка". Пустая записная книжка открывается в рабочей области.

Дополнительные сведения о создании записных книжек и управлении ими см. в статье Управление записными книжками.

Шаг 3. Запись и чтение данных из внешнего расположения, управляемого каталогом Unity

В Databricks рекомендуется использовать Автозагрузчик для добавочного приема данных. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище.

Используйте каталог Unity для управления безопасным доступом к внешним расположениям. Пользователи или прикладные программные субъекты с разрешениями READ FILES для внешнего расположения могут использовать Auto Loader для приема данных.

Обычно данные поступают во внешнее расположение в результате записи из других систем. В этой демонстрации можно имитировать поступление данных, записывая JSON-файлы во внешнее хранилище.

Скопируйте приведенный ниже код в ячейку записной книжки. Замените строковое значение catalog на имя каталога с разрешениями CREATE CATALOG и USE CATALOG. Замените строковое значение external_location на путь к внешнему ресурсу, имеющему разрешения READ FILES, WRITE FILES и CREATE EXTERNAL TABLE.

Внешние расположения можно определить как весь контейнер хранилища, но часто они указывают на каталог, вложенный в контейнер.

Правильный формат пути к внешнему расположению: "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

При выполнении этой ячейки должна быть напечатана строка длиной 12 байтов, затем выведена строка "Hello world!" и показаны все базы данных, имеющиеся в предоставленном каталоге. Если вы не можете запустить эту ячейку, убедитесь, что вы находитесь в рабочей области с включенной поддержкой каталога Unity, и запросите соответствующие разрешения у администратора рабочей области для завершения работы с этим учебником.

Приведенный ниже код Python использует ваш адрес электронной почты для создания уникальной базы данных в указанном каталоге и уникального хранилища в предоставленном внешнем расположении. При выполнении этой ячейки будут удалены все данные, связанные с этим руководством, что позволяет выполнять этот пример индемпотентно. Класс создается и определяется, который вы будете использовать для имитации поступления пакетов данных из подключенной системы в ваше исходное внешнее расположение.

Скопируйте этот код в новую ячейку в записной книжке и выполните его, чтобы настроить среду.

Примечание.

Переменные, определенные в этом коде, должны позволить безопасно выполнять его без риска конфликта с существующими ресурсами рабочей области или другими пользователями. Ограниченные разрешения сети или хранилища приведут к ошибкам при выполнении этого кода; обратитесь к администратору рабочей области, чтобы решить проблему этих ограничений.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

Теперь вы можете загрузить пакет данных, скопировав следующий код в ячейку и выполнив его. Эту ячейку можно выполнить вручную до 60 раз, чтобы активировать поступление новых данных.

RawData.land_batch()

Шаг 4. Настройка автозагрузчика для приема данных в каталог Unity

Databricks рекомендует хранить данные с использованием формата Delta Lake. Delta Lake — это уровень хранения с открытым исходным кодом, который обеспечивает транзакции ACID и развитие озера данных. Delta Lake — это формат по умолчанию для таблиц, созданных в Databricks.

Чтобы настроить автозагрузчик для приема данных в таблицу каталога Unity, скопируйте следующий код в пустую ячейку записной книжки:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(source)
  .select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Дополнительные сведения об автозагрузчике см. в статье Автозагрузчик.

Дополнительные сведения о структурированной потоковой передаче с помощью каталога Unity см. в разделе Использование каталога Unity с структурированной потоковой передачей.

Шаг 5. Обработка данных и взаимодействие с ними

Ноутбуки выполняют логику поочерёдно, ячейка за ячейкой. Выполните следующие действия, чтобы применить логику в вашей ячейке:

  1. Чтобы запустить ячейку, выполненную на предыдущем шаге, выберите ячейку и нажмите клавиши SHIFT+ВВОД.

  2. Чтобы запросить только что созданную таблицу, скопируйте следующий код в пустую ячейку, а затем нажмите клавиши SHIFT+ВВОД, чтобы выполнить код в этой ячейке.

    df = spark.read.table(table)
    
  3. Чтобы просмотреть данные в только что созданной таблице, скопируйте следующий код в пустую ячейку, а затем нажмите клавиши SHIFT+ВВОД, чтобы выполнить код в этой ячейке.

    display(df)
    

Дополнительные сведения об интерактивных параметрах визуализации данных см. в разделе "Визуализации" в записных книжках Databricks.

Шаг 6. Планирование задания

Записные книжки Databricks можно запускать в качестве производственных сценариев, добавив их в качестве задачи в задание Databricks. В этом шаге вы создадите новое задание, которое можно активировать вручную.

Чтобы запланировать записную книжку в качестве задачи, выполните следующие действия.

  1. Нажмите Расписание справа от строки заголовка.
  2. Введите уникальное Имя задания.
  3. Выберите Вручную.
  4. В раскрывающемся списке Кластер выберите кластер, созданный на шаге 1.
  5. Нажмите кнопку Создать.
  6. В открывшемся окне нажмите Запустить сейчас.
  7. Чтобы просмотреть результаты выполнения задания, щелкните на иконку Внешняя ссылка рядом с меткой времени последнего выполнения.

Дополнительные сведения о заданиях см. в разделе Что такое задания?.

Шаг 7. Запрос таблицы из Databricks SQL

Любой пользователь с USE CATALOG разрешением на текущий каталог, USE SCHEMA разрешение на текущую схему и SELECT разрешения на таблицу могут запрашивать содержимое таблицы из предпочтительного API Databricks.

Для выполнения запросов в Databricks SQL требуется доступ к работающему хранилищу SQL.

Таблица, созданная ранее в этом руководстве, называется target_table. Вы можете выполнить запрос, используя каталог, предоставленный в первой ячейке, и базу данных с шаблоном e2e_lakehouse_<your-username>. Обозреватель каталогов можно использовать для поиска созданных объектов данных.

Дополнительные интеграции

Дополнительные сведения об интеграции и средствах для инжиниринга данных с помощью Azure Databricks: