Hadoop - Streaming
Le streaming Hadoop est un utilitaire fourni avec la distribution Hadoop. Cet utilitaire vous permet de créer et d'exécuter des travaux de mappage / réduction avec n'importe quel exécutable ou script en tant que mappeur et / ou réducteur.
Exemple d'utilisation de Python
Pour le streaming Hadoop, nous examinons le problème du nombre de mots. Tout travail dans Hadoop doit comporter deux phases: le mappeur et le réducteur. Nous avons écrit des codes pour le mappeur et le réducteur en script python pour l'exécuter sous Hadoop. On peut également écrire la même chose en Perl et Ruby.
Code de phase du mappeur
!/usr/bin/python
import sys
# Input takes from standard input for myline in sys.stdin:
# Remove whitespace either side
myline = myline.strip()
# Break the line into words
words = myline.split()
# Iterate the words list
for myword in words:
# Write the results to standard output
print '%s\t%s' % (myword, 1)
Assurez-vous que ce fichier a l'autorisation d'exécution (chmod + x / home / expert / hadoop-1.2.1 / mapper.py).
Code de phase du réducteur
#!/usr/bin/python
from operator import itemgetter
import sys
current_word = ""
current_count = 0
word = ""
# Input takes from standard input for myline in sys.stdin:
# Remove whitespace either side
myline = myline.strip()
# Split the input we got from mapper.py word,
count = myline.split('\t', 1)
# Convert count variable to integer
try:
count = int(count)
except ValueError:
# Count was not a number, so silently ignore this line continue
if current_word == word:
current_count += count
else:
if current_word:
# Write result to standard output print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# Do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
Enregistrez les codes du mappeur et du réducteur dans mapper.py et reducer.py dans le répertoire de base Hadoop. Assurez-vous que ces fichiers ont l'autorisation d'exécution (chmod + x mapper.py et chmod + x reducer.py). Comme python est sensible à l'indentation, le même code peut être téléchargé à partir du lien ci-dessous.
Exécution du programme WordCount
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
-input input_dirs \
-output output_dir \
-mapper <path/mapper.py \
-reducer <path/reducer.py
Où "\" est utilisé pour la continuation de ligne pour une lisibilité claire.
Par exemple,
./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py
Comment fonctionne le streaming
Dans l'exemple ci-dessus, le mappeur et le réducteur sont des scripts Python qui lisent l'entrée à partir de l'entrée standard et émettent la sortie vers la sortie standard. L'utilitaire créera un travail de mappage / réduction, soumettra le travail à un cluster approprié et surveillera la progression du travail jusqu'à ce qu'il se termine.
Lorsqu'un script est spécifié pour les mappeurs, chaque tâche de mappeur lancera le script en tant que processus distinct lorsque le mappeur est initialisé. Lors de l'exécution de la tâche de mappage, elle convertit ses entrées en lignes et alimente les lignes en entrée standard (STDIN) du processus. Dans l'intervalle, le mappeur collecte les sorties orientées ligne à partir de la sortie standard (STDOUT) du processus et convertit chaque ligne en une paire clé / valeur, qui est collectée en tant que sortie du mappeur. Par défaut, le préfixe d'une ligne jusqu'au premier caractère de tabulation est la clé et le reste de la ligne (à l'exclusion du caractère de tabulation) sera la valeur. S'il n'y a pas de caractère de tabulation dans la ligne, alors la ligne entière est considérée comme la clé et la valeur est nulle. Cependant, cela peut être personnalisé, selon un besoin.
Lorsqu'un script est spécifié pour les réducteurs, chaque tâche de réduction lancera le script en tant que processus distinct, puis le réducteur est initialisé. Au fur et à mesure que la tâche de réduction s'exécute, elle convertit ses paires clé / valeurs d'entrée en lignes et alimente les lignes vers l'entrée standard (STDIN) du processus. Dans l'intervalle, le réducteur collecte les sorties orientées ligne à partir de la sortie standard (STDOUT) du processus, convertit chaque ligne en une paire clé / valeur, qui est collectée comme sortie du réducteur. Par défaut, le préfixe d'une ligne jusqu'au premier caractère de tabulation est la clé et le reste de la ligne (à l'exclusion du caractère de tabulation) est la valeur. Cependant, cela peut être personnalisé selon des exigences spécifiques.
Commandes importantes
Paramètres | Options | La description |
---|---|---|
-input répertoire / nom-fichier | Obligatoire | Emplacement d'entrée pour le mappeur. |
-output nom-répertoire | Obligatoire | Emplacement de sortie pour le réducteur. |
-mapper exécutable ou script ou JavaClassName | Obligatoire | Exécutable du mappeur. |
-reducer exécutable ou script ou JavaClassName | Obligatoire | Exécutable du réducteur. |
-file nom-fichier | Optionnel | Rend l'exécutable du mappeur, du réducteur ou du combineur disponible localement sur les nœuds de calcul. |
-inputformat JavaClassName | Optionnel | La classe que vous fournissez doit renvoyer des paires clé / valeur de la classe Text. S'il n'est pas spécifié, TextInputFormat est utilisé par défaut. |
-outputformat JavaClassName | Optionnel | La classe que vous fournissez doit prendre des paires clé / valeur de la classe Text. S'il n'est pas spécifié, TextOutputformat est utilisé par défaut. |
-partitionneur JavaClassName | Optionnel | Classe qui détermine à quelle réduction une clé est envoyée. |
-combiner streamingCommand ou JavaClassName | Optionnel | Exécutable du combineur pour la sortie de la carte. |
-cmdenv nom = valeur | Optionnel | Passe la variable d'environnement aux commandes de streaming. |
-lecteur d'entrée | Optionnel | Pour la rétrocompatibilité: spécifie une classe de lecteur d'enregistrement (au lieu d'une classe de format d'entrée). |
-verbeux | Optionnel | Sortie verbeuse. |
-lazyOutput | Optionnel | Crée la sortie paresseusement. Par exemple, si le format de sortie est basé sur FileOutputFormat, le fichier de sortie est créé uniquement lors du premier appel à output.collect (ou Context.write). |
-numReduceTasks | Optionnel | Spécifie le nombre de réducteurs. |
-mapdebug | Optionnel | Script à appeler lorsque la tâche de carte échoue. |
-bug réduit | Optionnel | Script à appeler lorsque la tâche de réduction échoue. |