A Redux-Observable egy RxJS alapú köztes szoftver a Redux számára, amely lehetővé teszi a fejlesztők számára, hogy aszinkron műveletekkel dolgozzanak. A redux-thunk és a redux-saga alternatívája.
Ez a cikk ismerteti az RxJS alapjait, a Redux-Observables beállításának módját és néhány gyakorlati felhasználási esetét. De előtte meg kell értenünk a Figyelő mintát .
Figyelő minta
Megfigyelői mintában egy "Megfigyelhető" vagy "Tárgy" nevű objektum "Figyelők" nevű előfizetők gyűjteményét tartja fenn. Amikor az alany állapota megváltozik, értesíti az összes megfigyelőt.
A JavaScript-ben a legegyszerűbb példa az eseménykibocsátók és az eseménykezelők lehetnek.
Amikor ezt megteszi .addEventListener
, egy megfigyelőt tol be az alany megfigyelői gyűjteményébe. Valahányszor az esemény megtörténik, az alany értesíti az összes megfigyelőt.

RxJS
A hivatalos honlap szerint
Az RxJS a ReactiveX, egy olyan könyvtár JavaScript-implementációja, amely aszinkron és eseményalapú programokat állít össze megfigyelhető szekvenciák felhasználásával.Egyszerűbben fogalmazva: az RxJS a Megfigyelő minta megvalósítása. Ezenkívül kiterjeszti a Observer mintát azáltal, hogy olyan operátorokat kínál, amelyek lehetővé teszik számunkra, hogy deklaratív módon összeállítsuk a megfigyelhető tárgyakat és a témákat.
A megfigyelők, a megfigyelhetők, az operátorok és az alanyok az RxJS építőelemei. Nézzük tehát most mindegyiket részletesebben.
Megfigyelők
A megfigyelők olyan objektumok, amelyek feliratkozhatnak a Megfigyelhető tárgyakra és az Alanyokra. Feliratkozás után háromféle értesítést kaphatnak - következő, hiba és teljes.
Bármely, a következő felépítésű objektum felhasználható megfigyelőként.
interface Observer { closed?: boolean; next: (value: T) => void; error: (err: any) => void; complete: () => void; }
Amikor a megfigyelhető tolja következő, hiba, és teljes értesítést, a megfigyelő .next
, .error
és .complete
módszerek alkalmazása esetén.
Megfigyelhetők
A megfigyelhető objektumok olyan objektumok, amelyek egy idő alatt adatokat bocsáthatnak ki. A "márványdiagram" segítségével ábrázolható.

Ahol a vízszintes vonal az időt jelöli, a kör alakú csomópontok az Observable által kibocsátott adatokat képviselik, a függőleges vonal pedig azt, hogy a Observable sikeresen teljesült.

A megfigyelhető hibák hibát tapasztalhatnak. A kereszt a Megfigyelhető által kibocsátott hibát jelenti.
A "befejezett" és a "hibás" állapotok véglegesek. Ez azt jelenti, hogy a Megfigyelhetőek nem bocsáthatnak ki adatokat a sikeres teljesítés vagy hiba esetén.
Megfigyelhető létrehozása
Megfigyelhetők a new Observable
konstruktor segítségével hozhatók létre , amely egy argumentumot - az előfizetési függvényt - veszi igénybe. Megfigyelhetőek is létrehozhatók egyes operátorok segítségével, de erről később beszélünk, amikor az Operátorokról beszélünk.
import { Observable } from 'rxjs'; const observable = new Observable(subscriber => { // Subscribe function });
Feliratkozás egy megfigyelhetőre
A megfigyelhetõk a .subscribe
módszerükkel és egy Figyelõ elhaladásával iratkozhatnak fel .
observable.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log('completed'); });
Megfigyelhető kivitelezése
A new Observable
konstruktornak átadott előfizetési függvény minden alkalommal végrehajtásra kerül, amikor az Observable feliratkozik.
Az előfizetés függvénynek egyetlen argumentuma van - az Előfizető. Az Előfizető hasonlít a szerkezet egy megfigyelő, és ez ugyanaz a 3 módszer: .next
, .error
, és .complete
.
A megfigyelhető adatok a .next
módszer segítségével továbbíthatják az adatokat a Megfigyelőhöz . Ha a Megfigyelhető sikeresen teljesült, a .complete
módszerrel értesítheti a Megfigyelőt . Ha a Megfigyelhető hibába ütközött, akkor a .error
módszer segítségével a hibát a Megfigyelőhöz tolhatja .
// Create an Observable const observable = new Observable(subscriber => { subscriber.next('first data'); subscriber.next('second data'); setTimeout(() => { subscriber.next('after 1 second - last data'); subscriber.complete(); subscriber.next('data after completion'); // console.log(x), error: (x) => console.log(x), complete: () => console.log('completed') }); // Outputs: // // first data // second data // third data // after 1 second - last data // completed
Megfigyelhetők Unicast
A megfigyelhetőek unicast , ami azt jelenti, hogy a megfigyelhetőknek legfeljebb egy előfizetője lehet. Amikor egy Megfigyelő feliratkozik egy Megfigyelhetőre, megkapja a Megfigyelhető egy példányát, amelynek saját végrehajtási útvonala van, így a Megfigyelhetőek unicast.
Olyan ez, mint egy YouTube-videó megtekintése. Minden néző ugyanazt a videotartalmat nézi, de a videó különböző szegmenseit nézheti.
Példa : hozzunk létre egy megfigyelhető anyagot, amely 10 másodperc alatt 1-10-et bocsát ki. Ezután azonnal iratkozzon fel az Observable-ra, 5 másodperc múlva pedig újra.
// Create an Observable that emits data every second for 10 seconds const observable = new Observable(subscriber => { let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 10) { clearInterval(interval); } }, 1000); }); // Subscribe to the Observable observable.subscribe({ next: value => { console.log(`Observer 1: ${value}`); } }); // After 5 seconds subscribe again setTimeout(() => { observable.subscribe({ next: value => { console.log(`Observer 2: ${value}`); } }); }, 5000); /* Output Observer 1: 1 Observer 1: 2 Observer 1: 3 Observer 1: 4 Observer 1: 5 Observer 2: 1 Observer 1: 6 Observer 2: 2 Observer 1: 7 Observer 2: 3 Observer 1: 8 Observer 2: 4 Observer 1: 9 Observer 2: 5 Observer 1: 10 Observer 2: 6 Observer 2: 7 Observer 2: 8 Observer 2: 9 Observer 2: 10 */
A kimenetben észreveheti, hogy a második megfigyelő 1-től kezdte a nyomtatást, annak ellenére, hogy 5 másodperc után feliratkozott. Ez azért történik, mert a második Megfigyelő megkapta az Megfigyelhető másolatát, amelynek előfizetés funkcióját ismét meghívták. Ez szemlélteti a Megfigyelhetőek unicast viselkedését.
Tárgyak
Az Alany a megfigyelhető speciális típusa.
Tárgy létrehozása
Az Alany a new Subject
konstruktor segítségével jön létre .
import { Subject } from 'rxjs'; // Create a subject const subject = new Subject();
Feliratkozás egy témára
Tárgyra való feliratkozás hasonlít egy Megfigyelhető előfizetéshez: használja a .subscribe
módszert, és átengedi a Megfigyelőt.
subject.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log("done") });
Tárgy végrehajtása
Ellentétben észlelhetőség, a Tárgy kéri saját .next
, .error
és .complete
módszereket, hogy álljon az adatokat megfigyelők.
// Push data to all observers subject.next('first data'); // Push error to all observers subject.error('oops something went wrong'); // Complete subject.complete('done');
Az alanyok csoportos küldésűek
Az alanyok csoportos küldésűek: több megfigyelő ugyanazt az Alanyot és annak végrehajtási útvonalát osztja meg. Ez azt jelenti, hogy az összes értesítést az összes megfigyelő továbbítja. Olyan ez, mint egy élő műsor nézése. Minden néző ugyanazon tartalom ugyanazon szegmensét nézi egyszerre.
Példa: hozzunk létre egy olyan Tantárgyat, amely 10 másodperc alatt 1-10 -t bocsát ki. Ezután azonnal iratkozzon fel az Observable-ra, 5 másodperc múlva pedig újra.
// Create a subject const subject = new Subject(); let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 10) { clearInterval(interval); } }, 1000); // Subscribe to the subjects subject.subscribe(data => { console.log(`Observer 1: ${data}`); }); // After 5 seconds subscribe again setTimeout(() => { subject.subscribe(data => { console.log(`Observer 2: ${data}`); }); }, 5000); /* OUTPUT Observer 1: 1 Observer 1: 2 Observer 1: 3 Observer 1: 4 Observer 1: 5 Observer 2: 5 Observer 1: 6 Observer 2: 6 Observer 1: 7 Observer 2: 7 Observer 1: 8 Observer 2: 8 Observer 1: 9 Observer 2: 9 Observer 1: 10 Observer 2: 10 */
In the output, you can notice that the second Observer started printing from 5 instead of starting from 1. This happens because the second Observer is sharing the same Subject. Since it subscribed after 5 seconds, the Subject has already finished emitting 1 to 4. This illustrates the multicast behavior of a Subject.
Subjects are both Observable and Observer
Subjects have the .next
, .error
and .complete
methods. That means that they follow the structure of Observers. Hence, a Subject can also be used as an Observer and passed to the .subscribe
function of Observables or other Subjects.
Example: let us create an Observable and a Subject. Then subscribe to the Observable using the Subject as an Observer. Finally, subscribe to the Subject. All the values emitted by the Observable will be pushed to the Subject, and the Subject will broadcast the received values to all its Observers.
// Create an Observable that emits data every second const observable = new Observable(subscriber => { let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 5) { clearInterval(interval); } }, 1000); }); // Create a subject const subject = new Subject(); // Use the Subject as Observer and subscribe to the Observable observable.subscribe(subject); // Subscribe to the subject subject.subscribe({ next: value => console.log(value) }); /* Output 1 2 3 4 5 */
Operators
Operators are what make RxJS useful. Operators are pure functions that return a new Observable. They can be categorized into 2 main categories:
- Creation Operators
- Pipeable Operators
Creation Operators
Creation Operators are functions that can create a new Observable.
Example: we can create an Observable that emits each element of an array using the from
operator.
const observable = from([2, 30, 5, 22, 60, 1]); observable.subscribe({ next: (value) => console.log("Received", value), error: (err) => console.log(err), complete: () => console.log("done") }); /* OUTPUTS Received 2 Received 30 Received 5 Received 22 Received 60 Received 1 done */
The same can be an Observable using the marble diagram.

Pipeable Operators
Az átvezethető operátorok olyan funkciók, amelyek egy Figyelhetőt vesznek bemenetként, és módosított viselkedéssel adják vissza az új Megfigyelhetőt.
Példa: vegyük az Observable-t, amelyet az from
operátor segítségével hoztunk létre . Most ezt a Megfigyelhetőt használva létrehozhatunk egy új Megfigyelhetőt, amely csak 10-nél nagyobb számokat bocsát ki az filter
operátor segítségével.
const greaterThanTen = observable.pipe(filter(x => x > 10)); greaterThanTen.subscribe(console.log, console.log, () => console.log("completed")); // OUTPUT // 11 // 12 // 13 // 14 // 15
Ugyanez ábrázolható a márványdiagram segítségével.

Sokkal több hasznos operátor van ott. Itt láthatja a teljes operátorlistát, valamint a hivatalos RxJS dokumentáció példáit.
Rendkívül fontos megérteni az összes általánosan használt operátort. Íme néhány operátor, amelyet gyakran használok:
mergeMap
switchMap
exhaustMap
map
catchError
startWith
delay
debounce
throttle
interval
from
of
Redux Observables
A hivatalos honlap szerint
RxJS alapú köztes szoftver a Redux számára. Írja össze és törölje az aszinkron műveleteket mellékhatások és egyebek létrehozásához.A Redux-ben, amikor egy műveletet elküldenek, az végigfuttatja az összes reduktor funkciót, és új állapotot ad vissza.
A Redux-observable mindezeket az elküldött cselekvéseket és új állapotokat meghozza, és két megfigyelhetőt hoz létre belőle - megfigyelhető cselekvéseket action$
és megfigyelhető állapotokat state$
.
A megfigyelhető műveletek minden olyan műveletet kibocsátanak, amelyet a store.dispatch()
. A megfigyelhető állapotok kibocsátják az összes új állapotobjektumot, amelyet a gyökérszűkítő ad vissza.
Epics
A hivatalos honlap szerint
Ez egy olyan funkció, amely felveszi a műveletek áramát, és visszaadja a műveletek áramát. Műveletek, cselekvések kiEpics are functions that can be used to subscribe to Actions and States Observables. Once subscribed, epics will receive the stream of actions and states as input, and it must return a stream of actions as an output. Actions In - Actions Out.
const someEpic = (action$, state$) => { return action$.pipe( // subscribe to actions observable map(action => { // Receive every action, Actions In return someOtherAction(); // return an action, Actions Out }) ) }
It is important to understand that all the actions received in the Epic have already finished running through the reducers.
Inside an Epic, we can use any RxJS observable patterns, and this is what makes redux-observables useful.
Example: we can use the .filter
operator to create a new intermediate observable. Similarly, we can create any number of intermediate observables, but the final output of the final observable must be an action, otherwise an exception will be raised by redux-observable.
const sampleEpic = (action$, state$) => { return action$.pipe( filter(action => action.payload.age >= 18), // can create intermediate observables and streams map(value => above18(value)) // where above18 is an action creator ); }
Every action emitted by the Epics are immediately dispatched using the store.dispatch()
.
Setup
First, let's install the dependencies.
npm install --save rxjs redux-observable
Create a separate folder named epics
to keep all the epics. Create a new file index.js
inside the epics
folder and combine all the epics using the combineEpics
function to create the root epic. Then export the root epic.
import { combineEpics } from 'redux-observable'; import { epic1 } from './epic1'; import { epic2 } from './epic2'; const epic1 = (action$, state$) => { ... } const epic2 = (action$, state$) => { ... } export default combineEpics(epic1, epic2);
Create an epic middleware using the createEpicMiddleware
function and pass it to the createStore
Redux function.
import { createEpicMiddleware } from 'redux-observable'; import { createStore, applyMiddleware } from 'redux'; import rootEpic from './rootEpics'; const epicMiddleware = createEpicMiddlware(); const store = createStore( rootReducer, applyMiddleware(epicMiddlware) );
Finally, pass the root epic to epic middleware's .run
method.
epicMiddleware.run(rootEpic);
Some Practical Usecases
RxJS has a big learning curve, and the redux-observable setup worsens the already painful Redux setup process. All that makes Redux observable look like an overkill. But here are some practical use cases that can change your mind.
Throughout this section, I will be comparing redux-observables with redux-thunk to show how redux-observables can be helpful in complex use-cases. I don't hate redux-thunk, I love it, and I use it every day!
1. Make API Calls
Usecase: Make an API call to fetch comments of a post. Show loaders when the API call is in progress and also handle API errors.
A redux-thunk implementation will look like this,
function getComments(postId){ return (dispatch) => { dispatch(getCommentsInProgress()); axios.get(`/v1/api/posts/${postId}/comments`).then(response => { dispatch(getCommentsSuccess(response.data.comments)); }).catch(() => { dispatch(getCommentsFailed()); }); } }
and this is absolutely correct. But the action creator is bloated.
We can write an Epic to implement the same using redux-observables.
const getCommentsEpic = (action$, state$) => action$.pipe( ofType('GET_COMMENTS'), mergeMap((action) => from(axios.get(`/v1/api/posts/${action.payload.postId}/comments`).pipe( map(response => getCommentsSuccess(response.data.comments)), catchError(() => getCommentsFailed()), startWith(getCommentsInProgress()) ) );
Now it allows us to have a clean and simple action creator like this,
function getComments(postId) { return { type: 'GET_COMMENTS', payload: { postId } } }
2. Request Debouncing
Usecase: Provide autocompletion for a text field by calling an API whenever the value of the text field changes. API call should be made 1 second after the user has stopped typing.
A redux-thunk implementation will look like this,
let timeout; function valueChanged(value) { return dispatch => { dispatch(loadSuggestionsInProgress()); dispatch({ type: 'VALUE_CHANGED', payload: { value } }); // If changed again within 1 second, cancel the timeout timeout && clearTimeout(timeout); // Make API Call after 1 second timeout = setTimeout(() => { axios.get(`/suggestions?q=${value}`) .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions))) .catch(() => dispatch(loadSuggestionsFailed())) }, 1000, value); } }
It requires a global variable timeout
. When we start using global variables, our action creators are not longer pure functions. It also becomes difficult to unit test the action creators that use a global variable.
We can implement the same with redux-observable using the .debounce
operator.
const loadSuggestionsEpic = (action$, state$) => action$.pipe( ofType('VALUE_CHANGED'), debounce(1000), mergeMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe( map(response => loadSuggestionsSuccess(response.data.suggestions)), catchError(() => loadSuggestionsFailed()) )), startWith(loadSuggestionsInProgress()) );
Now, our action creators can be cleaned up, and more importantly, they can be pure functions again.
function valueChanged(value) { return { type: 'VALUE_CHANGED', payload: { value } } }
3. Request Cancellation
Usecase: Continuing the previous use-case, assume that the user didn't type anything for 1 second, and we made our 1st API call to fetch the suggestions.
Let's say the API itself takes an average of 2-3 seconds to return the result. Now, if the user types something while the 1st API call is in progress, after 1 second, we will make our 2nd API. We can end up having two API calls at the same time, and it can create a race condition.
To avoid this, we need to cancel the 1st API call before making the 2nd API call.
A redux-thunk implementation will look like this,
let timeout; var cancelToken = axios.cancelToken; let apiCall; function valueChanged(value) { return dispatch => { dispatch(loadSuggestionsInProgress()); dispatch({ type: 'VALUE_CHANGED', payload: { value } }); // If changed again within 1 second, cancel the timeout timeout && clearTimeout(timeout); // Make API Call after 1 second timeout = setTimeout(() => { // Cancel the existing API apiCall && apiCall.cancel('Operation cancelled'); // Generate a new token apiCall = cancelToken.source(); axios.get(`/suggestions?q=${value}`, { cancelToken: apiCall.token }) .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions))) .catch(() => dispatch(loadSuggestionsFailed())) }, 1000, value); } }
Now, it requires another global variable to store the Axios's cancel token. More global variables = more impure functions!
To implement the same using redux-observable, all we need to do is replace .mergeMap
with .switchMap
.
const loadSuggestionsEpic = (action$, state$) => action$.pipe( ofType('VALUE_CHANGED'), throttle(1000), switchMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe( map(response => loadSuggestionsSuccess(response.data.suggestions)), catchError(() => loadSuggestionsFailed()) )), startWith(loadSuggestionsInProgress()) );
Since it doesn't require any changes to our action creators, they can continue to be pure functions.
Similarly, there are many use-cases where Redux-Observables actually shines! For example, polling an API, showing snack bars, managing WebSocket connections, etc.
To Conclude
If you are developing a Redux application that involves such complex use-cases, it is highly recommended to use Redux-Observables. After all, the benefits of using it are directly proportional to the complexity of your application, and it is evident from the above mentioned practical use-cases.
I strongly believe using the right set of libraries will help us to develop much cleaner and maintainable applications, and in the long term, the benefits of using them will outweigh the drawbacks.