RxJs is one of the most powerful, and I believe fundamentally important, libraries for JavaScript and TypeScript in the last decade. RxJs is a library that is not just capable of handling the asynchronous nature of JavaScript in a beautiful way, it is also a library for functional and reactive programming. 

Functional programming and interest in it have been on the rise lately, particularly in the JavaScript community. It presents specific opportunities to simplify your code, improve readability, decompose complex problems into simpler problems, make your code more testable and leverage the benefits of functional purity, isolation of side effects, etc.

What Is Functional?

Functional programming is all about programming with functions and at the “lowest” level possibly expressions (such as a ternary) or pattern matching to perform basic computations or decision making. Functional programming is based very much on mathematical functionality and, at a deeper level, category theory.

A lot of the fundamental goals with functional programming come from mathematical functions and aim to follow the rules of mathematics: composability, associativity, transitivity, etc. Two additional concepts, purity and immutability, are fundamental to good functional programming. These provide guarantees that allow entire classes of potential bugs to be avoided or greatly minimized. True functional languages (i.e. Haskell) may take things even farther into “category theory” territory; introducing things like standardized monads, real pattern matching, etc.

What Is Reactive?

While RxJs is a functional programming library, it is also a reactive programming library. Reactive programming allows us to deal with the asynchronous nature of JavaScript in a seamless and very elegant manner. RxJs also gives us the tools to convert non-reactive and even synchronous behavior and data into reactive and asynchronous behavior and data.

With RxJs and its Observables, the code we write “reacts” to changes in the information streaming through those observables. Information streaming through observables is inherently asynchronous in nature so asynchrony becomes the basis upon which everything in a reactive application is built thus making asynchronous behavior “common”, even “mundane”, and ultimately simple and easy to manage. The information streaming through observables can be anything, including commands or events, data, etc.

Writing Functional Code

If you have been using RxJs at all, you’ve been writing functional code already. Since version 6, RxJs has provided the .pipe() method on Observable. Pipe is a very common function (and in some languages, a first class operator) in functional programming languages and libraries. A functional pipe allows for functions to be specified in execution order, often rendering the code clearer to the reader.

With traditional function calling, you would have to wrap each function within a successive function. This results in deep nesting, increasing code complexity and also displays the functions from left to right in the reverse order that they are actually applied. In the following example; doubleSay is applied first, followed by capitalize then exclaim:

exclaim(capitalize(doubleSay('hello'))) // 'Hello hello!'

This single line of code could be broken down into discrete statements:

const doubled = doubleSay('hello'); // 'hello hello'
const capitalized = capitalize(doubled); // 'Hello hello'
const exclaimed = exclaim(capitalized); // 'Hello hello!'

This is clearer in some ways, more complex in others and is a lot more verbose.

The same functions, called within a pipe, are simpler, flatter, and clearer. They are displayed in the order they are called, and in this case they are all point-free (the value argument is not explicitly declared or passed to any of these functions and is only passed into the pipe itself). This presents a very simplified and concise version of the above code:

pipe(doubleSay, capitalize, exclaim)('hello'); // 'Hello hello!'

As a side note: there may come a time when JavaScript gets a first-class pipe operator, sometimes also called a 

'hello' |> doubleSay |> capitalize |> exclaim; // 'Hello hello!'

This is even clearer than the pipe version above and puts the argument (the data) passing through the pipeline first. As a final note: pipe is actually a “curried” function where pipe() itself returns another function that takes in the value or data that should be run through the pipe. Pipe is also a higher-order function that takes other functions as input. Hence the pipe(…operators)(data) application nomenclature. Currying is another common pattern with functional programming, and allows partial application of functions, even if the language does not explicitly support partial application.

The functional pipe is a core part of RxJs that was introduced in version 6. If you have been using either, you have probably written some functional code already. For example:

class CustomerService {
  constructor(private http: HttpClient) {}
 
  loadAll(): Observable<Customer[]> {
    return this.http
      .get<Customer[]>(`${environment.apiHost}/api/customers`)
      .pipe(
        map(result => result.data || []),
        tap(
       data => !data.length 
         ? console.log('Did not find any customers') 
         : console.log(`Loaded ${data.length} customers.`), 
       err => console.error('An error occurred loading all customers:', err)
        ),
        catchError(error => of([]))
      );
  }
}

This is a common pattern for basic angular applications with basic API calls. With the above code, several RxJs operators including maptap and catchError are called within the observable pipe to transform the result and extract the data returned by the server, log the number of entities returned and optionally log and handle an error if one occurs.

How did it look before?

For comparison’s sake, let’s write the above loadAll method in a classic imperative style. Imperative programming is often the default style for many commonly used languages, including: C#/C++/C, Java, JavaScript and TypeScript even. Imperative programming is hallmarked by the use of explicit instructions and overt control flow.

The version below uses the async/await features of modern JavaScript along with Promises to handle the asynchronous nature of an http call. It then uses standard error handling and control flow constructs to handle the result:

class CustomerService {
  constructor(private http: HttpClient) {}
 
  async loadAll(): Promise<Customer[]> {
    try {
      const result = await this.http.get<Customer[]>(`${environment.apiHost}/api/customers`).toPromise();
      if (!result.data || !result.data.length) {
        console.log(`Did not find any customers.`);
        return [];
      }
      
      console.log(`Loaded ${result.data.length} customers.`);
   return result.data;
    } catch (err) {
   console.error('An error occurred loading all customers:', err);
   return [];
    }
  }
}

There are more assignments with this code, deeper nesting and an increase in overall complexity. Where before we had explicit stages of handling the result, we now have data handling, logging and error handling distributed about the function body. This code is very explicit but it would be harder to maintain. It will also grow even more complex as the needs of the program grow more complex (i.e. if you need to handle pages of data.)

Going Full On Functional

With RxJs, we can get even more functional. We can extract common patterns into functions and then organize simple functions into higher level functions. First things first: let’s “decompose” complex pipelines into simpler and more targeted functions.

Extraction & Reduction of Complexity

Currently, in our example above, we are handling the transformation of a result, logging of the results and handling of errors all within the umbrella of the loadAll function and the http call it makes. It stands to reason that some of these operations, perhaps all of them, are actually reusable bits of code that can be applied to, well, perhaps ALL data service operations. We have data extraction, result logging, and transformation of errors into empty results.

Let’s pull these behaviors out into our own pipeable functions or, as they are called within the domain of RxJs, pipeable operators. RxJs provides a handy pipe function that we can import from the rxjs module. This pipe function is distinct from Observable.prototype.pipe in that it is a free function not bound to the Observable prototype. However, this pipe function fully supports all rxjs operators like switchMap, map, filter, etc.

(NOTE: As functions pull out bits of reusable code, the example below takes some of the units a bit farther than the original to make them as reusable as possible. Some of this leverages a functional NLP, or Natural Language Processing library, called compromise to handle the creation of grammatically correct messages from generic base messages. The general idea is to demonstrate the seamless blending of simple JS functions and RxJs pipable functions (operators) in the same code and how properly decomposed functions become VERY SIMPLE!)

import { pipe } from 'rxjs';
import * as nlp from 'compromise';

export const extractDataOrDefault = defaultData => pipe(
  map(result => result.data || defaultData)
);

export const singularOrPlural = (isPlural: boolean) =>
  isPlural ?  'toPlural' : 'toSingular'

export const naturalMessage = (message: string, shouldPluralize: boolean = false) => 
  nlp(message).nouns()[singularOrPlural(shouldPluralize)]();

export const pastOrGerund = (isPast: boolean) =>
  isPast ? 'toPastTense' : 'toGerund';

export const tensedMessage = (message: string, isPast: boolean = false) => 
  nlp(message).verbs()[pastOrGerund(isPast)]()

export const logLoadResult = (entityName: string, isArray = false) => pipe(
  tap(
    data => !data || !isArray || !data.length
      ? console.log(naturalMessage(`Did not find ${isArray ? 'any' : 'a'} ${entityName}.`, isArray))
      : console.log(naturalMessage(`Loaded ${isArray ? data.length : 'a'} ${entityName}.`, isArray))
    error => console.error(naturalMessage(`An error occurred loading ${entityName}:`, isArray), error)
  )
);

export const logCUDResult = (actionName: string, entityName: string) => pipe(
  tap(
    data => console.log(tensedMessage(`Successfully ${actionName} ${entityName}.`), true),
    error => console.error(tensedMessage(`An error occurrerd while ${actionName} ${entityName}:`), false), error)
  )
);

const IS_ARRAY = true;
const IS_SINGLE = false;

export const mapErrorToDefault = defaultValue => pipe(
  catchError(error => of(defaultValue)) 
);

export const mapErrorToEmpty = (isArray = false) => pipe(
  mapErrorToDefault(isArray ? [] : null)
);

These extracted operations have several benefits: Each of these operators is responsible for just one aspect of our overall entity handling behavior. This conforms to single responsibility principle, a foundational design principle. These simplified operators are also much easier to unit test, which we’ll cover a bit later and in another article.

We now have some generalized, reusable operators with which to build some common functionality across many data service calls. We can leverage them in additional methods on our CustomerService:

class CustomerService {
  constructor(private http: HttpClient) {}
 
  load(id: number): Observable<Customer> {
    return this.http.get<Customer>(`${environment.apiHost}/api/customers/${id}`).pipe(
      extractDataOrDefault(null),
      logLoadResult('customer', IS_SINGLE),
      mapErrorToEmpty(IS_SINGLE)
    );
  }
 
  loadAll(): Observable<Customer[]> {
    return this.http.get<Customer[]>(`${environment.apiHost}/api/customers`).pipe(
      extractDataOrDefault([]),
      logLoadResult('customer', IS_ARRAY),
      mapErrorToEmpty(IS_ARRAY)
    );
  }
  
  create(customer: Customer): Observable<Customer> {
    return this.http.post<Customer>(`${environment.apiHost}/api/customers`).pipe(
      extractDataOrDefault(customer),
      logCUDResult('create', 'customer'),
      mapErrorToDefault(customer)
    );
  }
  
  update(customer: Customer): Observable<Customer> {
    return this.http.post<Customer>(`${environment.apiHost}/api/customers`).pipe(
      extractDataOrDefault(customer),
      logCUDResult('update', 'customer'),
      mapErrorToDefault(customer)
    );
  }

  delete(customer: Customer): Observable<boolean> {
    return this.http.delete<boolean>(`${environment.apiHost}/api/customers`).pipe(
      logCUDResult('delete', 'customer'),
      mapErrorToDefault(false)
    );
  }
}

This code is now very expressive and explains itself very well. Each of our extracted operators has a descriptive name that is clear about its behavior. We know just by reading the above code what is going to happen and no longer really need to spend time interpreting what each map, tap, filter, etc. is actually doing.

Coalescing Common Patterns

We can take the above, and improve it even further. We have several common patterns in use here, patterns which are likely to be employed across many more data services. We can make this code even more reusable by coalescing these common patterns into their own functions.

We can identify three common patterns in the service above: loading, saving and deleting. Lets extract these patterns into some additional pipeable operators:

export const handleLoaded = (entityName: string, isArray = false) => pipe(
  extractDataOrDefault(isArray ? [] : null),
  logLoadResult(entityName, isArray),
  mapErrorToEmpty(isArray)
);

export const handleSaved = (actionName: string, entityName: string, defaultValue: any) => pipe(
  extractDataOrDefault(defaultValue),
  logCUDResult(actionName, entityName),
  mapErrorToDefault(defaultValue)
);

export const handleDeleted = (entityName: string) => pipe(
  logCUDResult('delete', entityName),
  mapErrorToDefault(false)
);

Our service then reduces to the following, along with any number of additional services that follow the same pattern, just for different entities:

class CustomerService {
  constructor(private http: HttpClient) {}
 
  load(id: number): Observable<Customer> {
    return this.http.get<Customer>(`${environment.apiHost}/api/customers/${id}`).pipe(
      handleLoaded('customer', IS_SINGLE)
    );
  }
 
  loadAll(): Observable<Customer[]> {
    return this.http.get<Customer[]>(`${environment.apiHost}/api/customers`).pipe(
      handleLoaded('customer', IS_ARRAY)
    );
  }
  
  create(customer: Customer): Observable<Customer> {
    return this.http.post<Customer>(`${environment.apiHost}/api/customers`).pipe(
      handleSaved('create', 'customer', customer)
    );
  }
  
  update(customer: Customer): Observable<Customer> {
    return this.http.post<Customer>(`${environment.apiHost}/api/customers`).pipe(
      handleSaved('update', 'customer', customer)
    );
  }

  delete(customer: Customer): Observable<boolean> {
    return this.http.delete<boolean>(`${environment.apiHost}/api/customers`).pipe(
      handleDeleted('customer')
    );
  }
}

Testing

Our service is now very simple, very straight forward, follows a basic and explicit pattern and will be easy to replicate and unit test for other data services. The explicit behavior that was once encapsulated within an Observable pipe, which for more complex pipelines can be more difficult to unit test themselves, now has been extracted into individual and much simpler, single-responsibility functions. This greatly simplifies unit testing those individual operations themselves (and once you have confidence the elementary parts are working, testing the larger scale composites becomes easier).

A quick example of the simplicity of unit testing a pipeable operator like extractDataOrDefault:

export const extractDataOrDefault = defaultData => pipe(
  map(result => result.data || defaultData)
);
describe('extractDataOrDefault()', () => {
  it('should return the default value when the result data is falsy', () => {
    const defaultData = ['hello', 'world'];
    const result = {};
    const source = cold('r', {r: result});

    const obs = source.pipe(extractDataOrDefault(defaultData));

    expect(obs).toBeObservable(cold('d', {d: defaultData}));
  });
  
  it('should return the result data when the result data is truthy', () => {
    const defaultData = ['hello', 'world'];
    const data = ['goodby', 'planet'];
    const result = {data};
    const source = cold('r', {r: result});

    const obs = source.pipe(extractDataOrDefault(defaultData));

    expect(obs).toBeObservable(cold('d', {d: data}));
  });
});

Nothing to it! Super simple, seamlessly works with test marbles, no additional behavior to consider except that encapsulated in the operator under test itself. Unit tests are not only easier to write, from the operator to the tests and the usage of the operators, they are easier to read and interpret.

Wrapping Up

This is just the beginning of the journey into functional programming with RxJs. The library itself provides a host of lower level, ready to use operators and a functional pipe() that may be used to create fully functional applications.

The power of RxJs grows even more when combined with NgRx and Angular as a complete platform for developing richly functional, stateful and reactive applications.