In this blog, I’ll show you how to receive a Pub/Sub message in protobuf format and insert it into a MongoDB server. There are many resources about handling Pub/Sub messages in Node.js, but we struggled to find any about handling messages in protobuf format, apart from one in Kotlin. It turns out to be quite simple; we just need 4 steps:

  • Create a client that subscribes to the subscription
  • Receive the raw message in binary
  • Decode it into a JS object using the proto file
  • Done

By running this kind of processing through a queue and using the protobuf format, we’re able to handle millions of requests per day with just one single-core VM.

Create pubSubClient and subscribe it to a subscription

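const { PubSub } = require("@google-cloud/pubsub");

// grpc and projectId are defined elsewhere in the application.
const pubSubClient = new PubSub({grpc, projectId});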

function listenEventSub(eventSub) {
    //...
    const subscription = pubSubClient.subscription(subscriptionName);
    console.log(`Tracked subscription: ${subscriptionName}`);
    //...
}
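The elided parts of listenEventSub are not shown here. As a rough sketch of how a handler is usually wired up: a Subscription object from @google-cloud/pubsub is an EventEmitter that emits "message" and "error" events, so attaching a handler comes down to registering listeners. The helper name attachHandler and the fake subscription below are assumptions for illustration, not the original code; the fake stands in for pubSubClient.subscription(subscriptionName) so the snippet runs without a Google Cloud project.

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper: wires a handler to anything that emits "message"
// and "error" events, which is how a Pub/Sub Subscription delivers messages.
function attachHandler(subscription, handler) {
  let messageCount = 0;
  subscription.on("message", async (message) => {
    try {
      await handler(message);
      messageCount += 1; // count only messages the handler finished
    } catch (err) {
      console.error(`Failed to handle message ${message.id}:`, err);
    }
  });
  subscription.on("error", (err) => {
    console.error("Subscription error:", err);
  });
  // Expose the running total without letting callers mutate it.
  return { getCount: () => messageCount };
}

// Stand-in for a real subscription (assumption for the demo).
const fakeSubscription = new EventEmitter();
const handledIds = [];
const stats = attachHandler(fakeSubscription, async (message) => {
  handledIds.push(message.id);
  message.ack();
});
fakeSubscription.emit("message", { id: "1", data: Buffer.from("hi"), ack() {} });
```

Keeping the counter inside the closure and incrementing it after the handler resolves avoids passing the count in and out of an async function, where overlapping messages could overwrite each other's updates.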

Message handler function

async function messageHandler(message, messageCount, eventName, protofilePath, typeName) {
  const mess = await decodeMessage(message.data, protofilePath, typeName);
  await insert(mapEvent(eventName, mess));
  message.ack();
  messageCount += 1;
  return messageCount;
}

In the handler's parameters, we pass the raw message received from the Pub/Sub subscription, as well as the messageCount variable to keep track of the total number of processed messages.

We also pass eventName, protofilePath and typeName: the latter two tell the decode function which proto file and message type to use, and eventName is used by mapEvent. At line 2, we decode the message into a JS object, and at line 3, we pass the decoded result to the insert function to write it into MongoDB.
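One thing to keep in mind with the handler above: if decodeMessage or insert throws, message.ack() is never reached, so the message should eventually be redelivered once its ack deadline expires. If you prefer to request redelivery explicitly, Pub/Sub messages also have a nack() method. The following is a minimal sketch of that idea, not the handler we run in production; handleWithNack, fakeMessage, decodeJson and insertFake are hypothetical names, and JSON parsing stands in for the protobuf decoding so the snippet runs on its own.

```javascript
// Hypothetical variant of messageHandler: decode and insert are passed in
// so the sketch runs without Pub/Sub, protobufjs, or MongoDB.
async function handleWithNack(message, decode, insert) {
  try {
    const decoded = await decode(message.data);
    await insert(decoded);
    message.ack(); // success: Pub/Sub will not redeliver this message
    return true;
  } catch (err) {
    console.error("Handling failed, requesting redelivery:", err.message);
    message.nack(); // failure: ask Pub/Sub to redeliver
    return false;
  }
}

// Fake message object (assumption: only data, ack and nack are used).
function fakeMessage(data) {
  const calls = [];
  return {
    data,
    calls,
    ack: () => calls.push("ack"),
    nack: () => calls.push("nack"),
  };
}

// Stand-ins for decodeMessage and insert.
const decodeJson = async (buf) => JSON.parse(buf.toString());
const store = [];
const insertFake = async (doc) => {
  store.push(doc);
};

// One message that decodes fine and one that does not.
handleWithNack(fakeMessage(Buffer.from('{"user":"a"}')), decodeJson, insertFake);
handleWithNack(fakeMessage(Buffer.from("not json")), decodeJson, insertFake);
```

Whether to nack or simply let the ack deadline expire depends on how quickly you want retries; a message that can never be decoded will keep coming back either way, so a dead-letter policy on the subscription is worth considering.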

Decode protobuf message

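// proto_utils.js
const protobuf = require("protobufjs");

// Decode a binary protobuf payload into a plain JS object.
async function decodeMessage(buffer, protoFilePath, typeName) {
  // Load the .proto definition and look up the message type by name.
  const root = await protobuf.load(protoFilePath);
  const messageType = root.lookupType(typeName);
  // decode() throws if the buffer is not valid protobuf wire data
  // or a required field is missing, so the error reaches the caller.
  const message = messageType.decode(buffer);
  return messageType.toObject(message);
}

module.exports = { decodeMessage };

We use the protobufjs library to do the decoding, and it’s as simple as it looks. We load the protobuf definition file and look up the typeName passed to the function. Then we decode the received binary message with that type. If the buffer is not valid protobuf data or a required field is missing, decode throws and the error propagates to the caller; otherwise we convert the decoded message to a plain JS object and return it. Note that verify in protobufjs checks plain JS objects before encoding, not binary buffers, so we rely on decode to reject malformed input.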
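Because decodeMessage calls protobuf.load for every message, the proto file is read and parsed again each time. If that ever shows up as overhead, one option is to cache the loaded result per file path. The sketch below is only an illustration of that idea: memoizeAsync is a hypothetical helper, and a fake loader replaces protobuf.load so the snippet runs without protobufjs installed.

```javascript
// Hypothetical helper: memoize an async loader by key so that each key
// (for example a .proto file path) is loaded only once.
function memoizeAsync(loadFn) {
  const cache = new Map();
  return (key) => {
    if (!cache.has(key)) {
      // Cache the promise itself so concurrent callers share one load.
      const promise = loadFn(key).catch((err) => {
        cache.delete(key); // do not keep failed loads around
        throw err;
      });
      cache.set(key, promise);
    }
    return cache.get(key);
  };
}

// Fake loader standing in for protobuf.load (assumption for the demo).
let loadCalls = 0;
const fakeLoad = async (path) => {
  loadCalls += 1;
  return { path };
};

const loadRoot = memoizeAsync(fakeLoad);

// Two decodes against the same file trigger a single load.
Promise.all([loadRoot("event.proto"), loadRoot("event.proto")]).then(() => {
  console.log(`loads performed: ${loadCalls}`);
});
```

In real code the loader would be something like (path) => protobuf.load(path), and decodeMessage would call loadRoot(protoFilePath) in place of protobuf.load(protoFilePath).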