Apache Kafka - Exemple de groupe de consommateurs

Le groupe de consommateurs est une consommation multi-thread ou multi-machine à partir de sujets Kafka.

Groupe de consommateurs

  • Les consommateurs peuvent rejoindre un groupe en utilisant le même group.id.

  • Le parallélisme maximum d'un groupe est que le nombre de consommateurs dans le groupe ← no de partitions.

  • Kafka affecte les partitions d'une rubrique au consommateur d'un groupe, de sorte que chaque partition soit consommée par exactement un consommateur du groupe.

  • Kafka garantit qu'un message n'est lu que par un seul consommateur du groupe.

  • Les consommateurs peuvent voir le message dans l'ordre dans lequel ils ont été stockés dans le journal.

Rééquilibrage d'un consommateur

L'ajout de plus de processus / threads entraînera le rééquilibrage de Kafka. Si un consommateur ou un courtier ne parvient pas à envoyer la pulsation à ZooKeeper, il peut être reconfiguré via le cluster Kafka. Pendant ce rééquilibrage, Kafka attribuera des partitions disponibles aux threads disponibles, déplaçant éventuellement une partition vers un autre processus.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

Compilation

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Exécution

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

Ici, nous avons créé un exemple de nom de groupe en tant que mon-groupe avec deux consommateurs. De même, vous pouvez créer votre groupe et le nombre de consommateurs dans le groupe.

Contribution

Ouvrez la CLI du producteur et envoyez des messages comme -

Test consumer group 01
Test consumer group 02

Résultat du premier processus

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

Résultat du deuxième processus

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

J'espère que vous auriez compris SimpleConsumer et ConsumeGroup en utilisant la démo du client Java. Vous avez maintenant une idée de la façon d'envoyer et de recevoir des messages à l'aide d'un client Java. Continuons l'intégration de Kafka avec les technologies Big Data dans le chapitre suivant.