Apache Storm - Guide rapide

Qu'est-ce qu'Apache Storm?

Apache Storm est un système de traitement de Big Data distribué en temps réel. Storm est conçu pour traiter une grande quantité de données dans une méthode évolutive horizontale et tolérante aux pannes. Il s'agit d'un cadre de données en continu qui a la capacité des taux d'ingestion les plus élevés. Bien que Storm soit sans état, il gère l'environnement distribué et l'état du cluster via Apache ZooKeeper. C'est simple et vous pouvez exécuter toutes sortes de manipulations sur des données en temps réel en parallèle.

Apache Storm continue d'être un leader de l'analyse de données en temps réel. Storm est facile à configurer, à utiliser et garantit que chaque message sera traité à travers la topologie au moins une fois.

Apache Storm contre Hadoop

Fondamentalement, les frameworks Hadoop et Storm sont utilisés pour analyser le Big Data. Les deux se complètent et diffèrent à certains égards. Apache Storm effectue toutes les opérations à l'exception de la persistance, tandis que Hadoop est bon dans tout mais reste en retard dans le calcul en temps réel. Le tableau suivant compare les attributs de Storm et Hadoop.

Tempête Hadoop
Traitement de flux en temps réel Le traitement par lots
Apatride Avec état
Architecture maître / esclave avec coordination basée sur ZooKeeper. Le nœud maître est appelénimbus et les esclaves sont supervisors. Architecture maître-esclave avec / sans coordination basée sur ZooKeeper. Le nœud maître estjob tracker et le nœud esclave est task tracker.
Un processus de streaming Storm peut accéder à des dizaines de milliers de messages par seconde sur le cluster. Le système de fichiers distribués Hadoop (HDFS) utilise le framework MapReduce pour traiter une grande quantité de données qui prend des minutes ou des heures.
La topologie Storm s'exécute jusqu'à l'arrêt par l'utilisateur ou une défaillance irrécupérable inattendue. Les travaux MapReduce sont exécutés dans un ordre séquentiel et terminés par la suite.
Both are distributed and fault-tolerant
Si nimbus / superviseur meurt, le redémarrage le fait continuer là où il s'est arrêté, donc rien n'est affecté. Si le JobTracker meurt, tous les travaux en cours sont perdus.

Cas d'utilisation d'Apache Storm

Apache Storm est très célèbre pour le traitement de flux de données volumineuses en temps réel. Pour cette raison, la plupart des entreprises utilisent Storm comme partie intégrante de leur système. Quelques exemples notables sont les suivants -

Twitter- Twitter utilise Apache Storm pour sa gamme de «produits Publisher Analytics». Les «produits d'analyse des éditeurs» traitent chacun des tweets et clics sur la plateforme Twitter. Apache Storm est profondément intégré à l'infrastructure Twitter.

NaviSite- NaviSite utilise Storm pour le système de surveillance / d'audit du journal des événements. Tous les journaux générés dans le système passeront par la tempête. Storm vérifiera le message par rapport à l'ensemble configuré d'expressions régulières et s'il y a une correspondance, alors ce message particulier sera enregistré dans la base de données.

Wego- Wego est un méta-moteur de recherche de voyage situé à Singapour. Les données relatives aux voyages proviennent de nombreuses sources du monde entier avec des horaires différents. Storm aide Wego à rechercher des données en temps réel, à résoudre les problèmes de concurrence et à trouver la meilleure correspondance pour l'utilisateur final.

Avantages d'Apache Storm

Voici une liste des avantages qu'offre Apache Storm -

  • Storm est open source, robuste et convivial. Il pourrait être utilisé aussi bien dans les petites entreprises que dans les grandes entreprises.

  • Storm est tolérant aux pannes, flexible, fiable et prend en charge n'importe quel langage de programmation.

  • Permet le traitement des flux en temps réel.

  • Storm est incroyablement rapide car il a une énorme puissance de traitement des données.

  • Storm peut maintenir les performances même sous une charge croissante en ajoutant des ressources de manière linéaire. Il est hautement évolutif.

  • Storm effectue l'actualisation des données et la réponse de livraison de bout en bout en quelques secondes ou minutes dépend du problème. Il a une latence très faible.

  • Storm a une intelligence opérationnelle.

  • Storm assure un traitement des données garanti même si l'un des nœuds connectés du cluster meurt ou si des messages sont perdus.

Apache Storm lit le flux brut de données en temps réel à une extrémité et le transmet à une séquence de petites unités de traitement et génère les informations traitées / utiles à l'autre extrémité.

Le diagramme suivant illustre le concept de base d'Apache Storm.

Examinons maintenant de plus près les composants d'Apache Storm -

Composants La description
Tuple Tuple est la structure de données principale de Storm. C'est une liste d'éléments ordonnés. Par défaut, un Tuple prend en charge tous les types de données. En règle générale, il est modélisé comme un ensemble de valeurs séparées par des virgules et transmis à un cluster Storm.
Courant Stream est une séquence non ordonnée de tuples.
Becs Source de flux. Généralement, Storm accepte les données d'entrée de sources de données brutes telles que l'API Twitter Streaming, la file d'attente Apache Kafka, la file d'attente Kestrel, etc. Sinon, vous pouvez écrire des spouts pour lire les données des sources de données. «ISpout» est l'interface principale pour la mise en œuvre des spouts. Certaines des interfaces spécifiques sont IRichSpout, BaseRichSpout, KafkaSpout, etc.
Boulons Les boulons sont des unités de traitement logiques. Les becs transmettent les données au processus des boulons et boulons et produisent un nouveau flux de sortie. Bolts peut effectuer les opérations de filtrage, d'agrégation, de jonction, d'interaction avec les sources de données et les bases de données. Bolt reçoit des données et émet vers un ou plusieurs boulons. «IBolt» est l'interface principale pour la mise en œuvre des boulons. Certaines des interfaces courantes sont IRichBolt, IBasicBolt, etc.

Prenons un exemple en temps réel de «Twitter Analysis» et voyons comment il peut être modélisé dans Apache Storm. Le diagramme suivant illustre la structure.

L'entrée pour «l'analyse Twitter» provient de l'API Twitter Streaming. Spout lira les tweets des utilisateurs à l'aide de l'API Twitter Streaming et les affichera sous forme de flux de tuples. Un seul tuple du bec aura un nom d'utilisateur Twitter et un seul tweet en tant que valeurs séparées par des virgules. Ensuite, cette vapeur de tuples sera transmise au Bolt et le Bolt divisera le tweet en mot individuel, calculera le nombre de mots et conservera les informations dans une source de données configurée. Maintenant, nous pouvons facilement obtenir le résultat en interrogeant la source de données.

Topologie

