Поделиться через


Работа с DataFrame и таблицами в R

Важно

SparkR в Databricks не рекомендуется в Databricks Runtime 16.0 и в более поздних версиях. Databricks рекомендует вместо этого использовать sparklyr.

В этой статье описывается использование пакетов R, таких как SparkR, sparklyrи dplyr для работы с data.frameR, dataFrames Spark и таблиц в памяти.

Обратите внимание, что работая с SparkR, sparklyr и dplyr, вы можете обнаружить, что будет возможно выполнить определенную операцию с помощью всех этих пакетов, и воспользоваться пакетом, который вам наиболее удобен. Например, для выполнения запроса можно вызывать такие функции, как SparkR::sql, sparklyr::sdf_sqlи dplyr::select. В других случаях вы можете выполнить операцию только с одним или двумя из этих пакетов, и выбранная операция зависит от сценария использования. Например, то, как вы вызываете sparklyr::sdf_quantile, немного отличается от способа вызова dplyr::percentile_approx, несмотря на то, что обе функции калькуируют квантили.

SQL можно использовать в качестве моста между SparkR и sparklyr. Например, можно использовать SparkR::sql для запроса таблиц, создаваемых с помощью sparklyr. Вы можете использовать sparklyr::sdf_sql для запроса таблиц, создаваемых с помощью SparkR. И dplyr код всегда преобразуется в SQL в памяти перед запуском. См. также взаимодействия с API и перевода SQL.

Загрузка SparkR, sparklyr и dplyr

Пакеты SparkR, sparklyr и dplyr включены в среду выполнения Databricks, установленную в кластерах Azure Databricks . Поэтому вам не нужно вызывать обычные install.package, прежде чем начать вызов этих пакетов. Однако сначала необходимо загрузить эти пакеты с library. Например, из записной книжки R в рабочей области Azure Databricks выполните следующий код в ячейке записной книжки, чтобы загрузить SparkR, sparklyr и dplyr:

library(SparkR)
library(sparklyr)
library(dplyr)

Подключение sparklyr к кластеру

После загрузки sparklyr необходимо вызвать sparklyr::spark_connect для подключения к кластеру, указав метод подключения databricks. Например, выполните следующий код в ячейке записной книжки, чтобы подключиться к кластеру, на котором размещена записная книжка:

sc <- spark_connect(method = "databricks")

Наоборот, записная книжка Azure Databricks уже устанавливает SparkSession в кластере для использования с SparkR, поэтому нет необходимости вызывать SparkR::sparkR.session, прежде чем начать использовать SparkR.

Загрузите файл данных в формате JSON в рабочую область

Многие примеры кода в этой статье основаны на данных в определенном расположении в рабочей области Azure Databricks с определенными именами столбцов и типами данных. Данные для этого примера кода возникают в JSON-файле с именем book.json из GitHub. Чтобы получить этот файл и загрузить его в рабочую область, следуйте этим шагам.

  1. Перейдите к файлу books.json на сайте GitHub и используйте текстовый редактор, чтобы скопировать его содержимое в файл с именем books.json где-то на локальном компьютере.
  2. На боковой панели рабочей области Azure Databricks щелкните по каталогу.
  3. Щелкните Создать таблицу.
  4. На вкладке Загрузить файл перетащите файл books.json с вашего локального компьютера в поле Перетащите файлы для загрузки. Или выберите , нажмите, чтобы просмотреть, и найдите файл books.json на локальном компьютере.

По умолчанию Azure Databricks отправляет локальный books.json-файл в расположение DBFS в вашем рабочем пространстве по пути /FileStore/tables/books.json.

Не нажимайте Создать таблицу с пользовательским интерфейсом или Создать таблицу в записной книжке. Примеры кода в этой статье используют данные из загруженного файла books.json в данной локации DBFS.

Прочитайте данные JSON в DataFrame

Используйте sparklyr::spark_read_json для чтения загруженного JSON-файла в DataFrame, указав подключение, путь к JSON-файлу и название внутреннего табличного представления данных. В этом примере необходимо указать, что файл book.json содержит несколько строк. Указание схемы столбцов здесь является необязательным. В противном случае sparklyr выводит схему столбцов по умолчанию. Например, выполните следующий код в ячейке записной книжки, чтобы считывать загруженные данные JSON-файла в кадр данных под названием jsonDF.

