Apache Kafka - Exemple de producteur simple

Créons une application pour publier et consommer des messages à l'aide d'un client Java. Le client producteur Kafka comprend les API suivantes.

API KafkaProducer

Laissez-nous comprendre l'ensemble le plus important d'API de producteur Kafka dans cette section. La partie centrale de l'API KafkaProducer est la classe KafkaProducer . La classe KafkaProducer fournit une option pour connecter un courtier Kafka dans son constructeur avec les méthodes suivantes.

  • La classe KafkaProducer fournit une méthode d'envoi pour envoyer des messages de manière asynchrone à une rubrique. La signature de send () est la suivante

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - Le producteur gère un buffer d'enregistrements en attente d'envoi.

  • Callback - Un rappel fourni par l'utilisateur à exécuter lorsque l'enregistrement a été acquitté par le serveur (null indique pas de rappel).

  • La classe KafkaProducer fournit une méthode de vidage pour garantir que tous les messages précédemment envoyés ont été effectivement terminés. La syntaxe de la méthode flush est la suivante -

public void flush()
  • La classe KafkaProducer fournit la méthode partitionFor, qui aide à obtenir les métadonnées de partition pour un sujet donné. Cela peut être utilisé pour le partitionnement personnalisé. La signature de cette méthode est la suivante -

public Map metrics()

Il renvoie la carte des métriques internes gérées par le producteur.

  • public void close () - La classe KafkaProducer fournit des blocs de méthode close jusqu'à ce que toutes les requêtes précédemment envoyées soient terminées.

API du producteur

La partie centrale de l'API Producer est la classe Producer . La classe Producer fournit une option pour connecter le courtier Kafka dans son constructeur par les méthodes suivantes.

La classe des producteurs

La classe de producteur fournit une méthode d'envoi à send messages à un ou plusieurs sujets en utilisant les signatures suivantes.

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

Il existe deux types de producteurs - Sync et Async.

La même configuration d'API s'applique également au producteur de synchronisation . La différence entre eux est qu'un producteur de synchronisation envoie des messages directement, mais envoie des messages en arrière-plan. Le producteur asynchrone est préférable lorsque vous souhaitez un débit plus élevé. Dans les versions précédentes comme 0.8, un producteur asynchrone n'a pas de rappel pour send () pour enregistrer les gestionnaires d'erreurs. Ceci n'est disponible que dans la version actuelle de 0.9.

public void close ()

La classe de producteur fournit close méthode pour fermer les connexions du pool de producteurs à tous les courtiers Kafka.

Paramètres de configuration

Les principaux paramètres de configuration de l'API Producer sont répertoriés dans le tableau suivant pour une meilleure compréhension -

S. Non Paramètres de configuration et description
1

client.id

identifie l'application du producteur

2

producer.type

soit sync ou async

3

acks

La configuration d'acks contrôle les critères sous les demandes des producteurs sont considérés comme complets.

4

retries

Si la demande du producteur échoue, réessayez automatiquement avec une valeur spécifique.

5

bootstrap.servers

liste de démarrage des courtiers.

6

linger.ms

si vous souhaitez réduire le nombre de requêtes, vous pouvez définir linger.ms sur une valeur supérieure à une certaine valeur.

sept

key.serializer

Clé de l'interface du sérialiseur.

8

value.serializer

valeur de l'interface du sérialiseur.

9

batch.size

Taille du tampon.

dix

buffer.memory

contrôle la quantité totale de mémoire disponible pour le producteur pour la mise en mémoire tampon.

API ProducerRecord

ProducerRecord est une paire clé / valeur envoyée au constructeur de classe cluster.ProducerRecord Kafka pour créer un enregistrement avec des paires partition, clé et valeur à l'aide de la signature suivante.

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic - nom de sujet défini par l'utilisateur qui sera ajouté à l'enregistrement.

  • Partition - nombre de partitions

  • Key - La clé qui sera incluse dans l'enregistrement.

  • Value - Enregistrer le contenu
public ProducerRecord (string topic, k key, v value)

Le constructeur de classe ProducerRecord est utilisé pour créer un enregistrement avec des paires clé, valeur et sans partition.

  • Topic - Créez un sujet pour attribuer un enregistrement.

  • Key - clé pour l'enregistrement.

  • Value - enregistrer le contenu.

public ProducerRecord (string topic, v value)

La classe ProducerRecord crée un enregistrement sans partition ni clé.

  • Topic - créer un sujet.

  • Value - enregistrer le contenu.

Les méthodes de la classe ProducerRecord sont répertoriées dans le tableau suivant -

S. Non Méthodes de classe et description
1

public string topic()

Le sujet s'ajoutera à l'enregistrement.

2

public K key()

Clé qui sera incluse dans le dossier. Si aucune clé de ce type, nul sera réactivé ici.

3

public V value()

Enregistrez le contenu.

4

partition()

Nombre de partitions pour l'enregistrement

Application SimpleProducer

Avant de créer l'application, démarrez d'abord ZooKeeper et le courtier Kafka, puis créez votre propre rubrique dans le courtier Kafka à l'aide de la commande create topic. Ensuite, créez une classe java nommée Sim-pleProducer.java et saisissez le codage suivant.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation - L'application peut être compilée à l'aide de la commande suivante.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution - L'application peut être exécutée à l'aide de la commande suivante.

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