Les becs et les boulons sont connectés ensemble et forment une topologie. La logique d'application en temps réel est spécifiée dans la topologie Storm. En termes simples, une topologie est un graphe orienté où les sommets sont le calcul et les arêtes sont un flux de données.

Une topologie simple commence par des becs. Spout envoie les données à un ou plusieurs boulons. Bolt représente un nœud dans la topologie ayant la plus petite logique de traitement et la sortie d'un boulon peut être émise dans un autre boulon en entrée.

Storm maintient la topologie toujours en cours d'exécution, jusqu'à ce que vous la supprimiez. La tâche principale d'Apache Storm est d'exécuter la topologie et d'exécuter n'importe quel nombre de topologie à un moment donné.

Tâches

Vous avez maintenant une idée de base sur les becs et les boulons. Il s'agit de la plus petite unité logique de la topologie et une topologie est construite à l'aide d'un seul bec et d'un ensemble de boulons. Ils doivent être exécutés correctement dans un ordre particulier pour que la topologie s'exécute correctement. L'exécution de chaque bec et boulon par Storm s'appelle «Tâches». En termes simples, une tâche est soit l'exécution d'un bec ou d'un boulon. À un moment donné, chaque bec et chaque boulon peuvent avoir plusieurs instances s'exécutant dans plusieurs threads séparés.

Ouvriers

Une topologie s'exécute de manière distribuée, sur plusieurs nœuds de calcul. Storm répartit les tâches uniformément sur tous les nœuds de travail. Le rôle du nœud worker est d'écouter les travaux et de démarrer ou d'arrêter les processus chaque fois qu'un nouveau travail arrive.

Regroupement de flux

Le flux de données circule des becs aux boulons ou d'un boulon à un autre boulon. Le regroupement de flux contrôle la façon dont les tuples sont routés dans la topologie et nous aide à comprendre le flux de tuples dans la topologie. Il existe quatre groupements intégrés, comme expliqué ci-dessous.

Regroupement aléatoire

Dans le groupement aléatoire, un nombre égal de tuples est distribué aléatoirement sur tous les ouvriers exécutant les boulons. Le diagramme suivant illustre la structure.

Regroupement de champs

Les champs avec les mêmes valeurs dans les tuples sont regroupés et les tuples restants conservés à l'extérieur. Ensuite, les tuples avec les mêmes valeurs de champ sont envoyés au même travailleur exécutant les boulons. Par exemple, si le flux est groupé par le champ «mot», alors les tuples avec la même chaîne, «Hello» seront déplacés vers le même worker. Le diagramme suivant montre le fonctionnement du regroupement de champs.

Regroupement global

