MapReduce - Implémentation Hadoop

MapReduce est un framework utilisé pour écrire des applications afin de traiter d'énormes volumes de données sur de grands clusters de matériel de base de manière fiable. Ce chapitre vous présente le fonctionnement de MapReduce dans le framework Hadoop à l'aide de Java.

Algorithme MapReduce

Généralement, le paradigme MapReduce est basé sur l'envoi de programmes de réduction de carte aux ordinateurs où résident les données réelles.

  • Au cours d'une tâche MapReduce, Hadoop envoie les tâches de mappage et de réduction aux serveurs appropriés du cluster.

  • Le cadre gère tous les détails de la transmission de données, tels que l'émission de tâches, la vérification de l'achèvement des tâches et la copie de données autour du cluster entre les nœuds.

  • La plupart des calculs ont lieu sur les nœuds avec des données sur des disques locaux qui réduisent le trafic réseau.

  • Après avoir terminé une tâche donnée, le cluster collecte et réduit les données pour former un résultat approprié, et les renvoie au serveur Hadoop.

Entrées et sorties (perspective Java)

Le framework MapReduce fonctionne sur des paires clé-valeur, c'est-à-dire que le framework considère l'entrée du travail comme un ensemble de paires clé-valeur et produit un ensemble de paires clé-valeur en tant que sortie du travail, éventuellement de types différents.

Les classes de clé et de valeur doivent être sérialisables par le framework et par conséquent, il est nécessaire d'implémenter l'interface Writable. En outre, les classes de clés doivent implémenter l'interface WritableComparable pour faciliter le tri par le framework.

Le format d'entrée et de sortie d'un travail MapReduce se présente sous la forme de paires clé-valeur -

(Entrée) <k1, v1> -> carte -> <k2, v2> -> réduire -> <k3, v3> (sortie).

Contribution Production
Carte <k1, v1> liste (<k2, v2>)
Réduire <k2, liste (v2)> liste (<k3, v3>)

Implémentation de MapReduce

Le tableau suivant présente les données relatives à la consommation électrique d'une organisation. Le tableau comprend la consommation électrique mensuelle et la moyenne annuelle pour cinq années consécutives.

Jan fév Mar avr Mai Juin juil Août SEP oct nov déc Moy
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Nous devons écrire des applications pour traiter les données d'entrée dans le tableau donné afin de trouver l'année d'utilisation maximale, l'année d'utilisation minimale, etc. Cette tâche est facile pour les programmeurs avec un nombre limité d'enregistrements, car ils écriront simplement la logique pour produire la sortie requise et passeront les données à l'application écrite.

Élevons maintenant l'échelle des données d'entrée. Supposons que nous devions analyser la consommation électrique de toutes les industries à grande échelle d'un État particulier. Lorsque nous écrivons des applications pour traiter de telles données en masse,

  • Leur exécution prendra beaucoup de temps.

  • Il y aura un trafic réseau important lorsque nous transférons les données de la source vers le serveur réseau.

Pour résoudre ces problèmes, nous avons le framework MapReduce.

Des données d'entrée

Les données ci-dessus sont enregistrées sous sample.txtet donné en entrée. Le fichier d'entrée ressemble à celui ci-dessous.

1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Exemple de programme

Le programme suivant pour les exemples de données utilise le framework MapReduce.

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

Enregistrez le programme ci-dessus dans ProcessUnits.java. La compilation et l'exécution du programme sont données ci-dessous.

Compilation et exécution du programme ProcessUnits

Supposons que nous soyons dans le répertoire personnel de l'utilisateur Hadoop (par exemple / home / hadoop).

Suivez les étapes ci-dessous pour compiler et exécuter le programme ci-dessus.

Step 1 - Utilisez la commande suivante pour créer un répertoire pour stocker les classes java compilées.

$ mkdir units

Step 2- Téléchargez Hadoop-core-1.2.1.jar, qui est utilisé pour compiler et exécuter le programme MapReduce. Téléchargez le fichier jar sur mvnrepository.com . Supposons que le dossier de téléchargement soit / home / hadoop /.

Step 3 - Les commandes suivantes sont utilisées pour compiler le ProcessUnits.java programme et pour créer un fichier jar pour le programme.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 - La commande suivante est utilisée pour créer un répertoire d'entrée dans HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - La commande suivante permet de copier le fichier d'entrée nommé sample.txt dans le répertoire d'entrée de HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 - La commande suivante est utilisée pour vérifier les fichiers dans le répertoire d'entrée

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - La commande suivante est utilisée pour exécuter l'application Eleunit_max en prenant les fichiers d'entrée du répertoire d'entrée.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Attendez un moment jusqu'à ce que le fichier soit exécuté. Après l'exécution, la sortie contient un certain nombre de fractionnements d'entrée, de tâches de mappage, de tâches de réduction, etc.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 - La commande suivante est utilisée pour vérifier les fichiers résultants dans le dossier de sortie.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - La commande suivante est utilisée pour voir la sortie dans Part-00000fichier. Ce fichier est généré par HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Voici la sortie générée par le programme MapReduce -

1981 34
1984 40
1985 45

Step 10 - La commande suivante est utilisée pour copier le dossier de sortie de HDFS vers le système de fichiers local.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop