Spark SQL - Guide rapide

Les industries utilisent largement Hadoop pour analyser leurs ensembles de données. La raison en est que le framework Hadoop est basé sur un modèle de programmation simple (MapReduce) et qu'il permet une solution informatique évolutive, flexible, tolérante aux pannes et rentable. Ici, la principale préoccupation est de maintenir la vitesse de traitement des grands ensembles de données en termes de temps d'attente entre les requêtes et de temps d'attente pour exécuter le programme.

Spark a été introduit par Apache Software Foundation pour accélérer le processus du logiciel informatique Hadoop.

Contrairement à une croyance commune, Spark is not a modified version of Hadoopet ne dépend pas vraiment de Hadoop car il dispose de sa propre gestion de cluster. Hadoop n'est que l'un des moyens d'implémenter Spark.

Spark utilise Hadoop de deux manières - l'une est storage et le second est processing. Comme Spark dispose de son propre calcul de gestion de cluster, il utilise Hadoop uniquement à des fins de stockage.

Apache Spark

Apache Spark est une technologie de calcul en cluster ultra-rapide, conçue pour des calculs rapides. Il est basé sur Hadoop MapReduce et étend le modèle MapReduce pour l'utiliser efficacement pour plus de types de calculs, ce qui inclut les requêtes interactives et le traitement de flux. La principale caractéristique de Spark est sain-memory cluster computing qui augmente la vitesse de traitement d'une application.

Spark est conçu pour couvrir un large éventail de charges de travail telles que les applications par lots, les algorithmes itératifs, les requêtes interactives et le streaming. Outre la prise en charge de toutes ces charges de travail dans un système respectif, cela réduit la charge de gestion liée à la maintenance d'outils séparés.

Évolution d'Apache Spark

Spark est l'un des sous-projets d'Hadoop développé en 2009 dans AMPLab d'UC Berkeley par Matei Zaharia. Il a été Open Sourced en 2010 sous une licence BSD. Il a été donné à la fondation logicielle Apache en 2013, et maintenant Apache Spark est devenu un projet Apache de haut niveau à partir de février 2014.

Caractéristiques d'Apache Spark

Apache Spark possède les fonctionnalités suivantes.

  • Speed- Spark permet d'exécuter une application dans un cluster Hadoop, jusqu'à 100 fois plus rapide en mémoire et 10 fois plus rapide lors de l'exécution sur disque. Ceci est possible en réduisant le nombre d'opérations de lecture / écriture sur le disque. Il stocke les données de traitement intermédiaires en mémoire.

  • Supports multiple languages- Spark fournit des API intégrées en Java, Scala ou Python. Par conséquent, vous pouvez écrire des applications dans différentes langues. Spark propose 80 opérateurs de haut niveau pour les requêtes interactives.

  • Advanced Analytics- Spark ne prend pas seulement en charge «Map» et «réduire». Il prend également en charge les requêtes SQL, les données de streaming, l'apprentissage automatique (ML) et les algorithmes de graph.

Spark construit sur Hadoop

Le diagramme suivant montre trois façons de créer Spark avec des composants Hadoop.

Il existe trois façons de déployer Spark, comme expliqué ci-dessous.

  • Standalone- Le déploiement de Spark Standalone signifie que Spark occupe la place au-dessus de HDFS (Hadoop Distributed File System) et de l'espace est alloué pour HDFS, explicitement. Ici, Spark et MapReduce s'exécuteront côte à côte pour couvrir toutes les tâches Spark sur le cluster.

  • Hadoop Yarn- Le déploiement de Hadoop Yarn signifie, simplement, que Spark fonctionne sur Yarn sans aucune pré-installation ou accès root requis. Il aide à intégrer Spark dans l'écosystème Hadoop ou la pile Hadoop. Il permet à d'autres composants de fonctionner au-dessus de la pile.

  • Spark in MapReduce (SIMR)- Spark dans MapReduce est utilisé pour lancer le travail Spark en plus du déploiement autonome. Avec SIMR, l'utilisateur peut démarrer Spark et utiliser son shell sans aucun accès administratif.

Composants de Spark

L'illustration suivante décrit les différents composants de Spark.

Apache Spark Core

Spark Core est le moteur d'exécution générale sous-jacent de la plate-forme Spark sur lequel toutes les autres fonctionnalités sont basées. Il fournit un calcul en mémoire et des ensembles de données de référence dans des systèmes de stockage externes.

Spark SQL