jsonDF <- spark_read_json(
  sc      = sc,
  name    = "jsonTable",
  path    = "/FileStore/tables/books.json",
  options = list("multiLine" = TRUE),
  columns = c(
    author    = "character",
    country   = "character",
    imageLink = "character",
    language  = "character",
    link      = "character",
    pages     = "integer",
    title     = "character",
    year      = "integer"
  )
)

Для печати первых строк кадра данных можно использовать SparkR::head, SparkR::showили sparklyr::collect. По умолчанию head выводит первые шесть строк по умолчанию. show и collect распечатывают первые 10 строк. Например, выполните следующий код в ячейке записной книжки, чтобы распечатать первые строки кадра данных с именем jsonDF:

head(jsonDF)

# Source: spark<?> [?? x 8]
#   author                  country        image…¹ langu…² link  pages title  year
#   <chr>                   <chr>          <chr>   <chr>   <chr> <int> <chr> <int>
# 1 Chinua Achebe           Nigeria        images… English "htt…   209 Thin…  1958
# 2 Hans Christian Andersen Denmark        images… Danish  "htt…   784 Fair…  1836
# 3 Dante Alighieri         Italy          images… Italian "htt…   928 The …  1315
# 4 Unknown                 Sumer and Akk… images… Akkadi… "htt…   160 The … -1700
# 5 Unknown                 Achaemenid Em… images… Hebrew  "htt…   176 The …  -600
# 6 Unknown                 India/Iran/Ir… images… Arabic  "htt…   288 One …  1200
# … with abbreviated variable names ¹​imageLink, ²​language

show(jsonDF)

# Source: spark<jsonTable> [?? x 8]
#    author                  country       image…¹ langu…² link  pages title  year
#    <chr>                   <chr>         <chr>   <chr>   <chr> <int> <chr> <int>
#  1 Chinua Achebe           Nigeria       images… English "htt…   209 Thin…  1958
#  2 Hans Christian Andersen Denmark       images… Danish  "htt…   784 Fair…  1836
#  3 Dante Alighieri         Italy         images… Italian "htt…   928 The …  1315
#  4 Unknown                 Sumer and Ak… images… Akkadi… "htt…   160 The … -1700
#  5 Unknown                 Achaemenid E… images… Hebrew  "htt…   176 The …  -600
#  6 Unknown                 India/Iran/I… images… Arabic  "htt…   288 One …  1200
#  7 Unknown                 Iceland       images… Old No… "htt…   384 Njál…  1350
#  8 Jane Austen             United Kingd… images… English "htt…   226 Prid…  1813
#  9 Honoré de Balzac        France        images… French  "htt…   443 Le P…  1835
# 10 Samuel Beckett          Republic of … images… French… "htt…   256 Moll…  1952
# … with more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

collect(jsonDF)

# A tibble: 100 × 8
#    author                  country       image…¹ langu…² link  pages title  year
#    <chr>                   <chr>         <chr>   <chr>   <chr> <int> <chr> <int>
#  1 Chinua Achebe           Nigeria       images… English "htt…   209 Thin…  1958
#  2 Hans Christian Andersen Denmark       images… Danish  "htt…   784 Fair…  1836
#  3 Dante Alighieri         Italy         images… Italian "htt…   928 The …  1315
#  4 Unknown                 Sumer and Ak… images… Akkadi… "htt…   160 The … -1700
#  5 Unknown                 Achaemenid E… images… Hebrew  "htt…   176 The …  -600
#  6 Unknown                 India/Iran/I… images… Arabic  "htt…   288 One …  1200
#  7 Unknown                 Iceland       images… Old No… "htt…   384 Njál…  1350
#  8 Jane Austen             United Kingd… images… English "htt…   226 Prid…  1813
#  9 Honoré de Balzac        France        images… French  "htt…   443 Le P…  1835
# 10 Samuel Beckett          Republic of … images… French… "htt…   256 Moll…  1952
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

Выполнение запросов SQL и запись в таблицу и чтение

Вы можете использовать функции dplyr для выполнения запросов SQL в DataFrame. Например, выполните следующий код в ячейке ноутбука, чтобы использовать dplyr::group_by и dployr::count для получения подсчётов по автору из DataFrame с именем jsonDF. Используйте dplyr::arrange и dplyr::desc для сортировки результата в порядке убывания по подсчетам. Затем распечатайте первые 10 строк по умолчанию.

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n))

# Source:     spark<?> [?? x 2]
# Ordered by: desc(n)
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Gustave Flaubert           2
#  8 Homer                      2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows

