MapReduce - Partitionneur

Un partitionneur fonctionne comme une condition dans le traitement d'un ensemble de données d'entrée. La phase de partition a lieu après la phase de carte et avant la phase de réduction.

Le nombre de partitionneurs est égal au nombre de réducteurs. Cela signifie qu'un partitionneur divisera les données en fonction du nombre de réducteurs. Par conséquent, les données transmises à partir d'un seul partitionneur sont traitées par un seul réducteur.

Partitionneur

Un partitionneur partitionne les paires clé-valeur des sorties Map intermédiaires. Il partitionne les données en utilisant une condition définie par l'utilisateur, qui fonctionne comme une fonction de hachage. Le nombre total de partitions est identique au nombre de tâches du réducteur pour le travail. Prenons un exemple pour comprendre comment fonctionne le partitionneur.

Implémentation de MapReduce Partitioner

Pour des raisons de commodité, supposons que nous ayons une petite table appelée Employee avec les données suivantes. Nous utiliserons ces exemples de données comme ensemble de données d'entrée pour démontrer le fonctionnement du partitionneur.

Id Nom Âge Le sexe Un salaire
1201 gopal 45 Masculin 50 000
1202 manisha 40 Femme 50 000
1203 Khalil 34 Masculin 30 000
1204 prasanthe 30 Masculin 30 000
1205 Kiran 20 Masculin 40 000
1206 laxmi 25 Femme 35 000
1207 bhavya 20 Femme 15 000
1208 reshma 19 Femme 15 000
1209 kranthi 22 Masculin 22 000
1210 Satish 24 Masculin 25 000
1211 Krishna 25 Masculin 25 000
1212 Arshad 28 Masculin 20 000
1213 Lavanya 18 Femme 8 000

Nous devons rédiger une application pour traiter l'ensemble de données d'entrée afin de trouver le salarié le plus élevé par sexe dans différentes tranches d'âge (par exemple, moins de 20 ans, entre 21 et 30 ans, plus de 30 ans).

Des données d'entrée

Les données ci-dessus sont enregistrées sous input.txt dans le répertoire «/ home / hadoop / hadoopPartitioner» et donné en entrée.

1201 gopal 45 Masculin 50000
1202 manisha 40 Femme 51 000
1203 khaleel 34 Masculin 30000
1204 prasanthe 30 Masculin 31 000
1205 Kiran 20 Masculin 40000
1206 laxmi 25 Femme 35 000
1207 bhavya 20 Femme 15 000
1208 reshma 19 Femme 14 000
1209 kranthi 22 Masculin 22 000
1210 Satish 24 Masculin 25 000
1211 Krishna 25 Masculin 26 000
1212 Arshad 28 Masculin 20000
1213 Lavanya 18 Femme 8 000

Sur la base de l'entrée donnée, voici l'explication algorithmique du programme.

Mapper les tâches

La tâche de mappage accepte les paires clé-valeur comme entrée tandis que nous avons les données texte dans un fichier texte. L'entrée pour cette tâche cartographique est la suivante -

Input - La clé serait un modèle tel que «toute clé spéciale + nom de fichier + numéro de ligne» (exemple: clé = @ entrée1) et la valeur serait les données de cette ligne (exemple: valeur = 1201 \ t gopal \ t 45 \ t Homme \ t 50000).

Method - Le fonctionnement de cette tâche cartographique est le suivant -

  • Lis le value (données d'enregistrement), qui vient comme valeur d'entrée de la liste d'arguments dans une chaîne.

  • À l'aide de la fonction de fractionnement, séparez le sexe et stockez-le dans une variable chaîne.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Envoyez les informations de genre et les données d'enregistrement value comme paire clé-valeur de sortie de la tâche de mappage vers le partition task.

context.write(new Text(gender), new Text(value));
  • Répétez toutes les étapes ci-dessus pour tous les enregistrements du fichier texte.

Output - Vous obtiendrez les données de genre et la valeur des données d'enregistrement sous forme de paires clé-valeur.

Tâche de partitionnement

La tâche de partitionnement accepte les paires clé-valeur de la tâche de mappage comme entrée. La partition implique la division des données en segments. Selon les critères conditionnels donnés des partitions, les données appariées clé-valeur d'entrée peuvent être divisées en trois parties en fonction des critères d'âge.

Input - L'ensemble des données dans une collection de paires clé-valeur.

key = valeur du champ Sexe dans l'enregistrement.

value = Valeur entière des données d'enregistrement de ce sexe.

Method - Le processus de logique de partition se déroule comme suit.

  • Lisez la valeur du champ d'âge à partir de la paire clé-valeur d'entrée.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Vérifiez la valeur de l'âge avec les conditions suivantes.

    • Âge inférieur ou égal à 20 ans
    • Âge supérieur à 20 ans et inférieur ou égal à 30 ans.
    • Âge supérieur à 30 ans.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- L'ensemble des données des paires clé-valeur est segmenté en trois collections de paires clé-valeur. Le Réducteur travaille individuellement sur chaque collection.

Réduisez les tâches

Le nombre de tâches de partitionnement est égal au nombre de tâches de réduction. Ici, nous avons trois tâches de partitionnement et nous avons donc trois tâches de réduction à exécuter.

Input - Le réducteur s'exécutera trois fois avec une collection différente de paires clé-valeur.

clé = valeur du champ de sexe dans l'enregistrement.

valeur = l'ensemble des données d'enregistrement de ce sexe.

Method - La logique suivante sera appliquée à chaque collection.

  • Lisez la valeur du champ Salaire de chaque enregistrement.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Vérifiez le salaire avec la variable max. Si str [4] est le salaire maximum, affectez str [4] à max, sinon sautez l'étape.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Répétez les étapes 1 et 2 pour chaque remise de clés (les hommes et les femmes sont les clés). Après avoir exécuté ces trois étapes, vous trouverez un salaire maximum de la remise des clés Homme et un salaire maximum de la remise des clés Femme.

context.write(new Text(key), new IntWritable(max));

Output- Enfin, vous obtiendrez un ensemble de données de paires clé-valeur dans trois collections de groupes d'âge différents. Il contient respectivement le salaire maximum de la collection Homme et le salaire maximum de la collection Femme dans chaque tranche d'âge.

Après avoir exécuté les tâches de mappage, de partitionnement et de réduction, les trois collections de données de paires clé-valeur sont stockées dans trois fichiers différents en tant que sortie.

Les trois tâches sont traitées comme des tâches MapReduce. Les exigences et spécifications suivantes de ces travaux doivent être spécifiées dans les configurations -

  • Nom du travail
  • Formats d'entrée et de sortie des clés et des valeurs
  • Classes individuelles pour les tâches de mappage, de réduction et de partitionnement
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Exemple de programme

Le programme suivant montre comment implémenter les partitionneurs pour les critères donnés dans un programme MapReduce.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Enregistrez le code ci-dessus sous PartitionerExample.javadans «/ home / hadoop / hadoopPartitioner». La compilation et l'exécution du programme sont données ci-dessous.

Compilation et exécution

Supposons que nous soyons dans le répertoire personnel de l'utilisateur Hadoop (par exemple, / home / hadoop).

Suivez les étapes ci-dessous pour compiler et exécuter le programme ci-dessus.

Step 1- Téléchargez Hadoop-core-1.2.1.jar, qui est utilisé pour compiler et exécuter le programme MapReduce. Vous pouvez télécharger le fichier jar sur mvnrepository.com .

Supposons que le dossier téléchargé soit "/ home / hadoop / hadoopPartitioner"

Step 2 - Les commandes suivantes sont utilisées pour compiler le programme PartitionerExample.java et créer un bocal pour le programme.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 - Utilisez la commande suivante pour créer un répertoire d'entrée dans HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - Utilisez la commande suivante pour copier le fichier d'entrée nommé input.txt dans le répertoire d'entrée de HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - Utilisez la commande suivante pour vérifier les fichiers dans le répertoire d'entrée.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - Utilisez la commande suivante pour exécuter l'application Top Salaire en prenant les fichiers d'entrée dans le répertoire d'entrée.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Attendez un moment jusqu'à ce que le fichier soit exécuté. Après l'exécution, la sortie contient un certain nombre de fractionnements d'entrée, de tâches de mappage et de tâches de réduction.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - Utilisez la commande suivante pour vérifier les fichiers résultants dans le dossier de sortie.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Vous trouverez la sortie dans trois fichiers car vous utilisez trois partitionneurs et trois réducteurs dans votre programme.

Step 8 - Utilisez la commande suivante pour voir la sortie dans Part-00000fichier. Ce fichier est généré par HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Utilisez la commande suivante pour voir la sortie dans Part-00001 fichier.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

Utilisez la commande suivante pour voir la sortie dans Part-00002 fichier.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000