PySpark - Diffusion et accumulateur

Pour le traitement parallèle, Apache Spark utilise des variables partagées. Une copie de la variable partagée va sur chaque nœud du cluster lorsque le pilote envoie une tâche à l'exécuteur sur le cluster, afin qu'elle puisse être utilisée pour effectuer des tâches.

Il existe deux types de variables partagées prises en charge par Apache Spark -

  • Broadcast
  • Accumulator

Comprenons-les en détail.

Diffuser

Les variables de diffusion sont utilisées pour enregistrer la copie des données sur tous les nœuds. Cette variable est mise en cache sur toutes les machines et non envoyée sur les machines avec des tâches. Le bloc de code suivant contient les détails d'une classe Broadcast pour PySpark.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

L'exemple suivant montre comment utiliser une variable de diffusion. Une variable de diffusion a un attribut appelé valeur, qui stocke les données et est utilisée pour renvoyer une valeur diffusée.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - La commande pour une variable de diffusion est la suivante -

$SPARK_HOME/bin/spark-submit broadcast.py

Output - La sortie de la commande suivante est donnée ci-dessous.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Accumulateur

Les variables d'accumulateur sont utilisées pour agréger les informations via des opérations associatives et commutatives. Par exemple, vous pouvez utiliser un accumulateur pour une opération de somme ou des compteurs (dans MapReduce). Le bloc de code suivant contient les détails d'une classe Accumulator pour PySpark.

class pyspark.Accumulator(aid, value, accum_param)

L'exemple suivant montre comment utiliser une variable d'accumulateur. Une variable d'accumulateur a un attribut appelé valeur qui est similaire à celui d'une variable de diffusion. Il stocke les données et est utilisé pour renvoyer la valeur de l'accumulateur, mais utilisable uniquement dans un programme pilote.

Dans cet exemple, une variable d'accumulation est utilisée par plusieurs nœuds de calcul et renvoie une valeur accumulée.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - La commande pour une variable d'accumulateur est la suivante -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - La sortie de la commande ci-dessus est donnée ci-dessous.

Accumulated value is -> 150