RxPY - Travailler avec des observables

Un observable est une fonction qui crée un observateur et l'attache à la source où des valeurs sont attendues, par exemple, des clics, des événements de souris à partir d'un élément dom, etc.

Les sujets mentionnés ci-dessous seront étudiés en détail dans ce chapitre.

  • Créer des observables

  • Abonnez-vous et exécutez un observable

Créer des observables

Pour créer un observable, nous utiliserons create() et transmettez-lui la fonction contenant les éléments suivants.

  • on_next() - Cette fonction est appelée lorsque l'Observable émet un élément.

  • on_completed() - Cette fonction est appelée lorsque l'observable est terminé.

  • on_error() - Cette fonction est appelée lorsqu'une erreur se produit sur l'Observable.

Pour travailler avec la méthode create (), importez d'abord la méthode comme indiqué ci-dessous -

from rx import create

Voici un exemple de travail, pour créer un observable -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

Abonnez-vous et exécutez un observable

Pour souscrire à une observable, nous devons utiliser la fonction subscribe () et passer la fonction de rappel on_next, on_error et on_completed.

Voici un exemple de travail -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

La méthode subscribe () se charge de l'exécution de l'observable. La fonction de rappelon_next, on_error et on_completeddoit être passé à la méthode subscribe. L'appel à la méthode subscribe, à son tour, exécute la fonction test_observable ().

Il n'est pas obligatoire de passer les trois fonctions de rappel à la méthode subscribe (). Vous pouvez passer selon vos besoins le on_next (), on_error () et on_completed ().

La fonction lambda est utilisée pour on_next, on_error et on_completed. Il prendra les arguments et exécutera l'expression donnée.

Voici la sortie, de l'observable créée -

E:\pyrx>python testrx.py
Got - Hello
Job Done!