'use strict';

var util = require('util');
var Stream = require('stream');

/**
 * Writable stream that buffers incoming data and hands it out to queued
 * read requests registered through `read(length, callback)`.
 *
 * Internal state:
 *  - `_buffers`  queue of Buffers not yet consumed (set to null by destroy();
 *                a trailing `null` entry is pushed by end() while data remains)
 *  - `_buffered` number of bytes currently held in `_buffers`
 *  - `_reads`    queue of pending read requests (set to null by destroy())
 *  - `_paused`   true once write() found no pending reads; cleared by read()
 *                together with a 'drain' event
 *  - `_encoding` encoding used for string writes when none is given
 *
 * Events emitted by this module: 'drain', 'error', 'close'.
 *
 * @constructor
 */
var ChunkStream = module.exports = function() {
  Stream.call(this);

  this._buffers = [];
  this._buffered = 0;

  this._reads = [];
  this._paused = false;

  this._encoding = 'utf8';
  this.writable = true;
};
util.inherits(ChunkStream, Stream);

/**
 * Queue a read request.
 *
 * @param {number} length  Number of bytes wanted. A negative value means
 *   "at most |length| bytes": the request is then satisfied from the first
 *   buffered chunk alone, with whatever is available up to that limit.
 * @param {function(Buffer)} callback  Called with the stream as `this` and
 *   the data as its only argument.
 */
ChunkStream.prototype.read = function(length, callback) {

  this._reads.push({
    length: Math.abs(length), // if length < 0 then at most this length
    allowLess: length < 0,
    func: callback
  });

  process.nextTick(function() {
    this._process();

    // The stream is paused and there is not enough data to satisfy the
    // queued reads, so ask the producer for more. `_reads` is null once the
    // stream has been destroyed (which may happen before this tick runs),
    // hence the explicit guard.
    if (this._paused && this._reads && this._reads.length > 0) {
      this._paused = false;

      this.emit('drain');
    }
  }.bind(this));
};

/**
 * Append data to the internal buffer and try to satisfy pending reads.
 *
 * @param {Buffer|string} data  Chunk to append. Non-Buffer values are
 *   converted with `Buffer.from(data, encoding)`.
 * @param {string} [encoding]  Encoding for string data; defaults to the
 *   stream's `_encoding` ('utf8').
 * @returns {boolean} false when the stream is not writable (an 'error' event
 *   is emitted in that case) or when it became paused because no read
 *   requests are pending; true otherwise.
 */
ChunkStream.prototype.write = function(data, encoding) {

  if (!this.writable) {
    this.emit('error', new Error('Stream not writable'));
    return false;
  }

  var dataBuffer;
  if (Buffer.isBuffer(data)) {
    dataBuffer = data;
  }
  else {
    // Buffer.from replaces the deprecated `new Buffer(data, encoding)`, which
    // allocated uninitialised memory when handed a number.
    dataBuffer = Buffer.from(data, encoding || this._encoding);
  }

  this._buffers.push(dataBuffer);
  this._buffered += dataBuffer.length;

  this._process();

  // Nobody is waiting for data: signal back-pressure to the producer.
  if (this._reads && this._reads.length === 0) {
    this._paused = true;
  }

  return this.writable && !this._paused;
};

/**
 * Finish the stream, optionally writing a last chunk first.
 *
 * @param {Buffer|string} [data]  Final chunk, passed to write().
 * @param {string} [encoding]  Encoding for string data.
 */
ChunkStream.prototype.end = function(data, encoding) {

  if (data) {
    this.write(data, encoding);
  }

  this.writable = false;

  // already destroyed
  if (!this._buffers) {
    return;
  }

  // enqueue or handle end
  if (this._buffers.length === 0) {
    this._end();
  }
  else {
    // Mark the end of input behind the remaining data, then let _process()
    // drain what it can and finish the stream.
    this._buffers.push(null);
    this._process();
  }
};

ChunkStream.prototype.destroySoon = ChunkStream.prototype.end;

/**
 * Handle end of input: emit an 'error' if read requests are still pending,
 * then destroy the stream.
 */
ChunkStream.prototype._end = function() {

  if (this._reads.length > 0) {
    this.emit('error',
      new Error('Unexpected end of input')
    );
  }

  this.destroy();
};

/**
 * Release all internal state and emit 'close'. Does nothing when the stream
 * has already been destroyed.
 */
ChunkStream.prototype.destroy = function() {

  if (!this._buffers) {
    return;
  }

  this.writable = false;
  this._reads = null;
  this._buffers = null;

  this.emit('close');
};

/**
 * Satisfy an "at most N bytes" request from the first buffered chunk.
 *
 * @param {{length: number, allowLess: boolean, func: function}} read
 *   The request at the head of the `_reads` queue.
 */
ChunkStream.prototype._processReadAllowingLess = function(read) {
  // There is some data, so this request can be satisfied right away.
  this._reads.shift(); // == read

  // Only the first buffer is considered.
  var smallerBuf = this._buffers[0];

  if (smallerBuf.length > read.length) {
    // More data than requested: hand out the head, keep the tail queued.
    this._buffered -= read.length;
    this._buffers[0] = smallerBuf.slice(read.length);

    read.func.call(this, smallerBuf.slice(0, read.length));
  }
  else {
    // Not more than the maximum length, so hand out the whole chunk.
    this._buffered -= smallerBuf.length;
    this._buffers.shift(); // == smallerBuf

    read.func.call(this, smallerBuf);
  }
};

/**
 * Satisfy an exact-length request by copying `read.length` bytes out of the
 * buffered chunks. The caller must have checked that enough bytes are
 * buffered.
 *
 * @param {{length: number, allowLess: boolean, func: function}} read
 *   The request at the head of the `_reads` queue.
 */
ChunkStream.prototype._processRead = function(read) {
  this._reads.shift(); // == read

  var pos = 0;
  var count = 0;
  // Buffer.alloc replaces the deprecated `new Buffer(length)`.
  var data = Buffer.alloc(read.length);

  // Gather the requested bytes from as many queued buffers as needed.
  while (pos < read.length) {

    var buf = this._buffers[count++];
    var len = Math.min(buf.length, read.length - pos);

    buf.copy(data, pos, 0, len);
    pos += len;

    // The last buffer was only partly consumed: keep its remainder queued
    // and step `count` back so it is not removed below.
    if (len !== buf.length) {
      this._buffers[--count] = buf.slice(len);
    }
  }

  // remove all fully consumed buffers
  if (count > 0) {
    this._buffers.splice(0, count);
  }

  this._buffered -= read.length;

  read.func.call(this, data);
};

/**
 * Serve queued read requests while buffered data allows it, then finish the
 * stream if it is no longer writable. Any exception raised while doing so
 * (including from read callbacks) is re-emitted as an 'error' event.
 */
ChunkStream.prototype._process = function() {

  try {
    // as long as there is any data and read requests
    while (this._buffered > 0 && this._reads && this._reads.length > 0) {

      var read = this._reads[0];

      // read any data (but no more than length)
      if (read.allowLess) {
        this._processReadAllowingLess(read);

      }
      else if (this._buffered >= read.length) {
        // enough data is buffered for an exact-length read
        this._processRead(read);
      }
      else {
        // not enough data to satisfy the first request in the queue,
        // so wait for more
        break;
      }
    }

    // end() was called and the stream has not been destroyed yet.
    if (this._buffers && !this.writable) {
      this._end();
    }
  }
  catch (ex) {
    this.emit('error', ex);
  }
};