مصادر البيانات المخصصة ل PySpark
هام
توجد مصادر البيانات المخصصة ل PySpark في المعاينة العامة في Databricks Runtime 15.2 وما فوق. يتوفر دعم الدفق في Databricks Runtime 15.3 وما فوق.
يتم إنشاء مصدر بيانات PySpark بواسطة Python (PySpark) DataSource API، والتي تمكن القراءة من مصادر البيانات المخصصة والكتابة إلى متلقي البيانات المخصصة في Apache Spark باستخدام Python. يمكنك استخدام مصادر البيانات المخصصة PySpark لتحديد الاتصالات المخصصة بأنظمة البيانات وتنفيذ وظائف إضافية، لبناء مصادر بيانات قابلة لإعادة الاستخدام.
فئة DataSource
PySpark DataSource هي فئة أساسية توفر أساليب لإنشاء قراء البيانات والكتاب.
تنفيذ الفئة الفرعية لمصدر البيانات
اعتمادا على حالة الاستخدام الخاصة بك، يجب تنفيذ ما يلي بواسطة أي فئة فرعية لجعل مصدر البيانات إما قابلا للقراءة أو للكتابة أو كليهما:
الخاصية أو الأسلوب | الوصف |
---|---|
name |
مطلوب. اسم مصدر البيانات |
schema |
مطلوب. مخطط مصدر البيانات المطلوب قراءته أو كتابته |
reader() |
يجب إرجاع DataSourceReader لجعل مصدر البيانات قابلا للقراءة (دفعة) |
writer() |
يجب إرجاع DataSourceWriter لجعل مصدر البيانات قابلا للكتابة (دفعة) |
streamReader() أو simpleStreamReader() |
يجب إرجاع DataSourceStreamReader لجعل دفق البيانات قابلا للقراءة (دفق) |
streamWriter() |
يجب إرجاع DataSourceStreamWriter لجعل دفق البيانات قابلا للكتابة (دفق) |
إشعار
يجب أن تكون أساليبها المعرفة DataSource
DataSourceStreamReader
DataSourceReader
DataSourceWriter
DataSourceStreamWriter
من قبل المستخدم و قادرة على إجراء تسلسل لها. بمعنى آخر، يجب أن تكون قاموسا أو قاموسا متداخلا يحتوي على نوع بدائي.
تسجيل مصدر البيانات
بعد تنفيذ الواجهة، يجب عليك تسجيلها، ثم يمكنك تحميلها أو استخدامها بطريقة أخرى كما هو موضح في المثال التالي:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
مثال 1: إنشاء مصدر بيانات PySpark للاستعلام الدفعي
لإظهار قدرات قارئ PySpark DataSource، قم بإنشاء مصدر بيانات ينشئ بيانات مثال باستخدام حزمة faker
Python. لمزيد من المعلومات حول faker
، راجع وثائق Faker.
تثبيت الحزمة faker
باستخدام الأمر التالي:
%pip install faker
الخطوة 1: تحديد مثال DataSource
أولا، حدد مصدر بيانات PySpark الجديد كفئة فرعية من DataSource
مع اسم ومخطط وقارئ. reader()
يجب تعريف الأسلوب للقراءة من مصدر بيانات في استعلام دفعي.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
الخطوة 2: تنفيذ القارئ لاستعلام دفعي
بعد ذلك، قم بتنفيذ منطق القارئ لإنشاء بيانات المثال. استخدم المكتبة المثبتة faker
لملء كل حقل في المخطط.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
الخطوة 3: تسجيل مصدر بيانات المثال واستخدامه
لاستخدام مصدر البيانات، قم بتسجيله. بشكل افتراضي، يحتوي على FakeDataSource
ثلاثة صفوف، ويتضمن المخطط هذه string
الحقول: name
، date
، ، zipcode
. state
يقوم المثال التالي بتسجيل وتحميل وإخراج مصدر بيانات المثال مع الإعدادات الافتراضية:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
يتم اعتماد الحقول فقط string
، ولكن يمكنك تحديد مخطط مع أي حقول تتوافق مع faker
حقول موفري الحزم لإنشاء بيانات عشوائية للاختبار والتطوير. يقوم المثال التالي بتحميل مصدر البيانات بالحقول name
و company
:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
لتحميل مصدر البيانات بعدد مخصص من الصفوف، حدد numRows
الخيار . يحدد المثال التالي 5 صفوف:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
مثال 2: إنشاء مصدر بيانات PySpark لقراءة وكتابة البث
لتوضيح قدرات قارئ دفق PySpark DataSource والكاتب، قم بإنشاء مصدر بيانات مثال يقوم بإنشاء صفين في كل مجموعة صغيرة باستخدام faker
حزمة Python. لمزيد من المعلومات حول faker
، راجع وثائق Faker.
تثبيت الحزمة faker
باستخدام الأمر التالي:
%pip install faker
الخطوة 1: تحديد مثال DataSource
أولا، حدد مصدر بيانات PySpark الجديد كفئة فرعية من DataSource
مع اسم ومخطط وأساليب streamReader()
و streamWriter()
.
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
الخطوة 2: تنفيذ قارئ الدفق
بعد ذلك، قم بتنفيذ مثال قارئ البيانات المتدفقة الذي ينشئ صفين في كل دفعة صغيرة. يمكنك تنفيذ DataSourceStreamReader
، أو إذا كان مصدر البيانات لديه معدل نقل منخفض ولا يتطلب التقسيم، يمكنك التنفيذ SimpleDataSourceStreamReader
بدلا من ذلك. إما simpleStreamReader()
أو streamReader()
يجب تنفيذها، ويتم simpleStreamReader()
استدعاؤها فقط عندما streamReader()
لا يتم تنفيذها.
تنفيذ DataSourceStreamReader
يحتوي المثيل streamReader
على إزاحة عدد صحيح تزيد بمقدار 2 في كل تجزئة صغيرة، يتم تنفيذها مع الواجهة DataSourceStreamReader
.
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
تنفيذ SimpleDataSourceStreamReader
المثيل SimpleStreamReader
هو نفس المثيل FakeStreamReader
الذي ينشئ صفين في كل دفعة، ولكن يتم تنفيذه مع الواجهة SimpleDataSourceStreamReader
دون تقسيم.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
الخطوة 3: تنفيذ كاتب الدفق
الآن قم بتنفيذ كاتب البث. يكتب كاتب البيانات المتدفقة هذا معلومات بيانات التعريف لكل قالب صغير إلى مسار محلي.
class SimpleCommitMessage(WriterCommitMessage):
partition_id: int
count: int
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
الخطوة 4: تسجيل مصدر بيانات المثال واستخدامه
لاستخدام مصدر البيانات، قم بتسجيله. بعد تسجيله، يمكنك استخدامه في الاستعلامات المتدفقة كمصدر أو متلقي عن طريق تمرير اسم قصير أو اسم كامل إلى format()
. يسجل المثال التالي مصدر البيانات، ثم يبدأ استعلاما يقرأ من مصدر البيانات المثال والمخرجات إلى وحدة التحكم:
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
بدلا من ذلك، يستخدم المثال التالي دفق المثال كمتلقي ويحدد مسار الإخراج:
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
استكشاف الأخطاء وإصلاحها
إذا كان الإخراج هو الخطأ التالي، فإن الحساب الخاص بك لا يدعم مصادر البيانات المخصصة PySpark. يجب استخدام Databricks Runtime 15.2 أو أعلى.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000