Mulai cepat: Mengirim acara ke atau menerima acara dari Event Hubs menggunakan Go
Azure Event Hubs adalah platform streaming Big Data dan layanan pengolahan peristiwa, yang mampu menerima dan memproses jutaan peristiwa per detik. Event Hubs dapat memproses dan menyimpan peristiwa, data, atau telemetri yang dihasilkan oleh perangkat lunak dan perangkat yang terdistribusi. Data yang dikirim ke pusat aktivitas dapat ditransformasikan dan disimpan menggunakan penyedia analitik real-time atau adapter batching/penyimpanan. Untuk ringkasan detail Azure Event Hubs, lihat Ringkasan Azure Event Hubs dan fitur Azure Event Hubs.
Mulai cepat ini menjelaskan cara menulis aplikasi Go untuk mengirim peristiwa ke atau menerima peristiwa dari pusat aktivitas.
Catatan
Mulai cepat ini didasarkan pada sampel di GitHub di https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Bagian kirim peristiwa didasarkan pada sampel example_producing_events_test.go dan yang diterima didasarkan pada sampel example_processor_test.go . Kode disederhanakan untuk mulai cepat dan semua komentar terperinci dihapus, jadi lihat sampel untuk detail dan penjelasan selengkapnya.
Prasyarat
Untuk menyelesaikan mulai cepat ini, Anda memerlukan prasyarat berikut:
- Go dipasang secara lokal. Ikuti instruksi ini jika perlu.
- Akun Azure aktif. Jika Anda tidak memiliki langganan Azure, buat akun gratis sebelum Anda memulai.
- Membuat ruang nama Azure Event Hubs dan pusat aktivitas. Gunakan portal Microsoft Azure untuk membuat kumpulan nama jenis Event Hubs, dan dapatkan info manajemen yang diperlukan aplikasi Anda untuk berkomunikasi dengan pusat aktivitas. Untuk membuat namespace layanan dan pusat aktivitas, ikuti prosedur dalam artikel ini.
Mengirim aktivitas
Bagian ini menunjukkan kepada Anda cara membuat aplikasi Go untuk mengirim aktivitas ke pusat aktivitas.
Memasang paket Go
Dapatkan paket Go untuk Azure Event Hubs seperti yang ditunjukkan dalam contoh berikut.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Kode untuk mengirim peristiwa ke pusat aktivitas
Berikut adalah kode untuk mengirim peristiwa ke pusat aktivitas. Langkah utama dalam kode adalah:
- Buat klien produsen Azure Event Hubs menggunakan string koneksi ke namespace layanan Azure Event Hubs dan nama pusat aktivitas.
- Buat objek batch dan tambahkan peristiwa sampel ke batch.
- Kirim batch peristiwa ke peristiwa ke th.
Penting
Ganti NAMESPACE CONNECTION STRING
dengan string koneksi ke namespace Layanan Pusat Aktivitas Anda dan EVENT HUB NAME
dengan nama hub peristiwa dalam kode sampel.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Jangan jalankan aplikasi. Anda harus terlebih dahulu menjalankan aplikasi penerima lalu aplikasi pengirim.
Menerima peristiwa
Membuat kontainer dan akun Azure Storage
Status seperti sewa pada partisi dan titik pemeriksaan dalam peristiwa dibagikan antara penerima menggunakan kontainer Azure Storage. Anda bisa membuat akun penyimpanan dan kontainer dengan Go SDK, tetapi Anda juga bisa membuatnya dengan mengikuti instruksi di Tentang akun Azure Storage.
Ikuti rekomendasi ini saat menggunakan Azure Blob Storage sebagai penyimpanan titik pemeriksaan:
- Gunakan kontainer terpisah untuk setiap grup konsumen. Anda dapat menggunakan akun penyimpanan yang sama, tetapi menggunakan satu kontainer per setiap grup.
- Jangan gunakan kontainer untuk hal lain, dan jangan gunakan akun penyimpanan untuk hal lain.
- Akun penyimpanan harus berada di wilayah yang sama dengan aplikasi yang disebarkan berada. Jika aplikasi lokal, coba pilih wilayah terdekat yang mungkin.
Pada halaman Akun penyimpanan di portal Azure, di bagian Blob service, pastikan bahwa pengaturan berikut dinonaktifkan.
- Namespace hierarkis
- Penghapusan sementara blob
- Penerapan versi
Paket Go
Untuk menerima pesan, dapatkan paket Go untuk Azure Event Hubs seperti yang ditunjukkan dalam contoh berikut.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Kode untuk menerima peristiwa dari pusat aktivitas
Berikut adalah kode untuk menerima peristiwa dari pusat aktivitas. Langkah utama dalam kode adalah:
- Periksa objek penyimpanan titik pemeriksaan yang mewakili Azure Blob Storage yang digunakan oleh pusat aktivitas untuk titik pemeriksaan.
- Buat klien konsumen Azure Event Hubs menggunakan string koneksi ke namespace layanan Azure Event Hubs dan nama pusat aktivitas.
- Buat prosesor peristiwa menggunakan objek klien dan objek penyimpanan titik pemeriksaan. Prosesor menerima dan memproses peristiwa.
- Untuk setiap partisi di pusat aktivitas, buat klien partisi dengan processEvents sebagai fungsi untuk memproses peristiwa.
- Jalankan semua klien partisi untuk menerima dan memproses peristiwa.
Penting
Ganti nilai tempat penampung berikut dengan nilai aktual:
AZURE STORAGE CONNECTION STRING
dengan string koneksi untuk akun penyimpanan Azure AndaBLOB CONTAINER NAME
dengan nama kontainer blob yang Anda buat di akun penyimpananNAMESPACE CONNECTION STRING
dengan string koneksi untuk namespace Layanan Pusat Aktivitas AndaEVENT HUB NAME
dengan nama pusat aktivitas dalam kode sampel.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Menjalankan aplikasi penerima dan pengirim
Jalankan aplikasi penerima terlebih dahulu.
Jalankan aplikasi pengirim.
Tunggu sebentar untuk melihat output berikut di jendela penerima.
Processing 2 event(s) Event received with body hello Event received with body world
Langkah berikutnya
Lihat sampel di GitHub di https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.