Integrointi Kafka Streams sovellukseen Node.js on tehokas tapa käsitellä ja analysoida tietoja suoraan Apachesta Kafka ympäristössä Node.js. Kafka Streams avulla voit rakentaa reaaliaikaisen tietojenkäsittelyn ja integroida sen saumattomasti sovellukseesi Node.js. Tässä on erityinen opas tämän saavuttamiseen:
Vaihe 1: Asenna Kafka Streams ja KafkaJS
Ensin sinun on asennettava Kafka Streams ja KafkaJS integroitava Kafka sovellukseesi Node.js. Voit käyttää npm:ää näiden pakettien asentamiseen:
npm install kafka-streams kafkajs
Vaihe 2: Luo a Kafka Stream
Luo Kafka Stream sovellukseesi API:n Node.js avulla Kafka Streams. Tässä on perusesimerkki a:n luomisesta Kafka Stream tietojen käsittelemiseksi yhdestä topic ja tuloksesta toiseen topic:
const { KafkaStreams } = require('kafka-streams');
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'your-client-id',
brokers: ['broker1:port1', 'broker2:port2'],
});
const kafkaStreams = new KafkaStreams({
kafka,
logLevel: 2, // Level 2 for debug logs
});
const streamConfig = {
'group.id': 'your-group-id',
'metadata.broker.list': 'broker1:port1,broker2:port2',
'enable.auto.commit': false,
'socket.keepalive.enable': true,
};
const stream = kafkaStreams.getKStream(streamConfig);
stream
.from('input-topic')
.filter(record => record.value && record.value.length > 0)
.map(record =>({
key: record.key,
value: record.value.toUpperCase(),
}))
.to('output-topic');
kafkaStreams.start();
Vaihe 3: Käsittele tiedot
Yllä olevassa esimerkissä olemme luoneet a Kafka Stream kuuntelemaan tietoja osoitteesta input-topic
, käsitelleet tiedot muuntamalla ne kaikki isoiksi kirjaimille ja työntämällä tuloksen output-topic
.
Vaihe 4: Suorita sovellus
Lopuksi sinun on suoritettava Node.js sovellus aloittaaksesi tietojen käsittelyn Kafka Streams.
Huomaa, että yllä olevassa esimerkissä sinun on korvattava arvot, kuten your-client-id
, ja projektisi erityisillä tiedoilla. broker1:port1,
your-group-id
input-topic
output-topic
Integroimalla Kafka Streams sovellukseen Node.js voit rakentaa joustavasti ja tehokkaasti reaaliaikaisia tietojenkäsittelyominaisuuksia.