Затем можно использовать sparklyr::spark_write_table для записи результата в таблицу в Azure Databricks. Например, выполните следующий код в ячейке записной книжки, чтобы повторно запустить запрос, а затем записать результат в таблицу с именем json_books_agg:

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n)) %>%
  spark_write_table(
    name = "json_books_agg",
    mode = "overwrite"
  )

Чтобы убедиться, что таблица создана, можно использовать sparklyr::sdf_sql вместе с SparkR::showDF для отображения данных таблицы. Например, выполните следующий код в ячейке записной книжки, чтобы запросить таблицу в кадр данных, а затем использовать sparklyr::collect для печати первых 10 строк кадра данных по умолчанию:

collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

Вы также можете использовать sparklyr::spark_read_table, чтобы сделать что-то подобное. Например, выполните следующий код в ячейке записной книжки, чтобы запросить предыдущий кадр данных с именем jsonDF в кадр данных, а затем использовать sparklyr::collect для печати первых 10 строк кадра данных по умолчанию:

fromTable <- spark_read_table(
  sc   = sc,
  name = "json_books_agg"
)

collect(fromTable)

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

Добавление столбцов и вычисление значений столбцов в DataFrame

Функции dplyr можно использовать для добавления столбцов в кадры данных и вычислений значений столбцов.

Например, выполните следующий код в ячейке ноутбука, чтобы получить содержимое DataFrame с именем jsonDF. Используйте dplyr::mutate, чтобы добавить столбец с именем todayи заполнить этот новый столбец текущей меткой времени. Затем напишите это содержимое в новый кадр данных с именем withDate и используйте dplyr::collect для печати первых 10 строк нового кадра данных по умолчанию.

Заметка

dplyr::mutate принимает только аргументы, соответствующие встроенным функциям Hive (также известным как UDF) и встроенным агрегатным функциям (также называемым UDAF). Для получения общих сведений смотрите раздел Функции Hive. См. информацию о функциях, связанных с датой, в этом разделе в Функции даты.

withDate <- jsonDF %>%
  mutate(today = current_timestamp())

collect(withDate)

# A tibble: 100 × 9
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:32:59
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:32:59
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:32:59
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:32:59
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:32:59
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:32:59
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:32:59
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:32:59
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

Теперь используйте dplyr::mutate, чтобы добавить еще два столбца в содержимое таблицы данных withDate. Новые столбцы month и year содержат числовой месяц и год из столбца today. Затем напишите это содержимое в новый кадр данных с именем withMMyyyyи используйте dplyr::select вместе с dplyr::collect для печати author, title, month и year столбцов первых десяти строк нового кадра данных по умолчанию:

withMMyyyy <- withDate %>%
  mutate(month = month(today),
         year  = year(today))

collect(select(withMMyyyy, c("author", "title", "month", "year")))

# A tibble: 100 × 4
#    author                  title                                     month  year
#    <chr>                   <chr>                                     <int> <int>
#  1 Chinua Achebe           Things Fall Apart                             9  2022
#  2 Hans Christian Andersen Fairy tales                                   9  2022
#  3 Dante Alighieri         The Divine Comedy                             9  2022
#  4 Unknown                 The Epic Of Gilgamesh                         9  2022
#  5 Unknown                 The Book Of Job                               9  2022
#  6 Unknown                 One Thousand and One Nights                   9  2022
#  7 Unknown                 Njál's Saga                                   9  2022
#  8 Jane Austen             Pride and Prejudice                           9  2022
#  9 Honoré de Balzac        Le Père Goriot                                9  2022
# 10 Samuel Beckett          Molloy, Malone Dies, The Unnamable, the …     9  2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

Теперь используйте dplyr::mutate, чтобы добавить еще два столбца в содержимое DataFrame withMMyyyy. Новые столбцы formatted_date содержат часть yyyy-MM-dd из столбца today, а новый столбец day содержит числовой день из нового столбца formatted_date. Затем напишите это содержимое в новый кадр данных с именем withUnixTimestampи используйте dplyr::select вместе с dplyr::collect для печати title, formatted_dateи day столбцов первых десяти строк нового кадра данных по умолчанию:

withUnixTimestamp <- withMMyyyy %>%
  mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
         day            = dayofmonth(formatted_date))

collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))