Spark SQL est un composant au-dessus de Spark Core qui introduit une nouvelle abstraction de données appelée SchemaRDD, qui prend en charge les données structurées et semi-structurées.

Spark Streaming

Spark Streaming exploite la capacité de planification rapide de Spark Core pour effectuer des analyses de streaming. Il ingère les données en mini-lots et effectue des transformations RDD (Resilient Distributed Datasets) sur ces mini-lots de données.

MLlib (bibliothèque d'apprentissage automatique)

MLlib est un framework d'apprentissage automatique distribué au-dessus de Spark en raison de l'architecture Spark basée sur la mémoire distribuée. C'est, selon les benchmarks, fait par les développeurs MLlib contre les implémentations ALS (Alternating Meast Squares). Spark MLlib est neuf fois plus rapide que la version sur disque Hadoop deApache Mahout (avant que Mahout n'obtienne une interface Spark).

GraphX

GraphX ​​est un framework de traitement de graphes distribué au-dessus de Spark. Il fournit une API pour exprimer le calcul de graphes qui peut modéliser les graphes définis par l'utilisateur à l'aide de l'API d'abstraction Pregel. Il fournit également un runtime optimisé pour cette abstraction.

Ensembles de données distribués résilients

Les ensembles de données distribués résilients (RDD) sont une structure de données fondamentale de Spark. C'est une collection d'objets distribués immuable. Chaque ensemble de données dans RDD est divisé en partitions logiques, qui peuvent être calculées sur différents nœuds du cluster. Les RDD peuvent contenir tout type d'objets Python, Java ou Scala, y compris des classes définies par l'utilisateur.

Formellement, un RDD est une collection d'enregistrements partitionnés en lecture seule. Les RDD peuvent être créés par des opérations déterministes sur des données sur un stockage stable ou sur d'autres RDD. RDD est un ensemble d'éléments tolérants aux pannes pouvant être exploités en parallèle.

Il existe deux façons de créer des RDD - parallelizing une collection existante dans votre programme de pilote, ou referencing a dataset dans un système de stockage externe, tel qu'un système de fichiers partagé, HDFS, HBase ou toute source de données offrant un format d'entrée Hadoop.

Spark utilise le concept de RDD pour réaliser des opérations MapReduce plus rapides et efficaces. Voyons d'abord comment les opérations MapReduce se déroulent et pourquoi elles ne sont pas aussi efficaces.

Le partage de données est lent dans MapReduce

MapReduce est largement adopté pour traiter et générer de grands ensembles de données avec un algorithme parallèle et distribué sur un cluster. Il permet aux utilisateurs d'écrire des calculs parallèles, à l'aide d'un ensemble d'opérateurs de haut niveau, sans avoir à se soucier de la répartition du travail et de la tolérance aux pannes.

Malheureusement, dans la plupart des frameworks actuels, le seul moyen de réutiliser les données entre les calculs (Ex: entre deux jobs MapReduce) est de les écrire sur un système de stockage externe stable (Ex: HDFS). Bien que ce framework fournisse de nombreuses abstractions pour accéder aux ressources de calcul d'un cluster, les utilisateurs en veulent toujours plus.

Tous les deux Iterative et Interactiveles applications nécessitent un partage de données plus rapide entre les travaux parallèles. Le partage des données est lent dans MapReduce en raison dereplication, serialization, et disk IO. En ce qui concerne le système de stockage, la plupart des applications Hadoop passent plus de 90% du temps à effectuer des opérations de lecture-écriture HDFS.

Opérations itératives sur MapReduce

Réutilisez les résultats intermédiaires sur plusieurs calculs dans des applications en plusieurs étapes. L'illustration suivante explique le fonctionnement de l'infrastructure actuelle, tout en effectuant les opérations itératives sur MapReduce. Cela entraîne des frais généraux importants dus à la réplication des données, aux E / S de disque et à la sérialisation, ce qui ralentit le système.

Opérations interactives sur MapReduce

L'utilisateur exécute des requêtes ad hoc sur le même sous-ensemble de données. Chaque requête effectuera les E / S disque sur le stockage stable, ce qui peut dominer le temps d'exécution de l'application.

L'illustration suivante explique le fonctionnement de l'infrastructure actuelle lors des requêtes interactives sur MapReduce.

Partage de données à l'aide de Spark RDD

Le partage des données est lent dans MapReduce en raison de replication, serialization, et disk IO. La plupart des applications Hadoop passent plus de 90% du temps à effectuer des opérations de lecture-écriture HDFS.

