Ένας οδηγός βήμα προς βήμα για τη ρύθμιση και την εκτέλεση

Εκατομμύρια εγγραφές δεδομένων παράγονται κάθε μέρα στα σημερινά υπολογιστικά συστήματα. Αυτές περιλαμβάνουν τις οικονομικές σας συναλλαγές, την υποβολή παραγγελίας ή δεδομένα από τον αισθητήρα του αυτοκινήτου σας. Για να επεξεργαστείτε αυτά τα συμβάντα ροής δεδομένων σε πραγματικό χρόνο και να μετακινήσετε αξιόπιστα τις εγγραφές συμβάντων μεταξύ διαφορετικών εταιρικών συστημάτων, χρειάζεστε Απάτσι Κάφκα.
Το Apache Kafka είναι μια λύση ροής δεδομένων ανοιχτού κώδικα που χειρίζεται πάνω από 1 εκατομμύριο εγγραφές ανά δευτερόλεπτο. Παράλληλα με αυτή την υψηλή απόδοση, το Apache Kafka παρέχει υψηλή επεκτασιμότητα και διαθεσιμότητα, χαμηλή καθυστέρηση και μόνιμη αποθήκευση.
Εταιρείες όπως το LinkedIn, η Uber και το Netflix βασίζονται στον Apache Kafka για επεξεργασία σε πραγματικό χρόνο και ροή δεδομένων. Ο ευκολότερος τρόπος για να ξεκινήσετε με το Apache Kafka είναι να το έχετε σε λειτουργία στον τοπικό σας υπολογιστή. Αυτό σας επιτρέπει όχι μόνο να βλέπετε τον διακομιστή Apache Kafka σε δράση, αλλά επίσης σας επιτρέπει να παράγετε και να καταναλώνετε μηνύματα.
Με πρακτική εμπειρία στην εκκίνηση του διακομιστή, τη δημιουργία θεμάτων και τη σύνταξη κώδικα Java χρησιμοποιώντας τον πελάτη Kafka, θα είστε έτοιμοι να χρησιμοποιήσετε το Apache Kafka για να εκπληρώσετε όλες τις ανάγκες σας για τη διοχέτευση δεδομένων.
Πίνακας περιεχομένων
Πώς να κατεβάσετε το Apache Kafka στον τοπικό σας υπολογιστή
Μπορείτε να κατεβάσετε την πιο πρόσφατη έκδοση του Apache Kafka από το επίσημος σύνδεσμος. Το περιεχόμενο που έχετε λάβει θα συμπιεστεί σε μορφή .tgz. Μετά τη λήψη, θα πρέπει να εξαγάγετε το ίδιο.
Εάν είστε Linux, ανοίξτε το τερματικό σας. Στη συνέχεια, μεταβείτε στην τοποθεσία όπου έχετε κατεβάσει τη συμπιεσμένη έκδοση του Apache Kafka. Εκτελέστε την ακόλουθη εντολή:
tar -xzvf kafka_2.13-3.5.0.tgz
Αφού ολοκληρωθεί η εντολή, θα βρείτε ότι ένας νέος κατάλογος που ονομάζεται kafka_2.13-3.5.0. Πλοηγηθείτε μέσα στο φάκελο χρησιμοποιώντας:
cd kafka_2.13-3.5.0
Τώρα μπορείτε να απαριθμήσετε τα περιεχόμενα αυτού του καταλόγου χρησιμοποιώντας την εντολή ls.
Για χρήστες Windows, μπορείτε να ακολουθήσετε τα ίδια βήματα. Εάν δεν μπορείτε να βρείτε την εντολή tar, μπορείτε να χρησιμοποιήσετε ένα εργαλείο τρίτου κατασκευαστή όπως το WinZip για να ανοίξετε το αρχείο.
Πώς να ξεκινήσετε το Apache Kafka στον τοπικό σας υπολογιστή
Αφού κατεβάσετε και εξαγάγετε το Apache Kafka, ήρθε η ώρα να ξεκινήσετε να το εκτελείτε. Δεν έχει κανένα πρόγραμμα εγκατάστασης. Μπορείτε να αρχίσετε να το χρησιμοποιείτε απευθείας μέσω της γραμμής εντολών ή του παραθύρου του τερματικού σας.
Πριν ξεκινήσετε με το Apache Kafka, βεβαιωθείτε ότι έχετε εγκαταστήσει την Java 8+ στο σύστημά σας. Το Apache Kafka απαιτεί εγκατάσταση Java που εκτελείται.
#1. Εκτελέστε τον διακομιστή Apache Zookeeper
Το πρώτο βήμα είναι η εκτέλεση του Apache Zookeeper. Το λαμβάνετε προκατεβασμένο ως μέρος του αρχείου. Είναι μια υπηρεσία που είναι υπεύθυνη για τη διατήρηση των διαμορφώσεων και την παροχή συγχρονισμού για άλλες υπηρεσίες.
Μόλις εισέλθετε στον κατάλογο όπου έχετε εξαγάγει τα περιεχόμενα του αρχείου, εκτελέστε την ακόλουθη εντολή:
Για χρήστες Linux:
bin/zookeeper-server-start.sh config/zookeeper.properties
Για χρήστες Windows:
bin/windows/zookeeper-server-start.bat config/zookeeper.properties
Το αρχείο zookeeper.properties παρέχει τις διαμορφώσεις για την εκτέλεση του διακομιστή Apache Zookeeper. Μπορείτε να διαμορφώσετε ιδιότητες όπως τον τοπικό κατάλογο όπου θα αποθηκεύονται τα δεδομένα και τη θύρα στην οποία θα εκτελείται ο διακομιστής.
#2. Εκκινήστε τον διακομιστή Apache Kafka
Τώρα που ξεκίνησε ο διακομιστής Apache Zookeeper, ήρθε η ώρα να ξεκινήσετε τον διακομιστή Apache Kafka.
Ανοίξτε ένα νέο τερματικό ή παράθυρο γραμμής εντολών και μεταβείτε στον κατάλογο όπου υπάρχουν τα εξαγόμενα αρχεία. Στη συνέχεια, μπορείτε να ξεκινήσετε τον διακομιστή Apache Kafka χρησιμοποιώντας την παρακάτω εντολή:
Για χρήστες Linux:
bin/kafka-server-start.sh config/server.properties
Για χρήστες Windows:
bin/windows/kafka-server-start.bat config/server.properties
Έχετε τον διακομιστή σας Apache Kafka σε λειτουργία. Σε περίπτωση που θέλετε να αλλάξετε την προεπιλεγμένη διαμόρφωση, μπορείτε να το κάνετε τροποποιώντας το αρχείο server.properties. Οι διαφορετικές τιμές υπάρχουν στο επίσημη τεκμηρίωση.
Πώς να χρησιμοποιήσετε το Apache Kafka στον τοπικό σας υπολογιστή
Τώρα είστε έτοιμοι να αρχίσετε να χρησιμοποιείτε το Apache Kafka στον τοπικό σας υπολογιστή για την παραγωγή και την κατανάλωση μηνυμάτων. Εφόσον οι διακομιστές Apache Zookeeper και Apache Kafka είναι έτοιμοι και λειτουργούν, ας δούμε πώς μπορείτε να δημιουργήσετε το πρώτο σας θέμα, να δημιουργήσετε το πρώτο σας μήνυμα και να καταναλώσετε το ίδιο.
Ποια είναι τα βήματα για να δημιουργήσετε ένα θέμα στον Apache Kafka;
Πριν δημιουργήσετε το πρώτο σας θέμα, ας καταλάβουμε τι είναι στην πραγματικότητα ένα θέμα. Στο Apache Kafka, ένα θέμα είναι μια λογική αποθήκευση δεδομένων που βοηθά στη ροή δεδομένων. Σκεφτείτε το ως το κανάλι μέσω του οποίου τα δεδομένα μεταφέρονται από το ένα στοιχείο στο άλλο.
Ένα θέμα υποστηρίζει πολλούς παραγωγούς και πολλούς καταναλωτές – περισσότερα από ένα συστήματα μπορούν να γράψουν και να διαβάσουν από ένα θέμα. Σε αντίθεση με άλλα συστήματα ανταλλαγής μηνυμάτων, οποιοδήποτε μήνυμα από ένα θέμα μπορεί να καταναλωθεί περισσότερες από μία φορές. Επιπλέον, μπορείτε επίσης να αναφέρετε την περίοδο διατήρησης των μηνυμάτων σας.
Ας πάρουμε το παράδειγμα ενός συστήματος (παραγωγού) που παράγει δεδομένα για τραπεζικές συναλλαγές. Και ένα άλλο σύστημα (καταναλωτής) καταναλώνει αυτά τα δεδομένα και στέλνει μια ειδοποίηση εφαρμογής στον χρήστη. Για να διευκολυνθεί αυτό, απαιτείται ένα θέμα.
Ανοίξτε ένα νέο τερματικό ή παράθυρο γραμμής εντολών και μεταβείτε στον κατάλογο από τον οποίο έχετε εξαγάγει το αρχείο. Η ακόλουθη εντολή θα δημιουργήσει ένα θέμα που ονομάζεται συναλλαγές:
Για χρήστες Linux:
bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092
Για χρήστες Windows:
bin/windows/kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092
Τώρα δημιουργήσατε το πρώτο σας θέμα και είστε έτοιμοι να ξεκινήσετε την παραγωγή και την κατανάλωση μηνυμάτων.
Πώς να δημιουργήσετε ένα μήνυμα στον Απάτσι Κάφκα;
Έχοντας έτοιμο το θέμα του Apache Kafka, μπορείτε τώρα να δημιουργήσετε το πρώτο σας μήνυμα. Ανοίξτε ένα νέο τερματικό ή παράθυρο γραμμής εντολών ή χρησιμοποιήστε το ίδιο που χρησιμοποιήσατε για να δημιουργήσετε το θέμα. Στη συνέχεια, βεβαιωθείτε ότι βρίσκεστε στον κατάλληλο κατάλογο όπου έχετε εξαγάγει τα περιεχόμενα του αρχείου. Μπορείτε να χρησιμοποιήσετε τη γραμμή εντολών για να δημιουργήσετε το μήνυμά σας σχετικά με το θέμα χρησιμοποιώντας την ακόλουθη εντολή:
Για χρήστες Linux:
bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092
Για χρήστες Windows:
bin/windows/kafka-console-producer.bat --topic transactions --bootstrap-server localhost:9092
Μόλις εκτελέσετε την εντολή, θα δείτε ότι το τερματικό ή το παράθυρο της γραμμής εντολών σας περιμένει για εισαγωγή. Γράψτε το πρώτο σας μήνυμα και πατήστε Enter.
> This is a transactional record for $100
Δημιουργήσατε το πρώτο σας μήνυμα στον Apache Kafka στον τοπικό σας υπολογιστή. Στη συνέχεια, είστε πλέον έτοιμοι να καταναλώσετε αυτό το μήνυμα.
Πώς να καταναλώσετε ένα μήνυμα από τον Απάτσι Κάφκα;
Με την προϋπόθεση ότι το θέμα σας έχει δημιουργηθεί και έχετε δημιουργήσει ένα μήνυμα στο θέμα Κάφκα, μπορείτε τώρα να καταναλώσετε αυτό το μήνυμα.
Το Apache Kafka σάς επιτρέπει να συνδέσετε πολλούς καταναλωτές στο ίδιο θέμα. Κάθε καταναλωτής μπορεί να είναι μέρος μιας ομάδας καταναλωτών – ένα λογικό αναγνωριστικό. Για παράδειγμα, εάν έχετε δύο υπηρεσίες που πρέπει να καταναλώνουν τα ίδια δεδομένα, τότε μπορούν να έχουν διαφορετικές ομάδες καταναλωτών.
Ωστόσο, εάν έχετε δύο παρουσίες της ίδιας υπηρεσίας, τότε θα θέλατε να αποφύγετε την κατανάλωση και την επεξεργασία του ίδιου μηνύματος δύο φορές. Σε αυτήν την περίπτωση, και οι δύο θα έχουν την ίδια ομάδα καταναλωτών.
Στο παράθυρο του τερματικού ή της γραμμής εντολών, βεβαιωθείτε ότι βρίσκεστε στον κατάλληλο κατάλογο. Χρησιμοποιήστε την ακόλουθη εντολή για να ξεκινήσετε τον καταναλωτή:
Για χρήστες Linux:
bin/kafka-console-consumer.sh --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer
Για χρήστες Windows:
bin/windows/kafka-console-consumer.bat --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer
Θα δείτε το μήνυμα που έχετε δημιουργήσει προηγουμένως να εμφανίζεται στο τερματικό σας. Τώρα έχετε χρησιμοποιήσει το Apache Kafka για να καταναλώσετε το πρώτο σας μήνυμα.
Η εντολή kafka-console-consumer περιλαμβάνει πολλά επιχειρήματα. Ας δούμε τι σημαίνει καθένα από αυτά:
- Το –topic αναφέρει το θέμα από όπου θα καταναλώνετε
- –από την αρχή λέει στον καταναλωτή της κονσόλας να αρχίσει να διαβάζει μηνύματα από το πρώτο μήνυμα που θα παρουσιαστεί
- Ο διακομιστής σας Apache Kafka αναφέρεται μέσω της επιλογής –bootstrap-server
- Επιπλέον, μπορείτε να αναφέρετε την ομάδα καταναλωτών περνώντας την παράμετρο –group
- Ελλείψει παραμέτρου ομάδας καταναλωτών, δημιουργείται αυτόματα
Με τον καταναλωτή της κονσόλας σε λειτουργία, μπορείτε να δοκιμάσετε να δημιουργήσετε νέα μηνύματα. Θα δείτε ότι όλα καταναλώνονται και εμφανίζονται στο τερματικό σας.
Τώρα που δημιουργήσατε το θέμα σας και δημιουργήσατε και καταναλώσατε με επιτυχία μηνύματα, ας το ενσωματώσουμε σε μια εφαρμογή Java.
Πώς να δημιουργήσετε παραγωγό και καταναλωτή Apache Kafka χρησιμοποιώντας Java
Πριν ξεκινήσετε, βεβαιωθείτε ότι έχετε εγκαταστήσει την Java 8+ στον τοπικό σας υπολογιστή. Το Apache Kafka παρέχει τη δική του βιβλιοθήκη πελατών που σας επιτρέπει να συνδέεστε απρόσκοπτα. Εάν χρησιμοποιείτε το Maven για να διαχειριστείτε τις εξαρτήσεις σας, προσθέστε την παρακάτω εξάρτηση στο pom.xml
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.5.0</version> </dependency>
Μπορείτε επίσης να κατεβάσετε τη βιβλιοθήκη από το Αποθετήριο Maven και προσθέστε το στο Java classpath σας.
Μόλις εγκατασταθεί η βιβλιοθήκη σας, ανοίξτε έναν επεξεργαστή κώδικα της επιλογής σας. Ας δούμε πώς μπορείτε να ξεκινήσετε τον παραγωγό και τον καταναλωτή σας χρησιμοποιώντας Java.
Δημιουργήστε παραγωγό Java Apache Kafka
Με τη βιβλιοθήκη kafka-clients στη θέση του, είστε πλέον έτοιμοι να ξεκινήσετε τη δημιουργία του παραγωγού Kafka σας.
Ας δημιουργήσουμε μια κλάση που ονομάζεται SimpleProducer.java. Αυτό θα είναι υπεύθυνο για την παραγωγή μηνυμάτων σχετικά με το θέμα που δημιουργήσατε νωρίτερα. Μέσα σε αυτήν την κλάση, θα δημιουργήσετε μια παρουσία του org.apache.kafka.clients.producer.KafkaProducer. Στη συνέχεια, θα χρησιμοποιήσετε αυτόν τον παραγωγό για να στείλετε τα μηνύματά σας.
Για τη δημιουργία του παραγωγού Kafka, χρειάζεστε τον κεντρικό υπολογιστή και τη θύρα του διακομιστή σας Apache Kafka. Εφόσον το εκτελείτε στον τοπικό σας υπολογιστή, ο κεντρικός υπολογιστής θα είναι localhost. Δεδομένου ότι δεν έχετε αλλάξει τις προεπιλεγμένες ιδιότητες κατά την εκκίνηση του διακομιστή, η θύρα θα είναι 9092. Σκεφτείτε τον παρακάτω κώδικα που θα σας βοηθήσει να δημιουργήσετε τον παραγωγό σας:
package org.example.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class SimpleProducer { private final KafkaProducer<String, String> producer; public SimpleProducer(String host, String port) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); this.producer = new KafkaProducer<>(properties); } }
Θα παρατηρήσετε ότι υπάρχουν τρεις ιδιότητες που ορίζονται. Ας δούμε γρήγορα το καθένα από αυτά:
- Το BOOTSTRAP_SERVERS_CONFIG σάς επιτρέπει να ορίσετε πού εκτελείται ο διακομιστής Apache Kafka
- Το KEY_SERIALIZER_CLASS_CONFIG λέει στον παραγωγό ποια μορφή να χρησιμοποιήσει για την αποστολή των κλειδιών μηνυμάτων.
- Η μορφή για την αποστολή του πραγματικού μηνύματος ορίζεται χρησιμοποιώντας την ιδιότητα VALUE_SERIALIZER_CLASS_CONFIG.
Εφόσον θα στέλνετε μηνύματα κειμένου, και οι δύο ιδιότητες έχουν ρυθμιστεί να χρησιμοποιούν το StringSerializer.class.
Για να στείλετε πραγματικά ένα μήνυμα στο θέμα σας, πρέπει να χρησιμοποιήσετε τη μέθοδο producer.send() που λαμβάνει ένα ProducerRecord. Ο παρακάτω κώδικας σάς δίνει μια μέθοδο που θα στείλει ένα μήνυμα στο θέμα και θα εκτυπώσει την απάντηση μαζί με τη μετατόπιση του μηνύματος.
public void produce(String topic, String message) throws ExecutionException, InterruptedException { ProducerRecord<String, String> record = new ProducerRecord<>(topic, message); final Future<RecordMetadata> send = this.producer.send(record); final RecordMetadata recordMetadata = send.get(); System.out.println(recordMetadata); }
Με ολόκληρο τον κώδικα στη θέση του, μπορείτε πλέον να στέλνετε μηνύματα στο θέμα σας. Μπορείτε να χρησιμοποιήσετε μια κύρια μέθοδο για να το δοκιμάσετε, όπως παρουσιάζεται στον παρακάτω κώδικα:
package org.example.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class SimpleProducer { private final KafkaProducer<String, String> producer; public SimpleProducer(String host, String port) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); this.producer = new KafkaProducer<>(properties); } public void produce(String topic, String message) throws ExecutionException, InterruptedException { ProducerRecord<String, String> record = new ProducerRecord<>(topic, message); final Future<RecordMetadata> send = this.producer.send(record); final RecordMetadata recordMetadata = send.get(); System.out.println(recordMetadata); } public static void main(String[] args) throws Exception{ SimpleProducer producer = new SimpleProducer("localhost", "9092"); producer.produce("transactions", "This is a transactional record of $200"); } }
Σε αυτόν τον κώδικα, δημιουργείτε έναν SimpleProducer που συνδέεται με τον διακομιστή Apache Kafka στον τοπικό σας υπολογιστή. Χρησιμοποιεί εσωτερικά τον KafkaProducer για την παραγωγή μηνυμάτων κειμένου για το θέμα σας.
Δημιουργήστε καταναλωτή Java Apache Kafka
Ήρθε η ώρα να δημιουργήσετε έναν καταναλωτή Apache Kafka χρησιμοποιώντας τον πελάτη Java. Δημιουργήστε μια κλάση που ονομάζεται SimpleConsumer.java. Στη συνέχεια, θα δημιουργήσετε έναν κατασκευαστή για αυτήν την κλάση, ο οποίος αρχικοποιεί το org.apache.kafka.clients.consumer.KafkaConsumer. Για τη δημιουργία του καταναλωτή, χρειάζεστε τον κεντρικό υπολογιστή και τη θύρα όπου εκτελείται ο διακομιστής Apache Kafka. Επιπλέον, χρειάζεστε την Ομάδα Καταναλωτών καθώς και το θέμα από το οποίο θέλετε να καταναλώσετε. Χρησιμοποιήστε το απόσπασμα κώδικα που δίνεται παρακάτω:
package org.example.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; public class SimpleConsumer { private static final String OFFSET_RESET = "earliest"; private final KafkaConsumer<String, String> consumer; private boolean keepConsuming = true; public SimpleConsumer(String host, String port, String consumerGroupId, String topic) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<>(properties); this.consumer.subscribe(List.of(topic)); } }
Παρόμοια με τον Παραγωγό Κάφκα, ο Καταναλωτής Κάφκα δέχεται επίσης ένα αντικείμενο Ιδιοτήτων. Ας δούμε όλα τα διαφορετικά σετ ιδιοτήτων:
- Το BOOTSTRAP_SERVERS_CONFIG ενημερώνει τον καταναλωτή πού εκτελείται ο διακομιστής Apache Kafka
- Η ομάδα καταναλωτών αναφέρεται χρησιμοποιώντας το GROUP_ID_CONFIG
- Όταν ο καταναλωτής αρχίσει να καταναλώνει, το AUTO_OFFSET_RESET_CONFIG σάς επιτρέπει να αναφέρετε πόσο πίσω θέλετε να αρχίσετε να καταναλώνετε μηνύματα από
- Το KEY_DESERIALIZER_CLASS_CONFIG λέει στον καταναλωτή τον τύπο του κλειδιού μηνύματος
- Το VALUE_DESERIALIZER_CLASS_CONFIG λέει στον τύπο του καταναλωτή το πραγματικό μήνυμα
Εφόσον, στην περίπτωσή σας, θα καταναλώνετε μηνύματα κειμένου, οι ιδιότητες του deserializer έχουν οριστεί σε StringDeserializer.class.
Τώρα θα καταναλώνετε τα μηνύματα από το θέμα σας. Για να παραμείνουν απλά τα πράγματα, μόλις καταναλωθεί το μήνυμα, θα εκτυπώσετε το μήνυμα στην κονσόλα. Ας δούμε πώς μπορείτε να το πετύχετε χρησιμοποιώντας τον παρακάτω κώδικα:
private boolean keepConsuming = true; public void consume() { while (keepConsuming) { final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L)); if (consumerRecords != null && !consumerRecords.isEmpty()) { consumerRecords.iterator().forEachRemaining(consumerRecord -> { System.out.println(consumerRecord.value()); }); } } }
Αυτός ο κώδικας θα συνεχίσει να ψηφίζει το θέμα. Όταν λάβετε οποιοδήποτε αρχείο καταναλωτή, το μήνυμα θα εκτυπωθεί. Δοκιμάστε τον καταναλωτή σας στη δράση χρησιμοποιώντας μια κύρια μέθοδο. Θα ξεκινήσετε μια εφαρμογή Java που θα συνεχίσει να καταναλώνει το θέμα και να εκτυπώνει τα μηνύματα. Διακόψτε την εφαρμογή Java για να τερματίσετε τον καταναλωτή.
package org.example.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; public class SimpleConsumer { private static final String OFFSET_RESET = "earliest"; private final KafkaConsumer<String, String> consumer; private boolean keepConsuming = true; public SimpleConsumer(String host, String port, String consumerGroupId, String topic) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<>(properties); this.consumer.subscribe(List.of(topic)); } public void consume() { while (keepConsuming) { final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L)); if (consumerRecords != null && !consumerRecords.isEmpty()) { consumerRecords.iterator().forEachRemaining(consumerRecord -> { System.out.println(consumerRecord.value()); }); } } } public static void main(String[] args) { SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", "9092", "transactions-consumer", "transactions"); simpleConsumer.consume(); } }
Όταν εκτελείτε τον κώδικα, θα παρατηρήσετε ότι όχι μόνο καταναλώνει το μήνυμα που παράγεται από τον παραγωγό Java, αλλά και αυτά που έχετε δημιουργήσει μέσω του Παραγωγού Κονσόλας. Αυτό οφείλεται στο γεγονός ότι η ιδιότητα AUTO_OFFSET_RESET_CONFIG έχει οριστεί ως η παλαιότερη.
Με την εκτέλεση του SimpleConsumer, μπορείτε να χρησιμοποιήσετε τον κατασκευαστή κονσόλας ή την εφαρμογή Java SimpleProducer για να δημιουργήσετε περαιτέρω μηνύματα στο θέμα. Θα δείτε να καταναλώνονται και να εκτυπώνονται στην κονσόλα.
Καλύψτε όλες τις ανάγκες σας για τη διοχέτευση δεδομένων με το Apache Kafka
Το Apache Kafka σάς επιτρέπει να χειρίζεστε με ευκολία όλες τις απαιτήσεις σας για τη διοχέτευση δεδομένων. Με τη ρύθμιση του Apache Kafka στον τοπικό σας υπολογιστή, μπορείτε να εξερευνήσετε όλες τις διαφορετικές δυνατότητες που παρέχει ο Kafka. Επιπλέον, ο επίσημος πελάτης Java σάς επιτρέπει να γράφετε, να συνδέεστε και να επικοινωνείτε αποτελεσματικά με τον διακομιστή σας Apache Kafka.
Όντας ένα ευέλικτο, επεκτάσιμο και υψηλής απόδοσης σύστημα ροής δεδομένων, το Apache Kafka μπορεί πραγματικά να αλλάξει το παιχνίδι για εσάς. Μπορείτε να το χρησιμοποιήσετε για την τοπική σας ανάπτυξη ή ακόμα και να το ενσωματώσετε στα συστήματα παραγωγής σας. Ακριβώς όπως είναι εύκολο να ρυθμίσετε τοπικά, έτσι και η ρύθμιση του Apache Kafka για μεγαλύτερες εφαρμογές δεν είναι μεγάλη δουλειά.
Αν αναζητάτε πλατφόρμες ροής δεδομένων, μπορείτε να δείτε τις καλύτερες πλατφόρμες δεδομένων ροής για ανάλυση και επεξεργασία σε πραγματικό χρόνο.