Zdieľať cez


Používanie sparkr

SparkR je balík R, ktorý poskytuje odľahčené klientske riešenie na použitie Apache Spark z R. Služba SparkR poskytuje distribuovanú implementáciu údajového rámu, ktorá podporuje operácie, ako napríklad výber, filtrovanie, agregácia atď. SparkR tiež podporuje distribuované strojové učenie pomocou MLlib.

SparkR môžete použiť prostredníctvom definícií dávkových úloh služby Spark alebo s interaktívnymi poznámkovými blokmi služby Microsoft Fabric.

Podpora R je k dispozícii len v Spark3.1 alebo vyššie. R v Spark 2.4 nie je podporované.

Požiadavky

  • Získajte predplatné na Microsoft Fabric. Alebo si zaregistrujte bezplatnú skúšobnú verziu služby Microsoft Fabric.

  • Prihláste sa do služby Microsoft Fabric.

  • Pomocou prepínača skúseností v ľavej dolnej časti domovskej stránky sa prepnete na službu Fabric.

    Snímka obrazovky ponuky prepínača prostredí zobrazujúca, kde vybrať možnosť Dátová veda.

  • Otvorte alebo vytvorte poznámkový blok. Ďalšie informácie nájdete v téme Ako používať poznámkové bloky služby Microsoft Fabric.

  • Ak chcete zmeniť primárny jazyk, nastavte možnosť jazyka na SparkR (R ).

  • Pripojte svoj notebook k jazeru. Na ľavej strane vyberte položku Pridať a pridajte existujúci lakehouse alebo vytvorte lakehouse.

Čítanie a zapisovať údajové rámce SparkR

Prečítajte si údajový rámec SparkR z lokálnej architektúry R data.frame

Najjednoduchší spôsob vytvorenia údajového rámca je konvertovanie lokálnej architektúry R data.frame na údajový rámec Spark DataFrame.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Čítanie a zapisovaie údajového rámca SparkR z Lakehouse

Údaje môžu byť uložené v lokálnom systéme súborov uzlov klastra. Všeobecné metódy čítať a zapisovať Údajový rámec SparkR z Lakehouse je read.df a write.df. Tieto metódy sa vydavajú cestou k súboru, aby sa načítal, a typ zdroja údajov. SparkR podporuje natívne čítanie súborov CSV, JSON, textu a parketov.

Ak chcete čítať a zapisovať do Lakehouse, najprv ju pridajte do svojej relácie. Na ľavej strane notebooku vyberte položku Pridať a pridajte existujúce Lakehouse alebo vytvorte Lakehouse.

Poznámka

Ak chcete získať prístup k súborom Lakehouse pomocou balíkov Spark, ako read.df napríklad alebo write.df, použite jeho cestu k službe ADFS alebo relatívnu cestu k službe Spark. V prieskumníkovi Lakehouse kliknite pravým tlačidlom myši na súbory alebo priečinok, ku ktorému chcete získať prístup, a z kontextovej ponuky skopírujte jeho cestu k službe ADFS alebo relatívnu cestu k službe Spark .

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric má tidyverse predinštalované. K súborom Lakehouse máte prístup v známych balíkoch R, ako je napríklad čítanie a písanie súborov Lakehouse pomocou readr::read_csv() a readr::write_csv().

Poznámka

Ak chcete získať prístup k súborom Lakehouse pomocou balíkov R, musíte použiť cestu k súboru API. V prieskumníkovi Lakehouse kliknite pravým tlačidlom myši na súbor alebo priečinok, ku ktorému chcete získať prístup, a skopírujte jeho cestu k súboru API z kontextovej ponuky.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

Pomocou dotazov SparkSQL si tiež môžete prečítať údajový rámec SparkR v službe Lakehouse.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Operácie s údajovým rámcom

Údajové rámce SparkR podporujú mnoho funkcií na štruktúrované spracovanie údajov. Tu je niekoľko základných príkladov. Úplný zoznam nájdete v dokumentácii k rozhraniu API služby SparkR.

Výber riadkov a stĺpcov

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Zoskupenie a agregácia

Údajové snímky SparkR podporujú mnoho bežne používaných funkcií na agregáciu údajov po zoskupení. Môžeme napríklad vypočítať histogram času čakania v vernej množine údajov, ako je znázornené nižšie.

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Operácie stĺpcov

