You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

283 lines
5.4 KiB

2 years ago
  1. 'use strict'
  2. /* eslint-disable no-var */
  3. var reusify = require('reusify')
  /**
   * Creates a callback-based work queue that hands pushed values to `worker`,
   * keeping at most `concurrency` workers in flight at once.
   *
   * May be called as `fastqueue(context, worker, concurrency)` or as
   * `fastqueue(worker, concurrency)`; in the latter form `context` is null.
   *
   * @param {*} context - `this` value used when invoking `worker` and the
   *   per-task `done` callbacks.
   * @param {function(*, function(*, *))} worker - Called as
   *   `worker.call(context, value, callback)`; it must invoke `callback(err, result)`
   *   when the task has finished.
   * @param {number} concurrency - Maximum number of tasks running at once.
   * @returns {object} The queue object (`push`, `unshift`, `pause`, `resume`,
   *   `idle`, `length`, `getQueue`, `running`, `kill`, `killAndDrain`, `error`,
   *   plus the assignable hooks `drain`, `saturated` and `empty`).
   * @throws {Error} When `concurrency` is lower than 1.
   */
  function fastqueue (context, worker, concurrency) {
    // Support the two-argument form by shifting the arguments one slot.
    if (typeof context === 'function') {
      concurrency = worker
      worker = context
      context = null
    }

    if (concurrency < 1) {
      throw new Error('fastqueue concurrency must be equal to or greater than 1')
    }

    // Task objects are pooled through reusify so they can be recycled.
    var cache = reusify(Task)
    // Waiting tasks form a singly linked list from queueHead to queueTail.
    var queueHead = null
    var queueTail = null
    var _running = 0
    var errorHandler = null

    var self = {
      push: push,
      drain: noop,
      saturated: noop,
      pause: pause,
      paused: false,
      concurrency: concurrency,
      running: running,
      resume: resume,
      idle: idle,
      length: length,
      getQueue: getQueue,
      unshift: unshift,
      empty: noop,
      kill: kill,
      killAndDrain: killAndDrain,
      error: error
    }

    return self

    /** Returns the number of tasks currently handed to the worker. */
    function running () {
      return _running
    }

    /** Stops new tasks from being started; already running tasks are not touched. */
    function pause () {
      self.paused = true
    }

    /** Returns the number of tasks waiting in the queue (walks the list). */
    function length () {
      var current = queueHead
      var counter = 0

      while (current) {
        current = current.next
        counter++
      }

      return counter
    }

    /** Returns the values of the waiting tasks, head first, as a new array. */
    function getQueue () {
      var current = queueHead
      var tasks = []

      while (current) {
        tasks.push(current.value)
        current = current.next
      }

      return tasks
    }

    /**
     * Leaves the paused state and starts waiting tasks until the concurrency
     * limit is reached. Tasks that were already running when the queue was
     * paused count towards the limit.
     */
    function resume () {
      if (!self.paused) return
      self.paused = false

      if (queueHead === null) {
        // Nothing is waiting: run one balanced release so that `drain` fires
        // exactly once when no task is running either.
        _running++
        release()
        return
      }

      // The paused check guards against a worker that pauses the queue again
      // synchronously, which would otherwise keep this loop spinning.
      while (!self.paused && queueHead && _running < self.concurrency) {
        _running++
        release()
      }
    }

    /** Returns true when nothing is running and nothing is waiting. */
    function idle () {
      return _running === 0 && self.length() === 0
    }

    /**
     * Takes a task object from the pool and fills in everything the worker
     * and the completion callback need. Shared by push and unshift so both
     * entry points configure tasks identically (including the error handler).
     */
    function acquire (value, done) {
      var task = cache.get()

      task.context = context
      task.release = release
      task.value = value
      task.callback = done || noop
      task.errorHandler = errorHandler

      return task
    }

    /**
     * Adds `value` at the end of the queue, or runs it right away when the
     * queue is neither paused nor at its concurrency limit.
     *
     * @param {*} value - Passed to the worker.
     * @param {function(*, *)} [done] - Called with the worker's (err, result).
     */
    function push (value, done) {
      var current = acquire(value, done)

      if (_running === self.concurrency || self.paused) {
        if (queueTail) {
          queueTail.next = current
          queueTail = current
        } else {
          // First waiting task: the queue just became saturated.
          queueHead = current
          queueTail = current
          self.saturated()
        }
      } else {
        _running++
        worker.call(context, current.value, current.worked)
      }
    }

    /**
     * Adds `value` at the front of the queue, or runs it right away when the
     * queue is neither paused nor at its concurrency limit.
     *
     * @param {*} value - Passed to the worker.
     * @param {function(*, *)} [done] - Called with the worker's (err, result).
     */
    function unshift (value, done) {
      var current = acquire(value, done)

      if (_running === self.concurrency || self.paused) {
        if (queueHead) {
          current.next = queueHead
          queueHead = current
        } else {
          // First waiting task: the queue just became saturated.
          queueHead = current
          queueTail = current
          self.saturated()
        }
      } else {
        _running++
        worker.call(context, current.value, current.worked)
      }
    }

    /**
     * Called when a running slot frees up (and by resume with a pre-incremented
     * counter). Recycles `holder` if given, then either starts the next waiting
     * task in the freed slot or decrements the running counter.
     */
    function release (holder) {
      if (holder) {
        cache.release(holder)
      }

      var next = queueHead

      if (next) {
        if (!self.paused) {
          if (queueTail === queueHead) {
            // Taking the last waiting task empties the list.
            queueTail = null
          }
          queueHead = next.next
          next.next = null
          worker.call(context, next.value, next.worked)
          if (queueTail === null) {
            self.empty()
          }
        } else {
          // Paused: the slot is simply given back, the task keeps waiting.
          _running--
        }
      } else if (--_running === 0) {
        self.drain()
      }
    }

    /** Drops all waiting tasks and resets the `drain` hook to a no-op. */
    function kill () {
      queueHead = null
      queueTail = null
      self.drain = noop
    }

    /** Drops all waiting tasks, calls `drain` once, then resets it to a no-op. */
    function killAndDrain () {
      queueHead = null
      queueTail = null
      self.drain()
      self.drain = noop
    }

    /**
     * Sets the handler attached to every task created from now on; it is
     * called as `handler(err, value)` when such a task completes.
     */
    function error (handler) {
      errorHandler = handler
    }
  }
  /**
   * Does nothing and returns undefined; used as the default for optional
   * callbacks and hooks.
   */
  function noop () {
  }
  /**
   * Poolable record describing one unit of work.
   *
   * Each instance owns a `worked(err, result)` function bound to it through a
   * closure; it is the completion callback handed to the worker. The remaining
   * fields are filled in by whoever schedules the task.
   */
  function Task () {
    var task = this

    task.value = null
    task.callback = noop
    task.next = null
    task.release = noop
    task.context = null
    task.errorHandler = null

    task.worked = function worked (err, result) {
      // Snapshot everything first: the fields are cleared before the user
      // callbacks run.
      var done = task.callback
      var onError = task.errorHandler
      var input = task.value

      task.value = null
      task.callback = noop

      // The handler, when present, is invoked for every completion and
      // receives the task's original value.
      if (onError) {
        onError(err, input)
      }

      done.call(task.context, err, result)
      // Finally hand this object back to its owner.
      task.release(task)
    }
  }
  /**
   * Promise flavour of the queue: `worker` returns a promise, and `push` /
   * `unshift` return a promise for the task's outcome instead of taking a
   * callback.
   *
   * May be called as `queueAsPromised(context, worker, concurrency)` or as
   * `queueAsPromised(worker, concurrency)`; in the latter form `context` is null.
   *
   * @param {*} context - `this` value handed to the underlying queue.
   * @param {function(*): Promise} worker - Called with each value; must return
   *   a thenable.
   * @param {number} concurrency - Passed through to the underlying queue.
   * @returns {object} The queue returned by `fastqueue`, with `push` and
   *   `unshift` replaced by promise-returning versions and a `drained()` method
   *   added.
   */
  function queueAsPromised (context, worker, concurrency) {
    // Support the two-argument form by shifting the arguments one slot.
    if (typeof context === 'function') {
      concurrency = worker
      worker = context
      context = null
    }

    // Adapts the promise-returning worker to the (value, callback) shape the
    // callback-based queue expects.
    function asyncWrapper (arg, cb) {
      worker.call(this, arg)
        .then(function (res) {
          cb(null, res)
        }, cb)
    }

    var queue = fastqueue(context, asyncWrapper, concurrency)

    // Keep the callback-based entry points before they are replaced below.
    var pushCb = queue.push
    var unshiftCb = queue.unshift

    queue.push = push
    queue.unshift = unshift
    queue.drained = drained

    return queue

    /**
     * Schedules `value` through the callback-based `add` function and returns
     * a promise settled with the task's error or result.
     */
    function schedule (add, value) {
      var p = new Promise(function (resolve, reject) {
        add(value, function (err, result) {
          if (err) {
            reject(err)
            return
          }
          resolve(result)
        })
      })

      // Let's fork the promise chain to
      // make the error bubble up to the user but
      // not lead to a unhandledRejection
      p.catch(noop)

      return p
    }

    /** Adds `value` at the end of the queue; resolves with the worker's result. */
    function push (value) {
      return schedule(pushCb, value)
    }

    /** Adds `value` at the front of the queue; resolves with the worker's result. */
    function unshift (value) {
      return schedule(unshiftCb, value)
    }

    /**
     * Returns a promise that resolves once the queue has drained. When the
     * queue is already idle the promise resolves right away, since no further
     * drain notification would arrive to settle it. Any `drain` hook installed
     * before the call is still invoked.
     */
    function drained () {
      if (queue.idle()) {
        return Promise.resolve()
      }

      var previousDrain = queue.drain

      var p = new Promise(function (resolve) {
        queue.drain = function () {
          previousDrain()
          resolve()
        }
      })

      return p
    }
  }
  // Public API: the callback-based factory is the module itself, and the
  // promise-based factory is exposed as its `promise` property.
  fastqueue.promise = queueAsPromised
  module.exports = fastqueue