How to Trace Websocket Events in NestJS - [Case Study]

In NestJS, HTTP requests are traced by req.id parameter. You can write your custom id generator, or use default id generated by NestJS or Express or Fastify. However, there is no such concept of request id, or request tracing when it comes to Websockets / Socket.IO.

Though NestJS allows use of interceptors, filters or guards in Gateways, one feature I've felt missing is request id. Without request id, you can't trace websocket events and that is not very good for observability.

In this post, I will explain how to trace websocket events in NestJS. For this, we will create a custom WebSocketAdapter. In doing so, we will dive deep into source code of NestJS and solve some challenges!


What is a WebSocketAdapter?

NestJS makes use of Adapter Pattern to abstract underlying libraries from your code. For example NestJS doesn't care if you are using Fastify or Express as an HTTP library. If you want to use another library, you just need to implement an adapter defined by NestJS.

There are various websocket libraries in Node.js, and also there is Socket.IO, which makes things easier for developers. In this post, we will modify IoAdapter, from the existing official NestJS package: @nestjs/platform-socket.io.

💡
Socket.IO is a bidirectional communication protocol, than can work over HTTP or Websockets. We will use it as an example websocket library in this post, but actual details of the Socket.IO protocol is out of scope for this post. I'm planning to publish an article about how Socket.IO works, if you don't want to miss out, subscribe to Noop Today!

How to Write a WebSocketAdapter in NestJS

To create a custom WebSocketAdapter in NestJS, you need to implement WebSocketAdapter interface from @nestjs/common.

export interface WebSocketAdapter<TServer = any, TClient = any, TOptions = any> {
    create(port: number, options?: TOptions): TServer;
    bindClientConnect(server: TServer, callback: Function): any;
    bindClientDisconnect?(client: TClient, callback: Function): any;
    bindMessageHandlers(client: TClient, handlers: WsMessageHandler[], transform: (data: any) => Observable<any>): any;
    close(server: TServer): any;
}

https://github.com/nestjs/nest/blob/master/packages/common/interfaces/websockets/web-socket-adapter.interface.ts

In a typical Node.js application involving websockets / socket.io, you probably have the same functionality in a different way. Since we are using NestJS as our framework, lets see how we should structure our code.

A typical socket.io server in Node.js

const express = require('express');
const app = express();
const http = require('http');
const server = http.createServer(app);
const { Server } = require("socket.io");

// create in WebSocketAdapter
const io = new Server(server);

// bindClientConnect in WebSocketAdapter
io.on('connection', (socket) => {
  console.log('handleConnection');
  
  // bindClientDisconnect in WebSocketAdapter
  socket.on('disconnect', () => {
  	console.log('handleDisconnect');
  })
  
  // bindMessageHandlers in WebSocketAdapter
  socket.on('foo', (data, callback) => {
  	console.log('handleFoo');
    callback('OK');
  })
});

server.listen(3000, () => {
  console.log('listening on *:3000');
});

// close in WebSocketAdapter
setTimeout(() => { io.close() }, 10000);

NestJS provides an AbstractWsAdapter class to make things easier for us. So instead of this:

import { WebSocketAdapter } from '@nestjs/common';

export class MyWebSocketAdapter implements WebSocketAdapter {}

We can do this:

import { AbstractWsAdapter } from '@nestjs/websockets';

export class MyWebsocketAdapter extends AbstractWsAdapter {}

The difference is that, AbstractWsAdapter provides default implementations for some methods, but you still need to implement create and bindMessageHandlers methods.

We will act smart and use existing code from the official package, and make modifications only in required parts.

Use Existing IoAdapter from @nestjs/platform-socket.io

This is the IoAdapter from NestJS official packages, that acts as a bridge between NestJS and Socket.IO.

import { isFunction, isNil } from '@nestjs/common/utils/shared.utils';
import {
  AbstractWsAdapter,
  MessageMappingProperties,
} from '@nestjs/websockets';
import { DISCONNECT_EVENT } from '@nestjs/websockets/constants';
import { fromEvent, Observable } from 'rxjs';
import { filter, first, map, mergeMap, share, takeUntil } from 'rxjs/operators';
import { Server, ServerOptions, Socket } from 'socket.io';

export class IoAdapter extends AbstractWsAdapter {
  public create(
    port: number,
    options?: ServerOptions & { namespace?: string; server?: any },
  ): Server {
    if (!options) {
      return this.createIOServer(port);
    }
    const { namespace, server, ...opt } = options;
    return server && isFunction(server.of)
      ? server.of(namespace)
      : namespace
      ? this.createIOServer(port, opt).of(namespace)
      : this.createIOServer(port, opt);
  }

