turtlestitch/libraries/mqttExtension.js

333 wiersze
11 KiB
JavaScript

/* MQTTExtension.js - add MQTT protocol to Snap!
* ===========================================
* MQTT library developed by Xavier Pi
* Modified by Simon Walters
* and converted into an extension
* November 2021
* V1.1 - changed back to using standard naming, e.g. "payload" rather than "message"
* V1.2.0 9May2022 - added code from pixavier to improve sub and unsub
* V1.3.0 - added brokerKey code from pixavier to enable more than one connection to the same broker with different users
* V1.4.0 30Jun22 - handle binary payloads correctly
*/
/**
 * Extension primitive 'mqt_connect': open (or re-open) an MQTT connection
 * over WebSocket and store the client on the stage as stage.mqtt[brokerKey].
 *
 * broker   - broker host or full ws:// / wss:// URL. Everything up to and
 *            including the first '|' is stripped before building the URL,
 *            but the complete string (prefix included) is the key the client
 *            is stored under, so the same broker can be connected several
 *            times under different keys.
 * callback - optional Snap! script; a new Process running it is pushed onto
 *            the stage's thread list whenever the client emits 'connect'.
 * options  - JSON text. Recognised keys: username, password, keepalive.
 *            JSON.parse throws if the text is not valid JSON.
 * proc     - calling process; used to set the 'connection status' variable.
 */
SnapExtensions.primitives.set(
    'mqt_connect(broker,callback,options)',
    function (broker, callback, options, proc) {
        /* original code from github.com/pixavier/mqtt4snap */
        /* adapted into extension by cymplecy 26Nov21 */
        /* modified to add in keepalive parameter by cymplecy 23Nov21 */
        function log(txt) {
            console.log('mqt_connect: ', new Date().toString(), txt);
        }

        broker = broker ? broker.trim() : '';
        // The registry key keeps the optional "name|" prefix; the URL does not.
        const brokerKey = broker;
        const separatorAt = broker.indexOf('|');
        if (separatorAt >= 0) {
            broker = broker.slice(separatorAt + 1);
        }

        options = JSON.parse(options);
        const opts = {};
        if (options.username) {
            opts.username = options.username;
            // A username is always sent together with a (possibly empty) password.
            opts.password = options.password ? options.password : '';
        }
        if (options.keepalive) {
            opts.keepalive = Number(options.keepalive);
        }

        const stage = this.parentThatIsA(StageMorph);
        if (!('mqtt' in stage)) {
            stage.mqtt = [];
        }

        // Use the URL as given when it already has a WebSocket scheme;
        // otherwise pick ws/wss to match the page's own protocol.
        let wsbroker;
        if (broker.startsWith('ws://') || broker.startsWith('wss://')) {
            wsbroker = broker;
        } else {
            const scheme = window.location.protocol === 'https:' ? 'wss' : 'ws';
            wsbroker = scheme + '://' + broker;
        }

        // Port/path suffixes appended for specific brokers. The first table is
        // matched against the full URL, the second against a bare host name
        // (i.e. only when the caller supplied no scheme).
        const suffixByUrl = new Map([
            ['wss://broker.emqx.io', ':8084/mqtt'],
            ['ws://broker.emqx.io', ':8083/mqtt'],
            ['wss://test.mosquitto.org', ':8081'],
            ['ws://test.mosquitto.org', ':8080'],
            ['wss://simplesi.cloud', ':8084'],
            ['ws://simplesi.cloud', ':8083'],
            ['ws://localhost', ':9001']
        ]);
        const suffixByHost = new Map([
            ['mqtt.eclipseprojects.io', '/mqtt'],
            ['broker.xmqtt.net', '/mqtt']
        ]);
        if (suffixByUrl.has(wsbroker)) {
            wsbroker = wsbroker + suffixByUrl.get(wsbroker);
        } else if (suffixByHost.has(broker)) {
            wsbroker = wsbroker + suffixByHost.get(broker);
        }

        // Force-close whatever client is already registered under this key.
        try {
            stage.mqtt[brokerKey].end(true);
        } catch (e) {
            // Best effort: there may be no previous client for this key.
        }
        delete stage.mqtt[brokerKey];

        // Keep a direct reference to the new client so the handlers below act
        // on THIS client, not on whatever is registered under the key when the
        // event eventually fires.
        const client = mqtt.connect(wsbroker, opts);
        stage.mqtt[brokerKey] = client;

        client.on('connect', function (connack) {
            log('Connected to ' + wsbroker);
            if (callback) {
                const p = new Process();
                p.initializeFor(callback, new List(['all']));
                stage.threads.processes.push(p);
            }
            try {
                proc.doSetVar('connection status', 'connected');
            } catch (e) {
                // Best effort: setting the status variable may fail.
            }
        });

        client.stream.on('error', function (error) {
            log('error event triggered');
            try {
                client.end();
            } catch (e) {
                // Best effort: the client may already be closed.
            }
            // A late error from a client that has since been replaced must not
            // unregister the replacement or overwrite its status.
            if (stage.mqtt[brokerKey] !== client) {
                return;
            }
            delete stage.mqtt[brokerKey];
            try {
                proc.doSetVar('connection status', 'failed to connect to ' + broker);
            } catch (e) {
                // Best effort: setting the status variable may fail.
            }
        });
    }
);
/**
 * Extension primitive 'mqt_pub': publish a payload to a topic through the
 * client stored on the stage under the given broker key.
 *
 * broker  - broker string; the whole trimmed value is the lookup key, the
 *           part after the first '|' is only used in messages.
 * topic   - topic name (trimmed when truthy).
 * payload - value to publish; converted to text by string concatenation and
 *           deliberately not trimmed.
 * options - JSON text. Recognised keys: qos, retain.
 *
 * Throws an Error when the stage has no 'mqtt' property or no client for the
 * key, and rethrows anything raised while publishing.
 */
SnapExtensions.primitives.set(
    'mqt_pub(broker,topic,payload,options)',
    function (broker, topic, payload, options) {
        /* original code from github.com/pixavier/mqtt4snap */
        /* adapted into extension by cymplecy 26Nov21 */
        /* modified 5 Sep2021 by cymplecy to add parameters for qos and retain flag */
        const report = function (text) {
            console.log('mqt_pub: ', new Date().toString(), text);
        };

        broker = broker ? broker.trim() : '';
        const brokerKey = broker;
        const barAt = broker.indexOf('|');
        if (barAt >= 0) {
            broker = broker.slice(barAt + 1);
        }
        if (topic) {
            topic = topic.trim();
        }

        // Only the options that were actually supplied are forwarded.
        options = JSON.parse(options);
        const publishOpts = {};
        if (options['qos']) {
            publishOpts.qos = Number(options['qos']);
        }
        if (options['retain']) {
            publishOpts.retain = options['retain'];
        }

        const stage = this.parentThatIsA(StageMorph);
        if (!('mqtt' in stage)) {
            const noneMsg = 'No connection to any broker ' + broker;
            report(noneMsg);
            throw new Error(noneMsg);
        }
        if (!stage.mqtt[brokerKey]) {
            const missingMsg = 'No connection to broker ' + broker;
            report(missingMsg);
            throw new Error(missingMsg);
        }

        try {
            stage.mqtt[brokerKey].publish(topic, '' + payload, publishOpts);
        } catch (err) {
            report('Failed to publish payload ' + payload);
            throw err;
        }
    }
);
/**
 * Extension primitive 'mqt_sub': subscribe to a topic (wildcards allowed)
 * and run a Snap! callback for every matching message.
 *
 * broker   - broker string; the whole trimmed value is the lookup key in
 *            stage.mqtt, the part after the first '|' is only used in messages.
 * topic    - topic filter (trimmed when truthy); may contain '+' and '#'.
 * callback - Snap! script started as a new Process with the inputs
 *            (payload text, actual topic).
 * options  - accepted for interface compatibility; not used by this block.
 *
 * Throws an Error when the stage has no 'mqtt' property or no client is
 * stored for the key.
 */
SnapExtensions.primitives.set(
    'mqt_sub(broker,topic,callback,options)',
    function (broker, topic, callback, options) {
        /* github.com/pixavier/mqtt4snap */
        /* adapted into extension by cymplecy 26Nov21 */
        function log(txt) {
            console.log('mqt_sub: ', new Date().toString(), txt);
        }

        // Returns true when actualTopic matches the subscription filter.
        // '+' matches exactly one level; '#' matches the rest of the topic.
        function mqttWildcard(actualTopic, filter) {
            if (actualTopic === filter || filter === '#') {
                return true;
            }
            const t = String(actualTopic).split('/');
            const w = String(filter).split('/');
            let i = 0;
            for (; i < t.length; i++) {
                if (w[i] === '+') {
                    continue;
                }
                if (w[i] === '#') {
                    return true;
                }
                if (w[i] !== t[i]) {
                    return false;
                }
            }
            // A trailing '#' also matches the parent level itself
            // (e.g. filter 'a/#' matches topic 'a').
            if (w[i] === '#') {
                i += 1;
            }
            return i === w.length;
        }

        broker = broker ? broker.trim() : '';
        const brokerKey = broker;
        const separatorAt = broker.indexOf('|');
        if (separatorAt >= 0) {
            broker = broker.slice(separatorAt + 1);
        }
        topic = topic ? topic.trim() : topic;

        const stage = this.parentThatIsA(StageMorph);
        if (!('mqtt' in stage)) {
            log('No connection to any broker ' + broker);
            throw new Error('No connection to any broker ' + broker);
        }
        const client = stage.mqtt[brokerKey];
        if (!client) {
            log('No connection to broker ' + broker);
            throw new Error('No connection to broker ' + broker);
        }

        // Drop any existing broker-side subscription before subscribing again.
        // NOTE(review): 'message' listeners added by earlier calls for the same
        // topic are not removed here, only by mqt_unsub - confirm this is intended.
        try {
            client.unsubscribe(topic);
        } catch (e) {
            // Best effort: the topic may not have been subscribed before.
        }
        client.subscribe(topic);

        const mqttListener = function (aTopic, payload) {
            if (!mqttWildcard(aTopic, topic)) {
                return;
            }
            const p = new Process();
            // Turn the payload bytes into a string, one character per byte, so
            // binary payloads survive unchanged. Declared locally: it used to
            // leak into the global scope and was shared between listeners.
            const newPayload = payload.reduce(
                (res, val) => res + String.fromCharCode(val),
                ''
            );
            try {
                p.initializeFor(callback, new List([newPayload, aTopic]));
            } catch (e) {
                // Fall back to starting the callback without inputs.
                p.initializeFor(callback, new List([]));
            }
            stage.threads.processes.push(p);
        };
        // Tag the listener with its topic so mqt_unsub can find and remove it.
        mqttListener.topic = topic;
        client.on('message', mqttListener);
    }
);
/**
 * Extension primitive 'mqt_disconnect': force-close one stored MQTT client,
 * or every stored client when the broker name (after removing anything up to
 * the first '|') is 'all', and remove it from stage.mqtt.
 *
 * Every step is best effort: any error raised while closing or removing a
 * client is swallowed, so the block never throws from those steps.
 */
SnapExtensions.primitives.set(
    'mqt_disconnect(broker)',
    function (broker) {
        /* original code from github.com/pixavier/mqtt4snap */
        /* adapted into extension by cymplecy 26Nov21 */
        const stage = this.parentThatIsA(StageMorph);

        broker = broker ? broker.trim() : '';
        const brokerKey = broker;
        const barAt = broker.indexOf('|');
        if (barAt >= 0) {
            broker = broker.slice(barAt + 1);
        }
        const everything = broker === 'all';

        // Step 1: close the connection(s).
        try {
            if (everything) {
                Object.keys(stage.mqtt).forEach(function (key) {
                    try {
                        stage.mqtt[key].end(true);
                    } catch (ignored) {
                        // keep going with the remaining clients
                    }
                });
            } else {
                stage.mqtt[brokerKey].end(true);
            }
        } catch (ignored) {
            // no registry, or no client stored under this key
        }

        // Step 2: forget the closed client(s).
        try {
            if (everything) {
                delete stage.mqtt;
                stage.mqtt = [];
            } else {
                delete stage.mqtt[brokerKey];
            }
        } catch (ignored) {
            // nothing to remove
        }
    }
);
/**
 * Extension primitive 'mqt_unsub': unsubscribe from a topic on the client
 * stored under the given broker key and remove the matching 'message'
 * listeners that mqt_sub attached.
 *
 * broker - broker string; the whole trimmed value is the lookup key.
 * topic  - topic to unsubscribe. It is trimmed the same way mqt_sub trims it,
 *          so the value compared against each listener's recorded topic is
 *          the same one that was subscribed. '#' removes every listener.
 *
 * Best effort: failures (e.g. no client stored for the key) are logged to the
 * console and otherwise ignored.
 */
SnapExtensions.primitives.set(
    'mqt_unsub(broker,topic)',
    function (broker, topic) {
        /* original code from github.com/pixavier/mqtt4snap */
        /* adapted into extension by cymplecy 26Nov21 */
        const stage = this.parentThatIsA(StageMorph);
        try {
            broker = broker ? broker.trim() : '';
            const brokerKey = broker;
            // mqt_sub subscribes with, and tags listeners with, the trimmed
            // topic; without trimming here a padded topic would never match.
            topic = typeof topic === 'string' ? topic.trim() : topic;

            const client = stage.mqtt[brokerKey];
            client.unsubscribe(topic);

            // https://github.com/mqttjs/async-mqtt/issues/31
            client.listeners('message').forEach((listener) => {
                if (topic === listener.topic || topic === '#') { // # = all
                    client.removeListener('message', listener);
                }
            });
        } catch (e) {
            console.log('mqt_unsub: ', new Date().toString(), e);
        }
    }
);