Megosztás a következőn keresztül:


Oktatóanyag: Adatok betöltése és átalakítása Az Apache Spark DataFrame-ek használatával

Ez az oktatóanyag bemutatja, hogyan tölthet be és alakíthat át adatokat az Apache Spark Python (PySpark) DataFrame API, az Apache Spark Scala DataFrame API és a SparkR SparkDataFrame API használatával az Azure Databricksben.

Az oktatóanyag végére megismerheti a DataFrame-et, és megismerkedhet a következő feladatokkal:

Python

Lásd még az Apache Spark PySpark API-referenciát.

Scala

Lásd még az Apache Spark Scala API-referenciát.

R

Lásd még : Apache SparkR API-referencia.

Mi az a DataFrame?

A DataFrame egy kétdimenziós címkézett adatstruktúra, amely különböző típusú oszlopokat tartalmaz. A DataFrame-ekre, például számolótáblákra, SQL-táblákra vagy sorozatobjektumok szótárára is gondolhat. Az Apache Spark DataFrame-ek számos függvényt biztosítanak (oszlopok kiválasztása, szűrés, illesztés, összesítés), amelyek lehetővé teszik a gyakori adatelemzési problémák hatékony megoldását.

Az Apache Spark DataFrame-ek rugalmas elosztott adathalmazokra (RDD-kre) épülő absztrakciók. A Spark DataFrames és a Spark SQL egységes tervezési és optimalizálási motort használ, amely lehetővé teszi, hogy az Azure Databricks (Python, SQL, Scala és R) összes támogatott nyelvén szinte azonos teljesítményt nyújtsunk.

Követelmények

A következő oktatóanyag elvégzéséhez meg kell felelnie a következő követelményeknek:

  • Az oktatóanyagban szereplő példák használatához a munkaterületen engedélyezve kell legyen a Unity Catalogus.

  • Az oktatóanyagban szereplő példák egy Unity-katalógust kötetet a mintaadatok tárolására. A példák használatához hozzon létre egy kötetet, és használja a kötet katalógusát, sémáját és kötetneveit a példák által használt kötetútvonal beállításához.

  • A Unity Katalógusban a következő engedélyekkel kell rendelkeznie:

    • READ VOLUMEvagy WRITE VOLUMEALL PRIVILEGES az oktatóanyaghoz használt kötethez.
    • USE SCHEMA vagy ALL PRIVILEGES az oktatóanyaghoz használt sémához.
    • USE CATALOG vagy ALL PRIVILEGES az oktatóanyaghoz használt katalógushoz.

    Az engedélyek beállításához forduljon a Databricks rendszergazdájához vagy Unity Catalog-jogosultságok és biztonsági objektumok.

Tipp.

A cikkhez tartozó kész jegyzetfüzetekért tekintse meg a DataFrame oktatóanyag-jegyzetfüzeteit.

1. lépés: Változók definiálása és CSV-fájl betöltése

Ez a lépés meghatározza az oktatóanyagban használt változókat, majd betölt egy CSV-fájlt, amely a health.data.ny.gov babanévadatait tartalmazza a Unity Catalog-kötetbe.

  1. Nyisson meg egy új jegyzetfüzetet az Új ikon ikonra kattintva. Az Azure Databricks-jegyzetfüzetek közötti navigálásról a A jegyzetfüzet megjelenésének testreszabásacímű témakörben olvashat.

  2. Másolja és illessze be a következő kódot az új üres jegyzetfüzetcellába. Cserélje le <catalog-name>, <schema-name>és <volume-name> a Unity-katalógus köteteinek katalógusára, sémáira és kötetnevére. Cserélje le <table_name> egy tetszőleges táblanévre. Az oktatóanyag későbbi részében babanévadatokat fog betölteni ebbe a táblába.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Nyomja le Shift+Enter a cellát, és hozzon létre egy új üres cellát.

  4. Másolja és illessze be a következő kódot az új üres jegyzetfüzetcellába. Ez a kód a rows.csv fájlt a health.data.ny.gov-ból a Unity Catalog kötetbe másolja a Databricks dbutils paranccsal.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

