Skip to content
Bryce Baril edited this page Aug 3, 2013 · 1 revision

Here's an advanced example using two through2 Transforms: one to convert Fahrenheit to Celsius, and one to compute a simple moving average over the temperatures.

var through2 = require("./through2.js")
var concat = require("concat-stream")

var spigot = require("stream-spigot")

// Budget of synthetic readings; gen() counts it down on every call
var records = 20000

/**
 * Random temperature source handed to the spigot.
 *
 * Each call consumes one unit of the `records` budget. While budget remains
 * it returns a reading with an integer temperature in [0, 100) and unit "F";
 * once the budget is exhausted it returns null.
 */
function gen() {
  var remaining = records
  records = remaining - 1

  if (remaining <= 0) {
    return null
  }

  return {
    temp: Math.floor(Math.random() * 100),
    unit: "F"
  }
}

// Convert temperatures from F to C.
//
// Object-mode transform: records whose unit is 'F' (and whose temp is set)
// are rewritten in place to Celsius; every record is pushed downstream.
// Emits "jump" with (previous temp, current temp, unit) whenever the current
// temperature is more than 20 above the previous record's temperature.
var ftoc = through2({ objectMode: true}, function (record, enc, callback) {
  if (record.temp != null && record.unit === 'F') {
    record.temp = ( ( record.temp - 32 ) * 5 ) / 9
    record.unit = 'C'
  }

  // Seed the baseline only after conversion, so it is expressed in the same
  // unit as the value it is compared against. Seeding from the raw Fahrenheit
  // value could report a bogus jump on the very first record
  // (e.g. -100F -> -73.3C, and -93.3 > -100).
  if (this.lastTemp == null) this.lastTemp = record.temp

  // Check for large temperature jumps. With the rough data there should be a lot.
  if (record.temp - 20 > this.lastTemp) this.emit("jump", this.lastTemp, record.temp, record.unit)
  this.push(record)

  // Remember this reading as the baseline for the next record
  this.lastTemp = record.temp
  callback()
})

// select a smoothing factor, this will be a Simple Moving Average of the last 20 records
var size = 20

// Object-mode transform that replaces each record's temp with the mean of the
// most recent `size` temperatures (fewer while the window is still filling).
// Emits "jump" with (previous smoothed temp, current smoothed temp, unit)
// whenever the smoothed value is more than 20 above the previous smoothed value.
var sma = through2({objectMode: true}, function (record, enc, callback) {

  // Initial record, initialize sma storage
  if (this.lastn == null) this.lastn = []

  // Add this record to the recent history
  this.lastn.push(record.temp)

  // Keep at most `size` values. Trimming only when the window overflows
  // (rather than after every record) is what makes this an actual moving
  // average instead of a pass-through of the latest value.
  if (this.lastn.length > size) this.lastn.shift()

  // Average the last `size` temperatures
  record.temp = this.lastn.reduce(
    function (sum, temp) {
      return sum + temp
    }, 0) / this.lastn.length

  // Check for large temperature jumps
  if (this.lastTemp == null) this.lastTemp = record.temp
  if (record.temp - 20 > this.lastTemp) this.emit("jump", this.lastTemp, record.temp, record.unit)

  // Track the latest smoothed value so the next comparison is against the
  // previous record, not against the first one ever seen
  this.lastTemp = record.temp

  // Emit the smoothed value
  this.push(record)
  callback()
})

// Running count of "jump" events reported by the unsmoothed conversion stage
var ftoc_jumps = 0
ftoc.on("jump", function onRawJump(prev, temp, unit) {
  // Uncomment to trace each individual jump:
  //console.log("ftoc Big jump from %s\xB0%s to %s\xB0%s", prev, unit, temp, unit)
  ftoc_jumps += 1
})

// Running count of "jump" events reported by the smoothing stage
var sma_jumps = 0
sma.on("jump", function (prev, temp, unit) {
  //console.log("sma Big jump from %s\xB0%s to %s\xB0%s", prev, unit, temp, unit)
  // Count the event; resetting to 0 here would make the final report always read 0
  sma_jumps++
})

/**
 * Final sink callback: prints a summary once the whole stream has been
 * gathered.
 *
 * @param {Array} records - everything that reached the end of the pipeline;
 *   only its length is reported.
 *
 * Also reports the jump counters `ftoc_jumps` and `sma_jumps` kept at file level.
 */
function collect(records) {
  var seen = records.length

  console.log("records seen: %s", seen)
  console.log("raw jumps: %s", ftoc_jumps)
  console.log("smoothed jumps: %s", sma_jumps)
}

// Wire the stages together: generator -> F-to-C conversion -> moving average.
// pipe() hands back the stream it was given, which is what allows chaining.
var smoothed = spigot(gen, {objectMode: true}).pipe(ftoc).pipe(sma)

// Gather every smoothed record and hand the full list to collect() at the end
smoothed.pipe(concat(collect))
Clone this wiki locally