Hadoop - MapReduce

MapReduce est un framework à l'aide duquel nous pouvons écrire des applications pour traiter d'énormes quantités de données, en parallèle, sur de grands clusters de matériel de base de manière fiable.

Qu'est-ce que MapReduce?

MapReduce est une technique de traitement et un modèle de programme pour le calcul distribué basé sur java. L'algorithme MapReduce contient deux tâches importantes, à savoir Map et Reduce. Map prend un ensemble de données et le convertit en un autre ensemble de données, où les éléments individuels sont décomposés en tuples (paires clé / valeur). Deuxièmement, réduisez la tâche, qui prend la sortie d'une carte comme entrée et combine ces tuples de données en un ensemble plus petit de tuples. Comme la séquence du nom MapReduce l'indique, la tâche de réduction est toujours effectuée après la tâche de carte.

Le principal avantage de MapReduce est qu'il est facile de faire évoluer le traitement des données sur plusieurs nœuds de calcul. Dans le modèle MapReduce, les primitives de traitement des données sont appelées mappeurs et réducteurs. Décomposer une application de traitement de données en mappeurs et réducteurs n'est parfois pas trivial. Mais, une fois que nous écrivons une application sous la forme MapReduce, la mise à l'échelle de l'application pour qu'elle s'exécute sur des centaines, des milliers, voire des dizaines de milliers de machines dans un cluster n'est qu'un changement de configuration. Cette évolutivité simple est ce qui a incité de nombreux programmeurs à utiliser le modèle MapReduce.

L'algorithme

  • En général, le paradigme MapReduce est basé sur l'envoi de l'ordinateur là où résident les données!

  • Le programme MapReduce s'exécute en trois étapes, à savoir l'étape de la carte, l'étape de lecture aléatoire et l'étape de réduction.

    • Map stage- Le travail de la carte ou du mappeur consiste à traiter les données d'entrée. Généralement, les données d'entrée sont sous forme de fichier ou de répertoire et sont stockées dans le système de fichiers Hadoop (HDFS). Le fichier d'entrée est transmis à la fonction mappeur ligne par ligne. Le mappeur traite les données et crée plusieurs petits morceaux de données.

    • Reduce stage - Cette étape est la combinaison des Shuffle la scène et le Reduceétape. Le travail du réducteur est de traiter les données provenant du mappeur. Après le traitement, il produit un nouvel ensemble de sortie, qui sera stocké dans le HDFS.

  • Au cours d'une tâche MapReduce, Hadoop envoie les tâches de mappage et de réduction aux serveurs appropriés du cluster.

  • Le cadre gère tous les détails de la transmission des données, tels que l'émission de tâches, la vérification de l'achèvement des tâches et la copie de données autour du cluster entre les nœuds.

  • La plupart des calculs ont lieu sur des nœuds avec des données sur des disques locaux, ce qui réduit le trafic réseau.

  • Une fois les tâches données terminées, le cluster collecte et réduit les données pour former un résultat approprié, et les renvoie au serveur Hadoop.

Entrées et sorties (perspective Java)

Le framework MapReduce fonctionne sur des paires <clé, valeur>, c'est-à-dire que le framework voit l'entrée du travail comme un ensemble de paires <clé, valeur> et produit un ensemble de paires <clé, valeur> en tant que sortie du travail , peut-être de types différents.

La clé et les classes de valeur doivent être sérialisées par le framework et doivent donc implémenter l'interface Writable. De plus, les classes clés doivent implémenter l'interface Writable-Comparable pour faciliter le tri par le framework. Types d'entrée et de sortie d'unMapReduce job - (Entrée) <k1, v1> → carte → <k2, v2> → réduire → <k3, v3> (Sortie).

Contribution Production
Carte <k1, v1> liste (<k2, v2>)
Réduire <k2, liste (v2)> liste (<k3, v3>)

Terminologie

  • PayLoad - Les applications implémentent les fonctions Map et Réduire, et constituent le cœur du travail.

  • Mapper - Le mappeur mappe les paires clé / valeur d'entrée à un ensemble de paire clé / valeur intermédiaire.

  • NamedNode - Nœud qui gère le système de fichiers distribués Hadoop (HDFS).

  • DataNode - Nœud où les données sont présentées à l'avance avant tout traitement.

  • MasterNode - Nœud sur lequel JobTracker s'exécute et qui accepte les demandes de travaux des clients.

  • SlaveNode - Nœud où s'exécute le programme Map and Reduce.

  • JobTracker - Planifie les travaux et suit les travaux attribués au suivi des tâches.

  • Task Tracker - Suit la tâche et signale l'état à JobTracker.

  • Job - Un programme est une exécution d'un mappeur et d'un réducteur sur un ensemble de données.

  • Task - Une exécution d'un Mapper ou d'un Reducer sur une tranche de données.

  • Task Attempt - Une instance particulière d'une tentative d'exécution d'une tâche sur un SlaveNode.

