PySpark - Guide rapide

Dans ce chapitre, nous allons nous familiariser avec ce qu'est Apache Spark et comment PySpark a été développé.

Spark - Présentation

Apache Spark est un cadre de traitement en temps réel ultra-rapide. Il effectue des calculs en mémoire pour analyser les données en temps réel. Il est entré en image commeApache Hadoop MapReduceeffectuait uniquement un traitement par lots et ne disposait pas d'une fonction de traitement en temps réel. Par conséquent, Apache Spark a été introduit car il peut effectuer le traitement de flux en temps réel et peut également prendre en charge le traitement par lots.

Outre le traitement en temps réel et par lots, Apache Spark prend également en charge les requêtes interactives et les algorithmes itératifs. Apache Spark possède son propre gestionnaire de cluster, où il peut héberger son application. Il exploite Apache Hadoop pour le stockage et le traitement. Il utiliseHDFS (Système de fichiers distribués Hadoop) pour le stockage et il peut exécuter des applications Spark sur YARN ainsi que.

PySpark - Présentation

Apache Spark est écrit en Scala programming language. Pour prendre en charge Python avec Spark, Apache Spark Community a publié un outil, PySpark. En utilisant PySpark, vous pouvez travailler avecRDDsen langage de programmation Python également. C'est à cause d'une bibliothèque appeléePy4j qu’ils sont capables d’y parvenir.

Offres PySpark PySpark Shellqui relie l'API Python au noyau Spark et initialise le contexte Spark. La majorité des scientifiques des données et des experts en analyse utilisent aujourd'hui Python en raison de son riche ensemble de bibliothèques. L'intégration de Python à Spark est une aubaine pour eux.

Dans ce chapitre, nous allons comprendre la configuration de l'environnement de PySpark.

Note - Cela tient compte du fait que Java et Scala sont installés sur votre ordinateur.

Laissez-nous maintenant télécharger et configurer PySpark avec les étapes suivantes.

Step 1- Accédez à la page de téléchargement officielle d'Apache Spark et téléchargez la dernière version d'Apache Spark disponible. Dans ce tutoriel, nous utilisonsspark-2.1.0-bin-hadoop2.7.

Step 2- Maintenant, extrayez le fichier tar Spark téléchargé. Par défaut, il sera téléchargé dans le répertoire Téléchargements.

# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz

Cela créera un répertoire spark-2.1.0-bin-hadoop2.7. Avant de démarrer PySpark, vous devez définir les environnements suivants pour définir le chemin Spark et lePy4j path.

export SPARK_HOME = /home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH = $SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH = $SPARK_HOME/python:$PATH

Ou, pour définir globalement les environnements ci-dessus, placez-les dans le .bashrc file. Exécutez ensuite la commande suivante pour que les environnements fonctionnent.

# source .bashrc

Maintenant que tous les environnements sont définis, allons dans le répertoire Spark et appelons le shell PySpark en exécutant la commande suivante -

# ./bin/pyspark

Cela démarrera votre shell PySpark.

Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
<<<

SparkContext est le point d'entrée de toute fonctionnalité Spark. Lorsque nous exécutons une application Spark, un programme pilote démarre, qui a la fonction principale et votre SparkContext est lancé ici. Le programme pilote exécute ensuite les opérations à l'intérieur des exécuteurs sur les nœuds de travail.

SparkContext utilise Py4J pour lancer un JVM et crée un JavaSparkContext. Par défaut, PySpark a SparkContext disponible en tant que‘sc’, donc la création d'un nouveau SparkContext ne fonctionnera pas.

Le bloc de code suivant contient les détails d'une classe PySpark et les paramètres qu'un SparkContext peut accepter.

