Apache Flink - Création d'une application Flink

Dans ce chapitre, nous allons apprendre à créer une application Flink.

Ouvrez Eclipse IDE, cliquez sur Nouveau projet et sélectionnez Projet Java.

Donnez le nom du projet et cliquez sur Terminer.

Maintenant, cliquez sur Terminer comme indiqué dans la capture d'écran suivante.

Maintenant, faites un clic droit sur src et allez dans Nouvelle >> Classe.

Donnez un nom de classe et cliquez sur Terminer.

Copiez et collez le code ci-dessous dans l'éditeur.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
public class WordCount {

   // *************************************************************************
   // PROGRAM
   // *************************************************************************
   public static void main(String[] args) throws Exception {
      final ParameterTool params = ParameterTool.fromArgs(args);
      // set up the execution environment
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // make parameters available in the web interface
      env.getConfig().setGlobalJobParameters(params);
      // get input data
      DataSet<String> text = env.readTextFile(params.get("input"));
      DataSet<Tuple2<String, Integer>> counts =
      // split up the lines in pairs (2-tuples) containing: (word,1)
      text.flatMap(new Tokenizer())
      // group by the tuple field "0" and sum up tuple field "1"
      .groupBy(0)
      .sum(1);
      // emit result
      if (params.has("output")) {
         counts.writeAsCsv(params.get("output"), "\n", " ");
         // execute program
         env.execute("WordCount Example");
      } else {
         System.out.println("Printing result to stdout. Use --output to specify output path.");
         counts.print();
      }
   }
   
   // *************************************************************************
   // USER FUNCTIONS
   // *************************************************************************
   public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
         // normalize and split the line
         String[] tokens = value.toLowerCase().split("\\W+");
         // emit the pairs
         for (String token : tokens) {
            if (token.length() > 0) {
               out.collect(new Tuple2<>(token, 1));
            }
         }
      }
   }
}

Vous obtiendrez de nombreuses erreurs dans l'éditeur, car les bibliothèques Flink doivent être ajoutées à ce projet.

Cliquez avec le bouton droit sur le projet >> Chemin de construction >> Configurer le chemin de construction.

Sélectionnez l'onglet Bibliothèques et cliquez sur Ajouter des JAR externes.

Allez dans le répertoire lib de Flink, sélectionnez les 4 bibliothèques et cliquez sur OK.

Allez dans l'onglet Order and Export, sélectionnez toutes les bibliothèques et cliquez sur OK.

Vous verrez que les erreurs ne sont plus là.

Maintenant, exportons cette application. Faites un clic droit sur le projet et cliquez sur Exporter.

Sélectionnez le fichier JAR et cliquez sur Suivant

Donnez un chemin de destination et cliquez sur Suivant

Cliquez sur Suivant>

Cliquez sur Parcourir, sélectionnez la classe principale (WordCount) et cliquez sur Terminer.

Note - Cliquez sur OK, au cas où vous auriez un avertissement.

Exécutez la commande ci-dessous. Il exécutera en outre l'application Flink que vous venez de créer.

./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output