Exemple de scénario

Vous trouverez ci-dessous les données concernant la consommation électrique d'une organisation. Il contient la consommation électrique mensuelle et la moyenne annuelle pour différentes années.

Jan fév Mar avr Mai Juin juil Août SEP oct nov déc Moy
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Si les données ci-dessus sont données en entrée, nous devons écrire des applications pour les traiter et produire des résultats tels que la recherche de l'année d'utilisation maximale, de l'année d'utilisation minimale, etc. Il s'agit d'un passage pour les programmeurs avec un nombre fini d'enregistrements. Ils écriront simplement la logique pour produire la sortie requise et passeront les données à l'application écrite.

Mais, pensez aux données représentant la consommation électrique de toutes les industries à grande échelle d'un État particulier, depuis sa formation.

Lorsque nous écrivons des applications pour traiter de telles données en masse,

  • Leur exécution prendra beaucoup de temps.

  • Il y aura un trafic réseau important lorsque nous transférons les données de la source au serveur réseau, etc.

Pour résoudre ces problèmes, nous avons le framework MapReduce.

Des données d'entrée

Les données ci-dessus sont enregistrées sous sample.txtet donné en entrée. Le fichier d'entrée ressemble à celui ci-dessous.

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45

Exemple de programme

Ci-dessous, le programme des exemples de données à l'aide du framework MapReduce.

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits {
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   {
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      
      Reporter reporter) throws IOException { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   }
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
         int maxavg = 30; 
         int val = Integer.MIN_VALUE; 
            
         while (values.hasNext()) { 
            if((val = values.next().get())>maxavg) { 
               output.collect(key, new IntWritable(val)); 
            } 
         }
      } 
   }

   //Main function 
   public static void main(String args[])throws Exception { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
}

Enregistrez le programme ci-dessus sous ProcessUnits.java. La compilation et l'exécution du programme sont expliquées ci-dessous.

Compilation et exécution du programme d'unités de processus

Supposons que nous soyons dans le répertoire personnel d'un utilisateur Hadoop (par exemple / home / hadoop).

Suivez les étapes ci-dessous pour compiler et exécuter le programme ci-dessus.

Étape 1

La commande suivante consiste à créer un répertoire pour stocker les classes java compilées.

$ mkdir units

Étape 2

Télécharger Hadoop-core-1.2.1.jar,qui est utilisé pour compiler et exécuter le programme MapReduce. Visitez le lien suivant mvnrepository.com pour télécharger le fichier jar. Supposons que le dossier téléchargé est/home/hadoop/.

Étape 3

Les commandes suivantes sont utilisées pour compiler le ProcessUnits.java programme et en créant un pot pour le programme.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ .

Étape 4

La commande suivante est utilisée pour créer un répertoire d'entrée dans HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Étape 5

La commande suivante est utilisée pour copier le fichier d'entrée nommé sample.txtdans le répertoire d'entrée de HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Étape 6

La commande suivante est utilisée pour vérifier les fichiers dans le répertoire d'entrée.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Étape 7

La commande suivante est utilisée pour exécuter l'application Eleunit_max en prenant les fichiers d'entrée dans le répertoire d'entrée.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Attendez un moment jusqu'à ce que le fichier soit exécuté. Après l'exécution, comme indiqué ci-dessous, la sortie contiendra le nombre de fractionnements d'entrée, le nombre de tâches Map, le nombre de tâches de réduction, etc.

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
   File System Counters 
 
FILE: Number of bytes read = 61 
FILE: Number of bytes written = 279400 
FILE: Number of read operations = 0 
FILE: Number of large read operations = 0   
FILE: Number of write operations = 0 
HDFS: Number of bytes read = 546 
HDFS: Number of bytes written = 40 
HDFS: Number of read operations = 9 
HDFS: Number of large read operations = 0 
HDFS: Number of write operations = 2 Job Counters 


   Launched map tasks = 2  
   Launched reduce tasks = 1 
   Data-local map tasks = 2  
   Total time spent by all maps in occupied slots (ms) = 146137 
   Total time spent by all reduces in occupied slots (ms) = 441   
   Total time spent by all map tasks (ms) = 14613 
   Total time spent by all reduce tasks (ms) = 44120 
   Total vcore-seconds taken by all map tasks = 146137 
   Total vcore-seconds taken by all reduce tasks = 44120 
   Total megabyte-seconds taken by all map tasks = 149644288 
   Total megabyte-seconds taken by all reduce tasks = 45178880 
   
