HCatalog - Lecteur Writer

HCatalog contient une API de transfert de données pour l'entrée et la sortie parallèles sans utiliser MapReduce. Cette API utilise une abstraction de stockage de base de tables et de lignes pour lire les données du cluster Hadoop et y écrire des données.

L'API de transfert de données contient principalement trois classes; ce sont -

  • HCatReader - Lit les données d'un cluster Hadoop.

  • HCatWriter - Écrit des données dans un cluster Hadoop.

  • DataTransferFactory - Génère des instances de lecteur et d'écrivain.

Cette API convient à la configuration du nœud maître-esclave. Laissez-nous discuter plus surHCatReader et HCatWriter.

HCatReader

HCatReader est une classe abstraite interne à HCatalog et résume les complexités du système sous-jacent d'où les enregistrements doivent être récupérés.

N ° Sr. Nom et description de la méthode
1

Public abstract ReaderContext prepareRead() throws HCatException

Cela doit être appelé au nœud maître pour obtenir ReaderContext qui doit ensuite être sérialisé et envoyé aux nœuds esclaves.

2

Public abstract Iterator <HCatRecorder> read() throws HCaException

Cela doit être appelé au niveau des nœuds esclaves pour lire HCatRecords.

3

Public Configuration getConf()

Il renverra l'objet de classe de configuration.

La classe HCatReader est utilisée pour lire les données de HDFS. La lecture est un processus en deux étapes dans lequel la première étape se produit sur le nœud maître d'un système externe. La deuxième étape est réalisée en parallèle sur plusieurs nœuds esclaves.

Les lectures sont effectuées sur un ReadEntity. Avant de commencer à lire, vous devez définir un ReadEntity à partir duquel lire. Cela peut être fait parReadEntity.Builder. Vous pouvez spécifier un nom de base de données, un nom de table, une partition et une chaîne de filtre. Par exemple -

ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity entity = builder.withDatabase("mydb").withTable("mytbl").build(); 10.

L'extrait de code ci-dessus définit un objet ReadEntity («entité»), comprenant une table nommée mytbl dans une base de données nommée mydb, qui peut être utilisé pour lire toutes les lignes de ce tableau. Notez que ce tableau doit exister dans HCatalog avant le début de cette opération.

Après avoir défini un ReadEntity, vous obtenez une instance de HCatReader à l'aide de ReadEntity et de la configuration du cluster -

HCatReader reader = DataTransferFactory.getHCatReader(entity, config);

L'étape suivante consiste à obtenir un ReaderContext du lecteur comme suit -

ReaderContext cntxt = reader.prepareRead();

HCatWriter

Cette abstraction est interne à HCatalog. Ceci permet de faciliter l'écriture dans HCatalog à partir de systèmes externes. N'essayez pas de l'instancier directement. Au lieu de cela, utilisez DataTransferFactory.

N ° Sr. Nom et description de la méthode
1

Public abstract WriterContext prepareRead() throws HCatException

Le système externe doit invoquer cette méthode exactement une fois à partir d'un nœud maître. Il renvoie unWriterContext. Cela doit être sérialisé et envoyé aux nœuds esclaves pour construireHCatWriter Là.

2

Public abstract void write(Iterator<HCatRecord> recordItr) throws HCaException

Cette méthode doit être utilisée sur les nœuds esclaves pour effectuer des écritures. RecordItr est un objet itérateur qui contient la collection d'enregistrements à écrire dans HCatalog.

3

Public abstract void abort(WriterContext cntxt) throws HCatException

Cette méthode doit être appelée au niveau du nœud maître. Le but principal de cette méthode est d'effectuer des nettoyages en cas d'échec.

4

public abstract void commit(WriterContext cntxt) throws HCatException

Cette méthode doit être appelée au niveau du nœud maître. Le but de cette méthode est de valider les métadonnées.

Semblable à la lecture, l'écriture est également un processus en deux étapes dans lequel la première étape se produit sur le nœud maître. Par la suite, la deuxième étape se déroule en parallèle sur les nœuds esclaves.

Les écritures sont effectuées sur un WriteEntity qui peut être construit d'une manière similaire aux lectures -

WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();

Le code ci-dessus crée un objet WriteEntity entityqui peut être utilisé pour écrire dans une table nomméemytbl dans la base de données mydb.

Après avoir créé un WriteEntity, l'étape suivante consiste à obtenir un WriterContext -

HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();

Toutes les étapes ci-dessus se produisent sur le nœud maître. Le nœud maître sérialise ensuite l'objet WriterContext et le met à disposition de tous les esclaves.

Sur les nœuds esclaves, vous devez obtenir un HCatWriter en utilisant WriterContext comme suit -

