ملف Avro
Apache Avro هو نظام تسلسل البيانات. يوفر Avro:
- بنيات البيانات الغنية.
- تنسيق بيانات ثنائي مضغوط وسريع.
- ملف حاوية، لتخزين البيانات الثابتة.
- استدعاء الإجراء عن بعد (RPC).
- تكامل بسيط مع اللغات الديناميكية. لا يلزم إنشاء التعليمات البرمجية لقراءة ملفات البيانات أو كتابتها ولا لاستخدام بروتوكولات RPC أو تنفيذها. إنشاء التعليمات البرمجية كتحسين اختياري، يستحق التنفيذ فقط للغات مكتوبة بشكل ثابت.
يدعم مصدر بيانات Avro ما يلي:
- تحويل المخطط: التحويل التلقائي بين سجلات Apache Spark SQL وAvro.
- التقسيم: قراءة وكتابة البيانات المقسمة بسهولة دون أي تكوين إضافي.
- الضغط: الضغط لاستخدامه عند كتابة Avro إلى القرص. الأنواع المدعومة هي
uncompressed
وsnappy
و.deflate
يمكنك أيضا تحديد مستوى الانكماش. - أسماء السجلات: اسم السجل ومساحة الاسم عن طريق تمرير خريطة المعلمات مع
recordName
وrecordNamespace
.
راجع أيضا قراءة وكتابة بيانات Avro المتدفقة.
التكوين
يمكنك تغيير سلوك مصدر بيانات Avro باستخدام معلمات التكوين المختلفة.
لتجاهل الملفات بدون .avro
الملحق عند القراءة، يمكنك تعيين المعلمة avro.mapred.ignore.inputs.without.extension
في تكوين Hadoop. الافتراضي هو false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
لتكوين الضغط عند الكتابة، قم بتعيين خصائص Spark التالية:
- برنامج ترميز الضغط:
spark.sql.avro.compression.codec
. برامج الترميز المدعومة هيsnappy
وdeflate
. برنامج الترميز الافتراضي هوsnappy
. - إذا كان برنامج ترميز الضغط هو
deflate
، يمكنك تعيين مستوى الضغط باستخدام:spark.sql.avro.deflate.level
. المستوى الافتراضي هو-1
.
يمكنك تعيين هذه الخصائص في تكوين Spark للمجموعة أو في وقت التشغيل باستخدام spark.conf.set()
. على سبيل المثال:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
بالنسبة إلى Databricks Runtime 9.1 LTS والإصدارات الأحدث، يمكنك تغيير سلوك استنتاج المخطط الافتراضي في Avro عن طريق توفير mergeSchema
الخيار عند قراءة الملفات. سيؤدي الإعداد mergeSchema
إلى true
استنتاج مخطط من مجموعة من ملفات Avro في الدليل الهدف ودمجها بدلا من استنتاج مخطط القراءة من ملف واحد.
الأنواع المدعومة لتحويل Avro -> Spark SQL
تدعم هذه المكتبة قراءة جميع أنواع Avro. يستخدم التعيين التالي من أنواع Avro إلى أنواع Spark SQL:
نوع Avro | نوع Spark SQL |
---|---|
boolean | BooleanType |
العدد الصحيح | IntegerType |
طويل | LongType |
عائم | FloatType |
مزدوج | DoubleType |
وحدات البايت | نوع ثنائي |
سلسلة | StringType |
سجل | نوع البنية |
enum | StringType |
صفيف | نوع الصفيف |
map | نوع الخريطة |
ثابت | نوع ثنائي |
union | راجع أنواع الاتحاد. |
أنواع الاتحاد
يدعم مصدر بيانات Avro أنواع القراءة union
. يعتبر Avro الأنواع الثلاثة التالية أنواعا union
:
union(int, long)
تعيين إلىLongType
.union(float, double)
تعيين إلىDoubleType
.union(something, null)
، حيثsomething
هو أي نوع Avro مدعوم. يتم تعيين هذا إلى نفس نوع Spark SQL مثل ،something
معnullable
تعيين إلىtrue
.
جميع الأنواع الأخرى union
أنواع معقدة. يتم تعيينها إلى StructType
حيث تكون member0
أسماء الحقول ، member1
وما إلى ذلك، وفقا لأعضاء union
. هذا متناسق مع السلوك عند التحويل بين Avro وParquet.
الأنواع المنطقية
يدعم مصدر بيانات Avro قراءة أنواع Avro المنطقية التالية:
نوع Avro المنطقي | نوع Avro | نوع Spark SQL |
---|---|---|
date | العدد الصحيح | DateType |
الطابع الزمني-مللي ثانية | طويل | نوع الطابع الزمني |
timestamp-micros | طويل | نوع الطابع الزمني |
عشري | ثابت | نوع عشري |
عشري | وحدات البايت | نوع عشري |
إشعار
يتجاهل مصدر بيانات Avro المستندات والأسماء المستعارة والخصائص الأخرى الموجودة في ملف Avro.
الأنواع المدعومة ل Spark SQL -> تحويل Avro
تدعم هذه المكتبة كتابة جميع أنواع Spark SQL في Avro. بالنسبة لمعظم الأنواع، يكون التعيين من أنواع Spark إلى أنواع Avro بسيطا (على سبيل المثال IntegerType
يتم تحويله إلى int
)؛ فيما يلي قائمة بالحالات الخاصة القليلة:
نوع Spark SQL | نوع Avro | نوع Avro المنطقي |
---|---|---|
ByteType | العدد الصحيح | |
ShortType | العدد الصحيح | |
نوع ثنائي | وحدات البايت | |
نوع عشري | ثابت | عشري |
نوع الطابع الزمني | طويل | timestamp-micros |
DateType | العدد الصحيح | date |
يمكنك أيضا تحديد مخطط Avro للإخراج بالكامل مع الخيار avroSchema
، بحيث يمكن تحويل أنواع Spark SQL إلى أنواع Avro أخرى.
لا يتم تطبيق التحويلات التالية بشكل افتراضي وتتطلب مخطط Avro المحدد من قبل المستخدم:
نوع Spark SQL | نوع Avro | نوع Avro المنطقي |
---|---|---|
ByteType | ثابت | |
StringType | enum | |
نوع عشري | وحدات البايت | عشري |
نوع الطابع الزمني | طويل | الطابع الزمني-مللي ثانية |
الأمثلة
تستخدم هذه الأمثلة ملف episodes.avro .
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
يوضح هذا المثال مخطط Avro مخصصا:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
يوضح هذا المثال خيارات ضغط Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
يوضح هذا المثال سجلات Avro المقسمة:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
يوضح هذا المثال اسم السجل ومساحة الاسم:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
للاستعلام عن بيانات Avro في SQL، قم بتسجيل ملف البيانات كجدول أو طريقة عرض مؤقتة:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
مثال دفتر الملاحظات: قراءة ملفات Avro وكتابتها
يوضح دفتر الملاحظات التالي كيفية قراءة ملفات Avro وكتابتها.