Stream Services

Introduction

A Stream Service is a software service that can be configured to continuously to handle requests. Within ClearBlade Platform or Edge, the most common use of Stream Service is to listen to MQTT Topics, and handle requests that come in the form of MQTT Messages from Devices/Users.

When to use Stream Services?

  • When a user is invoking a code service more than once in 60 seconds (it can be through a timer or trigger event)
  • Using Stream Services helps when a user expects real-time response, since they cut off the time to invoke the code service.
  • When an user chooses to perform parallel processing of incoming data, i.e. distribution of the incoming loads across multiple instances of the same Stream Service.

Configuration

Set execution timeout to Never

Template for Stream Services

Parallel execution using shared topic

The Stream Services are very efficient services which can be designed to perform parallel processing. In this template, users can have 10 instances of this stream service.

The below template code is written in a way that every instance of the Stream Service acts as an Unique Client to the Shared MQTT group topic. The working of shared subscriptions allows for uniform distribution of the load across all the instances of Stream Services. Refer to Shared Subscriptions to understand how shared topics works.

function SharedTopicService(req, resp) {
  ClearBlade.init({ request: req });
  var messaging = ClearBlade.Messaging();
  
  var sharedTopic = "$share/Group1/device/+";

  var deviceCollection = ClearBlade.Collection({
    collectionName: "some_collection"
  });

  // DEBUG MESSAGE
  messaging.publish("success", "Service Started");
  messaging.subscribe(sharedTopic, function(err, errMsg) {
    if (err) {
      // DEBUG MESSAGE
      messaging.publish("error", "Subscribe failed: " + errMsg);
      resp.error();
    }
    // DEBUG MESSAGE
    messaging.publish("success", "Subscribed to Shared Topic");
    // Once successfully subscribed
    WaitLoop();
  });

  function WaitLoop() {
    // DEBUG MESSAGE
    messaging.publish("success", "Starting the Loop");
    while (true) {
      messaging.waitForMessage([sharedTopic], function(err, msg, topic) {
        if (err) {
          // DEBUG MESSAGE
          messaging.publish("error", "Failed to wait for message: " + err + " " + msg + "  " + topic);
          resp.error("Failed to wait for message: " + err + " " + msg + "    " + topic);
        } else {
          // any action
          addCollectionRow(msg);
        }
      });
    }
  }

  function addCollectionRow(msg) {
    try {
      var parseMsg = JSON.parse(msg);
    } catch (e) {
      // DEBUG MESSAGE
      messaging.publish("error","Problem with parsing: " + e);
      
      resp.error("Problem with parsing: " + e);
    }
    var data = {
      column_1_name: "column_1_data",
      column_2_name: "column_2_data"
    };
    //debugging
    deviceCollection.create(data, function(err, result) {
      if (err) {
        // DEBUG MESSAGE
        messaging.publish("error", "failed to create: " + result);
        resp.error("create failed: " + result);
      } else {
        //no op
      }
    });
  }

}