Using the middleware pattern in blue-rabbit
I’m writing a lightweight Node.js and RabbitMQ microservice framework called blue-rabbit. Its design is inspired by the middleware pattern used in Koa.
I’m writing this post to better understand how the middleware pattern is implemented. I’ll use code from blue-rabbit to walk through the implementation.
The middleware stack can be understood as a series of functions that every HTTP request passes through. In Koa, an HTTP request first flows down the stack and is processed by each middleware function. Once the request reaches the last function in the middleware stack, it flows back up the stack before the final response is sent.
The implementation of the middleware pattern in blue-rabbit uses two classes.
- The Application class is responsible for setting up the
  - Connection to the RabbitMQ broker
  - Exchange and
  - Queues to receive messages from
- The Context class is a wrapper around each message received from the queue, which provides various methods for manipulating individual messages
There are three key functions used to build the middleware pattern: use, onMessage, and compose.
use
This method is defined in the Application class and is used to add functions to the middleware stack. It takes a function as an argument and appends it to the middleware property of the Application instance.
/**
 * Add a function to the middleware stack
 * @param {function} middlewareFunction
 * @access public
 */
use(middlewareFunction) {
  this.middleware.push(middlewareFunction);
}
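As a rough illustration of how registration order is preserved, here is a minimal sketch. MiniApplication, logMessage, and ackMessage are hypothetical stand-ins, not part of blue-rabbit; the class is reduced to the middleware array and the use method described above.

```javascript
// Hypothetical stand-in for Application, reduced to the middleware list.
class MiniApplication {
  constructor() {
    this.middleware = [];
  }

  // Same body as the use method shown above.
  use(middlewareFunction) {
    this.middleware.push(middlewareFunction);
  }
}

const app = new MiniApplication();

// Hypothetical middleware functions that simply pass control on.
const logMessage = (context, next) => next();
const ackMessage = (context, next) => next();

app.use(logMessage);
app.use(ackMessage);

// Functions are stored in the order they were registered.
console.log(app.middleware.map((fn) => fn.name)); // [ 'logMessage', 'ackMessage' ]
```

The order of the use calls is therefore the order in which messages will flow down the stack.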
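Each function defined as a middleware has the following signature:
function middlewareFunc(context, next) {
  // Process context
  // Return the promise so that completion and errors propagate back up the stack
  return next();
}
The call to next() invokes the next function in the middleware stack. Returning (or awaiting) its result keeps the promise chain intact, so an error thrown further down the stack reaches the caller of the stack.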
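To make the flow down and back up the stack concrete, here is a minimal sketch that wires two middleware functions together by hand, which is the job compose automates. The names outer, inner, and run, and the trace array on the context, are hypothetical and not part of blue-rabbit.

```javascript
// Hypothetical middleware: records when control passes down and back up.
async function outer(context, next) {
  context.trace.push('outer: down');
  await next(); // hand control to the next middleware and wait for it
  context.trace.push('outer: up');
}

async function inner(context, next) {
  context.trace.push('inner: down');
  await next();
  context.trace.push('inner: up');
}

// Wire the two functions by hand: outer's next runs inner,
// and inner's next is a no-op that marks the end of the stack.
async function run() {
  const context = { trace: [] };
  await outer(context, () => inner(context, () => Promise.resolve()));
  return context.trace;
}

run().then((trace) => console.log(trace.join(' -> ')));
// outer: down -> inner: down -> inner: up -> outer: up
```

Code before the call to next() runs on the way down; code after it runs on the way back up.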
onMessage
This method is invoked each time a message is received from the queue.
/**
 * Execute the middleware stack on the message received from the queue
 * @param {object} message
 * @access private
 */
async onMessage(message) {
  const context = new Context(message, this);
  const middlewareStack = compose(this.middleware);
  try {
    await middlewareStack(context);
  } catch (err) {
    debug('Error caught in middleware stack');
    context.onerror(err);
  }
}
First, it creates a Context instance, which wraps the incoming message and provides methods for working with it.
It then builds the middleware stack using compose and invokes it with that context.
The middleware stack can also be invoked as middlewareStack(context, fn), where fn is a function that is invoked when the message reaches the end of the middleware stack.
compose
This is the key ingredient of the middleware pattern. It takes an array of functions as input and returns a function that invokes each function in the array in sequence.
The following code is from the koa-compose library:
function compose (middleware) {
  if (!Array.isArray(middleware)) throw new TypeError('Middleware stack must be an array!')
  for (const fn of middleware) {
    if (typeof fn !== 'function') throw new TypeError('Middleware must be composed of functions!')
  }
  /**
   * @param {Object} context
   * @return {Promise}
   * @api public
   */
  return function (context, next) {
    // last called middleware #
    let index = -1
    return dispatch(0)
    function dispatch (i) {
      if (i <= index) return Promise.reject(new Error('next() called multiple times'))
      index = i
      let fn = middleware[i]
      if (i === middleware.length) fn = next
      if (!fn) return Promise.resolve()
      try {
        return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));
      } catch (err) {
        return Promise.reject(err)
      }
    }
  }
}
In the sections below, I show subsets of this code so that each one focuses on a single piece of functionality.
Validating arguments
The following lines are checks to ensure that compose receives an array as an argument and that each element of the array is a function:
if (!Array.isArray(middleware)) throw new TypeError('Middleware stack must be an array!')
for (const fn of middleware) {
  if (typeof fn !== 'function') throw new TypeError('Middleware must be composed of functions!')
}
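A small sketch of how these checks behave on valid and invalid input. The helper names assertMiddleware and check are hypothetical; assertMiddleware holds only the two checks quoted above so that they can be exercised in isolation.

```javascript
// Hypothetical helper containing only the argument checks from compose.
function assertMiddleware(middleware) {
  if (!Array.isArray(middleware)) throw new TypeError('Middleware stack must be an array!');
  for (const fn of middleware) {
    if (typeof fn !== 'function') throw new TypeError('Middleware must be composed of functions!');
  }
}

// Returns 'ok' for valid input, otherwise the message of the thrown error.
function check(input) {
  try {
    assertMiddleware(input);
    return 'ok';
  } catch (err) {
    return err.message;
  }
}

console.log(check([() => {}]));      // ok
console.log(check(() => {}));        // Middleware stack must be an array!
console.log(check([() => {}, 'x'])); // Middleware must be composed of functions!
```

Because the checks throw synchronously, a misconfigured stack fails when compose is called rather than when the first message arrives.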
Invoke each function in the middleware stack in order
Next, it returns a function which, when invoked, invokes each of the functions in the middleware array.
return function (context, next) {
  return dispatch(0)
  function dispatch(i) {
    let fn = middleware[i]
    try {
      return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));
    } catch (err) {
      return Promise.reject(err)
    }
  }
}
dispatch(0) looks up the first function in the middleware array on the line let fn = middleware[i].
Next, fn(context, dispatch.bind(null, i + 1)) invokes that function, passing it the context and a bound copy of dispatch that will run the next function in the middleware array when called.
As noted earlier, each function defined as a middleware has the following signature:
function middlewareFunc(context, next) {
  // Process context
  // Return the promise so that completion and errors propagate back up the stack
  return next();
}
The call to next() invokes the next function in the middleware stack. When we invoke fn(context, dispatch.bind(null, i + 1)), we pass dispatch.bind(null, i + 1) as the argument for the next parameter. Calling next() inside the middleware function therefore calls dispatch(i + 1).
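For readers less familiar with Function.prototype.bind, here is a minimal sketch of what dispatch.bind(null, i + 1) produces. The dispatch function below is a simplified stand-in that only reports the index it was called with; it is not the real dispatch from koa-compose.

```javascript
// Stand-in for the real dispatch: it only reports which index it was asked to run.
function dispatch(i) {
  return `dispatch(${i})`;
}

const i = 0;

// bind fixes the first argument now; the call itself happens later,
// when the middleware decides to invoke next().
const next = dispatch.bind(null, i + 1);

console.log(typeof next); // function
console.log(next());      // dispatch(1)
```

This is why a middleware function can call next() with no arguments: the index of the following function is already baked in.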
The other instructions in the dispatch function handle various edge cases.
Define behavior when processing reaches the end of the middleware stack
return function (context, next) {
  return dispatch(0)
  function dispatch(i) {
    let fn = middleware[i]
    if (i === middleware.length) fn = next
    if (!fn) return Promise.resolve()
    try {
      return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));
    } catch (err) {
      return Promise.reject(err)
    }
  }
}
The following lines
  if (i === middleware.length) fn = next
  if (!fn) return Promise.resolve()
define the behavior when we reach the end of the middleware stack. At that point, the next argument provided when calling the middleware stack (for example, from the onMessage method) is invoked. If no such argument was provided, dispatch returns an already-resolved promise and the chain ends.
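The two end-of-stack cases can be sketched as follows. The compose function here is a condensed copy of the dispatch logic shown above (validation and the index guard are left out so the sketch stays short), and tag, demo, and the steps array are hypothetical names used only for illustration.

```javascript
// Condensed copy of the dispatch logic, kept here so the sketch runs on its own.
function compose(middleware) {
  return function (context, next) {
    return dispatch(0);
    function dispatch(i) {
      let fn = middleware[i];
      if (i === middleware.length) fn = next; // past the last middleware
      if (!fn) return Promise.resolve();      // no final handler supplied
      try {
        return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));
      } catch (err) {
        return Promise.reject(err);
      }
    }
  };
}

// Hypothetical middleware that tags the context and passes control on.
const tag = (context, next) => {
  context.steps.push('tag');
  return next();
};

async function demo() {
  const stack = compose([tag]);

  // Without a final handler, the chain ends with a resolved promise.
  const first = { steps: [] };
  await stack(first);

  // With a final handler, it runs after the last middleware.
  const second = { steps: [] };
  await stack(second, (context) => {
    context.steps.push('final');
  });

  return [first.steps, second.steps];
}

demo().then(([first, second]) => {
  console.log(first);  // [ 'tag' ]
  console.log(second); // [ 'tag', 'final' ]
});
```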
Check to ensure next() is invoked only once inside each middleware function
return function (context, next) {
  let index = -1
  return dispatch(0)
  function dispatch(i) {
    if (i <= index) return Promise.reject(new Error('next() called multiple times'))
    index = i
    let fn = middleware[i]
    try {
      return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));
    } catch (err) {
      return Promise.reject(err)
    }
  }
}
The index variable is used to ensure that next() is invoked only once inside each middleware function. It does so by storing the index (in the middleware array) of the last invoked middleware function.
Consider a middleware stack with 3 functions, in which the 2nd function invokes next twice:
function middlewareFunction2(context, next) {
  // Process message
  next();
  next();
}
When middlewareFunction2 is invoked, index is equal to 1, its own position in the middleware array, and its next argument is bound to dispatch(2). The first call to next() passes the check, sets index to 2, and runs the third function (if that function also calls next(), index advances to 3). When middlewareFunction2 invokes next again, i is still 2 while index is at least 2, so the i <= index check succeeds and dispatch returns a rejected promise, as defined in the following line
if (i <= index) return Promise.reject(new Error('next() called multiple times'))
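The scenario above can be reproduced with a short sketch. The compose function is a condensed copy of the koa-compose code shown earlier (argument validation omitted), and first, faulty, last, and demo are hypothetical names. Here the faulty middleware awaits both calls, so the rejection from the second call surfaces to the caller.

```javascript
// Condensed copy of compose, including the index guard.
function compose(middleware) {
  return function (context, next) {
    let index = -1; // index of the most recently dispatched middleware
    return dispatch(0);
    function dispatch(i) {
      if (i <= index) return Promise.reject(new Error('next() called multiple times'));
      index = i;
      let fn = middleware[i];
      if (i === middleware.length) fn = next;
      if (!fn) return Promise.resolve();
      try {
        return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));
      } catch (err) {
        return Promise.reject(err);
      }
    }
  };
}

const first = (context, next) => next();

// Hypothetical faulty middleware: calls next() twice.
const faulty = async (context, next) => {
  await next();
  await next(); // rejected by the guard, because index has already moved past 2
};

const last = (context, next) => next();

async function demo() {
  try {
    await compose([first, faulty, last])({});
    return 'no error';
  } catch (err) {
    return err.message;
  }
}

demo().then((message) => console.log(message));
// next() called multiple times
```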