55 lines
1.6 KiB
JavaScript
55 lines
1.6 KiB
JavaScript
'use strict';
|
|
|
|
var through2 = require('through2');
|
|
var ForkStream = require('fork-stream');
|
|
var mergeStream = require('merge-stream');
|
|
var duplexify = require('duplexify');
|
|
|
|
module.exports = function (condition, trueStream, falseStream) {
|
|
if (!trueStream) {
|
|
throw new Error('fork-stream: child action is required');
|
|
}
|
|
|
|
// output stream
|
|
var outStream = through2.obj();
|
|
|
|
// create fork-stream
|
|
var forkStream = new ForkStream({
|
|
classifier: function (e, cb) {
|
|
var ans = !!condition(e);
|
|
return cb(null, ans);
|
|
}
|
|
});
|
|
|
|
// if condition is true, pipe input to trueStream
|
|
forkStream.a.pipe(trueStream);
|
|
|
|
var mergedStream;
|
|
|
|
if (falseStream) {
|
|
// if there's an 'else' condition
|
|
// if condition is false
|
|
// pipe input to falseStream
|
|
forkStream.b.pipe(falseStream);
|
|
// merge output with trueStream's output
|
|
mergedStream = mergeStream(falseStream, trueStream);
|
|
// redirect falseStream errors to mergedStream
|
|
falseStream.on('error', function(err) { mergedStream.emit('error', err); });
|
|
} else {
|
|
// if there's no 'else' condition
|
|
// if condition is false
|
|
// merge output with trueStream's output
|
|
mergedStream = mergeStream(forkStream.b, trueStream);
|
|
}
|
|
|
|
// redirect trueStream errors to mergedStream
|
|
trueStream.on('error', function(err) { mergedStream.emit('error', err); });
|
|
|
|
// send everything down-stream
|
|
mergedStream.pipe(outStream);
|
|
// redirect mergedStream errors to outStream
|
|
mergedStream.on('error', function(err) { outStream.emit('error', err); });
|
|
|
|
// consumers write in to forkStream, we write out to outStream
|
|
return duplexify.obj(forkStream, outStream);
|
|
};
|