queue refactored, support for idle/noidle

This commit is contained in:
Ondřej Žára 2020-05-05 12:16:33 +02:00
parent 5e5378923f
commit 783fa14351
5 changed files with 72 additions and 50 deletions

View File

@ -1,6 +1,6 @@
<!doctype html> <!doctype html>
<html> <html>
<script> <script>
let ws = new WebSocket("ws://localhost:8080?server=0:6600"); let ws = new WebSocket("ws://localhost:8080?server=:6600");
</script> </script>
</html> </html>

View File

@ -1,4 +1,6 @@
let httpServer = require("http").createServer(); let httpServer = require("http").createServer();
httpServer.listen(8080); httpServer.listen(8080);
require("..").ws2mpd(httpServer); let mod = require("..");
mod.logging(true);
mod.ws2mpd(httpServer);

View File

@ -1,7 +1,7 @@
#!/usr/bin/env node #!/usr/bin/env node
const commands = require("./commands");
const log = require("./log.js").log; const log = require("./log.js").log;
const Queue = require("./queue").Queue;
function initConnection(request) { function initConnection(request) {
let ws = request.accept(); let ws = request.accept();
@ -16,47 +16,36 @@ function initConnection(request) {
mpd.setTimeout(0); mpd.setTimeout(0);
mpd.connect(port, host); mpd.connect(port, host);
let commandQueue = []; // data coming from the response parser
let command = null; let queue = new Queue(mpd);
queue.on("response", data => {
function waitForCommand(cmd) { log("ws <--", data);
command = cmd; ws.send(JSON.stringify(data));
cmd.on("done", data => {
log("ws <--", data);
ws.send(JSON.stringify(data));
command = null;
processQueue();
});
}
function processQueue() {
if (command || !commandQueue.length) { return; }
let cmd = commands.create(mpd, commandQueue.shift());
waitForCommand(cmd);
}
ws.on("message", message => {
log("ws -->", message.utf8Data);
commandQueue.push(message.utf8Data);
processQueue();
}); });
// data going into the response parser
ws.on("message", message => {
log("ws -->", message.utf8Data);
queue.add(message.utf8Data);
});
// client closes
ws.on("close", (reasonCode, description) => { ws.on("close", (reasonCode, description) => {
log(`ws ${ws.remoteAddress} disconnected`); log(`ws ${ws.remoteAddress} disconnected`);
mpd.end(); mpd.end();
}); });
// server closes
mpd.on("close", () => { mpd.on("close", () => {
log("mpd disconnected"); log("mpd disconnected");
ws.close(); ws.close();
}); });
// fail to conect
mpd.on("error", () => { mpd.on("error", () => {
log("mpd connection error"); log("mpd connection error");
ws.close(); ws.close();
}); });
waitForCommand(commands.welcome(mpd));
} }
exports.logging = function(enabled) { exports.logging = function(enabled) {

View File

@ -1,6 +1,6 @@
{ {
"name": "ws2mpd", "name": "ws2mpd",
"version": "2.1.0", "version": "2.2.0",
"description": "", "description": "",
"main": "index.js", "main": "index.js",
"scripts": { "scripts": {

View File

@ -1,7 +1,7 @@
const EventEmitter = require("events"); const EventEmitter = require("events");
const log = require("./log.js").log; const log = require("./log.js").log;
class Command extends EventEmitter { class Response extends EventEmitter {
constructor(mpd) { constructor(mpd) {
super(); super();
this._mpd = mpd; this._mpd = mpd;
@ -33,12 +33,10 @@ class Command extends EventEmitter {
} }
} }
class Normal extends Command { class Normal extends Response {
constructor(mpd, command) { constructor(mpd) {
super(mpd); super(mpd);
this._lines = []; this._lines = [];
log("--> mpd", command);
mpd.write(command + "\n");
} }
_processBuffer() { _processBuffer() {
@ -51,23 +49,22 @@ class Normal extends Command {
} }
} }
class Welcome extends Command { class Idle extends Normal {}
class Welcome extends Response {
_processBuffer() { _processBuffer() {
let line = this._getLine(); let line = this._getLine();
if (line) { this._done([line]); } if (line) { this._done([line]); }
} }
} }
class AlbumArt extends Command { class Binary extends Response {
constructor(mpd, command) { constructor(mpd) {
super(mpd); super(mpd);
this._size = 0; this._size = 0;
this._binary = 0; this._binary = 0;
this._data = null; this._data = null;
this._lines = []; this._lines = [];
log("--> mpd", command);
mpd.write(command + "\n");
} }
_processBuffer() { _processBuffer() {
@ -101,25 +98,59 @@ class AlbumArt extends Command {
this._lines.push([...this._data]); this._lines.push([...this._data]);
log("data", this._data.length); log("data", this._data.length);
} else { return; } } else { return; }
} }
let line = this._getLine(); let line = this._getLine();
if (!line) { return; } if (!line) { return; }
this._lines.push(line); this._lines.push(line);
this._done(this._lines); this._done(this._lines);
} }
} }
exports.create = function(mpd, command) {
if (command.startsWith("albumart")) { exports.Queue = class extends EventEmitter {
return new AlbumArt(mpd, command); constructor(mpd) {
} else { super();
return new Normal(mpd, command); this._mpd = mpd;
this._waiting = [];
this._current = null;
this._create(Welcome);
}
add(str) {
if (str == "noidle" && this._current instanceof Idle) {
this._mpd.write(str + "\n");
return;
}
this._waiting.push(str);
this._process();
}
_process() {
if (this._current || !this._waiting.length) { return; }
let str = this._waiting.shift();
this._create(getCtor(str));
log("--> mpd", str);
this._mpd.write(str + "\n");
}
_create(ctor) {
let cmd = new ctor(this._mpd);
this._current = cmd;
cmd.on("done", data => {
this.emit("response", data);
this._current = null;
this._process();
});
} }
return new Normal(mpd, command);
} }
exports.welcome = function(mpd) { function getCtor(command) {
return new Welcome(mpd); switch (true) {
case command.startsWith("idle"): return Idle;
case command.startsWith("albumart") || command.startsWith("readpicture"): return Binary;
default: return Normal;
}
} }