Map-Reduce Framework 
 
   Map input records = 5  
   Map output records = 5   
   Map output bytes = 45  
   Map output materialized bytes = 67  
   Input split bytes = 208 
   Combine input records = 5  
   Combine output records = 5 
   Reduce input groups = 5  
   Reduce shuffle bytes = 6  
   Reduce input records = 5  
   Reduce output records = 5  
   Spilled Records = 10  
   Shuffled Maps  = 2  
   Failed Shuffles = 0  
   Merged Map outputs = 2  
   GC time elapsed (ms) = 948  
   CPU time spent (ms) = 5160  
   Physical memory (bytes) snapshot = 47749120  
   Virtual memory (bytes) snapshot = 2899349504  
   Total committed heap usage (bytes) = 277684224
     
File Output Format Counters 
 
   Bytes Written = 40

Étape 8

La commande suivante est utilisée pour vérifier les fichiers résultants dans le dossier de sortie.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Étape 9

La commande suivante est utilisée pour voir la sortie dans Part-00000 fichier. Ce fichier est généré par HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Voici la sortie générée par le programme MapReduce.

1981    34 
1984    40 
1985    45

Étape 10

La commande suivante est utilisée pour copier le dossier de sortie de HDFS vers le système de fichiers local pour analyse.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop

Commandes importantes

Toutes les commandes Hadoop sont appelées par le $HADOOP_HOME/bin/hadoopcommander. L'exécution du script Hadoop sans aucun argument imprime la description de toutes les commandes.

Usage - hadoop [--config confdir] COMMANDE

Le tableau suivant répertorie les options disponibles et leur description.

N ° Sr. Option et description
1

namenode -format

Formate le système de fichiers DFS.

2

secondarynamenode

Exécute le namenode secondaire DFS.

3

namenode

Exécute le namenode DFS.

4

datanode

Exécute un datanode DFS.

5

dfsadmin

Exécute un client administrateur DFS.

6

mradmin

Exécute un client d'administration Map-Reduce.

sept

fsck

Exécute un utilitaire de vérification du système de fichiers DFS.

8

fs

Exécute un client utilisateur de système de fichiers générique.

9

balancer

Exécute un utilitaire d'équilibrage de cluster.

dix

oiv

Applique la visionneuse fsimage hors ligne à une fsimage.

11

fetchdt

Récupère un jeton de délégation du NameNode.

12

jobtracker

Exécute le nœud de suivi des travaux MapReduce.

13

pipes

Exécute une tâche Pipes.

14

tasktracker

Exécute un nœud de suivi des tâches MapReduce.

15

historyserver

Exécute les serveurs d'historique des travaux en tant que démon autonome.

16

job

Manipule les travaux MapReduce.

17

queue

Obtient des informations sur JobQueues.

18

version

Imprime la version.

19

jar <jar>

Exécute un fichier jar.

20

distcp <srcurl> <desturl>

Copie le fichier ou les répertoires de manière récursive.

21

distcp2 <srcurl> <desturl>

DistCp version 2.

22

archive -archiveName NAME -p <parent path> <src>* <dest>

Crée une archive hadoop.

23

classpath

Imprime le chemin de classe nécessaire pour obtenir le fichier jar Hadoop et les bibliothèques requises.

24

daemonlog

Obtenir / Définir le niveau de journalisation pour chaque démon

Comment interagir avec les tâches MapReduce

Utilisation - tâche hadoop [GENERIC_OPTIONS]

Voici les options génériques disponibles dans un travail Hadoop.

N ° Sr. GENERIC_OPTION et description
1

-submit <job-file>

Soumet le travail.

2

-status <job-id>

Imprime la carte et réduit le pourcentage d'achèvement et tous les compteurs de travaux.

3

-counter <job-id> <group-name> <countername>

Imprime la valeur du compteur.

4

-kill <job-id>

Tue le travail.

5

-events <job-id> <fromevent-#> <#-of-events>

Imprime les détails des événements reçus par Jobtracker pour la plage donnée.

6

-history [all] <jobOutputDir> - history < jobOutputDir>

Imprime les détails du travail, les détails des conseils échoués et supprimés. Plus de détails sur le travail, tels que les tâches réussies et les tentatives de tâches effectuées pour chaque tâche, peuvent être affichés en spécifiant l'option [tout].

sept

-list[all]

Affiche tous les travaux. -list affiche uniquement les travaux qui ne sont pas encore terminés.

8

-kill-task <task-id>

Tue la tâche. Les tâches supprimées ne sont PAS comptabilisées dans les tentatives infructueuses.

9

-fail-task <task-id>

Échoue la tâche. Les tâches ayant échoué sont comptées par rapport aux tentatives infructueuses.

dix

-set-priority <job-id> <priority>

Modifie la priorité du travail. Les valeurs de priorité autorisées sont VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

Pour voir l'état du travail

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004

Pour voir l'historique de la tâche rép_sortie

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output

Pour tuer le travail

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004