استخدام Apache Spark لقراءة وكتابة بيانات Apache HBase
عادةً ما يتم الاستعلام عن Apache HBase إما من خلال واجهة برمجة التطبيقات ذات المستوى المنخفض (عمليات المسح، والحصول على، ووضع) أو باستخدام بناء جملة SQL باستخدام Apache Phoenix. يوفر Apache أيضًا موصل Apache Spark HBase. يعد الموصل بديلًا مناسبًا وفعالًا للاستعلام عن البيانات المخزنة بواسطة HBase وتعديلها.
المتطلبات الأساسية
تم نشر مجموعتين منفصلتين من أنظمة HDInsight في نفس الشبكة الافتراضية . One HBase وSpark واحد به على الأقل Spark 2.1 (HDInsight 3.6) مثبتة. لمزيد من المعلومات، راجع إنشاء مجموعات مستندة إلى Linux في HDInsight باستخدام بوابة Azure .
مخطط URI للتخزين الأساسي لمجموعاتك. سيكون هذا المخطط wasb: // لتخزين Azure Blob،
abfs://
لـAzure Data Lake Storage Gen2 أو adl: // لـAzure Data Lake Storage Gen1. إذا تم تمكين النقل الآمن لتخزين Blob، فسيكون URIwasbs://
. راجع أيضًا، النقل الآمن .
إجمالي العمليات
العملية عالية المستوى لتمكين مجموعة Spark الخاصة بك من الاستعلام عن مجموعة HBase الخاصة بك على النحو التالي:
- تحضير بعض عينات البيانات في HBase.
- احصل على ملف hbase-site.xml من مجلد تكوين مجموعة HBase (/ etc / hbase / conf)، ثم ضع نسخة من hbase-site.xml في مجلد تكوين Spark 2 (/ etc / spark2 / conf). (اختياري: استخدم البرنامج النصي المقدم من فريق HDInsight لأتمتة هذه العملية)
- قم بتشغيل
spark-shell
الرجوع إلى Spark HBase Connector بواسطة إحداثيات Maven الخاصة به في الخيارpackages
. - حدد كتالوجًا يقوم بتعيين المخطط من Spark إلى HBase.
- التفاعل مع بيانات HBase باستخدام واجهات برمجة تطبيقات RDD أو DataFrame.
تحضير البيانات النموذجية في Apache HBase
في هذه الخطوة، تقوم بإنشاء جدول وملؤه في Apache HBase بحيث يمكنك الاستعلام بعد ذلك باستخدام Spark.
استخدم الأمر
ssh
للاتصال بمجموعة HBase الخاصة بك. قم بتحرير الأمر عن طريق استبدالHBASECLUSTER
باسم مجموعة HBase الخاصة بك، ثم أدخل الأمر :ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
استخدم الأمر
hbase shell
لبدء shell التفاعلي HBase. أدخل الأمر التالي في اتصال SSH:hbase shell
استخدم الأمر
create
لإنشاء جدول HBase مع عائلات من عمودين. أدخل الأمر التالي:create 'Contacts', 'Personal', 'Office'
استخدم الأمر
put
لإدراج القيم في عمود محدد في صف محدد في جدول معين. أدخل الأمر التالي:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
استخدم الأمر
exit
لإيقاف غلاف HBase التفاعلي. أدخل الأمر التالي:exit
قم بتشغيل البرامج النصية لإعداد الاتصال بين المجموعات
لإعداد الاتصال بين المجموعات، اتبع الخطوات لتشغيل برنامجين نصيين على مجموعاتك. ستقوم هذه البرامج النصية بأتمتة عملية نسخ الملفات الموضحة في قسم "إعداد الاتصال يدويا".
- سيتم تحميل البرنامج النصي الذي تقوم بتشغيله من مجموعة HBase
hbase-site.xml
ومعلومات تعيين HBase IP إلى وحدة التخزين الافتراضية المرفقة بمجموعة Spark الخاصة بك. - يقوم البرنامج النصي الذي تقوم بتشغيله من مجموعة Spark بإعداد مهمتين cron لتشغيل برنامجين نصيين مساعدين بشكل دوري:
- مهمة HBase cron - تنزيل ملفات
hbase-site.xml
جديدة وتعيين HBase IP من حساب التخزين الافتراضي Spark إلى العقدة المحلية - وظيفة Spark cron - تتحقق مما إذا كان مقياس Spark قد حدث وما إذا كانت المجموعة آمنة. إذا كان الأمر كذلك، فقم بتحرير
/etc/hosts
لتضمين تعيين HBase IP المخزن محليًا
- مهمة HBase cron - تنزيل ملفات
ملاحظة: قبل المتابعة، تأكد من إضافة حساب تخزين مجموعة Spark إلى مجموعة HBase كحساب تخزين ثانوي. تأكد من البرامج النصية بالترتيب كما هو موضح.
استخدم Script Action في مجموعة HBase لديك لتطبيق التغييرات مع الاعتبارات التالية:
الخاصية القيمة عنوان URI النصي Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
نوع العقدة (العقدات) المنطقة المعلمات -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
الاستمرار yes -
SECONDARYS_STORAGE_URL
هو عنوان url الخاص بوحدة التخزين الافتراضية من جانب Spark. مثال على المعلمة:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
-
استخدم Script Action على مجموعة Spark لديك لتطبيق التغييرات مع الاعتبارات التالية:
الخاصية القيمة عنوان URI النصي Bash https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
نوع العقدة (العقدات) رئيس، عامل، Zookeeper المعلمات -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
الاستمرار yes - يمكنك تحديد عدد المرات التي تريد أن تتحقق فيها هذه المجموعة تلقائيًا من التحديث. الافتراضي:
-s “*/1 * * * *” -h 0
(في هذا المثال، تعمل مهمة Spark cron كل دقيقة، بينما لا يتم تشغيل HBase cron) - نظرا لأنه لم يتم إعداد HBase cron بشكل افتراضي، تحتاج إلى إعادة تشغيل هذا البرنامج النصي عند إجراء التحجيم إلى نظام مجموعة HBase الخاص بك. إذا كانت مجموعة HBase تتوسع كثيرًا، فيمكنك اختيار إعداد وظيفة HBase cron تلقائيًا. على سبيل المثال: يقوم
-s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc"
بتهيئة البرنامج النصي لإجراء عمليات تحقق كل 30 دقيقة. سيؤدي هذا إلى تشغيل جدول HBase cron بشكل دوري لأتمتة تنزيل معلومات HBase الجديدة على حساب التخزين المشترك إلى العقدة المحلية.
- يمكنك تحديد عدد المرات التي تريد أن تتحقق فيها هذه المجموعة تلقائيًا من التحديث. الافتراضي:
إشعار
تعمل هذه البرامج النصية فقط على مجموعات HDI 5.0 وHDI 5.1.
إعداد الاتصال يدويًا (اختياري، إذا فشل البرنامج النصي في الخطوة أعلاه)
NOTE: يجب تنفيذ هذه الخطوات في كل مرة تخضع فيها إحدى المجموعات لنشاط تحجيم.
انسخ hbase-site.xml من التخزين المحلي إلى جذر التخزين الافتراضي لمجموعة Spark. قم بتحرير الأمر ليعكس التكوين الخاص بك. بعد ذلك، من جلسة SSH المفتوحة إلى مجموعة HBase، أدخل الأمر:
قيمة بناء الجملة قيمة جديدة مخطط URI تعديل لتعكس التخزين الخاص بك. بناء الجملة هو لتخزين كائن ثنائي كبير الحجم مع تمكين النقل الآمن. SPARK_STORAGE_CONTAINER
استبدل اسم حاوية التخزين الافتراضي المستخدم لمجموعة Spark. SPARK_STORAGE_ACCOUNT
استبدل اسم حساب التخزين الافتراضي المستخدم لمجموعة Spark. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
ثم قم بإنهاء اتصال ssh بمجموعة HBase الخاصة بك.
exit
قم بالاتصال بالعقدة الرئيسية لمجموعة Spark الخاصة بك باستخدام SSH. قم بتحرير الأمر عن طريق استبدال
SPARKCLUSTER
باسم مجموعة Spark، ثم أدخل الأمر :ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
أدخل الأمر للنسخ
hbase-site.xml
من التخزين الافتراضي لنظام مجموعة Spark إلى مجلد تكوين Spark 2 على التخزين المحلي للمجموعة:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
قم بتشغيل Spark Shell بالإشارة إلى موصل Spark HBase
بعد إكمال الخطوة السابقة، يجب أن تكون قادرًا على تشغيل Spark shell، بالإشارة إلى الإصدار المناسب من Spark HBase Connector.
كمثال، يسرد الجدول التالي نسختين والأوامر المقابلة التي يستخدمها فريق HDInsight حاليًا. يمكنك استخدام نفس الإصدارات لمجموعاتك إذا كانت إصدارات HBase وSpark متطابقة كما هو موضح في الجدول.
في جلسة SSH المفتوحة إلى Spark، أدخل الأمر التالي لبدء Spark shell:
إصدار Spark إصدار HDI HBase إصدار SHC الأمر 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
اترك مثيل Spark shell مفتوحًا وتابع تعريف الكتالوج والاستعلام . إذا لم تجد jars التي تتوافق مع إصداراتك في مستودع SHC Core، فتابع القراءة.
بالنسبة للمجموعات اللاحقة من إصدارات Spark وHBase، لم يعد يتم نشر هذه القطع الأثرية في أعلى repo. يمكنك بناء الجرار مباشرة من فرع GitHub spark-hbase-connector . على سبيل المثال، إذا كنت تستخدم Spark 2.4 وHBase 2.1، فأكمل الخطوات التالية:
استنساخ repo:
git clone https://github.com/hortonworks-spark/shc
انتقل إلى الفرع 2.4:
git checkout branch-2.4
بناء من الفرع (إنشاء ملف .jar):
mvn clean package -DskipTests
قم بتشغيل الأمر التالي (تأكد من تغيير اسم .jar الذي يتوافق مع ملف .jar الذي أنشأته):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
احتفظ بمثيل Spark shell هذا مفتوحًا وتابع إلى القسم التالي.
تحديد الكتالوج والاستعلام
في هذه الخطوة، تقوم بتعريف عنصر كتالوج يقوم بتعيين المخطط من Apache Spark إلى Apache HBase.
في Spark Shell المفتوحة، أدخل عبارات
import
التالية:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
أدخل الأمر أدناه لتحديد كتالوج لجدول جهات الاتصال الذي أنشأته في HBase:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
التعليمة البرمجية:
- يحدد مخطط كتالوج لجدول HBase المسمى
Contacts
. - يعرّف مفتاح الصف على أنه
key
، وقم بتعيين أسماء الأعمدة المستخدمة في Spark لعائلة الأعمدة واسم العمود ونوع العمود كما هو مستخدم في HBase. - يعرّف مفتاح الصف بالتفصيل على أنه عمود مسمى (
rowkey
)، والذي يحتوي على مجموعة أعمدة معينةcf
منrowkey
.
- يحدد مخطط كتالوج لجدول HBase المسمى
أدخل الأمر لتعريف أسلوب يوفر DataFrame حول
Contacts
الجدول في HBase:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
قم بإنشاء مثيل من DataFrame:
val df = withCatalog(catalog)
استعلام عن DataFrame:
df.show()
يجب أن ترى صفين من البيانات:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
قم بتسجيل جدول مؤقت حتى تتمكن من الاستعلام عن جدول HBase باستخدام Spark SQL:
df.createTempView("contacts")
إصدار استعلام SQL مقابل الجدول
contacts
:spark.sqlContext.sql("select personalName, officeAddress from contacts").show
يجب أن ترى نتائج مثل هذه:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
أدخل بيانات جديدة
لإدراج سجل جهة اتصال جديد، حدد فئة
ContactRecord
:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
قم بإنشاء مثيل لـ
ContactRecord
وضعه في مصفوفة:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
احفظ مجموعة البيانات الجديدة في HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
افحص النتائج:
df.show()
يجب أن ترى الإخراج مثل هذا:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
أغلق spark shell بإدخال الأمر التالي:
:q