Kafka Streams アプリケーションとの 統合は、環境 内で Node.js Apache から直接データを処理および分析するための強力な方法です 。 を使用すると、リアルタイム データ処理を構築し、それを アプリケーションにシームレスに統合できます。 これを達成するための具体的なガイドは次のとおりです。 Kafka Node.js Kafka Streams Node.js
ステップ 1: Kafka Streams KafkaJS のインストールと
Kafka Streams まず、 KafkaJS をインストールしてアプリケーション Kafka に統合する 必要があります Node.js。 npm を使用してこれらのパッケージをインストールできます。
npm install kafka-streams kafkajs
ステップ 2: Kafka Stream
APIを使用してアプリケーション Kafka Stream 内に を 作成します 。 以下は、あるデータを処理し 、その結果を別のデータに出力する を作成する基本的な例です 。 Node.js Kafka Streams Kafka Stream topic topic
const { KafkaStreams } = require('kafka-streams');
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'your-client-id',
brokers: ['broker1:port1', 'broker2:port2'],
});
const kafkaStreams = new KafkaStreams({
kafka,
logLevel: 2, // Level 2 for debug logs
});
const streamConfig = {
'group.id': 'your-group-id',
'metadata.broker.list': 'broker1:port1,broker2:port2',
'enable.auto.commit': false,
'socket.keepalive.enable': true,
};
const stream = kafkaStreams.getKStream(streamConfig);
stream
.from('input-topic')
.filter(record => record.value && record.value.length > 0)
.map(record =>({
key: record.key,
value: record.value.toUpperCase(),
}))
.to('output-topic');
kafkaStreams.start();
ステップ 3: データを処理する
Kafka Stream 上の例では、からのデータをリッスンするため の を作成し input-topic
、データをすべて大文字に変換して結果を にプッシュすることでデータを処理しました output-topic
。
ステップ 4: アプリケーションを実行する
Node.js 最後に、アプリケーションを実行して からのデータの処理を開始する 必要があります Kafka Streams。
上記の例では、 your-client-id
、 broker1:port1,
your-group-id
、 input-topic
などの値を output-topic
プロジェクトの特定の詳細に置き換える必要があることに注意してください。
Kafka Streams アプリケーションと 統合することで、 Node.js リアルタイム データ処理機能を柔軟かつ強力に構築できます。