Exemple de consommateur simple

À partir de maintenant, nous avons créé un producteur pour envoyer des messages au cluster Kafka. Créons maintenant un consommateur pour consommer les messages du cluster Kafka. L'API KafkaConsumer est utilisée pour consommer les messages du cluster Kafka. Le constructeur de classe KafkaConsumer est défini ci-dessous.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - Renvoie une carte des configurations des consommateurs.

La classe KafkaConsumer possède les méthodes significatives suivantes qui sont répertoriées dans le tableau ci-dessous.

S. Non Méthode et description
1

public java.util.Set<TopicPar-tition> assignment()

Récupère l'ensemble des partitions actuellement assignées par le consommateur.

2

public string subscription()

Abonnez-vous à la liste de sujets donnée pour obtenir dynamiquement des partitions as-signées.

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

Abonnez-vous à la liste de sujets donnée pour obtenir dynamiquement des partitions as-signées.

4

public void unsubscribe()

Désabonnez les rubriques de la liste de partitions donnée.

5

public void sub-scribe(java.util.List<java.lang.String> topics)

Abonnez-vous à la liste de sujets donnée pour obtenir dynamiquement des partitions as-signées. Si la liste de sujets donnée est vide, elle est traitée de la même manière que unsubscribe ().

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

Le modèle d'argument fait référence au modèle d'abonnement au format d'expression régulière et l'argument d'écoute reçoit des notifications du modèle d'abonnement.

sept

public void as-sign(java.util.List<TopicParti-tion> partitions)

Attribuez manuellement une liste de partitions au client.

8

poll()

Récupérez les données des rubriques ou des partitions spécifiées à l'aide de l'une des API d'abonnement / d'attribution. Cela renverra une erreur, si les sujets ne sont pas abonnés avant l'interrogation des données.

9

public void commitSync()

Les décalages de validation renvoyés lors du dernier sondage () pour toutes les listes de sujets et de partitions sous-marquées. La même opération est appliquée à commitAsyn ().

dix

public void seek(TopicPartition partition, long offset)

Récupère la valeur de décalage actuelle que le consommateur utilisera lors de la prochaine méthode poll ().

11

public void resume()

Reprenez les partitions en pause.

12

public void wakeup()

Réveillez le consommateur.

API ConsumerRecord

L'API ConsumerRecord est utilisée pour recevoir des enregistrements du cluster Kafka. Cette API comprend un nom de rubrique, un numéro de partition, à partir de laquelle l'enregistrement est reçu et un décalage qui pointe vers l'enregistrement dans une partition Kafka. La classe ConsumerRecord est utilisée pour créer un enregistrement consommateur avec un nom de rubrique spécifique, un nombre de partitions et des paires <clé, valeur>. Il a la signature suivante.

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic - Le nom de rubrique pour l'enregistrement consommateur reçu du cluster Kafka.

  • Partition - Partition pour le sujet.

  • Key - La clé de l'enregistrement, si aucune clé n'existe, null sera renvoyée.

  • Value - Enregistrer le contenu.

API ConsumerRecords

L'API ConsumerRecords agit comme un conteneur pour ConsumerRecord. Cette API est utilisée pour conserver la liste des ConsumerRecord par partition pour un sujet particulier. Son constructeur est défini ci-dessous.

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - Retourne une carte de partition pour un sujet particulier.

  • Records - Retournez la liste de ConsumerRecord.

La classe ConsumerRecords a les méthodes suivantes définies.

S. Non Méthodes et description
1

public int count()

Le nombre d'enregistrements pour tous les sujets.

2

public Set partitions()

L'ensemble des partitions contenant des données dans cet ensemble d'enregistrements (si aucune donnée n'a été renvoyée, l'ensemble est vide).

3

public Iterator iterator()

Iterator vous permet de parcourir une collection, d'obtenir ou de déplacer des éléments.

4

public List records()

Obtenez la liste des enregistrements pour la partition donnée.

Paramètres de configuration

Les paramètres de configuration des principaux paramètres de configuration de l'API client client sont répertoriés ci-dessous:

S. Non Paramètres et description
1

bootstrap.servers

Liste de démarrage des courtiers.

2

group.id

Affecte un consommateur individuel à un groupe.

3

enable.auto.commit

Activez la validation automatique pour les décalages si la valeur est true, sinon non validée.

4

auto.commit.interval.ms

Renvoie la fréquence à laquelle les décalages consommés mis à jour sont écrits dans ZooKeeper.

5

session.timeout.ms

Indique combien de millisecondes Kafka attendra que le ZooKeeper réponde à une requête (lecture ou écriture) avant d'abandonner et de continuer à consommer des messages.

Application SimpleConsumer

Les étapes de candidature du producteur restent ici les mêmes. Tout d'abord, démarrez votre courtier ZooKeeper et Kafka. Créez ensuite une application SimpleConsumer avec la classe java nommée SimpleCon-sumer.java et tapez le code suivant.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation - L'application peut être compilée à l'aide de la commande suivante.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution − L'application peut être exécutée à l'aide de la commande suivante

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input- Ouvrez la CLI du producteur et envoyez des messages à la rubrique. Vous pouvez mettre l'entrée smple comme «Bonjour consommateur».

Output - Voici la sortie.

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer