add stats call to kaefer to inspect the internal state

pull/39/head^2
Fabian Jakobs 2015-03-03 14:59:20 +00:00
rodzic 341d5a8c7a
commit 76a591b01e
1 zmienionych plików z 79 dodań i 20 usunięć

Wyświetl plik

@ -32,7 +32,11 @@ var ReliableSocket = module.exports = function(socket, options) {
this.retransmissionTimeout = 3000;
this.maxRtt = options.maxRtt || 10000;
this.minRtt = options.minRtt || 2000;
this.duplicateCount = 0;
this.missCount = 0;
this.connectTs = 0;
this.debug = options.debug || false;
this.seq = options.seq || 1;
this.recId = -1;
@ -47,17 +51,9 @@ var ReliableSocket = module.exports = function(socket, options) {
socket.on("message", this.onMessage.bind(this));
socket.on("away", this.onAway.bind(this));
socket.on("back", this.onBack.bind(this));
this.on("stats", this.onStats.bind(this));
this.on("stats_reply", this.onStatsReply.bind(this));
socket.on("error", function(e){ console.error(e.message); });
// var that = this;
// setInterval(function() {
// console.log({
// retransmissionTimeout: that.retransmissionTimeout,
// congestionWindowSize: that.congestionWindowSize,
// srtt: that.srtt,
// rttVar: that.rttVar
// });
// }, 2000);
};
util.inherits(ReliableSocket, EventEmitter);
@ -99,17 +95,22 @@ ReliableSocket.prototype.onMessage = function(msg) {
} else if (recId < expectedId) {
this.debug && console.log("dupe", recId, expectedId);
// we already saw this packet. Make sure the other side knows it
this.duplicateCount += 1;
this._ack();
return;
} else {
this.debug && console.log("miss", recId, expectedId);
// we miss packets in between
this.missCount += 1;
this._ack();
return;
}
}
if (msg.d) {
if (msg.t) {
this.emit(msg.t, msg);
}
else if (msg.d) {
try {
this.emit("message", msg.d);
} catch (e) {
@ -121,6 +122,60 @@ ReliableSocket.prototype.onMessage = function(msg) {
}
};
ReliableSocket.prototype.onStats = function(msg) {
var data = msg.d || {};
data.remote = this._getStats();
this.send(data, "stats_reply");
};
ReliableSocket.prototype.onStatsReply = function(msg) {
var data = msg.d || {};
data.localEnd = this._getStats();
var id = data.id;
if (this.statsCallbacks[id]) {
this.statsCallbacks[id](null, data);
delete this.statsCallbacks[id];
}
};
ReliableSocket.prototype.stats = function(callback) {
if (!this.statsCallbacks) this.statsCallbacks = {};
var id = this.seq;
this.statsCallbacks[id] = callback;
var data = {
id: id,
localStart: this._getStats()
};
this.send(data, "stats");
};
ReliableSocket.prototype._getStats = function() {
var wsBuffer = -1;
try {
if (this.socket.socket.transport.ws)
wsBuffer = this.socket.socket.transport.ws.bufferedAmount;
else if (this.socket.socket.transport.socket)
wsBuffer = this.socket.socket.transport.socket._socket.bufferSize;
} catch(e) {}
return {
livetime: Date.now() - this.connectTs,
ts: Date.now(),
retransmissionTimeout: this.retransmissionTimeout,
congestionWindowSize: this.congestionWindowSize,
srtt: this.srtt,
rttVar: this.rttVar,
duplicateCount: this.duplicateCount,
missCount: this.missCount,
buffered: this.buffer.length,
unacked: Object.keys(this.unacked).length,
eioBuffer: this.socket.socket.writeBuffer.length,
wsBuffer: wsBuffer
};
};
ReliableSocket.prototype.onAway = function() {
this.debug && console.log("away");
this._scheduleDisconnect("client connection went away");
@ -131,6 +186,7 @@ ReliableSocket.prototype.onBack = function() {
this.debug && console.log("back");
this._cancelDisconnect();
this.connectTs = Date.now();
if (!this.connected) {
this.connected = true;
this.emit("connect");
@ -161,7 +217,7 @@ ReliableSocket.prototype._delayedAck = function() {
var that = this;
this._cancelDelayedAck();
this._ackTimer = setTimeout(function() {
that._ack();
that._ack();
}, this.ackTimeout);
};
@ -188,7 +244,7 @@ ReliableSocket.prototype._flush = function() {
this.debug && console.log("flush", toSend, "messages");
for (var i=0; i<toSend; i++) {
var msg = this.buffer.shift();
this._sendMessage(msg);
this._sendMessage(msg[0], msg[1]);
}
if (!this.buffer.length)
this.emit("drain");
@ -199,7 +255,9 @@ ReliableSocket.prototype.disconnect = function(reason) {
this.debug && console.log("disconnect");
this.connected = false;
this.recId = -1;
this.duplicateCount = 0;
this.missCount = 0;
this.buffer = [];
for (var key in this.unacked) {
this.unacked[key].abort();
@ -218,20 +276,20 @@ ReliableSocket.prototype.close = function() {
return this.socket.close();
};
ReliableSocket.prototype.send = function(msg) {
ReliableSocket.prototype.send = function(msg, type) {
this._cancelDelayedAck();
if (this.socket.readyState == "open" && Object.keys(this.unacked).length < this.congestionWindowSize) {
this._sendMessage(msg);
this._sendMessage(msg, type);
return true;
}
else {
this.debug && console.log("buffer");
this.buffer.push(msg);
this.buffer.push([msg, type]);
return false;
}
};
ReliableSocket.prototype._sendMessage = function(data) {
ReliableSocket.prototype._sendMessage = function(data, type) {
var that = this;
var msg = {
@ -269,7 +327,8 @@ ReliableSocket.prototype._sendMessage = function(data) {
return escape(JSON.stringify({
ack: that.recId,
seq: msg.seq,
d: data
d: data,
t: type
}));
}
};