Menggunakan Apache Spark untuk membaca dan menulis data Apache HBase
Apache HBase biasanya dikueri dengan API tingkat rendah (memindai, mengambil, dan meletakkan) atau dengan sintaks SQL menggunakan Apache Phoenix. Apache juga menyediakan Apache Spark HBase Connector. Konektor adalah alternatif yang nyaman dan efisien untuk mengkueri dan memodifikasi data yang disimpan oleh HBase.
Prasyarat
Dua kluster HDInsight terpisah yang disebarkan dalam jaringan virtual yang sama. Satu HBase, dan satu Spark dengan setidaknya Spark 2.1 (HDInsight 3.6) terinstal. Untuk informasi selengkapnya, lihat Membuat kluster berbasis Linux di HDInsight menggunakan portal Microsoft Azure.
Skema URI untuk penyimpanan utama kluster Anda. Skemanya akan menjadi wasb:// untuk Azure Blob Storage,
abfs://
untuk Azure Data Lake Storage Gen2, atau adl:// untuk Azure Data Lake Storage Gen1. Jika transfer aman diaktifkan untuk Blob Storage, URI akan menjadiwasbs://
. Lihat juga, transfer aman.
Memproses keseluruhan
Memproses tingkat tinggi untuk pengaktifan kluster Spark Anda untuk mengkueri kluster HBase Anda adalah sebagai berikut:
- Siapkan beberapa data sampel di HBase.
- Dapatkan file hbase-site.xml dari folder konfigurasi kluster HBase Anda (/etc/hbase/conf), dan letakkan salinan hbase-site.xml di folder konfigurasi Spark 2 Anda (/etc/spark2/conf). (OPSIONAL: gunakan skrip yang disediakan oleh tim HDInsight untuk mengotomatiskan proses ini)
- Jalankan
spark-shell
yang mereferensikan Spark HBase Connector dengan koordinat Maven-nya dalam opsipackages
. - Tentukan katalog yang memetakan skema dari Spark ke HBase.
- Lakukan interaksi dengan data HBase menggunakan API RDD atau DataFrame.
Menyiapkan data sampel di Apache HBase
Dalam langkah ini, Anda membuat dan mengisi tabel di Apache HBase yang kemudian dapat Anda kueri menggunakan Spark.
Gunakan perintah
ssh
untuk menghubungkan ke kluster HBase Anda. Edit perintah dengan menggantiHBASECLUSTER
dengan nama kluster HBase Anda, lalu masukkan perintah:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
Gunakan perintah
hbase shell
untuk memulai shell interaktif HBase. Masukkan perintah berikut di koneksi SSH Anda:hbase shell
Gunakan perintah
create
untuk membuat tabel HBase dengan famili dua kolom. Masukkan perintah berikut:create 'Contacts', 'Personal', 'Office'
Gunakan perintah
put
untuk menyisipkan nilai pada kolom tertentu dalam baris tertentu dalam tabel tertentu. Masukkan perintah berikut:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
Gunakan perintah
exit
untuk menghentikan shell interaktif HBase. Masukkan perintah berikut:exit
Menjalankan skrip untuk menyiapkan koneksi antar kluster
Untuk menyiapkan komunikasi antar kluster, ikuti langkah-langkah untuk menjalankan dua skrip pada kluster Anda. Skrip ini akan mengotomatiskan proses penyalinan file yang dijelaskan di bagian 'Siapkan komunikasi secara manual'.
- Skrip yang Anda jalankan dari kluster HBase akan mengunggah
hbase-site.xml
dan informasi pemetaan IP HBase ke penyimpanan default yang terpasang pada kluster Spark Anda. - Skrip yang Anda jalankan dari kluster Spark menyiapkan dua pekerjaan cron untuk menjalankan dua skrip pembantu secara berkala:
- Pekerjaan cron HBase – unduh file
hbase-site.xml
baru dan pemetaan IP HBase dari akun penyimpanan default Spark ke simpul lokal - Pekerjaan cron Spark - memeriksa apakah penyekalaan Spark terjadi dan apakah kluster aman. Jika memang demikian, edit
/etc/hosts
untuk menyertakan pemetaan IP HBase yang disimpan secara lokal
- Pekerjaan cron HBase – unduh file
CATATAN: Sebelum melanjutkan, pastikan Anda telah menambahkan akun penyimpanan kluster Spark ke kluster HBase Anda sebagai akun penyimpanan sekunder. Pastikan Anda skrip secara berurutan seperti yang ditunjukkan.
Gunakan Tindakan Skrip pada kluster HBase Anda untuk menerapkan perubahan dengan pertimbangan berikut:
Properti Nilai URI skrip bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
Jenis node Wilayah Parameter -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
Dipertahankan yes -
SECONDARYS_STORAGE_URL
adalah url penyimpanan default sisi Spark. Contoh Parameter:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
-
Menggunakan Tindakan Skrip pada kluster Spark Anda untuk menerapkan perubahan dengan pertimbangan berikut:
Properti Nilai URI skrip bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
Jenis node Head, Pekerja, Zookeeper Parameter -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
Dipertahankan yes - Anda dapat menentukan seberapa sering Anda menginginkan kluster ini secara otomatis memeriksa apakah ada pembaruan. Default:
-s “*/1 * * * *” -h 0
(Dalam contoh ini, pekerjaan Spark cron berjalan setiap menit, sementara cron HBase tidak berjalan) - Karena HBase cron tidak disiapkan secara default, Anda perlu menjalankan ulang skrip ini saat melakukan penskalakan ke kluster HBase Anda. Jika kluster HBase sering diskalakan, Anda dapat memilih untuk mengatur pekerjaan cron HBase secara otomatis. Contohnya:
-s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc"
mengonfigurasi skrip untuk melakukan pemeriksaan setiap 30 menit. Hal ini akan menjalankan jadwal cron HBase secara berkala untuk mengotomatiskan pengunduhan informasi HBase baru pada akun penyimpanan umum ke simpul lokal.
- Anda dapat menentukan seberapa sering Anda menginginkan kluster ini secara otomatis memeriksa apakah ada pembaruan. Default:
Catatan
Skrip ini hanya berfungsi pada kluster HDI 5.0 dan HDI 5.1.
Menyiapkan komunikasi secara manual (Opsional, jika langkah skrip yang disediakan di atas gagal)
CATATAN: Langkah-langkah ini perlu dilakukan setiap kali salah satu kluster menjalani aktivitas penyekalaan.
Salin hbase-site.xml dari penyimpanan lokal ke root penyimpanan default kluster Spark Anda. Edit perintah untuk mencerminkan konfigurasi Anda. Kemudian, dari sesi SSH terbuka Anda ke kluster HBase, masukkan perintah:
Nilai sintaks Nilai baru Skema URI Ubah untuk mencerminkan penyimpanan Anda. Sintaksnya adalah untuk penyimpanan blob dengan transfer aman diaktifkan. SPARK_STORAGE_CONTAINER
Ganti dengan nama kontainer penyimpanan default yang digunakan untuk kluster Spark. SPARK_STORAGE_ACCOUNT
Ganti dengan nama akun penyimpanan default yang digunakan untuk kluster Spark. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
Kemudian keluarlah dari koneksi ssh Anda ke kluster HBase Anda.
exit
Sambungkan ke simpul kepala kluster Spark Anda menggunakan SSH. Edit perintah dengan mengganti
SPARKCLUSTER
dengan nama kluster Spark Anda, lalu masukkan perintah:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
Masukkan perintah untuk menyalin
hbase-site.xml
dari penyimpanan default kluster Spark Anda ke folder konfigurasi Spark 2 pada penyimpanan lokal kluster:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Jalankan Spark Shell yang mereferensikan Spark HBase Connector
Setelah menyelesaikan langkah sebelumnya, Anda akan dapat menjalankan shell Spark, yang mereferensikan versi Spark HBase Connector yang sesuai.
Sebagai contoh, tabel berikut ini mencantumkan dua versi dan perintah terkait yang saat ini digunakan tim HDInsight. Anda dapat menggunakan versi yang sama untuk kluster Anda jika versi HBase dan Spark sama seperti yang ditunjukkan dalam tabel.
Dalam sesi SSH terbuka Anda ke kluster Spark, masukkan perintah berikut untuk memulai shell Spark:
Versi Spark Versi HDI HBase Versi SHC Perintah 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
Biarkan instans shell Spark ini tetap terbuka dan lanjutkan untuk Menentukan katalog dan kueri. Jika Anda tidak menemukan jar yang sesuai dengan versi Anda di repositori SHC Core, lanjutkan pembacaan.
Untuk kombinasi berikutnya dari versi Spark dan HBase, artefak ini tidak lagi dipublikasikan di repositori di atas. Anda dapat membangun jar langsung dari cabang GitHub spark-hbase-connector. Misalnya, jika Anda menjalankan dengan Spark 2.4 dan HBase 2.1, selesaikan langkah-langkah berikut:
Mengklonakan repositori:
git clone https://github.com/hortonworks-spark/shc
Buka cabang-2.4:
git checkout branch-2.4
Membangun dari cabang (membuat file .jar):
mvn clean package -DskipTests
Jalankan perintah berikut (pastikan untuk mengubah nama .jar yang terkait dengan file .jar yang Anda bangun):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
Biarkan instans shell Spark ini tetap terbuka dan lanjutkan ke bagian berikutnya.
Menentukan katalog dan kueri
Dalam langkah ini, Anda mendefinisikan objek katalog yang memetakan skema dari Apache Spark ke Apache HBase.
Di Spark Shell Anda yang terbuka, masukkan pernyataan
import
berikut:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
Masukkan perintah di bawah ini untuk menentukan katalog tabel Kontak yang Anda buat di HBase:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
Kode:
- Mendefinisikan skema katalog untuk tabel HBase bernama
Contacts
. - Mengidentifikasi rowkey sebagai
key
, dan petakan nama kolom yang digunakan dalam Spark ke famili kolom, nama kolom, dan jenis kolom seperti yang digunakan di HBase. - Menentukan rowkey secara terperinci sebagai kolom bernama (
rowkey
), yang memiliki famili kolom spesifikcf
darirowkey
.
- Mendefinisikan skema katalog untuk tabel HBase bernama
Masukkan perintah untuk menentukan metode yang menyediakan DataFrame di sekitar tabel Anda
Contacts
di HBase:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
Membuat instans DataFrame:
val df = withCatalog(catalog)
Kuerikan DataFrame:
df.show()
Anda akan melihat dua baris data:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
Daftarkan tabel sementara agar Anda bisa mengkueri tabel HBase menggunakan Spark SQL:
df.createTempView("contacts")
Menerbitkan kueri SQL terhadap tabel
contacts
:spark.sqlContext.sql("select personalName, officeAddress from contacts").show
Anda akan melihat hasil seperti ini:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Menyisipkan data baru
Untuk menyisipkan rekaman Kontak baru, tentukan kelas
ContactRecord
:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
Buat instans
ContactRecord
dan masukkan ke dalam array:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
Simpan array data baru ke HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
Periksa hasilnya:
df.show()
Anda akan melihat output seperti ini:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
Tutup shell Spark dengan memasukkan perintah berikut:
:q