Öğretici: Stream analytics kullanarak Event Hubs için Apache Kafka olaylarını işleme
Bu makalede Verileri Event Hubs'a aktarma ve Azure Stream Analytics ile işleme adımları gösterilmektedir. Aşağıdaki adımlarda size yol gösterilir:
- Bir Event Hubs ad alanı oluşturun.
- Olay hub'ına ileti gönderen bir Kafka istemcisi oluşturun.
- Olay hub'ından Azure blob depolama alanına veri kopyalayan bir Stream Analytics işi oluşturun.
Bir olay hub'ı tarafından kullanıma sunulan Kafka uç noktasını kullanırken protokol istemcilerinizi değiştirmeniz veya kendi kümelerinizi çalıştırmanız gerekmez. Azure Event Hubs Apache Kafka sürüm 1.0 ve üzerini destekler.
Önkoşullar
Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşulların karşılandığından emin olun:
- Azure aboneliği. Aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.
- Java Development Kit (JDK) 1.7+.
- Bir Maven ikili arşivini indirin ve yükleyin.
- Git
- Bir Azure Depolama hesabı. Yoksa, devam etmeden önce bir tane oluşturun . Bu kılavuzdaki Stream Analytics işi, çıktı verilerini bir Azure blob depolama alanında depolar.
Event Hubs ad alanı oluşturma
Bir Event Hubs ad alanı oluşturduğunuzda, ad alanının Kafka uç noktası otomatik olarak etkinleştirilir. Kafka protokollerini kullanan uygulamalarınızdaki olayları olay hub'larına akışla aktarabilirsiniz. Event Hubs ad alanı oluşturmak için Azure portal kullanarak olay hub'ı oluşturma başlığındaki adım adım yönergeleri izleyin. Ayrılmış küme kullanıyorsanız bkz. Ayrılmış kümede ad alanı ve olay hub'ı oluşturma.
Not
Kafka için Event Hubs temel katmanda desteklenmez.
Event Hubs'ta Kafka ile ileti gönderme
Kafka deposu için Azure Event Hubs makinenize kopyalayın.
Klasörüne gidin:
azure-event-hubs-for-kafka/quickstart/java/producer
.içindeki
src/main/resources/producer.config
üreticinin yapılandırma ayrıntılarını güncelleştirin. Olay hub'ı ad alanı için adı ve bağlantı dizesini belirtin.bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
adresine
azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/
gidin ve TestDataReporter.java dosyasını istediğiniz bir düzenleyicide açın.Aşağıdaki kod satırını açıklama satırı yapın:
//final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
Açıklamalı kodun yerine aşağıdaki kod satırını ekleyin:
final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");
Bu kod olay verilerini JSON biçiminde gönderir. Stream Analytics işi için girişi yapılandırırken, giriş verilerinin biçimi olarak JSON'u belirtirsiniz.
Üreticiyi çalıştırın ve Event Hubs'a akışla aktarın. Windows makinesinde ,Node.js komut istemi kullanırken bu komutları çalıştırmadan önce klasörüne
azure-event-hubs-for-kafka/quickstart/java/producer
geçin.mvn clean package mvn exec:java -Dexec.mainClass="TestProducer"
Olay hub'sının verileri aldığını doğrulayın
VARLıKLAR'ın altında Event Hubs'ı seçin. test adlı bir olay hub'ı gördüğünüzden emin olun.
Olay hub'ına gelen iletileri gördüğünüzden emin olun.
Stream Analytics işi kullanarak olay verilerini işleme
Bu bölümde bir Azure Stream Analytics işi oluşturacaksınız. Kafka istemcisi olayları olay hub'ına gönderir. Olay verilerini giriş olarak alan ve azure blob depolama alanına aktaran bir Stream Analytics işi oluşturursunuz. Azure Depolama hesabınız yoksa bir hesap oluşturun.
Stream Analytics işindeki sorgu, herhangi bir analiz gerçekleştirmeden verilerden geçer. Çıkış verilerini farklı bir biçimde veya elde edilen içgörülerle üretmek için giriş verilerini dönüştüren bir sorgu oluşturabilirsiniz.
Akış Analizi işi oluşturma
- Azure portal+ Kaynak oluştur'u seçin.
- Azure Market menüsünde Analiz'i ve ardından Stream Analytics işi'ni seçin.
-
Yeni Stream Analytics sayfasında aşağıdaki eylemleri gerçekleştirin:
İş için bir ad girin.
Aboneliğinizi seçin.
Kaynak grubu için Yeni oluştur'u seçin ve adı girin. Var olan bir kaynak grubunu da kullanabilirsiniz.
İş için bir konum seçin.
İşi oluşturmak için Oluştur'u seçin.
İş girişi yapılandırma
Stream Analytics iş sayfasını görmek için bildirim iletisinde Kaynağa git'i seçin.
Soldaki menünün İş TOPOLOJİSİ bölümünde Girişler'i seçin.
Akış girişi ekle'yi ve ardından Olay Hub'ı'na tıklayın.
Olay Hub'ı giriş yapılandırması sayfasında aşağıdaki eylemleri gerçekleştirin:
Giriş için bir diğer ad belirtin.
Azure aboneliğinizi seçin.
Daha önce oluşturduğunuz olay hub'ı ad alanını seçin.
Olay hub'ı için test et'i seçin.
Kaydet’i seçin.
İş çıkışını yapılandırma
- Menünün İş TOPOLOJİSİ bölümünde Çıkışlar'ı seçin.
- Araç çubuğunda + Ekle'yi ve ardından Blob depolama'yı seçin
- Blob depolama çıkış ayarları sayfasında aşağıdaki eylemleri gerçekleştirin:
Çıkış için bir diğer ad belirtin.
Azure aboneliğinizi seçin.
Azure Depolama hesabınızı seçin.
Stream Analytics sorgusundaki çıktı verilerini depolayan kapsayıcı için bir ad girin.
Kaydet’i seçin.
Sorgu tanımlama
Gelen bir veri akışını okumak için bir Stream Analytics işi ayarladıktan sonraki adım, verileri gerçek zamanlı olarak analiz eden bir dönüştürme oluşturmaktır. Dönüştürme sorgusunu Stream Analytics sorgu dilini kullanarak tanımlarsınız. Bu kılavuzda, herhangi bir dönüştürme gerçekleştirmeden verilerden geçen bir sorgu tanımlarsınız.
Sorgu'yu seçin.
Sorgu penceresinde değerini daha önce oluşturduğunuz çıkış diğer adıyla değiştirin
[YourOutputAlias]
.değerini daha önce oluşturduğunuz giriş diğer adıyla değiştirin
[YourInputAlias]
.Araç çubuğunda Kaydet’i seçin.
Stream Analytics işini çalıştırma
Soldaki menüden Genel Bakış'ı seçin.
Başlat'ı seçin.
İşi başlat sayfasında Başlat'ı seçin.
İşin durumu Başlangıç olandan çalışmaya değişene kadar bekleyin.
Senaryoyu test etme
Olay hub'ına olay göndermek için Kafka üreticisini yeniden çalıştırın.
mvn exec:java -Dexec.mainClass="TestProducer"
Çıkış verilerininAzure blob depolama alanında oluşturulduğunu gördüğünüzden emin olmanız gerekir. Kapsayıcıda aşağıdaki örnek satırlara benzeyen 100 satırı olan bir JSON dosyası görürsünüz:
{"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
Azure Stream Analytics işi, olay hub'ından giriş verilerini aldı ve bu senaryoda Azure blob depolama alanında depoladı.
Sonraki adımlar
Bu makalede, protokol istemcilerinizi değiştirmeden veya kendi kümelerinizi çalıştırmadan Event Hubs'a akış yapmayı öğrendiniz. Apache Kafka için Event Hubs hakkında daha fazla bilgi edinmek için bkz. Azure Event Hubs için Apache Kafka geliştirici kılavuzu.