En trinn-for-trinn-veiledning for oppsett og kjøring

Millioner av dataposter genereres hver eneste dag i dagens datasystemer. Disse inkluderer dine økonomiske transaksjoner, bestilling eller data fra bilsensoren. For å behandle disse datastrømmehendelsene i sanntid og pålitelig flytte hendelsesposter mellom ulike bedriftssystemer, trenger du Apache Kafka.

Apache Kafka er en åpen kildekode-datastrømningsløsning som håndterer over 1 million poster per sekund. Ved siden av denne høye gjennomstrømningen gir Apache Kafka høy skalerbarhet og tilgjengelighet, lav ventetid og permanent lagring.

Selskaper som LinkedIn, Uber og Netflix stoler på Apache Kafka for sanntidsbehandling og datastrømming. Den enkleste måten å komme i gang med Apache Kafka på er å ha den oppe og kjøre på din lokale maskin. Dette lar deg ikke bare se Apache Kafka-serveren i aksjon, men lar deg også produsere og konsumere meldinger.

Med praktisk erfaring med å starte serveren, lage emner og skrive Java-kode ved hjelp av Kafka-klienten, vil du være klar til å bruke Apache Kafka for å oppfylle alle dine datapipeline-behov.

Hvordan laste ned Apache Kafka på din lokale maskin

Du kan laste ned den nyeste versjonen av Apache Kafka fra offisiell lenke. Det nedlastede innholdet vil bli komprimert i .tgz-format. Når du er lastet ned, må du trekke ut det samme.

Hvis du er Linux, åpne terminalen. Deretter navigerer du til stedet der du har lastet ned den komprimerte Apache Kafka-versjonen. Kjør følgende kommando:

tar -xzvf kafka_2.13-3.5.0.tgz

Etter at kommandoen er fullført, vil du finne en ny katalog kalt kafka_2.13-3.5.0. Naviger inn i mappen ved å bruke:

cd kafka_2.13-3.5.0

Du kan nå liste innholdet i denne katalogen ved å bruke ls-kommandoen.

For Windows-brukere kan du følge de samme trinnene. Hvis du ikke finner tar-kommandoen, kan du bruke et tredjepartsverktøy som WinZip for å åpne arkivet.

Slik starter du Apache Kafka på din lokale maskin

Etter at du har lastet ned og pakket ut Apache Kafka, er det på tide å begynne å kjøre den. Den har ingen installatører. Du kan begynne å bruke den direkte via kommandolinjen eller terminalvinduet.

Før du begynner med Apache Kafka, sørg for at du har Java 8+ installert på systemet ditt. Apache Kafka krever en kjørende Java-installasjon.

#1. Kjør Apache Zookeeper-serveren

Det første trinnet er å kjøre Apache Zookeeper. Du får den forhåndslastet ned som en del av arkivet. Det er en tjeneste som er ansvarlig for å vedlikeholde konfigurasjoner og sørge for synkronisering for andre tjenester.

Når du er inne i katalogen der du har trukket ut innholdet i arkivet, kjør følgende kommando:

For Linux-brukere:

bin/zookeeper-server-start.sh config/zookeeper.properties

For Windows-brukere:

bin/windows/zookeeper-server-start.bat config/zookeeper.properties

Filen zookeeper.properties inneholder konfigurasjonene for å kjøre Apache Zookeeper-serveren. Du kan konfigurere egenskaper som den lokale katalogen der dataene skal lagres og porten som serveren skal kjøre på.

#2. Start Apache Kafka-serveren

Nå som Apache Zookeeper-serveren er startet, er det på tide å starte Apache Kafka-serveren.

Åpne et nytt terminal- eller ledetekstvindu og naviger til katalogen der de utpakkede filene er til stede. Deretter kan du starte Apache Kafka-serveren ved å bruke kommandoen nedenfor:

For Linux-brukere:

bin/kafka-server-start.sh config/server.properties

For Windows-brukere:

bin/windows/kafka-server-start.bat config/server.properties

Du har Apache Kafka-serveren i gang. Hvis du ønsker å endre standardkonfigurasjonen, kan du gjøre det ved å endre filen server.properties. De forskjellige verdiene er tilstede i offisiell dokumentasjon.

