Megosztás a következőn keresztül:


PySpark egyéni adatforrások

Fontos

A PySpark egyéni adatforrásai nyilvános előnézetben vannak a Databricks Runtime 15.2-es és újabb verzióiban, valamint a kiszolgáló nélküli környezet 2-es verziójában. A streamelés támogatása a Databricks Runtime 15.3-ban és újabb verziókban érhető el.

A PySpark DataSource-t a Python (PySpark) DataSource API hozza létre, amely lehetővé teszi az egyéni adatforrásokból való olvasást és az Apache Spark egyéni adatgyűjtőkbe való írását a Python használatával. A PySpark egyéni adatforrásokkal egyéni kapcsolatokat definiálhat az adatrendszerekhez, és további funkciókat valósíthat meg az újrafelhasználható adatforrások kiépítéséhez.

DataSource osztály

A PySpark DataSource egy alaposztály, amely metódusokat biztosít az adatolvasók és írók létrehozásához.

Az adatforrás alosztályának implementálása

A használati esettől függően az alábbiakat kell implementálnia bármely alosztálynak, hogy egy adatforrás olvasható, írható vagy mindkettő legyen:

Tulajdonság vagy metódus Leírás
name Szükséges. Az adatforrás neve
schema Szükséges. Az olvasandó vagy írandó adatforrás sémája
reader() Vissza kell adnia egy értéket DataSourceReader az adatforrás olvashatóvá tételéhez (köteg)
writer() Vissza kell adnia egy értéket DataSourceWriter az adatgyűjtő írhatóvá tételéhez (köteg)
streamReader() vagy simpleStreamReader() Vissza kell adnia egy DataSourceStreamReader elemet, hogy az adatfolyam olvasható legyen (streamelés)
streamWriter() Vissza kell adnia egy objektumot DataSourceStreamWriter annak érdekében, hogy az adatfolyam írható legyen (streamelés)

Megjegyzés

A felhasználó által definiált DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriter, , és metódusainak szerializálhatónak kell lenniük. Más szóval egy primitív típust tartalmazó szótárnak vagy beágyazott szótárnak kell lennie.

Az adatforrás regisztrálása

A felület implementálása után regisztrálnia kell azt, majd betöltheti vagy más módon használhatja az alábbi példában látható módon:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

1. példa: PySpark DataSource létrehozása kötegelt lekérdezéshez

A PySpark DataSource olvasói képességeinek bemutatásához hozzon létre egy adatforrást, amely a Python-csomag használatával faker hoz létre példaadatokat. További információkért fakerlásd a Faker dokumentációját.

Telepítse a faker csomagot a következő paranccsal:

%pip install faker

1. lépés: A DataSource példa definiálása

Először is definiálja az új PySpark DataSource-t egy névvel, sémával és olvasóval rendelkező DataSource alosztályaként. A reader() metódust úgy kell definiálni, hogy kötegelt lekérdezésnél adatforrásból olvasson.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

2. lépés: Az olvasó kialakítása tömeges lekérdezéshez

Ezután implementálja az olvasó logikáját a példaadatok létrehozásához. A telepített faker kódtár használatával töltse ki a séma egyes mezőinek tartalmát.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

3. lépés: A minta adatforrás regisztrálása és használata

Az adatforrás használatához regisztráljon. Alapértelmezés szerint a FakeDataSource három sorból áll, és a séma a következő string mezőket tartalmazza: name, date, zipcode, state. Az alábbi példa az alapértelmezett értékekkel regisztrálja, betölti és kimeneteli a példaadatforrást:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Csak string mezők támogatottak, de megadhat egy olyan sémát, amely megfelel faker csomagszolgáltatók mezőinek, hogy véletlenszerű adatokat generáljon teszteléshez és fejlesztéshez. Az alábbi példa betölti az adatforrást a name és company mezőkkel:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Ha egyéni számú sorból szeretné betölteni az adatforrást, adja meg a numRows beállítást. Az alábbi példa 5 sort határoz meg:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

2. példa: PySpark DataSource létrehozása olvasási és írási streameléshez

A PySpark DataSource streamolvasói és -írói képességeinek bemutatásához hozzon létre egy példaadatforrást, amely két sort hoz létre minden mikrobatchben a faker Python-csomag használatával. További információkért fakerlásd a Faker dokumentációját.

Telepítse a faker csomagot a következő paranccsal:

%pip install faker

1. lépés: A DataSource példa definiálása

Először is definiálja az új PySpark DataSource-t a DataSource alosztályaként egy névvel, sémával és metódussal streamReader() és streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

2. lépés: A streamolvasó implementálása

Ezután implementálja a példa streamelési adatolvasót, amely minden mikrobatchben két sort hoz létre. Implementálhatja DataSourceStreamReader, vagy ha az adatforrás alacsony átviteli sebességgel rendelkezik, és nem igényel particionálást, implementálhatja SimpleDataSourceStreamReader helyette. A simpleStreamReader() vagy a streamReader() implementálandó, és a simpleStreamReader() csak akkor kerül meghívásra, ha a streamReader() nincs implementálva.

DataSourceStreamReader implementáció

A streamReader példány egy egész számú eltéréssel rendelkezik, amely minden mikroadagban 2-vel nő, a DataSourceStreamReader felületen implementálva.

from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

SimpleDataSourceStreamReader implementáció

A SimpleStreamReader példány ugyanaz, mint az a FakeStreamReader példány, amely minden kötegben két sort hoz létre, de particionálás nélkül implementálva van a SimpleDataSourceStreamReader felülettel.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

3. lépés: A streamíró implementálása

Most implementálja a streamelési írót. Ez a streamelési adatíró az egyes mikrobatchek metaadatait egy helyi elérési útra írja.

class SimpleCommitMessage:
   def __init__(self, partition_id: int, count: int):
       self.partition_id = partition_id
       self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

4. lépés: A minta adatforrás regisztrálása és használata

Az adatforrás használatához regisztrálja azt. A regisztrálás után forrásként vagy célként használhatja a streamelési lekérdezésekben, ha rövid vagy teljes nevet ad meg a format()-nek. Az alábbi példa regisztrálja az adatforrást, majd elindít egy lekérdezést, amely beolvassa a példa adatforrásából és a kimeneteket a konzolra:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

Másik lehetőségként az alábbi példa a példastreamet használja fogadóként, és megadja a kimeneti útvonalat:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Hibaelhárítás

Ha a kimenet a következő hiba, a számítás nem támogatja a PySpark egyéni adatforrásait. A Databricks Runtime 15.2 vagy újabb verzióját kell használnia.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000