Hadoop - Streaming

Le streaming Hadoop est un utilitaire fourni avec la distribution Hadoop. Cet utilitaire vous permet de créer et d'exécuter des travaux de mappage / réduction avec n'importe quel exécutable ou script en tant que mappeur et / ou réducteur.

Exemple d'utilisation de Python

Pour le streaming Hadoop, nous examinons le problème du nombre de mots. Tout travail dans Hadoop doit comporter deux phases: le mappeur et le réducteur. Nous avons écrit des codes pour le mappeur et le réducteur en script python pour l'exécuter sous Hadoop. On peut également écrire la même chose en Perl et Ruby.

Code de phase du mappeur

!/usr/bin/python

import sys

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Break the line into words 
   words = myline.split() 

   # Iterate the words list
   for myword in words:
      # Write the results to standard output 
      print '%s\t%s' % (myword, 1)

Assurez-vous que ce fichier a l'autorisation d'exécution (chmod + x / home / expert / hadoop-1.2.1 / mapper.py).

Code de phase du réducteur

#!/usr/bin/python

from operator import itemgetter 
import sys 

current_word = ""
current_count = 0 
word = "" 

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Split the input we got from mapper.py word, 
   count = myline.split('\t', 1) 

   # Convert count variable to integer 
   try: 
      count = int(count) 

   except ValueError: 
      # Count was not a number, so silently ignore this line continue

   if current_word == word: 
   current_count += count 
   else: 
      if current_word: 
         # Write result to standard output print '%s\t%s' % (current_word, current_count) 
   
      current_count = count
      current_word = word

# Do not forget to output the last word if needed! 
if current_word == word: 
   print '%s\t%s' % (current_word, current_count)

Enregistrez les codes du mappeur et du réducteur dans mapper.py et reducer.py dans le répertoire de base Hadoop. Assurez-vous que ces fichiers ont l'autorisation d'exécution (chmod + x mapper.py et chmod + x reducer.py). Comme python est sensible à l'indentation, le même code peut être téléchargé à partir du lien ci-dessous.

Exécution du programme WordCount

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
   -input input_dirs \ 
   -output output_dir \ 
   -mapper <path/mapper.py \ 
   -reducer <path/reducer.py

Où "\" est utilisé pour la continuation de ligne pour une lisibilité claire.

Par exemple,

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py

Comment fonctionne le streaming

Dans l'exemple ci-dessus, le mappeur et le réducteur sont des scripts Python qui lisent l'entrée à partir de l'entrée standard et émettent la sortie vers la sortie standard. L'utilitaire créera un travail de mappage / réduction, soumettra le travail à un cluster approprié et surveillera la progression du travail jusqu'à ce qu'il se termine.

Lorsqu'un script est spécifié pour les mappeurs, chaque tâche de mappeur lancera le script en tant que processus distinct lorsque le mappeur est initialisé. Lors de l'exécution de la tâche de mappage, elle convertit ses entrées en lignes et alimente les lignes en entrée standard (STDIN) du processus. Dans l'intervalle, le mappeur collecte les sorties orientées ligne à partir de la sortie standard (STDOUT) du processus et convertit chaque ligne en une paire clé / valeur, qui est collectée en tant que sortie du mappeur. Par défaut, le préfixe d'une ligne jusqu'au premier caractère de tabulation est la clé et le reste de la ligne (à l'exclusion du caractère de tabulation) sera la valeur. S'il n'y a pas de caractère de tabulation dans la ligne, alors la ligne entière est considérée comme la clé et la valeur est nulle. Cependant, cela peut être personnalisé, selon un besoin.

Lorsqu'un script est spécifié pour les réducteurs, chaque tâche de réduction lancera le script en tant que processus distinct, puis le réducteur est initialisé. Au fur et à mesure que la tâche de réduction s'exécute, elle convertit ses paires clé / valeurs d'entrée en lignes et alimente les lignes vers l'entrée standard (STDIN) du processus. Dans l'intervalle, le réducteur collecte les sorties orientées ligne à partir de la sortie standard (STDOUT) du processus, convertit chaque ligne en une paire clé / valeur, qui est collectée comme sortie du réducteur. Par défaut, le préfixe d'une ligne jusqu'au premier caractère de tabulation est la clé et le reste de la ligne (à l'exclusion du caractère de tabulation) est la valeur. Cependant, cela peut être personnalisé selon des exigences spécifiques.

Commandes importantes

Paramètres Options La description
-input répertoire / nom-fichier Obligatoire Emplacement d'entrée pour le mappeur.
-output nom-répertoire Obligatoire Emplacement de sortie pour le réducteur.
-mapper exécutable ou script ou JavaClassName Obligatoire Exécutable du mappeur.
-reducer exécutable ou script ou JavaClassName Obligatoire Exécutable du réducteur.
-file nom-fichier Optionnel Rend l'exécutable du mappeur, du réducteur ou du combineur disponible localement sur les nœuds de calcul.
-inputformat JavaClassName Optionnel La classe que vous fournissez doit renvoyer des paires clé / valeur de la classe Text. S'il n'est pas spécifié, TextInputFormat est utilisé par défaut.
-outputformat JavaClassName Optionnel La classe que vous fournissez doit prendre des paires clé / valeur de la classe Text. S'il n'est pas spécifié, TextOutputformat est utilisé par défaut.
-partitionneur JavaClassName Optionnel Classe qui détermine à quelle réduction une clé est envoyée.
-combiner streamingCommand ou JavaClassName Optionnel Exécutable du combineur pour la sortie de la carte.
-cmdenv nom = valeur Optionnel Passe la variable d'environnement aux commandes de streaming.
-lecteur d'entrée Optionnel Pour la rétrocompatibilité: spécifie une classe de lecteur d'enregistrement (au lieu d'une classe de format d'entrée).
-verbeux Optionnel Sortie verbeuse.
-lazyOutput Optionnel Crée la sortie paresseusement. Par exemple, si le format de sortie est basé sur FileOutputFormat, le fichier de sortie est créé uniquement lors du premier appel à output.collect (ou Context.write).
-numReduceTasks Optionnel Spécifie le nombre de réducteurs.
-mapdebug Optionnel Script à appeler lorsque la tâche de carte échoue.
-bug réduit Optionnel Script à appeler lorsque la tâche de réduction échoue.