Reconnaissant ce problème, les chercheurs ont développé un cadre spécialisé appelé Apache Spark. L'idée clé de l'étincelle estRinsilient Distribué Datasets (RDD); il prend en charge le calcul de traitement en mémoire. Cela signifie qu'il stocke l'état de la mémoire en tant qu'objet dans les tâches et que l'objet peut être partagé entre ces tâches. Le partage de données en mémoire est 10 à 100 fois plus rapide que le réseau et le disque.

Essayons maintenant de découvrir comment les opérations itératives et interactives se déroulent dans Spark RDD.

Opérations itératives sur Spark RDD

L'illustration ci-dessous montre les opérations itératives sur Spark RDD. Il stockera les résultats intermédiaires dans une mémoire distribuée au lieu d'un stockage stable (disque) et rendra le système plus rapide.

Note - Si la mémoire distribuée (RAM) est suffisante pour stocker les résultats intermédiaires (état du JOB), alors il stockera ces résultats sur le disque

Opérations interactives sur Spark RDD

Cette illustration montre les opérations interactives sur Spark RDD. Si différentes requêtes sont exécutées à plusieurs reprises sur le même ensemble de données, ces données particulières peuvent être conservées en mémoire pour de meilleurs temps d'exécution.

Par défaut, chaque RDD transformé peut être recalculé chaque fois que vous exécutez une action dessus. Cependant, vous pouvez égalementpersistun RDD en mémoire, auquel cas Spark conservera les éléments sur le cluster pour un accès beaucoup plus rapide, la prochaine fois que vous l'interrogerez. Il existe également une prise en charge des RDD persistants sur le disque ou répliqués sur plusieurs nœuds.

Spark est le sous-projet de Hadoop. Par conséquent, il est préférable d'installer Spark dans un système basé sur Linux. Les étapes suivantes montrent comment installer Apache Spark.

Étape 1: vérification de l'installation de Java

L'installation de Java est l'une des choses obligatoires lors de l'installation de Spark. Essayez la commande suivante pour vérifier la version JAVA.

$java -version

Si Java est déjà installé sur votre système, vous obtenez la réponse suivante -

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

Si Java n'est pas installé sur votre système, installez Java avant de passer à l'étape suivante.

Étape 2: vérification de l'installation de Scala

Vous devez utiliser le langage Scala pour implémenter Spark. Alors vérifions l'installation de Scala en utilisant la commande suivante.

$scala -version

Si Scala est déjà installé sur votre système, vous obtenez la réponse suivante -

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

Si Scala n'est pas installé sur votre système, passez à l'étape suivante pour l'installation de Scala.

Étape 3: Téléchargement de Scala

Téléchargez la dernière version de Scala en visitant le lien suivant Télécharger Scala . Pour ce tutoriel, nous utilisons la version scala-2.11.6. Après le téléchargement, vous trouverez le fichier tar Scala dans le dossier de téléchargement.

Étape 4: Installation de Scala

Suivez les étapes ci-dessous pour installer Scala.

Extrayez le fichier tar Scala

Tapez la commande suivante pour extraire le fichier tar Scala.

$ tar xvf scala-2.11.6.tgz

Déplacer les fichiers du logiciel Scala

Utilisez les commandes suivantes pour déplacer les fichiers du logiciel Scala vers le répertoire respectif (/usr/local/scala).

$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit

Définir PATH pour Scala

Utilisez la commande suivante pour définir PATH pour Scala.

$ export PATH = $PATH:/usr/local/scala/bin

Vérification de l'installation de Scala

Après l'installation, il est préférable de le vérifier. Utilisez la commande suivante pour vérifier l'installation de Scala.

$scala -version

Si Scala est déjà installé sur votre système, vous obtenez la réponse suivante -

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

Étape 5: Téléchargement d'Apache Spark

Téléchargez la dernière version de Spark en visitant le lien suivant Télécharger Spark . Pour ce tutoriel, nous utilisonsspark-1.3.1-bin-hadoop2.6version. Après l'avoir téléchargé, vous trouverez le fichier tar Spark dans le dossier de téléchargement.

Étape 6: Installation de Spark

Suivez les étapes ci-dessous pour installer Spark.

Extraction du goudron d'étincelle

La commande suivante pour extraire le fichier tar spark.

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz

Déplacement des fichiers logiciels Spark

Les commandes suivantes pour déplacer les fichiers du logiciel Spark vers le répertoire respectif (/usr/local/spark).