Slik bruker du Apache Kafka på din lokale maskin

Du er nå klar til å begynne å bruke Apache Kafka på din lokale maskin for å produsere og konsumere meldinger. Siden Apache Zookeeper- og Apache Kafka-serverne er oppe og går, la oss se hvordan du kan lage ditt første emne, produsere din første melding og konsumere det samme.

Hva er trinnene for å lage et emne i Apache Kafka?

Før du lager ditt første emne, la oss forstå hva et emne faktisk er. I Apache Kafka er et emne et logisk datalager som hjelper til med datastrømming. Tenk på det som kanalen der data transporteres fra den ene komponenten til den andre.

Et emne støtter multiprodusenter og multiforbrukere – mer enn ett system kan skrive og lese fra et emne. I motsetning til andre meldingssystemer, kan enhver melding fra et emne konsumeres mer enn én gang. I tillegg kan du også nevne oppbevaringsperioden for meldingene dine.

La oss ta eksempelet med et system (produsent) som produserer data for banktransaksjoner. Og et annet system (forbruker) bruker disse dataene og sender et appvarsel til brukeren. For å legge til rette for dette kreves et tema.

Åpne et nytt terminal- eller ledetekstvindu, og naviger til katalogen der du har pakket ut arkivet. Følgende kommando vil opprette et emne kalt transaksjoner:

For Linux-brukere:

bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092

For Windows-brukere:

bin/windows/kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092

Du har nå laget ditt første emne, og du er klar til å begynne å produsere og konsumere meldinger.

Hvordan produsere en melding til Apache Kafka?

Med Apache Kafka-emnet ditt klart, kan du nå lage din første melding. Åpne et nytt terminal- eller ledetekstvindu, eller bruk det samme du har brukt til å lage emnet. Deretter må du sørge for at du er i riktig katalog der du har trukket ut innholdet i arkivet. Du kan bruke kommandolinjen til å lage meldingen om emnet ved å bruke følgende kommando:

For Linux-brukere:

bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092

For Windows-brukere:

bin/windows/kafka-console-producer.bat --topic transactions --bootstrap-server localhost:9092

Når du kjører kommandoen, vil du se at terminalen eller ledetekstvinduet venter på inndata. Skriv din første melding og trykk Enter.

> This is a transactional record for $100

Du har produsert din første melding til Apache Kafka på din lokale maskin. Deretter er du nå klar til å bruke denne meldingen.

Hvordan konsumere en melding fra Apache Kafka?

Forutsatt at emnet ditt er opprettet og du har laget en melding til Kafka-emnet ditt, kan du nå konsumere den meldingen.

Apache Kafka lar deg knytte flere forbrukere til samme emne. Hver forbruker kan være en del av en forbrukergruppe – en logisk identifikator. Hvis du for eksempel har to tjenester som trenger å konsumere samme data, kan de ha forskjellige forbrukergrupper.

Men hvis du har to forekomster av samme tjeneste, vil du unngå å konsumere og behandle den samme meldingen to ganger. I så fall vil begge ha samme forbrukergruppe.

I terminal- eller ledetekstvinduet, sørg for at du er i riktig katalog. Bruk følgende kommando for å starte forbrukeren:

For Linux-brukere:

bin/kafka-console-consumer.sh --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer

For Windows-brukere:

bin/windows/kafka-console-consumer.bat --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer

Du vil se meldingen du tidligere har laget vises på terminalen din. Du har nå brukt Apache Kafka til å konsumere din første melding.

Kafka-console-consumer-kommandoen tar mange argumenter som sendes inn. La oss se hva hver av dem betyr:

  • –emnet nevner emnet der du skal konsumere
  • –fra-begynnelsen ber konsollforbrukeren om å begynne å lese meldinger rett fra den første meldingen til stede
  • Apache Kafka-serveren din er nevnt via –bootstrap-server-alternativet
  • I tillegg kan du nevne forbrukergruppen ved å sende parameteren –group
  • I fravær av en forbrukergruppeparameter, blir den automatisk generert

