RxPY - Travailler avec le sujet

Un sujet est une séquence observable, ainsi qu'un observateur qui peut diffuser en multidiffusion, c'est-à-dire parler à de nombreux observateurs qui se sont abonnés.

Nous allons discuter des sujets suivants sur le sujet -

  • Créer un sujet
  • Abonnez-vous à un sujet
  • Passer des données au sujet
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Créer un sujet

Pour travailler avec un sujet, nous devons importer le sujet comme indiqué ci-dessous -

from rx.subject import Subject

Vous pouvez créer un sujet-objet comme suit -

subject_test = Subject()

L'objet est un observateur qui a trois méthodes -

  • on_next(value)
  • on_error (erreur) et
  • on_completed()

S'abonner à un sujet

Vous pouvez créer plusieurs abonnements sur le sujet comme indiqué ci-dessous -

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Transmission des données au sujet

Vous pouvez transmettre des données au sujet créé à l'aide de la méthode on_next (value) comme indiqué ci-dessous -

subject_test.on_next("A")
subject_test.on_next("B")

Les données seront transmises à tous les abonnements, ajoutées sur le sujet.

Voici un exemple de travail du sujet.

Exemple

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

L'objet subject_test est créé en appelant un Subject (). L'objet subject_test fait référence aux méthodes on_next (valeur), on_error (erreur) et on_completed (). La sortie de l'exemple ci-dessus est indiquée ci-dessous -

Production

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Nous pouvons utiliser la méthode on_completed (), pour arrêter l'exécution du sujet comme indiqué ci-dessous.

Exemple

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Une fois que nous appelons complete, la méthode suivante appelée plus tard n'est pas appelée.

Production

E:\pyrx>python testrx.py
The value is A
The value is A

Voyons maintenant comment appeler la méthode on_error (error).

Exemple

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Production

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject vous donnera la dernière valeur une fois appelé. Vous pouvez créer un sujet de comportement comme indiqué ci-dessous -

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Voici un exemple pratique d'utilisation du sujet de comportement

Exemple

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Production

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Rejouer le sujet

Un sujet de relecture est similaire au sujet de comportement, dans lequel il peut mettre en mémoire tampon les valeurs et les rejouer aux nouveaux abonnés. Voici un exemple fonctionnel de sujet de relecture.

Exemple

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

La valeur de tampon utilisée est 2 sur le sujet de la relecture. Ainsi, les deux dernières valeurs seront mises en mémoire tampon et utilisées pour les nouveaux abonnés appelés.

Production

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

Dans le cas d'AsyncSubject, la dernière valeur appelée est transmise à l'abonné, et elle ne sera effectuée qu'après l'appel de la méthode complete ().

Exemple

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Production

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2