2. lépés: DataFrame létrehozása

Ez a lépés létrehoz egy tesztadatokkal elnevezett df1 DataFrame-et, majd megjeleníti annak tartalmát.

  1. Másolja és illessze be a következő kódot az új üres jegyzetfüzetcellába. Ez a kód létrehozza a DataFrame-et tesztadatokkal, majd megjeleníti a DataFrame tartalmát és sémáját.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

3. lépés: Adatok betöltése adatkeretbe CSV-fájlból

Ez a lépés létrehoz egy df_csv nevű DataFrame-et a korábban az Ön Unity Catalog-kötetébe betöltött CSV-fájlból. Lásd: spark.read.csv.

  1. Másolja és illessze be a következő kódot az új üres jegyzetfüzetcellába. Ez a kód betölti a babanév adatait a DataFrame-be df_csv a CSV-fájlból, majd megjeleníti a DataFrame tartalmát.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

Számos támogatott fájlformátumból tölthet be adatokat.

4. lépés: A DataFrame megtekintése és használata

Tekintse meg és használja a DataFrames babaneveit az alábbi módszerekkel.

Megtudhatja, hogyan jelenítheti meg az Apache Spark DataFrame sémáját. Az Apache Spark a séma kifejezést használja a DataFrame oszlopainak nevére és adattípusára való hivatkozáshoz.

Feljegyzés

Az Azure Databricks a kifejezésséma használatával is leírja a katalógusban regisztrált táblák gyűjteményét.

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód a DataFrame-ek sémáját mutatja be a .printSchema() metódussal a két DataFrame sémáinak megtekintéséhez – a két DataFrame egyesítésére való felkészüléshez.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

Adatkeret oszlopának átnevezése

Megtudhatja, hogyan nevezhet át egy oszlopot egy DataFrame-ben.

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód átnevez egy oszlopot a df1_csv DataFrame-ben, hogy megfeleljen a df1 DataFrame megfelelő oszlopának. Ez a kód az Apache Spark withColumnRenamed() metódust használja.

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

Adatkeretek egyesítése

Megtudhatja, hogyan hozhat létre egy új DataFrame-et, amely hozzáadja az egyik DataFrame sorait a másikhoz.

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark union() metódussal egyesíti az első DataFrame df tartalmát a CSV-fájlból betöltött babaneveket tartalmazó DataFrame-lel df_csv .

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

Sorok szűrése DataFrame-ben

Az adatkészlet legnépszerűbb babaneveit sorok szűrésével fedezheti fel az Apache Spark .filter() vagy .where() metódusok használatával. Szűréssel kiválaszthatja a dataframe-ben visszaadni vagy módosítani kívánt sorok egy részhalmazát. Nincs különbség a teljesítményben vagy a szintaxisban, ahogy az alábbi példákban látható.

.filter() metódus használata

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark .filter() metódussal jeleníti meg azokat a sorokat a DataFrame-ben, amelyek száma meghaladja az 50-et.

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

.where() metódus használata

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark .where() metódussal jeleníti meg azokat a sorokat a DataFrame-ben, amelyek száma meghaladja az 50-et.

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

Oszlopok kijelölése DataFrame-ből és sorrend gyakoriság szerint

Ismerje meg, hogyan adhatja meg a visszaadni kívánt DataFrame oszlopait a select() módszerrel, hogy megtudja a babanév gyakoriságát. Az eredményeket az Apache Spark orderby és desc a függvények segítségével rendezheti.

Az Apache Spark pyspark.sql modulja támogatja az SQL-függvényeket. Az oktatóanyagban használt függvények közé tartozik az Apache Spark orderBy()desc()és expr() a függvények. Ezeknek a függvényeknek a használatát úgy engedélyezheti, hogy szükség szerint importálja őket a munkamenetbe.

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód importálja a függvényt desc() , majd az Apache Spark select() metódust és az Apache Sparkot orderBy() és desc() függvényeket használja a leggyakoribb nevek és azok számának csökkenő sorrendben való megjelenítéséhez.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

Adatkeret-részhalmaz létrehozása