  public createIOServer(port: number, options?: any): any {
    if (this.httpServer && port === 0) {
      return new Server(this.httpServer, options);
    }
    return new Server(port, options);
  }

  public bindMessageHandlers(
    socket: Socket,
    handlers: MessageMappingProperties[],
    transform: (data: any) => Observable<any>,
  ) {
    const disconnect$ = fromEvent(socket, DISCONNECT_EVENT).pipe(
      share(),
      first(),
    );

    handlers.forEach(({ message, callback }) => {
      const source$ = fromEvent(socket, message).pipe(
        mergeMap((payload: any) => {
          const { data, ack } = this.mapPayload(payload);
          return transform(callback(data, ack)).pipe(
            filter((response: any) => !isNil(response)),
            map((response: any) => [response, ack]),
          );
        }),
        takeUntil(disconnect$),
      );
      source$.subscribe(([response, ack]) => {
        if (response.event) {
          return socket.emit(response.event, response.data);
        }
        isFunction(ack) && ack(response);
      });
    });
  }

  public mapPayload(payload: unknown): { data: any; ack?: Function } {
    if (!Array.isArray(payload)) {
      if (isFunction(payload)) {
        return { data: undefined, ack: payload as Function };
      }
      return { data: payload };
    }
    const lastElement = payload[payload.length - 1];
    const isAck = isFunction(lastElement);
    if (isAck) {
      const size = payload.length - 1;
      return {
        data: size === 1 ? payload[0] : payload.slice(0, size),
        ack: lastElement,
      };
    }
    return { data: payload };
  }
}

https://github.com/nestjs/nest/blob/master/packages/platform-socket.io/adapters/io-adapter.ts

Let's see what does all the methods do:

  • create method is responsible for creating the socket.io server, we don't need to modify here.
  • createIoServer is an helper method for create
  • bindMessageHandlers is responsible for delivering socket.io messages to our handlers. We will modify this for sending extra arguments to our handlers.
  • mapPayload parses payload from socket.io to { ack, data }, we don't need to modify here.

So, we just need to focus on bindMessageHandlers.

Modifying The bindMessageHandlers

  public bindMessageHandlers(
    socket: Socket,
    handlers: MessageMappingProperties[],
    transform: (data: any) => Observable<any>,
  ) {
    const disconnect$ = fromEvent(socket, DISCONNECT_EVENT).pipe(
      share(),
      first(),
    );

    handlers.forEach(({ message, callback }) => {
      const source$ = fromEvent(socket, message).pipe(
        mergeMap((payload: any) => {
          const { data, ack } = this.mapPayload(payload);
          return transform(callback(data, ack)).pipe(
            filter((response: any) => !isNil(response)),
            map((response: any) => [response, ack]),
          );
        }),
        takeUntil(disconnect$),
      );
      source$.subscribe(([response, ack]) => {
        if (response.event) {
          return socket.emit(response.event, response.data);
        }
        isFunction(ack) && ack(response);
      });
    });
  }

First, lets understand what is handlers in this. It has the type: MessageMappingProperties:

export interface MessageMappingProperties {
    message: any;
    methodName: string;
    callback: (...args: any[]) => Observable<any> | Promise<any> | any;
}

In socket.io terms: socket.on(message, callback), message will match the event name, and callback will be the actual handler function.

So, in order to pass parameters to our handler methods, we must pass parameters to the callback.

Lets take a closer look at code with additional commentary:

// message --> eventName
// callback --> eventHandler
handlers.forEach(({ message, callback }) => {
	// RxJS equivelant for --> socket.on(message)
    const source$ = fromEvent(socket, message).pipe(
    	// payload is socket.io payload
        mergeMap((payload: any) => {
            const { data, ack } = this.mapPayload(payload);
            // transform, transforms our callback handler to observable
            // callback function is called with TWO parameters
            return transform(callback(data, ack)).pipe(
                filter((response: any) => !isNil(response)),
                // If callback returns any response, wrap it with ack
                map((response: any) => [response, ack]),
            );
        }),
        takeUntil(disconnect$),
    );
    source$.subscribe(([response, ack]) => {
    	// If you return { event: 'foo', data: 'bar' } from handler
        // See: https://docs.nestjs.com/websockets/gateways#multiple-responses
        if (response.event) {
            return socket.emit(response.event, response.data);
        }
        // If client accepts response, send response
        isFunction(ack) && ack(response);
    });
});

A Little Problem in NestJS

If you've ever used IoAdapter in your project, you might realize even though the adapter passes two arguments to the callback function, your handler methods don't receive them in full. Let me explain:

Say you have a message handler:

class MySocketGateway {
	@SubscribeMessage('foo')
    fooHandler(argument1, argument2, argument3){
    	console.log({
        	argument1, // Client
            argument2, // Body
            argument3, // undefined
        })
    }
}

