共用方式為


Avro 檔案

Apache Avro 是資料序列化系統。 Avro 提供:

  • 豐富的資料結構。
  • 壓縮、快速的二進位資料格式。
  • 用來儲存持續性資料的容器檔案。
  • 遠端程序呼叫 (RPC)。
  • 與動態語言的簡單整合。 不需要產生代碼即可讀取或寫入資料檔案,也不需要使用或實施 RPC 通訊協定。 代碼產生為選擇性最佳化,僅值得實施於靜態類型語言。

Avro資料來源支援:

  • 架構轉換:Apache Spark SQL 與 Avro 記錄之間的自動轉換。
  • 資料分割:輕鬆讀取和寫入資料分割資料,而不需要任何額外的設定。
  • 壓縮:將 Avro 寫入磁碟時要使用的壓縮。 支援的類型為 uncompressedsnappydeflate。 您也可以指定壓抑的層級。
  • 記錄名稱:透過傳遞包含 recordNamerecordNamespace的參數映射來記錄名稱和命名空間。

也請參閱讀取和寫入串流 Avro 資料

組態

您可以使用各種組態參數來變更 Avro 數據源的行為。

若要在讀取時忽略沒有 .avro 擴展名的檔案,您可以在 Hadoop 組態中設定參數 avro.mapred.ignore.inputs.without.extension。 預設值為 false

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

若要在寫入時設定壓縮,請設定下列 Spark 屬性:

  • 壓縮編解碼器:spark.sql.avro.compression.codec。 支援的編解碼器為 snappydeflate。 預設編解碼器為 snappy
  • 如果壓縮編解碼器 deflate,您可以使用下列方式設定壓縮層級:spark.sql.avro.deflate.level。 預設層級是 -1

您可以在叢集中設定這些屬性,Spark 組態 或在執行時間使用 spark.conf.set()。 例如:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

針對 Databricks Runtime 9.1 LTS 和更新版本,您可以在讀取檔案時提供 mergeSchema 選項,以變更 Avro 中的預設架構推斷行為。 將 mergeSchema 設定為 true 會從目標目錄中的一組 Avro 檔案推斷架構,並將它們合併,而不是從單一檔案推斷讀取架構。

Avro 支援的類型 -> Spark SQL 轉換

此程式庫支援讀取所有 Avro 類型。 它會使用下列從 Avro 類型到 Spark SQL 類型的對應:

Avro 類型 Spark SQL 類型
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
bytes BinaryType
字串 StringType
記錄 StructType
enum StringType
陣列 ArrayType
map MapType
fixed BinaryType
union 請參閱等位型別

等位型別

Avro 資料來源支援讀取 union 類型。 Avro 會將下列三種類型視為 union 類型:

  • union(int, long) 對應至 LongType
  • union(float, double) 對應至 DoubleType
  • union(something, null),其中 something 是任何支援的 Avro 類型。 這會對應至與 something相同的 Spark SQL 類型,並將 nullable 設定為 true

所有其他的 union 類型都是複雜類型。 它們對應至 StructType,其中欄位名稱為 member0member1等,根據 union的成員。 這與 Avro 與 Parquet 之間轉換時的行為一致。

邏輯類型

Avro 資料來源支援讀取下列 Avro 邏輯類型

Avro 邏輯類型 Avro 類型 Spark SQL 類型
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

注意

Avro 資料來源略過 Avro 檔案中存在的文件、別名和其他屬性。

Spark SQL 支援的類型 -> Avro 轉換

此程式庫支援將所有 Spark SQL類型寫入 Avro。 對大多數類型而言,從Spark類型到Avro類型的對應很簡單(例如,IntegerType 轉換成 int):以下是少數特殊案例的清單:

Spark SQL 類型 Avro 類型 Avro 邏輯類型
ByteType int
ShortType int
BinaryType bytes
DecimalType fixed decimal
TimestampType long timestamp-micros
DateType int date

您也可以使用選項 avroSchema來指定完整的輸出 Avro 架構,以便將 Spark SQL 類型轉換為其他 Avro 類型。 預設不會套用下列轉換,而且需要使用者指定的 Avro 架構:

Spark SQL 類型 Avro 類型 Avro 邏輯類型
ByteType fixed
StringType enum
DecimalType bytes decimal
TimestampType long timestamp-millis

範例

這些範例會使用 episodes.avro 檔案。

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

此範例示範自訂 Avro 架構:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

此範例示範 Avro 壓縮選項:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

此範例示範分割的 Avro 記錄:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

此範例示範記錄名稱與命名空間:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

若要在 SQL 中查詢 Avro 數據,請將資料檔註冊為數據表或暫存檢視:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

筆記本範例:讀取和寫入 Avro 檔案

下列筆記本示範如何讀取和寫入 Avro 檔案。

讀取和寫入 Avro 檔案筆記本

取得筆記本