wordpos/tools/buffered-reader.js

405 lines
10 KiB
JavaScript

/**
* @name BufferedReader.
* @description Fully configurable buffered reader for node.js.
*
* @author Gabriel Llamas
* @created 10/04/2012
* @modified 01/05/2012
* @version 0.2.0
*
* Forked: https://github.com/moos/Node-BufferedReader
*/
"use strict";
var EVENTS = require ("events");
var FS = require ("fs");
var BUFFER_SIZE = 16384;
var INVALID_BUFFER_SIZE = "The buffer size must be greater than 0.";
var INVALID_START_OFFSET = "The start offset must be greater than or equals to 0.";
var INVALID_END_OFFSET = "The end offset must be greater than or equals to 0.";
var INVALID_RANGE_OFFSET = "The end offset must be greater than or equals to the start offset.";
var INVALID_BYTES_RANGE_ERROR = "The number of bytes to read must be greater than 0.";
var INVALID_SEEK_OFFSET = "The offset must be greater than or equals to 0.";
var NO_FILE_ERROR = "The source is not a file.";
var BufferedReader = function (fileName, settings){
EVENTS.EventEmitter.call (this);
settings = settings || {};
if (settings.bufferSize === 0) settings.bufferSize = -1;
this._settings = {
bufferSize: settings.bufferSize || BUFFER_SIZE,
encoding: settings.encoding || null,
start: settings.start || 0,
end: settings.end
};
if (this._settings.bufferSize < 1) throw new Error (INVALID_BUFFER_SIZE);
if (this._settings.start < 0) throw new Error (INVALID_START_OFFSET);
if (this._settings.end < 0) throw new Error (INVALID_END_OFFSET);
if (this._settings.end < this._settings.start) throw new Error (INVALID_RANGE_OFFSET);
this._fileName = fileName;
this._fd = null;
this._buffer = null;
this._fileOffset = this._settings.start;
this._bufferOffset = 0;
this._dataOffset = 0;
this._realOffset = this._settings.start;
this._fileSize = null;
this._initialized = false;
this._interrupted = false;
this._isEOF = false;
this._noMoreBuffers = false;
this._needRead = false;
};
BufferedReader.prototype = Object.create (EVENTS.EventEmitter.prototype);
BufferedReader.prototype.constructor = BufferedReader;
BufferedReader.prototype.interrupt = function (){
this._interrupted = true;
};
BufferedReader.prototype.read = function (){
var stream = FS.createReadStream (this._fileName, this._settings);
// node version change: stream.encoding no longer exposed
stream.encoding = this._settings.encoding;
var lastChunk;
var buffer;
var me = this;
var lineOffset = 0,
lineCount = 0,
byteOffset = 0;
var onChar = this.listeners ("character").length !== 0,
onLine = this.listeners ("line").length !== 0,
onByte = this.listeners ("byte").length !== 0,
loop = onChar || onLine || onByte;
stream.on ("data", function (data){
buffer = data;
var offset = 0;
var chunk;
var character;
var len = data.length;
if (loop){
for (var i=0; i<len; i++){
if (me._interrupted) break;
character = data[i];
if (stream.encoding){
onChar && me.emit ("character", character === "\r" ? "\n" : character, byteOffset + i);
}else{
onByte && me.emit ("byte", character, byteOffset + i);
continue;
}
if (!onLine) continue;
if (character === "\n" || character === "\r"){
chunk = data.slice (offset, i);
if (lastChunk){
chunk = lastChunk.concat (chunk);
}
if (i + 1 !== len && character === "\r" && data[i + 1] === "\n"){
i++;
}
me.emit ("line", chunk, lineOffset + offset, ++lineCount);
offset = i + 1;
if (lastChunk){
lineOffset += lastChunk.length;
lastChunk = null;
}
}
}
if (stream.encoding && offset !== len){
var s = offset === 0 ? data : data.slice (offset);
lastChunk = lastChunk ? lastChunk.concat (s) : s;
}
lineOffset += offset;
}
me.emit ("buffer", data, byteOffset);
if (me._interrupted){
me._interrupted = false;
stream.destroy ();
me.emit ("end");
}
byteOffset += len;
});
stream.on ("end", function (){
me._interrupted = false;
if (loop && lastChunk){
me.emit ("line", lastChunk);
}
me.emit ("end");
});
stream.on ("error", function (error){
me._interrupted = false;
me.emit ("error", error);
});
};
BufferedReader.prototype._init = function (cb){
var me = this;
FS.stat (this._fileName, function (error, stats){
if (error) return cb (error);
if (stats.isFile ()){
if (me._settings.start >= stats.size){
me._isEOF = true;
return cb (null);
}
if (!me._settings.end && me._settings.end !== 0){
me._settings.end = stats.size;
}
if (me._settings.end >= stats.size){
me._settings.end = stats.size - 1;
}
me._fileSize = stats.size;
cb (null);
}else{
cb (new Error (NO_FILE_ERROR));
}
});
};
BufferedReader.prototype._read = function (cb){
var me = this;
var size = this._settings.bufferSize;
FS.read (this._fd, this._buffer, 0, size, this._fileOffset, function (error, bytesRead){
if (error) return cb (error);
me._fileOffset += bytesRead;
if (me._fileOffset === me._fileSize){
me._noMoreBuffers = true;
}
if (bytesRead < size){
me._buffer = me._buffer.slice (0, bytesRead);
}
cb (null);
});
};
BufferedReader.prototype._readBytes = function (bytes, cb){
if (this._needRead){
this._needRead = false;
var me = this;
this._read (function (error){
if (error) return cb (error, null, -1);
me._readBytes (bytes, cb);
});
return;
}
var fill = function (){
var endData = bytes - me._dataOffset;
var endBuffer = me._buffer.length - me._bufferOffset;
var end = endBuffer <= endData ? endBuffer : endData;
me._buffer.copy (data, me._dataOffset, me._bufferOffset, me._bufferOffset + end);
me._bufferOffset += end;
me._realOffset += end;
if (me._bufferOffset === me._buffer.length){
me._bufferOffset = 0;
me._needRead = true;
}
me._dataOffset += end;
if (me._dataOffset === bytes){
me._dataOffset = 0;
me._isEOF = me._noMoreBuffers;
cb (null, data, bytes);
}else{
if (me._noMoreBuffers){
me._isEOF = true;
end = me._dataOffset;
me._dataOffset = 0;
cb (null, data.slice (0, end), end);
}else{
me._needRead = false;
me._read (function (error){
if (error) return cb (error, null, -1);
fill ();
});
}
}
};
var me = this;
var max = me._settings.end - me._realOffset + 1;
bytes = max < bytes ? max : bytes;
if (bytes === 0) return cb (null, null, 0);
var data = new Buffer (bytes);
var len = me._buffer.length;
if (bytes <= len){
var end = me._bufferOffset + bytes;
if (end <= len){
me._buffer.copy (data, 0, me._bufferOffset, end);
me._bufferOffset = end;
me._realOffset += bytes;
cb (null, data, bytes);
}else{
var last = len - me._bufferOffset;
me._realOffset += last;
if (last !== 0){
me._buffer.copy (data, 0, me._bufferOffset, me._bufferOffset + last);
}
if (me._noMoreBuffers){
me._isEOF = true;
return cb (null, data.slice (0, last), last);
}
me._read (function (error){
if (error) return cb (error, null, -1);
len = me._buffer.length;
var remaining = bytes - last;
if (len <= remaining){
me._realOffset += len;
me._isEOF = true;
me._buffer.copy (data, last, 0, len);
var lastChunk = last + len;
cb (null, data.slice (0, lastChunk), lastChunk);
}else{
me._realOffset += remaining;
me._bufferOffset = remaining;
me._buffer.copy (data, last, 0, me._bufferOffset);
cb (null, data, bytes);
}
});
}
}else{
fill ();
}
};
BufferedReader.prototype.close = function (cb){
if (cb) cb = cb.bind (this);
if (!this._fd){
if (cb) cb (null);
return;
}
var me = this;
FS.close (this._fd, function (error){
me._fd = null;
me._buffer = null;
if (cb) cb (error);
});
};
BufferedReader.prototype.readBytes = function (bytes, cb){
cb = cb.bind (this);
if (bytes < 1 || this._isEOF) return cb (null, null, 0);
var open = function (){
if (me._isEOF) return cb (null, null, 0);
FS.open (me._fileName, "r", function (error, fd){
if (error) return cb (error, null, -1);
me._fd = fd;
me._buffer = new Buffer (me._settings.bufferSize);
me._read (function (error){
if (error) return cb (error, null, -1);
me._readBytes (bytes, cb);
});
});
};
var me = this;
if (!this._initialized){
this._init (function (error){
if (error) return cb (error, null);
me._initialized = true;
open ();
});
}else{
if (!this._fd) return open ();
this._readBytes (bytes, cb);
}
};
BufferedReader.prototype.seek = function (offset, cb){
cb = cb.bind (this);
if (offset < 0) return cb (new Error (INVALID_SEEK_OFFSET));
var seek = function (){
offset += me._settings.start;
if (offset >= me._settings.end + 1){
me._isEOF = true;
}else{
me._isEOF = false;
var start = me._fileOffset - (me._buffer ? me._buffer.length : 0);
if (offset >= start && offset < me._fileOffset){
me._bufferOffset = offset - start;
me._realOffset = offset;
}else{
me._needRead = me._fd ? true : false;
me._noMoreBuffers = false;
me._fileOffset = offset;
me._bufferOffset = 0;
me._realOffset = offset;
}
}
cb (null);
};
var me = this;
if (!this._initialized){
this._init (function (error){
if (error) return cb (error, null);
me._initialized = true;
seek ();
});
}else{
seek ();
}
};
BufferedReader.prototype.skip = function (bytes, cb){
cb = cb.bind (this);
if (bytes < 1 || this._isEOF) return cb (null, 0);
var skip = function (){
var remaining = me._settings.end - me._realOffset + 1;
bytes = bytes <= remaining ? bytes : remaining;
me.seek (me._realOffset - me._settings.start + bytes, function (){
cb (null, bytes);
});
};
var me = this;
if (!this._initialized){
this._init (function (error){
if (error) return cb (error, null);
me._initialized = true;
skip ();
});
}else{
skip ();
}
};
module.exports = BufferedReader;