Apache Storm - Trident

Trident est une extension de Storm. Comme Storm, Trident a également été développé par Twitter. La principale raison derrière le développement de Trident est de fournir une abstraction de haut niveau au-dessus de Storm avec un traitement de flux avec état et des requêtes distribuées à faible latence.

Trident utilise le bec et le boulon, mais ces composants de bas niveau sont générés automatiquement par Trident avant l'exécution. Trident a des fonctions, des filtres, des jointures, des regroupements et des agrégations.

Trident traite les flux comme une série de lots appelés transactions. Généralement, la taille de ces petits lots sera de l'ordre de milliers ou millions de tuples, selon le flux d'entrée. De cette façon, Trident est différent de Storm, qui effectue un traitement tuple par tuple.

Le concept de traitement par lots est très similaire aux transactions de base de données. Chaque transaction se voit attribuer un identifiant de transaction. La transaction est considérée comme réussie, une fois que tout son traitement est terminé. Cependant, un échec dans le traitement de l'un des tuples de la transaction entraînera la retransmission de l'ensemble de la transaction. Pour chaque lot, Trident appellera beginCommit au début de la transaction et validera à la fin de celle-ci.

Topologie Trident

L'API Trident présente une option simple pour créer une topologie Trident à l'aide de la classe «TridentTopology». Fondamentalement, la topologie Trident reçoit le flux d'entrée du bec verseur et effectue une séquence d'opérations ordonnée (filtre, agrégation, regroupement, etc.) sur le flux. Storm Tuple est remplacé par Trident Tuple et Bolts sont remplacés par des opérations. Une topologie Trident simple peut être créée comme suit -

TridentTopology topology = new TridentTopology();

Tuples Trident

Le tuple trident est une liste nommée de valeurs. L'interface TridentTuple est le modèle de données d'une topologie Trident. L'interface TridentTuple est l'unité de base de données qui peut être traitée par une topologie Trident.

Bec Trident

Le bec Trident est similaire au bec Storm, avec des options supplémentaires pour utiliser les fonctionnalités de Trident. En fait, nous pouvons toujours utiliser IRichSpout, que nous avons utilisé dans la topologie Storm, mais ce sera de nature non transactionnelle et nous ne pourrons pas utiliser les avantages fournis par Trident.

Le bec de base ayant toutes les fonctionnalités pour utiliser les fonctionnalités de Trident est "ITridentSpout". Il prend en charge la sémantique transactionnelle transactionnelle et opaque. Les autres spouts sont IBatchSpout, IPartitionedTridentSpout et IOpaquePartitionedTridentSpout.

En plus de ces becs génériques, Trident a de nombreux exemples d'implémentation de bec trident. L'un d'eux est le bec FeederBatchSpout, que nous pouvons utiliser pour envoyer facilement une liste nommée de tuples trident sans se soucier du traitement par lots, du parallélisme, etc.

La création et l'alimentation des données de FeederBatchSpout peuvent être effectuées comme indiqué ci-dessous -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Opérations Trident

Trident s'appuie sur «l'opération Trident» pour traiter le flux d'entrée des tuples trident. L'API Trident dispose d'un certain nombre d'opérations intégrées pour gérer le traitement de flux simple à complexe. Ces opérations vont de la simple validation au regroupement et à l'agrégation complexes de tuples de trident. Passons en revue les opérations les plus importantes et les plus fréquemment utilisées.

Filtre

Le filtre est un objet utilisé pour effectuer la tâche de validation d'entrée. Un filtre Trident obtient un sous-ensemble de champs de tuple trident en entrée et renvoie true ou false selon que certaines conditions sont satisfaites ou non. Si true est renvoyé, le tuple est conservé dans le flux de sortie; sinon, le tuple est supprimé du flux. Le filtre héritera essentiellement de laBaseFilter classe et implémentez le isKeepméthode. Voici un exemple d'implémentation de l'opération de filtrage -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

La fonction de filtrage peut être appelée dans la topologie en utilisant la méthode «each». La classe «Fields» peut être utilisée pour spécifier l'entrée (sous-ensemble du tuple trident). L'exemple de code est le suivant -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Fonction

Functionest un objet utilisé pour effectuer une opération simple sur un seul tuple trident. Il prend un sous-ensemble de champs de tuple trident et émet zéro ou plusieurs nouveaux champs de tuple trident.

Function hérite essentiellement de la BaseFunction classe et implémente le executeméthode. Un exemple d'implémentation est donné ci-dessous -

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Tout comme l'opération de filtrage, l'opération de fonction peut être appelée dans une topologie à l'aide du eachméthode. L'exemple de code est le suivant -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Agrégation

L'agrégation est un objet utilisé pour effectuer des opérations d'agrégation sur un lot d'entrée, une partition ou un flux. Trident a trois types d'agrégation. Ils sont les suivants -

  • aggregate- Agrège chaque lot de tuple trident de manière isolée. Pendant le processus d'agrégation, les tuples sont initialement repartitionnés à l'aide du regroupement global pour combiner toutes les partitions du même lot en une seule partition.

  • partitionAggregate- Agrège chaque partition au lieu du lot entier de tuple trident. La sortie de l'agrégat de partition remplace complètement le tuple d'entrée. La sortie de l'agrégat de partition contient un seul tuple de champ.

  • persistentaggregate - Agrège sur tous les tuple trident dans tous les lots et stocke le résultat dans la mémoire ou la base de données.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