$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit

Configuration de l'environnement pour Spark

Ajoutez la ligne suivante à ~/.bashrcfichier. Cela signifie ajouter l'emplacement, où se trouve le fichier du logiciel Spark à la variable PATH.

export PATH = $PATH:/usr/local/spark/bin

Utilisez la commande suivante pour rechercher le fichier ~ / .bashrc.

$ source ~/.bashrc

Étape 7: vérification de l'installation de Spark

Écrivez la commande suivante pour ouvrir le shell Spark.

$spark-shell

Si Spark est installé avec succès, vous trouverez la sortie suivante.

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
    ____             __
   / __/__ ___ _____/ /__
   _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\ version 1.4.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Spark introduit un module de programmation pour le traitement de données structurées appelé Spark SQL. Il fournit une abstraction de programmation appelée DataFrame et peut servir de moteur de requête SQL distribué.

Fonctionnalités de Spark SQL

Voici les fonctionnalités de Spark SQL -

  • Integrated- Mélangez de manière transparente les requêtes SQL avec les programmes Spark. Spark SQL vous permet d'interroger des données structurées en tant qu'ensemble de données distribué (RDD) dans Spark, avec des API intégrées en Python, Scala et Java. Cette intégration étroite facilite l'exécution de requêtes SQL parallèlement à des algorithmes analytiques complexes.

  • Unified Data Access- Charger et interroger des données à partir de diverses sources. Schema-RDDs fournit une interface unique pour travailler efficacement avec des données structurées, y compris des tables Apache Hive, des fichiers parquet et des fichiers JSON.

  • Hive Compatibility- Exécutez des requêtes Hive non modifiées sur les entrepôts existants. Spark SQL réutilise le frontend Hive et MetaStore, vous offrant une compatibilité totale avec les données, requêtes et UDF Hive existantes. Installez-le simplement avec Hive.

  • Standard Connectivity- Connectez-vous via JDBC ou ODBC. Spark SQL comprend un mode serveur avec une connectivité JDBC et ODBC standard.

  • Scalability- Utilisez le même moteur pour les requêtes interactives et longues. Spark SQL tire parti du modèle RDD pour prendre en charge la tolérance aux pannes au milieu des requêtes, ce qui lui permet également de s'adapter à de gros travaux. Ne vous inquiétez pas d'utiliser un moteur différent pour les données historiques.

Architecture Spark SQL

L'illustration suivante explique l'architecture de Spark SQL -

Cette architecture contient trois couches à savoir, l'API de langage, le schéma RDD et les sources de données.

  • Language API- Spark est compatible avec différents langages et Spark SQL. Il est également pris en charge par ces langages - API (python, scala, java, HiveQL).

  • Schema RDD- Spark Core est conçu avec une structure de données spéciale appelée RDD. Généralement, Spark SQL fonctionne sur les schémas, les tables et les enregistrements. Par conséquent, nous pouvons utiliser le schéma RDD comme table temporaire. Nous pouvons appeler ce schéma RDD comme trame de données.

  • Data Sources- Habituellement, la source de données pour spark-core est un fichier texte, un fichier Avro, etc. Cependant, les sources de données pour Spark SQL sont différentes. Ce sont le fichier Parquet, le document JSON, les tables HIVE et la base de données Cassandra.

Nous en discuterons plus en détail dans les chapitres suivants.

Un DataFrame est une collection distribuée de données, qui est organisée en colonnes nommées. Conceptuellement, cela équivaut à des tables relationnelles avec de bonnes techniques d'optimisation.

Un DataFrame peut être construit à partir d'un tableau de différentes sources telles que des tables Hive, des fichiers de données structurées, des bases de données externes ou des RDD existants. Cette API a été conçue pour les applications modernes de Big Data et de science des données en s'inspirant deDataFrame in R Programming et Pandas in Python.

Caractéristiques de DataFrame

Voici un ensemble de quelques caractéristiques caractéristiques de DataFrame -

  • Possibilité de traiter les données de la taille de kilo-octets à pétaoctets sur un cluster à un seul nœud vers un grand cluster.

  • Prend en charge différents formats de données (Avro, csv, recherche élastique et Cassandra) et systèmes de stockage (HDFS, tables HIVE, mysql, etc.).

  • Optimisation de pointe et génération de code via l'optimiseur Spark SQL Catalyst (framework de transformation d'arbre).

  • Peut être facilement intégré à tous les outils et frameworks Big Data via Spark-Core.

  • Fournit une API pour la programmation Python, Java, Scala et R.

