RxJS is a library that allows us to do Reactive Programming on JavaScript
Another term we need to know to use the RxJS library is the concept of Subject. We know that the function that is a consumer in Observable is called an observer. It is normal if only one Observer consumes an Observable.
However, if more than one Observer consumes an Observable, it is called a Subject. As seen in the screenshot below, Observers subscribing to the Observable object created with the Subject consume the data in the stream equally.
We can also think like customized observable. Let’s take a look at an example to better understand it. I am sending some data to the Subject Observable object we created with the next () operator.
RxJS-Subject-Example-Author
What we need to know is that we can add any changes to the Observable object created with the subject at any time. There are also cases of subscribing by more than one Observer.
If we subscribe from the very beginning;
subject.subscribe((data) => {
console.log(`ObserverA ${data}`);
});
subject.subscribe((data) => {
console.log(`ObserverB ${data}`);
});
import { Component, OnInit } from '@angular/core';
import { Subject } from 'rxjs';
import { Observable } from 'rxjs';
@Component({selector: 'app-rxjs',templateUrl: './rxjs.component.html',styleUrls: ['./rxjs.component.scss']})
export class RxjsComponent implements OnInit {
constructor() { }
ngOnInit(): void {const subject = new Subject();
subject.subscribe(data => {console.log(`ObserverA ${data}`)})
subject.subscribe(data => {console.log(`ObserverB ${data}`)})
subject.next(50);
subject.next(5);
subject.next(10);
subject.next("Ahmet");
}}
In other words, if we subscribe to Observables without sending data to the stream, they consume the relevant data as soon as the data is sent to the Subject. Let’s check this on the console. We use the ng serve command to run it.
Observer took different values. If we do the subscribe process somewhere in the middle, it may consume the data below. It can not consume the data passing through the stream. In this case, if the observer subscribers at any time of the flow want to access previous data, subject types come into play here.
Subjects
We will examine three common types.
- BehaviorSubject
- ReplaySubject
- AsyncSubject
BehaviorSubject
It enables the observer subscribing to the stream to receive data starting from the previous data in the stream. When an observer subscribes to a BehaviorSubject, it begins by emitting the item most recently emitted by the source Observable (or a seed/default value if none has yet been emitted) and then continues to emit any other items emitted later by the source Observable. No matter where you subscribe, it starts from the previous one.
As you can see here, if you look at the console, all observers start from the previous one.
ReplaySubject
In the BehaviorSubject, the observer continued by taking the previous data in the flow he entered. ReplaySubject can take previous data as desired. When an observer subscribes to the process, it can fetch the previous n data from the point it is. When using ReplaySubject, you have to specify how much data you want to go back to the pre-existing or whatever.
import { Component, OnInit } from ‘@angular/core’;
import { BehaviorSubject, ReplaySubject, Subject } from ‘rxjs’;
import { Observable } from ‘rxjs’;
@Component({selector: ‘app-rxjs’,templateUrl: ‘./rxjs.component.html’,
styleUrls: [‘./rxjs.component.scss’],})
export class RxjsComponent implements OnInit {constructor() { }
ngOnInit(): void
{let data: any = ‘Nazlican’;
const subject = new ReplaySubject(2);
subject.subscribe((data) =>
{console.log(`ObserverA ${data}`);}
);subject.subscribe((data) => {console.log(`ObserverB ${data}`);});subject.next(50);subject.next(5);subject.subscribe((data) => {console.log(`ObserverC ${data}`);});subject.next(10);subject.next(‘Ahmet’);}}
If you check the console, you see ObserverC continues by taking 2 data before it.
AsyncSubject
The subject type used to get the last value in the data stream.
import { Component, OnInit } from '@angular/core';
import { AsyncSubject, BehaviorSubject, ReplaySubject, Subject } from 'rxjs';
import { Observable } from 'rxjs';
@Component({selector: 'app-rxjs',templateUrl: './rxjs.component.html',styleUrls: ['./rxjs.component.scss'],})
export class RxjsComponent implements OnInit {constructor() { }
ngOnInit(): void {
const subject = new AsyncSubject();
subject.subscribe(data => {console.log(`ObserverA ${data}`);});
subject.subscribe(data => {console.log(`ObserverB ${data}`);});
subject.next(50);
subject.next(5);
subject.subscribe(data => {console.log(`ObserverC ${data}`);});
subject.next(10);
subject.complete();
subject.next('Ahmet');}}
This subject waits for the complete () function to be triggered to understand the last data in the stream.
We see the last data.
AsyncSubject/author rxJS
Conclusion
If you develop in Angular or React, you will definitely need RxJS Subjects. If you understand theoretically and practice the applications I have added, you will realize that you are learning.
Thank you for reading!
Love to everyone who starts each day with a new hope without giving up!
If you have any questions, feel free to ask.
Here is my LinkedIn account: https://www.linkedin.com/in/kurtnazlican/