Hızlı Başlangıç: Go kullanarak Event Hubs'a olay gönderme veya olay alma
Azure Event Hubs saniyede milyonlarca olay alıp işleme kapasitesine sahip olan bir Büyük Veri akış platformu ve olay alma hizmetidir. Event Hubs dağıtılan yazılımlar ve cihazlar tarafından oluşturulan olayları, verileri ve telemetrileri işleyebilir ve depolayabilir. Bir olay hub’ına gönderilen veriler, herhangi bir gerçek zamanlı analiz sağlayıcısı ve işlem grubu oluşturma/depolama bağdaştırıcıları kullanılarak dönüştürülüp depolanabilir. Event Hubs’a ayrıntılı bir genel bakış için bkz. Event Hubs'a genel bakış ve Event Hubs özellikleri.
Bu hızlı başlangıçta, olay hub'ına olay göndermek veya olay hub'ından olay almak için Go uygulamalarının nasıl yazıldığı açıklanmaktadır.
Not
Bu hızlı başlangıçta GitHub'daki örnekler temel alınmaktadır https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Olayları gönderme bölümü example_producing_events_test.go örneğini, alma bölümü ise example_processor_test.go örneğini temel alır. Kod hızlı başlangıç için basitleştirilmiştir ve tüm ayrıntılı açıklamalar kaldırılır, bu nedenle daha fazla ayrıntı ve açıklama için örneklere bakın.
Önkoşullar
Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşullara ihtiyacınız vardır:
- Git yerel olarak yüklensin. Gerekirse bu yönergeleri izleyin.
- Etkin bir Azure hesabı. Azure aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.
- Event Hubs ad alanı ve olay hub'ı oluşturun. Event Hubs türünde bir ad alanı oluşturmak ve uygulamanızın olay hub'ı ile iletişim kurmak için ihtiyaç duyduğu yönetim kimlik bilgilerini almak için Azure portalını kullanın. Ad alanı ve olay hub'ı oluşturmak için bu makaledeki yordamı izleyin.
Olayları gönderme
Bu bölümde, bir olay hub'ına olay göndermek için bir Go uygulaması oluşturma gösterilmektedir.
Go paketini yükleme
Aşağıdaki örnekte gösterildiği gibi Event Hubs için Go paketini alın.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Olay hub'ına olay gönderme kodu
Olay hub'ına olay gönderme kodu aşağıdadır. Koddaki ana adımlar şunlardır:
- Event Hubs ad alanına ve olay hub'ı adına bağlantı dizesi kullanarak bir Event Hubs üretici istemcisi oluşturun.
- Bir batch nesnesi oluşturun ve toplu işleme örnek olaylar ekleyin.
- Olay toplu işlemini th olaylara gönderin.
Önemli
değerini Event Hubs ad alanınızın bağlantı dizesi ve EVENT HUB NAME
örnek koddaki olay hub'ı adıyla değiştirinNAMESPACE CONNECTION STRING
.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Uygulamayı henüz çalıştırmayın. Önce alıcı uygulamasını ve ardından gönderen uygulamasını çalıştırmanız gerekir.
Olayları alma
Depolama hesabı ve kapsayıcı oluşturma
Olaylardaki bölümlerdeki kiralar ve denetim noktaları gibi durumlar, Azure Depolama kapsayıcısı kullanılarak alıcılar arasında paylaşılır. Go SDK'sı ile bir depolama hesabı ve kapsayıcı oluşturabilirsiniz, ancak Azure depolama hesapları hakkında başlığındaki yönergeleri izleyerek de bir depolama hesabı oluşturabilirsiniz.
Azure Blob Depolama denetim noktası deposu olarak kullanırken şu önerileri izleyin:
- Her tüketici grubu için ayrı bir kapsayıcı kullanın. Aynı depolama hesabını kullanabilirsiniz, ancak her grup için bir kapsayıcı kullanabilirsiniz.
- Kapsayıcıyı başka hiçbir şey için kullanmayın ve depolama hesabını başka hiçbir şey için kullanmayın.
- Depolama hesabın dağıtılan uygulamanın bulunduğu bölgede olması gerekir. Uygulama şirket içindeyse, mümkün olan en yakın bölgeyi seçmeyi deneyin.
Azure portalındaki Depolama hesabı sayfasında, Blob hizmeti bölümünde aşağıdaki ayarların devre dışı bırakıldığından emin olun.
- Hiyerarşik ad alanı
- Blob geçici silme
- Sürüm oluşturma
Go paketleri
İletileri almak için, aşağıdaki örnekte gösterildiği gibi Event Hubs için Go paketlerini alın.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Olay hub'ından olay alma kodu
Olay hub'ından olay almaya yönelik kod aşağıdadır. Koddaki ana adımlar şunlardır:
- Denetim noktası oluşturma için olay hub'ı tarafından kullanılan Azure Blob Depolama temsil eden bir denetim noktası deposu nesnesini denetleyin.
- Event Hubs ad alanına ve olay hub'ı adına bağlantı dizesi kullanarak bir Event Hubs tüketici istemcisi oluşturun.
- İstemci nesnesini ve denetim noktası deposu nesnesini kullanarak bir olay işlemcisi oluşturun. İşlemci olayları alır ve işler.
- Olay hub'ında her bölüm için olayları işlemek için işlev olarak processEvents ile bir bölüm istemcisi oluşturun.
- Olayları almak ve işlemek için tüm bölüm istemcilerini çalıştırın.
Önemli
Aşağıdaki yer tutucu değerleri gerçek değerlerle değiştirin:
AZURE STORAGE CONNECTION STRING
Azure depolama hesabınızın bağlantı dizesiBLOB CONTAINER NAME
depolama hesabında oluşturduğunuz blob kapsayıcısının adıylaNAMESPACE CONNECTION STRING
Event Hubs ad alanınızın bağlantı dizesi ileEVENT HUB NAME
örnek kodda olay hub'ı adıyla.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Alıcı ve gönderen uygulamalarını çalıştırma
Önce alıcı uygulamasını çalıştırın.
Gönderen uygulamasını çalıştırın.
Alıcı penceresinde aşağıdaki çıkışı görmek için bir dakika bekleyin.
Processing 2 event(s) Event received with body hello Event received with body world
Sonraki adımlar
GitHub'daki örneklere bakın.https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs