Spark SQL - DataFrames

Un DataFrame est une collection distribuée de données, qui est organisée en colonnes nommées. Conceptuellement, cela équivaut à des tables relationnelles avec de bonnes techniques d'optimisation.

Un DataFrame peut être construit à partir d'un tableau de différentes sources telles que des tables Hive, des fichiers de données structurées, des bases de données externes ou des RDD existants. Cette API a été conçue pour les applications modernes de Big Data et de science des données en s'inspirant deDataFrame in R Programming et Pandas in Python.

Caractéristiques de DataFrame

Voici un ensemble de quelques caractéristiques caractéristiques de DataFrame -

  • Possibilité de traiter les données de la taille de kilo-octets à pétaoctets sur un cluster à un seul nœud vers un grand cluster.

  • Prend en charge différents formats de données (Avro, csv, recherche élastique et Cassandra) et systèmes de stockage (HDFS, tables HIVE, mysql, etc.).

  • Optimisation de pointe et génération de code via l'optimiseur Spark SQL Catalyst (framework de transformation d'arbre).

  • Peut être facilement intégré à tous les outils et frameworks Big Data via Spark-Core.

  • Fournit une API pour la programmation Python, Java, Scala et R.

SQLContext

SQLContext est une classe et est utilisé pour initialiser les fonctionnalités de Spark SQL. L'objet de classe SparkContext (sc) est requis pour l'initialisation de l'objet de classe SQLContext.

La commande suivante est utilisée pour initialiser SparkContext via spark-shell.

$ spark-shell

Par défaut, l'objet SparkContext est initialisé avec le nom sc quand la coquille d'étincelle démarre.

Utilisez la commande suivante pour créer SQLContext.

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

Exemple

Prenons un exemple d'enregistrements d'employés dans un fichier JSON nommé employee.json. Utilisez les commandes suivantes pour créer un DataFrame (df) et lire un document JSON nomméemployee.json avec le contenu suivant.

employee.json - Placez ce fichier dans le répertoire où le courant scala> le pointeur est localisé.

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

Opérations DataFrame

DataFrame fournit un langage spécifique au domaine pour la manipulation de données structurées. Ici, nous incluons quelques exemples de base de traitement de données structurées à l'aide de DataFrames.

Suivez les étapes ci-dessous pour effectuer des opérations DataFrame -

Lire le document JSON

Tout d'abord, nous devons lire le document JSON. Sur cette base, générez un DataFrame nommé (dfs).

Utilisez la commande suivante pour lire le document JSON nommé employee.json. Les données sont affichées sous forme de tableau avec les champs - id, nom et âge.

scala> val dfs = sqlContext.read.json("employee.json")

Output - Les noms de champs sont automatiquement extraits de employee.json.

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

Afficher les données

Si vous souhaitez voir les données dans le DataFrame, utilisez la commande suivante.

scala> dfs.show()

Output - Vous pouvez voir les données des employés dans un format tabulaire.

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

Utiliser la méthode printSchema

Si vous souhaitez voir la structure (schéma) du DataFrame, utilisez la commande suivante.

scala> dfs.printSchema()

Output

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Utiliser la méthode de sélection

Utilisez la commande suivante pour récupérer name-colonne parmi trois colonnes du DataFrame.

scala> dfs.select("name").show()

Output - Vous pouvez voir les valeurs du name colonne.

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

Utiliser le filtre d'âge

Utilisez la commande suivante pour trouver les employés dont l'âge est supérieur à 23 ans (âge> 23 ans).

scala> dfs.filter(dfs("age") > 23).show()

Output

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

Utiliser la méthode groupBy

Utilisez la commande suivante pour compter le nombre d'employés du même âge.

scala> dfs.groupBy("age").count().show()

Output - deux employés ont 23 ans.

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

Exécution de requêtes SQL par programme

Un SQLContext permet aux applications d'exécuter des requêtes SQL par programme tout en exécutant des fonctions SQL et renvoie le résultat sous forme de DataFrame.

Généralement, en arrière-plan, SparkSQL prend en charge deux méthodes différentes pour convertir les RDD existants en DataFrames -

Sr. Non Méthodes et description
1 Inférence du schéma à l'aide de la réflexion

Cette méthode utilise la réflexion pour générer le schéma d'un RDD qui contient des types spécifiques d'objets.

2 Spécification du schéma par programme

La deuxième méthode de création de DataFrame consiste à utiliser une interface de programmation qui vous permet de construire un schéma, puis de l'appliquer à un RDD existant.