HCatWriter writer = DataTransferFactory.getHCatWriter(context);

Puis le writerprend un itérateur comme argument de la writeméthode -

writer.write(hCatRecordItr);

le writer puis appelle getNext() sur cet itérateur dans une boucle et écrit tous les enregistrements attachés à l'itérateur.

le TestReaderWriter.javaLe fichier est utilisé pour tester les classes HCatreader et HCatWriter. Le programme suivant montre comment utiliser HCatReader et l'API HCatWriter pour lire des données à partir d'un fichier source et ensuite les écrire dans un fichier de destination.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hive.HCatalog.common.HCatException;
import org.apache.hive.HCatalog.data.transfer.DataTransferFactory;
import org.apache.hive.HCatalog.data.transfer.HCatReader;
import org.apache.hive.HCatalog.data.transfer.HCatWriter;
import org.apache.hive.HCatalog.data.transfer.ReadEntity;
import org.apache.hive.HCatalog.data.transfer.ReaderContext;
import org.apache.hive.HCatalog.data.transfer.WriteEntity;
import org.apache.hive.HCatalog.data.transfer.WriterContext;
import org.apache.hive.HCatalog.mapreduce.HCatBaseTest;

import org.junit.Assert;
import org.junit.Test;

public class TestReaderWriter extends HCatBaseTest {
   @Test
   public void test() throws MetaException, CommandNeedRetryException,
      IOException, ClassNotFoundException {
		
      driver.run("drop table mytbl");
      driver.run("create table mytbl (a string, b int)");
		
      Iterator<Entry<String, String>> itr = hiveConf.iterator();
      Map<String, String> map = new HashMap<String, String>();
		
      while (itr.hasNext()) {
         Entry<String, String> kv = itr.next();
         map.put(kv.getKey(), kv.getValue());
      }
		
      WriterContext cntxt = runsInMaster(map);
      File writeCntxtFile = File.createTempFile("hcat-write", "temp");
      writeCntxtFile.deleteOnExit();
		
      // Serialize context.
      ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
      oos.writeObject(cntxt);
      oos.flush();
      oos.close();
		
      // Now, deserialize it.
      ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
      cntxt = (WriterContext) ois.readObject();
      ois.close();
      runsInSlave(cntxt);
      commit(map, true, cntxt);
		
      ReaderContext readCntxt = runsInMaster(map, false);
      File readCntxtFile = File.createTempFile("hcat-read", "temp");
      readCntxtFile.deleteOnExit();
      oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
      oos.writeObject(readCntxt);
      oos.flush();
      oos.close();
		
      ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
      readCntxt = (ReaderContext) ois.readObject();
      ois.close();
		
      for (int i = 0; i < readCntxt.numSplits(); i++) {
         runsInSlave(readCntxt, i);
      }
   }
	
   private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
		
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
      WriterContext info = writer.prepareWrite();
      return info;
   }
	
   private ReaderContext runsInMaster(Map<String, String> config, 
      boolean bogus) throws HCatException {
      ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
      HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
      ReaderContext cntxt = reader.prepareRead();
      return cntxt;
   }
	
   private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
      HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
      Iterator<HCatRecord> itr = reader.read();
      int i = 1;
		
      while (itr.hasNext()) {
         HCatRecord read = itr.next();
         HCatRecord written = getRecord(i++);
			
         // Argh, HCatRecord doesnt implement equals()
         Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
         written.get(0).equals(read.get(0)));
			
         Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
         written.get(1).equals(read.get(1)));
			
         Assert.assertEquals(2, read.size());
      }
		
      //Assert.assertFalse(itr.hasNext());
   }
	
   private void runsInSlave(WriterContext context) throws HCatException {
      HCatWriter writer = DataTransferFactory.getHCatWriter(context);
      writer.write(new HCatRecordItr());
   }
	
   private void commit(Map<String, String> config, boolean status,
      WriterContext context) throws IOException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
		
      if (status) {
         writer.commit(context);
      } else {
         writer.abort(context);
      }
   }
	
   private static HCatRecord getRecord(int i) {
      List<Object> list = new ArrayList<Object>(2);
      list.add("Row #: " + i);
      list.add(i);
      return new DefaultHCatRecord(list);
   }
	
   private static class HCatRecordItr implements Iterator<HCatRecord> {
      int i = 0;
		
      @Override
      public boolean hasNext() {
         return i++ < 100 ? true : false;
      }
		
      @Override
      public HCatRecord next() {
         return getRecord(i);
      }
		
      @Override
      public void remove() {
         throw new RuntimeException();
      }
   }
}

Le programme ci-dessus lit les données du HDFS sous forme d'enregistrements et écrit les données d'enregistrement dans mytable