Programmation réactive

La programmation réactive est un paradigme de programmation qui traite des flux de données et de la propagation du changement. Cela signifie que lorsqu'un flux de données est émis par un composant, le changement sera propagé à d'autres composants par la bibliothèque de programmation réactive. La propagation du changement continuera jusqu'à ce qu'il atteigne le récepteur final. La différence entre la programmation événementielle et la programmation réactive est que la programmation événementielle tourne autour d'événements et la programmation réactive tourne autour des données.

ReactiveX ou RX pour la programmation réactive

ReactiveX ou Raective Extension est l'implémentation la plus connue de la programmation réactive. Le fonctionnement de ReactiveX dépend des deux classes suivantes -

Classe observable

Cette classe est la source du flux de données ou des événements et elle emballe les données entrantes afin que les données puissent être transmises d'un thread à un autre. Il ne donnera pas de données tant qu'un observateur ne s'y sera pas abonné.

Classe d'observateur

Cette classe consomme le flux de données émis par observable. Il peut y avoir plusieurs observateurs avec observable et chaque observateur recevra chaque élément de données émis. L'observateur peut recevoir trois types d'événements en souscrivant à observable -

  • on_next() event - Cela implique qu'il y a un élément dans le flux de données.

  • on_completed() event - Cela implique la fin de l'émission et il n'y a plus d'articles à venir.

  • on_error() event - Cela implique également la fin de l'émission mais dans le cas où une erreur est lancée par observable.

RxPY - Module Python pour la programmation réactive

RxPY est un module Python qui peut être utilisé pour la programmation réactive. Nous devons nous assurer que le module est installé. La commande suivante peut être utilisée pour installer le module RxPY -

pip install RxPY

Exemple

Voici un script Python, qui utilise RxPY module et ses classes Observable et Observe forprogrammation réactive. Il existe essentiellement deux classes -

  • get_strings() - pour obtenir les chaînes de l'observateur.

  • PrintObserver()- pour imprimer les chaînes de l'observateur. Il utilise les trois événements de la classe observateur. Il utilise également la classe subscribe ().

from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

Production

Received Ram
Received Mohan
Received Shyam
Finished

Bibliothèque PyFunctional pour la programmation réactive

PyFunctionalest une autre bibliothèque Python qui peut être utilisée pour la programmation réactive. Cela nous permet de créer des programmes fonctionnels en utilisant le langage de programmation Python. Il est utile car il nous permet de créer des pipelines de données en utilisant des opérateurs fonctionnels chaînés.

Différence entre RxPY et PyFunctional

Les deux bibliothèques sont utilisées pour la programmation réactive et gèrent le flux de la même manière, mais la principale différence entre les deux dépend de la gestion des données. RxPY gère les données et les événements du système tout en PyFunctional se concentre sur la transformation des données à l'aide de paradigmes de programmation fonctionnelle.

Installation du module PyFunctional

Nous devons installer ce module avant de l'utiliser. Il peut être installé à l'aide de la commande pip comme suit -

pip install pyfunctional

Exemple

L'exemple suivant utilise the PyFunctional module et son seqclass qui agit comme l'objet de flux avec lequel nous pouvons itérer et manipuler. Dans ce programme, il mappe la séquence en utilisant la fonction lamda qui double chaque valeur, puis filtre la valeur où x est supérieur à 4 et finalement réduit la séquence en une somme de toutes les valeurs restantes.

from functional import seq

result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)

print ("Result: {}".format(result))

Production

Result: 6