class pyspark.SparkContext (
   master = None,
   appName = None, 
   sparkHome = None, 
   pyFiles = None, 
   environment = None, 
   batchSize = 0, 
   serializer = PickleSerializer(), 
   conf = None, 
   gateway = None, 
   jsc = None, 
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

Paramètres

Voici les paramètres d'un SparkContext.

  • Master - C'est l'URL du cluster auquel il se connecte.

  • appName - Nom de votre travail.

  • sparkHome - Répertoire d'installation de Spark.

  • pyFiles - Les fichiers .zip ou .py à envoyer au cluster et à ajouter au PYTHONPATH.

  • Environment - Variables d'environnement des nœuds de travail.

  • batchSize- Le nombre d'objets Python représentés comme un seul objet Java. Définissez 1 pour désactiver le traitement par lots, 0 pour choisir automatiquement la taille du lot en fonction de la taille des objets ou -1 pour utiliser une taille de lot illimitée.

  • Serializer - Sérialiseur RDD.

  • Conf - Un objet de L {SparkConf} pour définir toutes les propriétés Spark.

  • Gateway - Utilisez une passerelle et une JVM existantes, sinon initialisez une nouvelle JVM.

  • JSC - L'instance JavaSparkContext.

  • profiler_cls - Une classe de Profiler personnalisé utilisée pour faire le profilage (la valeur par défaut est pyspark.profiler.BasicProfiler).

Parmi les paramètres ci-dessus, master et appnamesont principalement utilisés. Les deux premières lignes de tout programme PySpark se présentent comme indiqué ci-dessous -

from pyspark import SparkContext
sc = SparkContext("local", "First App")

Exemple SparkContext - PySpark Shell

Maintenant que vous en savez assez sur SparkContext, laissez-nous exécuter un exemple simple sur le shell PySpark. Dans cet exemple, nous compterons le nombre de lignes avec le caractère 'a' ou 'b' dans leREADME.mdfichier. Donc, disons s'il y a 5 lignes dans un fichier et 3 lignes ont le caractère 'a', alors la sortie sera →Line with a: 3. Il en sera de même pour le caractère «b».

Note- Nous ne créons aucun objet SparkContext dans l'exemple suivant car, par défaut, Spark crée automatiquement l'objet SparkContext nommé sc, au démarrage du shell PySpark. Si vous essayez de créer un autre objet SparkContext, vous obtiendrez l'erreur suivante -"ValueError: Cannot run multiple SparkContexts at once".

<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

Exemple SparkContext - Programme Python

Exécutons le même exemple en utilisant un programme Python. Créez un fichier Python appeléfirstapp.py et entrez le code suivant dans ce fichier.

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"  
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------

Ensuite, nous exécuterons la commande suivante dans le terminal pour exécuter ce fichier Python. Nous obtiendrons le même résultat que ci-dessus.

$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30

Maintenant que nous avons installé et configuré PySpark sur notre système, nous pouvons programmer en Python sur Apache Spark. Mais avant de le faire, comprenons un concept fondamental de Spark - RDD.

RDD signifie Resilient Distributed Dataset, ce sont les éléments qui s'exécutent et fonctionnent sur plusieurs nœuds pour effectuer un traitement parallèle sur un cluster. Les RDD sont des éléments immuables, ce qui signifie qu'une fois que vous créez un RDD, vous ne pouvez pas le modifier. Les RDD sont également tolérants aux pannes, donc en cas de panne, ils se rétablissent automatiquement. Vous pouvez appliquer plusieurs opérations sur ces RDD pour réaliser une certaine tâche.

Pour appliquer des opérations sur ces RDD, il y a deux façons -

  • Transformation et
  • Action

Comprenons ces deux manières en détail.

Transformation- Ce sont les opérations qui sont appliquées sur un RDD pour créer un nouveau RDD. Filter, groupBy et map sont des exemples de transformations.

Action - Ce sont les opérations qui sont appliquées sur RDD, qui ordonne à Spark d'effectuer le calcul et de renvoyer le résultat au pilote.

Pour appliquer une opération dans PySpark, nous devons créer un PySpark RDDpremière. Le bloc de code suivant contient le détail d'une classe PySpark RDD -

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

Voyons comment exécuter quelques opérations de base à l'aide de PySpark. Le code suivant dans un fichier Python crée des mots RDD, qui stockent un ensemble de mots mentionnés.

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

Nous allons maintenant exécuter quelques opérations sur les mots.

compter()

Le nombre d'éléments dans le RDD est renvoyé.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command - La commande pour count () est -

$SPARK_HOME/bin/spark-submit count.py

Output - La sortie de la commande ci-dessus est -

Number of elements in RDD → 8

collecte()

Tous les éléments du RDD sont renvoyés.

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command - La commande pour collect () est -

$SPARK_HOME/bin/spark-submit collect.py

Output - La sortie de la commande ci-dessus est -

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach (f)

Renvoie uniquement les éléments qui remplissent la condition de la fonction dans foreach. Dans l'exemple suivant, nous appelons une fonction d'impression dans foreach, qui imprime tous les éléments du RDD.

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Command - La commande foreach (f) est -

$SPARK_HOME/bin/spark-submit foreach.py

Output - La sortie de la commande ci-dessus est -

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filtre (f)

Un nouveau RDD est retourné contenant les éléments, ce qui satisfait la fonction à l'intérieur du filtre. Dans l'exemple suivant, nous filtrons les chaînes contenant «spark».

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command - La commande pour le filtre (f) est -

$SPARK_HOME/bin/spark-submit filter.py

Output - La sortie de la commande ci-dessus est -

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map (f, préservePartitioning = False)

Un nouveau RDD est renvoyé en appliquant une fonction à chaque élément du RDD. Dans l'exemple suivant, nous formons une paire clé-valeur et mappons chaque chaîne avec une valeur de 1.

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command - La commande pour map (f, préservePartitioning = False) est -

$SPARK_HOME/bin/spark-submit map.py

Output - La sortie de la commande ci-dessus est -

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

réduire (f)

Après avoir effectué l'opération binaire commutative et associative spécifiée, l'élément du RDD est renvoyé. Dans l'exemple suivant, nous importons add package de l'opérateur et l'appliquons sur 'num' pour effectuer une opération d'addition simple.

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command - La commande pour réduire (f) est -

$SPARK_HOME/bin/spark-submit reduce.py

Output - La sortie de la commande ci-dessus est -

Adding all the elements -> 15

join (autre, numPartitions = Aucun)

Il renvoie RDD avec une paire d'éléments avec les clés correspondantes et toutes les valeurs de cette clé particulière. Dans l'exemple suivant, il y a deux paires d'éléments dans deux RDD différents. Après avoir joint ces deux RDD, nous obtenons un RDD avec des éléments ayant des clés correspondantes et leurs valeurs.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command - La commande pour join (other, numPartitions = None) est -

$SPARK_HOME/bin/spark-submit join.py

Output - La sortie de la commande ci-dessus est -

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

cache ()

Conservez ce RDD avec le niveau de stockage par défaut (MEMORY_ONLY). Vous pouvez également vérifier si le RDD est mis en cache ou non.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command - La commande pour cache () est -

$SPARK_HOME/bin/spark-submit cache.py

Output - La sortie pour le programme ci-dessus est -

Words got cached -> True

Ce sont quelques-unes des opérations les plus importantes effectuées sur PySpark RDD.

Pour le traitement parallèle, Apache Spark utilise des variables partagées. Une copie de la variable partagée va sur chaque nœud du cluster lorsque le pilote envoie une tâche à l'exécuteur sur le cluster, afin qu'elle puisse être utilisée pour effectuer des tâches.

Il existe deux types de variables partagées prises en charge par Apache Spark -

  • Broadcast
  • Accumulator

Comprenons-les en détail.

Diffuser

Les variables de diffusion sont utilisées pour enregistrer la copie des données sur tous les nœuds. Cette variable est mise en cache sur toutes les machines et non envoyée sur les machines avec des tâches. Le bloc de code suivant contient les détails d'une classe Broadcast pour PySpark.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

L'exemple suivant montre comment utiliser une variable de diffusion. Une variable de diffusion a un attribut appelé valeur, qui stocke les données et est utilisée pour renvoyer une valeur diffusée.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - La commande pour une variable de diffusion est la suivante -

$SPARK_HOME/bin/spark-submit broadcast.py

Output - La sortie de la commande suivante est donnée ci-dessous.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Accumulateur

Les variables d'accumulateur sont utilisées pour agréger les informations via des opérations associatives et commutatives. Par exemple, vous pouvez utiliser un accumulateur pour une opération de somme ou des compteurs (dans MapReduce). Le bloc de code suivant contient les détails d'une classe Accumulator pour PySpark.

class pyspark.Accumulator(aid, value, accum_param)

L'exemple suivant montre comment utiliser une variable d'accumulateur. Une variable d'accumulateur a un attribut appelé valeur qui est similaire à celui d'une variable de diffusion. Il stocke les données et est utilisé pour renvoyer la valeur de l'accumulateur, mais utilisable uniquement dans un programme pilote.

Dans cet exemple, une variable d'accumulation est utilisée par plusieurs nœuds de calcul et renvoie une valeur accumulée.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - La commande pour une variable d'accumulateur est la suivante -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - La sortie de la commande ci-dessus est donnée ci-dessous.

Accumulated value is -> 150

Pour exécuter une application Spark sur le local / cluster, vous devez définir quelques configurations et paramètres, c'est ce que SparkConf aide. Il fournit des configurations pour exécuter une application Spark. Le bloc de code suivant contient les détails d'une classe SparkConf pour PySpark.

class pyspark.SparkConf (
   loadDefaults = True, 
   _jvm = None, 
   _jconf = None
)

Dans un premier temps, nous allons créer un objet SparkConf avec SparkConf (), qui chargera les valeurs de spark.*Propriétés système Java également. Vous pouvez maintenant définir différents paramètres à l'aide de l'objet SparkConf et leurs paramètres auront priorité sur les propriétés système.

Dans une classe SparkConf, il existe des méthodes de définition qui prennent en charge le chaînage. Par exemple, vous pouvez écrireconf.setAppName(“PySpark App”).setMaster(“local”). Une fois que nous avons transmis un objet SparkConf à Apache Spark, il ne peut être modifié par aucun utilisateur.

Voici quelques-uns des attributs les plus couramment utilisés de SparkConf -

  • set(key, value) - Pour définir une propriété de configuration.

  • setMaster(value) - Pour définir l'URL principale.

  • setAppName(value) - Pour définir un nom d'application.

  • get(key, defaultValue=None) - Pour obtenir une valeur de configuration d'une clé.

  • setSparkHome(value) - Pour définir le chemin d'installation de Spark sur les nœuds de travail.

Prenons l'exemple suivant d'utilisation de SparkConf dans un programme PySpark. Dans cet exemple, nous définissons le nom de l'application Spark commePySpark App et en définissant l'URL principale d'une application Spark sur → spark://master:7077.

Le bloc de code suivant contient les lignes, lorsqu'elles sont ajoutées dans le fichier Python, il définit les configurations de base pour exécuter une application PySpark.

---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)
---------------------------------------------------------------------------------------