Tous les flux peuvent être regroupés et transmis à un seul boulon. Ce regroupement envoie des tuples générés par toutes les instances de la source vers une seule instance cible (en particulier, choisissez le worker avec l'ID le plus bas).

Tous les regroupements

Tous les regroupements envoie une seule copie de chaque tuple à toutes les instances du boulon de réception. Ce type de regroupement est utilisé pour envoyer des signaux aux boulons. Tous les regroupements sont utiles pour les opérations de jointure.

L'un des principaux points forts d'Apache Storm est qu'il s'agit d'une application distribuée à tolérance de pannes, rapide et sans «point de défaillance unique» (SPOF). Nous pouvons installer Apache Storm dans autant de systèmes que nécessaire pour augmenter la capacité de l'application.

Voyons comment le cluster Apache Storm est conçu et son architecture interne. Le diagramme suivant illustre la conception du cluster.

Apache Storm a deux types de nœuds, Nimbus (nœud maître) et Supervisor(nœud de travail). Nimbus est le composant central d'Apache Storm. Le travail principal de Nimbus est d'exécuter la topologie Storm. Nimbus analyse la topologie et rassemble la tâche à exécuter. Ensuite, il distribuera la tâche à un superviseur disponible.

Un superviseur aura un ou plusieurs processus de travail. Le superviseur déléguera les tâches aux processus de travail. Le processus de travail génère autant d'exécuteurs que nécessaire et exécute la tâche. Apache Storm utilise un système de messagerie interne distribué pour la communication entre nimbus et les superviseurs.

Composants La description
Nimbus Nimbus est un nœud maître du cluster Storm. Tous les autres nœuds du cluster sont appelés commeworker nodes. Le nœud maître est chargé de distribuer les données entre tous les nœuds de travail, d'attribuer des tâches aux nœuds de travail et de surveiller les échecs.
Superviseur Les nœuds qui suivent les instructions données par le nimbus sont appelés superviseurs. UNEsupervisor a plusieurs processus de travail et il régit les processus de travail pour accomplir les tâches assignées par le nimbus.
Processus de travail Un processus de travail exécutera des tâches liées à une topologie spécifique. Un processus de travail n'exécute pas une tâche par lui-même, mais créeexecutorset leur demande d'effectuer une tâche particulière. Un processus de travail aura plusieurs exécuteurs.
Exécuteur Un exécuteur n'est rien d'autre qu'un seul thread engendré par un processus de travail. Un exécuteur exécute une ou plusieurs tâches, mais uniquement pour un bec ou un boulon spécifique.
Tâche Une tâche effectue un traitement de données réel. Donc, c'est soit un bec, soit un boulon.
Cadre ZooKeeper

Apache ZooKeeper est un service utilisé par un cluster (groupe de nœuds) pour se coordonner entre eux et maintenir les données partagées avec des techniques de synchronisation robustes. Nimbus est sans état, il dépend donc de ZooKeeper pour surveiller l'état du nœud de travail.

ZooKeeper aide le superviseur à interagir avec le nimbus. Il est responsable de maintenir l'état de nimbus et de superviseur.

La tempête est de nature apatride. Même si la nature sans état a ses propres inconvénients, elle aide en fait Storm à traiter les données en temps réel de la meilleure manière possible et la plus rapide.

Cependant, Storm n'est pas entièrement apatride. Il stocke son état dans Apache ZooKeeper. Étant donné que l'état est disponible dans Apache ZooKeeper, un nimbus défaillant peut être redémarré et mis en service là où il est parti. Habituellement, les outils de surveillance des services commemonit surveillera Nimbus et le redémarrera en cas de panne.

Apache Storm possède également une topologie avancée appelée Trident Topologyavec la maintenance de l'état et il fournit également une API de haut niveau comme Pig. Nous discuterons de toutes ces fonctionnalités dans les prochains chapitres.

Un cluster Storm opérationnel doit avoir un nimbus et un ou plusieurs superviseurs. Un autre nœud important est Apache ZooKeeper, qui sera utilisé pour la coordination entre le nimbus et les superviseurs.

Examinons maintenant de près le flux de travail d'Apache Storm -

  • Dans un premier temps, le nimbus attendra que la «topologie de tempête» lui soit soumise.

  • Une fois qu'une topologie est soumise, elle traitera la topologie et rassemblera toutes les tâches à effectuer et l'ordre dans lequel la tâche doit être exécutée.

  • Ensuite, le nimbus distribuera uniformément les tâches à tous les superviseurs disponibles.

  • À un intervalle de temps particulier, tous les superviseurs enverront des battements de cœur au nimbus pour l'informer qu'ils sont toujours en vie.

  • Lorsqu'un superviseur meurt et n'envoie pas de battement de cœur au nimbus, le nimbus attribue les tâches à un autre superviseur.

  • Lorsque le nimbus lui-même meurt, les superviseurs travailleront sur la tâche déjà assignée sans aucun problème.

  • Une fois toutes les tâches terminées, le superviseur attendra qu'une nouvelle tâche arrive.

  • En attendant, le nimbus mort sera redémarré automatiquement par des outils de surveillance de service.

  • Le nimbus redémarré continuera là où il s'est arrêté. De même, le superviseur mort peut également être redémarré automatiquement. Comme le nimbus et le superviseur peuvent être redémarrés automatiquement et que les deux continueront comme avant, Storm est assuré de traiter toute la tâche au moins une fois.

  • Une fois que toutes les topologies sont traitées, le nimbus attend qu'une nouvelle topologie arrive et de même le superviseur attend de nouvelles tâches.

Par défaut, il existe deux modes dans un cluster Storm -

  • Local mode- Ce mode est utilisé pour le développement, les tests et le débogage car c'est le moyen le plus simple de voir tous les composants de topologie travailler ensemble. Dans ce mode, nous pouvons ajuster les paramètres qui nous permettent de voir comment notre topologie fonctionne dans différents environnements de configuration Storm. En mode local, les topologies Storm s'exécutent sur la machine locale dans une seule JVM.

  • Production mode- Dans ce mode, nous soumettons notre topologie au cluster Storm de travail, qui est composé de nombreux processus, généralement exécutés sur des machines différentes. Comme indiqué dans le flux de travail de Storm, un cluster de travail fonctionnera indéfiniment jusqu'à ce qu'il soit arrêté.

Apache Storm traite les données en temps réel et l'entrée provient normalement d'un système de mise en file d'attente de messages. Un système de messagerie distribuée externe fournira les entrées nécessaires au calcul en temps réel. Spout lira les données du système de messagerie et les convertira en tuples et les entrera dans Apache Storm. Le fait intéressant est qu'Apache Storm utilise son propre système de messagerie distribué en interne pour la communication entre son nimbus et son superviseur.

Qu'est-ce que le système de messagerie distribuée?

La messagerie distribuée est basée sur le concept de mise en file d'attente fiable des messages. Les messages sont mis en file d'attente de manière asynchrone entre les applications clientes et les systèmes de messagerie. Un système de messagerie distribué offre les avantages de la fiabilité, de l'évolutivité et de la persistance.

La plupart des modèles de messagerie suivent le publish-subscribe modèle (simplement Pub-Sub) où les expéditeurs des messages sont appelés publishers et ceux qui veulent recevoir les messages sont appelés subscribers.

Une fois le message publié par l'expéditeur, les abonnés peuvent recevoir le message sélectionné à l'aide d'une option de filtrage. Habituellement, nous avons deux types de filtrage, l'un esttopic-based filtering et un autre est content-based filtering.

Notez que le modèle pub-sub ne peut communiquer que via des messages. C'est une architecture très faiblement couplée; même les expéditeurs ne savent pas qui sont leurs abonnés. De nombreux modèles de message permettent avec le courtier de messages d'échanger des messages de publication pour un accès en temps opportun par de nombreux abonnés. Un exemple concret est Dish TV, qui publie différentes chaînes telles que le sport, les films, la musique, etc.

Le tableau suivant décrit certains des systèmes de messagerie à haut débit les plus courants:

Système de messagerie distribuée La description
Apache Kafka Kafka a été développé chez LinkedIn Corporation et est devenu plus tard un sous-projet d'Apache. Apache Kafka est basé sur un modèle de publication-abonnement distribué, permanent et activé par les courtiers. Kafka est rapide, évolutif et très efficace.
RabbitMQ RabbitMQ est une application de messagerie robuste distribuée open source. Il est facile à utiliser et fonctionne sur toutes les plateformes.
JMS (service de messagerie Java) JMS est une API open source qui prend en charge la création, la lecture et l'envoi de messages d'une application à une autre. Il fournit une livraison de message garantie et suit le modèle de publication-abonnement.
ActiveMQ Le système de messagerie ActiveMQ est une API open source de JMS.
ZeroMQ ZeroMQ est un traitement de message homologue sans courtier. Il fournit des modèles de messages push-pull, routeur-revendeur.
Crécerelle Kestrel est une file d'attente de messages distribuée rapide, fiable et simple.

Protocole d'épargne

Thrift a été créé chez Facebook pour le développement de services multilingues et l'appel de procédure à distance (RPC). Plus tard, il est devenu un projet Apache open source. Apache Thrift est unInterface Definition Language et permet de définir de nouveaux types de données et la mise en œuvre de services en plus des types de données définis de manière simple.

Apache Thrift est également un cadre de communication qui prend en charge les systèmes embarqués, les applications mobiles, les applications Web et de nombreux autres langages de programmation. Certaines des fonctionnalités clés associées à Apache Thrift sont sa modularité, sa flexibilité et ses hautes performances. En outre, il peut effectuer le streaming, la messagerie et le RPC dans des applications distribuées.

Storm utilise largement le protocole Thrift pour sa communication interne et la définition des données. La topologie des tempêtes est simplementThrift Structs. Storm Nimbus qui exécute la topologie dans Apache Storm est unThrift service.

Voyons maintenant comment installer le framework Apache Storm sur votre machine. Il y a trois étapes majo ici -

  • Installez Java sur votre système, si vous ne l'avez pas déjà.
  • Installez le framework ZooKeeper.
  • Installez le framework Apache Storm.

Étape 1 - Vérification de l'installation de Java

Utilisez la commande suivante pour vérifier si Java est déjà installé sur votre système.

$ java -version

Si Java est déjà là, vous verriez son numéro de version. Sinon, téléchargez la dernière version de JDK.

Étape 1.1 - Téléchargez le JDK

Téléchargez la dernière version de JDK en utilisant le lien suivant - www.oracle.com

La dernière version est JDK 8u 60 et le fichier est “jdk-8u60-linux-x64.tar.gz”. Téléchargez le fichier sur votre machine.

Étape 1.2 - Extraire les fichiers

En général, les fichiers sont téléchargés sur le downloadsdossier. Extrayez la configuration tar à l'aide des commandes suivantes.

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

Étape 1.3 - Déplacer vers le répertoire opt

Pour rendre Java disponible à tous les utilisateurs, déplacez le contenu java extrait vers le dossier «/ usr / local / java».

$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk-1.8.0_60 /opt/jdk/

Étape 1.4 - Définir le chemin

Pour définir les variables path et JAVA_HOME, ajoutez les commandes suivantes au fichier ~ / .bashrc.

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

Appliquez maintenant toutes les modifications apportées au système en cours d'exécution.

$ source ~/.bashrc

Étape 1.5 - Alternatives Java

Utilisez la commande suivante pour modifier les alternatives Java.

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

Étape 1.6

Vérifiez maintenant l'installation de Java à l'aide de la commande de vérification (java -version) expliqué à l'étape 1.

Étape 2 - Installation de la structure ZooKeeper

Étape 2.1 - Téléchargez ZooKeeper

Pour installer le framework ZooKeeper sur votre ordinateur, visitez le lien suivant et téléchargez la dernière version de ZooKeeper http://zookeeper.apache.org/releases.html

À partir de maintenant, la dernière version de ZooKeeper est la 3.4.6 (ZooKeeper-3.4.6.tar.gz).

Étape 2.2 - Extraire le fichier tar

Extrayez le fichier tar à l'aide des commandes suivantes -

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

Étape 2.3 - Créer un fichier de configuration

Ouvrez le fichier de configuration nommé «conf / zoo.cfg» en utilisant la commande «vi conf / zoo.cfg» et en définissant tous les paramètres suivants comme point de départ.

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

Une fois que le fichier de configuration a été enregistré avec succès, vous pouvez démarrer le serveur ZooKeeper.

Étape 2.4 - Démarrez ZooKeeper Server

Utilisez la commande suivante pour démarrer le serveur ZooKeeper.

$ bin/zkServer.sh start

Après avoir exécuté cette commande, vous obtiendrez une réponse comme suit -

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

Étape 2.5 - Démarrez CLI

Utilisez la commande suivante pour démarrer la CLI.

$ bin/zkCli.sh

Après avoir exécuté la commande ci-dessus, vous serez connecté au serveur ZooKeeper et obtiendrez la réponse suivante.

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

Étape 2.6 - Arrêtez le serveur ZooKeeper

Après avoir connecté le serveur et effectué toutes les opérations, vous pouvez arrêter le serveur ZooKeeper à l'aide de la commande suivante.

bin/zkServer.sh stop

Vous avez correctement installé Java et ZooKeeper sur votre ordinateur. Voyons maintenant les étapes pour installer le framework Apache Storm.

Étape 3 - Installation d'Apache Storm Framework

Étape 3.1 Télécharger Storm

Pour installer le framework Storm sur votre machine, visitez le lien suivant et téléchargez la dernière version de Storm http://storm.apache.org/downloads.html

À partir de maintenant, la dernière version de Storm est «apache-storm-0.9.5.tar.gz».

Étape 3.2 - Extraire le fichier tar

Extrayez le fichier tar à l'aide des commandes suivantes -

$ cd opt/
$ tar -zxf apache-storm-0.9.5.tar.gz
$ cd apache-storm-0.9.5
$ mkdir data

Étape 3.3 - Ouvrir le fichier de configuration

La version actuelle de Storm contient un fichier dans «conf / storm.yaml» qui configure les démons Storm. Ajoutez les informations suivantes à ce fichier.

$ vi conf/storm.yaml
storm.zookeeper.servers:
 - "localhost"
storm.local.dir: “/path/to/storm/data(any path)”
nimbus.host: "localhost"
supervisor.slots.ports:
 - 6700
 - 6701
 - 6702
 - 6703

Après avoir appliqué toutes les modifications, enregistrez et revenez au terminal.

Étape 3.4 - Démarrez le Nimbus

$ bin/storm nimbus

Étape 3.5 - Démarrez le superviseur

$ bin/storm supervisor

Étape 3.6 Démarrez l'interface utilisateur

$ bin/storm ui

Après avoir démarré l'application d'interface utilisateur Storm, saisissez l'URL http://localhost:8080dans votre navigateur préféré et vous pouvez voir les informations du cluster Storm et sa topologie en cours d'exécution. La page doit ressembler à la capture d'écran suivante.

Nous avons parcouru les détails techniques de base d'Apache Storm et il est maintenant temps de coder des scénarios simples.

Scénario - Analyseur de journaux d'appels mobiles

L'appel mobile et sa durée seront donnés en entrée à Apache Storm et le Storm traitera et regroupera l'appel entre le même appelant et le même destinataire et leur nombre total d'appels.

Création de bec

Spout est un composant utilisé pour la génération de données. Fondamentalement, un bec implémentera une interface IRichSpout. L'interface «IRichSpout» a les méthodes importantes suivantes -

  • open- Fournit au bec un environnement à exécuter. Les exécuteurs exécuteront cette méthode pour initialiser le bec.

  • nextTuple - Émet les données générées via le collecteur.

  • close - Cette méthode est appelée lorsqu'un bec va s'arrêter.

  • declareOutputFields - Déclare le schéma de sortie du tuple.

  • ack - Reconnaît qu'un tuple spécifique est traité

  • fail - Spécifie qu'un tuple spécifique n'est pas traité et ne doit pas être retraité.

Ouvert

La signature du open la méthode est la suivante -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - Fournit une configuration de tempête pour ce bec.

  • context - Fournit des informations complètes sur l'emplacement du bec dans la topologie, son identifiant de tâche, les informations d'entrée et de sortie.

  • collector - Permet d'émettre le tuple qui sera traité par les boulons.

nextTuple

La signature du nextTuple la méthode est la suivante -

nextTuple()

nextTuple () est appelé périodiquement à partir de la même boucle que les méthodes ack () et fail (). Il doit libérer le contrôle du thread lorsqu'il n'y a aucun travail à faire, afin que les autres méthodes aient une chance d'être appelées. Ainsi, la première ligne de nextTuple vérifie si le traitement est terminé. Si tel est le cas, il doit dormir pendant au moins une milliseconde pour réduire la charge sur le processeur avant de revenir.

Fermer

La signature du close la méthode est la suivante -

close()

declareOutputFields

La signature du declareOutputFields la méthode est la suivante -

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - Il est utilisé pour déclarer les identifiants de flux de sortie, les champs de sortie, etc.

Cette méthode est utilisée pour spécifier le schéma de sortie du tuple.

accuser

La signature du ack la méthode est la suivante -

ack(Object msgId)

Cette méthode reconnaît qu'un tuple spécifique a été traité.

échouer

La signature du nextTuple la méthode est la suivante -

ack(Object msgId)

Cette méthode informe qu'un tuple spécifique n'a pas été entièrement traité. Storm retraitera le tuple spécifique.

FakeCallLogReaderSpout

Dans notre scénario, nous devons collecter les détails du journal des appels. Les informations du journal des appels contiennent.

  • numéro de l'appelant
  • numéro de récepteur
  • duration

Étant donné que nous n'avons pas d'informations en temps réel sur les journaux d'appels, nous allons générer de faux journaux d'appels. Les fausses informations seront créées à l'aide de la classe Random. Le code de programme complet est donné ci-dessous.

Codage - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Création de boulons

Bolt est un composant qui prend des tuples en entrée, traite le tuple et produit de nouveaux tuples en sortie. Bolts mettra en œuvreIRichBoltinterface. Dans ce programme, deux classes de boulonsCallLogCreatorBolt et CallLogCounterBolt sont utilisés pour effectuer les opérations.

L'interface IRichBolt a les méthodes suivantes -

  • prepare- Fournit au boulon un environnement à exécuter. Les exécuteurs exécuteront cette méthode pour initialiser le bec.

  • execute - Traitez un seul tuple d'entrée.

  • cleanup - Appelé lorsqu'un verrou va s'arrêter.

  • declareOutputFields - Déclare le schéma de sortie du tuple.

Préparer

La signature du prepare la méthode est la suivante -

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - Fournit la configuration Storm pour ce boulon.

  • context - Fournit des informations complètes sur l'emplacement du boulon dans la topologie, son identifiant de tâche, les informations d'entrée et de sortie, etc.

  • collector - Nous permet d'émettre le tuple traité.

exécuter

La signature du execute la méthode est la suivante -

execute(Tuple tuple)

Ici tuple est le tuple d'entrée à traiter.

le executeLa méthode traite un seul tuple à la fois. Les données du tuple sont accessibles par la méthode getValue de la classe Tuple. Il n'est pas nécessaire de traiter immédiatement le tuple d'entrée. Plusieurs tuple peuvent être traités et sortis comme un seul tuple de sortie. Le tuple traité peut être émis à l'aide de la classe OutputCollector.

nettoyer

La signature du cleanup la méthode est la suivante -

cleanup()

declareOutputFields

La signature du declareOutputFields la méthode est la suivante -

declareOutputFields(OutputFieldsDeclarer declarer)

Ici le paramètre declarer est utilisé pour déclarer les identifiants de flux de sortie, les champs de sortie, etc.

Cette méthode est utilisée pour spécifier le schéma de sortie du tuple

Journal des appels Creator Bolt

Le boulon du créateur du journal des appels reçoit le tuple du journal des appels. Le tuple du journal des appels comprend le numéro de l'appelant, le numéro du destinataire et la durée de l'appel. Ce verrou crée simplement une nouvelle valeur en combinant le numéro de l'appelant et le numéro du destinataire. Le format de la nouvelle valeur est "Numéro de l'appelant - Numéro du destinataire" et il est nommé comme nouveau champ, "appel". Le code complet est donné ci-dessous.

Codage - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Boulon du compteur du journal des appels

Le boulon du compteur du journal des appels reçoit l'appel et sa durée sous forme de tuple. Ce boulon initialise un objet dictionnaire (Map) dans la méthode prepare. Dansexecute, il vérifie le tuple et crée une nouvelle entrée dans l'objet dictionnaire pour chaque nouvelle valeur «call» dans le tuple et définit une valeur 1 dans l'objet dictionnaire. Pour l'entrée déjà disponible dans le dictionnaire, il incrémente simplement sa valeur. En termes simples, ce boulon enregistre l'appel et son décompte dans l'objet dictionnaire. Au lieu d'enregistrer l'appel et son décompte dans le dictionnaire, nous pouvons également l'enregistrer dans une source de données. Le code de programme complet est le suivant -

Codage - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Créer une topologie

La topologie Storm est essentiellement une structure Thrift. La classe TopologyBuilder fournit des méthodes simples et faciles pour créer des topologies complexes. La classe TopologyBuilder a des méthodes pour définir spout(setSpout) et pour régler le boulon (setBolt). Enfin, TopologyBuilder a createTopology pour créer une topologie. Utilisez l'extrait de code suivant pour créer une topologie -

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping et fieldsGrouping Les méthodes aident à définir le regroupement de flux pour le bec et les boulons.

Cluster local

À des fins de développement, nous pouvons créer un cluster local à l'aide de l'objet "LocalCluster", puis soumettre la topologie en utilisant la méthode "submitTopology" de la classe "LocalCluster". Un des arguments pour "submitTopology" est une instance de la classe "Config". La classe "Config" est utilisée pour définir les options de configuration avant de soumettre la topologie. Cette option de configuration sera fusionnée avec la configuration du cluster au moment de l'exécution et envoyée à toutes les tâches (spout et bolt) avec la méthode prepare. Une fois la topologie soumise au cluster, nous attendrons 10 secondes que le cluster calcule la topologie soumise, puis nous arrêterons le cluster à l'aide de la méthode «shutdown» de «LocalCluster». Le code de programme complet est le suivant -

Codage - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Création et exécution de l'application

L'application complète a quatre codes Java. Ils sont -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

L'application peut être créée à l'aide de la commande suivante -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

L'application peut être exécutée à l'aide de la commande suivante -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Production

Une fois l'application démarrée, elle affichera les détails complets sur le processus de démarrage du cluster, le traitement du spout et du boulon, et enfin, le processus d'arrêt du cluster. Dans "CallLogCounterBolt", nous avons imprimé l'appel et ses détails de comptage. Ces informations seront affichées sur la console comme suit -

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Langages non JVM

Les topologies Storm sont implémentées par des interfaces Thrift, ce qui facilite la soumission de topologies dans n'importe quelle langue. Storm prend en charge Ruby, Python et de nombreux autres langages. Jetons un coup d'œil à la liaison python.

Liaison Python

Python est un langage de programmation généraliste interprété, interactif, orienté objet et de haut niveau. Storm prend en charge Python pour implémenter sa topologie. Python prend en charge les opérations d'émission, d'ancrage, d'acquittement et de journalisation.

Comme vous le savez, les boulons peuvent être définis dans n'importe quelle langue. Les boulons écrits dans un autre langage sont exécutés en tant que sous-processus et Storm communique avec ces sous-processus avec des messages JSON via stdin / stdout. Commencez par prendre un exemple de boulon WordCount qui prend en charge la liaison python.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

Ici la classe WordCount met en œuvre le IRichBoltinterface et s'exécutant avec la super méthode spécifiée par l'implémentation python, l'argument "splitword.py". Créez maintenant une implémentation python nommée "splitword.py".

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

C'est l'exemple d'implémentation pour Python qui compte les mots dans une phrase donnée. De même, vous pouvez également vous lier à d'autres langues de prise en charge.

Trident est une extension de Storm. Comme Storm, Trident a également été développé par Twitter. La principale raison du développement de Trident est de fournir une abstraction de haut niveau au-dessus de Storm avec un traitement de flux avec état et des requêtes distribuées à faible latence.

Trident utilise le bec et le boulon, mais ces composants de bas niveau sont générés automatiquement par Trident avant l'exécution. Trident a des fonctions, des filtres, des jointures, des regroupements et des agrégations.

Trident traite les flux comme une série de lots appelés transactions. Généralement, la taille de ces petits lots sera de l'ordre de milliers ou millions de tuples, selon le flux d'entrée. De cette façon, Trident est différent de Storm, qui effectue un traitement tuple par tuple.

Le concept de traitement par lots est très similaire aux transactions de base de données. Chaque transaction se voit attribuer un identifiant de transaction. La transaction est considérée comme réussie, une fois que tout son traitement est terminé. Cependant, un échec dans le traitement de l'un des tuples de la transaction entraînera la retransmission de l'ensemble de la transaction. Pour chaque lot, Trident appellera beginCommit au début de la transaction et validera à la fin de celle-ci.

Topologie Trident

L'API Trident présente une option simple pour créer une topologie Trident à l'aide de la classe «TridentTopology». Fondamentalement, la topologie Trident reçoit le flux d'entrée du spout et effectue une séquence d'opérations ordonnée (filtre, agrégation, regroupement, etc.) sur le flux. Storm Tuple est remplacé par Trident Tuple et Bolts sont remplacés par des opérations. Une topologie Trident simple peut être créée comme suit -

TridentTopology topology = new TridentTopology();

Tuples Trident

Le tuple trident est une liste nommée de valeurs. L'interface TridentTuple est le modèle de données d'une topologie Trident. L'interface TridentTuple est l'unité de base de données qui peut être traitée par une topologie Trident.

Bec Trident

Le bec Trident est similaire au bec Storm, avec des options supplémentaires pour utiliser les fonctionnalités de Trident. En fait, nous pouvons toujours utiliser IRichSpout, que nous avons utilisé dans la topologie Storm, mais ce sera de nature non transactionnelle et nous ne pourrons pas utiliser les avantages fournis par Trident.

Le bec de base ayant toutes les fonctionnalités pour utiliser les fonctionnalités de Trident est "ITridentSpout". Il prend en charge la sémantique transactionnelle transactionnelle et opaque. Les autres spouts sont IBatchSpout, IPartitionedTridentSpout et IOpaquePartitionedTridentSpout.

En plus de ces becs génériques, Trident a de nombreux exemples d'implémentation de bec trident. L'un d'eux est le bec verseur FeederBatchSpout, que nous pouvons utiliser pour envoyer facilement une liste nommée de tuples trident sans se soucier du traitement par lots, du parallélisme, etc.

La création et l'alimentation des données de FeederBatchSpout peuvent être effectuées comme indiqué ci-dessous -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Opérations Trident

Trident s'appuie sur «l'opération Trident» pour traiter le flux d'entrée des tuples trident. L'API Trident dispose d'un certain nombre d'opérations intégrées pour gérer le traitement de flux simple à complexe. Ces opérations vont de la simple validation au regroupement et à l'agrégation complexes de tuples de trident. Passons en revue les opérations les plus importantes et les plus fréquemment utilisées.

Filtre

Le filtre est un objet utilisé pour effectuer la tâche de validation d'entrée. Un filtre Trident obtient un sous-ensemble de champs de tuple trident en entrée et renvoie true ou false selon que certaines conditions sont satisfaites ou non. Si true est renvoyé, le tuple est conservé dans le flux de sortie; sinon, le tuple est supprimé du flux. Le filtre héritera essentiellement duBaseFilter classe et implémentez le isKeepméthode. Voici un exemple d'implémentation de l'opération de filtrage -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

La fonction de filtrage peut être appelée dans la topologie en utilisant la méthode «each». La classe «Fields» peut être utilisée pour spécifier l'entrée (sous-ensemble du tuple trident). L'exemple de code est le suivant -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Fonction

Functionest un objet utilisé pour effectuer une opération simple sur un seul tuple trident. Il prend un sous-ensemble de champs de tuple trident et émet zéro ou plusieurs nouveaux champs de tuple trident.

Function hérite essentiellement de la BaseFunction classe et implémente le executeméthode. Un exemple d'implémentation est donné ci-dessous -

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Tout comme l'opération de filtrage, l'opération de fonction peut être appelée dans une topologie à l'aide du eachméthode. L'exemple de code est le suivant -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Agrégation

L'agrégation est un objet utilisé pour effectuer des opérations d'agrégation sur un lot d'entrée, une partition ou un flux. Trident a trois types d'agrégation. Ils sont les suivants -

  • aggregate- Agrège chaque lot de tuple trident de manière isolée. Pendant le processus d'agrégation, les tuples sont initialement repartitionnés à l'aide du regroupement global pour combiner toutes les partitions du même lot en une seule partition.

  • partitionAggregate- Agrège chaque partition au lieu du lot entier de tuple trident. La sortie de l'agrégat de partition remplace complètement le tuple d'entrée. La sortie de l'agrégat de partition contient un seul tuple de champ.

  • persistentaggregate - Agrège sur tous les tuple trident dans tous les lots et stocke le résultat dans la mémoire ou la base de données.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

L'opération d'agrégation peut être créée à l'aide de CombinerAggregator, ReducerAggregator ou de l'interface d'agrégation générique. L'agrégateur "count" utilisé dans l'exemple ci-dessus est l'un des agrégateurs intégrés. Il est implémenté à l'aide de "CombinerAggregator". La mise en œuvre est la suivante:

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Regroupement

L'opération de regroupement est une opération intégrée et peut être appelée par le groupByméthode. La méthode groupBy repartitionne le flux en effectuant un partitionBy sur les champs spécifiés, puis dans chaque partition, elle regroupe les tuples dont les champs de groupe sont égaux. Normalement, nous utilisons «groupBy» avec «persistentAggregate» pour obtenir l'agrégation groupée. L'exemple de code est le suivant -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Fusionner et rejoindre

La fusion et la jointure peuvent être effectuées en utilisant respectivement les méthodes «merge» et «join». La fusion combine un ou plusieurs flux. La jointure est similaire à la fusion, sauf le fait que la jointure utilise un champ de tuple trident des deux côtés pour vérifier et joindre deux flux. De plus, la jonction fonctionnera uniquement au niveau du lot. L'exemple de code est le suivant -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Maintenance de l'état

Trident fournit un mécanisme de maintenance de l'état. Les informations d'état peuvent être stockées dans la topologie elle-même, sinon vous pouvez également les stocker dans une base de données distincte. La raison est de maintenir un état selon lequel si un tuple échoue pendant le traitement, le tuple échoué est retenté. Cela crée un problème lors de la mise à jour de l'état car vous ne savez pas si l'état de ce tuple a été mis à jour précédemment ou non. Si le tuple a échoué avant la mise à jour de l'état, réessayer le tuple rendra l'état stable. Cependant, si le tuple a échoué après la mise à jour de l'état, réessayer le même tuple augmentera à nouveau le nombre dans la base de données et rendra l'état instable. Il faut effectuer les étapes suivantes pour s'assurer qu'un message n'est traité qu'une seule fois -

  • Traitez les tuples par petits lots.

  • Attribuez un ID unique à chaque lot. Si le lot est réessayé, il reçoit le même ID unique.

  • Les mises à jour d'état sont classées parmi les lots. Par exemple, la mise à jour de l'état du deuxième lot ne sera pas possible tant que la mise à jour de l'état du premier lot ne sera pas terminée.

RPC distribué

Distributed RPC est utilisé pour interroger et récupérer le résultat de la topologie Trident. Storm a un serveur RPC distribué intégré. Le serveur RPC distribué reçoit la demande RPC du client et la transmet à la topologie. La topologie traite la demande et envoie le résultat au serveur RPC distribué, qui est redirigé par le serveur RPC distribué vers le client. La requête RPC distribuée de Trident s'exécute comme une requête RPC normale, à l'exception du fait que ces requêtes sont exécutées en parallèle.

Quand utiliser Trident?

Comme dans de nombreux cas d'utilisation, si l'exigence est de traiter une requête une seule fois, nous pouvons y parvenir en écrivant une topologie dans Trident. En revanche, il sera difficile de réaliser exactement une fois le traitement dans le cas de Storm. Par conséquent, Trident sera utile pour les cas d'utilisation où vous avez besoin d'un seul traitement. Trident n'est pas pour tous les cas d'utilisation, en particulier les cas d'utilisation haute performance, car il ajoute de la complexité à Storm et gère l'état.

Exemple de travail de Trident

Nous allons convertir notre application d'analyse du journal des appels élaborée dans la section précédente en framework Trident. L'application Trident sera relativement facile par rapport à plain storm, grâce à son API de haut niveau. Storm devra essentiellement effectuer l'une des opérations Function, Filter, Aggregate, GroupBy, Join et Merge dans Trident. Enfin, nous allons démarrer le serveur DRPC en utilisant leLocalDRPC class et recherchez un mot-clé en utilisant le execute méthode de la classe LocalDRPC.

Formatage des informations d'appel

Le but de la classe FormatCall est de formater les informations d'appel comprenant le «numéro de l'appelant» et le «numéro du récepteur». Le code de programme complet est le suivant -

Codage: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

Le but de la classe CSVSplit est de diviser la chaîne d'entrée en fonction de «virgule (,)» et d'émettre chaque mot de la chaîne. Cette fonction est utilisée pour analyser l'argument d'entrée de l'interrogation distribuée. Le code complet est le suivant -

Codage: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Analyseur de journaux

Ceci est l'application principale. Au départ, l'application initialisera TridentTopology et alimentera les informations de l'appelant en utilisantFeederBatchSpout. Le flux de topologie Trident peut être créé à l'aide dunewStreamméthode de la classe TridentTopology. De même, le flux DRPC de topologie Trident peut être créé à l'aide dunewDRCPStreamméthode de la classe TridentTopology. Un simple serveur DRCP peut être créé à l'aide de la classe LocalDRPC.LocalDRPCa une méthode d'exécution pour rechercher un mot-clé. Le code complet est donné ci-dessous.

Codage: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Création et exécution de l'application

L'application complète a trois codes Java. Ils sont les suivants -

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

L'application peut être créée à l'aide de la commande suivante -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

L'application peut être exécutée à l'aide de la commande suivante -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Production

Une fois l'application démarrée, l'application affichera les détails complets sur le processus de démarrage du cluster, le traitement des opérations, les informations sur le serveur DRPC et le client, et enfin, le processus d'arrêt du cluster. Cette sortie sera affichée sur la console comme indiqué ci-dessous.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends

Ici, dans ce chapitre, nous allons discuter d'une application en temps réel d'Apache Storm. Nous verrons comment Storm est utilisé sur Twitter.

Twitter

Twitter est un service de réseautage social en ligne qui fournit une plateforme pour envoyer et recevoir des tweets d'utilisateurs. Les utilisateurs enregistrés peuvent lire et publier des tweets, mais les utilisateurs non enregistrés ne peuvent lire que des tweets. Hashtag est utilisé pour classer les tweets par mot-clé en ajoutant # avant le mot-clé pertinent. Prenons maintenant un scénario en temps réel pour trouver le hashtag le plus utilisé par sujet.

Création de bec

Le but de spout est d'obtenir les tweets soumis par les gens dès que possible. Twitter fournit «Twitter Streaming API», un outil basé sur un service Web pour récupérer les tweets soumis par des personnes en temps réel. L'API Twitter Streaming est accessible dans n'importe quel langage de programmation.

twitter4j est une bibliothèque Java non officielle open source, qui fournit un module basé sur Java pour accéder facilement à l'API Twitter Streaming. twitter4jfournit un cadre basé sur l'auditeur pour accéder aux tweets. Pour accéder à l'API Twitter Streaming, nous devons nous connecter au compte de développeur Twitter et obtenir les détails d'authentification OAuth suivants.

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

Storm fournit un bec Twitter, TwitterSampleSpout,dans son kit de démarrage. Nous allons l'utiliser pour récupérer les tweets. Le bec verseur a besoin des détails d'authentification OAuth et d'au moins un mot-clé. Le bec émettra des tweets en temps réel basés sur des mots-clés. Le code de programme complet est donné ci-dessous.

Codage: TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;
		
   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;
		
   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
         this.consumerKey = consumerKey;
         this.consumerSecret = consumerSecret;
         this.accessToken = accessToken;
         this.accessTokenSecret = accessTokenSecret;
         this.keyWords = keyWords;
   }
		
   public TwitterSampleSpout() {
      // TODO Auto-generated constructor stub
   }
		
   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
         queue = new LinkedBlockingQueue<Status>(1000);
         _collector = collector;
         StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
               queue.offer(status);
            }
					
            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}
					
            @Override
            public void onTrackLimitationNotice(int i) {}
					
            @Override
            public void onScrubGeo(long l, long l1) {}
					
            @Override
            public void onException(Exception ex) {}
					
            @Override
            public void onStallWarning(StallWarning arg0) {
               // TODO Auto-generated method stub
            }
         };
				
         ConfigurationBuilder cb = new ConfigurationBuilder();
				
         cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);
					
         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         _twitterStream.addListener(listener);
				
         if (keyWords.length == 0) {
            _twitterStream.sample();
         }else {
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
         }
   }
			
   @Override
   public void nextTuple() {
      Status ret = queue.poll();
				
      if (ret == null) {
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }
			
   @Override
   public void close() {
      _twitterStream.shutdown();
   }
			
   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }
			
   @Override
   public void ack(Object id) {}
			
   @Override
   public void fail(Object id) {}
			
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}

