Skip to content

Commit c5d7ca2

Browse files
authored
Merge pull request #316 from share/fix-projection-queries
Support query subscriptions on projections with a filter not in the projection fields
2 parents b9a72eb + 19ba7bc commit c5d7ca2

10 files changed

+263
-177
lines changed

lib/agent.js

+100-40
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
var hat = require('hat');
2-
var util = require('./util');
32
var types = require('./types');
3+
var util = require('./util');
44
var logger = require('./logger');
55

66
/**
@@ -105,8 +105,7 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
105105
logger.error('Doc subscription stream error', collection, id, data.error);
106106
return;
107107
}
108-
if (agent._isOwnOp(collection, data)) return;
109-
agent._sendOp(collection, id, data);
108+
agent._onOp(collection, id, data);
110109
});
111110
stream.on('end', function() {
112111
// The op stream is done sending, so release its reference
@@ -149,13 +148,45 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query
149148

150149
emitter.onOp = function(op) {
151150
var id = op.d;
152-
if (agent._isOwnOp(collection, op)) return;
153-
agent._sendOp(collection, id, op);
151+
agent._onOp(collection, id, op);
154152
};
155153

156154
emitter._open();
157155
};
158156

157+
Agent.prototype._onOp = function(collection, id, op) {
158+
if (this._isOwnOp(collection, op)) return;
159+
160+
// Ops emitted here are coming directly from pubsub, which emits the same op
161+
// object to listeners without making a copy. The pattern in middleware is to
162+
// manipulate the passed in object, and projections are implemented the same
163+
// way currently.
164+
//
165+
// Deep copying the op would be safest, but deep copies are very expensive,
166+
// especially over arbitrary objects. This function makes a shallow copy of an
167+
// op, and it requires that projections and any user middleware copy deep
168+
// properties as needed when they modify the op.
169+
//
170+
// Polling of query subscriptions is determined by the same op objects. As a
171+
// precaution against op middleware breaking query subscriptions, we delay
172+
// before calling into projection and middleware code
173+
var agent = this;
174+
process.nextTick(function() {
175+
var copy = shallowCopyOp(op);
176+
if (!copy) {
177+
logger.error('Op emitted from subscription failed to copy', collection, id, op);
178+
return;
179+
}
180+
agent.backend.sanitizeOp(agent, collection, id, copy, function(err) {
181+
if (err) {
182+
logger.error('Error sanitizing op emitted from subscription', collection, id, copy, err);
183+
return;
184+
}
185+
agent._sendOp(collection, id, copy);
186+
});
187+
});
188+
};
189+
159190
Agent.prototype._isOwnOp = function(collection, op) {
160191
// Detect ops from this client on the same projection. Since the client sent
161192
// these in, the submit reply will be sufficient and we can silently ignore
@@ -186,12 +217,17 @@ Agent.prototype._sendOp = function(collection, id, op) {
186217

187218
this.send(message);
188219
};
189-
190220
Agent.prototype._sendOps = function(collection, id, ops) {
191221
for (var i = 0; i < ops.length; i++) {
192222
this._sendOp(collection, id, ops[i]);
193223
}
194224
};
225+
Agent.prototype._sendOpsBulk = function(collection, opsMap) {
226+
for (var id in opsMap) {
227+
var ops = opsMap[id];
228+
this._sendOps(collection, id, ops);
229+
}
230+
};
195231

196232
function getReplyErrorObject(err) {
197233
if (typeof err === 'string') {
@@ -316,7 +352,8 @@ Agent.prototype._handleMessage = function(request, callback) {
316352
case 'u':
317353
return this._unsubscribe(request.c, request.d, callback);
318354
case 'op':
319-
var op = this._createOp(request);
355+
// Normalize the properties submitted
356+
var op = createClientOp(request, this.clientId);
320357
if (!op) return callback({code: 4000, message: 'Invalid op message'});
321358
return this._submit(request.c, request.d, op, callback);
322359
case 'nf':
@@ -493,10 +530,7 @@ Agent.prototype._fetchBulkOps = function(collection, versions, callback) {
493530
var agent = this;
494531
this.backend.getOpsBulk(this, collection, versions, null, function(err, opsMap) {
495532
if (err) return callback(err);
496-
for (var id in opsMap) {
497-
var ops = opsMap[id];
498-
agent._sendOps(collection, id, ops);
499-
}
533+
agent._sendOpsBulk(collection, opsMap);
500534
callback();
501535
});
502536
};
@@ -505,8 +539,18 @@ Agent.prototype._subscribe = function(collection, id, version, callback) {
505539
// If the version is specified, catch the client up by sending all ops
506540
// since the specified version
507541
var agent = this;
508-
this.backend.subscribe(this, collection, id, version, function(err, stream, snapshot) {
542+
this.backend.subscribe(this, collection, id, version, function(err, stream, snapshot, ops) {
509543
if (err) return callback(err);
544+
// If we're subscribing from a known version, send any ops committed since
545+
// the requested version to bring the client's doc up to date
546+
if (ops) {
547+
agent._sendOps(collection, id, ops);
548+
}
549+
// In addition, ops may already be queued on the stream by pubsub.
550+
// Subscribe is called before the ops or snapshot are fetched, so it is
551+
// possible that some ops may be duplicates. Clients should ignore any
552+
// duplicate ops they may receive. This will flush ops already queued and
553+
// subscribe to ongoing ops from the stream
510554
agent._subscribeToStream(collection, id, stream);
511555
// Snapshot is returned only when subscribing from a null version.
512556
// Otherwise, ops will have been pushed into the stream
@@ -519,9 +563,13 @@ Agent.prototype._subscribe = function(collection, id, version, callback) {
519563
};
520564

521565
Agent.prototype._subscribeBulk = function(collection, versions, callback) {
566+
// See _subscribe() above. This function's logic should match but in bulk
522567
var agent = this;
523-
this.backend.subscribeBulk(this, collection, versions, function(err, streams, snapshotMap) {
568+
this.backend.subscribeBulk(this, collection, versions, function(err, streams, snapshotMap, opsMap) {
524569
if (err) return callback(err);
570+
if (opsMap) {
571+
agent._sendOpsBulk(collection, opsMap);
572+
}
525573
for (var id in streams) {
526574
agent._subscribeToStream(collection, id, streams[id]);
527575
}
@@ -572,45 +620,57 @@ Agent.prototype._submit = function(collection, id, op, callback) {
572620
});
573621
};
574622

575-
function CreateOp(src, seq, v, create) {
623+
Agent.prototype._fetchSnapshot = function(collection, id, version, callback) {
624+
this.backend.fetchSnapshot(this, collection, id, version, callback);
625+
};
626+
627+
Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp, callback) {
628+
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
629+
};
630+
631+
632+
function createClientOp(request, clientId) {
633+
// src can be provided if it is not the same as the current agent,
634+
// such as a resubmission after a reconnect, but it usually isn't needed
635+
var src = request.src || clientId;
636+
// c, d, and m arguments are intentionally undefined. These are set later
637+
return (request.op) ? new EditOp(src, request.seq, request.v, request.op) :
638+
(request.create) ? new CreateOp(src, request.seq, request.v, request.create) :
639+
(request.del) ? new DeleteOp(src, request.seq, request.v, request.del) :
640+
undefined;
641+
}
642+
643+
function shallowCopyOp(op) {
644+
return (op.op) ? new EditOp(op.src, op.seq, op.v, op.op, op.c, op.d, op.m) :
645+
(op.create) ? new CreateOp(op.src, op.seq, op.v, op.create, op.c, op.d, op.m) :
646+
(op.del) ? new DeleteOp(op.src, op.seq, op.v, op.del, op.c, op.d, op.m) :
647+
undefined;
648+
}
649+
650+
function CreateOp(src, seq, v, create, c, d, m) {
576651
this.src = src;
577652
this.seq = seq;
578653
this.v = v;
579654
this.create = create;
580-
this.m = null;
655+
this.c = c;
656+
this.d = d;
657+
this.m = m;
581658
}
582-
function EditOp(src, seq, v, op) {
659+
function EditOp(src, seq, v, op, c, d, m) {
583660
this.src = src;
584661
this.seq = seq;
585662
this.v = v;
586663
this.op = op;
587-
this.m = null;
664+
this.c = c;
665+
this.d = d;
666+
this.m = m;
588667
}
589-
function DeleteOp(src, seq, v, del) {
668+
function DeleteOp(src, seq, v, del, c, d, m) {
590669
this.src = src;
591670
this.seq = seq;
592671
this.v = v;
593672
this.del = del;
594-
this.m = null;
673+
this.c = c;
674+
this.d = d;
675+
this.m = m;
595676
}
596-
// Normalize the properties submitted
597-
Agent.prototype._createOp = function(request) {
598-
// src can be provided if it is not the same as the current agent,
599-
// such as a resubmission after a reconnect, but it usually isn't needed
600-
var src = request.src || this.clientId;
601-
if (request.op) {
602-
return new EditOp(src, request.seq, request.v, request.op);
603-
} else if (request.create) {
604-
return new CreateOp(src, request.seq, request.v, request.create);
605-
} else if (request.del) {
606-
return new DeleteOp(src, request.seq, request.v, request.del);
607-
}
608-
};
609-
610-
Agent.prototype._fetchSnapshot = function(collection, id, version, callback) {
611-
this.backend.fetchSnapshot(this, collection, id, version, callback);
612-
};
613-
614-
Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp, callback) {
615-
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
616-
};

lib/backend.js

+47-27
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ Backend.prototype.use = function(action, fn) {
211211
for (var i = 0; i < action.length; i++) {
212212
this.use(action[i], fn);
213213
}
214-
return;
214+
return this;
215215
}
216216
var fns = this.middleware[action] || (this.middleware[action] = []);
217217
fns.push(fn);
@@ -268,6 +268,12 @@ Backend.prototype.submit = function(agent, index, id, op, options, callback) {
268268
});
269269
};
270270

271+
Backend.prototype.sanitizeOp = function(agent, index, id, op, callback) {
272+
var projection = this.projections[index];
273+
var collection = (projection) ? projection.target : index;
274+
this._sanitizeOp(agent, projection, collection, id, op, callback);
275+
};
276+
271277
Backend.prototype._sanitizeOp = function(agent, projection, collection, id, op, callback) {
272278
if (projection) {
273279
try {
@@ -322,6 +328,28 @@ Backend.prototype._getSnapshotsFromMap = function(ids, snapshotMap) {
322328
return snapshots;
323329
};
324330

331+
Backend.prototype._getSanitizedOps = function(agent, projection, collection, id, from, to, opsOptions, callback) {
332+
var backend = this;
333+
backend.db.getOps(collection, id, from, to, opsOptions, function(err, ops) {
334+
if (err) return callback(err);
335+
backend._sanitizeOps(agent, projection, collection, id, ops, function(err) {
336+
if (err) return callback(err);
337+
callback(null, ops);
338+
});
339+
});
340+
};
341+
342+
Backend.prototype._getSanitizedOpsBulk = function(agent, projection, collection, fromMap, toMap, opsOptions, callback) {
343+
var backend = this;
344+
backend.db.getOpsBulk(collection, fromMap, toMap, opsOptions, function(err, opsMap) {
345+
if (err) return callback(err);
346+
backend._sanitizeOpsBulk(agent, projection, collection, opsMap, function(err) {
347+
if (err) return callback(err);
348+
callback(null, opsMap);
349+
});
350+
});
351+
};
352+
325353
// Non inclusive - gets ops from [from, to). Ie, all relevant ops. If to is
326354
// not defined (null or undefined) then it returns all ops.
327355
Backend.prototype.getOps = function(agent, index, id, from, to, options, callback) {
@@ -342,13 +370,10 @@ Backend.prototype.getOps = function(agent, index, id, from, to, options, callbac
342370
to: to
343371
};
344372
var opsOptions = options && options.opsOptions;
345-
backend.db.getOps(collection, id, from, to, opsOptions, function(err, ops) {
373+
backend._getSanitizedOps(agent, projection, collection, id, from, to, opsOptions, function(err, ops) {
346374
if (err) return callback(err);
347-
backend._sanitizeOps(agent, projection, collection, id, ops, function(err) {
348-
if (err) return callback(err);
349-
backend.emit('timing', 'getOps', Date.now() - start, request);
350-
callback(err, ops);
351-
});
375+
backend.emit('timing', 'getOps', Date.now() - start, request);
376+
callback(null, ops);
352377
});
353378
};
354379

@@ -369,13 +394,10 @@ Backend.prototype.getOpsBulk = function(agent, index, fromMap, toMap, options, c
369394
toMap: toMap
370395
};
371396
var opsOptions = options && options.opsOptions;
372-
backend.db.getOpsBulk(collection, fromMap, toMap, opsOptions, function(err, opsMap) {
397+
backend._getSanitizedOpsBulk(agent, projection, collection, fromMap, toMap, opsOptions, function(err, opsMap) {
373398
if (err) return callback(err);
374-
backend._sanitizeOpsBulk(agent, projection, collection, opsMap, function(err) {
375-
if (err) return callback(err);
376-
backend.emit('timing', 'getOpsBulk', Date.now() - start, request);
377-
callback(err, opsMap);
378-
});
399+
backend.emit('timing', 'getOpsBulk', Date.now() - start, request);
400+
callback(null, opsMap);
379401
});
380402
};
381403

@@ -476,21 +498,25 @@ Backend.prototype.subscribe = function(agent, index, id, version, options, callb
476498
};
477499
backend.pubsub.subscribe(channel, function(err, stream) {
478500
if (err) return callback(err);
479-
stream.initProjection(backend, agent, projection);
480501
if (version == null) {
481502
// Subscribing from null means that the agent doesn't have a document
482503
// and needs to fetch it as well as subscribing
483504
backend.fetch(agent, index, id, function(err, snapshot) {
484-
if (err) return callback(err);
505+
if (err) {
506+
stream.destroy();
507+
return callback(err);
508+
}
485509
backend.emit('timing', 'subscribe.snapshot', Date.now() - start, request);
486510
callback(null, stream, snapshot);
487511
});
488512
} else {
489-
backend.db.getOps(collection, id, version, null, null, function(err, ops) {
490-
if (err) return callback(err);
491-
stream.pushOps(collection, id, ops);
513+
backend._getSanitizedOps(agent, projection, collection, id, version, null, null, function(err, ops) {
514+
if (err) {
515+
stream.destroy();
516+
return callback(err);
517+
}
492518
backend.emit('timing', 'subscribe.ops', Date.now() - start, request);
493-
callback(null, stream);
519+
callback(null, stream, null, ops);
494520
});
495521
}
496522
});
@@ -514,7 +540,6 @@ Backend.prototype.subscribeBulk = function(agent, index, versions, callback) {
514540
var channel = backend.getDocChannel(collection, id);
515541
backend.pubsub.subscribe(channel, function(err, stream) {
516542
if (err) return eachCb(err);
517-
stream.initProjection(backend, agent, projection);
518543
streams[id] = stream;
519544
eachCb();
520545
});
@@ -535,17 +560,13 @@ Backend.prototype.subscribeBulk = function(agent, index, versions, callback) {
535560
});
536561
} else {
537562
// If a versions map, get ops since requested versions
538-
backend.db.getOpsBulk(collection, versions, null, null, function(err, opsMap) {
563+
backend._getSanitizedOpsBulk(agent, projection, collection, versions, null, null, function(err, opsMap) {
539564
if (err) {
540565
destroyStreams(streams);
541566
return callback(err);
542567
}
543-
for (var id in opsMap) {
544-
var ops = opsMap[id];
545-
streams[id].pushOps(collection, id, ops);
546-
}
547568
backend.emit('timing', 'subscribeBulk.ops', Date.now() - start, request);
548-
callback(null, streams);
569+
callback(null, streams, null, opsMap);
549570
});
550571
}
551572
});
@@ -587,7 +608,6 @@ Backend.prototype.querySubscribe = function(agent, index, query, options, callba
587608
}
588609
backend.pubsub.subscribe(request.channel, function(err, stream) {
589610
if (err) return callback(err);
590-
stream.initProjection(backend, agent, request.projection);
591611
if (options.ids) {
592612
var queryEmitter = new QueryEmitter(request, stream, options.ids);
593613
backend.emit('timing', 'querySubscribe.reconnect', Date.now() - start, request);

0 commit comments

Comments
 (0)