Using Observables for batch processing in the backend

I've heard about reactive programming being used in the frontend, but I had never used it in the backend. In this post, I explain how some things in the backend can be greatly simplified with reactive programming.

I had heard about reactive programming and the RxJS library before, but I never had the chance to use them and understand them better. It seemed to me that they only made sense in the frontend, where you deal with streams of user-generated events, or in applications that use a streaming architecture. In the frontend, this approach became popular after frameworks like Angular adopted RxJS as part of their architecture.

Lately, I've been using NestJS, a web framework heavily inspired by Angular's architecture, and, like Angular, it uses RxJS and Observables. For example, its HTTP module gives you the response of an HTTP request wrapped in an Observable.

Meanwhile, I had to solve a different problem.

Sending events to a constrained resource

Imagine the following setup: you have a job that doesn't run very often, but when it runs it generates a large number of events that are published to a message broker. For each of these events, you have to make an HTTP call to an external system. However, this external system allows only a limited number of calls within a given timeframe, so if the number of events exceeds this limit, we have a problem.

To make this more concrete, and thus easier to understand, let's use an example:

The job that generates the events runs once per day.

The number of generated events is in the range of 10k to 100k.

The external system has the following limits:

  • 10k calls per day.
  • 100 calls per second.

We have some control over how the external system processes events, so we can send multiple events in one call.
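To see what these numbers imply, here is a small sketch of the arithmetic. The helper and its parameter names are hypothetical, made up for illustration: with up to 100k events and 10k calls per day, every call has to carry at least 10 events, and even at the maximum of 100 calls per second, those 10k calls take at least 100 seconds to send.

```javascript
// Hypothetical helper: derives batching constraints from the limits above.
function batchingConstraints({ maxEventsPerDay, maxCallsPerDay, maxCallsPerSecond }) {
  // Smallest batch size that keeps the worst-case day under the daily call limit.
  const minBatchSize = Math.ceil(maxEventsPerDay / maxCallsPerDay);
  // Number of calls needed in the worst case with that batch size.
  const callsNeeded = Math.ceil(maxEventsPerDay / minBatchSize);
  // Sending those calls at the per-second limit takes at least this many seconds.
  const minSendSeconds = Math.ceil(callsNeeded / maxCallsPerSecond);
  return { minBatchSize, callsNeeded, minSendSeconds };
}

const constraints = batchingConstraints({
  maxEventsPerDay: 100000,
  maxCallsPerDay: 10000,
  maxCallsPerSecond: 100,
});
console.log(constraints); // minBatchSize = 10, callsNeeded = 10000, minSendSeconds = 100
```

For comparison, a batch size of 3, as used in the demo snippets later in this post, would need at least Math.ceil(100000 / 3) = 33,334 calls for a 100k-event day, so in a real setup the batch size has to be at least 10.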

My initial approach to solving this problem was to create a subscriber worker that would receive all the events from the message broker and store them in a database or key-value store, then periodically retrieve them from storage, batch them, and dispatch them to the external system.
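For comparison, here is a rough sketch of that initial approach, with an in-memory array standing in for the database or key-value store. The names eventStore, storeEvent, and drainInBatches are hypothetical; a real version would also need persistence, a scheduler, and bookkeeping for events that were already sent.

```javascript
// In-memory stand-in for the database / key-value store (hypothetical).
const eventStore = [];

// Called by the subscriber worker for every event from the message broker.
function storeEvent(event) {
  eventStore.push(event);
}

// Called periodically (e.g. from setInterval or a cron job): removes all
// stored events and splits them into batches of at most `batchSize`.
function drainInBatches(batchSize) {
  const events = eventStore.splice(0, eventStore.length);
  const batches = [];
  for (let i = 0; i < events.length; i += batchSize) {
    batches.push(events.slice(i, i + batchSize));
  }
  return batches;
}

// Example: five stored events drained in batches of two.
[1, 2, 3, 4, 5].forEach((id) => storeEvent({ id, eventType: 'NOTIFY_USER' }));
const batches = drainInBatches(2);
console.log(batches.map((batch) => batch.map((e) => e.id))); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```

Even in this reduced form, the storage, the polling, and the batching are three separate moving parts that have to be kept in sync.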

Enter the Pipe

So I had this problem in the back of my mind while writing an HTTP request in NestJS, when I stumbled upon Observables.
I don't want to go very deep into what Observables are and how they work (I would like to write a separate post on that subject), so for now, let's keep it a bit abstract and use the following definition:

An Observable is a construct that produces events at its own pace

So Observables are event generators, but the important part is "at its own pace": they have control over the flow of events, so they can naturally provide useful operations such as buffering. Let's look at an example using RxJS:

const EventEmitter = require('events');
const { fromEvent } = require('rxjs');

// Create event emitter
class NotificationEmitter extends EventEmitter {}
const notificationEmitter = new NotificationEmitter();

// Create observable from the event emitter
const notificationObservable = fromEvent(notificationEmitter, 'event');

One of the ways to create an Observable is from an event source; to do this, we use the fromEvent() function from RxJS. In the example above, notificationEmitter is a Node.js EventEmitter (more on that later).
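To make fromEvent() less magical, here is a simplified model of what it does, written without RxJS. This is a conceptual sketch, not RxJS's actual implementation, and fromEventModel is a made-up name: subscribing registers a listener on the emitter, and unsubscribing removes it. One consequence, which also holds for the real fromEvent(), is that nothing is attached to the emitter until someone subscribes.

```javascript
const EventEmitter = require('events');

// Conceptual model of fromEvent(): subscribe() attaches a listener and
// returns a function that detaches it again.
function fromEventModel(emitter, eventName) {
  return {
    subscribe(next) {
      emitter.on(eventName, next);
      return () => emitter.off(eventName, next);
    },
  };
}

const emitter = new EventEmitter();
const source = fromEventModel(emitter, 'event');
const received = [];

console.log(emitter.listenerCount('event')); // 0: nothing is attached before subscribe()
const unsubscribe = source.subscribe((payload) => received.push(payload));
emitter.emit('event', { id: 1 });
unsubscribe();
emitter.emit('event', { id: 2 }); // ignored, the listener was removed
console.log(received); // [ { id: 1 } ]
```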

In line with the definition we gave earlier, notificationObservable uses the events coming from notificationEmitter as its seed and produces data by performing operations on them, e.g. transforming, filtering, or accumulating them. As we already mentioned, the important part is that the Observable controls the pace. notificationEmitter fires based on external factors and has no control over when the events are generated. notificationObservable, on the other hand, does control the flow; however, with what we have defined so far we don't try to control it at all, so the Observable simply follows the emission rate of notificationEmitter.

Now let's see how we can control the flow. In the snippet below, we use the pipe() method to pass the stream of events produced by the Observable through a transformation function, called an operator. In this case, we use the bufferTime() operator.

const EventEmitter = require('events');
const { fromEvent } = require('rxjs');
const { bufferTime } = require('rxjs/operators');

// Create event emitter
class NotificationEmitter extends EventEmitter {}
const notificationEmitter = new NotificationEmitter();

// Create observable from the event emitter
const notificationObservable = fromEvent(notificationEmitter, 'event');
// pipe() returns a new Observable; nothing runs until it is subscribed to
notificationObservable.pipe(
  bufferTime(2000, null, 3) // emit a batch after 2 seconds, or as soon as 3 events accumulate
);

The bufferTime() operator accumulates events and emits them as an array when either of the following conditions is met:

  • Two seconds have passed since the current batch was started
  • Three events have accumulated
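To illustrate these two rules, here is a simplified, hand-written model of bufferTime(timeSpan, null, maxSize) in plain JavaScript. It is a sketch of the behaviour described above, not the RxJS source, and createTimeOrCountBuffer is a hypothetical name. Like the real operator, it starts a new time window whenever a batch is emitted, flushes a full buffer immediately, and emits an empty array when a window closes without any events.

```javascript
// Simplified model of bufferTime(timeSpanMs, null, maxSize): collects items
// and hands them to onFlush when the buffer is full or the window elapses.
function createTimeOrCountBuffer(onFlush, { timeSpanMs, maxSize }) {
  let buffer = [];
  let timer = null;

  // Emit the current buffer (possibly empty) and start a fresh window.
  function flush() {
    clearTimeout(timer);
    const batch = buffer;
    buffer = [];
    onFlush(batch);
    timer = setTimeout(flush, timeSpanMs);
  }

  timer = setTimeout(flush, timeSpanMs);

  return {
    push(item) {
      buffer.push(item);
      if (buffer.length >= maxSize) flush(); // full buffer: emit right away
    },
    // Stop the timer and hand over any leftover items.
    stop() {
      clearTimeout(timer);
      if (buffer.length > 0) onFlush(buffer);
      buffer = [];
    },
  };
}

const batches = [];
const batcher = createTimeOrCountBuffer((batch) => batches.push(batch), {
  timeSpanMs: 2000,
  maxSize: 3,
});
[1, 2, 3, 4, 5].forEach((n) => batcher.push(n));
console.log(batches); // [ [ 1, 2, 3 ] ]: the first three items filled the buffer
batcher.stop();
console.log(batches); // [ [ 1, 2, 3 ], [ 4, 5 ] ]
```

Note that a full buffer is flushed immediately, so the time span bounds how long an event waits in the buffer, not how often batches are emitted: a burst of events produces batches as fast as the buffer fills.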

Maybe this is where you have your aha moment and see how easily we can solve the batching problem I explained in the previous section. But before we go back to it, we need to add the missing piece of the puzzle: the consumer.

So far, we have shown how to create an Observable and how to transform the stream it produces, but not how to consume it. To do this, we call the Observable's subscribe() method and pass it the consumer function to execute on each value the Observable emits.

const EventEmitter = require('events');
const { fromEvent } = require('rxjs');
const { bufferTime } = require('rxjs/operators');

// Create event emitter
class NotificationEmitter extends EventEmitter {};
const notificationEmitter = new NotificationEmitter();

// Create observable from the event emitter
const notificationObservable = fromEvent(notificationEmitter, 'event');
notificationObservable
  .pipe(
    bufferTime(2000, null, 3) // emit a batch after 2 seconds, or as soon as 3 events accumulate
  )
  .subscribe(console.log);

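In this case, we simply print each value the pipeline emits. Note that each value is a batch (an array of events), not a single event, and that bufferTime() emits a batch every two seconds even when no events arrived, so this snippet prints an empty array every two seconds.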

Finishing the puzzle

Going back to our batching problem:

  • We will create the Observable from an EventEmitter that will be triggered by the messages that come from the message broker.
  • We will buffer the events so that each call to the external system carries a batch of them, keeping the number of calls within the external system's limits.
  • We will create a consumer that will call the external system and dispatch the batched data.

To keep the example simple, we won't show the full integration; instead, we will use simple functions to represent the external system and the message broker.

Below, messageBrokerEventHandler represents the function that is called when an event arrives on the appropriate topic and needs to be handled. We call this function several times to mock published events. The key point we want to show is that the event emitter re-emits these events and the Observable receives them.

// Simulation
function messageBrokerEventHandler(eventPayload) {
  notificationEmitter.emit('event', eventPayload);
}

// Mock events
messageBrokerEventHandler({ id: 1, eventType: 'NOTIFY_USER' });
messageBrokerEventHandler({ id: 2, eventType: 'NOTIFY_USER' });
messageBrokerEventHandler({ id: 3, eventType: 'NOTIFY_USER' });
messageBrokerEventHandler({ id: 4, eventType: 'NOTIFY_USER' });
messageBrokerEventHandler({ id: 5, eventType: 'NOTIFY_USER' });
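One detail worth knowing about this simulation: Node.js's EventEmitter emit() calls its listeners synchronously, before emit() returns. So the five mock events reach the Observable one after another within the same tick, which means that with bufferTime(2000, null, 3) the first three events are flushed as one batch right away, while the remaining two wait for the two-second window to close. The standalone check below shows the synchronous delivery; it uses plain Node.js only, and the log format is just for illustration.

```javascript
const EventEmitter = require('events');

const emitter = new EventEmitter();
const log = [];

// The listener records each event as soon as it is delivered.
emitter.on('event', (payload) => log.push(`received ${payload.id}`));

// emit() runs the listener before returning, so the entries interleave.
for (const id of [1, 2, 3]) {
  emitter.emit('event', { id });
  log.push(`emitted ${id}`);
}

console.log(log); // 'received 1', 'emitted 1', 'received 2', 'emitted 2', ...
```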

We also need to create the consumer of the Observable stream that will be making the calls to the external system.

function requestToLimitedResource(payload) {
  console.log('Made request');
  console.log(payload);
  return Promise.resolve(payload);
}

async function batchConsumer(bufferedEvents) {
  if (bufferedEvents.length > 0) {
    await requestToLimitedResource(bufferedEvents);
  }
}

batchConsumer is passed to subscribe(), so it receives each batch emitted by the Observable as bufferedEvents. If the batch is not empty, it calls the external system to send it. The check is needed because bufferTime() emits an empty array whenever a time window closes without any events.
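One caveat: subscribe() does not wait for the promise that an async consumer such as batchConsumer returns, so if batches arrive faster than requests complete, several requests to the external system can be in flight at the same time. In RxJS, a common way to process batches one at a time is to move the request into the pipeline with the concatMap() operator. The plain JavaScript sketch below shows the underlying idea under that assumption; serialize and fakeRequest are hypothetical helpers, not part of RxJS: each call is chained onto the previous one, so at most one runs at a time.

```javascript
// Wraps an async function so that calls run strictly one after another.
function serialize(asyncFn) {
  let tail = Promise.resolve();
  return (arg) => {
    const run = tail.then(() => asyncFn(arg));
    tail = run.catch(() => {}); // keep the chain going if one call fails
    return run;
  };
}

let inFlight = 0;
let maxInFlight = 0;

// Hypothetical stand-in for the external system: each request takes ~10 ms.
async function fakeRequest(batch) {
  inFlight += 1;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 10));
  inFlight -= 1;
  return batch.length;
}

const sendBatch = serialize(fakeRequest);

// Three batches delivered back to back, as a subscriber might receive them.
const done = Promise.all([[1, 2, 3], [4, 5, 6], [7]].map(sendBatch));
done.then((sizes) => console.log(sizes, maxInFlight)); // [ 3, 3, 1 ] 1
```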

So if we glue all the pieces together, we end up with the following script, which showcases the whole idea.

const EventEmitter = require('events');
const { fromEvent } = require('rxjs');
const { bufferTime } = require('rxjs/operators');

const PUBLISH_RATE_MS = 2000;
const EVENT_BUFFER_SIZE = 3;

// External System
function requestToLimitedResource(payload) {
  console.log('Made request');
  console.log(payload);
  return Promise.resolve(payload);
}

async function batchConsumer(bufferedEvents) {
  if (bufferedEvents.length > 0) {
    await requestToLimitedResource(bufferedEvents);
  }
}


// Create event emitter
class NotificationEmitter extends EventEmitter {};
const notificationEmitter = new NotificationEmitter();

// Create observable from the event emitter
const notificationObservable = fromEvent(notificationEmitter, 'event');
notificationObservable
.pipe(
  bufferTime(PUBLISH_RATE_MS, null, EVENT_BUFFER_SIZE)
).subscribe(batchConsumer);


// Simulation of message broker
function messageBrokerEventHandler(eventPayload) {
  notificationEmitter.emit('event', eventPayload);
}

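// Mock events
messageBrokerEventHandler({ id: 1, eventType: 'NOTIFY_USER' });
messageBrokerEventHandler({ id: 2, eventType: 'NOTIFY_USER' });
messageBrokerEventHandler({ id: 3, eventType: 'NOTIFY_USER' });
messageBrokerEventHandler({ id: 4, eventType: 'NOTIFY_USER' });
messageBrokerEventHandler({ id: 5, eventType: 'NOTIFY_USER' });
// Events 1-3 fill the buffer and are sent right away; events 4 and 5 are sent
// when the 2-second window closes. The bufferTime() timer keeps the process
// running, so stop the script with Ctrl+C.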

I hope you found this post informative! Stay tuned!


Subscribe to Backend Definite
