你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将事件接收到 HTTP 终结点

本文介绍如何验证 HTTP 终结点以接收来自事件订阅的事件并随后接收和反序列化事件。 本文使用 Azure 函数进行演示。 但是,无论应用程序托管在何处,相同的概念都适用。

注意

建议在通过事件网格触发 Azure 函数时使用事件网格触发器。 它在事件网格和 Azure Functions 之间提供了更简单、更快速的集成。 但是,Azure Functions 的事件网格触发器不支持托管代码需要控制返回给事件网格的 HTTP 状态代码的场景。 鉴于此限制,例如,在 Azure Function 上运行的代码将无法返回 5XX 错误,以通过事件网格启动事件传送重试。

先决条件

  • 需要包含 HTTP 触发函数的函数应用。

添加依赖项

若要使用 .NET 进行开发,请向 Azure.Messaging.EventGridNuget 包的函数添加依赖项

可通过发布 SDKs 引用将 SDK 用于其他语言。 这些包中有用于本机事件类型(如 EventGridEventStorageBlobCreatedEventDataEventHubCaptureFileCreatedEventData)的模型。

终结点验证

首先需要处理 Microsoft.EventGrid.SubscriptionValidationEvent 事件。 每次有人订阅某个事件时,事件网格都会在数据有效负载中通过 validationCode 向终结点发送一个验证事件。 终结点必须回显到响应正文中才能证明此终结点有效且被你所拥有。 如果你使用的是事件网格触发器(而不是 Webhook 触发函数),系统会为你执行终结点验证。 如果使用第三方 API 服务(例如 ZapierIFTTT),可能无法以编程方式回显验证码。 对于这些服务,可以使用订阅验证事件中发送的验证 URL 手动验证订阅。 在 validationUrl 属性中复制该 URL 并通过 REST 客户端或 Web 浏览器发送 GET 请求。

在 C# 中,ParseMany() 方法用于将包含一个或多个事件的 BinaryData 实例反序列化为 EventGridEvent 数组。 如果你提前知道只反序列化单个事件,则可以改用 Parse 方法。

若要以编程方式回显验证码,请运行下面的代码。

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System;
using Azure.Messaging.EventGrid;
using Azure.Messaging.EventGrid.SystemEvents;

namespace Function1
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
            string response = string.Empty;
            BinaryData events = await BinaryData.FromStreamAsync(req.Body);
            log.LogInformation($"Received events: {events}");

            EventGridEvent[] eventGridEvents = EventGridEvent.ParseMany(events);

            foreach (EventGridEvent eventGridEvent in eventGridEvents)
            {
                // Handle system events
                if (eventGridEvent.TryGetSystemEventData(out object eventData))
                {
                    // Handle the subscription validation event
                    if (eventData is SubscriptionValidationEventData subscriptionValidationEventData)
                    {
                        log.LogInformation($"Got SubscriptionValidation event data, validation code: {subscriptionValidationEventData.ValidationCode}, topic: {eventGridEvent.Topic}");
                        // Do any additional validation (as required) and then return back the below response
                        var responseData = new
                        {
                            ValidationResponse = subscriptionValidationEventData.ValidationCode
                        };

                        return new OkObjectResult(responseData);
                    }
                }
            }
            return new OkObjectResult(response);
        }
    }
}
module.exports = function (context, req) {
    context.log('JavaScript HTTP trigger function begun');
    var validationEventType = "Microsoft.EventGrid.SubscriptionValidationEvent";

    for (var events in req.body) {
        var body = req.body[events];
        // Deserialize the event data into the appropriate type based on event type
        if (body.data && body.eventType == validationEventType) {
            context.log("Got SubscriptionValidation event data, validation code: " + body.data.validationCode + " topic: " + body.topic);

            // Do any additional validation (as required) and then return back the below response
            var code = body.data.validationCode;
            context.res = { status: 200, body: { "ValidationResponse": code } };
        }
    }
    context.done();
};

测试验证响应

通过将样本事件粘贴到函数的测试字段,测试验证响应函数:

[{
  "id": "2d1781af-3a4c-4d7c-bd0c-e34b19da4e66",
  "topic": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  "subject": "",
  "data": {
    "validationCode": "512d38b6-c7b8-40c8-89fe-f46f9e9622b6"
  },
  "eventType": "Microsoft.EventGrid.SubscriptionValidationEvent",
  "eventTime": "2018-01-25T22:12:19.4556811Z",
  "metadataVersion": "1",
  "dataVersion": "1"
}]

选择“运行”时,输出应为“200 正常”且正文中应显示 {"validationResponse":"512d38b6-c7b8-40c8-89fe-f46f9e9622b6"}

验证请求

验证输出

处理 Blob 存储事件

现在,让我们扩展函数来处理 Microsoft.Storage.BlobCreated 系统事件:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System;
using Azure.Messaging.EventGrid;
using Azure.Messaging.EventGrid.SystemEvents;

