RxJs, a core dependency of every Angular application, offers extensive power at the fingertips of every Angular developer. Not only is it a great way to deal with reactive forms, handle basic http requests, or implement NgRx effects…it can allow you to convert any imperative piece of code into reactive and functional code.

One of the challenges that often arises with Angular applications is multi-paradigm code. By multi-paradigm, I mean code bases that are a mixing pot of imperative, Promisy and/or async/await, as well as Observable code…which itself may be a mix of imperative, promisy, functional paradigms. This soup of multiple paradigms can increase cognitive load on each developer and take them out of their realms of skill and understanding at inopportune times, requiring them to try and interpret and reason about code that may be written in multiple styles concurrently!

There can be a key benefit in sticking to a single paradigm: When all of the code in a code base is written the same way, everywhere…there is no need to shift between mental modes or try and interpret and reason about multi-paradigm code. Every piece of code you write, read or refactor will be written the same way, function the same way, be understood the same way. If you are going to write RxJs “reactive” code, which is inherently functional by design, there can be distinct benefits to sticking with the paradigm throughout your Angular project’s code.

Data Services

Most Angular developers will start out writing imperative code. This is fairly natural, as imperative software development is largely what is taught these days. Sadly, imperative code, while it may often present more “obvious” solutions in the short term, quite frequently ends up bloated, complex, more tightly coupled (i.e. ServiceA depends on ServiceB that depends on ServiceC that, oops, depends on ServiceA), potentially riddled with bugs and difficult to maintain in the long run.

The Imperative Service

Data services are core to every Angular application that does more than simply render static content. At one point or another, most Angular applications require data, and that data usually comes from some web service API hosted somewhere on the internet.

Every Angular developer will usually write something like this at one point or another:

export class CustomersService {
 constructor(private http: HttpClient, private auth: AuthService) {}

 load(customerId: number): Observable<Customer> {
  // HttpHeaders is immutable: append() returns a new instance, so keep the result.
  const headers = new HttpHeaders()
   .append('Authorization', `Bearer ${this.auth.jwt}`);

  return this.http.get<Customer>(
   `${environment.apiRoot}/api/customers/${customerId}`,
   { headers }
  );
 }

 loadAll(criteria?: CustomerCriteria): Observable<Customer[]> {
  const headers = new HttpHeaders()
   .append('Authorization', `Bearer ${this.auth.jwt}`);

  // HttpParams is immutable too, so each append() result must be reassigned.
  let params = new HttpParams();
  if (criteria?.orderId) {
   params = params.append('orderId', criteria.orderId);
  }
  if (criteria?.repId) {
   params = params.append('repId', criteria.repId);
  }

  return this.http.get<Customer[]>(
   `${environment.apiRoot}/api/customers`,
   { headers, params }
  );
 }

 // ...
}

Variable assignments. Explicit control flow. Nested scopes. Imperative! When you want to be as functional as possible, this kind of imperative approach with all of its explicit control flow is the last thing you want to see.

The Reactive Service

Not to fret, writing fully reactive, functional code in a data service like this is definitely within the realm of possibility. Explicit control flow can be ditched in favor of functional pipelines, operators and composable bits and pieces of code. Reactive code is a means of more loosely coupling even individual lines of code from each other, allowing otherwise independent fragments of code to “touch”, “handle” and “transform” bits of data as they move through a pipeline.

Of and From

Two of the most overlooked, yet simple and incredibly useful, factory functions from RxJs are of and from. These two wonderful functions allow us to convert just about anything, from simple static values or arrays to Promises or other Iterables, into Observable streams. With these little gems, we can easily convert the above imperative code into beautiful functional, reactive code.
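To make the difference between the two concrete, here is a minimal sketch (the variable names are purely illustrative). It assumes nothing beyond RxJs itself: of emits its argument as one value, while from unpacks an array and emits each element on its own.

```typescript
import { from, of } from 'rxjs';

// Collect emissions so the difference is easy to inspect.
const emittedByOf: number[][] = [];
const emittedByFrom: number[] = [];

// of() emits its argument as a single value: the whole array at once.
of([1, 2, 3]).subscribe(value => emittedByOf.push(value));

// from() unpacks the array and emits each element separately.
from([1, 2, 3]).subscribe(value => emittedByFrom.push(value));

console.log(emittedByOf);   // [[1, 2, 3]]
console.log(emittedByFrom); // [1, 2, 3]
```

Both streams emit synchronously on subscribe and then complete, which is what makes them handy starting points for a pipeline.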

Reactive Load

It just takes a little bit of a paradigm shift in thinking to approach the problem of building an Http request functionally, instead of imperatively. In this case, of is the function of the day, and it allows us to start off returning a stream, which we will eventually switch into the http stream:

load(customerId: number): Observable<Customer> {
 return of(new HttpHeaders()).pipe(
  // HttpHeaders is immutable, so map to the copy that append() returns
  map(headers => headers.append('Authorization', `Bearer ${this.auth.jwt}`)),
  switchMap(headers =>
   this.http.get<Customer>(
    `${environment.apiRoot}/api/customers/${customerId}`,
    { headers }
   )
  )
 );
}

Well, look at that! Not a variable assignment in sight! This was the easy one, though…what about the more complicated case of needing to set up both headers and parameters? How do we handle that? There are in fact many ways to solve that problem, however one option in particular keeps the solution clean and organized:

loadAll(criteria?: CustomerCriteria): Observable<Customer[]> {
 return forkJoin([ // Split our execution into more than one path...
  of(new HttpHeaders()).pipe(
   map(headers => headers.append('Authorization', `Bearer ${this.auth.jwt}`))
  ),
  of(new HttpParams()).pipe(
   // append() returns a new HttpParams, so map to it (or pass the original through)
   map(params => criteria?.orderId ? params.append('orderId', criteria.orderId) : params),
   map(params => criteria?.repId ? params.append('repId', criteria.repId) : params),
  )
 ]).pipe( // ...then gather the results of each path...
  switchMap(([headers, params]) =>
   this.http.get<Customer[]>(
    `${environment.apiRoot}/api/customers`,
    { headers, params }
   )
  )
 );
}

Again, not a variable assignment in sight, no explicit control flow. Pure functional goodness! We can get a little cleaner than this, however, as using the HttpHeaders and HttpParams options objects is not strictly necessary. In fact, we can create simple objects for both:

loadAll(criteria?: CustomerCriteria): Observable<Customer[]> {
 return forkJoin([
  of({ Authorization: `Bearer ${this.auth.jwt}`}),
  of(criteria)
 ]).pipe(
  switchMap(([headers, params]) =>
   this.http.get<Customer[]>(
    `${environment.apiRoot}/api/customers`,
    { headers, params }
   )
  )
 );
}

That is a little cleaner!
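One detail is worth calling out for the HttpHeaders and HttpParams variants: both Angular classes are immutable, so append returns a new instance rather than changing the one you already hold. That matters when choosing operators, because tap throws away whatever its callback returns while map passes it downstream. The sketch below uses a hypothetical ImmutableHeaders class as a stand-in for HttpHeaders (it is not an Angular API) so it can run without Angular.

```typescript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

// Hypothetical stand-in that mimics HttpHeaders' immutability:
// append() returns a new instance and leaves the original untouched.
class ImmutableHeaders {
  constructor(private readonly values: Readonly<Record<string, string>> = {}) {}

  append(name: string, value: string): ImmutableHeaders {
    return new ImmutableHeaders({ ...this.values, [name]: value });
  }

  get(name: string): string | null {
    return name in this.values ? this.values[name] : null;
  }
}

const viaTap: Array<string | null> = [];
const viaMap: Array<string | null> = [];

// tap ignores the callback's return value, so the appended copy is lost.
of(new ImmutableHeaders()).pipe(
  tap(headers => headers.append('Authorization', 'Bearer abc')),
).subscribe(headers => viaTap.push(headers.get('Authorization')));

// map hands the appended copy to the next operator.
of(new ImmutableHeaders()).pipe(
  map(headers => headers.append('Authorization', 'Bearer abc')),
).subscribe(headers => viaMap.push(headers.get('Authorization')));

console.log(viaTap); // [null]
console.log(viaMap); // ['Bearer abc']
```

With plain object literals this question never comes up, which is one more reason the simpler version can be attractive.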

Joining Streams

So, what about more complex scenarios? With reactive applications, ultimately, you tend to find yourself wanting…well, just about everything as a stream! So what if our Auth service’s jwt property was a stream, rather than a static value? With the imperative approach, things get pretty nasty…if it is even possible at all.

With multiple streams, we end up being forced to get more reactive. For those not familiar with reactive solutions to common problems, this can lead to some pretty ugly code, often riddled with more bugs than straight up static data and imperative code.

With our reactive approach above, it’s no problem. Joining streams is an inherent and easy to implement capability of RxJs.

Imperative Impossible

With imperative code, if we need data from a stream…we suddenly find ourselves in the dreaded “explicit subscribe” scenario. We must subscribe to the stream, or streams, that provide the data we need. That can often mean some messy code, since we have to unsubscribe if those streams don’t complete on their own. We can be clever, use things like take(1) or first() to force our one-off requests to clean themselves up…but it still isn’t pretty.
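As a small sketch of that take(1) trick, assuming a hypothetical jwt$ subject standing in for a long-lived token stream: the one-off subscriber reads the current value, completes, and never sees later tokens.

```typescript
import { BehaviorSubject } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical stand-in for a long-lived token stream that never completes.
const jwt$ = new BehaviorSubject<string>('token-1');

const events: string[] = [];

// take(1) reads the current value, then completes and unsubscribes for us.
jwt$.pipe(take(1)).subscribe({
  next: jwt => events.push(`next:${jwt}`),
  complete: () => events.push('complete'),
});

// A later token no longer reaches the one-off subscriber.
jwt$.next('token-2');

console.log(events); // ['next:token-1', 'complete']
```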

In many cases, imperative solutions involving data from multiple streams are simply impossible. Our loadAll example, in fact, is one such case. After pondering a code example for a bit, a viable example simply does not seem possible at the time of this writing. The only viable solutions involve converting the code to a reactive solution of some kind!

Reactive to the Rescue

loadAll(criteria?: CustomerCriteria): Observable<Customer[]> {
 return forkJoin([
  of(new HttpHeaders()).pipe(
   withLatestFrom(this.auth.jwt$),
   // map (not tap) so the forkJoin receives the appended HttpHeaders copy
   map(([headers, jwt]) => headers.append('Authorization', `Bearer ${jwt}`))
  ),
  of(criteria)
 ]).pipe(
  switchMap(([headers, params]) =>
   this.http.get<Customer[]>(
    `${environment.apiRoot}/api/customers`,
    { headers, params }
   )
  )
 );
}

Easy peasy!! The withLatestFrom operator is the staple for joining one or more streams to another stream. It provides the values from each stream as a tuple, which can easily be destructured in subsequent operators. In this case, switching back to the HttpHeaders actually makes the problem a little easier to solve. There are other solutions…in fact, we could start with the auth.jwt$ stream, and pipe off of that. That leads to a more complex observable in the end, though:

loadAll(criteria?: CustomerCriteria): Observable<Customer[]> {
 return this.auth.jwt$.pipe(
  switchMap(jwt =>
   forkJoin([
    of({ Authorization: `Bearer ${jwt}`}),
    of(criteria)
   ])
  ),
  switchMap(([headers, params]) =>
   this.http.get<Customer[]>(
    `${environment.apiRoot}/api/customers`,
    { headers, params }
   )
  )
 );
}

Here we end up switching through two different streams…first we switch from the auth.jwt$ stream to the forkJoin that creates our headers and params. From that stream, we finally switch again to the http stream. This is a trickier flow that isn’t quite as easy to reason about as the plain forkJoin. We were able to avoid HttpHeaders though.
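One subtlety with the withLatestFrom variant: because of emits immediately, the joined stream must already have a value available when we subscribe, otherwise the emission is silently dropped. The sketch below illustrates this with two hypothetical streams, one backed by a BehaviorSubject (which always has a current value) and one backed by a plain Subject that has not emitted yet.

```typescript
import { BehaviorSubject, Subject, of } from 'rxjs';
import { map, withLatestFrom } from 'rxjs/operators';

// Hypothetical token streams, for illustration only.
const replayed$ = new BehaviorSubject<string>('abc'); // has a current value
const silent$ = new Subject<string>();                // has not emitted yet

const withValue: string[] = [];
const withoutValue: string[] = [];

// The BehaviorSubject supplies 'abc' right away, so the pair is emitted.
of('Bearer').pipe(
  withLatestFrom(replayed$),
  map(([scheme, jwt]) => `${scheme} ${jwt}`),
).subscribe(value => withValue.push(value));

// The plain Subject has no value yet, so the source emission is dropped.
of('Bearer').pipe(
  withLatestFrom(silent$),
  map(([scheme, jwt]) => `${scheme} ${jwt}`),
).subscribe(value => withoutValue.push(value));

console.log(withValue);    // ['Bearer abc']
console.log(withoutValue); // []
```

So if auth.jwt$ is backed by something that replays its latest value, the approach works as expected; if not, starting from the jwt$ stream itself may be the safer choice.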

Reactive Reuse

In our examples so far, we have been creating an HTTP Authorization header with a JWT as a Bearer token on every request, and repeating that code for each request implementation. This is actually something we do not necessarily need to keep re-creating. So long as the JWT does not change, the token header doesn’t need to change either. If this is the only header we need for our requests, we could encapsulate it entirely within our authorization service.

Reactive Auth

Our basic reactive auth service might track the JWT token in a BehaviorSubject, which will keep it as, in effect, a piece of “state” so long as the service lives. We can expose that subject as an Observable, as well as create another Observable from it for the HTTP header:

export class AuthService {
 // The private subject and the public stream need distinct names
 private jwtSubject = new BehaviorSubject<JWT | null>(null);
 jwt$ = this.jwtSubject.asObservable();

 authHeader$ = this.jwt$.pipe(
  map(jwt => jwt ? { Authorization: `Bearer ${jwt}` } : undefined),
  shareReplay(1),
  map(headers => new HttpHeaders(headers))
 );

 // ... other auth/auth code...
}

Every time the jwt emits a new value, our authHeader$ stream creates new http headers. If the jwt is truthy (i.e. we are logged in), they include the Authorization header. If the jwt is falsy (i.e. we have been logged out), the first map returns undefined and the resulting HttpHeaders is simply empty.

Note the shareReplay(1), which ensures that we keep providing the most recent http headers for each new subscriber. Further, each new subscriber gets their own HttpHeaders object populated with the headers created the last time the JWT changed. This supports using authHeader in any number of potential data services and service calls, without having to recompute the headers each time. The new HttpHeaders instance allows for some clever use down the line.
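Here is a rough sketch of that behavior in isolation. The token$ subject, the counter, and the object spread (standing in for new HttpHeaders) are all hypothetical, chosen so the example runs with RxJs alone.

```typescript
import { BehaviorSubject } from 'rxjs';
import { map, shareReplay } from 'rxjs/operators';

// Hypothetical token source, starting out logged out.
const token$ = new BehaviorSubject<string | null>(null);
let computations = 0;

const header$ = token$.pipe(
  map(jwt => {
    computations++; // count how often the header is computed
    return jwt ? { Authorization: `Bearer ${jwt}` } : {};
  }),
  shareReplay(1),
  // Everything after shareReplay runs once per subscriber,
  // so each subscriber receives its own copy.
  map(plain => ({ ...plain })),
);

const first: object[] = [];
const second: object[] = [];

header$.subscribe(header => first.push(header));
token$.next('abc');
header$.subscribe(header => second.push(header)); // late subscriber

console.log(computations);           // 2 (once for null, once for 'abc')
console.log(second);                 // [{ Authorization: 'Bearer abc' }]
console.log(first[1] === second[0]); // false: same content, separate objects
```

The late subscriber gets the most recent header immediately, and the computation before shareReplay is not repeated for it.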

export class CustomersService {
 constructor(private http: HttpClient, private auth: AuthService) {}
 
 load(customerId: number): Observable<Customer> {
  return this.auth.authHeader$.pipe(
   switchMap(headers =>
    this.http.get<Customer>(
     `${environment.apiRoot}/api/customers/${customerId}`,
     { headers }
    )
   )
  );
 }

 loadAll(criteria?: CustomerCriteria): Observable<Customer[]> {
  return forkJoin([
   // forkJoin emits only after every input completes; authHeader$ never does, so take one value
   this.auth.authHeader$.pipe(take(1)),
   of(criteria)
  ]).pipe(
   switchMap(([headers, params]) =>
    this.http.get<Customer[]>(
     `${environment.apiRoot}/api/customers`,
     { headers, params }
    )
   )
  );
 }
}

We are now able to easily include our auth header in any request, and remain reactive and functional in the process.
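One thing to keep in mind with forkJoin: it emits only after every input stream has completed, and a stream backed by a BehaviorSubject never completes on its own. A minimal sketch (header$ and the values here are hypothetical stand-ins) shows the difference that take(1) makes:

```typescript
import { BehaviorSubject, forkJoin, of } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical long-lived header stream; it never completes by itself.
const header$ = new BehaviorSubject<string>('Bearer abc');

const withoutTake: string[][] = [];
const withTake: string[][] = [];

// forkJoin keeps waiting for header$ to complete, so nothing is emitted.
forkJoin([header$, of('criteria')])
  .subscribe(values => withoutTake.push(values));

// take(1) completes the input after its current value, so forkJoin can emit.
forkJoin([header$.pipe(take(1)), of('criteria')])
  .subscribe(values => withTake.push(values));

console.log(withoutTake); // []
console.log(withTake);    // [['Bearer abc', 'criteria']]
```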

Adding Headers

With the above approach, it may seem as though adding other headers besides the Authorization header is no longer an option. Thanks to using the HttpHeaders object, we can in fact augment the headers as they move through the stream, allowing us to add any headers we wish, in addition to the Authorization header added by the auth service.

For example, let’s say we wanted to make sure data was not cacheable for a particular request, thus forcing the browser to re-request the data from the remote service, and invalidating the cache in any intermediate proxies:

loadAll(criteria?: CustomerCriteria): Observable<Customer[]> {
 return forkJoin([
  this.auth.authHeader$.pipe(take(1)), // forkJoin needs each input to complete
  of(criteria)
 ]).pipe(
  // HttpHeaders is immutable, so map to a tuple carrying the appended copy
  map(([headers, params]) => [headers.append('Cache-Control', 'no-cache'), params] as const),
  switchMap(([headers, params]) =>
   this.http.get<Customer[]>(
    `${environment.apiRoot}/api/customers`,
    { headers, params }
   )
  )
 );
}

We simply map the stream, append the additional header to get a new HttpHeaders, and we are on our way!

Reactive Forms

We can use a similar approach to create forms for submission to remote services as well. The modern web supports two types of standardized forms:

  • FormData: multi-part forms
  • URLSearchParams: url-encoded forms

We can use these standardized objects in much the same way we have used Angular’s HttpHeaders and HttpParams objects to create forms from input arguments or other data in a reactive and functional manner.

Multi-Part Form Data

Multi-part form data is a way of encoding each form field as its own bounded part of a multi-part form. By encoding each field in this manner, we are able to choose how each individual field is encoded. This supports encoding complex data, such as images or other file submissions, in an appropriate and distinct manner from other simpler fields, such as text input fields.

We can easily create a Products data service that accepts a simple javascript object for a Product, as well as a File representing a product image added to a form via an <input type="file"> field.

export class ProductService {
 constructor(private http: HttpClient) {}

 create(product: Product, image: File): Observable<Product> {
  return of(new FormData()).pipe(
   tap(form => form.append('image', image)),
   switchMap(form => from(Object.entries(product)).pipe(
    tap(([key, value]) => form.append(key, String(value))),
    ignoreElements(), // drop the per-field emissions...
    endWith(form)     // ...and emit the populated form exactly once
   )),
   switchMap(form =>
    this.http.post<Product>(
     `${environment.apiRoot}/api/products`,
     form
    )
   )
  );
 }
}
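The inner pipeline is the interesting part: populate a mutable object from a list of entries, then emit that object exactly once. Here is a small sketch of just that step, assuming a hypothetical product flattened to string fields and a runtime that provides a global FormData (browsers, or a recent Node).

```typescript
import { from, of } from 'rxjs';
import { endWith, ignoreElements, switchMap, tap } from 'rxjs/operators';

// Hypothetical product fields, flattened to strings for this sketch.
const productFields: Record<string, string> = { name: 'Widget', sku: 'W-1' };

const emittedForms: FormData[] = [];

of(new FormData()).pipe(
  switchMap(form => from(Object.entries(productFields)).pipe(
    // FormData is mutable, so a side effect in tap is enough here.
    tap(([key, value]) => form.append(key, value)),
    // Drop the per-entry emissions...
    ignoreElements(),
    // ...and emit the populated form once the entries are exhausted.
    endWith(form),
  )),
).subscribe(form => emittedForms.push(form));

console.log(emittedForms.length);         // 1
console.log(emittedForms[0].get('name')); // 'Widget'
console.log(emittedForms[0].get('sku'));  // 'W-1'
```

Without ignoreElements, every entry would also flow downstream, and the next switchMap would start (and then cancel) a request per field.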

We could also choose to send the image in one form part, and the rest of the product data as JSON in another form part:

export class ProductService {
 constructor(private http: HttpClient) {}

 create(product: Product, image: File): Observable<Product> {
  return of(new FormData()).pipe(
   tap(form => form.append('image', image)),
   tap(form => form.append('product', new File(
    [JSON.stringify(product)], // the File constructor expects an array of parts
    'product.json',
    { type: 'application/json' }
   ))),
   switchMap(form =>
    this.http.post<Product>(
     `${environment.apiRoot}/api/products`,
     form
    )
   )
  );
 }
}

Clean and easy! Nothing to it! Angular’s HttpClient recognizes the FormData body and leaves the Content-Type header unset, so the browser can set it to multipart/form-data along with the part boundary.

URL Encoded Form Data

While multi-part forms are great for posting complex chunks of data to an API, and especially useful for uploading binary file data such as images, PDFs, etc., they are not the most compact or efficient way of representing form data. In fact, for basic forms they can bloat the payload significantly. Further, many third-party APIs may require data to be sent in the simpler and more classic “url encoded” data format, where key/value pairs are joined by ampersands, and each value is encoded according to url encoding rules.

We can use the URLSearchParams object to facilitate construction of such a form. A common example might be requesting an OAuth access token:

retrieveToken(deviceCode: string): Observable<{ access_token: string, refresh_token?: string }> {
 // "as const" keeps the pair typed as a tuple rather than a union array
 return of([new URLSearchParams(), new HttpHeaders()] as const).pipe(
  tap(([form]) => form.append('grant_type', 'urn:ietf:params:oauth:grant-type:device_code')),
  tap(([form]) => form.append('device_code', deviceCode)),
  // HttpHeaders is immutable, so map to a tuple carrying the appended copy
  map(([form, headers]) => [form, headers.append('Content-Type', 'application/x-www-form-urlencoded')] as const),
  switchMap(([form, headers]) =>
   this.http.post<{ access_token: string, refresh_token?: string }>(
    `${environment.authServer}/oauth/token`,
    form.toString(),
    { headers },
   )
  )
 );
}

Same as with FormData, populating the key/value pairs is a trivial operation. Unlike FormData, the Angular HttpClient does not natively recognize URLSearchParams as a form type, so remember to set the Content-Type header.
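To see what actually goes over the wire, here is a tiny sketch of the string that URLSearchParams produces. The device code value is made up for illustration.

```typescript
// Build the same kind of form as a token request, with a made-up device code.
const tokenForm = new URLSearchParams();
tokenForm.append('grant_type', 'urn:ietf:params:oauth:grant-type:device_code');
tokenForm.append('device_code', 'abc 123');

// toString() joins the pairs with ampersands and url-encodes each value:
// colons become %3A and spaces become +.
const encodedBody = tokenForm.toString();

console.log(encodedBody);
// grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Adevice_code&device_code=abc+123
```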

Recipes for Reactive

This is just a small set of recipes for writing reactive code in Angular where normally you might think only imperative code was an option. With the many RxJs factory functions, notably of and from, but not discounting forkJoin, combineLatest, interval, timer, range and even the somewhat obscure but awesome generate, it is possible to convert just about any code into reactive code.

Try your hand at converting some of your “standard” imperative Angular code to functional and reactive RxJs code using some of the techniques discussed in this article. Stay tuned for future articles sharing additional Recipes for Reactive code!