Az Apache Spark használata Apache HBase-adatok írására és olvasására
Az Apache HBase általában alacsony szintű API-val (vizsgálatokkal, lekérdezéssel és üzembe helyezéssel) vagy az Apache Phoenixet használó SQL-szintaxissal kérdezhető le. Az Apache az Apache Spark HBase-összekötőt is biztosítja. Az összekötő kényelmes és hatékony alternatíva a HBase által tárolt adatok lekérdezéséhez és módosításához.
Előfeltételek
Két különálló HDInsight-fürt van üzembe helyezve ugyanazon a virtuális hálózaton. Egy HBase és egy Spark, amelyen legalább Spark 2.1 (HDInsight 3.6) telepítve van. További információ: Linux-alapú fürtök létrehozása a HDInsightban az Azure Portal használatával.
A fürtök elsődleges tárolójának URI-sémája. Ez a séma az Azure Blob Storage,
abfs://
az Azure Data Lake Storage Gen2 vagy az Azure Data Lake Storage Gen1 adl:// esetében wasb://. Ha a Blob Storage biztonságos átvitele engedélyezve van, az URI leszwasbs://
. Lásd még: biztonságos átvitel.
Teljes folyamat
A Spark-fürt HBase-fürt lekérdezésének engedélyezésének magas szintű folyamata a következő:
- Készítsen elő néhány mintaadatot a HBase-ben.
- Szerezze be a hbase-site.xml fájlt a HBase-fürt konfigurációs mappájából (/etc/hbase/conf), és helyezze el a hbase-site.xml egy példányát a Spark 2 konfigurációs mappájába (/etc/spark2/conf). (NEM KÖTELEZŐ: használja a HDInsight csapata által biztosított szkriptet a folyamat automatizálásához)
- Futtassa
spark-shell
a Spark HBase-összekötőre való hivatkozásokat a beállításBan a Maven koordinátáipackages
alapján. - Definiáljon egy katalógust, amely leképozza a sémát a Sparkból a HBase-be.
- A HBase-adatok kezelése AZ RDD vagy a DataFrame API-k használatával.
Mintaadatok előkészítése az Apache HBase-ben
Ebben a lépésben létrehoz és feltölt egy táblát az Apache HBase-ben, amelyet aztán lekérdezhet a Spark használatával.
ssh
A parancs használatával csatlakozzon a HBase-fürthöz. Szerkessze a parancsot a HBase-fürt nevére cserélveHBASECLUSTER
, majd írja be a parancsot:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
hbase shell
A parancs használatával indítsa el a HBase interaktív rendszerhéjat. Írja be a következő parancsot az SSH-kapcsolatba:hbase shell
create
A paranccsal kétoszlopos családokat tartalmazó HBase-táblát hozhat létre. Írja be az alábbi parancsot:create 'Contacts', 'Personal', 'Office'
put
A parancs használatával értékeket szúrhat be egy adott tábla adott sorában lévő adott oszlopba. Írja be az alábbi parancsot:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
exit
A parancs használatával állítsa le a HBase interaktív rendszerhéját. Írja be az alábbi parancsot:exit
Parancsfájlok futtatása a fürtök közötti kapcsolat beállításához
A fürtök közötti kommunikáció beállításához kövesse az alábbi lépéseket két szkript futtatásához a fürtökön. Ezek a szkriptek automatizálják a "Kommunikáció manuális beállítása" szakaszban leírt fájlmásolási folyamatot.
- A HBase-fürtből futtatott szkript feltölti
hbase-site.xml
a HBase IP-leképezési adatait a Spark-fürthöz csatolt alapértelmezett tárolóba. - A Spark-fürtből futtatott szkript két cronfeladatot állít be két segédszkript rendszeres futtatásához:
- HBase cron-feladat – új
hbase-site.xml
fájlok és HBase IP-leképezés letöltése a Spark alapértelmezett tárfiókjából a helyi csomópontra - Spark cron-feladat – Ellenőrzi, hogy történt-e Spark-skálázás, és hogy a fürt biztonságos-e. Ha igen, szerkessze
/etc/hosts
a helyileg tárolt HBase IP-leképezést
- HBase cron-feladat – új
MEGJEGYZÉS: A folytatás előtt győződjön meg arról, hogy másodlagos tárfiókként hozzáadta a Spark-fürt tárfiókját a HBase-fürthöz. Győződjön meg arról, hogy a szkriptek a megadott sorrendben szerepelnek.
A HBase-fürt szkriptműveletével alkalmazza a módosításokat az alábbi szempontok figyelembevételével:
Tulajdonság Érték Bash-szkript URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
Csomóponttípus(ok) Régió Paraméterek -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
Kitartott igen -
SECONDARYS_STORAGE_URL
A Spark-oldal alapértelmezett tárolójának URL-címe. Példa paraméter:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
-
A Spark-fürtön a Szkriptművelettel alkalmazza a módosításokat az alábbi szempontok figyelembevételével:
Tulajdonság Érték Bash-szkript URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
Csomóponttípus(ok) Vezető, feldolgozó, Zookeeper Paraméterek -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
Kitartott igen - Megadhatja, hogy a fürt milyen gyakran ellenőrizze automatikusan, hogy frissül-e. Alapértelmezett:
-s “*/1 * * * *” -h 0
(Ebben a példában a Spark cron-feladat percenként fut, míg a HBase-cron nem fut) - Mivel a HBase cron alapértelmezés szerint nincs beállítva, újra kell futtatnia ezt a szkriptet, amikor skálázást végez a HBase-fürtön. Ha a HBase-fürt gyakran skálázódik, dönthet úgy, hogy automatikusan beállítja a HBase cron-feladatot. Például:
-s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc"
a szkriptet úgy konfigurálja, hogy 30 percenként végezzen ellenőrzéseket. Ez rendszeres időközönként futtatja a HBase cron ütemezését, hogy automatizálja az új HBase-adatok letöltését a közös tárfiókon a helyi csomópontra.
- Megadhatja, hogy a fürt milyen gyakran ellenőrizze automatikusan, hogy frissül-e. Alapértelmezett:
Feljegyzés
Ezek a szkriptek csak HDI 5.0- és HDI 5.1-fürtökön működnek.
A kommunikáció manuális beállítása (Nem kötelező, ha a fenti lépésben megadott szkript sikertelen)
MEGJEGYZÉS: Ezeket a lépéseket minden alkalommal végre kell hajtani, amikor az egyik fürt skálázási tevékenységen megy keresztül.
Másolja a hbase-site.xml a helyi tárolóból a Spark-fürt alapértelmezett tárolójának gyökerére. Szerkessze a parancsot a konfigurációnak megfelelően. Ezután a nyitott SSH-munkamenetből a HBase-fürtbe írja be a következő parancsot:
Szintaxis értéke Új érték URI-séma Módosítsa úgy, hogy tükrözze a tárterületet. A szintaxis a biztonságos átvitelt engedélyező blobtárolókra használható. SPARK_STORAGE_CONTAINER
Cserélje le a Spark-fürthöz használt alapértelmezett tárolónévre. SPARK_STORAGE_ACCOUNT
Cserélje le a Spark-fürthöz használt alapértelmezett tárfióknévre. hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
Ezután lépjen ki az ssh-kapcsolatból a HBase-fürthöz.
exit
Csatlakozzon a Spark-fürt fő csomópontjához az SSH használatával. Szerkessze a parancsot a Spark-fürt nevére cserélve
SPARKCLUSTER
, majd írja be a parancsot:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
Adja meg a Spark-fürt alapértelmezett tárolójából a fürt helyi tárolóján lévő Spark 2 konfigurációs mappába másolandó
hbase-site.xml
parancsot:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
A Spark Shell futtatása a Spark HBase-összekötőre hivatkozva
Az előző lépés elvégzése után képesnek kell lennie a Spark-rendszerhéj futtatására a Spark HBase-összekötő megfelelő verziójára hivatkozva.
Az alábbi táblázat például két verziót és a HDInsight csapat által jelenleg használt parancsokat sorolja fel. A fürtökhöz ugyanazokat a verziókat használhatja, ha a HBase és a Spark verziói megegyeznek a táblázatban feltüntetettekkel.
A Spark-fürtnek megnyitott SSH-munkamenetben adja meg a következő parancsot a Spark-rendszerhéj elindításához:
Szikra változat HDI HBase-verzió SHC-verzió Parancs 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
Tartsa nyitva ezt a Spark-rendszerhéj-példányt, és folytassa a katalógus és lekérdezés definiálásához. Ha nem találja az SHC Core-adattár verzióinak megfelelő jarokat, folytassa az olvasást.
A Spark- és HBase-verziók későbbi kombinációi esetében ezek az összetevők már nem jelennek meg a fenti adattárban. Az üvegeket közvetlenül a spark-hbase-connector GitHub-ágból hozhatja létre. Ha például a Spark 2.4-et és a HBase 2.1-et használja, hajtsa végre az alábbi lépéseket:
Klónozza az adattárat:
git clone https://github.com/hortonworks-spark/shc
Nyissa meg a branch-2.4-et:
git checkout branch-2.4
Buildelés az ágból (létrehoz egy .jar fájlt):
mvn clean package -DskipTests
Futtassa a következő parancsot (ügyeljen arra, hogy módosítsa a létrehozott .jar fájlnak megfelelő .jar nevet):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
Tartsa nyitva ezt a Spark-rendszerhéj-példányt, és folytassa a következő szakaszban.
Katalógus és lekérdezés definiálása
Ebben a lépésben definiál egy katalógusobjektumot, amely leképezi a sémát az Apache Sparkból az Apache HBase-be.
A megnyitott Spark Shellben adja meg a következő
import
utasításokat:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
Adja meg az alábbi parancsot a HBase-ben létrehozott Névjegyek tábla katalógusának meghatározásához:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
A kód:
- Katalógusséma definiálása a HBase nevű
Contacts
táblához. - Azonosítja a sorkulcsot
key
, és megfelelteti a Sparkban használt oszlopneveket a HBase-ben használt oszlopcsaládra, oszlopnévre és oszloptípusra. - A sorkulcsot részletesen elnevezett oszlopként (
rowkey
) határozza meg, amelynek egy adott oszlopcsaládjacf
rowkey
van.
- Katalógusséma definiálása a HBase nevű
Adja meg a parancsot egy olyan metódus meghatározásához, amely dataFrame-et biztosít a táblázat körül a
Contacts
HBase-ben:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
Hozza létre a DataFrame egy példányát:
val df = withCatalog(catalog)
A DataFrame lekérdezése:
df.show()
Két adatsornak kell megjelennie:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
Regisztráljon egy ideiglenes táblát, hogy lekérdezhesse a HBase-táblát a Spark SQL használatával:
df.createTempView("contacts")
SQL-lekérdezés kiadása a
contacts
táblán:spark.sqlContext.sql("select personalName, officeAddress from contacts").show
A következő eredményeknek kell megjelenniük:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
Új adatok beszúrása
Új partnerrekord beszúrásához adjon meg egy osztályt
ContactRecord
:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
Hozzon létre egy példányt
ContactRecord
, és helyezze el egy tömbbe:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
Mentse az új adatok tömbét a HBase-be:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
Az eredmények vizsgálata:
df.show()
A következőhöz hasonló kimenetnek kell megjelennie:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
Zárja be a Spark-rendszerhéjat a következő parancs beírásával:
:q