namespace Function1
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
            string response = string.Empty;
            BinaryData events = await BinaryData.FromStreamAsync(req.Body);
            log.LogInformation($"Received events: {events}");

            EventGridEvent[] eventGridEvents = EventGridEvent.ParseMany(events);

            foreach (EventGridEvent eventGridEvent in eventGridEvents)
            {
                // Handle system events
                if (eventGridEvent.TryGetSystemEventData(out object eventData))
                {
                    // Handle the subscription validation event
                    if (eventData is SubscriptionValidationEventData subscriptionValidationEventData)
                    {
                        log.LogInformation($"Got SubscriptionValidation event data, validation code: {subscriptionValidationEventData.ValidationCode}, topic: {eventGridEvent.Topic}");
                        // Do any additional validation (as required) and then return back the below response

                        var responseData = new
                        {
                            ValidationResponse = subscriptionValidationEventData.ValidationCode
                        };
                        return new OkObjectResult(responseData);
                    }
                    // Handle the storage blob created event
                    else if (eventData is StorageBlobCreatedEventData storageBlobCreatedEventData)
                    {
                        log.LogInformation($"Got BlobCreated event data, blob URI {storageBlobCreatedEventData.Url}");
                    }
                }
            }
            return new OkObjectResult(response);
        }
    }
}
module.exports = function (context, req) {
    context.log('JavaScript HTTP trigger function begun');
    var validationEventType = "Microsoft.EventGrid.SubscriptionValidationEvent";
    var storageBlobCreatedEvent = "Microsoft.Storage.BlobCreated";

    for (var events in req.body) {
        var body = req.body[events];
        // Deserialize the event data into the appropriate type based on event type  
        if (body.data && body.eventType == validationEventType) {
            context.log("Got SubscriptionValidation event data, validation code: " + body.data.validationCode + " topic: " + body.topic);

            // Do any additional validation (as required) and then return back the below response
            var code = body.data.validationCode;
            context.res = { status: 200, body: { "ValidationResponse": code } };
        }

        else if (body.data && body.eventType == storageBlobCreatedEvent) {
            var blobCreatedEventData = body.data;
            context.log("Relaying received blob created event payload:" + JSON.stringify(blobCreatedEventData));
        }
    }
    context.done();
};

测试能否处理“已处理 Blob”事件

通过将 Blob 存储事件放到测试字段中并运行以下项来测试函数的新功能:

[{
  "topic": "/subscriptions/{subscription-id}/resourceGroups/Storage/providers/Microsoft.Storage/storageAccounts/xstoretestaccount",
  "subject": "/blobServices/default/containers/testcontainer/blobs/testfile.txt",
  "eventType": "Microsoft.Storage.BlobCreated",
  "eventTime": "2017-06-26T18:41:00.9584103Z",
  "id": "aaaa0a0a-bb1b-cc2c-dd3d-eeeeee4e4e4e",
  "data": {
    "api": "PutBlockList",
    "clientRequestId": "bbbb1b1b-cc2c-dd3d-ee4e-ffffff5f5f5f",
    "requestId": "cccc2c2c-dd3d-ee4e-ff5f-aaaaaa6a6a6a",
    "eTag": "0x8D4BCC2E4835CD0",
    "contentType": "text/plain",
    "contentLength": 524288,
    "blobType": "BlockBlob",
    "url": "https://example.blob.core.windows.net/testcontainer/testfile.txt",
    "sequencer": "00000000000004420000000000028963",
    "storageDiagnostics": {
      "batchId": "dddd3d3d-ee4e-ff5f-aa6a-bbbbbb7b7b7b"
    }
  },
  "dataVersion": "",
  "metadataVersion": "1"
}]

函数日志中应该会显示 Blob URL 输出:

2022-11-14T22:40:45.978 [Information] Executing 'Function1' (Reason='This function was programmatically called via the host APIs.', Id=8429137d-9245-438c-8206-f9e85ef5dd61)
2022-11-14T22:40:46.012 [Information] C# HTTP trigger function processed a request.
2022-11-14T22:40:46.017 [Information] Received events: [{"topic": "/subscriptions/{subscription-id}/resourceGroups/Storage/providers/Microsoft.Storage/storageAccounts/xstoretestaccount","subject": "/blobServices/default/containers/testcontainer/blobs/testfile.txt","eventType": "Microsoft.Storage.BlobCreated","eventTime": "2017-06-26T18:41:00.9584103Z","id": "aaaa0a0a-bb1b-cc2c-dd3d-eeeeee4e4e4e","data": {"api": "PutBlockList","clientRequestId": "bbbb1b1b-cc2c-dd3d-ee4e-ffffff5f5f5f","requestId": "cccc2c2c-dd3d-ee4e-ff5f-aaaaaa6a6a6a","eTag": "0x8D4BCC2E4835CD0","contentType": "text/plain","contentLength": 524288,"blobType": "BlockBlob","url": "https://example.blob.core.windows.net/testcontainer/testfile.txt","sequencer": "00000000000004420000000000028963","storageDiagnostics": {"batchId": "dddd3d3d-ee4e-ff5f-aa6a-bbbbbb7b7b7b"}},"dataVersion": "","metadataVersion": "1"}]
2022-11-14T22:40:46.335 [Information] Got BlobCreated event data, blob URI https://example.blob.core.windows.net/testcontainer/testfile.txt
2022-11-14T22:40:46.346 [Information] Executed 'Function1' (Succeeded, Id=8429137d-9245-438c-8206-f9e85ef5dd61, Duration=387ms)