Boulon de lecture Hashtag

Le tweet émis par spout sera transmis à HashtagReaderBolt, qui traitera le tweet et émettra tous les hashtags disponibles. HashtagReaderBolt utilisegetHashTagEntitiesméthode fournie par twitter4j. getHashTagEntities lit le tweet et renvoie la liste des hashtag. Le code de programme complet est le suivant -

Codage: HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");
      for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtage.getText());
         this.collector.emit(new Values(hashtage.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Boulon de compteur Hashtag

Le hashtag émis sera transmis à HashtagCounterBolt. Ce boulon traitera tous les hashtags et enregistrera chaque hashtag et son décompte en mémoire à l'aide de l'objet Java Map. Le code de programme complet est donné ci-dessous.

Codage: HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if(!counterMap.containsKey(key)){
         counterMap.put(key, 1);
      }else{
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Soumettre une topologie

La soumission d'une topologie est l'application principale. La topologie Twitter se compose deTwitterSampleSpout, HashtagReaderBolt, et HashtagCounterBolt. Le code de programme suivant montre comment soumettre une topologie.

Codage: TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception{
      String consumerKey = args[0];
      String consumerSecret = args[1];
		
      String accessToken = args[2];
      String accessTokenSecret = args[3];
		
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
		
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Création et exécution de l'application

L'application complète a quatre codes Java. Ils sont les suivants -

  • TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java

Vous pouvez compiler l'application à l'aide de la commande suivante -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java

Exécutez l'application à l'aide des commandes suivantes -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>

Production

L'application imprimera le hashtag actuellement disponible et son décompte. La sortie doit être similaire à ce qui suit -

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1

Yahoo! Finance est le premier site Internet d'informations économiques et de données financières. Il fait partie de Yahoo! et donne des informations sur l'actualité financière, les statistiques du marché, les données du marché international et d'autres informations sur les ressources financières auxquelles tout le monde peut accéder.

Si vous êtes inscrit sur Yahoo! utilisateur, vous pouvez alors personnaliser Yahoo! Financez pour profiter de ses certaines offres. Yahoo! L'API Finance est utilisée pour interroger les données financières de Yahoo!

Cette API affiche des données retardées de 15 minutes par rapport au temps réel et met à jour sa base de données toutes les 1 minute, pour accéder aux informations actuelles relatives aux stocks. Prenons maintenant un scénario en temps réel d'une entreprise et voyons comment déclencher une alerte lorsque la valeur de son action passe en dessous de 100.

Création de bec

Le but du bec est d'obtenir les détails de l'entreprise et d'émettre les prix aux boulons. Vous pouvez utiliser le code de programme suivant pour créer un bec verseur.

Codage: YahooFinanceSpout.java

import java.util.*;
import java.io.*;
import java.math.BigDecimal;

//import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

public class YahooFinanceSpout implements IRichSpout {
   private SpoutOutputCollector collector;
   private boolean completed = false;
   private TopologyContext context;
	
   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      try {
         Stock stock = YahooFinance.get("INTC");
         BigDecimal price = stock.getQuote().getPrice();

         this.collector.emit(new Values("INTC", price.doubleValue()));
         stock = YahooFinance.get("GOOGL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("GOOGL", price.doubleValue()));
         stock = YahooFinance.get("AAPL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("AAPL", price.doubleValue()));
      } catch(Exception e) {}
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("company", "price"));
   }

   @Override
   public void close() {}
	
   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Création de boulons

Ici, le but de bolt est de traiter les prix de l'entreprise donnée lorsque les prix tombent en dessous de 100. Il utilise l'objet Java Map pour définir l'alerte de limite de prix de coupure comme truelorsque le cours des actions tombe en dessous de 100; sinon faux. Le code de programme complet est le suivant -

Codage: PriceCutOffBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class PriceCutOffBolt implements IRichBolt {
   Map<String, Integer> cutOffMap;
   Map<String, Boolean> resultMap;
	
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.cutOffMap = new HashMap <String, Integer>();
      this.cutOffMap.put("INTC", 100);
      this.cutOffMap.put("AAPL", 100);
      this.cutOffMap.put("GOOGL", 100);

      this.resultMap = new HashMap<String, Boolean>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String company = tuple.getString(0);
      Double price = tuple.getDouble(1);

      if(this.cutOffMap.containsKey(company)){
         Integer cutOffPrice = this.cutOffMap.get(company);

         if(price < cutOffPrice) {
            this.resultMap.put(company, true);
         } else {
            this.resultMap.put(company, false);
         }
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("cut_off_price"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Soumettre une topologie

C'est l'application principale où YahooFinanceSpout.java et PriceCutOffBolt.java sont connectés ensemble et produisent une topologie. Le code de programme suivant montre comment soumettre une topologie.

Codage: YahooFinanceStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class YahooFinanceStorm {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

      builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
         .fieldsGrouping("yahoo-finance-spout", new Fields("company"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Création et exécution de l'application

L'application complète a trois codes Java. Ils sont les suivants -

  • YahooFinanceSpout.java
  • PriceCutOffBolt.java
  • YahooFinanceStorm.java

L'application peut être créée à l'aide de la commande suivante -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java

L'application peut être exécutée à l'aide de la commande suivante -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.
YahooFinanceStorm

Production

La sortie sera similaire à ce qui suit -

GOOGL : false
AAPL : false
INTC : true

Le framework Apache Storm prend en charge la plupart des meilleures applications industrielles actuelles. Nous allons fournir un très bref aperçu de certaines des applications les plus notables de Storm dans ce chapitre.

Klout

Klout est une application qui utilise l'analyse des médias sociaux pour classer ses utilisateurs en fonction de l'influence sociale en ligne via Klout Score, qui est une valeur numérique comprise entre 1 et 100. Klout utilise l'abstraction Trident intégrée d'Apache Storm pour créer des topologies complexes qui diffusent des données.

La chaîne météo

The Weather Channel utilise des topologies Storm pour ingérer des données météorologiques. Il s'est associé à Twitter pour permettre la publicité informée sur la météo sur Twitter et les applications mobiles.OpenSignal est une entreprise spécialisée dans la cartographie de la couverture sans fil. StormTag et WeatherSignalsont des projets basés sur la météo créés par OpenSignal. StormTag est une station météo Bluetooth qui se fixe à un porte-clés. Les données météorologiques collectées par l'appareil sont envoyées à l'application WeatherSignal et aux serveurs OpenSignal.

Telecom Industry

Telecommunication providers process millions of phone calls per second. They perform forensics on dropped calls and poor sound quality. Call detail records flow in at a rate of millions per second and Apache Storm processes those in real-time and identifies any troubling patterns. Storm analysis can be used to continuously improve call quality.