Når konsollforbrukeren kjører, kan du prøve å lage nye meldinger. Du vil se at alle er konsumert og dukker opp i terminalen din.

Nå som du har opprettet emnet og vellykket produsert og konsumert meldinger, la oss integrere dette med en Java-applikasjon.

Hvordan lage Apache Kafka produsent og forbruker ved hjelp av Java

Før du begynner, sørg for at du har Java 8+ installert på din lokale maskin. Apache Kafka tilbyr sitt eget klientbibliotek som lar deg koble til sømløst. Hvis du bruker Maven til å administrere avhengighetene dine, legg til følgende avhengighet til pom.xml

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>

Du kan også laste ned biblioteket fra Maven-depot og legg den til i Java-klassebanen.

Når biblioteket ditt er på plass, åpner du et valgfritt koderedigeringsprogram. La oss se hvordan du kan starte opp produsent og forbruker ved å bruke Java.

Lag Apache Kafka Java-produsent

Med kafka-klientbiblioteket på plass, er du nå klar til å begynne å lage din Kafka-produsent.

La oss lage en klasse kalt SimpleProducer.java. Denne vil være ansvarlig for å produsere meldinger om emnet du har laget tidligere. Inne i denne klassen vil du lage en forekomst av org.apache.kafka.clients.producer.KafkaProducer. Deretter vil du bruke denne produsenten til å sende meldingene dine.

For å lage Kafka-produsenten trenger du verten og porten til Apache Kafka-serveren. Siden du kjører den på din lokale maskin, vil verten være localhost. Gitt at du ikke har endret standardegenskapene når du starter opp serveren, vil porten være 9092. Tenk på følgende kode nedenfor som vil hjelpe deg med å lage din produsent:

package org.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleProducer {

    private final KafkaProducer<String, String> producer;

    public SimpleProducer(String host, String port) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
    }
}

Du vil legge merke til at det er tre egenskaper som blir angitt. La oss raskt gå gjennom hver av dem:

  • BOOTSTRAP_SERVERS_CONFIG lar deg definere hvor Apache Kafka-serveren kjører
  • KEY_SERIALIZER_CLASS_CONFIG forteller produsenten hvilket format som skal brukes for å sende meldingsnøklene.
  • Formatet for å sende den faktiske meldingen er definert ved hjelp av egenskapen VALUE_SERIALIZER_CLASS_CONFIG.

Siden du skal sende tekstmeldinger, er begge egenskapene satt til å bruke StringSerializer.class.

For å faktisk sende en melding til emnet ditt, må du bruke producer.send()-metoden som tar inn en ProducerRecord. Følgende kode gir deg en metode som sender en melding til emnet og skriver ut svaret sammen med meldingsforskyvningen.

public void produce(String topic, String message) throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    final Future<RecordMetadata> send = this.producer.send(record);
    final RecordMetadata recordMetadata = send.get();
    System.out.println(recordMetadata);
}

Med hele koden på plass kan du nå sende meldinger til emnet ditt. Du kan bruke en hovedmetode for å teste dette ut, som presentert i koden nedenfor:

package org.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleProducer {

    private final KafkaProducer<String, String> producer;

    public SimpleProducer(String host, String port) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
    }

    public void produce(String topic, String message) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        final Future<RecordMetadata> send = this.producer.send(record);
        final RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);
    }

    public static void main(String[] args) throws Exception{
       SimpleProducer producer = new SimpleProducer("localhost", "9092");
       producer.produce("transactions", "This is a transactional record of $200");
    }
}

I denne koden lager du en SimpleProducer som kobles til Apache Kafka-serveren din på din lokale maskin. Den bruker internt KafkaProducer til å lage tekstmeldinger om emnet ditt.

Lag Apache Kafka Java-forbruker

Det er på tide å gjøre en Apache Kafka-forbruker ved å bruke Java-klienten. Lag en klasse kalt SimpleConsumer.java. Deretter skal du lage en konstruktør for denne klassen, som initialiserer org.apache.kafka.clients.consumer.KafkaConsumer. For å opprette forbrukeren trenger du verten og porten der Apache Kafka-serveren kjører. I tillegg trenger du forbrukergruppen samt emnet du ønsker å konsumere fra. Bruk kodebiten gitt nedenfor:

package org.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleConsumer {

    private static final String OFFSET_RESET = "earliest";

    private final KafkaConsumer<String, String> consumer;
    private boolean keepConsuming = true;

    public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(List.of(topic));
    }
}

I likhet med Kafka-produsenten tar Kafka-forbrukeren også inn et Egenskapsobjekt. La oss se på alle de forskjellige egenskapene:

  • BOOTSTRAP_SERVERS_CONFIG forteller forbrukeren hvor Apache Kafka-serveren kjører
  • Forbrukergruppen er nevnt ved å bruke GROUP_ID_CONFIG
  • Når forbrukeren begynner å konsumere, lar AUTO_OFFSET_RESET_CONFIG deg nevne hvor langt tilbake du vil begynne å konsumere meldinger fra
  • KEY_DESERIALIZER_CLASS_CONFIG forteller forbrukeren typen meldingsnøkkel
  • VALUE_DESERIALIZER_CLASS_CONFIG forteller forbrukertypen om den faktiske meldingen

Siden du i ditt tilfelle vil bruke tekstmeldinger, er deserializer-egenskapene satt til StringDeserializer.class.

Du vil nå konsumere meldingene fra emnet ditt. For å gjøre ting enkelt vil du skrive ut meldingen til konsollen når meldingen er konsumert. La oss se hvordan du kan oppnå dette ved å bruke koden nedenfor:

private boolean keepConsuming = true;

public void consume() {
    while (keepConsuming) {
        final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
        if (consumerRecords != null && !consumerRecords.isEmpty()) {
            consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                System.out.println(consumerRecord.value());
            });
        }
    }
}

Denne koden vil fortsette å spørre om emnet. Når du mottar en forbrukerjournal, vil meldingen bli skrevet ut. Test ut forbrukeren din i aksjon ved å bruke en hovedmetode. Du starter et Java-program som vil fortsette å konsumere emnet og skrive ut meldingene. Stopp Java-applikasjonen for å avslutte forbrukeren.

package org.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleConsumer {

    private static final String OFFSET_RESET = "earliest";

    private final KafkaConsumer<String, String> consumer;
    private boolean keepConsuming = true;

    public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(List.of(topic));
    }

    public void consume() {
        while (keepConsuming) {
            final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
            if (consumerRecords != null && !consumerRecords.isEmpty()) {
                consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                    System.out.println(consumerRecord.value());
                });
            }
        }
    }

    public static void main(String[] args) {
        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", "9092", "transactions-consumer", "transactions");
        simpleConsumer.consume();
    }
}

Når du kjører koden, vil du se at den ikke bare bruker meldingen som er produsert av Java-produsenten, men også de du har produsert via konsollprodusenten. Dette er fordi AUTO_OFFSET_RESET_CONFIG-egenskapen er satt til tidligst.

Når SimpleConsumer kjører, kan du bruke konsollprodusenten eller SimpleProducer Java-applikasjonen til å produsere flere meldinger til emnet. Du vil se dem bli konsumert og skrevet ut på konsollen.

Oppfyll alle dine datapipelinebehov med Apache Kafka

Apache Kafka lar deg håndtere alle dine datapipeline-krav med letthet. Med Apache Kafka-oppsett på din lokale maskin kan du utforske alle de forskjellige funksjonene som Kafka tilbyr. I tillegg lar den offisielle Java-klienten deg skrive, koble til og kommunisere effektivt med Apache Kafka-serveren din.

Apache Kafka er et allsidig, skalerbart og høyytende datastrømningssystem, og kan virkelig være en game changer for deg. Du kan bruke den til din lokale utvikling eller til og med integrere den i produksjonssystemene dine. Akkurat som det er enkelt å sette opp lokalt, er det ingen stor oppgave å sette Apache Kafka for større applikasjoner.

Hvis du leter etter plattformer for datastrømming, kan du se på de beste strømmedataplattformene for sanntidsanalyse og -behandling.