Using WebSockets may seem daunting, but they are delightfully simple. And what you’ll be really glad to know, is that RxJs ships with a thin wrapper around the native browser WebSocket client. No additional third party dependencies are needed, and you get an observable data stream right out of the box!

Why

Using a WebSocket connection is ideal not just for chat apps, but many other use-cases: Streaming data, any situation where you are repeating RESTful calls at an interval, or calls that take a long time to return. An added benefit is that when you have “live data” displayed through a WebSocket, it will make your app a whole lot more engaging and interesting to the user.

Setup

It’s important to know that WebSockets use a different protocol than you are probably used to; it’s ws:// instead of http:// or wss:// instead of https://.

You’ll be importing these WebSocket classes from rxjs:

import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

Of course, you’ll need a server to connect to. It’s very easy to set up a local node server with a WebSocket using the popular ws library. You can also mount a WebSocket route onto express with this handy library, which is an extension of ws. I won’t cover setting up a WebSocket server in this article, but this article has a great write up on it.

Create a WebSocket Service

If your apps are anything like mine, your APIs share a root url, so our service will handle converting it to the WS protocol. The service will maintain a single WebSocket connection. First let’s cover connecting.

Connecting

Our connect() method returns an observable of the connection. You will call this method from your components to get an observable of all the data that comes through the WebSocket connection.

connection$: WebSocketSubject<any>;
RETRY_SECONDS = 10; 
connect(): Observable<any> {
  return this.store.pipe(select(getApiUrl)).pipe(
    filter(apiUrl => !!apiUrl),
    // https becomes wws, http becomes ws
    map(apiUrl => apiUrl.replace(/^http/, 'ws') + '/stream'),
    switchMap(wsUrl => {
      if (this.connection$) {
        return this.connection$;
      } else {
        this.connection$ = webSocket(wsUrl);
        return this.connection$;
      }
    }),
    retryWhen((errors) => errors.pipe(delay(this.RETRY_SECONDS)))
  );
}

Let’s break it down line-by-line. We’re piping off the store to get our API url. If you aren’t using NgRx, you could start with the following instead:

return of('http://localhost:3000').pipe(
  1. We have a filter operator so that we can suppress null or empty strings being emitted from the store for the API url. (Not needed if you are not using NgRx).
  2. With the map operator, we are replacing the http protocol with ws, and tacking on the WebSocket endpoint. This is flexible enough to handle both http and https.
  3. Now we switch observables via switchMap to a WebSocket connection. Here we check if one has already been made, and if so, return it. Otherwise, we create a new WebSocket connection with the RxJs wrapper webSocket() and pass it the url. Before we return it, we keep a reference to it with connection%content%lt;/span>, so that we can send data and so that we don't make redundant connections.
  4. Lastly, we can retry the sequence if an error occurs, like if the server drops the connection. We are passing in the constant RETRY_SECONDS which is how long it will wait before trying to connect again.

Important note: The connection will not be opened until you subscribe to it. More on that towards the end…

Sending data

Our service will also include a send method. It works by just calling next() on the connection reference. Simple enough, eh?

send(data: any) {
  if (this.connection$) {
    this.connection$.next(data);
  } else {
    console.error('Did not send data, open a connection first');
  }
}

What about auth?

Until you send something to a WebSocket connection, the server doesn’t know who you are. One technique for handling auth in WebSockets is to include a JWT token in every payload you send to the server. We can modify our method to handle this:

send(data: any) {
  if (this.connection$) {
    const payload = {
      token: this.authService.token,
      ...data,
    };
    this.connection$.next(payload);
  }
}

In this scenario, the server would include some logic so that it doesn’t send anything until it gets a payload from the client and validates the token in it.

Closing the connection

Finally, we need to handle closing the connection, so here’s how our service will do that.

closeConnection() {
  if (this.connection$) {
    this.connection$.complete();
    this.connection$ = null;
  }
}
ngOnDestroy() {
  this.closeConnection();
}

As you can see it’s quite simple to close the connection, you just complete the observable. We also set our connection reference to null so that if connect() is called again it will open a new connection.

Let’s help out the server by closing the connection when our user closes their browser tab, by implementing OnDestroy. This just calls closeConnection().

Integrating the service with your component

Finally let’s bring it all home by integrating our WebSocket service. Let’s say you have a chat app, so your super simplified chat component template might look like this:

<ul>
  <li *ngFor="let message of messages">{{ message }}</li>
</u>
<input placeholder="Send a message..." [formControl]="msgCtrl">
<button (click)="sendMessage()">Send</button>

It loops over messages and has an input to send messages. Here’s the guts of our component:

messages: Message[] = [];
msgCtrl = new FormControl('');
destroyed$ = new Subject();
constructor(private webSocket: WebSocketService) {}
ngOnInit() {
  this.webSocket.connect().pipe(
    takeUntil(this.destroyed$)
  ).subscribe(messages => this.messages.push(messages));
}
sendMessage() {
  this.webSocket.send({ message: this.msgCtrl.value });
  this.msgCtrl.setValue('');
}
ngOnDestroy() {
  this.destroyed$.next();
}

As the server broadcasts new messages, they are emitted through the WebSocket connection. In our component, we initialize an empty array of messages and subscribe to the WebSocket connection. Every time a new message is emitted, we push it onto our existing array of messages.

As previously mentioned, it’s very important that we either call subscribe() or async pipe the WebSocket connection, because that is how the connection is opened. I am not async piping in this situation because I want to aggregate the messages, instead of just show the most recent one.

In the sendMessage() method we are just grabbing the value of the input and sending it through the WebSocket in an object, then clearing the value of the input.

Troubleshooting

If you’re having issues, here are several things that can help you.

  1. Use the Smart Websocket Client chrome extension to test your connection. Give it a URL and you can send and receive data. At least you’ll know if your server is working before you have to blame your client code.
  2. Inspect the WebSocket connection in Chrome DevTools. Go to the Network tab and filter by “WS”. The connections will be listed by the last path segment of the endpoint. You can click on it and see data being sent and received under the “Messages” tab.

Conclusion

I hope this article has given you a good foundation on interacting with WebSockets in Angular. Thanks and see you in the next article!

Thanks to Phil Feinstein.