Spécification du schéma par programme

La deuxième méthode de création de DataFrame consiste à utiliser une interface de programmation qui vous permet de construire un schéma, puis de l'appliquer à un RDD existant. Nous pouvons créer un DataFrame par programme en utilisant les trois étapes suivantes.

  • Créez un RDD de lignes à partir d'un RDD original.

  • Créez le schéma représenté par un StructType correspondant à la structure des lignes dans le RDD créé à l'étape 1.

  • Appliquez le schéma au RDD de lignes via la méthode createDataFrame fournie par SQLContext.

Exemple

Prenons un exemple d'enregistrements d'employés dans un fichier texte nommé employee.txt. Créez un schéma à l'aide de DataFrame directement en lisant les données à partir d'un fichier texte.

Given Data - Regardez les données suivantes d'un fichier nommé employee.txt placé dans le répertoire respectif actuel où s'exécute le point Spark Shell.

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

Suivez les étapes ci-dessous pour générer un schéma par programme.

Ouvrez Spark Shell

Démarrez le shell Spark en utilisant l'exemple suivant.

$ spark-shell

Créer un objet SQLContext

Générez SQLContext à l'aide de la commande suivante. Ici,sc signifie un objet SparkContext.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Lire l'entrée à partir d'un fichier texte

Créez un RDD DataFrame en lisant une donnée dans le fichier texte nommé employee.txt en utilisant la commande suivante.

scala> val employee = sc.textFile("employee.txt")

Créer un schéma codé au format chaîne

Utilisez la commande suivante pour créer un schéma codé au format chaîne. Cela signifie, supposez la structure de champ d'une table et transmettez les noms de champ en utilisant un délimiteur.

scala> val schemaString = "id name age"

Production

schemaString: String = id name age

Importer les API respectives

Utilisez la commande suivante pour importer les fonctionnalités de ligne et les types de données SQL.

scala> import org.apache.spark.sql.Row;
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType};

Générer un schéma

La commande suivante est utilisée pour générer un schéma en lisant le schemaStringvariable. Cela signifie que vous devez lire chaque champ en divisant la chaîne entière avec un espace comme délimiteur et en prenant chaque type de champ est de type String, par défaut.

scala> val schema = StructType(schemaString.split(" ").map(fieldName ⇒ StructField(fieldName, StringType, true)))

Appliquer la transformation pour lire des données à partir d'un fichier texte

Utilisez la commande suivante pour convertir un RDD (employé) en lignes. Cela signifie que nous spécifions ici la logique de lecture des données RDD et de stockage dans rowRDD. Ici, nous utilisons deux fonctions de carte: l'une est un délimiteur pour diviser la chaîne d'enregistrement (.map(_.split(","))) et la deuxième fonction de carte pour définir une ligne avec la valeur d'index de champ (.map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt))).

scala> val rowRDD = employee.map(_.split(",")).map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt))

Appliquer RowRDD dans les données de ligne en fonction du schéma

Utilisez l'instruction suivante pour créer un DataFrame à l'aide de rowRDD données et schema (SCHEMA) variable.

scala> val employeeDF = sqlContext.createDataFrame(rowRDD, schema)

Production

employeeDF: org.apache.spark.sql.DataFrame = [id: string, name: string, age: string]

Stocker les données DataFrame dans une table

Utilisez la commande suivante pour stocker le DataFrame dans une table nommée employee.

scala> employeeDF.registerTempTable("employee")

le employeela table est maintenant prête. Passons quelques requêtes SQL dans la table en utilisant la méthodeSQLContext.sql().

Sélectionner une requête sur DataFrame

Utilisez l'instruction suivante pour sélectionner tous les enregistrements de employeetable. Ici, nous utilisons la variableallrecordspour capturer toutes les données des enregistrements. Pour afficher ces enregistrements, appelezshow() méthode là-dessus.

scala> val allrecords = sqlContext.sql("SELECT * FROM employee")

Pour voir les données de résultat de allrecords DataFrame, utilisez la commande suivante.

scala> allrecords.show()

Production

+------+--------+----+
|  id  | name   |age |
+------+--------+----+
| 1201 | satish | 25 |
| 1202 | krishna| 28 |
| 1203 | amith  | 39 |
| 1204 | javed  | 23 |
| 1205 | prudvi | 23 |
+------+--------+----+

La méthode sqlContext.sqlvous permet de construire des DataFrames lorsque les colonnes et leurs types ne sont pas connus avant l'exécution. Vous pouvez maintenant y exécuter différentes requêtes SQL.