| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653 | 'use strict'/* eslint-disable no-var */var test = require('tape')var buildQueue = require('../')test('concurrency', function (t) {  t.plan(6)  t.throws(buildQueue.bind(null, worker, 0))  t.throws(buildQueue.bind(null, worker, NaN))  t.doesNotThrow(buildQueue.bind(null, worker, 1))  var queue = buildQueue(worker, 1)  t.throws(function () {    queue.concurrency = 0  })  t.throws(function () {    queue.concurrency = NaN  })  t.doesNotThrow(function () {    queue.concurrency = 2  })  function worker (arg, cb) {    cb(null, true)  }})test('worker execution', function (t) {  t.plan(3)  var queue = buildQueue(worker, 1)  queue.push(42, function (err, result) {    t.error(err, 'no error')    t.equal(result, true, 'result matches')  })  function worker (arg, cb) {    t.equal(arg, 42)    cb(null, true)  }})test('limit', function (t) {  t.plan(4)  var expected = [10, 0]  var queue = buildQueue(worker, 1)  queue.push(10, result)  queue.push(0, result)  function result (err, arg) {    t.error(err, 'no error')    t.equal(arg, expected.shift(), 'the result matches')  }  function worker (arg, cb) {    setTimeout(cb, arg, null, arg)  }})test('multiple executions', function (t) {  t.plan(15)  var queue = buildQueue(worker, 1)  var toExec = [1, 2, 3, 4, 5]  var count = 0  toExec.forEach(function (task) {    queue.push(task, done)  })  function done (err, result) {    t.error(err, 'no error')    t.equal(result, toExec[count - 1], 'the result matches')  }  function worker (arg, cb) {    t.equal(arg, toExec[count], 'arg matches')    count++    setImmediate(cb, null, arg)  }})test('multiple executions, one after another', function (t) {  t.plan(15)  var queue = buildQueue(worker, 1)  var toExec = [1, 2, 3, 4, 5]  var count = 0  queue.push(toExec[0], done)  function done (err, result) {    t.error(err, 'no error')    t.equal(result, toExec[count - 1], 'the result matches')    if (count < toExec.length) {      queue.push(toExec[count], done)    }  }  function worker (arg, cb) {    t.equal(arg, toExec[count], 'arg matches')    count++    setImmediate(cb, null, arg)  }})test('set this', function (t) {  t.plan(3)  var that = {}  var queue = buildQueue(that, worker, 1)  queue.push(42, function (err, result) {    t.error(err, 'no error')    t.equal(this, that, 'this matches')  })  function worker (arg, cb) {    t.equal(this, that, 'this matches')    cb(null, true)  }})test('drain', function (t) {  t.plan(4)  var queue = buildQueue(worker, 1)  var worked = false  queue.push(42, function (err, result) {    t.error(err, 'no error')    t.equal(result, true, 'result matches')  })  queue.drain = function () {    t.equal(true, worked, 'drained')  }  function worker (arg, cb) {    t.equal(arg, 42)    worked = true    setImmediate(cb, null, true)  }})test('pause && resume', function (t) {  t.plan(13)  var queue = buildQueue(worker, 1)  var worked = false  var expected = [42, 24]  t.notOk(queue.paused, 'it should not be paused')  queue.pause()  queue.push(42, function (err, result) {    t.error(err, 'no error')    t.equal(result, true, 'result matches')  })  queue.push(24, function (err, result) {    t.error(err, 'no error')    t.equal(result, true, 'result matches')  })  t.notOk(worked, 'it should be paused')  t.ok(queue.paused, 'it should be paused')  queue.resume()  queue.pause()  queue.resume()  queue.resume() // second resume is a no-op  function worker (arg, cb) {    t.notOk(queue.paused, 'it should not be paused')    t.ok(queue.running() <= queue.concurrency, 'should respect the concurrency')    t.equal(arg, expected.shift())    worked = true    process.nextTick(function () { cb(null, true) })  }})test('pause in flight && resume', function (t) {  t.plan(16)  var queue = buildQueue(worker, 1)  var expected = [42, 24, 12]  t.notOk(queue.paused, 'it should not be paused')  queue.push(42, function (err, result) {    t.error(err, 'no error')    t.equal(result, true, 'result matches')    t.ok(queue.paused, 'it should be paused')    process.nextTick(function () {      queue.resume()      queue.pause()      queue.resume()    })  })  queue.push(24, function (err, result) {    t.error(err, 'no error')    t.equal(result, true, 'result matches')    t.notOk(queue.paused, 'it should not be paused')  })  queue.push(12, function (err, result) {    t.error(err, 'no error')    t.equal(result, true, 'result matches')    t.notOk(queue.paused, 'it should not be paused')  })  queue.pause()  function worker (arg, cb) {    t.ok(queue.running() <= queue.concurrency, 'should respect the concurrency')    t.equal(arg, expected.shift())    process.nextTick(function () { cb(null, true) })  }})test('altering concurrency', function (t) {  t.plan(24)  var queue = buildQueue(worker, 1)  queue.push(24, workDone)  queue.push(24, workDone)  queue.push(24, workDone)  queue.pause()  queue.concurrency = 3 // concurrency changes are ignored while paused  queue.concurrency = 2  queue.resume()  t.equal(queue.running(), 2, '2 jobs running')  queue.concurrency = 3  t.equal(queue.running(), 3, '3 jobs running')  queue.concurrency = 1  t.equal(queue.running(), 3, '3 jobs running') // running jobs can't be killed  queue.push(24, workDone)  queue.push(24, workDone)  queue.push(24, workDone)  queue.push(24, workDone)  function workDone (err, result) {    t.error(err, 'no error')    t.equal(result, true, 'result matches')  }  function worker (arg, cb) {    t.ok(queue.running() <= queue.concurrency, 'should respect the concurrency')    setImmediate(function () {      cb(null, true)    })  }})test('idle()', function (t) {  t.plan(12)  var queue = buildQueue(worker, 1)  t.ok(queue.idle(), 'queue is idle')  queue.push(42, function (err, result) {    t.error(err, 'no error')    t.equal(result, true, 'result matches')    t.notOk(queue.idle(), 'queue is not idle')  })  queue.push(42, function (err, result) {    t.error(err, 'no error')    t.equal(result, true, 'result matches')    // it will go idle after executing this function    setImmediate(function () {      t.ok(queue.idle(), 'queue is now idle')    })  })  t.notOk(queue.idle(), 'queue is not idle')  function worker (arg, cb) {    t.notOk(queue.idle(), 'queue is not idle')    t.equal(arg, 42)    setImmediate(cb, null, true)  }})test('saturated', function (t) {  t.plan(9)  var queue = buildQueue(worker, 1)  var preworked = 0  var worked = 0  queue.saturated = function () {    t.pass('saturated')    t.equal(preworked, 1, 'started 1 task')    t.equal(worked, 0, 'worked zero task')  }  queue.push(42, done)  queue.push(42, done)  function done (err, result) {    t.error(err, 'no error')    t.equal(result, true, 'result matches')  }  function worker (arg, cb) {    t.equal(arg, 42)    preworked++    setImmediate(function () {      worked++      cb(null, true)    })  }})test('length', function (t) {  t.plan(7)  var queue = buildQueue(worker, 1)  t.equal(queue.length(), 0, 'nothing waiting')  queue.push(42, done)  t.equal(queue.length(), 0, 'nothing waiting')  queue.push(42, done)  t.equal(queue.length(), 1, 'one task waiting')  queue.push(42, done)  t.equal(queue.length(), 2, 'two tasks waiting')  function done (err, result) {    t.error(err, 'no error')  }  function worker (arg, cb) {    setImmediate(function () {      cb(null, true)    })  }})test('getQueue', function (t) {  t.plan(10)  var queue = buildQueue(worker, 1)  t.equal(queue.getQueue().length, 0, 'nothing waiting')  queue.push(42, done)  t.equal(queue.getQueue().length, 0, 'nothing waiting')  queue.push(42, done)  t.equal(queue.getQueue().length, 1, 'one task waiting')  t.equal(queue.getQueue()[0], 42, 'should be equal')  queue.push(43, done)  t.equal(queue.getQueue().length, 2, 'two tasks waiting')  t.equal(queue.getQueue()[0], 42, 'should be equal')  t.equal(queue.getQueue()[1], 43, 'should be equal')  function done (err, result) {    t.error(err, 'no error')  }  function worker (arg, cb) {    setImmediate(function () {      cb(null, true)    })  }})test('unshift', function (t) {  t.plan(8)  var queue = buildQueue(worker, 1)  var expected = [1, 2, 3, 4]  queue.push(1, done)  queue.push(4, done)  queue.unshift(3, done)  queue.unshift(2, done)  function done (err, result) {    t.error(err, 'no error')  }  function worker (arg, cb) {    t.equal(expected.shift(), arg, 'tasks come in order')    setImmediate(function () {      cb(null, true)    })  }})test('unshift && empty', function (t) {  t.plan(2)  var queue = buildQueue(worker, 1)  var completed = false  queue.pause()  queue.empty = function () {    t.notOk(completed, 'the task has not completed yet')  }  queue.unshift(1, done)  queue.resume()  function done (err, result) {    completed = true    t.error(err, 'no error')  }  function worker (arg, cb) {    setImmediate(function () {      cb(null, true)    })  }})test('push && empty', function (t) {  t.plan(2)  var queue = buildQueue(worker, 1)  var completed = false  queue.pause()  queue.empty = function () {    t.notOk(completed, 'the task has not completed yet')  }  queue.push(1, done)  queue.resume()  function done (err, result) {    completed = true    t.error(err, 'no error')  }  function worker (arg, cb) {    setImmediate(function () {      cb(null, true)    })  }})test('kill', function (t) {  t.plan(5)  var queue = buildQueue(worker, 1)  var expected = [1]  var predrain = queue.drain  queue.drain = function drain () {    t.fail('drain should never be called')  }  queue.push(1, done)  queue.push(4, done)  queue.unshift(3, done)  queue.unshift(2, done)  queue.kill()  function done (err, result) {    t.error(err, 'no error')    setImmediate(function () {      t.equal(queue.length(), 0, 'no queued tasks')      t.equal(queue.running(), 0, 'no running tasks')      t.equal(queue.drain, predrain, 'drain is back to default')    })  }  function worker (arg, cb) {    t.equal(expected.shift(), arg, 'tasks come in order')    setImmediate(function () {      cb(null, true)    })  }})test('killAndDrain', function (t) {  t.plan(6)  var queue = buildQueue(worker, 1)  var expected = [1]  var predrain = queue.drain  queue.drain = function drain () {    t.pass('drain has been called')  }  queue.push(1, done)  queue.push(4, done)  queue.unshift(3, done)  queue.unshift(2, done)  queue.killAndDrain()  function done (err, result) {    t.error(err, 'no error')    setImmediate(function () {      t.equal(queue.length(), 0, 'no queued tasks')      t.equal(queue.running(), 0, 'no running tasks')      t.equal(queue.drain, predrain, 'drain is back to default')    })  }  function worker (arg, cb) {    t.equal(expected.shift(), arg, 'tasks come in order')    setImmediate(function () {      cb(null, true)    })  }})test('pause && idle', function (t) {  t.plan(11)  var queue = buildQueue(worker, 1)  var worked = false  t.notOk(queue.paused, 'it should not be paused')  t.ok(queue.idle(), 'should be idle')  queue.pause()  queue.push(42, function (err, result) {    t.error(err, 'no error')    t.equal(result, true, 'result matches')  })  t.notOk(worked, 'it should be paused')  t.ok(queue.paused, 'it should be paused')  t.notOk(queue.idle(), 'should not be idle')  queue.resume()  t.notOk(queue.paused, 'it should not be paused')  t.notOk(queue.idle(), 'it should not be idle')  function worker (arg, cb) {    t.equal(arg, 42)    worked = true    process.nextTick(cb.bind(null, null, true))    process.nextTick(function () {      t.ok(queue.idle(), 'is should be idle')    })  }})test('push without cb', function (t) {  t.plan(1)  var queue = buildQueue(worker, 1)  queue.push(42)  function worker (arg, cb) {    t.equal(arg, 42)    cb()  }})test('unshift without cb', function (t) {  t.plan(1)  var queue = buildQueue(worker, 1)  queue.unshift(42)  function worker (arg, cb) {    t.equal(arg, 42)    cb()  }})test('push with worker throwing error', function (t) {  t.plan(5)  var q = buildQueue(function (task, cb) {    cb(new Error('test error'), null)  }, 1)  q.error(function (err, task) {    t.ok(err instanceof Error, 'global error handler should catch the error')    t.match(err.message, /test error/, 'error message should be "test error"')    t.equal(task, 42, 'The task executed should be passed')  })  q.push(42, function (err) {    t.ok(err instanceof Error, 'push callback should catch the error')    t.match(err.message, /test error/, 'error message should be "test error"')  })})test('unshift with worker throwing error', function (t) {  t.plan(5)  var q = buildQueue(function (task, cb) {    cb(new Error('test error'), null)  }, 1)  q.error(function (err, task) {    t.ok(err instanceof Error, 'global error handler should catch the error')    t.match(err.message, /test error/, 'error message should be "test error"')    t.equal(task, 42, 'The task executed should be passed')  })  q.unshift(42, function (err) {    t.ok(err instanceof Error, 'unshift callback should catch the error')    t.match(err.message, /test error/, 'error message should be "test error"')  })})test('pause/resume should trigger drain event', function (t) {  t.plan(1)  var queue = buildQueue(worker, 1)  queue.pause()  queue.drain = function () {    t.pass('drain should be called')  }  function worker (arg, cb) {    cb(null, true)  }  queue.resume()})test('paused flag', function (t) {  t.plan(2)  var queue = buildQueue(function (arg, cb) {    cb(null)  }, 1)  t.equal(queue.paused, false)  queue.pause()  t.equal(queue.paused, true)})
 |