Megtudhatja, hogyan hozhat létre adatkeret-részhalmazt egy meglévő DataFrame-ből.

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark filter metódus használatával hoz létre egy új DataFrame-et, amely év, darabszám és nem szerint korlátozza az adatokat. Az Apache Spark select() metódust használja az oszlopok korlátozására. Emellett az Apache Spark orderBy() és desc() a függvények használatával rendezi az új DataFrame-et szám szerint.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

5. lépés: A DataFrame mentése

Megtudhatja, hogyan menthet DataFrame-eket. A DataFrame-et mentheti egy táblába, vagy fájlba vagy több fájlba is írhatja a DataFrame-et.

A DataFrame mentése táblázatba

Az Azure Databricks alapértelmezés szerint az összes tábla Delta Lake-formátumát használja. A DataFrame mentéséhez CREATE táblajogokkal kell rendelkeznie a katalógusban és a sémában.

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az oktatóanyag elején definiált változó használatával menti a DataFrame tartalmát egy táblába.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

A legtöbb Apache Spark-alkalmazás nagy adatkészleteken és elosztott módon működik. Az Apache Spark egyetlen fájl helyett fájlkönyvtárat ír ki. A Delta Lake felosztja a Parquet-mappákat és -fájlokat. Sok adatrendszer képes beolvasni ezeket a fájlkönyvtárakat. Az Azure Databricks azt javasolja, hogy a legtöbb alkalmazáshoz használja a táblákat fájlelérési utakon keresztül.

A DataFrame mentése JSON-fájlokba

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód jSON-fájlok könyvtárába menti a DataFrame-et.

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

A DataFrame beolvasása egy JSON-fájlból

Megtudhatja, hogyan olvashatja be a JSON-adatokat egy könyvtárból egy DataFrame-be az Apache Spark spark.read.format() metódus használatával.

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód megjeleníti az előző példában mentett JSON-fájlokat.

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

További feladatok: SQL-lekérdezések futtatása a PySparkban, a Scalában és az R-ben

Az Apache Spark DataFrame-ek az alábbi lehetőségeket biztosítják az SQL és a PySpark, a Scala és az R kombinálásához. Az alábbi kódot az oktatóanyaghoz létrehozott jegyzetfüzetben futtathatja.

Adjon meg egy oszlopot SQL-lekérdezésként

Ismerje meg, hogyan használhatja az Apache Spark selectExpr() metódust. Ez a metódus egy változata, amely elfogadja az select() SQL-kifejezéseket, és egy frissített DataFrame-et ad vissza. Ez a módszer lehetővé teszi egy SQL-kifejezés, például uppera .

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark selectExpr() metódust és az SQL upper kifejezést használja a sztringoszlop nagybetűssé alakításához (és az oszlop átnevezéséhez).

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

A expr() használata a SQL-szintaxis alkalmazásához egy oszlophoz

Megtudhatja, hogyan importálhatja és használhatja az Apache Spark expr() függvényt az SQL-szintaxis használatára bárhol, ahol egy oszlop meg lesz adva.

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód importálja a expr() függvényt, majd az Apache Spark expr() függvényt és az SQL lower kifejezést használja egy sztringoszlop kisbetűssé alakításához (és átnevezi az oszlopot).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

Tetszőleges SQL-lekérdezés futtatása spark.sql() függvénnyel

Ismerje meg, hogyan futtathat tetszőleges SQL-lekérdezéseket az Apache Spark spark.sql() függvény használatával.

  1. Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark spark.sql() függvénnyel kérdez le egy SQL-táblát SQL-szintaxis használatával.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

DataFrame-oktatóanyag-jegyzetfüzetek

Az alábbi jegyzetfüzetek az oktatóanyagból származó példákat tartalmazzák.

Python

DataFrames-oktatóanyag a Python használatával

Jegyzetfüzet lekérése

Scala

DataFrames-oktatóanyag a Scala használatával

Jegyzetfüzet lekérése

R

DataFrames-oktatóanyag az R használatával

Jegyzetfüzet lekérése

További erőforrások