Oktatóanyag: Databricks Delta-tábla frissítésére szolgáló data lake capture minta implementálása
Ez az oktatóanyag bemutatja, hogyan kezelheti az eseményeket egy hierarchikus névtérrel rendelkező tárfiókban.
Létrehozhat egy kis megoldást, amely lehetővé teszi, hogy a felhasználó feltöltsön egy Databricks Delta-táblát egy vesszővel tagolt értékeket (csv) tartalmazó fájl feltöltésével, amely egy értékesítési rendelést ír le. Ezt a megoldást egy Event Grid-előfizetés, egy Azure-függvény és egy Azure Databricks-feladat összekapcsolásával hozhatja létre.
Az oktatóanyag során az alábbi lépéseket fogja végrehajtani:
- Hozzon létre egy Event Grid-előfizetést, amely meghív egy Azure-függvényt.
- Hozzon létre egy Azure-függvényt, amely értesítést kap egy eseménytől, majd futtatja a feladatot az Azure Databricksben.
- Hozzon létre egy Databricks-feladatot, amely beszúr egy ügyfélrendelést a tárfiókban található Databricks Delta-táblába.
Ezt a megoldást fordított sorrendben fogjuk felépíteni, kezdve az Azure Databricks-munkaterülettel.
Előfeltételek
Hozzon létre egy hierarchikus névteret (Azure Data Lake Storage) tartalmazó tárfiókot. Ez az oktatóanyag egy .
contosoorders
Győződjön meg arról, hogy a felhasználói fiókjához hozzá van rendelve a Storage Blob Data Közreműködő szerepkör .
Hozzon létre egy szolgáltatásnevet, hozzon létre egy ügyfélkulcsot, majd adjon hozzáférést a szolgáltatásnévnek a tárfiókhoz.
Lásd az oktatóanyagot: Csatlakozás az Azure Data Lake Storage-hoz (1–3. lépés). A lépések elvégzése után illessze be a bérlőazonosítót, az alkalmazásazonosítót és az ügyfél titkos kódértékeket egy szövegfájlba. Ezekre hamarosan szüksége lesz.
Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.
Értékesítési rendelés létrehozása
Először hozzon létre egy csv-fájlt, amely leírja az értékesítési rendelést, majd töltse fel a fájlt a tárfiókba. Később a fájlból származó adatokat fogja használni a Databricks Delta-tábla első sorának feltöltéséhez.
Az Azure Portalon lépjen az új tárfiókjára.
Válassza a Storage browser-Blob> containers-Add> container (Tároló hozzáadása) lehetőséget, és hozzon létre egy új, névvel ellátott tárolót.
Az adattárolóban hozzon létre egy bemenet nevű könyvtárat.
Illessze be a következő szöveget egy szövegszerkesztőbe.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
Mentse a fájlt a helyi számítógépre, és adja meg a data.csv nevet.
A tárböngészőben töltse fel ezt a fájlt a bemeneti mappába.
Feladat létrehozása az Azure Databricksben
Ebben a szakaszban a következő feladatokat fogja elvégezni:
- Azure Databricks-munkaterület létrehozása.
- Hozzon létre egy notebookot.
- Databricks Delta-tábla létrehozása és feltöltése.
- Adjon hozzá olyan kódot, amely sorokat szúr be a Databricks Delta táblába.
- Hozzon létre egy feladatot.
Azure Databricks-munkaterület létrehozása
Ebben a szakaszban egy Azure Databricks-munkaterületet fog létrehozni az Azure Portal használatával.
Azure Databricks-munkaterület létrehozása. Nevezze el a munkaterületet
contoso-orders
. Lásd: Azure Databricks-munkaterület létrehozása.Hozzon létre egy fürtöt. Nevezze el a fürtöt
customer-order-cluster
. Lásd: Fürt létrehozása.Hozzon létre egy notebookot. Nevezze el a jegyzetfüzetet
configure-customer-table
, és válassza a Pythont a jegyzetfüzet alapértelmezett nyelveként. Lásd: Jegyzetfüzet létrehozása.
Databricks Delta-tábla létrehozása és feltöltése
A létrehozott jegyzetfüzetben másolja és illessze be az alábbi kódblokkot az első cellába, de még ne futtassa ezt a kódot.
Cserélje le a
appId
kódblokkban található ,password
tenant
helyőrző értékeket az oktatóanyag előfeltételeinek teljesítése során gyűjtött értékekre.dbutils.widgets.text('source_file', "", "Source File") spark.conf.set("fs.azure.account.auth.type", "OAuth") spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>") spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>") spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token") adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/' inputPath = adlsPath + dbutils.widgets.get('source_file') customerTablePath = adlsPath + 'delta-tables/customers'
Ez a kód létrehoz egy source_file nevű widgetet. Később létre fog hozni egy Azure-függvényt, amely meghívja ezt a kódot, és átadja a widget fájlelérési útját. Ez a kód emellett hitelesíti a szolgáltatásnevet a tárfiókkal, és létrehoz néhány változót, amelyeket más cellákban fog használni.
Feljegyzés
Éles környezetben fontolja meg a hitelesítési kulcs azure Databricksben való tárolását. Ezután adjon hozzá egy keresési kulcsot a kódblokkhoz a hitelesítési kulcs helyett.
Például ahelyett, hogy ezt a kódsort használná:spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
a következő kódsort használná:spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>"))
.
Miután elvégezte ezt az oktatóanyagot, tekintse meg az Azure Databricks webhelyén található Azure Data Lake Storage-cikket , hogy példákat találjon erre a megközelítésre.Nyomja le a SHIFT + ENTER billentyűket a kód ebben a blokkban való futtatásához.
Másolja és illessze be a következő kódblokkot egy másik cellába, majd nyomja le a SHIFT + ENTER billentyűkombinációt a kód ebben a blokkban való futtatásához.
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType inputSchema = StructType([ StructField("InvoiceNo", IntegerType(), True), StructField("StockCode", StringType(), True), StructField("Description", StringType(), True), StructField("Quantity", IntegerType(), True), StructField("InvoiceDate", StringType(), True), StructField("UnitPrice", DoubleType(), True), StructField("CustomerID", IntegerType(), True), StructField("Country", StringType(), True) ]) rawDataDF = (spark.read .option("header", "true") .schema(inputSchema) .csv(adlsPath + 'input') ) (rawDataDF.write .mode("overwrite") .format("delta") .saveAsTable("customer_data", path=customerTablePath))
Ez a kód létrehozza a Databricks Delta táblát a tárfiókban, majd betölt néhány kezdeti adatot a korábban feltöltött CSV-fájlból.
A kódblokk sikeres futtatása után távolítsa el ezt a kódblokkot a jegyzetfüzetből.
Sorokat beszúró kód hozzáadása a Databricks Delta-táblába
Másolja és illessze be a következő kódblokkot egy másik cellába, de ne futtassa ezt a cellát.
upsertDataDF = (spark .read .option("header", "true") .csv(inputPath) ) upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
Ez a kód egy csv-fájlból származó adatokkal szúr be adatokat egy ideiglenes táblanézetbe. A csv-fájl elérési útja a korábbi lépésben létrehozott beviteli widgetből származik.
Másolja és illessze be a következő kódblokkot egy másik cellába. Ez a kód egyesíti az ideiglenes táblanézet tartalmát a Databricks Delta-táblával.
%sql MERGE INTO customer_data cd USING customer_data_to_upsert cu ON cd.CustomerID = cu.CustomerID WHEN MATCHED THEN UPDATE SET cd.StockCode = cu.StockCode, cd.Description = cu.Description, cd.InvoiceNo = cu.InvoiceNo, cd.Quantity = cu.Quantity, cd.InvoiceDate = cu.InvoiceDate, cd.UnitPrice = cu.UnitPrice, cd.Country = cu.Country WHEN NOT MATCHED THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country) VALUES ( cu.InvoiceNo, cu.StockCode, cu.Description, cu.Quantity, cu.InvoiceDate, cu.UnitPrice, cu.CustomerID, cu.Country)
Feladat létrehozása
Hozzon létre egy feladatot, amely a korábban létrehozott jegyzetfüzetet futtatja. Később létre fog hozni egy Azure-függvényt, amely egy esemény létrehozásakor futtatja ezt a feladatot.
Válassza az Új> feladat lehetőséget.
Adjon nevet a feladatnak, válassza ki a létrehozott jegyzetfüzetet és a fürtöt. Ezután válassza a Létrehozás lehetőséget a feladat létrehozásához.
Azure-függvény létrehozása
Hozzon létre egy Azure-függvényt, amely futtatja a feladatot.
Az Azure Databricks-munkaterületen kattintson az Azure Databricks-felhasználónevére a felső sávon, majd a legördülő listában válassza a Felhasználói beállítások lehetőséget.
Az Access-jogkivonatok lapon válassza az Új jogkivonat létrehozása lehetőséget.
Másolja ki a megjelenő jogkivonatot, majd kattintson a Kész gombra.
A Databricks-munkaterület felső sarkában válassza a Személyek ikont, majd válassza a Felhasználói beállítások lehetőséget.
Válassza az Új jogkivonat létrehozása gombot, majd a Létrehozás gombot.
Ügyeljen arra, hogy a jogkivonatot biztonságos helyre másolja. Az Azure-függvénynek szüksége van erre a jogkivonatra a Databricks-hitelesítéshez, hogy futtathassa a feladatot.
Az Azure Portal menüjében vagy a Kezdőlapon válassza az Erőforrás létrehozása elemet.
Az Új lapon válassza a Számítási>függvényalkalmazás lehetőséget.
A Függvényalkalmazás létrehozása lap Alapjai lapján válasszon egy erőforráscsoportot, majd módosítsa vagy ellenőrizze a következő beállításokat:
Beállítás Érték Függvényalkalmazás neve contosoorder Futtatókörnyezet verme .NET Közzététel Kód Operációs rendszer Windows Konstrukció típusa Felhasználás (kiszolgáló nélküli) Válassza az Áttekintés + létrehozás, majd a Létrehozás lehetőséget.
Ha az üzembe helyezés befejeződött, az Erőforrás megnyitása gombra kattintva nyissa meg a függvényalkalmazás áttekintő oldalát.
A Beállítások csoportban válassza a Konfiguráció lehetőséget.
Az Alkalmazásbeállítások lapon válassza az Új alkalmazásbeállítás gombot az egyes beállítások hozzáadásához.
Adja hozzá a következő beállításokat:
Név beállítása Érték DBX_INSTANCE A databricks-munkaterület régiója. Például: westus2.azuredatabricks.net
DBX_PAT A korábban létrehozott személyes hozzáférési jogkivonat. DBX_JOB_ID A futó feladat azonosítója. A beállítások véglegesítéséhez válassza a Mentés lehetőséget.
A Függvények csoportban válassza a Függvények, majd a Létrehozás lehetőséget.
Válassza az Azure Event Grid-eseményindítót.
Ha a rendszer erre kéri, telepítse a Microsoft.Azure.WebJobs.Extensions.EventGrid bővítményt. Ha telepítenie kell, akkor újra ki kell választania az Azure Event Grid-eseményindítót a függvény létrehozásához.
Megjelenik az Új függvény panel.
Az Új függvény panelen nevezze el az UpsertOrder függvényt, majd válassza a Létrehozás gombot.
Cserélje le a kódfájl tartalmát erre a kódra, majd válassza a Mentés gombot:
#r "Azure.Messaging.EventGrid" #r "System.Memory.Data" #r "Newtonsoft.Json" #r "System.Text.Json" using Azure.Messaging.EventGrid; using Azure.Messaging.EventGrid.SystemEvents; using Newtonsoft.Json; using Newtonsoft.Json.Linq; private static HttpClient httpClient = new HttpClient(); public static async Task Run(EventGridEvent eventGridEvent, ILogger log) { log.LogInformation("Event Subject: " + eventGridEvent.Subject); log.LogInformation("Event Topic: " + eventGridEvent.Topic); log.LogInformation("Event Type: " + eventGridEvent.EventType); log.LogInformation(eventGridEvent.Data.ToString()); if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") { StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>(); if (fileData.Api == "FlushWithClose") { log.LogInformation("Triggering Databricks Job for file: " + fileData.Url); var fileUrl = new Uri(fileData.Url); var httpRequestMessage = new HttpRequestMessage { Method = HttpMethod.Post, RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))), Headers = { { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)}, { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" } }, Content = new StringContent(JsonConvert.SerializeObject(new { job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process), notebook_params = new { source_file = String.Join("", fileUrl.Segments.Skip(2)) } })) }; var response = await httpClient.SendAsync(httpRequestMessage); response.EnsureSuccessStatusCode(); } } }
Ez a kód elemzi a létrehozott tárolási esemény adatait, majd létrehoz egy kérésüzenetet az eseményt kiváltó fájl URL-címével. Az üzenet részeként a függvény átad egy értéket a korábban létrehozott source_file widgetnek. A függvénykód elküldi az üzenetet a Databricks-feladatnak, és a korábban beszerzett jogkivonatot használja hitelesítésként.
Event Grid-előfizetés létrehozása
Ebben a szakaszban egy Event Grid-előfizetést fog létrehozni, amely meghívja az Azure-függvényt a fájlok tárfiókba való feltöltésekor.
Válassza az Integráció lehetőséget, majd az Integráció lapon válassza az Event Grid-eseményindítót.
Az Eseményindító szerkesztése panelen nevezze el az eseményt
eventGridEvent
, majd válassza az Esemény-előfizetés létrehozása lehetőséget.Feljegyzés
A név
eventGridEvent
megegyezik az Azure-függvénynek átadott paraméter nevével.Az Esemény-előfizetés létrehozása lap Alapjai lapján módosítsa vagy ellenőrizze a következő beállításokat:
Beállítás Érték Név contoso-order-event-subscription Témakörtípus Tárfiók Forráserőforrás contosoorders Rendszertémakör neve <create any name>
Szűrés eseménytípusokra Blob létrehozva és blob törölve Válassza a Létrehozás gombot.
Az Event Grid-előfizetés tesztelése
Hozzon létre egy nevű
customer-order.csv
fájlt, illessze be a következő adatokat a fájlba, és mentse a helyi számítógépre.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
A Storage Explorerben töltse fel ezt a fájlt a tárfiók bemeneti mappájába.
A fájl feltöltése a Microsoft.Storage.BlobCreated eseményt aktiválja. Az Event Grid értesíti az összes előfizetőt az eseményről. Esetünkben az Azure-függvény az egyetlen előfizető. Az Azure-függvény elemzi az eseményparamétereket annak meghatározásához, hogy melyik esemény történt. Ezután átadja a fájl URL-címét a Databricks-feladatnak. A Databricks-feladat beolvassa a fájlt, és hozzáad egy sort a tárfiókot tároló Databricks Delta-táblához.
Ha ellenőrizni szeretné, hogy a feladat sikeres volt-e, tekintse meg a feladat futtatását. Megjelenik a befejezés állapota. A feladatok futtatásának megtekintésével kapcsolatos további információkért tekintse meg a feladatok futtatásainak megtekintése című témakört.
Egy új munkafüzetcellában futtassa ezt a lekérdezést egy cellában a frissített deltatábla megtekintéséhez.
%sql select * from customer_data
A visszaadott tábla a legújabb rekordot jeleníti meg.
A rekord frissítéséhez hozzon létre egy fájlt,
customer-order-update.csv
illessze be a következő adatokat a fájlba, és mentse a helyi számítógépre.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
Ez a csv-fájl majdnem megegyezik az előzőével, kivéve, hogy a rendelés mennyisége a következőre
228
22
módosul: .A Storage Explorerben töltse fel ezt a fájlt a tárfiók bemeneti mappájába.
Futtassa újra a lekérdezést
select
a frissített deltatábla megtekintéséhez.%sql select * from customer_data
A visszaadott tábla a frissített rekordot jeleníti meg.
Az erőforrások eltávolítása
Ha már nincs rájuk szükség, törölje az erőforráscsoportot és az összes kapcsolódó erőforrást. Ehhez válassza ki a tárfiók erőforráscsoportját, és válassza a Törlés lehetőséget.