PySpark - RDD

Maintenant que nous avons installé et configuré PySpark sur notre système, nous pouvons programmer en Python sur Apache Spark. Mais avant de le faire, comprenons un concept fondamental de Spark - RDD.

RDD signifie Resilient Distributed Dataset, ce sont les éléments qui s'exécutent et fonctionnent sur plusieurs nœuds pour effectuer un traitement parallèle sur un cluster. Les RDD sont des éléments immuables, ce qui signifie qu'une fois que vous créez un RDD, vous ne pouvez pas le modifier. Les RDD sont également tolérants aux pannes, donc en cas de panne, ils se rétablissent automatiquement. Vous pouvez appliquer plusieurs opérations sur ces RDD pour réaliser une certaine tâche.

Pour appliquer des opérations sur ces RDD, il y a deux façons -

  • Transformation et
  • Action

Comprenons ces deux manières en détail.

Transformation- Ce sont les opérations qui sont appliquées sur un RDD pour créer un nouveau RDD. Filter, groupBy et map sont des exemples de transformations.

Action - Ce sont les opérations qui sont appliquées sur RDD, qui ordonne à Spark d'effectuer le calcul et de renvoyer le résultat au pilote.

Pour appliquer une opération dans PySpark, nous devons créer un PySpark RDDpremière. Le bloc de code suivant contient le détail d'une classe PySpark RDD -

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

Voyons comment exécuter quelques opérations de base à l'aide de PySpark. Le code suivant dans un fichier Python crée des mots RDD, qui stockent un ensemble de mots mentionnés.

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

Nous allons maintenant exécuter quelques opérations sur les mots.

compter()

Le nombre d'éléments dans le RDD est renvoyé.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command - La commande pour count () est -

$SPARK_HOME/bin/spark-submit count.py

Output - La sortie de la commande ci-dessus est -

Number of elements in RDD → 8

collecte()

Tous les éléments du RDD sont renvoyés.

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command - La commande pour collect () est -

$SPARK_HOME/bin/spark-submit collect.py

Output - La sortie de la commande ci-dessus est -

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach (f)

Renvoie uniquement les éléments qui remplissent la condition de la fonction dans foreach. Dans l'exemple suivant, nous appelons une fonction d'impression dans foreach, qui imprime tous les éléments du RDD.

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Command - La commande foreach (f) est -

$SPARK_HOME/bin/spark-submit foreach.py

Output - La sortie de la commande ci-dessus est -

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filtre (f)

Un nouveau RDD est retourné contenant les éléments, ce qui satisfait la fonction à l'intérieur du filtre. Dans l'exemple suivant, nous filtrons les chaînes contenant «spark».

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command - La commande pour le filtre (f) est -

$SPARK_HOME/bin/spark-submit filter.py

Output - La sortie de la commande ci-dessus est -

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map (f, préservePartitioning = False)

Un nouveau RDD est renvoyé en appliquant une fonction à chaque élément du RDD. Dans l'exemple suivant, nous formons une paire clé-valeur et mappons chaque chaîne avec une valeur de 1.

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command - La commande pour map (f, préservePartitioning = False) est -

$SPARK_HOME/bin/spark-submit map.py

Output - La sortie de la commande ci-dessus est -

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

réduire (f)

Après avoir effectué l'opération binaire commutative et associative spécifiée, l'élément du RDD est renvoyé. Dans l'exemple suivant, nous importons add package de l'opérateur et l'appliquons sur 'num' pour effectuer une opération d'addition simple.

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command - La commande pour réduire (f) est -

$SPARK_HOME/bin/spark-submit reduce.py

Output - La sortie de la commande ci-dessus est -

Adding all the elements -> 15

join (autre, numPartitions = Aucun)

Il renvoie RDD avec une paire d'éléments avec les clés correspondantes et toutes les valeurs de cette clé particulière. Dans l'exemple suivant, il y a deux paires d'éléments dans deux RDD différents. Après avoir joint ces deux RDD, nous obtenons un RDD avec des éléments ayant des clés correspondantes et leurs valeurs.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command - La commande pour join (other, numPartitions = None) est -

$SPARK_HOME/bin/spark-submit join.py

Output - La sortie de la commande ci-dessus est -

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

cache ()

Conservez ce RDD avec le niveau de stockage par défaut (MEMORY_ONLY). Vous pouvez également vérifier si le RDD est mis en cache ou non.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command - La commande pour cache () est -

$SPARK_HOME/bin/spark-submit cache.py

Output - La sortie pour le programme ci-dessus est -

Words got cached -> True

Ce sont quelques-unes des opérations les plus importantes effectuées sur PySpark RDD.