// 290 lines · 5.5 KiB · JavaScript
|
'use strict'
|
||
|
|
||
|
/* eslint-disable no-var */
|
||
|
|
||
|
var reusify = require('reusify')
|
||
|
|
||
|
/**
 * Create a callback-style work queue that runs at most `concurrency`
 * workers at a time.
 *
 * May be called as `fastqueue(context, worker, concurrency)` or as
 * `fastqueue(worker, concurrency)`; in the second form `context` is null.
 *
 * @param {*} context - `this` value used when invoking `worker` and task callbacks.
 * @param {Function} worker - Called as `worker.call(context, value, cb)`;
 *   it must invoke `cb(err, result)` when the task is finished.
 * @param {number} concurrency - Maximum number of tasks handed to `worker`
 *   at once. Only values that compare `< 1` are rejected here.
 * @returns {Object} The queue object (`push`, `unshift`, `pause`, `resume`,
 *   `idle`, `length`, `getQueue`, `running`, `kill`, `killAndDrain`, `error`,
 *   plus the overridable hooks `drain`, `saturated`, `empty` and the
 *   `paused` / `concurrency` properties).
 * @throws {Error} When `concurrency` is lower than 1.
 */
function fastqueue (context, worker, concurrency) {
  // Support the (worker, concurrency) call form by shifting the arguments.
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  if (concurrency < 1) {
    // 1 is a valid value, so the message must not claim "greater than 1".
    throw new Error('fastqueue concurrency must be equal to or greater than 1')
  }

  var cache = reusify(Task)
  // Pending tasks form a singly linked list through `task.next`.
  var queueHead = null
  var queueTail = null
  var _running = 0
  var errorHandler = null

  var self = {
    push: push,
    drain: noop,
    saturated: noop,
    pause: pause,
    paused: false,
    concurrency: concurrency,
    running: running,
    resume: resume,
    idle: idle,
    length: length,
    getQueue: getQueue,
    unshift: unshift,
    empty: noop,
    kill: kill,
    killAndDrain: killAndDrain,
    error: error
  }

  return self

  /** @returns {number} Number of tasks currently counted as running. */
  function running () {
    return _running
  }

  /** Stop handing queued tasks to the worker until `resume()` is called. */
  function pause () {
    self.paused = true
  }

  /** @returns {number} Number of tasks waiting in the queue (walks the list). */
  function length () {
    var current = queueHead
    var counter = 0

    while (current) {
      current = current.next
      counter++
    }

    return counter
  }

  /** @returns {Array} The values of the waiting tasks, head first. */
  function getQueue () {
    var current = queueHead
    var tasks = []

    while (current) {
      tasks.push(current.value)
      current = current.next
    }

    return tasks
  }

  /**
   * Leave the paused state and start queued tasks, never exceeding
   * `self.concurrency` running tasks. No-op when the queue is not paused.
   */
  function resume () {
    if (!self.paused) return
    self.paused = false

    if (queueHead === null) {
      // Nothing is waiting: run release() once so `drain` fires a single
      // time when no task is running (release() undoes the increment).
      _running++
      release()
      return
    }

    // Only fill the free slots. Tasks that kept running while the queue was
    // paused still occupy theirs, so a fixed `concurrency` iterations would
    // start too many workers.
    while (queueHead && _running < self.concurrency) {
      // release() hands the head task to the worker without touching
      // `_running`, so the slot is accounted for here.
      _running++
      release()
    }
  }

  /** @returns {boolean} True when nothing is running and nothing is queued. */
  function idle () {
    return _running === 0 && self.length() === 0
  }

  // Take a Task from the reuse pool and fill it in for `value`.
  // The error handler registered at this moment is the one the task will use.
  function createTask (value, done) {
    var task = cache.get()

    task.context = context
    task.release = release
    task.value = value
    task.callback = done || noop
    task.errorHandler = errorHandler

    return task
  }

  /**
   * Add a task at the end of the queue, or run it immediately when a slot
   * is free and the queue is not paused.
   *
   * @param {*} value - Passed to the worker.
   * @param {Function} [done] - Called with `(err, result)` on completion.
   */
  function push (value, done) {
    var current = createTask(value, done)

    if (_running === self.concurrency || self.paused) {
      if (queueTail) {
        queueTail.next = current
        queueTail = current
      } else {
        // First task to wait: notify through the `saturated` hook.
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  /**
   * Add a task at the front of the queue, or run it immediately when a slot
   * is free and the queue is not paused.
   *
   * @param {*} value - Passed to the worker.
   * @param {Function} [done] - Called with `(err, result)` on completion.
   */
  function unshift (value, done) {
    // createTask also attaches the registered error handler, so unshifted
    // tasks report through it exactly like pushed ones.
    var current = createTask(value, done)

    if (_running === self.concurrency || self.paused) {
      if (queueHead) {
        current.next = queueHead
        queueHead = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  /**
   * Called when a task finishes (with its Task as `holder`) and by
   * `resume()` (without arguments). Recycles the holder, then either starts
   * the next queued task in the freed slot or gives the slot back.
   */
  function release (holder) {
    if (holder) {
      cache.release(holder)
    }
    var next = queueHead
    if (next) {
      if (!self.paused) {
        if (queueTail === queueHead) {
          queueTail = null
        }
        queueHead = next.next
        next.next = null
        // The slot passes straight to `next`, so `_running` is unchanged.
        worker.call(context, next.value, next.worked)
        if (queueTail === null) {
          self.empty()
        }
      } else {
        // Paused: leave the task queued and free the slot.
        _running--
      }
    } else if (--_running === 0) {
      self.drain()
    }
  }

  /** Drop every waiting task and reset the `drain` hook to a no-op. */
  function kill () {
    queueHead = null
    queueTail = null
    self.drain = noop
  }

  /** Drop every waiting task, call `drain` once, then reset it to a no-op. */
  function killAndDrain () {
    queueHead = null
    queueTail = null
    self.drain()
    self.drain = noop
  }

  /**
   * Register the error handler attached to tasks added from now on.
   * Tasks already created keep the handler they were created with.
   *
   * @param {Function} handler - Invoked as `handler(err, value)` when a task completes.
   */
  function error (handler) {
    errorHandler = handler
  }
}
|
||
|
|
||
|
/** Do-nothing function: takes no declared parameters and returns undefined. */
function noop () {
  // Intentionally empty.
}
|
||
|
|
||
|
/**
 * Queue entry constructor. An instance carries one job (`value`), its
 * completion `callback`, the `context` the callback is invoked with, an
 * optional `errorHandler`, a `release` function and a `next` link.
 *
 * `worked` is the function passed to the worker as its completion callback.
 */
function Task () {
  var task = this

  task.value = null
  task.callback = noop
  task.next = null
  task.release = noop
  task.context = null
  task.errorHandler = null

  task.worked = function worked (err, result) {
    // Snapshot the fields first: value and callback are cleared before any
    // user code runs.
    var done = task.callback
    var onError = task.errorHandler
    var payload = task.value

    task.value = null
    task.callback = noop

    // When a handler is set it is called for every completion, with
    // whatever `err` the worker reported and the task's value.
    if (onError) {
      onError(err, payload)
    }

    done.call(task.context, err, result)
    task.release(task)
  }
}
|
||
|
|
||
|
/**
 * Promise-based variant of `fastqueue`. `worker` must return a thenable;
 * `push` and `unshift` return a promise for the task's outcome, and
 * `drained()` returns a promise for the next time the queue drains.
 *
 * May be called as `queueAsPromised(context, worker, concurrency)` or as
 * `queueAsPromised(worker, concurrency)`; in the second form `context` is null.
 *
 * @param {*} context - `this` value for `worker`.
 * @param {Function} worker - Called with the task value; returns a thenable.
 * @param {number} concurrency - Forwarded to `fastqueue`.
 * @returns {Object} The `fastqueue` queue object with `push` / `unshift`
 *   replaced by promise-returning versions and `drained` added.
 */
function queueAsPromised (context, worker, concurrency) {
  // Support the (worker, concurrency) call form by shifting the arguments.
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  // Adapts the promise-returning worker to the (value, cb) worker contract:
  // fulfilment becomes cb(null, res), rejection becomes cb(err).
  function asyncWrapper (arg, cb) {
    worker.call(this, arg)
      .then(function (res) {
        cb(null, res)
      }, cb)
  }

  var queue = fastqueue(context, asyncWrapper, concurrency)

  // Keep the callback-style methods before overriding them on the queue.
  var pushCb = queue.push
  var unshiftCb = queue.unshift

  queue.push = push
  queue.unshift = unshift
  queue.drained = drained

  return queue

  // Run `enqueueCb(value, cb)` and expose the callback's outcome as a promise.
  function enqueue (enqueueCb, value) {
    var p = new Promise(function (resolve, reject) {
      enqueueCb(value, function (err, result) {
        if (err) {
          reject(err)
          return
        }
        resolve(result)
      })
    })

    // Let's fork the promise chain to
    // make the error bubble up to the user but
    // not lead to an unhandledRejection
    p.catch(noop)

    return p
  }

  /**
   * Add `value` at the end of the queue.
   * @returns {Promise} Settles with the worker's result or error.
   */
  function push (value) {
    return enqueue(pushCb, value)
  }

  /**
   * Add `value` at the front of the queue.
   * @returns {Promise} Settles with the worker's result or error.
   */
  function unshift (value) {
    return enqueue(unshiftCb, value)
  }

  /**
   * @returns {Promise} Already resolved when the queue is idle; otherwise
   *   resolved the next time the queue's `drain` hook is invoked.
   */
  function drained () {
    if (queue.idle()) {
      return Promise.resolve()
    }

    var previousDrain = queue.drain

    return new Promise(function (resolve) {
      function onDrain () {
        // One-shot: put the previous hook back so repeated drained() calls
        // do not build an ever-growing chain of wrappers. Skip the restore
        // if someone else replaced `queue.drain` in the meantime.
        if (queue.drain === onDrain) {
          queue.drain = previousDrain
        }
        previousDrain()
        resolve()
      }

      queue.drain = onDrain
    })
  }
}
|
||
|
|
||
|
// Public API: the callback-style factory is the module itself and the
// promise-based factory is exposed as its `promise` property.
fastqueue.promise = queueAsPromised
module.exports = fastqueue
|