Dans Apache Spark, vous pouvez télécharger vos fichiers en utilisant sc.addFile (sc est votre SparkContext par défaut) et obtenez le chemin sur un worker en utilisant SparkFiles.get. Ainsi, SparkFiles résout les chemins vers les fichiers ajoutés viaSparkContext.addFile().

SparkFiles contient les méthodes de classe suivantes -

  • get(filename)
  • getrootdirectory()

Comprenons-les en détail.

get (nom de fichier)

Il spécifie le chemin du fichier ajouté via SparkContext.addFile ().

getrootdirectory ()

Il spécifie le chemin d'accès au répertoire racine, qui contient le fichier ajouté via SparkContext.addFile ().

----------------------------------------sparkfile.py------------------------------------
from pyspark import SparkContext
from pyspark import SparkFiles
finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"
sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print "Absolute Path -> %s" % SparkFiles.get(finddistancename)
----------------------------------------sparkfile.py------------------------------------

Command - La commande est la suivante -

$SPARK_HOME/bin/spark-submit sparkfiles.py

Output - La sortie de la commande ci-dessus est -

Absolute Path -> 
   /tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R

StorageLevel décide de la manière dont RDD doit être stocké. Dans Apache Spark, StorageLevel décide si RDD doit être stocké dans la mémoire ou doit-il être stocké sur le disque, ou les deux. Il décide également s'il faut sérialiser RDD et s'il faut répliquer les partitions RDD.