If you go ahead and try something like this, you will see that 1st argument is Socket / Client object, 2nd argument is body BUT 3rd argument is undefined. Two questions come to mind:

  • Where did 1st argument came from?
  • Where did my ack function disappear?

Lets go with the first question, if we dive deep into underlying NestJS code, we can find the exact place where adapter.bindMessageHandlers is called.

public subscribeMessages<T = any>(
    subscribersMap: MessageMappingProperties[],
    client: T,
    instance: NestGateway,
  ) {
    const adapter = this.config.getIoAdapter();
    const handlers = subscribersMap.map(({ callback, message }) => ({
      message,
      callback: callback.bind(instance, client),
    }));
    adapter.bindMessageHandlers(client, handlers, data =>
      fromPromise(this.pickResult(data)).pipe(mergeAll()),
    );
  }

https://github.com/nestjs/nest/blob/21bd8c37364a2a2591e3de9bfb88d32d09431438/packages/websockets/web-sockets-controller.ts#L147

If you look closely, callback is bound with two parameters. bind method has one required argument for this, and rest of the arguments are supplied as arguments to the function - callback. More info about this in mozilla docs. So, our 1st argument is fixed by NestJS to be client object.

How about ack function, why does it disapper?

This was very hard for me to debug but I've found why does that happen, and how you can fix it!

The problem is not about the ack function. I've tried sending different parameters, or sending more than two parameters to callback function but only the first parameter ends up in handler, and rest is gone.

There is a helper class WsContextCreator in @nestjs/websockets, that is responsible for attaching Interceptors, Guards, Pipes and ExceptionHandlers to handlers, AND managing parameters sent to handler methods!

That helper class is responsible for disappearing parameters! The reason it makes other parameters disappear is related to this use case:

class MyWebsocketGateway {
    @SubscribeMessage('foo')
    handleFoo(client: Client, body){
    	console.log({ body });
    }
    
    @SubscribeMessage('bar')
    handleBar(@MessageBody() body){
    	console.log({ body })
    }
}

If you want to decorate your handler parameters with @MessageBody or @ConnectedSocket, the WsContextCreator assigns those parameters for you. You can even do experiments like this:

class MyWebsocketGateway {
	@SubscribeMessage('foo')
    handleFoo(arg1, arg2, arg3, @MessageBody() body){
    	console.log({ arg1, arg2, arg3 }) // All undefined
        console.log({ body })
    }
}

I won't go into line by line details of how does this happen, but I will provide you a conceptual explanation and if you want to investigate more, you can always look at source code of @nestjs/websockets for a good exercise.

The key point is WsContextCreator extracts data from initial arguments to the callback function, rearranges them and sends those arguments to your handler function. During the extraction process, WsContextCreator looks up metadata of the handler function:

const metadata = this.contextUtils.reflectCallbackMetadata<TMetadata>(
    instance,
    methodName,
    PARAM_ARGS_METADATA,
) || DEFAULT_CALLBACK_METADATA;

https://github.com/nestjs/nest/blob/aa3ad07c1023b71edda6b6ea53374787d3712231/packages/websockets/context/ws-context-creator.ts#L163

If your function doesn't have PARAM_ARGS_METADATA, NestJS assigns default callback metadata. Guess what, DEFAULT_CALLBACK_METADATA assigns only two parameters to our handler parameters - client and payload.

import { WsParamtype } from '../enums/ws-paramtype.enum';

export const DEFAULT_CALLBACK_METADATA = {
  [`${WsParamtype.PAYLOAD}:1`]: { index: 1, data: undefined, pipes: [] },
  [`${WsParamtype.SOCKET}:0`]: { index: 0, data: undefined, pipes: [] },
};

https://github.com/nestjs/nest/blob/master/packages/websockets/context/ws-metadata-constants.ts

Solution for Providing Additional Parameters to Handlers

NestJS exports an undocumented helper function called assignCustomParameterMetadata, which allows us to override DEFAULT_CALLBACK_METADATA and use our own metadata.

export function assignCustomParameterMetadata(
  args: Record<number, RouteParamMetadata>,
  paramtype: number | string,
  index: number,
  factory: CustomParamFactory,
  data?: ParamData,
  ...pipes: (Type<PipeTransform> | PipeTransform)[]
) {
  return {
    ...args,
    [`${paramtype}${CUSTOM_ROUTE_AGRS_METADATA}:${index}`]: {
      index,
      factory,
      data,
      pipes,
    },
  };
}

https://github.com/nestjs/nest/blob/5aeb40b/packages/common/utils/assign-custom-metadata.util.ts#L9

