wishthis/node_modules/fork-stream/test/tests.js

95 lines
1.8 KiB
JavaScript
Raw Normal View History

2022-01-21 08:28:41 +00:00
var assert = require("chai").assert;
var ForkStream = require("../");
describe("fork-stream", function() {
it("should split objects into their correct streams", function(done) {
var fork = new ForkStream({
classifier: function classify(e, done) {
return done(null, e >= 5);
},
});
var expectedA = [5, 7, 9],
expectedB = [1, 4, 3, 1];
var actualA = [],
actualB = [];
fork.a.on("data", function(e) {
actualA.push(e);
});
fork.b.on("data", function(e) {
actualB.push(e);
});
fork.on("finish", function() {
assert.deepEqual(expectedA, actualA);
assert.deepEqual(expectedB, actualB);
return done();
});
[1, 5, 7, 4, 9, 3, 1].forEach(function(n) {
fork.write(n);
});
fork.end();
});
it("should respect backpressure", function(done) {
var fork = new ForkStream({
highWaterMark: 2,
classifier: function classify(e, done) {
return done(null, e >= 5);
},
});
var expected = [5, 7],
actual = [];
fork.a.on("data", function(e) {
actual.push(e);
});
var timeout = setTimeout(function() {
assert.deepEqual(expected, actual);
return done();
}, 10);
fork.on("finish", function() {
clearTimeout(timeout);
return done(Error("should not finish"));
});
[1, 5, 7, 4, 9, 3, 1].forEach(function(n) {
fork.write(n);
});
fork.end();
});
it("should end the outputs when the input finishes", function(done) {
var fork = new ForkStream();
var count = 0;
var onEnd = function onEnd() {
if (++count === 2) {
return done();
}
};
fork.a.on("end", onEnd)
fork.b.on("end", onEnd);
// start "flowing" mode
fork.a.resume();
fork.b.resume();
fork.end();
});
});