Messaging
Introduction
ClearBlade Edge and Cloud Platform support Messaging via MQTT, which is a lightweight IoT Messaging protocol. We have also built an OAuth 2 security model into the MQTT Broker to provide out-of-the-box security around messaging.
To learn about sending messages, click here
Purpose
Lightweight messaging is key to moving data around an IoT System at scale.
Workflows
Workflow 1: Upon publish, process message with code services, which publishes output
Workflow 2: Code Service invoked, perform computation, then publishes output
MQTT
The ClearBlade platform implements the full MQTT 3.1.1 specification, so developers can choose the Quality of Service setting that best fits their use case.
QoS
Quality of Service | Definition | Description |
---|---|---|
0 | At most once | The message is sent only once, whether or not it reaches the subscriber |
1 | At least once | The message is sent one or more times until the subscriber acknowledges receipt |
2 | Exactly once | The message is delivered to the subscriber exactly once |
Ports for MQTT and their Use
Port | TLS | Information | Requirements |
---|---|---|---|
1883 | False | MQTT Pub/Sub | user-token |
1884 | True | MQTT Pub/Sub | user-token |
8903 | False | Web-Sockets, usually used by browser/applications (cannot communicate directly over MQTT) | dev-token |
8904 | True | Web-Sockets | dev-token |
8905 | False | Auth over MQTT | SystemKey, Secret, Username, Password, ClientId |
8906 | True | Auth over MQTT | |
8907 | False | Auth over MQTT via Web-Sockets | TODO |
8908 | True | Auth over MQTT via Web-Sockets | TODO |
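The port table above can be captured as a small lookup so that client code does not hard-code port numbers. This is a minimal sketch: the port numbers come from the table, while the `MQTT_PORTS` object, the `selectPort` function, and the connection kind names are illustrative and not part of any ClearBlade SDK.

```javascript
// Port numbers copied from the table above; all names here are illustrative.
const MQTT_PORTS = {
  mqtt: { plain: 1883, tls: 1884 },
  websocket: { plain: 8903, tls: 8904 },
  auth: { plain: 8905, tls: 8906 },
  authWebsocket: { plain: 8907, tls: 8908 },
};

// Returns the port for a connection kind, with or without TLS.
function selectPort(kind, useTls) {
  if (!Object.prototype.hasOwnProperty.call(MQTT_PORTS, kind)) {
    throw new Error("Unknown connection kind: " + kind);
  }
  const entry = MQTT_PORTS[kind];
  return useTls ? entry.tls : entry.plain;
}
```

For example, `selectPort("auth", true)` returns the TLS port for authentication over MQTT.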
Working with MQTT
To work with MQTT the following steps are involved:
Prerequisites
You must have a user or device available with valid permissions to publish messages.
Please follow the instructions to do so:
- Log in or create a developer account on a ClearBlade platform instance.
- View or create a System
- View or create a User
- View or assign a role to the user
- View the Roles page
- Add a topic to the list of Message Topics enabled for that role, for example ‘mytopic’
Connecting to the broker
The MQTT protocol allows the connect action to provide a username and password. ClearBlade repurposes those fields to accommodate its OAuth-style token model.
Key | Value | Example |
---|---|---|
URL | URL_OF_BROKER | platform.clearblade.com |
PORT | PORT | 1883 |
Username | USER_TOKEN | abcdefabcdef01234567890 |
Password | SYSTEM_KEY | f0cbf0cbf0cbf0cbf0cbf0cbf0cb |
ClientID | UNIQUE_CLIENT_ID | sjdbfkasdbf |
Duplicate Client IDs
If you connect with the same client ID as another MQTT client, your connection will fail.
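As a rough illustration of the field mapping above, the following sketch assembles connection settings with the user token in the username field and the system key in the password field. The function name and the shape of the returned object are hypothetical; adapt the keys to whatever MQTT client library you use. The generated client ID suffix only makes a duplicate unlikely, it does not guarantee uniqueness.

```javascript
// Hypothetical helper: maps ClearBlade credentials onto MQTT connect fields.
function buildBrokerOptions(host, userToken, systemKey, clientIdPrefix) {
  // Timestamp plus a random 16-bit value keeps client IDs from colliding in practice.
  const suffix =
    Date.now().toString(16) + Math.floor(Math.random() * 0x10000).toString(16);
  return {
    host: host,
    port: 1883, // non-TLS MQTT port from the ports table
    username: userToken, // USER_TOKEN goes in the username field
    password: systemKey, // SYSTEM_KEY goes in the password field
    clientId: clientIdPrefix + "-" + suffix,
  };
}
```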
ClearBlade provides an authentication broker for obtaining a user token. The following steps need to be performed to authenticate over MQTT.
Authentication over MQTT
A ClearBlade User Token is required to communicate with a broker. A token can be obtained via a REST endpoint call or via MQTT. This section covers authentication over MQTT.
1. Connect
Required Keys | Description | Example Values |
---|---|---|
URL | <PLATFORM_IP> | platform.clearblade.com |
Port | <PORT_NUMBER> | 8905 |
Username | <SYSTEM_KEY> | bacb8fb60bb4d7c2c2c0e4bb9701 |
Password | <SYSTEM_SECRET> | BACB8FB60BFDDB7DB97D7A8BF01 |
ClientId | <USER_EMAIL>:<PASSWORD> for User, <DEVICE_NAME>:<ACTIVE_KEY> for devices | cbman@clearblade.com:cl34r8l4d3 or temperature-sensor:faqb8fb60bc2c2b1c0e4bb9701 |
tls(optional) | <ALLOW_TLS> | true |
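The connect step for the authentication broker uses a different mapping from the regular broker: the system key and secret go in the username and password fields, and the user or device credentials are packed into the client ID. A minimal sketch, assuming a hypothetical helper name and option object shape:

```javascript
// Hypothetical helper: builds connect settings for the authentication broker.
// identity is a user email or device name; credential is the password or active key.
function buildAuthOptions(host, systemKey, systemSecret, identity, credential, useTls) {
  return {
    host: host,
    port: useTls ? 8906 : 8905, // auth-over-MQTT ports from the ports table
    username: systemKey,
    password: systemSecret,
    clientId: identity + ":" + credential,
    tls: Boolean(useTls),
  };
}
```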
2. Subscribe
- To receive the new JWT token, subscribe to the “auth” topic (recommended)
- To receive the legacy token, subscribe to the “v/1/auth” topic.
3. Extract Token
ClearBlade publishes the user-token on the “auth” message topic as a binary-encoded payload.
The payload is a sequence of alternating length and data blocks:
- Each length block is a 2-byte unsigned 16-bit integer
- Each data block is UTF-8 encoded bytes
The blocks map to the following fields:
Block-Num | Description |
---|---|
1 | Length of the token |
2 | Token |
3 | Length of the user-id or device-name |
4 | UserId or DeviceName |
5 | Length of the Messaging Url |
6 | Messaging Url |
Once the client is authenticated, the token can be extracted and used to establish the messaging connection.
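The block layout can be decoded with a short loop that reads a length, then that many bytes of text, three times over. This is a sketch under one stated assumption: the 16-bit length is read as big-endian (network byte order), which the description above does not specify, so verify it against a real payload. The function name and the keys of the returned object are illustrative.

```javascript
// Decodes three length-prefixed UTF-8 blocks: token, user ID or device name,
// and messaging URL. `bytes` must be a Uint8Array (a Node.js Buffer also works).
// ASSUMPTION: length blocks are big-endian; the source text does not say.
function parseAuthPayload(bytes) {
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  const decoder = new TextDecoder("utf-8");
  const fields = [];
  let offset = 0;
  for (let i = 0; i < 3; i++) {
    if (offset + 2 > bytes.byteLength) {
      throw new Error("Payload truncated before length block " + (2 * i + 1));
    }
    const length = view.getUint16(offset, false); // false selects big-endian
    offset += 2;
    if (offset + length > bytes.byteLength) {
      throw new Error("Payload truncated inside data block " + (2 * i + 2));
    }
    fields.push(decoder.decode(bytes.subarray(offset, offset + length)));
    offset += length;
  }
  return { token: fields[0], id: fields[1], messagingUrl: fields[2] };
}
```

The returned `token` is what a client would then place in the username field when connecting to the messaging broker.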
Error Codes
Refer to the MQTT 3.1.1 specification:
Value | Return Code Response | Description |
---|---|---|
0 | 0x00 Connection Accepted | Connection accepted |
1 | 0x01 Connection Refused, unacceptable protocol version | The Server does not support the level of the MQTT protocol requested by the Client |
2 | 0x02 Connection Refused, identifier rejected | The Client identifier is correct UTF-8 but not allowed by the Server. This can be caused by a duplicate client ID. |
3 | 0x03 Connection Refused, Server unavailable | The Network Connection has been made but the MQTT service is unavailable |
4 | 0x04 Connection Refused, bad user name or password | The data in the user name or password is malformed |
5 | 0x05 Connection Refused, not authorized | The Client is not authorized to connect. For example, entering the incorrect password. |
6-255 | Reserved for future use | |
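When logging connection failures, it can help to translate the numeric return code into the text from the table above. The following is a minimal sketch; the `describeConnack` name and the exact message strings are illustrative.

```javascript
// Messages for CONNACK return codes 0 through 5, in table order.
const CONNACK_MESSAGES = [
  "Connection accepted",
  "Connection refused: unacceptable protocol version",
  "Connection refused: identifier rejected (possibly a duplicate client ID)",
  "Connection refused: server unavailable",
  "Connection refused: bad user name or password",
  "Connection refused: not authorized",
];

// Maps a CONNACK return code (0-255) to a human-readable description.
function describeConnack(code) {
  if (!Number.isInteger(code) || code < 0 || code > 255) {
    throw new RangeError("Return code must be an integer from 0 to 255");
  }
  return code < CONNACK_MESSAGES.length
    ? CONNACK_MESSAGES[code]
    : "Reserved for future use";
}
```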
Message Topics
Topics are used to separate messages and keep them organized based on what they are about.
There are three types of topics:
Regular Topics
Topics comprised of string literals:
/devices
Wildcard Topics
A wildcard subscription subscribes a client to multiple topics simultaneously. There are two types of wildcards:
Single Level (+)
The single level wildcard (+) allows a subscription to match an entire level of the topic’s path.
/device/+/temp
Topic | Matches |
---|---|
/device/a/temp | ✓ |
/device/b/temp | ✓ |
/device/123/temp | ✓ |
/device/temp | ✗ |
/device/temp/a | ✗ |
Multi Level (#)
The multi-level wildcard (#) matches all topic levels that follow it.
The following subscribes to EVERY topic:
#
See the following example:
/device/#
Topic | Matches |
---|---|
/device/1/2/3 | ✓ |
/device/a | ✓ |
/abc | ✗ |
/abc/device/ | ✗ |
It’s important to note that a subscription topic like /Foo/#/Bar is invalid. Once a hash is in the subscription, that hash is the last element in that subscription.
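To make the wildcard rules concrete, here is a small matcher written from the MQTT 3.1.1 wildcard rules: `+` stands for exactly one level, and `#` stands for all remaining levels and is only accepted as the last level. It is a sketch for experimenting with subscription filters, not the broker's implementation, and it ignores any special handling a broker may apply to topics that begin with `$`. The `topicMatches` name is illustrative.

```javascript
// Returns true when `topic` is matched by the subscription `filter`.
function topicMatches(filter, topic) {
  const filterLevels = filter.split("/");
  const topicLevels = topic.split("/");
  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i];
    if (level === "#") {
      // "#" must be the last level; it then matches everything that remains.
      return i === filterLevels.length - 1;
    }
    if (i >= topicLevels.length) {
      return false; // the topic has fewer levels than the filter requires
    }
    if (level !== "+" && level !== topicLevels[i]) {
      return false; // a literal level must match exactly
    }
  }
  // Without "#", the filter and topic must have the same number of levels.
  return filterLevels.length === topicLevels.length;
}
```

For instance, `topicMatches("/device/+/temp", "/device/a/temp")` is true, while `topicMatches("/device/+/temp", "/device/temp")` is false because the wildcard level is missing from the topic.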
Message Relay
The ClearBlade Message Relay is a way for edge MQTT clients to communicate with platform MQTT clients as well as other edge MQTT clients. To achieve this, messages must be published on a few reserved MQTT topic paths. The message relay uses the following six topic paths to route messages between edges and their parent platform.
Note
Message relay applies to communication between edges and the platform within a single system.
Message Type | Topic Path |
---|---|
Edge to Platform | <TOPIC_NAME>/_platform |
Platform to Edge | <TOPIC_NAME>/_edge/<NAME_OF_EDGE> |
Edge to Edge | <TOPIC_NAME>/_edge/<NAME_OF_EDGE> |
Platform to All Broadcast | <TOPIC_NAME>/_broadcast |
Edge to All Broadcast message | <TOPIC_NAME>/_broadcast |
Edge to Edge and Platform | <TOPIC_NAME>/_edgeAndPlatform/<NAME_OF_EDGE> |
These topic paths are for sending messages. To receive messages, subscribe to the same topics. For Edge to Edge and Platform, subscribe to the same topic on both the platform and the edge.
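The reserved suffixes in the table above are easy to mistype, so a helper that builds relay topic paths can be useful. This is a minimal sketch; the `relayTopic` function and its `target` values are hypothetical names, while the suffixes themselves come from the table.

```javascript
// Builds a message relay topic path from a base topic and a relay target.
// target: "platform", "edge", "broadcast", or "edgeAndPlatform" (illustrative names).
function relayTopic(topic, target, edgeName) {
  function requireEdge() {
    if (!edgeName) {
      throw new Error("An edge name is required for target: " + target);
    }
    return edgeName;
  }
  switch (target) {
    case "platform":
      return topic + "/_platform";
    case "edge":
      return topic + "/_edge/" + requireEdge();
    case "broadcast":
      return topic + "/_broadcast";
    case "edgeAndPlatform":
      return topic + "/_edgeAndPlatform/" + requireEdge();
    default:
      throw new Error("Unknown relay target: " + target);
  }
}
```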
Event Topics
Event topics allow for greater scalability when handling events by allowing the platform to send an MQTT message for each event. A user of the system can “subscribe to events” by subscribing to the specific topic for an event or by using an MQTT wildcard subscription to subscribe to multiple events. This can be done in stream services by using subscribe() and waitForMessage(), as well as with shared topics.
Click here to view the topic paths.
The message payload of each event message consists of the same JSON-encoded object data that is delivered to an event microservice. Each key in the object corresponds to a key set (by the event) in the req.params input to a microservice. It is important to do a JSON.parse(payload) of the message’s payload prior to using the message. Shown below is an example of how to work with specific event types called Triggers.
Example
function triggerStreamService(req, resp) {
  var deviceCreatedTopic = "$trigger/device/created";
  var deviceConnectedTopic = "$trigger/messaging/device/connected";
  ClearBlade.init({request:req});
  var messaging = ClearBlade.Messaging();
  messaging.subscribe(deviceCreatedTopic, function (err, errMsg) {
    if (err) {
      messaging.publish("service/error", "Subscribe failed");
      resp.error("Sub failed");
    }
  });
  messaging.subscribe(deviceConnectedTopic, function (err, errMsg) {
    if (err) {
      messaging.publish("service/error", "Subscribe failed");
      resp.error("Sub failed");
    }
  });
  while (true) {
    messaging.waitForMessage([deviceCreatedTopic, deviceConnectedTopic], function (err, msg, topic) {
      if (err) {
        messaging.publish("service/error", "Wait for message failed");
        resp.error("failed to wait for message: " + err);
      }
      else if (topic === deviceCreatedTopic) {
        // Process device created here. Do JSON.parse(msg) first
      }
      else if (topic === deviceConnectedTopic) {
        // Process device connected here. Do JSON.parse(msg) first
      }
      else {
        messaging.publish("service/error", "Bad topic: " + topic);
      }
    });
  }
}
Shared Subscriptions
Shared subscriptions allow clients to share the same subscription on the MQTT broker. Clients are placed in the same subscription group and receive messages in a queue format: the message load of a single topic is distributed among all clients in the group. Multiple shared groups on one topic are not supported.
There are 3 parts to the topic structure for share:
$share/<GroupID>/<Topic>
- $share keyword: a static shared subscription identifier
- GroupID: used to identify a group
- Topic: a standard MQTT topic (including wildcards)
Sample Topic: $share/StreamServiceGroup/devices/+/+/event
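The three-part structure can be built and taken apart with plain string handling. The sketch below assumes that a group ID never contains a "/" character, which the text above does not state; the function names are illustrative.

```javascript
// Builds "$share/<GroupID>/<Topic>".
// ASSUMPTION: group IDs do not contain "/".
function buildSharedTopic(groupId, topic) {
  if (!groupId || groupId.indexOf("/") !== -1) {
    throw new Error("Group ID must be non-empty and must not contain '/'");
  }
  return "$share/" + groupId + "/" + topic;
}

// Splits a shared topic into its group ID and underlying topic.
// Returns null when the input is not a shared subscription topic.
function parseSharedTopic(sharedTopic) {
  const parts = sharedTopic.split("/");
  if (parts.length < 3 || parts[0] !== "$share" || parts[1] === "") {
    return null;
  }
  return { groupId: parts[1], topic: parts.slice(2).join("/") };
}
```

Parsing the sample topic above yields the group ID `StreamServiceGroup` and the underlying topic `devices/+/+/event`.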
Example
function SharedTopic(req, resp) {
  ClearBlade.init({ request: req });
  var messaging = ClearBlade.Messaging();
  const TOPIC = "topic/+";
  var sharedTopic = "$share/GROUP_ID/" + TOPIC;
  messaging.subscribe(sharedTopic, function (err, errMsg) {
    if (err) {
      // DEBUG MESSAGING
      messaging.publish("error", "Subscribe failed: " + errMsg);
      resp.error();
    }
  });
  messaging.publish(TOPIC, "Publishing to a shared topic");
}
Sync Topics
These topics allow users to monitor a deployment. They report the status of deployments on edges and of syncs at runtime. The payload history can be viewed in Notification History under the username on the top right of the page.
Name | Topic | Description | Payload |
---|---|---|---|
Sync Status | $notification/sync/status | Sync is successfully deployed and status is returned | See Sync Status |
Sync Error | $notification/sync/error | This error happens during a sync (e.g. a device failed to deploy) | See Sync Error |
Sync Generic Error | $notification/sync/generic_error | This error happens before a sync takes place (e.g. edge not found) | See Sync Generic Error |
Sync Status
{
  "asset_class": <assetClass>,
  "asset_id": <assetId>,
  "edge": <edge>,
  "status": <syncStatus>
}
Sync Error
{
  "asset_class": <assetClass>,
  "asset_id": <assetId>,
  "edge": <edge>,
  "originator": <originator>,
  "destination": <destination>,
  "message": <errorMessage>,
  "event": <crudEvent>,
  "timestamp": <timestamp>
}
Sync Generic Error
{
  "table": <table>,
  "error": <errMsg>
}
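A client subscribed to the three sync topics could turn each notification into a one-line summary for logging. The sketch below uses the topic names and payload keys listed above and assumes the payload arrives as a JSON string; the function name and the wording of the summaries are illustrative.

```javascript
// Produces a short log line for a sync notification.
// ASSUMPTION: `payload` is a JSON string with the keys shown in the payload shapes above.
function summarizeSyncNotification(topic, payload) {
  const data = JSON.parse(payload);
  switch (topic) {
    case "$notification/sync/status":
      return "status: " + data.asset_class + " " + data.asset_id +
        " on " + data.edge + " is " + data.status;
    case "$notification/sync/error":
      return "error: " + data.event + " of " + data.asset_class + " " +
        data.asset_id + " on " + data.edge + " failed: " + data.message;
    case "$notification/sync/generic_error":
      return "generic error: " + data.table + ": " + data.error;
    default:
      throw new Error("Not a sync topic: " + topic);
  }
}
```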
FAQ
- Why can’t I see messages from a topic with a new shared group?
If you have previously subscribed to a topic with one shared group, you will not be able to see the messages for the same topic with a different shared group. See Shared Subscriptions.
- How can I do load balancing with trigger topics?
By using the topic path $share/<TriggerGroup>/$trigger/#.