The function is not self-explanatory, but I think it will be much more clear after example below. Important thing is "where should we use this function?".

This function is meant to modify PARAM_ARGS_METADATA, and normally that metadata is defined by decorators such as @ConnectedSocket or @MessageBody. So, lets comply with the conventions and create ourselves a new parameter decorator: @Ack.

Creating the @Ack Decorator

If you are not familiar with the concept of decorators, or decorators in NestJS, you can read my previous article: Using Custom Decorators in NestJS.

import { PARAM_ARGS_METADATA } from '@nestjs/websockets/constants';

export function Ack(): ParameterDecorator {
// index is the index of parameter we decorated
// It is not related to getArgByIndex below!
    return function(target, key, index) {
        // Get existing metadata of the handler
        const args = Reflect.getMetadata(PARAM_ARGS_METADATA, target.constructor, key) || {};
        // Extend with new metadata
        const meta = assignCustomParameterMetadata(args, 'Ack', index, (data, input: ExecutionContext) => {
            // This allows NestJS to extract required parameter from initial arguments supplied to 'callback' function
            // Index here needs to match index of the callback parameters
            // 0 --> Client
            // 1 --> Payload from IoAdapter
            // 2 --> Ack from IoAdapter
            // 0 is always client, but rest of the parameters depend on the underlying adapter.
            return input.getArgByIndex<Function>(2);
        });
        Reflect.defineMetadata(PARAM_ARGS_METADATA, meta, target.constructor, key);
    };
}

Now we can reach our ack function from the handler as follows:

class MyWebSocketGateway {
	@SubscribeMessage('foo')
    fooHandler(@MessageBody() body, @Ack() ack){
    	ack('foo_response');
    }
}

Lets Move On With RequestId

That really was a case study, isn't it? Lets add a request id to our handler parameters in case we want to trace our requests.

First we need to modify our adapter, if you remember from above, the adapter supplies arguments to the callback function. We can create a request id in our adapter and supply it to the callback.

  // You can make your own implementation for this.
  private createRequestId(){
  	return crypto.randomUUID();
  }

  public bindMessageHandlers(
    socket: Socket,
    handlers: MessageMappingProperties[],
    transform: (data: any) => Observable<any>,
  ) {
    const disconnect$ = fromEvent(socket, DISCONNECT_EVENT).pipe(
      share(),
      first(),
    );

    handlers.forEach(({ message, callback }) => {
      const source$ = fromEvent(socket, message).pipe(
        mergeMap((payload: any) => {
          const { data, ack } = this.mapPayload(payload);
          // <-- Modified Section Start
          const requestId = this.createRequestId();
          return transform(callback(data, ack, requestId))
          // Modified Section End -->
          .pipe(
            filter((response: any) => !isNil(response)),
            map((response: any) => [response, ack]),
          );
        }),
        takeUntil(disconnect$),
      );
      source$.subscribe(([response, ack]) => {
        if (response.event) {
          return socket.emit(response.event, response.data);
        }
        isFunction(ack) && ack(response);
      });
    });
  }

That is all the modification our adapter needs. Now we can create another decorator: @RequestId

import { PARAM_ARGS_METADATA } from '@nestjs/websockets/constants';

export function RequestId(): ParameterDecorator {
    return function(target, key, index) {
        const args = Reflect.getMetadata(PARAM_ARGS_METADATA, target.constructor, key) || {};
        const meta = assignCustomParameterMetadata(args, 'RequestId', index, (data, input: ExecutionContext) => {
            return input.getArgByIndex<string>(3);
        });
        Reflect.defineMetadata(PARAM_ARGS_METADATA, meta, target.constructor, key);
    };
}

That is exactly the same decorator with @Ack with only difference being return input.getArgByIndex(3);. Now our handler function looks like this:

class MyWebSocketGateway {
	@SubscribeMessage('foo')
    fooHandler(@MessageBody() body, @Ack() ack, @RequestId() requestId){
    	console.log('Received request with id: ', requestId);
    	ack('foo_response');
    }
}

Next Steps

If you don't want to deal with all of these, I'm currently working on an npm package that has better developer ergonomics compared to official @nestjs/platform-socket.io package. I'm aware that decorating every parameter in handler functions can be ugly and tiring to eyes. I believe there are more beautiful solutions to this problem. I'm trying to figure out which solution would result in better developer experience.

I would like to hear your opinions about this. Whether you suggest a usage example, or you face other problems with official package, please let me know what you think in the comments.

Also, if you don't want to miss out on articles about NestJS and programming in general subscribe to Noop Today. I hope you learned something new today, see you in the upcoming posts!


Here is the repo that contains TraceableIoAdapter with example code: https://github.com/nooptoday/nestjs-trace-websocket-events