# A tibble: 100 × 3
#    title                                           formatted_date   day
#    <chr>                                           <chr>          <int>
#  1 Things Fall Apart                               2022-09-27        27
#  2 Fairy tales                                     2022-09-27        27
#  3 The Divine Comedy                               2022-09-27        27
#  4 The Epic Of Gilgamesh                           2022-09-27        27
#  5 The Book Of Job                                 2022-09-27        27
#  6 One Thousand and One Nights                     2022-09-27        27
#  7 Njál's Saga                                     2022-09-27        27
#  8 Pride and Prejudice                             2022-09-27        27
#  9 Le Père Goriot                                  2022-09-27        27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27        27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

Создание временного представления

Можно создавать именованные временные представления в памяти, основанные на существующих DataFrame. Например, выполните следующий код в ячейке записной книжки, чтобы с помощью SparkR::createOrReplaceTempView получить содержимое предыдущего DataFrame с именем jsonTable и создать временное представление из него с именем timestampTable. Затем используйте sparklyr::spark_read_table для чтения содержимого временного представления. Используйте sparklyr::collect для печати первых 10 строк временной таблицы по умолчанию:

createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")

spark_read_table(
  sc = sc,
  name = "timestampTable"
) %>% collect()

# A tibble: 100 × 10
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:11:56
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:11:56
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:11:56
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:11:56
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:11:56
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:11:56
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:11:56
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:11:56
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
#   names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names

Провести статистический анализ на DataFrame

Вы можете использовать sparklyr вместе с dplyr для статистического анализа.

Например, создайте DataFrame для выполнения статистического анализа. Для этого выполните следующий код в ячейке записной книжки, чтобы использовать sparklyr::sdf_copy_to для записи содержимого набора данных iris, встроенного в R, в кадр данных с именем iris. Используйте sparklyr::sdf_collect для печати первых 10 строк временной таблицы по умолчанию:

irisDF <- sdf_copy_to(
  sc        = sc,
  x         = iris,
  name      = "iris",
  overwrite = TRUE
)

sdf_collect(irisDF, "row-wise")

# A tibble: 150 × 5
#    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
#           <dbl>       <dbl>        <dbl>       <dbl> <chr>
#  1          5.1         3.5          1.4         0.2 setosa
#  2          4.9         3            1.4         0.2 setosa
#  3          4.7         3.2          1.3         0.2 setosa
#  4          4.6         3.1          1.5         0.2 setosa
#  5          5           3.6          1.4         0.2 setosa
#  6          5.4         3.9          1.7         0.4 setosa
#  7          4.6         3.4          1.4         0.3 setosa
#  8          5           3.4          1.5         0.2 setosa
#  9          4.4         2.9          1.4         0.2 setosa
# 10          4.9         3.1          1.5         0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows

Теперь используйте dplyr::group_by для группировки строк по столбцу Species. Используйте dplyr::summarize вместе с dplyr::percentile_approx для вычисления сводной статистики по 25-му, 50-му, 75-му и 100-му квантилям столбца Sepal_Length по Species. Используйте sparklyr::collect, чтобы вывести результаты.

Заметка

dplyr::summarize принимает только аргументы, соответствующие встроенным функциям Hive (также известным как UDFs) и встроенным агрегатным функциям (также называемым UDAFs). Общие сведения см. в разделе Функции Hive. Для получения сведений о percentile_approxсм. Встроенные агрегатные функции (UDAF).

quantileDF <- irisDF %>%
  group_by(Species) %>%
  summarize(
    quantile_25th = percentile_approx(
      Sepal_Length,
      0.25
    ),
    quantile_50th = percentile_approx(
      Sepal_Length,
      0.50
    ),
    quantile_75th = percentile_approx(
      Sepal_Length,
      0.75
    ),
    quantile_100th = percentile_approx(
      Sepal_Length,
      1.0
    )
  )

collect(quantileDF)

# A tibble: 3 × 5
#   Species    quantile_25th quantile_50th quantile_75th quantile_100th
#   <chr>              <dbl>         <dbl>         <dbl>          <dbl>
# 1 virginica            6.2           6.5           6.9            7.9
# 2 versicolor           5.6           5.9           6.3            7
# 3 setosa               4.8           5             5.2            5.8

Аналогичные результаты можно вычислить, например с помощью sparklyr::sdf_quantile:

print(sdf_quantile(
  x = irisDF %>%
    filter(Species == "virginica"),
  column = "Sepal_Length",
  probabilities = c(0.25, 0.5, 0.75, 1.0)
))

# 25%  50%  75% 100%
# 6.2  6.5  6.9  7.9