Le bloc de code suivant a la définition de classe d'un StorageLevel -

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)

Maintenant, pour décider du stockage de RDD, il existe différents niveaux de stockage, qui sont donnés ci-dessous -

  • DISK_ONLY = StorageLevel (Vrai, Faux, Faux, Faux, 1)

  • DISK_ONLY_2 = StorageLevel (Vrai, Faux, Faux, Faux, 2)

  • MEMORY_AND_DISK = StorageLevel (Vrai, Vrai, Faux, Faux, 1)

  • MEMORY_AND_DISK_2 = StorageLevel (Vrai, Vrai, Faux, Faux, 2)

  • MEMORY_AND_DISK_SER = StorageLevel (Vrai, Vrai, Faux, Faux, 1)

  • MEMORY_AND_DISK_SER_2 = StorageLevel (Vrai, Vrai, Faux, Faux, 2)

  • MEMORY_ONLY = StorageLevel (False, True, False, False, 1)

  • MEMORY_ONLY_2 = StorageLevel (False, True, False, False, 2)

  • MEMORY_ONLY_SER = StorageLevel (False, True, False, False, 1)

  • MEMORY_ONLY_SER_2 = StorageLevel (False, True, False, False, 2)

  • OFF_HEAP = StorageLevel (Vrai, Vrai, Vrai, Faux, 1)

Prenons l'exemple suivant de StorageLevel, où nous utilisons le niveau de stockage MEMORY_AND_DISK_2, ce qui signifie que les partitions RDD auront une réplication de 2.