还可以通过以下方式进行测试:创建一个 Blob 存储帐户或通用版 V2 存储帐户,添加事件订阅,然后将终结点设置为函数 URL:

函数 URL

处理自定义事件

最后,再次扩展函数,使其还能够处理自定义事件。

添加对事件 Contoso.Items.ItemReceived 的检查。 最终代码应如下所示:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System;
using Azure.Messaging.EventGrid;
using Azure.Messaging.EventGrid.SystemEvents;

namespace Function1
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
            string response = string.Empty;
            BinaryData events = await BinaryData.FromStreamAsync(req.Body);
            log.LogInformation($"Received events: {events}");

            EventGridEvent[] eventGridEvents = EventGridEvent.ParseMany(events);

            foreach (EventGridEvent eventGridEvent in eventGridEvents)
            {
                // Handle system events
                if (eventGridEvent.TryGetSystemEventData(out object eventData))
                {
                    // Handle the subscription validation event
                    if (eventData is SubscriptionValidationEventData subscriptionValidationEventData)
                    {
                        log.LogInformation($"Got SubscriptionValidation event data, validation code: {subscriptionValidationEventData.ValidationCode}, topic: {eventGridEvent.Topic}");
                        // Do any additional validation (as required) and then return back the below response

                        var responseData = new
                        {
                            ValidationResponse = subscriptionValidationEventData.ValidationCode
                        };
                        return new OkObjectResult(responseData);
                    }
                    // Handle the storage blob created event
                    else if (eventData is StorageBlobCreatedEventData storageBlobCreatedEventData)
                    {
                        log.LogInformation($"Got BlobCreated event data, blob URI {storageBlobCreatedEventData.Url}");
                    }
                }
                // Handle the custom contoso event
                else if (eventGridEvent.EventType == "Contoso.Items.ItemReceived")
                {
                    var contosoEventData = eventGridEvent.Data.ToObjectFromJson<ContosoItemReceivedEventData>();
                    log.LogInformation($"Got ContosoItemReceived event data, item SKU {contosoEventData.ItemSku}");
                }
            }
            return new OkObjectResult(response);
        }
    }
}
module.exports = function (context, req) {
    context.log('JavaScript HTTP trigger function begun');
    var validationEventType = "Microsoft.EventGrid.SubscriptionValidationEvent";
    var storageBlobCreatedEvent = "Microsoft.Storage.BlobCreated";
    var customEventType = "Contoso.Items.ItemReceived";

    for (var events in req.body) {
        var body = req.body[events];
        // Deserialize the event data into the appropriate type based on event type
        if (body.data && body.eventType == validationEventType) {
            context.log("Got SubscriptionValidation event data, validation code: " + body.data.validationCode + " topic: " + body.topic);

            // Do any additional validation (as required) and then return back the below response
            var code = body.data.validationCode;
            context.res = { status: 200, body: { "ValidationResponse": code } };
        }

        else if (body.data && body.eventType == storageBlobCreatedEvent) {
            var blobCreatedEventData = body.data;
            context.log("Relaying received blob created event payload:" + JSON.stringify(blobCreatedEventData));
        }

        else if (body.data && body.eventType == customEventType) {
            var payload = body.data;
            context.log("Relaying received custom payload:" + JSON.stringify(payload));
        }
    }
    context.done();
};

测试能否处理自定义事件

最后,测试函数现在能否处理自定义事件类型:

[{
    "subject": "Contoso/foo/bar/items",
    "eventType": "Contoso.Items.ItemReceived",
    "eventTime": "2017-08-16T01:57:26.005121Z",
    "id": "602a88ef-0001-00e6-1233-1646070610ea",
    "data": { 
            "itemSku": "Standard"
            },
    "dataVersion": "",
    "metadataVersion": "1"
}]

还可实时测试此功能,方式是在门户中通过 CURL 发送自定义事件,或者使用任何可通过 POST 发送到终结点的服务或应用程序发布到自定义主题。 使用终结点集作为函数 URL,创建自定义主题和事件订阅。

消息标头

以下是在消息标头中接收的属性:

属性名称 说明
aeg-subscription-name 事件订阅的名称。
aeg-delivery-count 针对该事件进行尝试的次数。
aeg-event-type

事件的类型。

可以为下列值之一:

  • SubscriptionValidation
  • 通知
  • SubscriptionDeletion
aeg-metadata-version

事件的元数据版本。

对于“事件网格事件架构”,此属性表示元数据版本;对于“云事件架构”,此属性表示规范版本。

aeg-data-version

事件的数据版本。

对于“事件网格事件架构”,此属性表示数据版本;对于“云事件架构”,此属性不适用。

aeg-output-event-id 事件网格事件的 ID。