Поделиться через


Пользовательские источники данных PySpark

Внимание

Пользовательские источники данных PySpark находятся в общедоступной предварительной версии в Databricks Runtime 15.2 и выше, а также в бессерверной среде версии 2. Поддержка потоковой передачи доступна в Databricks Runtime 15.3 и выше.

PySpark DataSource создается API-интерфейсом DataSource Python (PySpark), который позволяет читать из пользовательских источников данных и записывать их в пользовательские приемники данных в Apache Spark с помощью Python. Пользовательские источники данных PySpark можно использовать для определения пользовательских подключений к системам данных и реализации дополнительных функций для создания повторно используемых источников данных.

Класс DataSource

PySpark DataSource — это базовый класс, который предоставляет методы для создания средств чтения и записи данных.

Реализация подкласса источника данных

В зависимости от варианта использования любой подкласс должен реализовать следующие методы, чтобы источник данных стал доступен для чтения, записи или обоих:

Свойство или метод Описание
name Обязательное. Имя источника данных
schema Обязательное. Схема источника данных для чтения или записи
reader() Должен быть возвращен DataSourceReader, чтобы источник данных был доступен для чтения (в режиме пакетной обработки)
writer() Необходимо вернуть DataSourceWriter, чтобы приемник данных стал доступен для записи (пакетная обработка)
streamReader() или simpleStreamReader() Необходимо вернуть DataSourceStreamReader для обеспечения возможности чтения потока данных.
streamWriter() Необходимо вернуть DataSourceStreamWriter, чтобы поток данных можно было сделать доступным для записи (потоковая передача)

Примечание.

Определяемые пользователем DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriter и их методы должны иметь возможность быть сериализованными. Другими словами, они должны представлять собой словарь или вложенный словарь, который содержит примитивный тип.

Регистрация источника данных

После реализации интерфейса необходимо зарегистрировать его, затем можно загрузить или использовать его, как показано в следующем примере:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Пример 1. Создание источника данных PySpark для пакетного запроса

Чтобы продемонстрировать возможности чтения PySpark DataSource, создайте источник данных, который создает примеры данных с помощью faker пакета Python. Для получения дополнительной информации о faker см. документацию Faker.

Установите пакет с помощью следующей faker команды:

%pip install faker

Шаг 1. Определение примера DataSource

Сначала определите новый PySpark DataSource как подкласс DataSource с именем, схемой и читателем. Метод reader() должен быть определен для чтения из источника данных в пакетном запросе.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Шаг 2. Реализация средства чтения для пакетного запроса

Затем реализуйте логику чтения для создания примеров данных. Используйте установленную библиотеку faker для заполнения каждого поля в схеме.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Шаг 3. Регистрация и использование примера источника данных

Чтобы использовать источник данных, зарегистрируйте его. По умолчанию FakeDataSource имеет три строки, а схема включает следующие поля string: name, date, zipcode, state. В следующем примере регистрируются, загружаются и выводятся данные из примера источника данных с настройками по умолчанию.

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Поддерживаются только поля string, но можно указать схему с любыми полями, соответствующими полям поставщиков пакетов faker для создания случайных данных для тестирования и разработки. В следующем примере загружается источник данных с полями name и company.

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Чтобы загрузить источник данных с пользовательским числом строк, укажите этот параметр numRows. В следующем примере указано 5 строк:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Пример 2. Создание PySpark DataSource для потоковой передачи чтения и записи

Чтобы продемонстрировать возможности средства чтения потоков и записи PySpark DataSource, создайте пример источника данных, который создает две строки в каждом микробатче faker с помощью пакета Python. Дополнительные сведения смотрите в документации по Faker faker.

Установите пакет с помощью следующей faker команды:

%pip install faker

Шаг 1. Определение примера DataSource

Сначала определите новый Источник данных PySpark в качестве подкласса DataSource с именем, схемой и методами streamReader() и streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Шаг 2. Реализация средства чтения потоков

Затем реализуйте пример средства чтения потоковых данных, который создает две строки в каждом микробатче. Можно реализовать DataSourceStreamReaderили, если источник данных имеет низкую пропускную способность и не требует секционирования, можно реализовать SimpleDataSourceStreamReader вместо этого. Либо simpleStreamReader(), либо streamReader() должны быть реализованы, и simpleStreamReader() вызывается только в том случае, если streamReader() не реализован.

Реализация DataSourceStreamReader

Экземпляр streamReader имеет целочисленное смещение, которое увеличивается на 2 в каждом микробатче, реализованном с помощью интерфейса DataSourceStreamReader.

from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Реализация SimpleDataSourceStreamReader

Экземпляр SimpleStreamReader совпадает с экземпляром FakeStreamReader , который создает две строки в каждом пакете, но реализуется с интерфейсом SimpleDataSourceStreamReader без секционирования.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Шаг 3. Реализация модуля записи потоков

Теперь реализуйте модуль записи потоковой передачи. Этот модуль для записи потоковых данных записывает метаданные каждого микробатча в локальную директорию.

class SimpleCommitMessage:
   def __init__(self, partition_id: int, count: int):
       self.partition_id = partition_id
       self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Шаг 4. Регистрация и использование примера источника данных

Чтобы использовать источник данных, зарегистрируйте его. После регистрации его можно использовать в потоковых запросах в качестве источника или потребителя, передав короткое имя или полное имя format(). В следующем примере регистрируется источник данных, затем запускается запрос, который считывает данные из примера источника и выводит их в консоль.

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

Кроме того, в следующем примере в качестве приемника используется пример потока и указывается выходной путь:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Устранение неполадок

Если выходные данные являются следующей ошибкой, вычислительные ресурсы не поддерживают пользовательские источники данных PySpark. Необходимо использовать Databricks Runtime 15.2 или более поздней версии.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000