Rekordok frissítése vagy egyesítése az Azure SQL Database-ben az Azure Functions használatával
Az Azure Stream Analytics (ASA) jelenleg csak az SQL-kimenetek (Azure SQL Databases és Azure Synapse Analytics) sorainak beszúrását (hozzáfűzését) támogatja. Ez a cikk az UPDATE, UPSERT vagy MERGE sql-adatbázisokon való engedélyezésének kerülő megoldásait ismerteti, amelyek köztes rétegként az Azure Functionst használja.
Az Azure Functions alternatív lehetőségei a végén jelennek meg.
Követelmény
Az adatok táblázatba írása általában a következő módon végezhető el:
Mód | Egyenértékű T-SQL utasítás | Követelmények |
---|---|---|
Hozzáfűzés | INSERT | Egyik sem |
Replace | EGYESÍTÉS (UPSERT) | Egyedi kulcs |
Felhalmoz | MERGE (UPSERT) összetett hozzárendelési operátorral (+= , -= ...) |
Egyedi kulcs és akkumulátor |
A különbségek szemléltetéséhez tekintse meg, mi történik a következő két rekord betöltésekor:
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 0 |
10:05 | A | 20 |
Hozzáfűzési módban két rekordot szúrunk be. Az egyenértékű T-SQL utasítás a következő:
INSERT INTO [target] VALUES (...);
Ennek eredménye:
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 0 |
10:05 | A | 20 |
Csere módban csak az utolsó értéket kapjuk meg kulcs szerint. Itt Device_Id használjuk kulcsként. Az egyenértékű T-SQL utasítás a következő:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Ennek eredménye:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
Végül halmozási módban egy összetett hozzárendelési operátorral (+=
) összegzünkValue
. Itt is a Device_Id használjuk kulcsként:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Ennek eredménye:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 21 |
A teljesítmény szempontjából az ASA SQL-adatbázis kimeneti adapterei jelenleg csak natív módon támogatják a hozzáfűzési módot. Ezek az adapterek tömeges beszúrással maximalizálják az átviteli sebességet, és korlátozzák a visszanyomást.
Ez a cikk bemutatja, hogyan valósíthat meg csere- és halmozási módokat az Azure Functions használatával az ASA-hoz. Ha egy függvényt közvetítő rétegként használ, a lehetséges írási teljesítmény nem befolyásolja a streamelési feladatot. Ebben a tekintetben az Azure Functions használata a legjobban az Azure SQL-sel működik. A Synapse SQL esetén a tömegesről sorról sorra történő váltás nagyobb teljesítményproblémákat eredményezhet.
Azure Functions-kimenet
Feladatunkban az ASA SQL-kimenetet az ASA Azure Functions-kimenetre cseréljük. Az UPDATE, UPSERT vagy MERGE képességek a függvényben vannak implementálva.
Jelenleg két lehetőség áll rendelkezésre az SQL Database-hez való hozzáférésre egy függvényben. Az első az Azure SQL kimeneti kötése. Jelenleg C#-ra van korlátozva, és csak csere módot kínál. A második egy SQL-lekérdezés összeállítása, amelyet a megfelelő SQL-illesztőprogramon keresztül kell elküldeni (Microsoft.Data.SqlClient for .NET).
A következő minták esetében a következő táblázatsémát feltételezzük. A kötési beállításhoz egy elsődleges kulcsot kell beállítani a céltáblán. SQL-illesztő használata esetén nem szükséges, de ajánlott.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
A függvénynek meg kell felelnie a következő elvárásoknak, amelyeket az ASA kimeneteként kell használnia:
- Az Azure Stream Analytics 200-os HTTP-állapotot vár a Functions alkalmazástól a sikeresen feldolgozott kötegekhez
- Amikor az Azure Stream Analytics 413 ("http Request Entity Too Large") kivételt kap egy Azure-függvénytől, csökkenti az Azure-függvénynek küldött kötegek méretét
- A tesztelési kapcsolat során a Stream Analytics egy üres köteggel rendelkező POST-kérelmet küld az Azure Functionsnek, és 20-szor újabb HTTP-állapotot vár a teszt ellenőrzéséhez
1. lehetőség: Frissítés kulccsal az Azure-függvény SQL-kötésével
Ez a beállítás az Azure Function SQL Kimeneti kötést használja. Ez a bővítmény lecserélhet egy objektumot egy táblában, anélkül, hogy SQL-utasítást kellene írnia. Jelenleg nem támogatja az összetett hozzárendelési operátorokat (akkumulációkat).
Ez a minta a következőre épült:
- Az Azure Functions 4-es futtatókörnyezete
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
A kötési megközelítés jobb megértése érdekében javasoljuk, hogy kövesse ezt az oktatóanyagot.
Először hozzon létre egy alapértelmezett HttpTrigger-függvényalkalmazást az oktatóanyag követésével. A rendszer a következő információkat használja:
- Nyelv:
C#
- Futtatókörnyezet:
.NET 6
(a function/runtime v4 alatt) - Sablon:
HTTP trigger
Telepítse a kötésbővítményt a következő parancs futtatásával a projektmappában található terminálon:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Adja hozzá az SqlConnectionString
elemet a Values
célkiszolgáló kapcsolati sztring kitöltése szakaszáhozlocal.settings.json
:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Cserélje le a teljes függvényt (.cs fájlt a projektben) a következő kódrészletre. Frissítse saját névterét, osztálynevét és függvénynevét:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Frissítse a céltábla nevét a kötési szakaszban:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Frissítse az Device
osztály- és leképezési szakaszt a saját sémájának megfelelően:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Mostantól hibakereséssel tesztelheti a helyi függvény és az adatbázis közötti vezetékeket (F5 a Visual Studio Code-ban). Az SQL-adatbázisnak elérhetőnek kell lennie a gépről. Az SSMS használható a kapcsolat ellenőrzésére. Ezután küldjön POST-kéréseket a helyi végpontnak. Egy üres törzsű kérésnek http 204-et kell visszaadnia. A tényleges hasznos adattal rendelkező kéréseket a céltáblában kell őrizni (csere/frissítés módban). Íme egy minta hasznos adat, amely megfelel az ebben a mintában használt sémának:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
A függvény mostantól közzétehető az Azure-ban. Egy alkalmazásbeállítást kell beállítani a következőhöz SqlConnectionString
: . Az Azure SQL Server tűzfalának engedélyeznie kell , hogy az Azure-szolgáltatások elérjék az élő függvényt.
A függvény ezután kimenetként definiálható az ASA-feladatban, és a rekordok beszúrása helyett rekordok cseréjére használható.
2. lehetőség: Egyesítés összetett hozzárendeléssel (halmozódás) egyéni SQL-lekérdezésen keresztül
Feljegyzés
Újraindításkor és helyreállításkor az ASA újra küldheti a már kibocsátott kimeneti eseményeket. Ez egy olyan elvárt viselkedés, amely a felhalmozási logika meghiúsulását okozhatja (az egyes értékek megduplázása). Ennek megakadályozása érdekében javasoljuk, hogy ugyanazokat az adatokat adja ki egy táblában a natív ASA SQL-kimeneten keresztül. Ez a vezérlőtábla ezután a problémák észlelésére és szükség esetén a felhalmozás újraszinkronizálására használható.
Ez a beállítás a Microsoft.Data.SqlClientet használja. Ez a kódtár lehetővé teszi, hogy sql-lekérdezéseket adjunk ki egy SQL Database-nek.
Ez a minta a következőre épült:
Először hozzon létre egy alapértelmezett HttpTrigger-függvényalkalmazást az oktatóanyag követésével. A rendszer a következő információkat használja:
- Nyelv:
C#
- Futtatókörnyezet:
.NET 6
(a function/runtime v4 alatt) - Sablon:
HTTP trigger
Telepítse az SqlClient-kódtárat a következő parancs futtatásával a projektmappában található terminálon:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Adja hozzá az SqlConnectionString
elemet a Values
célkiszolgáló kapcsolati sztring kitöltése szakaszáhozlocal.settings.json
:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Cserélje le a teljes függvényt (.cs fájlt a projektben) a következő kódrészletre. Frissítse saját névterét, osztálynevét és függvénynevét:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Frissítse a sqltext
parancsépítési szakaszt a saját sémájának megfelelően (figyelje meg, hogyan érhető el a felhalmozás az operátoron keresztül a +=
frissítés során):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Mostantól hibakereséssel tesztelheti a helyi függvény és az adatbázis közötti vezetékeket (F5 a VS Code-ban). Az SQL-adatbázisnak elérhetőnek kell lennie a gépről. Az SSMS használható a kapcsolat ellenőrzésére. Ezután post-kéréseket ad ki a helyi végpontnak. Egy üres törzsű kérésnek http 204-et kell visszaadnia. A tényleges hasznos adattal rendelkező kéréseket a céltáblában kell őrizni (halmozási/egyesítési módban). Íme egy minta hasznos adat, amely megfelel az ebben a mintában használt sémának:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
A függvény mostantól közzétehető az Azure-ban. Egy alkalmazásbeállítást kell beállítani a következőhöz SqlConnectionString
: . Az Azure SQL Server tűzfalának engedélyeznie kell , hogy az Azure-szolgáltatások elérjék az élő függvényt.
A függvény ezután kimenetként definiálható az ASA-feladatban, és a rekordok beszúrása helyett rekordok cseréjére használható.
Alternatívák
Az Azure Functionsen kívül többféleképpen is el lehet érni a várt eredményt. Ez a szakasz néhányat tartalmaz.
Utófeldolgozás a cél SQL Database-ben
A háttérfeladat akkor működik, ha az adatok a standard ASA SQL-kimeneteken keresztül kerülnek be az adatbázisba.
Az Azure SQL INSTEAD OF
esetében a DML-triggerek az ASA által kiadott INSERT-parancsok elfogására használhatók:
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
A Synapse SQL esetében az ASA beszúrhat egy előkészítési táblába. Az ismétlődő tevékenységek ezután szükség szerint átalakíthatják az adatokat egy köztes táblává. Végül az adatok átkerülnek az éles táblába.
Előfeldolgozás az Azure Cosmos DB-ben
Az Azure Cosmos DB natív módon támogatja az UPSERT-t. Itt csak hozzáfűzés/csere lehetséges. A felhalmozásokat ügyféloldali felügyelet mellett kell kezelni az Azure Cosmos DB-ben.
Ha a követelmények egyeznek, a cél SQL-adatbázist egy Azure Cosmos DB-példányra kell cserélni. Ehhez fontos változásra van szükség az általános megoldásarchitektúra terén.
A Synapse SQL esetében az Azure Cosmos DB az Azure Cosmos DB-hez készült Azure Synapse Linken keresztül használható közvetítő rétegként. Az Azure Synapse Link használható elemzési tár létrehozásához. Ez az adattár ezután közvetlenül a Synapse SQL-ben kérdezhető le.
Az alternatívák összehasonlítása
Minden megközelítés különböző értékajánlatokat és képességeket kínál:
Típus | Lehetőség | Módok | Azure SQL Database | Azure Synapse Analytics |
---|---|---|---|---|
Feldolgozás utáni | ||||
Triggerek | Csere, halmozódás | + | N/A, az eseményindítók nem érhetők el a Synapse SQL-ben | |
Előkészítés | Csere, halmozódás | + | + | |
Előzetes feldolgozás | ||||
Azure Functions | Csere, halmozódás | + | - (sorról sorra teljesítmény) | |
Az Azure Cosmos DB cseréje | Replace | N.A. | N.A. | |
Azure Cosmos DB Azure Synapse Link | Replace | n/a | + |
Támogatás kérése
További segítségért próbálja ki a Microsoft Q&A kérdésoldalát az Azure Stream Analyticshez.
Következő lépések
- Az Azure Stream Analytics kimeneteinek ismertetése
- Azure Stream Analytics-kimenet az Azure SQL Database-be
- Az Azure Stream Analyticsből az Azure SQL Database-be irányuló átviteli teljesítmény növelése
- Felügyelt identitások használata az Azure SQL Database vagy az Azure Synapse Analytics eléréséhez egy Azure Stream Analytics-feladatból
- Referenciaadatok használata SQL Database-ből azure Stream Analytics-feladathoz
- Az Azure Functions futtatása az Azure Stream Analytics-feladatokban – Oktatóanyag a Redis kimenetéhez
- Rövid útmutató: Stream Analytics-feladat létrehozása az Azure Portal használatával