-
Notifications
You must be signed in to change notification settings - Fork 105
Advanced Example
Bryce Baril edited this page Aug 3, 2013
·
1 revision
Here's an advanced example using two through2 Transforms: one to convert Fahrenheit to Celsius, and one to compute a simple moving average over the temperatures.
var through2 = require("./through2.js")
var concat = require("concat-stream")
var spigot = require("stream-spigot")
// Number of readings the generator still has to hand out
var records = 20000
/**
 * Random temperature generator passed to the spigot below.
 *
 * Each call consumes one unit of the `records` budget.
 *
 * @returns {{temp: number, unit: string}|null} a reading whose temp is an
 *   integer in [0, 100) with unit "F", or null once the budget is exhausted
 */
function gen() {
  var remaining = records
  records = remaining - 1
  if (remaining <= 0) return null
  return {temp: Math.floor(Math.random() * 100), unit: "F"}
}
/**
 * Transform that converts temperatures from F to C, mutating each record in
 * place, and emits a "jump" event when a reading is more than 20 degrees
 * above the previous one.
 *
 * Records are read as {temp, unit}. A record whose unit is not 'F', or whose
 * temp is null/undefined, is passed through unconverted.
 *
 * Emits: "jump" (previousTemp, currentTemp, unit)
 */
var ftoc = through2({ objectMode: true}, function (record, enc, callback) {
  if (record.temp != null && record.unit === 'F') {
    record.temp = ( ( record.temp - 32 ) * 5 ) / 9
    record.unit = 'C'
  }
  // Seed the baseline only after conversion, so the very first comparison is
  // not made between a Fahrenheit baseline and a Celsius reading.
  if (this.lastTemp == null) this.lastTemp = record.temp
  // Check for large temperature jumps. With the rough data there should be a lot.
  if (record.temp - 20 > this.lastTemp) this.emit("jump", this.lastTemp, record.temp, record.unit)
  this.push(record)
  // Remember this reading as the baseline for the next record
  this.lastTemp = record.temp
  callback()
})
// Smoothing window: the Simple Moving Average covers the last `size` records
var size = 20
/**
 * Transform that replaces each record's temp with the simple moving average
 * of the most recent `size` temps (fewer while the window is still filling),
 * and emits a "jump" event when the smoothed value is more than 20 degrees
 * above the previous smoothed value.
 *
 * Emits: "jump" (previousTemp, currentTemp, unit)
 */
var sma = through2({objectMode: true}, function (record, enc, callback) {
  // Initial record, initialize sma storage
  if (this.lastn == null) this.lastn = []
  // Add this record to the recent history
  this.lastn.push(record.temp)
  // Remove the oldest value only once the window holds more than `size`
  // entries; shifting unconditionally would leave a window of one value
  if (this.lastn.length > size) this.lastn.shift()
  // Average the temperatures currently in the window
  var total = this.lastn.reduce(function (prev, curr) {
    return prev + curr
  }, 0)
  record.temp = total / this.lastn.length
  // Check for large temperature jumps
  if (this.lastTemp == null) this.lastTemp = record.temp
  if (record.temp - 20 > this.lastTemp) this.emit("jump", this.lastTemp, record.temp, record.unit)
  // Remember the smoothed value so the next check compares against the
  // immediately preceding record rather than the first one ever seen
  this.lastTemp = record.temp
  // Emit the smoothed value
  this.push(record)
  callback()
})
// Running count of "jump" events emitted by the F-to-C stage
var ftoc_jumps = 0
ftoc.on("jump", function (prev, temp, unit) {
  // Uncomment to trace every jump as it happens:
  //console.log("ftoc Big jump from %s\xB0%s to %s\xB0%s", prev, unit, temp, unit)
  ftoc_jumps += 1
})
// Running count of "jump" events emitted by the moving-average stage
var sma_jumps = 0
sma.on("jump", function (prev, temp, unit) {
  //console.log("sma Big jump from %s\xB0%s to %s\xB0%s", prev, unit, temp, unit)
  // Count the event; assigning 0 here would pin the reported total at zero
  sma_jumps++
})
/**
 * Prints the run summary: how many records arrived and how many "jump"
 * events the raw and smoothed stages counted.
 *
 * @param {Array} records - the collected records; only its length is read
 */
function collect(records) {
  var report = console.log.bind(console)
  report("records seen: %s", records.length)
  report("raw jumps: %s", ftoc_jumps)
  report("smoothed jumps: %s", sma_jumps)
}
// Wire the pipeline: generated readings -> F-to-C -> moving average -> summary
var readings = spigot(gen, {objectMode: true})
var smoothed = readings.pipe(ftoc).pipe(sma)
smoothed.pipe(concat(collect))