Использование триггера зависит от модальности C#, используемой в приложении-функции. Это может быть один из следующих вариантов:
Используемые атрибуты зависят от конкретного поставщика событий.
В следующем примере показана функция C#, которая считывает и регистрирует сообщение Kafka как событие Kafka.
[FunctionName("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string> kevent, ILogger log)
{
log.LogInformation($"C# Kafka trigger function processed a message: {kevent.Value}");
}
Чтобы получить события в пакете, используйте входную строку или KafkaEventData
в качестве массива, как показано в следующем примере:
[FunctionName("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string>[] events, ILogger log)
{
foreach (KafkaEventData<string> kevent in events)
{
log.LogInformation($"C# Kafka trigger function processed a message: {kevent.Value}");
}
}
Следующая функция регистрирует сообщение и заголовки для события Kafka.
[FunctionName("KafkaTriggerSingleWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string> kevent, ILogger log)
{
log.LogInformation($"C# Kafka trigger function processed a message: {kevent.Value}");
log.LogInformation("Headers: ");
var headers = kevent.Headers;
foreach (var header in headers)
{
log.LogInformation($"Key = {header.Key} Value = {System.Text.Encoding.UTF8.GetString(header.Value)}");
}
}
Вы можете определить универсальную схему Avro для события, переданного триггеру. Следующее строковое значение определяет универсальную схему Avro.
const string PageViewsSchema = @"{
""type"": ""record"",
""name"": ""pageviews"",
""namespace"": ""ksql"",
""fields"": [
{
""name"": ""viewtime"",
""type"": ""long""
},
{
""name"": ""userid"",
""type"": ""string""
},
{
""name"": ""pageid"",
""type"": ""string""
}
]
}";
В следующей функции экземпляр GenericRecord
доступен в свойстве KafkaEvent.Value
.
[FunctionName(nameof(PageViews))]
public static void PageViews(
[KafkaTrigger("LocalBroker", "pageviews", AvroSchema = PageViewsSchema, ConsumerGroup = "azfunc")] KafkaEventData<string, GenericRecord>[] kafkaEvents,
long[] offsetArray,
int[] partitionArray,
string[] topicArray,
DateTime[] timestampArray,
ILogger logger)
{
for (int i = 0; i < kafkaEvents.Length; i++)
{
var kafkaEvent = kafkaEvents[i];
if (kafkaEvent.Value is GenericRecord genericRecord)
{
logger.LogInformation($"[{timestampArray[i]}] {topicArray[i]} / {partitionArray[i]} / {offsetArray[i]}: {GenericToJson(genericRecord)}");
}
}
}
Вы можете определить конкретную схему Avro для события, переданного триггеру. В указанном ниже коде определяется класс UserRecord
:
public const string SchemaText = @"
{
""type"": ""record"",
""name"": ""UserRecord"",
""namespace"": ""KafkaFunctionSample"",
""fields"": [
{
""name"": ""registertime"",
""type"": ""long""
},
{
""name"": ""userid"",
""type"": ""string""
},
{
""name"": ""regionid"",
""type"": ""string""
},
{
""name"": ""gender"",
""type"": ""string""
}
]
}";
В следующей функции экземпляр UserRecord
доступен в свойстве KafkaEvent.Value
.
[FunctionName(nameof(User))]
public static void User(
[KafkaTrigger("LocalBroker", "users", ConsumerGroup = "azfunc")] KafkaEventData<string, UserRecord>[] kafkaEvents,
ILogger logger)
{
foreach (var kafkaEvent in kafkaEvents)
{
logger.LogInformation($"{JsonConvert.SerializeObject(kafkaEvent.Value)}");
}
}
Полный набор рабочих примеров .NET см. в репозитории расширений Kafka.
В следующем примере показана функция C#, которая считывает и регистрирует сообщение Kafka как событие Kafka.
[FunctionName("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string> kevent, ILogger log)
{
log.LogInformation($"C# Kafka trigger function processed a message: {kevent.Value}");
}
Чтобы получить события в пакете, используйте в качестве входных данных массив строк или массив KafkaEventData
, как показано в следующем примере:
[FunctionName("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string>[] events, ILogger log)
{
foreach (KafkaEventData<string> kevent in events)
{
log.LogInformation($"C# Kafka trigger function processed a message: {kevent.Value}");
}
}
Следующая функция регистрирует сообщение и заголовки для события Kafka.
[FunctionName("KafkaTriggerSingleWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] KafkaEventData<string> kevent, ILogger log)
{
log.LogInformation($"C# Kafka trigger function processed a message: {kevent.Value}");
log.LogInformation("Headers: ");
var headers = kevent.Headers;
foreach (var header in headers)
{
log.LogInformation($"Key = {header.Key} Value = {System.Text.Encoding.UTF8.GetString(header.Value)}");
}
Вы можете определить универсальную схему Avro для события, переданного триггеру. Следующее строковое значение определяет универсальную схему Avro.
const string PageViewsSchema = @"{
""type"": ""record"",
""name"": ""pageviews"",
""namespace"": ""ksql"",
""fields"": [
{
""name"": ""viewtime"",
""type"": ""long""
},
{
""name"": ""userid"",
""type"": ""string""
},
{
""name"": ""pageid"",
""type"": ""string""
}
]
}";
В следующей функции экземпляр GenericRecord
доступен в свойстве KafkaEvent.Value
.
[FunctionName(nameof(PageViews))]
public static void PageViews(
[KafkaTrigger("LocalBroker", "pageviews", AvroSchema = PageViewsSchema, ConsumerGroup = "azfunc")] KafkaEventData<string, GenericRecord>[] kafkaEvents,
long[] offsetArray,
int[] partitionArray,
string[] topicArray,
DateTime[] timestampArray,
ILogger logger)
{
for (int i = 0; i < kafkaEvents.Length; i++)
{
var kafkaEvent = kafkaEvents[i];
if (kafkaEvent.Value is GenericRecord genericRecord)
{
logger.LogInformation($"[{timestampArray[i]}] {topicArray[i]} / {partitionArray[i]} / {offsetArray[i]}: {GenericToJson(genericRecord)}");
}
}
}
Вы можете определить конкретную схему Avro для события, переданного триггеру. В указанном ниже коде определяется класс UserRecord
:
public const string SchemaText = @"
{
""type"": ""record"",
""name"": ""UserRecord"",
""namespace"": ""KafkaFunctionSample"",
""fields"": [
{
""name"": ""registertime"",
""type"": ""long""
},
{
""name"": ""userid"",
""type"": ""string""
},
{
""name"": ""regionid"",
""type"": ""string""
},
{
""name"": ""gender"",
""type"": ""string""
}
]
}";
В следующей функции экземпляр UserRecord
доступен в свойстве KafkaEvent.Value
.
[FunctionName(nameof(User))]
public static void User(
[KafkaTrigger("LocalBroker", "users", ConsumerGroup = "azfunc")] KafkaEventData<string, UserRecord>[] kafkaEvents,
ILogger logger)
{
foreach (var kafkaEvent in kafkaEvents)
{
logger.LogInformation($"{JsonConvert.SerializeObject(kafkaEvent.Value)}");
}
}
Полный набор рабочих примеров .NET см. в репозитории расширений Kafka.
В следующем примере показана функция C#, которая считывает и регистрирует сообщение Kafka как событие Kafka.
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
Чтобы получить события в пакете, используйте в качестве входных данных массив строк, как показано в следующем примере:
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
Следующая функция регистрирует сообщение и заголовки для события Kafka.
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
Полный набор рабочих примеров .NET см. в репозитории расширений Kafka.
В следующем примере показана функция C#, которая считывает и регистрирует сообщение Kafka как событие Kafka.
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "EventHubConnectionString",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
Чтобы получить события в пакете, используйте в качестве входных данных массив строк, как показано в следующем примере:
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "EventHubConnectionString",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
Следующая функция регистрирует сообщение и заголовки для события Kafka.
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "EventHubConnectionString",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
Полный набор рабочих примеров .NET см. в репозитории расширений Kafka.