------------------------------------storagelevel.py-------------------------------------
from pyspark import SparkContext
import pyspark
sc = SparkContext (
   "local", 
   "storagelevel app"
)
rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
rdd1.getStorageLevel()
print(rdd1.getStorageLevel())
------------------------------------storagelevel.py-------------------------------------

Command - La commande est la suivante -

$SPARK_HOME/bin/spark-submit storagelevel.py

Output - La sortie de la commande ci-dessus est donnée ci-dessous -

Disk Memory Serialized 2x Replicated

Apache Spark propose une API de Machine Learning appelée MLlib. PySpark a également cette API d'apprentissage automatique en Python. Il prend en charge différents types d'algorithmes, mentionnés ci-dessous -

  • mllib.classification - Le spark.mllibLe package prend en charge diverses méthodes de classification binaire, de classification multiclasse et d'analyse de régression. Certains des algorithmes de classification les plus populaires sontRandom Forest, Naive Bayes, Decision Tree, etc.

  • mllib.clustering - Le clustering est un problème d'apprentissage non supervisé, dans lequel vous souhaitez regrouper des sous-ensembles d'entités les uns avec les autres sur la base d'une certaine notion de similitude.

  • mllib.fpm- La correspondance de modèles fréquente consiste à extraire des éléments fréquents, des ensembles d'éléments, des sous-séquences ou d'autres sous-structures qui font généralement partie des premières étapes pour analyser un ensemble de données à grande échelle. Cela a été un sujet de recherche actif dans l'exploration de données pendant des années.

  • mllib.linalg - Utilitaires MLlib pour l'algèbre linéaire.

  • mllib.recommendation- Le filtrage collaboratif est couramment utilisé pour les systèmes de recommandation. Ces techniques visent à remplir les entrées manquantes d'une matrice d'association d'élément utilisateur.

  • spark.mllib- Il prend actuellement en charge le filtrage collaboratif basé sur un modèle, dans lequel les utilisateurs et les produits sont décrits par un petit ensemble de facteurs latents qui peuvent être utilisés pour prédire les entrées manquantes. spark.mllib utilise l'algorithme des moindres carrés alternés (ALS) pour apprendre ces facteurs latents.

  • mllib.regression- La régression linéaire appartient à la famille des algorithmes de régression. Le but de la régression est de trouver des relations et des dépendances entre les variables. L'interface pour travailler avec des modèles de régression linéaire et des résumés de modèles est similaire au cas de régression logistique.

Il existe d'autres algorithmes, classes et fonctions qui font également partie du package mllib. A partir de maintenant, comprenons une démonstration surpyspark.mllib.

L'exemple suivant est un filtrage collaboratif utilisant l'algorithme ALS pour créer le modèle de recommandation et l'évaluer sur les données d'entraînement.

Dataset used - test.data

1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
--------------------------------------recommend.py----------------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
if __name__ == "__main__":
   sc = SparkContext(appName="Pspark mllib Example")
   data = sc.textFile("test.data")
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
   
   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)
   
   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))
   
   # Save and load model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
--------------------------------------recommend.py----------------------------------------

Command - La commande sera la suivante -

$SPARK_HOME/bin/spark-submit recommend.py

Output - La sortie de la commande ci-dessus sera -

Mean Squared Error = 1.20536041839e-05

La sérialisation est utilisée pour le réglage des performances sur Apache Spark. Toutes les données envoyées sur le réseau ou écrites sur le disque ou conservées dans la mémoire doivent être sérialisées. La sérialisation joue un rôle important dans les opérations coûteuses.

PySpark prend en charge les sérialiseurs personnalisés pour le réglage des performances. Les deux sérialiseurs suivants sont pris en charge par PySpark -

MarshalSerializer

Sérialise les objets à l'aide du sérialiseur Marshal de Python. Ce sérialiseur est plus rapide que PickleSerializer, mais prend en charge moins de types de données.

class pyspark.MarshalSerializer

PickleSerializer

Sérialise les objets à l'aide du sérialiseur Pickle de Python. Ce sérialiseur prend en charge presque tous les objets Python, mais peut ne pas être aussi rapide que les sérialiseurs plus spécialisés.

class pyspark.PickleSerializer

Voyons un exemple sur la sérialisation PySpark. Ici, nous sérialisons les données à l'aide de MarshalSerializer.

--------------------------------------serializing.py-------------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
--------------------------------------serializing.py-------------------------------------

Command - La commande est la suivante -

$SPARK_HOME/bin/spark-submit serializing.py

Output - La sortie de la commande ci-dessus est -

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]