SQLContext

SQLContext est une classe et est utilisé pour initialiser les fonctionnalités de Spark SQL. L'objet de classe SparkContext (sc) est requis pour l'initialisation de l'objet de classe SQLContext.

La commande suivante est utilisée pour initialiser SparkContext via spark-shell.

$ spark-shell

Par défaut, l'objet SparkContext est initialisé avec le nom sc quand la coquille d'étincelle démarre.

Utilisez la commande suivante pour créer SQLContext.

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

Exemple

Prenons un exemple d'enregistrements d'employés dans un fichier JSON nommé employee.json. Utilisez les commandes suivantes pour créer un DataFrame (df) et lire un document JSON nomméemployee.json avec le contenu suivant.

employee.json - Placez ce fichier dans le répertoire où le courant scala> le pointeur est localisé.

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

Opérations DataFrame

DataFrame fournit un langage spécifique au domaine pour la manipulation de données structurées. Ici, nous incluons quelques exemples de base de traitement de données structurées à l'aide de DataFrames.

Suivez les étapes ci-dessous pour effectuer des opérations DataFrame -

Lire le document JSON

Tout d'abord, nous devons lire le document JSON. Sur cette base, générez un DataFrame nommé (dfs).

Utilisez la commande suivante pour lire le document JSON nommé employee.json. Les données sont affichées sous forme de tableau avec les champs - id, nom et âge.

scala> val dfs = sqlContext.read.json("employee.json")

Output - Les noms de champs sont automatiquement extraits de employee.json.

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

Afficher les données

Si vous souhaitez voir les données dans le DataFrame, utilisez la commande suivante.

scala> dfs.show()

Output - Vous pouvez voir les données des employés dans un format tabulaire.

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

Utiliser la méthode printSchema

Si vous souhaitez voir la structure (schéma) du DataFrame, utilisez la commande suivante.

scala> dfs.printSchema()

Output

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Utiliser la méthode de sélection

Utilisez la commande suivante pour récupérer name-colonne parmi trois colonnes du DataFrame.

scala> dfs.select("name").show()

Output - Vous pouvez voir les valeurs du name colonne.

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

Utiliser le filtre d'âge

Utilisez la commande suivante pour trouver les employés dont l'âge est supérieur à 23 ans (âge> 23 ans).

scala> dfs.filter(dfs("age") > 23).show()

Output

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

Utiliser la méthode groupBy

Utilisez la commande suivante pour compter le nombre d'employés du même âge.

scala> dfs.groupBy("age").count().show()

Output - deux employés ont 23 ans.

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

Exécution de requêtes SQL par programme

Un SQLContext permet aux applications d'exécuter des requêtes SQL par programme tout en exécutant des fonctions SQL et renvoie le résultat sous forme de DataFrame.

Généralement, en arrière-plan, SparkSQL prend en charge deux méthodes différentes pour convertir les RDD existants en DataFrames -

Sr. Non Méthodes et description
1 Inférence du schéma à l'aide de la réflexion

Cette méthode utilise la réflexion pour générer le schéma d'un RDD qui contient des types spécifiques d'objets.

2 Spécification du schéma par programme

La deuxième méthode de création de DataFrame consiste à utiliser une interface de programmation qui vous permet de construire un schéma, puis de l'appliquer à un RDD existant.

Une interface DataFrame permet à différentes DataSources de fonctionner sur Spark SQL. Il s'agit d'une table temporaire et peut être utilisée comme un RDD normal. L'enregistrement d'un DataFrame en tant que table vous permet d'exécuter des requêtes SQL sur ses données.

Dans ce chapitre, nous décrirons les méthodes générales de chargement et d'enregistrement de données à l'aide de différentes sources de données Spark. Par la suite, nous discuterons en détail des options spécifiques disponibles pour les sources de données intégrées.

Il existe différents types de sources de données disponibles dans SparkSQL, dont certaines sont répertoriées ci-dessous -

Sr. Non Les sources de données
1 Ensembles de données JSON

Spark SQL peut capturer automatiquement le schéma d'un ensemble de données JSON et le charger en tant que DataFrame.

2 Tables de ruche

Hive est fourni avec la bibliothèque Spark sous le nom de HiveContext, qui hérite de SQLContext.

3 Dossiers de parquet

Parquet est un format en colonnes, pris en charge par de nombreux systèmes de traitement de données.