Adatfolyam-szkript (DFS)
A következőkre vonatkozik: Azure Data Factory
Azure Synapse Analytics
Tipp.
Próbálja ki a Data Factoryt a Microsoft Fabricben, amely egy teljes körű elemzési megoldás a nagyvállalatok számára. A Microsoft Fabric az adattovábbítástól az adatelemzésig, a valós idejű elemzésig, az üzleti intelligenciáig és a jelentéskészítésig mindent lefed. Ismerje meg, hogyan indíthat új próbaverziót ingyenesen!
Az adatfolyamok az Azure Data Factoryben és az Azure Synapse Pipelinesban is elérhetők. Ez a cikk az adatfolyamok leképezésére vonatkozik. Ha még nem használta az átalakításokat, tekintse meg az adatok leképezési adatfolyam használatával történő átalakításáról szóló bevezető cikket.
Az adatfolyam-szkript (DFS) a kódolási nyelvhez hasonló mögöttes metaadatok, amelyek a leképezési adatfolyamban található átalakítások végrehajtására szolgálnak. Minden átalakítást olyan tulajdonságok sorozata jelöl, amelyek biztosítják a feladat megfelelő futtatásához szükséges információkat. A szkript látható és szerkeszthető az ADF-ből a böngésző felhasználói felületének felső menüszalagján található "szkript" gombra kattintva.
Egy forrásátalakítás például arra utasítja a szolgáltatást, allowSchemaDrift: true,
hogy a forrásadatkészlet összes oszlopát belefoglalja az adatfolyamba, még akkor is, ha azok nem szerepelnek a sémavetítésben.
Használati esetek
Az elosztott fájlrendszert a felhasználói felület automatikusan hozza létre. A szkript megtekintéséhez és testreszabásához kattintson a Szkript gombra. Szkripteket az ADF felhasználói felületén kívül is létrehozhat, majd továbbíthatja azt a PowerShell-parancsmagba. Összetett adatfolyamok hibakeresése esetén egyszerűbb lehet a szkript mögötti kód beolvasása a folyamatok felhasználói felületi gráfképének vizsgálata helyett.
Íme néhány példa használati esetekre:
- Programozott módon számos, meglehetősen hasonló adatfolyamot hoz létre, azaz "kibélyegezi" az adatfolyamokat.
- Összetett kifejezések, amelyek nehezen kezelhetők a felhasználói felületen, vagy amelyek érvényesítési problémákat eredményeznek.
- Hibakeresés és a végrehajtás során visszaadott különböző hibák jobb megértése.
Amikor powershell-lel vagy API-val használható adatfolyam-szkriptet hoz létre, a formázott szöveget egyetlen sorba kell összecsuknia. A tabulátorokat és az új vonalakat feloldó karakterekként is megtarthatja. A szöveget azonban úgy kell formázni, hogy elférjen egy JSON-tulajdonságban. A szkriptszerkesztő felhasználói felületén alul található egy gomb, amely egyetlen sorként formázza a szkriptet.
Átalakítások hozzáadása
Az átalakítások hozzáadásához három alapvető lépés szükséges: az alapvető átalakítási adatok hozzáadása, a bemeneti adatfolyam átirányítása, majd a kimeneti adatfolyam átirányítása. Ez egy példában a legkönnyebben látható. Tegyük fel, hogy egy egyszerű forrással kezdjük, amely az adatfolyamot az alábbihoz hasonlóan nyeli el:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Ha úgy döntünk, hogy származtatott átalakítást adunk hozzá, először létre kell hoznunk az alapvető átalakítási szöveget, amely egy egyszerű kifejezéssel rendelkezik egy új nagybetűs oszlop upperCaseTitle
hozzáadásához:
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
Ezután a meglévő elosztott fájlrendszert vesszük fel, és hozzáadjuk az átalakítást:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Most pedig átirányítjuk a bejövő streamet úgy, source1
hogy azonosítjuk, hogy melyik átalakítás után érkezik az új átalakítás (ebben az esetben) és másolja a stream nevét az új átalakításra:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Végül azonosítjuk azt az átalakítást, amelyet az új átalakítás után el szeretnénk végezni, és lecseréljük a bemeneti adatfolyamot (ebben az esetben sink1
) az új átalakítás kimeneti streamjének nevére:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Az elosztott fájlrendszer alapjai
Az elosztott fájlrendszer összekapcsolt átalakítások sorozatából áll, beleértve a forrásokat, a fogadókat és másokat, amelyek új oszlopokat adhatnak hozzá, adatokat szűrhetnek, adatokat illeszthetnek össze és még sok mást. A szkript általában egy vagy több forrással kezdődik, amelyet számos átalakítás követ, és egy vagy több fogadóval végződik.
A források mindegyikének ugyanaz az alapszerkezete:
source(
source properties
) ~> source_name
Egy három oszlopból (movieId, cím, műfajok) álló egyszerű forrás például a következő:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
A forrásokon kívül minden átalakításnak ugyanaz az alapszerkezete:
name_of_incoming_stream transformation_type(
properties
) ~> new_stream_name
Például egy egyszerű származtatott átalakítás, amely egy oszlopot (címet) vesz fel, és felülírja nagybetűs verzióval, a következő:
source1 derive(
title = upper(title)
) ~> derive1
Egy séma nélküli fogadó pedig a következő:
derive1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Szkriptrészletek
A szkriptrészletek Adatfolyam szkriptek megosztható kódjai, amelyeket az adatfolyamok közötti megosztáshoz használhat. Ez az alábbi videó bemutatja, hogyan használhat szkriptrészleteket, és hogyan használhatja Adatfolyam szkriptet a szkript egyes részeinek másolásához és beillesztéséhez az adatfolyam-grafikonok mögött:
Összesített összesítő statisztikák
Adjon hozzá egy "SummaryStats" nevű összesítési átalakítást az adatfolyamhoz, majd illessze be az alábbi kódba a szkript összesítő függvényéhez a meglévő SummaryStats helyére. Ez általános mintát biztosít az adatprofil-összefoglaló statisztikákhoz.
aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats
Az alábbi mintával megszámolhatja az egyedi sorok számát és a különböző sorok számát az adatokban. Az alábbi példa egy ValueDistAgg nevű összesített átalakítással beilleszthető egy adatfolyamba. Ez a példa egy "title" nevű oszlopot használ. Mindenképpen cserélje le a "title" kifejezést az adatok azon sztringoszlopára, amelyet az értékek számának lekéréséhez szeretne használni.
aggregate(groupBy(title),
countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
numofdistinct = countDistinct(title)) ~> UniqDist
Az összes oszlop belefoglalása összesítésbe
Ez egy általános összesítési minta, amely bemutatja, hogyan tarthatja meg a kimeneti metaadatok fennmaradó oszlopait az aggregátumok létrehozásakor. Ebben az esetben a függvény használatával first()
választjuk ki az első értéket minden olyan oszlopban, amelynek a neve nem "film". Ennek használatához hozzon létre egy DistinctRows nevű összesítő átalakítást, majd illessze be a szkriptbe a meglévő DistinctRows összesítő szkript fölé.
aggregate(groupBy(movie),
each(match(name!='movie'), $$ = first($$))) ~> DistinctRows
Sorkivonat ujjlenyomatának létrehozása
Ezzel a kóddal hozhat létre egy új, három oszlopból álló kivonatot létrehozósha1
, származtatott oszlopot DWhash
az adatfolyam-szkriptben.
derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash
Az alábbi szkripttel is létrehozhat egy sorkivonatot a streamben található összes oszlop használatával anélkül, hogy az egyes oszlopokat el kellene neveznie:
derive(DWhash = sha1(columns())) ~> DWHash
String_agg egyenértékű
Ez a kód a T-SQL string_agg()
függvényhez hasonlóan fog működni, és egy tömbbe összesíti a sztringértékeket. Ezután a tömböt egy sztringbe helyezheti az SQL-célhelyekkel való használathoz.
source1 aggregate(groupBy(year),
string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg
Frissítések, upserts, inserts, deletes száma
Az Alter Row átalakítás használatakor érdemes lehet megszámolni az Alter Row-szabályzatokból eredő frissítések, upsertsek, beszúrások és törlések számát. Adjon hozzá egy összesítési átalakítást az alter sor után, és illessze be ezt a Adatfolyam szkriptet a darabszám összesítési definíciójába.
aggregate(updates = countIf(isUpdate(), 1),
inserts = countIf(isInsert(), 1),
upserts = countIf(isUpsert(), 1),
deletes = countIf(isDelete(),1)) ~> RowCount
Különálló sor az összes oszlop használatával
Ez a kódrészlet új összesítési átalakítást ad hozzá az adatfolyamhoz, amely az összes bejövő oszlopot átveszi, létrehoz egy kivonatot, amelyet a csoportosításhoz használnak az ismétlődések kiküszöböléséhez, majd kimenetként adja meg az egyes duplikált elemek első előfordulását. Nem kell explicit módon elneveznie az oszlopokat, azok automatikusan létrejönnek a bejövő adatfolyamból.
aggregate(groupBy(mycols = sha2(256,columns())),
each(match(true()), $$ = first($$))) ~> DistinctRows
NULL-ek keresése az összes oszlopban
Ez egy kódrészlet, amelyet beilleszthet az adatfolyamba, hogy általánosan ellenőrizze az összes oszlop null értékeit. Ez a technika a sémaeltolódást használja az összes sor összes oszlopának megtekintéséhez, és feltételes felosztással választja el a NULL-ekkel rendelkező sorokat a NULL-ekkel rendelkező soroktól.
split(contains(array(toString(columns())),isNull(#item)),
disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)
AutoMap-sémaeltolódás kijelöléssel
Ha egy meglévő adatbázissémát ismeretlen vagy dinamikus bejövő oszlopok készletéből kell betöltenie, a Fogadó transzformációban le kell képeznie a jobb oldali oszlopokat. Erre csak akkor van szükség, ha egy meglévő táblát tölt be. Adja hozzá ezt a kódrészletet a Fogadó elé, és hozzon létre egy Választót, amely automatikusan megfelelteti az oszlopokat. Hagyja automatikus leképezésre a fogadóleképezést.
select(mapColumn(
each(match(true()))
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> automap
Oszlopadattípusok megőrzése
Adja hozzá ezt a szkriptet egy származtatott oszlopdefinícióhoz, hogy az adatfolyam oszlopneveit és adattípusait egy állandó tárolóba tárolja egy fogadó használatával.
derive(each(match(type=='string'), $$ = 'string'),
each(match(type=='integer'), $$ = 'integer'),
each(match(type=='short'), $$ = 'short'),
each(match(type=='complex'), $$ = 'complex'),
each(match(type=='array'), $$ = 'array'),
each(match(type=='float'), $$ = 'float'),
each(match(type=='date'), $$ = 'date'),
each(match(type=='timestamp'), $$ = 'timestamp'),
each(match(type=='boolean'), $$ = 'boolean'),
each(match(type=='long'), $$ = 'long'),
each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1
Kitöltés lefelé
Az alábbi módon valósíthatja meg az adathalmazokkal kapcsolatos gyakori "Kitöltés lefelé" problémát, ha a NULL értékeket a sorozat előző nem NULL értékének értékére szeretné cserélni. Vegye figyelembe, hogy ez a művelet negatív hatással lehet a teljesítményre, mivel a teljes adatkészleten létre kell hoznia egy szintetikus ablakot egy "dummy" kategóriaértékkel. Emellett egy érték alapján kell rendeznie a megfelelő adatütemezést az előző nem NULL érték megkereséséhez. Ez az alábbi kódrészlet a szintetikus kategóriát "dummy" néven hozza létre, és helyettesítő kulccsal rendezi. Eltávolíthatja a helyettesítő kulcsot, és használhatja a saját adatspecifikus rendezési kulcsát. Ez a kódrészlet feltételezi, hogy már hozzáadott egy forrásátalakítást source1
source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
asc(sk, true),
Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1
Mozgóátlag
A mozgóátlag nagyon könnyen implementálható az adatfolyamokban Windows-átalakítással. Az alábbi példa a Microsoft részvényárfolyamainak 15 napos mozgóátlagát hozza létre.
window(over(stocksymbol),
asc(Date, true),
startRowOffset: -7L,
endRowOffset: 7L,
FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1
Az összes oszlopérték eltérő száma
Ezzel a szkripttel azonosíthatja a kulcsoszlopokat, és egyetlen szkriptrészlettel megtekintheti a stream összes oszlopának számosságát. Adja hozzá ezt a szkriptet összesített átalakításként az adatfolyamhoz, és automatikusan különböző oszlopszámokat biztosít.
aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern
Előző vagy következő sorértékek összehasonlítása
Ez a mintarészlet bemutatja, hogyan használható az Ablak átalakítás az aktuális sorkörnyezet oszlopértékeinek összehasonlítására az aktuális sor előtti és utáni sorok oszlopértékeivel. Ebben a példában egy származtatott oszlopot használunk egy hamis érték létrehozásához, amely lehetővé teszi egy ablakpartíciót a teljes adatkészleten. A helyettesítő kulcs átalakítással minden sorhoz egyedi kulcsértéket rendelhet hozzá. Ha ezt a mintát alkalmazza az adatátalakításokra, eltávolíthatja a helyettesítő kulcsot, ha ön egy olyan oszlop, amelyet rendezni szeretne, és eltávolíthatja a származtatott oszlopot, ha oszlopokkal szeretné particionálni az adatokat.
source1 keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
asc(sk, true),
prevAndCurr = lag(title,1)+'-'+last(title),
nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag
Hány oszlop található az adataimban?
size(array(columns()))
Kapcsolódó tartalom
Az adatfolyamok áttekintési cikkével ismerkedhet meg a Adatfolyam