703 lines
14 KiB
JavaScript
703 lines
14 KiB
JavaScript
|
'use strict'
|
||
|
|
||
|
var fs = require('fs')
|
||
|
var path = require('path')
|
||
|
var test = require('tape').test
|
||
|
var from = require('from2')
|
||
|
var crypto = require('crypto')
|
||
|
var sink = require('flush-write-stream')
|
||
|
var pump = require('pump')
|
||
|
var cloneable = require('./')
|
||
|
|
||
|
test('basic passthrough', function (t) {
|
||
|
t.plan(2)
|
||
|
|
||
|
var read = false
|
||
|
var source = from(function (size, next) {
|
||
|
if (read) {
|
||
|
this.push(null)
|
||
|
} else {
|
||
|
read = true
|
||
|
this.push('hello world')
|
||
|
}
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
instance.pipe(sink(function (chunk, enc, cb) {
|
||
|
t.equal(chunk.toString(), 'hello world', 'chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
})
|
||
|
|
||
|
test('clone sync', function (t) {
|
||
|
t.plan(4)
|
||
|
|
||
|
var read = false
|
||
|
var source = from(function (size, next) {
|
||
|
if (read) {
|
||
|
this.push(null)
|
||
|
} else {
|
||
|
read = true
|
||
|
this.push('hello world')
|
||
|
}
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
var cloned = instance.clone()
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
instance.pipe(sink(function (chunk, enc, cb) {
|
||
|
t.equal(chunk.toString(), 'hello world', 'chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
|
||
|
cloned.pipe(sink(function (chunk, enc, cb) {
|
||
|
t.equal(chunk.toString(), 'hello world', 'chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
})
|
||
|
|
||
|
test('clone async', function (t) {
|
||
|
t.plan(4)
|
||
|
|
||
|
var read = false
|
||
|
var source = from(function (size, next) {
|
||
|
if (read) {
|
||
|
this.push(null)
|
||
|
} else {
|
||
|
read = true
|
||
|
this.push('hello world')
|
||
|
}
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
var cloned = instance.clone()
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
instance.pipe(sink(function (chunk, enc, cb) {
|
||
|
t.equal(chunk.toString(), 'hello world', 'chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
|
||
|
setImmediate(function () {
|
||
|
cloned.pipe(sink(function (chunk, enc, cb) {
|
||
|
t.equal(chunk.toString(), 'hello world', 'chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
})
|
||
|
})
|
||
|
|
||
|
test('basic passthrough in obj mode', function (t) {
|
||
|
t.plan(2)
|
||
|
|
||
|
var read = false
|
||
|
var source = from.obj(function (size, next) {
|
||
|
if (read) {
|
||
|
return this.push(null)
|
||
|
} else {
|
||
|
read = true
|
||
|
this.push({ hello: 'world' })
|
||
|
}
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
instance.pipe(sink.obj(function (chunk, enc, cb) {
|
||
|
t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
})
|
||
|
|
||
|
test('multiple clone in object mode', function (t) {
|
||
|
t.plan(4)
|
||
|
|
||
|
var read = false
|
||
|
var source = from.obj(function (size, next) {
|
||
|
if (read) {
|
||
|
return this.push(null)
|
||
|
} else {
|
||
|
read = true
|
||
|
this.push({ hello: 'world' })
|
||
|
}
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
var cloned = instance.clone()
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
instance.pipe(sink.obj(function (chunk, enc, cb) {
|
||
|
t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
|
||
|
setImmediate(function () {
|
||
|
cloned.pipe(sink.obj(function (chunk, enc, cb) {
|
||
|
t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
})
|
||
|
})
|
||
|
|
||
|
test('basic passthrough with data event', function (t) {
|
||
|
t.plan(2)
|
||
|
|
||
|
var read = false
|
||
|
var source = from(function (size, next) {
|
||
|
if (read) {
|
||
|
this.push(null)
|
||
|
} else {
|
||
|
read = true
|
||
|
this.push('hello world')
|
||
|
}
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
var data = ''
|
||
|
instance.on('data', function (chunk) {
|
||
|
data += chunk.toString()
|
||
|
})
|
||
|
|
||
|
instance.on('end', function () {
|
||
|
t.equal(data, 'hello world', 'chunk matches')
|
||
|
})
|
||
|
})
|
||
|
|
||
|
test('basic passthrough with data event on clone', function (t) {
|
||
|
t.plan(3)
|
||
|
|
||
|
var read = false
|
||
|
var source = from(function (size, next) {
|
||
|
if (read) {
|
||
|
this.push(null)
|
||
|
} else {
|
||
|
read = true
|
||
|
this.push('hello world')
|
||
|
}
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
var cloned = instance.clone()
|
||
|
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
var data = ''
|
||
|
cloned.on('data', function (chunk) {
|
||
|
data += chunk.toString()
|
||
|
})
|
||
|
|
||
|
cloned.on('end', function () {
|
||
|
t.equal(data, 'hello world', 'chunk matches in clone')
|
||
|
})
|
||
|
|
||
|
instance.pipe(sink(function (chunk, enc, cb) {
|
||
|
t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')
|
||
|
cb()
|
||
|
}))
|
||
|
})
|
||
|
|
||
|
test('errors if cloned after start', function (t) {
|
||
|
t.plan(2)
|
||
|
|
||
|
var source = from(function (size, next) {
|
||
|
this.push('hello world')
|
||
|
this.push(null)
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
|
||
|
instance.pipe(sink(function (chunk, enc, cb) {
|
||
|
t.equal(chunk.toString(), 'hello world', 'chunk matches')
|
||
|
t.throws(function () {
|
||
|
instance.clone()
|
||
|
}, 'throws if cloned after start')
|
||
|
cb()
|
||
|
}))
|
||
|
})
|
||
|
|
||
|
test('basic passthrough with readable event', function (t) {
|
||
|
t.plan(2)
|
||
|
|
||
|
var read = false
|
||
|
var source = from(function (size, next) {
|
||
|
if (read) {
|
||
|
this.push(null)
|
||
|
} else {
|
||
|
read = true
|
||
|
this.push('hello world')
|
||
|
}
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
var data = ''
|
||
|
instance.on('readable', function () {
|
||
|
var chunk
|
||
|
while ((chunk = this.read()) !== null) {
|
||
|
data += chunk.toString()
|
||
|
}
|
||
|
})
|
||
|
|
||
|
instance.on('end', function () {
|
||
|
t.equal(data, 'hello world', 'chunk matches')
|
||
|
})
|
||
|
})
|
||
|
|
||
|
test('basic passthrough with readable event on clone', function (t) {
|
||
|
t.plan(3)
|
||
|
|
||
|
var read = false
|
||
|
var source = from(function (size, next) {
|
||
|
if (read) {
|
||
|
this.push(null)
|
||
|
} else {
|
||
|
read = true
|
||
|
this.push('hello world')
|
||
|
}
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
var cloned = instance.clone()
|
||
|
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
var data = ''
|
||
|
cloned.on('readable', function () {
|
||
|
var chunk
|
||
|
while ((chunk = this.read()) !== null) {
|
||
|
data += chunk.toString()
|
||
|
}
|
||
|
})
|
||
|
|
||
|
cloned.on('end', function () {
|
||
|
t.equal(data, 'hello world', 'chunk matches in clone')
|
||
|
})
|
||
|
|
||
|
instance.pipe(sink(function (chunk, enc, cb) {
|
||
|
t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')
|
||
|
cb()
|
||
|
}))
|
||
|
})
|
||
|
|
||
|
test('source error destroys all', function (t) {
|
||
|
t.plan(3)
|
||
|
|
||
|
var source = from()
|
||
|
var instance = cloneable(source)
|
||
|
var clone = instance.clone()
|
||
|
|
||
|
source.on('error', function (err) {
|
||
|
t.ok(err, 'source errors')
|
||
|
|
||
|
instance.on('error', function (err2) {
|
||
|
t.ok(err === err2, 'instance receives same error')
|
||
|
})
|
||
|
|
||
|
clone.on('error', function (err3) {
|
||
|
t.ok(err === err3, 'clone receives same error')
|
||
|
})
|
||
|
})
|
||
|
|
||
|
source.emit('error', new Error())
|
||
|
})
|
||
|
|
||
|
test('source destroy destroys all', function (t) {
|
||
|
t.plan(2)
|
||
|
|
||
|
var source = from()
|
||
|
var instance = cloneable(source)
|
||
|
var clone = instance.clone()
|
||
|
|
||
|
instance.on('end', function () {
|
||
|
t.pass('instance has ended')
|
||
|
})
|
||
|
|
||
|
clone.on('end', function () {
|
||
|
t.pass('clone has ended')
|
||
|
})
|
||
|
|
||
|
clone.resume()
|
||
|
instance.resume()
|
||
|
|
||
|
source.destroy()
|
||
|
})
|
||
|
|
||
|
test('instance error destroys all but the source', function (t) {
|
||
|
t.plan(2)
|
||
|
|
||
|
var source = from()
|
||
|
var instance = cloneable(source)
|
||
|
var clone = instance.clone()
|
||
|
|
||
|
source.on('close', function () {
|
||
|
t.fail('source should not be closed')
|
||
|
})
|
||
|
|
||
|
instance.on('error', function (err) {
|
||
|
t.is(err.message, 'beep', 'instance errors')
|
||
|
})
|
||
|
|
||
|
instance.on('close', function () {
|
||
|
t.fail('close should not be emitted')
|
||
|
})
|
||
|
|
||
|
clone.on('error', function (err) {
|
||
|
t.is(err.message, 'beep', 'instance errors')
|
||
|
})
|
||
|
|
||
|
clone.on('close', function () {
|
||
|
t.fail('close should not be emitted')
|
||
|
})
|
||
|
|
||
|
instance.destroy(new Error('beep'))
|
||
|
})
|
||
|
|
||
|
test('instance destroy destroys all but the source', function (t) {
|
||
|
t.plan(2)
|
||
|
|
||
|
var source = from()
|
||
|
var instance = cloneable(source)
|
||
|
var clone = instance.clone()
|
||
|
|
||
|
source.on('close', function () {
|
||
|
t.fail('source should not be closed')
|
||
|
})
|
||
|
|
||
|
instance.on('end', function () {
|
||
|
t.pass('instance has ended')
|
||
|
})
|
||
|
|
||
|
clone.on('end', function () {
|
||
|
t.pass('clone has ended')
|
||
|
})
|
||
|
|
||
|
instance.resume()
|
||
|
clone.resume()
|
||
|
|
||
|
instance.destroy()
|
||
|
})
|
||
|
|
||
|
test('clone destroy does not affect other clones, cloneable or source', function (t) {
|
||
|
t.plan(1)
|
||
|
|
||
|
var source = from()
|
||
|
var instance = cloneable(source)
|
||
|
var clone = instance.clone()
|
||
|
var other = instance.clone()
|
||
|
|
||
|
source.on('close', function () {
|
||
|
t.fail('source should not be closed')
|
||
|
})
|
||
|
|
||
|
instance.on('close', function () {
|
||
|
t.fail('instance should not be closed')
|
||
|
})
|
||
|
|
||
|
other.on('close', function () {
|
||
|
t.fail('other clone should not be closed')
|
||
|
})
|
||
|
|
||
|
clone.on('close', function () {
|
||
|
t.pass('clone is closed')
|
||
|
})
|
||
|
|
||
|
clone.destroy()
|
||
|
})
|
||
|
|
||
|
test('clone remains readable if other is destroyed', function (t) {
|
||
|
t.plan(3)
|
||
|
|
||
|
var read = false
|
||
|
var source = from(function (size, next) {
|
||
|
if (read) {
|
||
|
this.push(null)
|
||
|
} else {
|
||
|
read = true
|
||
|
this.push('hello')
|
||
|
}
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
var clone = instance.clone()
|
||
|
var other = instance.clone()
|
||
|
|
||
|
instance.pipe(sink.obj(function (chunk, enc, cb) {
|
||
|
t.deepEqual(chunk.toString(), 'hello', 'instance chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
|
||
|
clone.pipe(sink.obj(function (chunk, enc, cb) {
|
||
|
t.deepEqual(chunk.toString(), 'hello', 'clone chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
|
||
|
clone.on('close', function () {
|
||
|
t.fail('clone should not be closed')
|
||
|
})
|
||
|
|
||
|
instance.on('close', function () {
|
||
|
t.fail('instance should not be closed')
|
||
|
})
|
||
|
|
||
|
other.on('close', function () {
|
||
|
t.pass('other is closed')
|
||
|
})
|
||
|
|
||
|
other.destroy()
|
||
|
})
|
||
|
|
||
|
test('clone of clone', function (t) {
|
||
|
t.plan(6)
|
||
|
|
||
|
var read = false
|
||
|
var source = from(function (size, next) {
|
||
|
if (read) {
|
||
|
this.push(null)
|
||
|
} else {
|
||
|
read = true
|
||
|
this.push('hello world')
|
||
|
}
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
var cloned = instance.clone()
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
var replica = cloned.clone()
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
instance.pipe(sink(function (chunk, enc, cb) {
|
||
|
t.equal(chunk.toString(), 'hello world', 'chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
|
||
|
cloned.pipe(sink(function (chunk, enc, cb) {
|
||
|
t.equal(chunk.toString(), 'hello world', 'chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
|
||
|
replica.pipe(sink(function (chunk, enc, cb) {
|
||
|
t.equal(chunk.toString(), 'hello world', 'chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
})
|
||
|
|
||
|
test('from vinyl', function (t) {
|
||
|
t.plan(3)
|
||
|
|
||
|
var source = from(['wa', 'dup'])
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
var clone = instance.clone()
|
||
|
|
||
|
var data = ''
|
||
|
var data2 = ''
|
||
|
var ends = 2
|
||
|
|
||
|
function latch () {
|
||
|
if (--ends === 0) {
|
||
|
t.equal(data, data2)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
instance.on('data', function (chunk) {
|
||
|
data += chunk.toString()
|
||
|
})
|
||
|
|
||
|
process.nextTick(function () {
|
||
|
t.equal('', data, 'nothing was written yet')
|
||
|
t.equal('', data2, 'nothing was written yet')
|
||
|
|
||
|
clone.on('data', function (chunk) {
|
||
|
data2 += chunk.toString()
|
||
|
})
|
||
|
})
|
||
|
|
||
|
instance.on('end', latch)
|
||
|
clone.on('end', latch)
|
||
|
})
|
||
|
|
||
|
test('waits till all are flowing', function (t) {
|
||
|
t.plan(1)
|
||
|
|
||
|
var source = from(['wa', 'dup'])
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
|
||
|
// we create a clone
|
||
|
instance.clone()
|
||
|
|
||
|
instance.on('data', function (chunk) {
|
||
|
t.fail('this should never happen')
|
||
|
})
|
||
|
|
||
|
process.nextTick(function () {
|
||
|
t.pass('wait till nextTick')
|
||
|
})
|
||
|
})
|
||
|
|
||
|
test('isCloneable', function (t) {
|
||
|
t.plan(4)
|
||
|
|
||
|
var source = from(['hello', ' ', 'world'])
|
||
|
t.notOk(cloneable.isCloneable(source), 'a generic readable is not cloneable')
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
t.ok(cloneable.isCloneable(instance), 'a cloneable is cloneable')
|
||
|
|
||
|
var clone = instance.clone()
|
||
|
t.ok(cloneable.isCloneable(clone), 'a clone is cloneable')
|
||
|
|
||
|
var cloneClone = clone.clone()
|
||
|
t.ok(cloneable.isCloneable(cloneClone), 'a clone of a clone is cloneable')
|
||
|
})
|
||
|
|
||
|
test('emits finish', function (t) {
|
||
|
var chunks = ['a', 'b', 'c', 'd', null]
|
||
|
var e1 = ['a', 'b', 'c', 'd']
|
||
|
var e2 = ['a', 'b', 'c', 'd']
|
||
|
|
||
|
t.plan(2 + e1.length + e2.length)
|
||
|
|
||
|
var source = from(function (size, next) {
|
||
|
setImmediate(next, null, chunks.shift())
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
|
||
|
var clone = instance.clone()
|
||
|
|
||
|
clone.on('finish', t.pass.bind(null, 'clone emits finish'))
|
||
|
instance.on('finish', t.pass.bind(null, 'main emits finish'))
|
||
|
|
||
|
instance.pipe(sink(function (chunk, enc, cb) {
|
||
|
t.equal(chunk.toString(), e1.shift(), 'chunk matches')
|
||
|
cb()
|
||
|
}))
|
||
|
|
||
|
clone.on('data', function (chunk) {
|
||
|
t.equal(chunk.toString(), e2.shift(), 'chunk matches')
|
||
|
})
|
||
|
})
|
||
|
|
||
|
test('clone async w resume', function (t) {
|
||
|
t.plan(4)
|
||
|
|
||
|
var read = false
|
||
|
var source = from(function (size, next) {
|
||
|
if (read) {
|
||
|
this.push(null)
|
||
|
} else {
|
||
|
read = true
|
||
|
this.push('hello world')
|
||
|
}
|
||
|
next()
|
||
|
})
|
||
|
|
||
|
var instance = cloneable(source)
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
var cloned = instance.clone()
|
||
|
t.notOk(read, 'stream not started')
|
||
|
|
||
|
instance.on('end', t.pass.bind(null, 'end emitted'))
|
||
|
instance.resume()
|
||
|
|
||
|
setImmediate(function () {
|
||
|
cloned.on('end', t.pass.bind(null, 'end emitted'))
|
||
|
cloned.resume()
|
||
|
})
|
||
|
})
|
||
|
|
||
|
test('big file', function (t) {
|
||
|
t.plan(13)
|
||
|
|
||
|
var stream = cloneable(fs.createReadStream(path.join(__dirname, 'big')))
|
||
|
var hash = crypto.createHash('sha1')
|
||
|
hash.setEncoding('hex')
|
||
|
|
||
|
var toCheck
|
||
|
|
||
|
fs.createReadStream(path.join(__dirname, 'big'))
|
||
|
.pipe(hash)
|
||
|
.once('readable', function () {
|
||
|
toCheck = hash.read()
|
||
|
t.ok(toCheck)
|
||
|
})
|
||
|
|
||
|
function pipe (s, num) {
|
||
|
s.on('end', function () {
|
||
|
t.pass('end for ' + num)
|
||
|
})
|
||
|
|
||
|
var dest = path.join(__dirname, 'out')
|
||
|
|
||
|
s.pipe(fs.createWriteStream(dest))
|
||
|
.on('finish', function () {
|
||
|
t.pass('finish for ' + num)
|
||
|
|
||
|
var destHash = crypto.createHash('sha1')
|
||
|
destHash.setEncoding('hex')
|
||
|
|
||
|
fs.createReadStream(dest)
|
||
|
.pipe(destHash)
|
||
|
.once('readable', function () {
|
||
|
var hash = destHash.read()
|
||
|
t.ok(hash)
|
||
|
t.equal(hash, toCheck)
|
||
|
})
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// Pipe in another event loop tick <-- this one finished only, it's the original cloneable.
|
||
|
setImmediate(pipe.bind(null, stream, 1))
|
||
|
|
||
|
// Pipe in the same event loop tick
|
||
|
pipe(stream.clone(), 0)
|
||
|
|
||
|
// Pipe a long time after
|
||
|
setTimeout(pipe.bind(null, stream.clone(), 2), 1000)
|
||
|
})
|
||
|
|
||
|
test('pump error', function (t) {
|
||
|
t.plan(1)
|
||
|
|
||
|
var err = new Error('kaboom')
|
||
|
|
||
|
pump([
|
||
|
cloneable(from(function () {
|
||
|
this.destroy(err)
|
||
|
})),
|
||
|
sink(function (chunk, enc, cb) {
|
||
|
t.fail('this should not be called')
|
||
|
})
|
||
|
], function (_err) {
|
||
|
t.equal(_err, err)
|
||
|
})
|
||
|
})
|