Partager via


Sources de données personnalisées de PySpark

Important

Les sources de données personnalisées PySpark sont en préversion publique dans Databricks Runtime 15.2 (et les versions ultérieures). La prise en charge du streaming est disponible dans Databricks Runtime 15.3 et versions ultérieures.

Une source de données PySpark est créée par l’API DataSource Python (PySpark), qui permet la lecture à partir de sources de données personnalisées et l’écriture dans des récepteurs de données personnalisés dans Apache Spark à l’aide de Python. Vous pouvez utiliser les sources de données PySpark personnalisées pour définir des connexions personnalisées aux systèmes de données et implémenter des fonctionnalités supplémentaires pour générer des sources de données réutilisables.

Classe DataSource

La classe de base DataSource PySpark fournit des méthodes pour créer des lecteurs et des enregistreurs de données.

Implémenter la sous-classe de source de données

Selon votre cas d’usage, les éléments suivants doivent être implémentés par n’importe quelle sous-classe pour rendre une source de données lisible, accessible en écriture ou les deux :

Property ou Method Description
name Obligatoire. Nom de la source de données
schema Obligatoire. Le schéma de la source de données à lire ou écrire
reader() Doit retourner une valeur DataSourceReader pour rendre la source de données lisible (lot)
writer() Doit retourner une valeur DataSourceWriter pour rendre le récepteur de données accessible en écriture (lot)
streamReader() ou simpleStreamReader() Doit retourner une valeur DataSourceStreamReader pour rendre le flux de données lisible (diffusion en continu)
streamWriter() Doit retourner une valeur DataSourceStreamWriter pour rendre le flux de données accessible en écriture (diffusion en continu)

Remarque

Les méthodes définies par DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriter et l’utilisateur doivent être sérialisées. En d’autres termes, ils doivent être un dictionnaire ou un dictionnaire imbriqué qui contient un type primitif.

Inscrire la source de données

Après avoir implémenté l’interface, vous devez l’inscrire, puis vous pouvez la charger ou l’utiliser autrement, comme illustré dans l’exemple suivant :

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Exemple 1 : Créer une source de données PySpark pour la requête par lots

Pour illustrer les fonctionnalités de lecteur de source de données PySpark, créez une source de données qui génère des exemples de données à l’aide du package Python faker. Pour plus d’informations sur faker, consultez la documentation Faker.

Installez le package faker à l’aide de la commande suivante :

%pip install faker

Étape 1 : définir l’exemple de source de données

Tout d’abord, définissez votre nouvelle source de données PySpark en tant que sous-classe de DataSource, avec un nom, un schéma et un lecteur. La méthode reader() doit être définie pour lire à partir d’une source de données dans une requête par lots.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Étape 2 : implémenter le lecteur pour une requête par lots

Ensuite, implémentez la logique de lecteur pour générer des exemples de données. Utilisez la bibliothèque faker installée pour remplir chaque champ du schéma.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Étape 3 : inscrire et utiliser l’exemple de source de données

Pour utiliser la source de données, inscrivez-la. Par défaut, la FakeDataSource comporte trois lignes et le schéma par défaut inclut les champs string suivants : name, , date, zipcode, state. L’exemple suivant inscrit, charge et génère un exemple de source de données avec les valeurs par défaut :

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Seuls string champs sont pris en charge, mais vous pouvez spécifier un schéma avec tous les champs correspondant aux champs des fournisseurs de package faker pour générer des données aléatoires pour les tests et le développement. L’exemple suivant charge la source de données avec les champs name et company :

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Pour charger la source de données avec un nombre personnalisé de lignes, spécifiez l’option numRows. L'exemple suivant spécifie 5 lignes :

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Exemple 2 : créer une source de données PySpark pour la lecture et l’écriture en diffuser en continu

Pour illustrer les fonctionnalités de lecteur et d’enregistreur de flux PySpark DataSource, créez un exemple de source de données qui génère deux lignes dans chaque microbatch à l’aide du package Python faker. Pour plus d’informations sur faker, consultez la documentation Faker.

Installez le package faker à l’aide de la commande suivante :

%pip install faker

Étape 1 : définir l’exemple de source de données

Tout d’abord, définissez votre nouvelle source de données PySpark en tant que sous-classe de DataSource avec un nom, un schéma et des méthodes streamReader() et streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Étape 2 : implémenter le lecteur de flux

Ensuite, implémentez l’exemple de lecteur de données de diffusion en continu qui génère deux lignes dans chaque microbatch. Vous pouvez implémenter DataSourceStreamReader, ou si la source de données a un débit faible et ne nécessite pas de partitionnement, vous pouvez l’implémenter SimpleDataSourceStreamReader à la place. simpleStreamReader() ou streamReader() doit être implémentée et simpleStreamReader() est appelée uniquement lorsque streamReader() n’est pas implémentée.

Implémentation de DataSourceStreamReader

L’instance streamReader a un décalage entier qui augmente de 2 dans chaque microbatch, implémenté avec l’interface DataSourceStreamReader.

from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implémentation simpleDataSourceStreamReader

L’instance SimpleStreamReader est identique à l’instance FakeStreamReader qui génère deux lignes dans chaque lot, mais implémentée avec l’interface SimpleDataSourceStreamReader sans partitionnement.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Étape 3 : implémenter l’enregistreur de flux

Implémentez maintenant l’enregistreur de diffusion en continu. Cet enregistreur de données de diffusion en continu écrit les informations de métadonnées de chaque microbatch dans un chemin local.

class SimpleCommitMessage:
   def __init__(self, partition_id: int, count: int):
       self.partition_id = partition_id
       self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Étape 4 : Inscrire et utiliser l’exemple de source de données

Pour utiliser la source de données, inscrivez-la. Une fois enregistré, vous pouvez l’utiliser dans les requêtes de diffusion en continu en tant que source ou récepteur en passant un nom court ou un nom complet à format(). L’exemple suivant inscrit la source de données, puis démarre une requête qui lit à partir de l’exemple de source de données et de sorties dans la console :

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

L’exemple suivant utilise également l’exemple de flux en tant que récepteur et spécifie un chemin de sortie :

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Dépannage

Si la sortie génère l’erreur suivante, votre calcul ne prend pas en charge les sources de données PySpark personnalisées. Vous devez utiliser Databricks Runtime 15.2 ou version ultérieure.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000