Skip to content

Support query subscriptions on projections with a filter not in the projection fields #316

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 25, 2019
140 changes: 100 additions & 40 deletions lib/agent.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
var hat = require('hat');
var util = require('./util');
var types = require('./types');
var util = require('./util');
var logger = require('./logger');

/**
Expand Down Expand Up @@ -105,8 +105,7 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
logger.error('Doc subscription stream error', collection, id, data.error);
return;
}
if (agent._isOwnOp(collection, data)) return;
agent._sendOp(collection, id, data);
agent._onOp(collection, id, data);
});
stream.on('end', function() {
// The op stream is done sending, so release its reference
Expand Down Expand Up @@ -149,13 +148,45 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query

emitter.onOp = function(op) {
var id = op.d;
if (agent._isOwnOp(collection, op)) return;
agent._sendOp(collection, id, op);
agent._onOp(collection, id, op);
};

emitter._open();
};

Agent.prototype._onOp = function(collection, id, op) {
if (this._isOwnOp(collection, op)) return;

// Ops emitted here are coming directly from pubsub, which emits the same op
// object to listeners without making a copy. The pattern in middleware is to
// manipulate the passed in object, and projections are implemented the same
// way currently.
//
// Deep copying the op would be safest, but deep copies are very expensive,
// especially over arbitrary objects. This function makes a shallow copy of an
// op, and it requires that projections and any user middleware copy deep
// properties as needed when they modify the op.
//
// Polling of query subscriptions is determined by the same op objects. As a
// precaution against op middleware breaking query subscriptions, we delay
// before calling into projection and middleware code
var agent = this;
process.nextTick(function() {
var copy = shallowCopyOp(op);
if (!copy) {
logger.error('Op emitted from subscription failed to copy', collection, id, op);
return;
}
agent.backend.sanitizeOp(agent, collection, id, copy, function(err) {
if (err) {
logger.error('Error sanitizing op emitted from subscription', collection, id, copy, err);
return;
}
agent._sendOp(collection, id, copy);
});
});
};

Agent.prototype._isOwnOp = function(collection, op) {
// Detect ops from this client on the same projection. Since the client sent
// these in, the submit reply will be sufficient and we can silently ignore
Expand Down Expand Up @@ -186,12 +217,17 @@ Agent.prototype._sendOp = function(collection, id, op) {

this.send(message);
};

Agent.prototype._sendOps = function(collection, id, ops) {
for (var i = 0; i < ops.length; i++) {
this._sendOp(collection, id, ops[i]);
}
};
Agent.prototype._sendOpsBulk = function(collection, opsMap) {
for (var id in opsMap) {
var ops = opsMap[id];
this._sendOps(collection, id, ops);
}
};

function getReplyErrorObject(err) {
if (typeof err === 'string') {
Expand Down Expand Up @@ -316,7 +352,8 @@ Agent.prototype._handleMessage = function(request, callback) {
case 'u':
return this._unsubscribe(request.c, request.d, callback);
case 'op':
var op = this._createOp(request);
// Normalize the properties submitted
var op = createClientOp(request, this.clientId);
if (!op) return callback({code: 4000, message: 'Invalid op message'});
return this._submit(request.c, request.d, op, callback);
case 'nf':
Expand Down Expand Up @@ -493,10 +530,7 @@ Agent.prototype._fetchBulkOps = function(collection, versions, callback) {
var agent = this;
this.backend.getOpsBulk(this, collection, versions, null, function(err, opsMap) {
if (err) return callback(err);
for (var id in opsMap) {
var ops = opsMap[id];
agent._sendOps(collection, id, ops);
}
agent._sendOpsBulk(collection, opsMap);
callback();
});
};
Expand All @@ -505,8 +539,18 @@ Agent.prototype._subscribe = function(collection, id, version, callback) {
// If the version is specified, catch the client up by sending all ops
// since the specified version
var agent = this;
this.backend.subscribe(this, collection, id, version, function(err, stream, snapshot) {
this.backend.subscribe(this, collection, id, version, function(err, stream, snapshot, ops) {
if (err) return callback(err);
// If we're subscribing from a known version, send any ops committed since
// the requested version to bring the client's doc up to date
if (ops) {
agent._sendOps(collection, id, ops);
}
// In addition, ops may already be queued on the stream by pubsub.
// Subscribe is called before the ops or snapshot are fetched, so it is
// possible that some ops may be duplicates. Clients should ignore any
// duplicate ops they may receive. This will flush ops already queued and
// subscribe to ongoing ops from the stream
agent._subscribeToStream(collection, id, stream);
// Snapshot is returned only when subscribing from a null version.
// Otherwise, ops will have been pushed into the stream
Expand All @@ -519,9 +563,13 @@ Agent.prototype._subscribe = function(collection, id, version, callback) {
};

Agent.prototype._subscribeBulk = function(collection, versions, callback) {
// See _subscribe() above. This function's logic should match but in bulk
var agent = this;
this.backend.subscribeBulk(this, collection, versions, function(err, streams, snapshotMap) {
this.backend.subscribeBulk(this, collection, versions, function(err, streams, snapshotMap, opsMap) {
if (err) return callback(err);
if (opsMap) {
agent._sendOpsBulk(collection, opsMap);
}
for (var id in streams) {
agent._subscribeToStream(collection, id, streams[id]);
}
Expand Down Expand Up @@ -572,45 +620,57 @@ Agent.prototype._submit = function(collection, id, op, callback) {
});
};

function CreateOp(src, seq, v, create) {
Agent.prototype._fetchSnapshot = function(collection, id, version, callback) {
this.backend.fetchSnapshot(this, collection, id, version, callback);
};

Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp, callback) {
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
};


function createClientOp(request, clientId) {
// src can be provided if it is not the same as the current agent,
// such as a resubmission after a reconnect, but it usually isn't needed
var src = request.src || clientId;
// c, d, and m arguments are intentionally undefined. These are set later
return (request.op) ? new EditOp(src, request.seq, request.v, request.op) :
(request.create) ? new CreateOp(src, request.seq, request.v, request.create) :
(request.del) ? new DeleteOp(src, request.seq, request.v, request.del) :
undefined;
}

function shallowCopyOp(op) {
return (op.op) ? new EditOp(op.src, op.seq, op.v, op.op, op.c, op.d, op.m) :
(op.create) ? new CreateOp(op.src, op.seq, op.v, op.create, op.c, op.d, op.m) :
(op.del) ? new DeleteOp(op.src, op.seq, op.v, op.del, op.c, op.d, op.m) :
undefined;
}

function CreateOp(src, seq, v, create, c, d, m) {
this.src = src;
this.seq = seq;
this.v = v;
this.create = create;
this.m = null;
this.c = c;
this.d = d;
this.m = m;
}
function EditOp(src, seq, v, op) {
function EditOp(src, seq, v, op, c, d, m) {
this.src = src;
this.seq = seq;
this.v = v;
this.op = op;
this.m = null;
this.c = c;
this.d = d;
this.m = m;
}
function DeleteOp(src, seq, v, del) {
function DeleteOp(src, seq, v, del, c, d, m) {
this.src = src;
this.seq = seq;
this.v = v;
this.del = del;
this.m = null;
this.c = c;
this.d = d;
this.m = m;
}
// Normalize the properties submitted
Agent.prototype._createOp = function(request) {
// src can be provided if it is not the same as the current agent,
// such as a resubmission after a reconnect, but it usually isn't needed
var src = request.src || this.clientId;
if (request.op) {
return new EditOp(src, request.seq, request.v, request.op);
} else if (request.create) {
return new CreateOp(src, request.seq, request.v, request.create);
} else if (request.del) {
return new DeleteOp(src, request.seq, request.v, request.del);
}
};

Agent.prototype._fetchSnapshot = function(collection, id, version, callback) {
this.backend.fetchSnapshot(this, collection, id, version, callback);
};

Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp, callback) {
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
};
74 changes: 47 additions & 27 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ Backend.prototype.use = function(action, fn) {
for (var i = 0; i < action.length; i++) {
this.use(action[i], fn);
}
return;
return this;
}
var fns = this.middleware[action] || (this.middleware[action] = []);
fns.push(fn);
Expand Down Expand Up @@ -263,6 +263,12 @@ Backend.prototype.submit = function(agent, index, id, op, options, callback) {
});
};

Backend.prototype.sanitizeOp = function(agent, index, id, op, callback) {
var projection = this.projections[index];
var collection = (projection) ? projection.target : index;
this._sanitizeOp(agent, projection, collection, id, op, callback);
};

Backend.prototype._sanitizeOp = function(agent, projection, collection, id, op, callback) {
if (projection) {
try {
Expand Down Expand Up @@ -317,6 +323,28 @@ Backend.prototype._getSnapshotsFromMap = function(ids, snapshotMap) {
return snapshots;
};

Backend.prototype._getSanitizedOps = function(agent, projection, collection, id, from, to, opsOptions, callback) {
var backend = this;
backend.db.getOps(collection, id, from, to, opsOptions, function(err, ops) {
if (err) return callback(err);
backend._sanitizeOps(agent, projection, collection, id, ops, function(err) {
if (err) return callback(err);
callback(null, ops);
});
});
};

Backend.prototype._getSanitizedOpsBulk = function(agent, projection, collection, fromMap, toMap, opsOptions, callback) {
var backend = this;
backend.db.getOpsBulk(collection, fromMap, toMap, opsOptions, function(err, opsMap) {
if (err) return callback(err);
backend._sanitizeOpsBulk(agent, projection, collection, opsMap, function(err) {
if (err) return callback(err);
callback(null, opsMap);
});
});
};

// Non inclusive - gets ops from [from, to). Ie, all relevant ops. If to is
// not defined (null or undefined) then it returns all ops.
Backend.prototype.getOps = function(agent, index, id, from, to, options, callback) {
Expand All @@ -337,13 +365,10 @@ Backend.prototype.getOps = function(agent, index, id, from, to, options, callbac
to: to
};
var opsOptions = options && options.opsOptions;
backend.db.getOps(collection, id, from, to, opsOptions, function(err, ops) {
backend._getSanitizedOps(agent, projection, collection, id, from, to, opsOptions, function(err, ops) {
if (err) return callback(err);
backend._sanitizeOps(agent, projection, collection, id, ops, function(err) {
if (err) return callback(err);
backend.emit('timing', 'getOps', Date.now() - start, request);
callback(err, ops);
});
backend.emit('timing', 'getOps', Date.now() - start, request);
callback(null, ops);
});
};

Expand All @@ -364,13 +389,10 @@ Backend.prototype.getOpsBulk = function(agent, index, fromMap, toMap, options, c
toMap: toMap
};
var opsOptions = options && options.opsOptions;
backend.db.getOpsBulk(collection, fromMap, toMap, opsOptions, function(err, opsMap) {
backend._getSanitizedOpsBulk(agent, projection, collection, fromMap, toMap, opsOptions, function(err, opsMap) {
if (err) return callback(err);
backend._sanitizeOpsBulk(agent, projection, collection, opsMap, function(err) {
if (err) return callback(err);
backend.emit('timing', 'getOpsBulk', Date.now() - start, request);
callback(err, opsMap);
});
backend.emit('timing', 'getOpsBulk', Date.now() - start, request);
callback(null, opsMap);
});
};

Expand Down Expand Up @@ -471,21 +493,25 @@ Backend.prototype.subscribe = function(agent, index, id, version, options, callb
};
backend.pubsub.subscribe(channel, function(err, stream) {
if (err) return callback(err);
stream.initProjection(backend, agent, projection);
if (version == null) {
// Subscribing from null means that the agent doesn't have a document
// and needs to fetch it as well as subscribing
backend.fetch(agent, index, id, function(err, snapshot) {
if (err) return callback(err);
if (err) {
stream.destroy();
return callback(err);
}
backend.emit('timing', 'subscribe.snapshot', Date.now() - start, request);
callback(null, stream, snapshot);
});
} else {
backend.db.getOps(collection, id, version, null, null, function(err, ops) {
if (err) return callback(err);
stream.pushOps(collection, id, ops);
backend._getSanitizedOps(agent, projection, collection, id, version, null, null, function(err, ops) {
if (err) {
stream.destroy();
return callback(err);
}
backend.emit('timing', 'subscribe.ops', Date.now() - start, request);
callback(null, stream);
callback(null, stream, null, ops);
});
}
});
Expand All @@ -509,7 +535,6 @@ Backend.prototype.subscribeBulk = function(agent, index, versions, callback) {
var channel = backend.getDocChannel(collection, id);
backend.pubsub.subscribe(channel, function(err, stream) {
if (err) return eachCb(err);
stream.initProjection(backend, agent, projection);
streams[id] = stream;
eachCb();
});
Expand All @@ -530,17 +555,13 @@ Backend.prototype.subscribeBulk = function(agent, index, versions, callback) {
});
} else {
// If a versions map, get ops since requested versions
backend.db.getOpsBulk(collection, versions, null, null, function(err, opsMap) {
backend._getSanitizedOpsBulk(agent, projection, collection, versions, null, null, function(err, opsMap) {
if (err) {
destroyStreams(streams);
return callback(err);
}
for (var id in opsMap) {
var ops = opsMap[id];
streams[id].pushOps(collection, id, ops);
}
backend.emit('timing', 'subscribeBulk.ops', Date.now() - start, request);
callback(null, streams);
callback(null, streams, null, opsMap);
});
}
});
Expand Down Expand Up @@ -582,7 +603,6 @@ Backend.prototype.querySubscribe = function(agent, index, query, options, callba
}
backend.pubsub.subscribe(request.channel, function(err, stream) {
if (err) return callback(err);
stream.initProjection(backend, agent, request.projection);
if (options.ids) {
var queryEmitter = new QueryEmitter(request, stream, options.ids);
backend.emit('timing', 'querySubscribe.reconnect', Date.now() - start, request);
Expand Down
Loading