L'opération d'agrégation peut être créée à l'aide de CombinerAggregator, ReducerAggregator ou de l'interface d'agrégation générique. L'agrégateur "count" utilisé dans l'exemple ci-dessus est l'un des agrégateurs intégrés. Il est implémenté à l'aide de "CombinerAggregator". La mise en œuvre est la suivante:

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Regroupement

L'opération de regroupement est une opération intégrée et peut être appelée par le groupByméthode. La méthode groupBy repartitionne le flux en effectuant un partitionBy sur les champs spécifiés, puis dans chaque partition, elle regroupe les tuples dont les champs de groupe sont égaux. Normalement, nous utilisons «groupBy» avec «persistentAggregate» pour obtenir l'agrégation groupée. L'exemple de code est le suivant -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Fusionner et rejoindre

La fusion et la jointure peuvent être effectuées en utilisant respectivement les méthodes «merge» et «join». La fusion combine un ou plusieurs flux. La jointure est similaire à la fusion, sauf le fait que la jointure utilise un champ de tuple trident des deux côtés pour vérifier et joindre deux flux. De plus, la jonction fonctionnera uniquement au niveau du lot. L'exemple de code est le suivant -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Maintenance de l'état

Trident fournit un mécanisme de maintenance de l'état. Les informations d'état peuvent être stockées dans la topologie elle-même, sinon vous pouvez également les stocker dans une base de données distincte. La raison est de maintenir un état selon lequel si un tuple échoue pendant le traitement, le tuple échoué est retenté. Cela crée un problème lors de la mise à jour de l'état car vous ne savez pas si l'état de ce tuple a été mis à jour précédemment ou non. Si le tuple a échoué avant la mise à jour de l'état, réessayer le tuple rendra l'état stable. Cependant, si le tuple a échoué après la mise à jour de l'état, réessayer le même tuple augmentera à nouveau le nombre dans la base de données et rendra l'état instable. Il faut effectuer les étapes suivantes pour s'assurer qu'un message n'est traité qu'une seule fois -

  • Traitez les tuples par petits lots.

  • Attribuez un ID unique à chaque lot. Si le lot est réessayé, il reçoit le même ID unique.

  • Les mises à jour d'état sont classées parmi les lots. Par exemple, la mise à jour de l'état du deuxième lot ne sera pas possible tant que la mise à jour de l'état du premier lot ne sera pas terminée.

RPC distribué

Distributed RPC est utilisé pour interroger et récupérer le résultat de la topologie Trident. Storm a un serveur RPC distribué intégré. Le serveur RPC distribué reçoit la demande RPC du client et la transmet à la topologie. La topologie traite la demande et envoie le résultat au serveur RPC distribué, qui est redirigé par le serveur RPC distribué vers le client. La requête RPC distribuée de Trident s'exécute comme une requête RPC normale, à l'exception du fait que ces requêtes sont exécutées en parallèle.

Quand utiliser Trident?

Comme dans de nombreux cas d'utilisation, si l'exigence est de traiter une requête une seule fois, nous pouvons y parvenir en écrivant une topologie dans Trident. En revanche, il sera difficile de réaliser exactement une fois le traitement dans le cas de Storm. Par conséquent, Trident sera utile pour les cas d'utilisation où vous avez besoin d'un seul traitement. Trident n'est pas pour tous les cas d'utilisation, en particulier les cas d'utilisation haute performance, car il ajoute de la complexité à Storm et gère l'état.

Exemple de travail de Trident

Nous allons convertir notre application d'analyse du journal des appels élaborée dans la section précédente en framework Trident. L'application Trident sera relativement facile par rapport à plain storm, grâce à son API de haut niveau. Storm devra essentiellement effectuer l'une des opérations Function, Filter, Aggregate, GroupBy, Join et Merge dans Trident. Enfin, nous allons démarrer le serveur DRPC en utilisant leLocalDRPC class et recherchez un mot-clé en utilisant le execute méthode de la classe LocalDRPC.

Formatage des informations d'appel

Le but de la classe FormatCall est de formater les informations d'appel comprenant le «numéro de l'appelant» et le «numéro du récepteur». Le code de programme complet est le suivant -

Codage: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

Le but de la classe CSVSplit est de diviser la chaîne d'entrée en fonction de «virgule (,)» et d'émettre chaque mot de la chaîne. Cette fonction est utilisée pour analyser l'argument d'entrée de l'interrogation distribuée. Le code complet est le suivant -

Codage: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Analyseur de journaux

Ceci est l'application principale. Au départ, l'application initialisera TridentTopology et alimentera les informations de l'appelant en utilisantFeederBatchSpout. Le flux de topologie Trident peut être créé à l'aide dunewStreamméthode de la classe TridentTopology. De même, le flux DRPC de topologie Trident peut être créé à l'aide dunewDRCPStreamméthode de la classe TridentTopology. Un simple serveur DRCP peut être créé à l'aide de la classe LocalDRPC.LocalDRPCa une méthode d'exécution pour rechercher un mot-clé. Le code complet est donné ci-dessous.

Codage: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Création et exécution de l'application

L'application complète a trois codes Java. Ils sont les suivants -

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

L'application peut être créée à l'aide de la commande suivante -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

L'application peut être exécutée à l'aide de la commande suivante -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Production

Une fois l'application démarrée, l'application affichera les détails complets sur le processus de démarrage du cluster, le traitement des opérations, les informations sur le serveur DRPC et le client, et enfin, le processus d'arrêt du cluster. Cette sortie sera affichée sur la console comme indiqué ci-dessous.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends