If you have been working with RxJS for a while, you have probably come across the forkJoin operator. This operator is a classic, but perhaps also a bit more “arcane” than the more commonly used operators. So, what is forking and why would you want to join a fork? Keep reading and find out!

Forking

Before we get to the joining, we first need to understand what forking is. If you are familiar with git, you may know forking as creating a copy of a repository that stays linked to the original, and from which changes may be pushed back to the original. More broadly, a single repository may have many “forks”, each one distinct and each one related to the original. These copies, and their links back to the original, can be thought of as the tines of a literal table fork.

In reactive programming, a fork is similar but not identical. Forking in code is the process of taking a single path of code and breaking it out into many paths. This is not the same as branching, where only one path may be taken. Instead, with forking, all paths are taken!

This concept of all paths being taken may seem a bit alien to those coming from an imperative, single-threaded programming background. In the context of an asynchronous language, where multiple streams of execution (or data flow) may in effect be “concurrent”, the concept of forking should become clearer.

Joining

Now that you understand forking, you may be asking yourself: What is joining, and why would I want to join? Joining, when on the topic of forking, is the process of taking multiple streams of execution or data and bringing them together as a single stream, a single path.

Once the previously forked paths have been joined, a single stream is the result. The join in effect allows us to “block” the continuation or completion of a process until many other processes have completed. Thought about another way, when handled properly, joining allows many discrete processes to be synchronized before continuing.

The forkJoin operator

When it comes to RxJS, the forkJoin operator is our primary tool for forking a single stream of data into multiple streams, as well as joining those streams together into a single stream again once they have all completed. (Note the nuance here: for forkJoin to actually join, all of the forked streams MUST complete. Once they have, forkJoin emits a single array containing the last value from each stream.)
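To make that nuance concrete, here is a minimal sketch of forkJoin on its own. The three source streams are made up for illustration and emit synchronously via of, so the result is available as soon as subscribe returns.

```typescript
import { forkJoin, of } from 'rxjs';

// Three illustrative sources. Each emits its values and then completes.
const letters$ = of('a', 'b', 'c');
const numbers$ = of(1, 2);
const flag$ = of(true);

const joined: Array<[string, number, boolean]> = [];
let completions = 0;

forkJoin([letters$, numbers$, flag$]).subscribe({
  // Called exactly once, after ALL sources have completed.
  next: (values) => joined.push(values),
  complete: () => completions++,
});

console.log(joined); // [['c', 2, true]]: only the LAST value of each source
```

Earlier values ('a', 'b' and 1) are dropped: forkJoin only cares about where each stream ended up. If a source completes without emitting anything, forkJoin completes without emitting as well.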

When forking in RxJS, you can initiate multiple streams of execution and data flow. These are Observables, either existing streams (say, a hot observable) or functions that return new streams. A common use case for forking might be logging a user out. Logouts may require multiple discrete behaviors over multiple streams of execution to clear out data, sessions, etc. both locally and remotely. You may also wish to track the user’s logout action for analytics purposes.

import { Observable, combineLatest, forkJoin } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

export class AuthService {
  constructor(
    private analytics: AnalyticsService,
    private sessions: SessionService,
    private users: UsersService
  ) {}

  logOut(): Observable<[boolean, Date, boolean, boolean]> {
    return combineLatest([
      this.users.getCurrentUser(),
      this.sessions.getCurrentSession()
    ]).pipe(
      exhaustMap(([user, session]) =>
        // Fork and execute all of the following things, and wait for their observables to complete:
        forkJoin([
          // This makes an http call, cold observable, completes.
          this.sessions.endSession(session),
          // This makes an http call, cold observable, completes.
          this.users.trackLogoutTime(user),
          // This removes locally stored data about the user, completes.
          this.users.discardLocalUserObject(),
          // This makes an http call, cold observable, completes.
          this.analytics.trackAction('user_logout', {user})
        ]) // We wait here until all the above observables complete, then join the results of each into a single stream
      )
    );
  }
}

Our above stream would look like this in a flow chart, showing the forking and the joining:

With our logOut() method in place, we can call it and subscribe to the result. This initiates parallel asynchronous processes that end the user’s session on the server, record the user’s logout time in their user data on the server, discard the local user object on the client and track the user’s logout action in an analytics system. Because these operations run in parallel, the total time is roughly that of the slowest operation rather than the sum of all of them, as it would be if we serialized them (i.e. waited for each one to complete before we even started the next).
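The difference between running in parallel and serializing can be observed directly. The sketch below is illustrative only: fakeCall is a hypothetical stand-in for an HTTP call, backed by a Subject that we complete by hand, and concat plays the role of the serialized alternative.

```typescript
import { Observable, Subject, concat, defer, forkJoin } from 'rxjs';

// Names of the fake calls, in the order they were actually started.
const started: string[] = [];

// Hypothetical stand-in for an HTTP call: it records when it is started
// (subscribed to) and finishes only when we complete its subject by hand.
function fakeCall(name: string, response$: Subject<string>): Observable<string> {
  return defer(() => {
    started.push(name);
    return response$;
  });
}

const a$ = new Subject<string>();
const b$ = new Subject<string>();
const c$ = new Subject<string>();
const d$ = new Subject<string>();

// Parallel: forkJoin subscribes to every source straight away.
const joined: string[][] = [];
forkJoin([fakeCall('a', a$), fakeCall('b', b$)]).subscribe((values) => joined.push(values));
const afterForkJoin = started.join(','); // 'a,b'

// Serial: concat starts the second call only once the first has completed.
concat(fakeCall('c', c$), fakeCall('d', d$)).subscribe();
const afterConcat = started.join(','); // 'a,b,c'

c$.next('c done');
c$.complete();
const afterFirstSerialCall = started.join(','); // 'a,b,c,d'

// Parallel calls may finish in any order; results stay in source order.
b$.next('b done');
b$.complete();
a$.next('a done');
a$.complete();

console.log(joined); // [['a done', 'b done']]
```

Both forkJoin sources are in flight before either has finished, while the second concat source does not even start until the first completes.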

We then get the results of each of these actions in a single stream:

this.auth.logOut().subscribe(
  ([sessionEnded, logoutTime, userDiscarded, eventTracked]) => {
    console.log(`User logged out!`);
    console.log(`Session was ${sessionEnded ? 'ended' : 'unable to be ended'}.`);
    console.log(`User logout time was ${logoutTime.toISOString()}.`); 
    console.log(`Local user data was ${userDiscarded ? 'discarded' : 'unable to be discarded'}.`);
    console.log(`User logout event was ${eventTracked ? 'tracked' : 'untracked'}.`);
});

Since all of our observables complete, the observable returned by logOut() also completes, at which point the subscription is cleaned up automatically without an explicit unsubscribe.

Forking and Joining Continuous Streams

Forking and joining with RxJS’ forkJoin operator requires that all of the forked streams complete before forkJoin itself will emit a notification. This does not necessarily mean you can only use the operator with observables that intrinsically complete: there are ways of forcing completion on streams that never complete, or that complete only in certain circumstances.

There are many operators that can be used to complete non-completing streams. The take operator, for example, will emit only a specified number of notifications before it completes the stream. The first operator will emit only the first notification, or the first notification that passes a predicate, and then complete the stream.

These operators can be combined with non-completing streams to ensure they complete, thus ensuring a forkJoin operation will also complete, even if the original streams never do. This then allows forking with just about any stream regardless of whether they are hot or cold, continuous or completing.
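As a rough illustration, the sketch below forks over two streams that never complete on their own. The names clicks$ and theme$ are invented for this example; a Subject and a BehaviorSubject stand in for any hot, continuous source.

```typescript
import { BehaviorSubject, Subject, forkJoin } from 'rxjs';
import { first, take } from 'rxjs/operators';

// Two continuous streams (names are illustrative): neither completes by itself.
const clicks$ = new Subject<number>();
const theme$ = new BehaviorSubject<string>('light');

// Without a completing operator, forkJoin waits forever and never emits.
const withoutTake: unknown[] = [];
forkJoin([clicks$, theme$]).subscribe((values) => withoutTake.push(values));

// With take and first, each forked stream completes after one matching value.
const withTake: Array<[number, string, number]> = [];
forkJoin([
  clicks$.pipe(take(1)),                        // first click, whatever it is
  theme$.pipe(take(1)),                         // current theme
  clicks$.pipe(first((n: number) => n > 10)),   // first click above 10
]).subscribe((values) => withTake.push(values));

clicks$.next(3);  // satisfies take(1), but not the predicate of first
clicks$.next(42); // satisfies the predicate, so all three streams are now complete

console.log(withTake);    // [[3, 'light', 42]]
console.log(withoutTake); // []
```

The source subjects themselves stay open throughout. Only the forked views of them, narrowed by take and first, complete.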

Resetting an App

One simple case where you might need to convert continuous streams into completing streams is resetting an app to a default state. Say you have an Ionic Angular app with an up-front registration process, after which you store and track certain data persistently in NgRx state. You may dispatch a reset action, observed by many effects, each of which is responsible for resetting a different aspect of state. Once every discrete reset has completed, you then redirect the app to a start page.

In this particular example, we have two forms of forking. An abstract form, which will be demonstrated, and a concrete form with the forkJoin operator.

You may have the following actions, depicting the reset initiation itself as well as its completion, along with the initiating actions for each part of the reset process and their completion.

import { createAction } from '@ngrx/store';

export const reset = createAction('[App] Reset');

export const clearUserState = createAction('[Reset] Clear User State');
export const dropAuthToken = createAction('[Reset] Drop Auth Token');
export const trackAppReset = createAction('[Reset] Track Activity');

export const userStateCleared = createAction('[Reset] User State Cleared');
export const authTokenDropped = createAction('[Reset] Auth Token Dropped');
export const appResetTracked = createAction('[Reset] App Reset Tracked');

export const resetComplete = createAction('[Reset] Complete');

Initiating Reset

Utilizing the above actions, you may then have the following effects to perform this reset process:

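import { Injectable } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { map, tap } from 'rxjs/operators';

@Injectable()
export class ResetEffects {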
  constructor(private actions$: Actions, private analytics: AnalyticsService) {}

  clearUserStateOnReset$ = createEffect(
    () => this.actions$.pipe(
      ofType(reset),
      map(() => clearUserState())
    )
  );
  
  dropAuthTokenOnReset$ = createEffect(
    () => this.actions$.pipe(
      ofType(reset),
      map(() => dropAuthToken())
    )
  );

  trackAppResetOnReset$ = createEffect(
    () => this.actions$.pipe(
      ofType(reset),
      map(() => trackAppReset())
    )
  );

  
  clearUserState$ = createEffect(
    () => this.actions$.pipe(
      ofType(clearUserState),
      map(() => userStateCleared())
    )
  );
  
  dropAuthToken$ = createEffect(
    () => this.actions$.pipe(
      ofType(dropAuthToken),
      tap(() => localStorage.removeItem('token')),
      map(() => authTokenDropped())
    )
  );
  
  trackAppReset$ = createEffect(
    () => this.actions$.pipe(
      ofType(trackAppReset),
      tap(() => this.analytics.track('app_reset', { time: new Date() })),
      map(() => appResetTracked())
    )
  );
}

These effects and actions represent an abstract forking process. The reset action is an initiating action: a single source that, thanks to how the effects above are implemented, effectively forks into multiple streams. We use discrete effects to dispatch discrete secondary actions to ensure that any data stored in the various pieces of state within NgRx is properly cleared out, something we would have to jury-rig if we tried to use forkJoin to initiate this process.


Completing Reset

Each of these distinct paths has both an initiation action and a completion action. We can watch for each of the completion actions, join them into a single stream and use that joining to complete the reset process:

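export class ResetEffects {
  constructor(
    private actions$: Actions,
    private analytics: AnalyticsService,
    private router: Router
  ) {}

  completeReset$ = createEffect(
    () => this.actions$.pipe(
      ofType(reset),
      // Each reset starts a fresh wait for all three completion actions.
      switchMap(() =>
        forkJoin([
          // take(1) completes each stream after its first matching action.
          this.actions$.pipe(ofType(userStateCleared), take(1)),
          this.actions$.pipe(ofType(authTokenDropped), take(1)),
          this.actions$.pipe(ofType(appResetTracked), take(1))
        ]).pipe(map(() => resetComplete()))
      )
    )
  );

  // ... other effects ...

  resetCompleted$ = createEffect(
    () => this.actions$.pipe(
      ofType(resetComplete),
      // Router.navigate expects an array of route commands.
      tap(() => this.router.navigate(['/']))
    ),
    { dispatch: false }
  );
}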

The Actions stream from @ngrx/effects is a continuous stream. It does not complete on its own during normal operation. It is therefore necessary in our completeReset$ effect above to add a take(1) to each of the forked streams that wait for completion actions. This ensures that once we have seen each of those actions dispatched once after the reset action was dispatched, the reset process completes.
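The joining step can be tried out without an NgRx store at all. The sketch below is a simplification under stated assumptions: a plain Subject stands in for the Actions stream, ofTypeName is a hypothetical helper that mimics ofType by comparing type strings, and the results are collected into an array instead of being dispatched.

```typescript
import { Subject, forkJoin } from 'rxjs';
import { filter, map, switchMap, take } from 'rxjs/operators';

interface Action {
  type: string;
}

// Assumption: a plain Subject standing in for the NgRx Actions stream.
const actions$ = new Subject<Action>();

// Hypothetical helper mimicking ofType for this sketch.
const ofTypeName = (type: string) => filter((action: Action) => action.type === type);

// Types of the actions our stand-in effect would dispatch.
const dispatched: string[] = [];

actions$.pipe(
  ofTypeName('[App] Reset'),
  switchMap(() =>
    forkJoin([
      actions$.pipe(ofTypeName('[Reset] User State Cleared'), take(1)),
      actions$.pipe(ofTypeName('[Reset] Auth Token Dropped'), take(1)),
      actions$.pipe(ofTypeName('[Reset] App Reset Tracked'), take(1)),
    ])
  ),
  map((): Action => ({ type: '[Reset] Complete' }))
).subscribe((action) => dispatched.push(action.type));

// A completion action seen before any reset is ignored.
actions$.next({ type: '[Reset] User State Cleared' });

actions$.next({ type: '[App] Reset' });
actions$.next({ type: '[Reset] User State Cleared' });
actions$.next({ type: '[Reset] Auth Token Dropped' });
const beforeLastCompletion = dispatched.length; // 0: still waiting on tracking

actions$.next({ type: '[Reset] App Reset Tracked' });

console.log(dispatched); // ['[Reset] Complete']
```

Nothing is emitted until the third completion action arrives, and the stand-in Actions stream itself never completes. A second reset would simply start a new wait.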


Wrapping Up

In a world of asynchronous programming, forking and joining lay the foundation for asynchronous concurrency. They allow us to separate single streams or threads of execution into many and then later combine those discrete streams into a single stream again.

Forking allows complex processes to be broken down into simpler processes, reducing their complexity and increasing their maintainability and testability. Forking may also allow improved real-world concurrency, if these discrete streams of processing do indeed involve true background processing (e.g. making a call to a network resource or reading/writing files on disk). As with any concurrent processing, forking and joining can bring with them some increased complexity. However, these complexities still usually pale in comparison to those that often arise with true multi-threaded programming.