SparkR poskytuje mnoho funkcií, ktoré možno priamo použiť v stĺpcoch na spracovanie údajov a agregáciu. Nasledujúci príklad znázorňuje použitie základných aritmetických funkcií.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Použitie funkcie definovanej používateľom

SparkR podporuje niekoľko typov funkcií definovaných používateľom:

Spustenie funkcie pre veľkú množinu údajov s dapply alebo dapplyCollect

dapply

Použite funkciu na každú oblasť .SparkDataFrame Funkcia, ktorá sa má použiť na každú oblasť SparkDataFrame oblasti a mala by mať iba jeden parameter, ktorému bude odoslaná funkcia data.frame zodpovedá každej oblasti. Výstupom funkcie by mala byť data.frame. Schéma určuje formát riadka výsledného parametra SparkDataFrame. Musí sa zhodovať s typmi údajov vrátenej hodnoty.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Rovnako ako dapply, použite funkciu na každú oblasť SparkDataFrame a výsledok zhromažďte späť. Výstupom funkcie by mala byť data.framehodnota . Tentoraz však nie je nutné odovzdať schému. Všimnite si, že dapplyCollect zlyhanie môže zlyhať, ak výstupy funkcie spustené na celej oblasti nie je možné vyžiadať do ovládača a zmestiť sa do pamäte ovládača.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Spustenie funkcie pre veľké zoskupenie množín údajov podľa vstupných stĺpcov s gapply alebo gapplyCollect

gapply

Použitie funkcie na každú skupinu SparkDataFrame. Funkcia sa má použiť na každú skupinu SparkDataFrame a mala by mať iba dva parametre: zoskupujúci kľúč a R data.frame zodpovedajúci danmu kľúču. Skupiny sa vyberú zo SparkDataFrames stĺpcov. Výstupom funkcie by mala byť data.framehodnota . Schéma určuje formát riadka výsledného typu SparkDataFrame. Musí predstavovať výstupné schémy funkcie R z typov údajov služby Spark. Názvy stĺpcov vrátených data.frame stĺpcov sú nastavené používateľom.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Like gapply– použije funkciu na každú skupinu SparkDataFrame a výsledok zhromaždí späť do R data.frame. Výstupom funkcie by mala byť data.framehodnota . Nie je však potrebné odovzdať schému. Všimnite si, že gapplyCollect zlyhanie môže zlyhať, ak výstupy funkcie spustené na celej oblasti nie je možné vyžiadať do ovládača a zmestiť sa do pamäte ovládača.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Spustenie lokálnych funkcií R distribuovaných pomocou spark.lapply

spark.lapply

Podobne ako v natívnom lapply jazyku R spustí spark.lapply funkciu na zozname prvkov a distribuuje výpočty pomocou Spark. Použije funkciu spôsobom, ktorý je podobný doParallellapply alebo ako prvky zoznamu. Výsledky všetkých výpočtov by sa mali zmestiť do jedného počítača. Ak nie je tomu tak, môžu urobiť niečo ako df <- createDataFrame(list) a potom použiť dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Spustenie dotazov SQL zo služby SparkR

Údajový rámec SparkR možno zaregistrovať aj ako dočasné zobrazenie, ktoré vám umožní spúšťať dotazy SQL v rámci svojich údajov. Funkcia SQL umožňuje aplikáciám programovo spúšťať dotazy SQL a vráti výsledok ako údajový rámec SparkR.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Strojové učenie

SparkR zobrazuje väčšinu algoritmov MLLib. Pod pokrievkou používa SparkR MLlib na trénovaie modelu.

V nasledujúcom príklade je uvedené, ako pomocou SparkR vytvoriť gaussiansky model GLM. Ak chcete spustiť lineárnu regresiu, nastavte skupinu na "gaussian"hodnotu . Ak chcete spustiť logistickú regresiu, nastavte rodinu na "binomial". Pri používaní sparkML GLM SparkR sa automaticky vykonáva jedno horúce kódovanie kategorických funkcií, takže sa nemusí vykonávať manuálne. Okrem funkcií String a Double Type je tiež možné prispôsobiť sa funkciám MLlib Vector na účely kompatibility s inými súčasťami jazyka MLlib.

Ďalšie informácie o podporovaných algoritmoch strojového učenia nájdete v dokumentácii pre sparkr a MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)