Inférence du schéma à l'aide de la réflexion

Cette méthode utilise la réflexion pour générer le schéma d'un RDD qui contient des types spécifiques d'objets. L'interface Scala pour Spark SQL prend en charge la conversion automatique d'un RDD contenant des classes de cas en DataFrame. lecase classdéfinit le schéma de la table. Les noms des arguments de la classe case sont lus à l'aide de la réflexion et deviennent les noms des colonnes.

Les classes de cas peuvent également être imbriquées ou contenir des types complexes tels que des séquences ou des tableaux. Ce RDD peut être implicitement converti en DataFrame puis enregistré en tant que table. Les tables peuvent être utilisées dans les instructions SQL suivantes.

Exemple

Prenons un exemple d'enregistrements d'employés dans un fichier texte nommé employee.txt. Créez un RDD en lisant les données d'un fichier texte et convertissez-le en DataFrame à l'aide des fonctions SQL par défaut.

Given Data - Jetez un œil aux données suivantes d'un fichier nommé employee.txt l'a placé dans le répertoire respectif actuel où s'exécute le point Spark Shell.

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

Les exemples suivants expliquent comment générer un schéma à l'aide de Reflections.

Démarrez le Spark Shell

Démarrez Spark Shell à l'aide de la commande suivante.

$ spark-shell

Créer SQLContext

Générez SQLContext à l'aide de la commande suivante. Ici,sc signifie un objet SparkContext.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Importer des fonctions SQL

Utilisez la commande suivante pour importer toutes les fonctions SQL utilisées pour convertir implicitement un RDD en DataFrame.

scala> import sqlContext.implicts._

Créer une classe de cas

Ensuite, nous devons définir un schéma pour les données d'enregistrement des employés à l'aide d'une classe de cas. La commande suivante est utilisée pour déclarer la classe de cas en fonction des données données (id, nom, âge).

scala> case class Employee(id: Int, name: String, age: Int)
defined class Employee

Créer RDD et appliquer des transformations

Utilisez la commande suivante pour générer un RDD nommé empl en lisant les données de employee.txt et le convertir en DataFrame, en utilisant les fonctions Map.

Ici, deux fonctions cartographiques sont définies. La première consiste à diviser l'enregistrement de texte en champs (.map(_.split(“,”))) et la deuxième fonction de carte pour convertir des champs individuels (id, nom, âge) en un objet de classe de cas (.map(e(0).trim.toInt, e(1), e(2).trim.toInt)).

Enfin, toDF() est utilisée pour convertir l'objet de classe de cas avec schéma en DataFrame.

scala> val empl=sc.textFile("employee.txt")
.map(_.split(","))
.map(e⇒ employee(e(0).trim.toInt,e(1), e(2).trim.toInt))
.toDF()

Production

empl: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]

Stocker les données DataFrame dans une table

Utilisez la commande suivante pour stocker les données DataFrame dans une table nommée employee. Après cette commande, nous pouvons y appliquer tous les types d'instructions SQL.

scala> empl.registerTempTable("employee")

La table des employés est prête. Passons maintenant quelques requêtes SQL sur la table en utilisantSQLContext.sql() méthode.

Sélectionner une requête sur DataFrame

Utilisez la commande suivante pour sélectionner tous les enregistrements du employeetable. Ici, nous utilisons la variableallrecordspour capturer toutes les données des enregistrements. Pour afficher ces enregistrements, appelezshow() méthode là-dessus.

scala> val allrecords = sqlContext.sql("SELeCT * FROM employee")

Pour voir les données de résultat de allrecords DataFrame, utilisez la commande suivante.

scala> allrecords.show()

Production

+------+---------+----+
|  id  |  name   |age |
+------+---------+----+
| 1201 | satish  | 25 |
| 1202 | krishna | 28 |
| 1203 | amith   | 39 |
| 1204 | javed   | 23 |
| 1205 | prudvi  | 23 |
+------+---------+----+

Requête SQL de clause Where sur DataFrame

Utilisez la commande suivante pour appliquer wheredéclaration dans une table. Ici, la variableagefilter stocke les dossiers des employés âgés de 20 à 35 ans.

scala> val agefilter = sqlContext.sql("SELeCT * FROM employee WHERE ageC>=20 AND age <= 35")

Pour voir les données de résultat de agefilter DataFrame, utilisez la commande suivante.

scala> agefilter.show()

Production

<console>:25, took 0.112757 s
+------+---------+----+
|  id  |  name   |age |
+------+---------+----+
| 1201 | satish  | 25 |
| 1202 | krishna | 28 |
| 1204 | javed   | 23 |
| 1205 | prudvi  | 23 |
+------+---------+----+

Les deux requêtes précédentes ont été transmises à l'ensemble de la table DataFrame. Essayons maintenant de récupérer les données du résultat DataFrame en appliquantTransformations dessus.

Récupérer les valeurs d'ID de agefilter DataFrame à l'aide de l'index de colonne

L'instruction suivante est utilisée pour récupérer les valeurs d'ID à partir de agefilter Résultat RDD, en utilisant l'index de champ.

scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)

Production

<console>:25, took 0.093844 s
ID: 1201
ID: 1202
ID: 1204
ID: 1205

Cette approche basée sur la réflexion conduit à un code plus concis et fonctionne bien lorsque vous connaissez déjà le schéma lors de l'écriture de votre application Spark.