diff --git a/.gitignore b/.gitignore index 3005c1397..abd0d58e5 100644 --- a/.gitignore +++ b/.gitignore @@ -34,4 +34,5 @@ coverage # Dependency directories node_modules package-lock.json +yarn.lock jspm_packages diff --git a/README.md b/README.md index 8cbbfecf0..5cc6cf113 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ tracker](https://github.com/share/sharedb/issues). - Realtime synchronization of any JSON document - Concurrent multi-user collaboration +- Realtime synchronization of any ephemeral "presence" data - Synchronous editing API with asynchronous eventual consistency - Realtime query subscriptions - Simple integration with any database - [MongoDB](https://github.com/share/sharedb-mongo), [PostgresQL](https://github.com/share/sharedb-postgres) (experimental) @@ -73,6 +74,12 @@ initial data. Then you can submit editing operations on the document (using OT). Finally you can delete the document with a delete operation. By default, ShareDB stores all operations forever - nothing is truly deleted. +## User presence synchronization + +ShareDB supports synchronization of user presence data. This feature is opt-in, not enabled by default. To enable this feature, pass the `enablePresence: true` option to the ShareDB constructor (e.g. `var share = new ShareDB({ enablePresence: true })`). + +Presence data represents a user and is automatically synchronized between all clients subscribed to the same document. Its format is defined by the document's [OT Type](https://github.com/ottypes/docs), for example it may contain a user ID and a cursor position in a text document. All clients can modify their own presence data and receive a read-only version of other client's data. Presence data is automatically cleared when a client unsubscribes from the document or disconnects. It is also automatically transformed against applied operations, so that it still makes sense in the context of a modified document, for example a cursor position may be automatically advanced when a user types at the beginning of a text document. + ## Server API ### Initialization @@ -91,6 +98,8 @@ __Options__ * `options.pubsub` _(instance of `ShareDB.PubSub`)_ Notify other ShareDB processes when data changes through this pub/sub adapter. Defaults to `ShareDB.MemoryPubSub()`. +* `options.Presence` _(implementation of `ShareDB.Presence`)_ + Enable user presence synchronization. If not specified, presence features are not enabled. #### Database Adapters * `ShareDB.MemoryDB`, backed by a non-persistent database with no queries @@ -308,6 +317,9 @@ Unique document ID `doc.data` _(Object)_ Document contents. Available after document is fetched or subscribed to. +`doc.presence.current` _(Object)_ +Each property under `doc.presence.current` contains presence data shared by a client subscribed to this document. The property name is an empty string for this client's data and connection IDs for other clients' data. + `doc.fetch(function(err) {...})` Populate the fields on `doc` with a snapshot of the document from the server. @@ -337,6 +349,9 @@ An operation was applied to the data. `source` will be `false` for ops received `doc.on('del', function(data, source) {...})` The document was deleted. Document contents before deletion are passed in as an argument. `source` will be `false` for ops received from the server and defaults to `true` for ops generated locally. +`doc.on('presence', function(srcList, submitted) {...})` +Presence data has changed. `srcList` is an Array of `doc.presence` property names for which values have changed. `submitted` is `true`, if the event is the result of new presence data being submitted by the local or remote user, otherwise it is `false` - eg if the presence data was transformed against an operation or was cleared on unsubscribe, disconnect or roll-back. + `doc.on('error', function(err) {...})` There was an error fetching the document or applying an operation. @@ -370,6 +385,11 @@ Invokes the given callback function after Note that `whenNothingPending` does NOT wait for pending `model.query()` calls. +`doc.submitPresence(presenceData[, function(err) {...}])` +Set local presence data and publish it for other clients. +`presenceData` structure depends on the document type. +Presence is synchronized only when subscribed to the document. + ### Class: `ShareDB.Query` `query.ready` _(Boolean)_ @@ -467,6 +487,9 @@ Additional fields may be added to the error object for debugging context dependi * 4022 - Database adapter does not support queries * 4023 - Cannot project snapshots of this type * 4024 - Invalid version +* 4025 - Not subscribed to document +* 4026 - Presence data superseded +* 4027 - OT Type does not support presence ### 5000 - Internal error diff --git a/lib/agent.js b/lib/agent.js index b5cef65c1..b4da48c0f 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -2,6 +2,7 @@ var hat = require('hat'); var util = require('./util'); var types = require('./types'); var logger = require('./logger'); +var ShareDBError = require('./error'); /** * Agent deserializes the wire protocol messages received from the stream and @@ -26,6 +27,9 @@ function Agent(backend, stream) { // Map from queryId -> emitter this.subscribedQueries = {}; + // The max presence sequence number received from the client. + this.maxPresenceSeq = 0; + // We need to track this manually to make sure we don't reply to messages // after the stream was closed. this.closed = false; @@ -106,10 +110,17 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { logger.error('Doc subscription stream error', collection, id, data.error); return; } + if (data.a === 'p') { + // Send other clients' presence data + if (data.src !== agent.clientId) agent.send(data); + return; + } if (agent._isOwnOp(collection, data)) return; agent._sendOp(collection, id, data); }); stream.on('end', function() { + var presence = agent._createPresence(collection, id); + agent.backend.sendPresence(presence); // The op stream is done sending, so release its reference var streams = agent.subscribedDocs[collection]; if (!streams || streams[id] !== stream) return; @@ -288,6 +299,13 @@ Agent.prototype._checkRequest = function(request) { // Bulk request if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; if (typeof request.b !== 'object') return 'Invalid bulk subscribe data'; + } else if (request.a === 'p') { + // Presence + if (typeof request.c !== 'string') return 'Invalid collection'; + if (typeof request.d !== 'string') return 'Invalid id'; + if (typeof request.v !== 'number' || request.v < 0) return 'Invalid version'; + if (typeof request.seq !== 'number' || request.seq <= 0) return 'Invalid seq'; + if (typeof request.r !== 'undefined' && typeof request.r !== 'boolean') return 'Invalid "request reply" value'; } }; @@ -324,6 +342,9 @@ Agent.prototype._handleMessage = function(request, callback) { return this._fetchSnapshot(request.c, request.d, request.v, callback); case 'nt': return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback); + case 'p': + var presence = this._createPresence(request.c, request.d, request.p, request.v, request.r, request.seq); + return this._presence(presence, callback); default: callback({code: 4000, message: 'Invalid or unknown message'}); } @@ -614,3 +635,34 @@ Agent.prototype._fetchSnapshot = function (collection, id, version, callback) { Agent.prototype._fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) { this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback); }; + +Agent.prototype._presence = function(presence, callback) { + if (presence.seq <= this.maxPresenceSeq) { + return process.nextTick(function() { + callback(new ShareDBError(4026, 'Presence data superseded')); + }); + } + this.maxPresenceSeq = presence.seq; + if (!this.subscribedDocs[presence.c] || !this.subscribedDocs[presence.c][presence.d]) { + return process.nextTick(function() { + callback(new ShareDBError(4025, 'Cannot send presence. Not subscribed to document: ' + presence.c + ' ' + presence.d)); + }); + } + this.backend.sendPresence(presence, function(err) { + if (err) return callback(err); + callback(null, { seq: presence.seq }); + }); +}; + +Agent.prototype._createPresence = function(collection, id, data, version, requestReply, seq) { + return { + a: 'p', + src: this.clientId, + seq: seq != null ? seq : this.maxPresenceSeq, + c: collection, + d: id, + p: data, + v: version, + r: requestReply + }; +}; diff --git a/lib/backend.js b/lib/backend.js index 442da075c..b265ae1d8 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -48,6 +48,8 @@ function Backend(options) { if (!options.disableSpaceDelimitedActions) { this._shimAfterSubmit(); } + + this.Presence = options.Presence; } module.exports = Backend; emitter.mixin(Backend); @@ -155,6 +157,11 @@ Backend.prototype.connect = function(connection, req) { // not used internal to ShareDB, but it is handy for server-side only user // code that may cache state on the agent and read it in middleware connection.agent = agent; + + // Pass through information on whether or not presence is enabled, + // so that Doc instances can use it. + connection.Presence = this.Presence; + return connection; }; @@ -720,6 +727,11 @@ Backend.prototype._buildSnapshotFromOps = function (id, startingSnapshot, ops, c callback(error, snapshot); }; +Backend.prototype.sendPresence = function(presence, callback) { + var channels = [ this.getDocChannel(presence.c, presence.d) ]; + this.pubsub.publish(channels, presence, callback); +}; + function pluckIds(snapshots) { var ids = []; for (var i = 0; i < snapshots.length; i++) { diff --git a/lib/client/connection.js b/lib/client/connection.js index cd56306b2..222d82338 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -254,6 +254,11 @@ Connection.prototype.handleMessage = function(message) { if (doc) doc._handleOp(err, message); return; + case 'p': + var doc = this.getExisting(message.c, message.d); + if (doc) doc._handlePresence(err, message); + return; + default: logger.warn('Ignoring unrecognized message', message); } @@ -424,6 +429,23 @@ Connection.prototype.sendOp = function(doc, op) { this.send(message); }; +Connection.prototype.sendPresence = function(doc, data, requestReply) { + // Ensure the doc is registered so that it receives the reply message + this._addDoc(doc); + var message = { + a: 'p', + c: doc.collection, + d: doc.id, + p: data, + v: doc.version || 0, + seq: this.seq++ + }; + if (requestReply) { + message.r = true; + } + this.send(message); +}; + /** * Sends a message down the socket diff --git a/lib/client/doc.js b/lib/client/doc.js index 15798f0e5..fb74663b6 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -2,6 +2,8 @@ var emitter = require('../emitter'); var logger = require('../logger'); var ShareDBError = require('../error'); var types = require('../types'); +var DummyPresence = require('../presence/dummy'); +var callEach = require('../util').callEach; /** * A Doc is a client's view on a sharejs document. @@ -29,6 +31,14 @@ var types = require('../types'); * }) * * + * Presence + * -------- + * + * We can associate transient "presence" data with a document, eg caret position, etc. + * The presence data is synchronized on the best-effort basis between clients subscribed to the same document. + * Each client has their own presence data which is read-write. Other clients' data is read-only. + * + * * Events * ------ * @@ -43,9 +53,11 @@ var types = require('../types'); * the data is null. It is passed the data before delteion as an * arguments * - `load ()` Fired when a new snapshot is ingested from a fetch, subscribe, or query + * - `presence ([src])` Fired after the presence data has changed. */ module.exports = Doc; + function Doc(connection, collection, id) { emitter.EventEmitter.call(this); @@ -58,6 +70,10 @@ function Doc(connection, collection, id) { this.type = null; this.data = undefined; + this.presence = connection.Presence + ? new connection.Presence(this) + : new DummyPresence(); + // Array of callbacks or nulls as placeholders this.inflightFetch = []; this.inflightSubscribe = []; @@ -111,10 +127,12 @@ Doc.prototype.destroy = function(callback) { if (callback) return callback(err); return doc.emit('error', err); } + doc.presence.destroy(); doc.connection._destroyDoc(doc); if (callback) callback(); }); } else { + doc.presence.destroy(); doc.connection._destroyDoc(doc); if (callback) callback(); } @@ -195,12 +213,16 @@ Doc.prototype.ingestSnapshot = function(snapshot, callback) { if (this.version > snapshot.v) return callback && callback(); this.version = snapshot.v; + + this.presence.clearCachedOps(); + var type = (snapshot.type === undefined) ? types.defaultType : snapshot.type; this._setType(type); this.data = (this.type && this.type.deserialize) ? this.type.deserialize(snapshot.data) : snapshot.data; this.emit('load'); + this.presence.processAllReceivedPresence(); callback && callback(); }; @@ -222,7 +244,8 @@ Doc.prototype.hasPending = function() { this.inflightFetch.length || this.inflightSubscribe.length || this.inflightUnsubscribe.length || - this.pendingFetch.length + this.pendingFetch.length || + this.presence.hasPending() ); }; @@ -269,6 +292,7 @@ Doc.prototype._handleSubscribe = function(err, snapshot) { if (this.wantSubscribe) this.subscribed = true; this.ingestSnapshot(snapshot, callback); this._emitNothingPending(); + this.flush(); }; Doc.prototype._handleUnsubscribe = function(err) { @@ -330,13 +354,25 @@ Doc.prototype._handleOp = function(err, message) { } this.version++; + this.presence.cacheOp({ + src: message.src, + time: Date.now(), + create: !!message.create, + op: message.op, + del: !!message.del + }); try { this._otApply(message, false); + this.presence.processAllReceivedPresence(); } catch (error) { return this._hardRollback(error); } }; +Doc.prototype._handlePresence = function(err, presence) { + this.presence.handlePresence(err, presence); +}; + // Called whenever (you guessed it!) the connection state changes. This will // happen when we get disconnected & reconnect. Doc.prototype._onConnectionStateChanged = function() { @@ -357,7 +393,10 @@ Doc.prototype._onConnectionStateChanged = function() { if (this.inflightUnsubscribe.length) { var callbacks = this.inflightUnsubscribe; this.inflightUnsubscribe = []; + this.presence.pause(); callEach(callbacks); + } else { + this.presence.pause(); } } }; @@ -417,8 +456,10 @@ Doc.prototype.unsubscribe = function(callback) { if (this.connection.canSend) { var isDuplicate = this.connection.sendUnsubscribe(this); pushActionCallback(this.inflightUnsubscribe, isDuplicate, callback); + this.presence.pause(); return; } + this.presence.pause(); if (callback) process.nextTick(callback); }; @@ -449,6 +490,8 @@ Doc.prototype.flush = function() { if (!this.paused && this.pendingOps.length) { this._sendOp(); } + + this.presence.flush(); }; // Helper function to set op to contain a no-op. @@ -553,6 +596,7 @@ Doc.prototype._otApply = function(op, source) { // Apply the individual op component this.emit('before op', componentOp.op, source); this.data = this.type.apply(this.data, componentOp.op); + this.presence.transformAllPresence(componentOp); this.emit('op', componentOp.op, source); } // Pop whatever was submitted since we started applying this op @@ -565,6 +609,7 @@ Doc.prototype._otApply = function(op, source) { this.emit('before op', op.op, source); // Apply the operation to the local data, mutating it in place this.data = this.type.apply(this.data, op.op); + this.presence.transformAllPresence(op); // Emit an 'op' event once the local data includes the changes from the // op. For locally submitted ops, this will be synchronously with // submission and before the server or other clients have received the op. @@ -581,6 +626,7 @@ Doc.prototype._otApply = function(op, source) { this.type.createDeserialized(op.create.data) : this.type.deserialize(this.type.create(op.create.data)) : this.type.create(op.create.data); + this.presence.transformAllPresence(op); this.emit('create', source); return; } @@ -588,6 +634,7 @@ Doc.prototype._otApply = function(op, source) { if (op.del) { var oldData = this.data; this._setType(null); + this.presence.transformAllPresence(op); this.emit('del', oldData, source); return; } @@ -839,7 +886,7 @@ Doc.prototype.resume = function() { Doc.prototype._opAcknowledged = function(message) { if (this.inflightOp.create) { this.version = message.v; - + this.presence.clearCachedOps(); } else if (message.v !== this.version) { // We should already be at the same version, because the server should // have sent all the ops that have happened before acknowledging our op @@ -851,8 +898,16 @@ Doc.prototype._opAcknowledged = function(message) { // The op was committed successfully. Increment the version number this.version++; + this.presence.cacheOp({ + src: this.inflightOp.src, + time: Date.now(), + create: !!this.inflightOp.create, + op: this.inflightOp.op, + del: !!this.inflightOp.del + }); this._clearInflightOp(); + this.presence.processAllReceivedPresence(); }; Doc.prototype._rollback = function(err) { @@ -900,6 +955,9 @@ Doc.prototype._hardRollback = function(err) { if (this.inflightOp) pendingOps.push(this.inflightOp); pendingOps = pendingOps.concat(this.pendingOps); + // Apply a similar technique for presence. + var pendingPresence = this.presence.hardRollback(); + // Cancel all pending ops and reset if we can't invert this._setType(null); this.version = null; @@ -912,13 +970,22 @@ Doc.prototype._hardRollback = function(err) { // We want to check that no errors are swallowed, so we check that: // - there are callbacks to call, and // - that every single pending op called a callback - // If there are no ops queued, or one of them didn't handle the error, - // then we emit the error. var allOpsHadCallbacks = !!pendingOps.length; for (var i = 0; i < pendingOps.length; i++) { allOpsHadCallbacks = callEach(pendingOps[i].callbacks, err) && allOpsHadCallbacks; } - if (err && !allOpsHadCallbacks) return doc.emit('error', err); + + // Apply the same technique for presence. + var allPresenceHadCallbacks = !!pendingPresence.length; + for (var i = 0; i < pendingPresence.length; i++) { + allPresenceHadCallbacks = callEach(pendingPresence[i], err) && allPresenceHadCallbacks; + } + + // If there are no ops or presence queued, or one of them didn't handle the error, + // then we emit the error. + if (err && !allOpsHadCallbacks && !allPresenceHadCallbacks) { + doc.emit('error', err); + } }); }; @@ -934,15 +1001,3 @@ Doc.prototype._clearInflightOp = function(err) { if (err && !called) return this.emit('error', err); }; - -function callEach(callbacks, err) { - var called = false; - for (var i = 0; i < callbacks.length; i++) { - var callback = callbacks[i]; - if (callback) { - callback(err); - called = true; - } - } - return called; -} diff --git a/lib/presence/dummy.js b/lib/presence/dummy.js new file mode 100644 index 000000000..c86d14116 --- /dev/null +++ b/lib/presence/dummy.js @@ -0,0 +1,19 @@ +function DummyPresence () { +} + +function noop () {} + +DummyPresence.prototype.flush = noop; +DummyPresence.prototype.destroy = noop; +DummyPresence.prototype.clearCachedOps = noop; +DummyPresence.prototype.processAllReceivedPresence = noop; +DummyPresence.prototype.hardRollback = function () { return []; }; +DummyPresence.prototype.transformAllPresence = noop; +DummyPresence.prototype.cacheOp = noop; +DummyPresence.prototype.hasPending = function () { return false }; +DummyPresence.prototype.pause = noop; +DummyPresence.prototype.submit = function () { + console.warn('Attempted to submit presence, but presence is not enabled.'); +}; + +module.exports = DummyPresence; diff --git a/lib/presence/stateless.js b/lib/presence/stateless.js new file mode 100644 index 000000000..876b8e3ad --- /dev/null +++ b/lib/presence/stateless.js @@ -0,0 +1,385 @@ +/* + * Stateless Presence + * ------------------ + * + * This module provides an implementation of presence that works, + * but has some scalability problems. Each time a client joins a document, + * this implementation requests current presence information from all other clients, + * via the server. The server does not store any state at all regarding presence, + * it exists only in clients, hence the name "Stateless Presence". + * + */ +var ShareDBError = require('../error'); +var callEach = require('../util').callEach; + +function StatelessPresence (doc) { + this.doc = doc; + + // The current presence data. + // Map of src -> presence data + // Local src === '' + this.current = {}; + + // The presence objects received from the server. + // Map of src -> presence + this.received = {}; + + // The minimum amount of time to wait before removing processed presence from this.received. + // The processed presence is removed to avoid leaking memory, in case peers keep connecting and disconnecting a lot. + // The processed presence is not removed immediately to enable avoiding race conditions, where messages with lower + // sequence number arrive after messages with higher sequence numbers. + this.receivedTimeout = 60000; + + // If set to true, then the next time the local presence is sent, + // all other clients will be asked to reply with their own presence data. + this.requestReply = true; + + // A list of ops sent by the server. These are needed for transforming presence data, + // if we get that presence data for an older version of the document. + this.cachedOps = []; + + // The ops are cached for at least 1 minute by default, which should be lots, considering that the presence + // data is supposed to be synced in real-time. + this.cachedOpsTimeout = 60000; + + // The sequence number of the inflight presence request. + this.inflightSeq = 0; + + // Callbacks (or null) for pending and inflight presence requests. + this.pending = null; + this.inflight = null; +} + +// Submit presence data to a document. +// This is the only public facing method. +// All the others are marked as internal with a leading "_". +StatelessPresence.prototype.submit = function (data, callback) { + if (data != null) { + if (!this.doc.type) { + var doc = this.doc; + return process.nextTick(function() { + var err = new ShareDBError(4015, 'Cannot submit presence. Document has not been created. ' + doc.collection + '.' + doc.id); + if (callback) return callback(err); + doc.emit('error', err); + }); + } + + if (!this.doc.type.createPresence || !this.doc.type.transformPresence) { + var doc = this.doc; + return process.nextTick(function() { + var err = new ShareDBError(4027, 'Cannot submit presence. Document\'s type does not support presence. ' + doc.collection + '.' + doc.id); + if (callback) return callback(err); + doc.emit('error', err); + }); + } + + data = this.doc.type.createPresence(data); + } + + if (this._setPresence('', data, true) || this.hasPending()) { + if (!this.pending) { + this.pending = []; + } + if (callback) { + this.pending.push(callback); + } + + } else if (callback) { + process.nextTick(callback); + } + + process.nextTick(this.doc.flush.bind(this.doc)); +}; + +StatelessPresence.prototype.handlePresence = function (err, presence) { + if (!this.doc.subscribed) return; + + var src = presence.src; + if (!src) { + // Handle the ACK for the presence data we submitted. + // this.inflightSeq would not equal presence.seq after a hard rollback, + // when all callbacks are flushed with an error. + if (this.inflightSeq === presence.seq) { + var callbacks = this.inflight; + this.inflight = null; + this.inflightSeq = 0; + var called = callbacks && callEach(callbacks, err); + if (err && !called) this.doc.emit('error', err); + this.doc.flush(); + this.doc._emitNothingPending(); + } + return; + } + + // This shouldn't happen but check just in case. + if (err) return this.doc.emit('error', err); + + if (presence.r && !this.pending) { + // Another client requested us to share our current presence data + this.pending = []; + this.doc.flush(); + } + + // Ignore older messages which arrived out of order + if ( + this.received[src] && ( + this.received[src].seq > presence.seq || + (this.received[src].seq === presence.seq && presence.v != null) + ) + ) return; + + this.received[src] = presence; + + if (presence.v == null) { + // null version should happen only when the server automatically sends + // null presence for an unsubscribed client + presence.processedAt = Date.now(); + return this._setPresence(src, null, true); + } + + // Get missing ops first, if necessary + if (this.doc.version == null || this.doc.version < presence.v) return this.doc.fetch(); + + this._processReceivedPresence(src, true); +}; + +// If emit is true and presence has changed, emits a presence event. +// Returns true, if presence has changed for src. Otherwise false. +StatelessPresence.prototype._processReceivedPresence = function (src, emit) { + if (!src) return false; + var presence = this.received[src]; + if (!presence) return false; + + if (presence.processedAt != null) { + if (Date.now() >= presence.processedAt + this.receivedTimeout) { + // Remove old received and processed presence. + delete this.received[src]; + } + return false; + } + + if (this.doc.version == null || this.doc.version < presence.v) { + // keep waiting for the missing snapshot or ops. + return false; + } + + if (presence.p == null) { + // Remove presence data as requested. + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + if (!this.doc.type || !this.doc.type.createPresence || !this.doc.type.transformPresence) { + // Remove presence data because the document is not created or its type does not support presence + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + if (this.doc.inflightOp && this.doc.inflightOp.op == null) { + // Remove presence data because presence.received can be transformed only against "op", not "create" nor "del" + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + for (var i = 0; i < this.doc.pendingOps.length; i++) { + if (this.doc.pendingOps[i].op == null) { + // Remove presence data because presence.received can be transformed only against "op", not "create" nor "del" + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + } + + var startIndex = this.cachedOps.length - (this.doc.version - presence.v); + if (startIndex < 0) { + // Remove presence data because we can't transform presence.received + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + for (var i = startIndex; i < this.cachedOps.length; i++) { + if (this.cachedOps[i].op == null) { + // Remove presence data because presence.received can be transformed only against "op", not "create" nor "del" + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + } + + // Make sure the format of the data is correct + var data = this.doc.type.createPresence(presence.p); + + // Transform against past ops + for (var i = startIndex; i < this.cachedOps.length; i++) { + var op = this.cachedOps[i]; + data = this.doc.type.transformPresence(data, op.op, presence.src === op.src); + } + + // Transform against pending ops + if (this.doc.inflightOp) { + data = this.doc.type.transformPresence(data, this.doc.inflightOp.op, false); + } + + for (var i = 0; i < this.doc.pendingOps.length; i++) { + data = this.doc.type.transformPresence(data, this.doc.pendingOps[i].op, false); + } + + // Set presence data + presence.processedAt = Date.now(); + return this._setPresence(src, data, emit); +}; + +StatelessPresence.prototype.processAllReceivedPresence = function () { + if (!this) return; + var srcList = Object.keys(this.received); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (this._processReceivedPresence(src)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList, true); +}; + +StatelessPresence.prototype._transformPresence = function (src, op) { + var presenceData = this.current[src]; + if (op.op != null) { + var isOwnOperation = src === (op.src || ''); + presenceData = this.doc.type.transformPresence(presenceData, op.op, isOwnOperation); + } else { + presenceData = null; + } + return this._setPresence(src, presenceData); +}; + +StatelessPresence.prototype.transformAllPresence = function (op) { + if (!this) return; + var srcList = Object.keys(this.current); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (this.doc._transformPresence(src, op)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList, false); +}; + +StatelessPresence.prototype._pausePresence = function () { + if (!this) return; + + if (this.inflight) { + this.pending = this.pending + ? this.inflight.concat(this.pending) + : this.inflight; + this.inflight = null; + this.inflightSeq = 0; + } else if (!this.pending && this.current[''] != null) { + this.pending = []; + } + this.received = {}; + this.requestReply = true; + var srcList = Object.keys(this.current); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (src && this._setPresence(src, null)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList, false); +}; + +// If emit is true and presence has changed, emits a presence event. +// Returns true, if presence has changed. Otherwise false. +StatelessPresence.prototype._setPresence = function (src, data, emit) { + if (data == null) { + if (this.current[src] == null) return false; + delete this.current[src]; + } else { + var isPresenceEqual = + this.current[src] === data || + (this.doc.type.comparePresence && this.doc.type.comparePresence(this.current[src], data)); + if (isPresenceEqual) return false; + this.current[src] = data; + } + if (emit) this._emitPresence([ src ], true); + return true; +}; + +StatelessPresence.prototype._emitPresence = function (srcList, submitted) { + if (srcList && srcList.length > 0) { + var doc = this.doc; + process.nextTick(function() { + doc.emit('presence', srcList, submitted); + }); + } +}; + +StatelessPresence.prototype.cacheOp = function (op) { + if (!this) return; + // Remove the old ops. + var oldOpTime = Date.now() - this.cachedOpsTimeout; + var i; + for (i = 0; i < this.cachedOps.length; i++) { + if (this.cachedOps[i].time >= oldOpTime) { + break; + } + } + if (i > 0) { + this.cachedOps.splice(0, i); + } + + // Cache the new op. + this.cachedOps.push(op); +}; + +// If there are no pending ops, this.doc method sends the pending presence data, if possible. +StatelessPresence.prototype.flush = function () { + if (this.doc.subscribed && !this.inflight && this.pending && !this.doc.hasWritePending()) { + this.inflight = this.pending; + this.inflightSeq = this.doc.connection.seq; + this.pending = null; + this.doc.connection.sendPresence(this.doc, this.current[''], this.requestReply); + this.requestReply = false; + } +}; + +StatelessPresence.prototype._destroyPresence = function () { + this.received = {}; + this.cachedOps.length = 0; +}; + +// Reset presence-related properties. +StatelessPresence.prototype.hardRollback = function () { + var pendingPresence = []; + if (this.inflight) pendingPresence.push(this.inflight); + if (this.pending) pendingPresence.push(this.pending); + + this.inflight = null; + this.inflightSeq = 0; + this.pending = null; + this.cachedOps.length = 0; + this.received = {}; + this.requestReply = true; + + var srcList = Object.keys(this.current); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (this._setPresence(src, null)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList, false); + return pendingPresence; +}; + +StatelessPresence.prototype.clearCachedOps = function () { + this.cachedOps.length = 0; +}; + +StatelessPresence.prototype.hasPending = function () { + return this.inflight || this.pending; +}; + +module.exports = StatelessPresence; diff --git a/lib/util.js b/lib/util.js index 6ca346ffe..ad7048a58 100644 --- a/lib/util.js +++ b/lib/util.js @@ -22,3 +22,15 @@ exports.isValidVersion = function (version) { exports.isValidTimestamp = function (timestamp) { return exports.isValidVersion(timestamp); }; + +exports.callEach = function (callbacks, err) { + var called = false; + for (var i = 0; i < callbacks.length; i++) { + var callback = callbacks[i]; + if (callback) { + callback(err); + called = true; + } + } + return called; +}; diff --git a/test/client/presence-type.js b/test/client/presence-type.js new file mode 100644 index 000000000..6138eae7f --- /dev/null +++ b/test/client/presence-type.js @@ -0,0 +1,78 @@ +// A simple type for testing presence, where: +// +// - snapshot is a list +// - operation is { index, value } -> insert value at index in snapshot +// - presence is { index } -> an index in the snapshot +exports.type = { + name: 'wrapped-presence-no-compare', + uri: 'http://sharejs.org/types/wrapped-presence-no-compare', + create: create, + apply: apply, + transform: transform, + createPresence: createPresence, + transformPresence: transformPresence +}; + +// The same as `exports.type` but implements `comparePresence`. +exports.type2 = { + name: 'wrapped-presence-with-compare', + uri: 'http://sharejs.org/types/wrapped-presence-with-compare', + create: create, + apply: apply, + transform: transform, + createPresence: createPresence, + transformPresence: transformPresence, + comparePresence: comparePresence +}; + +// The same as `exports.type` but `presence.index` is unwrapped. +exports.type3 = { + name: 'unwrapped-presence', + uri: 'http://sharejs.org/types/unwrapped-presence', + create: create, + apply: apply, + transform: transform, + createPresence: createPresence2, + transformPresence: transformPresence2 +}; + +function create(data) { + return data || []; +} + +function apply(snapshot, op) { + snapshot.splice(op.index, 0, op.value); + return snapshot; +} + +function transform(op1, op2, side) { + return op1.index < op2.index || (op1.index === op2.index && side === 'left') + ? op1 + : { index: op1.index + 1, value: op1.value }; +} + +function createPresence(data) { + return { index: (data && data.index) | 0 }; +} + +function transformPresence(presence, op, isOwnOperation) { + return presence.index < op.index || (presence.index === op.index && !isOwnOperation) + ? presence + : { index: presence.index + 1 }; +} + +function comparePresence(presence1, presence2) { + return presence1 === presence2 || + (presence1 == null && presence2 == null) || + (presence1 != null && presence2 != null && presence1.index === presence2.index); +} + +function createPresence2(data) { + return data | 0; +} + +function transformPresence2(presence, op, isOwnOperation) { + return presence < op.index || (presence === op.index && !isOwnOperation) + ? presence + : presence + 1; +} diff --git a/test/client/presence.js b/test/client/presence.js new file mode 100644 index 000000000..51c6a4e59 --- /dev/null +++ b/test/client/presence.js @@ -0,0 +1,1458 @@ +var async = require('async'); +var lolex = require('lolex'); +var util = require('../util'); +var errorHandler = util.errorHandler; +var Backend = require('../../lib/backend'); +var ShareDBError = require('../../lib/error'); +var StatelessPresence = require('../../lib/presence/stateless'); +var DummyPresence = require('../../lib/presence/dummy'); +var expect = require('expect.js'); +var types = require('../../lib/types'); +var presenceType = require('./presence-type'); +types.register(presenceType.type); +types.register(presenceType.type2); +types.register(presenceType.type3); + +describe('client presence', function() { + it('Uses DummyPresence if presence option not provided', function() { + var backend = new Backend(); + var connection = backend.connect(); + var doc = connection.get('dogs', 'fido'); + expect(doc.presence instanceof DummyPresence); + }); +}); + +[ + 'wrapped-presence-no-compare', + 'wrapped-presence-with-compare', + 'unwrapped-presence' +].forEach(function(typeName) { + + function p(index) { + return typeName === 'unwrapped-presence' ? index : { index: index }; + } + + describe('client presence (' + typeName + ')', function() { + beforeEach(function() { + this.backend = new Backend({ Presence: StatelessPresence }); + this.connection = this.backend.connect(); + this.connection2 = this.backend.connect(); + this.doc = this.connection.get('dogs', 'fido'); + this.doc2 = this.connection2.get('dogs', 'fido'); + }); + + afterEach(function(done) { + this.backend.close(done); + }); + + it('sends presence immediately', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), errorHandler(done)); + this.doc2.once('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('sends presence after pending ops', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc.submitOp({ index: 0, value: 'a' }, errorHandler(done)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), errorHandler(done)); + this.doc2.once('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it.skip('waits for pending ops before processing future presence', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + // A hack to send presence for a future version. + this.doc.version += 2; + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), function(err) { + if (err) return done(err); + this.doc.version -= 2; + this.doc.submitOp({ index: 0, value: 'a' }, errorHandler(done)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (own ops, presence.index < op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), + this.doc.submitOp.bind(this.doc, { index: 2, value: 'c' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(0), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (own ops, presence.index === op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'c' }), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (own ops, presence.index > op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 0, value: 'b' }), + this.doc.submitOp.bind(this.doc, { index: 0, value: 'a' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'c' ]; + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (non-own ops, presence.index < op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), + this.doc2.submitOp.bind(this.doc2, { index: 2, value: 'c' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(0), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (non-own ops, presence.index === op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'c' }), + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (non-own ops, presence.index > op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc2.submitOp.bind(this.doc2, { index: 0, value: 'b' }), + this.doc2.submitOp.bind(this.doc2, { index: 0, value: 'a' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'c' ]; + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (transform against non-op)', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.create.bind(this.doc, [], typeName), + this.doc.submitOp.bind(this.doc, { index: 0, value: 'a' }), + this.doc.del.bind(this.doc), + this.doc.create.bind(this.doc, [ 'b' ], typeName), + function(done) { + this.doc2.once('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'b' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(0), errorHandler(done)); + }.bind(this), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'b' ]); + expect(this.doc2.presence.current).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 2; + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (no cached ops)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), + this.doc.submitOp.bind(this.doc, { index: 2, value: 'c' }), + function(done) { + this.doc2.once('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(0), errorHandler(done)); + }.bind(this), + function(done) { + this.doc2.presence.cachedOps = []; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence.current).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against local delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.presence.submit.bind(this.doc2, p(0)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current).to.not.have.key(''); + expect(this.doc.presence.current).to.not.have.key(this.connection2.id); + done(); + }.bind(this)); + this.doc.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against non-local delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.presence.submit.bind(this.doc2, p(0)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current).to.not.have.key(''); + expect(this.doc.presence.current).to.not.have.key(this.connection2.id); + done(); + }.bind(this)); + this.doc2.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against local op (presence.index != op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.presence.submit.bind(this.doc2, p(2)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(3)); + done(); + }.bind(this)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against non-local op (presence.index != op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.presence.submit.bind(this.doc2, p(2)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(3)); + done(); + }.bind(this)); + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against local op (presence.index == op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(1)), + this.doc2.presence.submit.bind(this.doc2, p(1)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ '' ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current['']).to.eql(p(2)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against non-local op (presence.index == op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(1)), + this.doc2.presence.submit.bind(this.doc2, p(1)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current['']).to.eql(p(1)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(2)); + done(); + }.bind(this)); + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('caches local ops', function(allDone) { + var op = { index: 1, value: 'b' }; + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op), + this.doc.del.bind(this.doc), + function(done) { + expect(this.doc.presence.cachedOps.length).to.equal(3); + expect(this.doc.presence.cachedOps[0].create).to.equal(true); + expect(this.doc.presence.cachedOps[1].op).to.equal(op); + expect(this.doc.presence.cachedOps[2].del).to.equal(true); + done(); + }.bind(this) + ], allDone); + }); + + it('caches non-local ops', function(allDone) { + var op = { index: 1, value: 'b' }; + async.series([ + this.doc2.subscribe.bind(this.doc2), + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op), + this.doc.del.bind(this.doc), + setTimeout, + function(done) { + expect(this.doc2.presence.cachedOps.length).to.equal(3); + expect(this.doc2.presence.cachedOps[0].create).to.equal(true); + expect(this.doc2.presence.cachedOps[1].op).to.eql(op); + expect(this.doc2.presence.cachedOps[2].del).to.equal(true); + done(); + }.bind(this) + ], allDone); + }); + + it('expires cached ops', function(allDone) { + var clock = lolex.install(); + var op1 = { index: 1, value: 'b' }; + var op2 = { index: 2, value: 'b' }; + var op3 = { index: 3, value: 'b' }; + this.doc.presence.cachedOpsTimeout = 60; + async.series([ + // Cache 2 ops. + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op1), + function(done) { + expect(this.doc.presence.cachedOps.length).to.equal(2); + expect(this.doc.presence.cachedOps[0].create).to.equal(true); + expect(this.doc.presence.cachedOps[1].op).to.equal(op1); + done(); + }.bind(this), + + // Cache another op before the first 2 expire. + function (callback) { + setTimeout(callback, 30); + clock.next(); + }, + this.doc.submitOp.bind(this.doc, op2), + function(done) { + expect(this.doc.presence.cachedOps.length).to.equal(3); + expect(this.doc.presence.cachedOps[0].create).to.equal(true); + expect(this.doc.presence.cachedOps[1].op).to.equal(op1); + expect(this.doc.presence.cachedOps[2].op).to.equal(op2); + done(); + }.bind(this), + + // Cache another op after the first 2 expire. + function (callback) { + setTimeout(callback, 31); + clock.next(); + }, + this.doc.submitOp.bind(this.doc, op3), + function(done) { + expect(this.doc.presence.cachedOps.length).to.equal(2); + expect(this.doc.presence.cachedOps[0].op).to.equal(op2); + expect(this.doc.presence.cachedOps[1].op).to.equal(op3); + clock.uninstall(); + done(); + }.bind(this) + ], allDone); + }); + + it('requests reply presence when sending presence for the first time', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + if (srcList[0] === '') { + expect(srcList).to.eql([ '' ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current['']).to.eql(p(1)); + expect(this.doc2.presence.current).to.not.have.key(this.connection.id); + } else { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence.current['']).to.eql(p(1)); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence.requestReply).to.equal(false); + done(); + } + }.bind(this)); + this.doc2.presence.submit(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence for uncreated document: callback(err)', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.presence.submit(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4015); + done(); + }); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence for uncreated document: emit(err)', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4015); + done(); + }); + this.doc.presence.submit(p(0)); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence, if type does not support presence: callback(err)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, {}), + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.presence.submit(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4027); + done(); + }); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence, if type does not support presence: emit(err)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, {}), + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4027); + done(); + }); + this.doc.presence.submit(p(0)); + }.bind(this) + ], allDone); + }); + + it('submits null presence', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + this.doc.presence.submit.bind(this.doc, null) + ], allDone); + }); + + it('sends presence once, if submitted multiple times synchronously', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(2)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(0), errorHandler(done)); + this.doc.presence.submit(p(1), errorHandler(done)); + this.doc.presence.submit(p(2), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('buffers presence until subscribed', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), errorHandler(done)); + setTimeout(function() { + this.doc.subscribe(function(err) { + if (err) return done(err); + expect(this.doc2.presence.current).to.eql({}); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('buffers presence when disconnected', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.connection.close(); + this.doc.presence.submit(p(1), errorHandler(done)); + process.nextTick(function() { + this.backend.connect(this.connection); + this.doc.presence.requestReply = false; + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('submits presence without a callback', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(0)); + }.bind(this) + ], allDone); + }); + + it('hasPending is true, if there is pending presence', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + this.doc.presence.submit(p(0)); + expect(this.doc.hasPending()).to.equal(true); + expect(!!this.doc.presence.pending).to.equal(true); + expect(!!this.doc.presence.inflight).to.equal(false); + this.doc.whenNothingPending(done); + }.bind(this), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + expect(!!this.doc.presence.pending).to.equal(false); + expect(!!this.doc.presence.inflight).to.equal(false); + done(); + }.bind(this) + ], allDone); + }); + + it('hasPending is true, if there is inflight presence', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + this.doc.presence.submit(p(0)); + expect(this.doc.hasPending()).to.equal(true); + expect(!!this.doc.presence.pending).to.equal(true); + expect(!!this.doc.presence.inflight).to.equal(false); + process.nextTick(done); + }.bind(this), + function(done) { + expect(this.doc.hasPending()).to.equal(true); + expect(!!this.doc.presence.pending).to.equal(false); + expect(!!this.doc.presence.inflight).to.equal(true); + this.doc.whenNothingPending(done); + }.bind(this), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + expect(!!this.doc.presence.pending).to.equal(false); + expect(!!this.doc.presence.inflight).to.equal(false); + done(); + }.bind(this) + ], allDone); + }); + + it('receives presence after doc is deleted', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + setTimeout, + function(done) { + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + // The call to `del` transforms the presence and fires the event. + // The call to `presence.submit` does not fire the event because presence is already null. + expect(submitted).to.equal(false); + expect(this.doc2.presence.current).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), errorHandler(done)); + this.doc2.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on peer disconnection', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.presence.submit.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence.current['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current).to.not.have.key(connectionId); + expect(this.doc2.presence.current['']).to.eql(p(1)); + done(); + }.bind(this)); + this.connection.close(); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on own disconnection', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.presence.submit.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence.current['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(false); + expect(this.doc2.presence.current).to.not.have.key(connectionId); + expect(this.doc2.presence.current['']).to.eql(p(1)); + done(); + }.bind(this)); + this.connection2.close(); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on peer unsubscribe', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.presence.submit.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence.current['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current).to.not.have.key(connectionId); + expect(this.doc2.presence.current['']).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.unsubscribe(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on own unsubscribe', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.presence.submit.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence.current['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(false); + expect(this.doc2.presence.current).to.not.have.key(connectionId); + expect(this.doc2.presence.current['']).to.eql(p(1)); + done(); + }.bind(this)); + this.doc2.unsubscribe(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('pauses inflight and pending presence on disconnect', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + var called = 0; + function callback(err) { + if (err) return done(err); + if (++called === 2) done(); + } + this.doc.presence.submit(p(0), callback); + process.nextTick(function() { + this.doc.presence.submit(p(1), callback); + this.connection.close(); + process.nextTick(function() { + this.backend.connect(this.connection); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('pauses inflight and pending presence on unsubscribe', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + var called = 0; + function callback(err) { + if (err) return done(err); + if (++called === 2) done(); + } + this.doc.presence.submit(p(0), callback); + process.nextTick(function() { + this.doc.presence.submit(p(1), callback); + this.doc.unsubscribe(errorHandler(done)); + process.nextTick(function() { + this.doc.subscribe(errorHandler(done)); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('re-synchronizes presence after reconnecting', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.presence.submit.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + this.connection.close(); + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current).to.not.have.key(this.connection2.id); + this.backend.connect(this.connection); + process.nextTick(done); + }.bind(this), + setTimeout, // wait for re-sync + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + process.nextTick(done); + }.bind(this) + ], allDone); + }); + + it('re-synchronizes presence after resubscribing', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.presence.submit.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + this.doc.unsubscribe(errorHandler(done)); + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current).to.not.have.key(this.connection2.id); + this.doc.subscribe(done); + }.bind(this), + setTimeout, // wait for re-sync + function(done) { + expect(this.doc.presence.current['']).to.eql(p(0)); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(1)); + process.nextTick(done); + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight and pending ops (presence.index < op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(0), errorHandler(done)); + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)) + this.doc2.submitOp({ index: 2, value: 'c' }, errorHandler(done)) + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight and pending ops (presence.index === op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), errorHandler(done)); + this.doc2.submitOp({ index: 1, value: 'c' }, errorHandler(done)) + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)) + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight and pending ops (presence.index > op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(1), errorHandler(done)); + this.doc2.submitOp({ index: 0, value: 'b' }, errorHandler(done)) + this.doc2.submitOp({ index: 0, value: 'a' }, errorHandler(done)) + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(1)), + setTimeout, + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + // The call to `del` transforms the presence and fires the event. + // The call to `presence.submit` does not fire the event because presence is already null. + expect(submitted).to.equal(false); + expect(this.doc2.presence.current).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(2), errorHandler(done)); + this.doc2.del(errorHandler(done)); + this.doc2.create([ 'c' ], typeName, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms received presence against a pending delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(1)), + setTimeout, + function(done) { + var firstCall = true; + this.doc2.on('presence', function(srcList, submitted) { + if (firstCall) return firstCall = false; + expect(srcList).to.eql([ this.connection.id ]); + // The call to `del` transforms the presence and fires the event. + // The call to `presence.submit` does not fire the event because presence is already null. + expect(submitted).to.equal(false); + expect(this.doc2.presence.current).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + this.doc.presence.requestReply = false; + this.doc.presence.submit(p(2), errorHandler(done)); + this.doc2.submitOp({ index: 0, value: 'b' }, errorHandler(done)); + this.doc2.del(errorHandler(done)); + this.doc2.create([ 'c' ], typeName, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('emits the same presence only if comparePresence is not implemented (local presence)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.presence.submit.bind(this.doc, p(1)), + function(done) { + this.doc.on('presence', function(srcList, submitted) { + if (typeName === 'wrapped-presence-no-compare') { + expect(srcList).to.eql([ '' ]); + expect(submitted).to.equal(true); + expect(this.doc.presence.current['']).to.eql(p(1)); + done(); + } else { + done(new Error('Unexpected presence event')); + } + }.bind(this)); + this.doc.presence.submit(p(1), typeName === 'wrapped-presence-no-compare' ? errorHandler(done) : done); + }.bind(this) + ], allDone); + }); + + it('emits the same presence only if comparePresence is not implemented (non-local presence)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(1)), + setTimeout, + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + if (typeName === 'wrapped-presence-no-compare') { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence.current[this.connection.id]).to.eql(p(1)); + done(); + } else { + done(new Error('Unexpected presence event')); + } + }.bind(this)); + this.doc.presence.submit(p(1), typeName === 'wrapped-presence-no-compare' ? errorHandler(done) : done); + }.bind(this) + ], allDone); + }); + + it('returns an error when not subscribed on the server', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + this.connection.sendUnsubscribe(this.doc); + process.nextTick(done); + }.bind(this), + function(done) { + this.doc.on('error', done); + this.doc.presence.submit(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4025); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('emits an error when not subscribed on the server and no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + this.connection.sendUnsubscribe(this.doc); + process.nextTick(done); + }.bind(this), + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4025); + done(); + }.bind(this)); + this.doc.presence.submit(p(0)); + }.bind(this) + ], allDone); + }); + + it('returns an error when the server gets an old sequence number', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.presence.submit.bind(this.doc, p(0)), + setTimeout, + function(done) { + this.doc.on('error', done); + this.connection.seq--; + this.doc.presence.submit(p(1), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('emits an error when the server gets an old sequence number and no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.presence.submit.bind(this.doc, p(0)), + setTimeout, + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + done(); + }.bind(this)); + this.connection.seq--; + this.doc.presence.submit(p(1)); + }.bind(this) + ], allDone); + }); + + it('does not publish presence unnecessarily', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.presence.submit.bind(this.doc, p(0)), + setTimeout, + function(done) { + this.doc.on('error', done); + // Decremented sequence number would cause the server to return an error, however, + // the message won't be sent to the server at all because the presence data has not changed. + this.connection.seq--; + this.doc.presence.submit(p(0), function(err) { + if (typeName === 'wrapped-presence-no-compare') { + // The OT type does not support comparing presence. + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + } else { + expect(err).to.not.be.ok(); + } + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('does not publish presence unnecessarily when no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.presence.submit.bind(this.doc, p(0)), + setTimeout, + function(done) { + this.doc.on('error', function(err) { + if (typeName === 'wrapped-presence-no-compare') { + // The OT type does not support comparing presence. + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + done(); + } else { + done(err); + } + }.bind(this)); + // Decremented sequence number would cause the server to return an error, however, + // the message won't be sent to the server at all because the presence data has not changed. + this.connection.seq--; + this.doc.presence.submit(p(0)); + if (typeName !== 'wrapped-presence-no-compare') { + process.nextTick(done); + } + }.bind(this) + ], allDone); + }); + + it('returns an error when publishing presence fails', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + setTimeout, + function(done) { + var sendPresence = this.backend.sendPresence; + this.backend.sendPresence = function(presence, callback) { + if (presence.a === 'p' && presence.v != null) { + return callback(new ShareDBError(-1, 'Test publishing error')); + } + sendPresence.apply(this, arguments); + }; + this.doc.on('error', done); + this.doc.presence.submit(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(-1); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('emits an error when publishing presence fails and no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + setTimeout, + function(done) { + var sendPresence = this.backend.sendPresence; + this.backend.sendPresence = function(presence, callback) { + if (presence.a === 'p' && presence.v != null) { + return callback(new ShareDBError(-1, 'Test publishing error')); + } + sendPresence.apply(this, arguments); + }; + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(-1); + done(); + }.bind(this)); + this.doc.presence.submit(p(0)); + }.bind(this) + ], allDone); + }); + + it('clears presence on hard rollback and emits an error', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'b', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.presence.submit.bind(this.doc2, p(0)), + setTimeout, + function(done) { + // A hack to allow testing of hard rollback of both inflight and pending presence. + var doc = this.doc; + var _handlePresence = this.doc._handlePresence; + this.doc._handlePresence = function(err, presence) { + setTimeout(function() { + _handlePresence.call(doc, err, presence); + }); + }; + process.nextTick(done); + }.bind(this), + this.doc.presence.submit.bind(this.doc, p(1)), // presence.inflight + process.nextTick, // wait for "presence" event + this.doc.presence.submit.bind(this.doc, p(2)), // presence.pending + process.nextTick, // wait for "presence" event + function(done) { + var presenceEmitted = false; + this.doc.on('presence', function(srcList, submitted) { + expect(presenceEmitted).to.equal(false); + presenceEmitted = true; + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current).to.not.have.key(''); + expect(this.doc.presence.current).to.not.have.key(this.connection2.id); + }.bind(this)); + + this.doc.on('error', function(err) { + expect(presenceEmitted).to.equal(true); + expect(err).to.be.an(Error); + expect(err.code).to.equal(4000); + done(); + }.bind(this)); + + // send an invalid op + this.doc._submit({}, null); + }.bind(this) + ], allDone); + }); + + it('clears presence on hard rollback and executes all callbacks', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'b', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.presence.submit.bind(this.doc, p(0)), + this.doc2.presence.submit.bind(this.doc2, p(0)), + setTimeout, + function(done) { + // A hack to allow testing of hard rollback of both inflight and pending presence. + var doc = this.doc; + var _handlePresence = this.doc._handlePresence; + this.doc._handlePresence = function(err, presence) { + setTimeout(function() { + _handlePresence.call(doc, err, presence); + }); + }; + process.nextTick(done); + }.bind(this), + function(done) { + var presenceEmitted = false; + var called = 0; + function callback(err) { + expect(presenceEmitted).to.equal(true); + expect(err).to.be.an(Error); + expect(err.code).to.equal(4000); + if (++called < 3) return; + done(); + } + this.doc.presence.submit(p(1), callback); // presence.inflight + process.nextTick(function() { // wait for presence event + this.doc.presence.submit(p(2), callback); // presence.pending + process.nextTick(function() { // wait for presence event + this.doc.on('presence', function(srcList, submitted) { + expect(presenceEmitted).to.equal(false); + presenceEmitted = true; + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence.current).to.not.have.key(''); + expect(this.doc.presence.current).to.not.have.key(this.connection2.id); + }.bind(this)); + this.doc.on('error', done); + + // send an invalid op + this.doc._submit({ index: 3, value: 'b' }, null, callback); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + function testReceivedMessageExpiry(expireCache, reduceSequence) { + return function(allDone) { + var lastPresence = null; + var handleMessage = this.connection.handleMessage; + this.connection.handleMessage = function(message) { + if (message.a === 'p' && message.src) { + lastPresence = JSON.parse(JSON.stringify(message)); + } + return handleMessage.apply(this, arguments); + }; + if (expireCache) { + this.doc.presence.receivedTimeout = 0; + } + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.presence.requestReply = false; + this.doc2.presence.submit(p(0), done); + }.bind(this), + setTimeout, + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), // forces processing of all received presence + setTimeout, + function(done) { + expect(this.doc.data).to.eql([ 'a', 'b' ]); + expect(this.doc.presence.current[this.connection2.id]).to.eql(p(0)); + // Replay the `lastPresence` with modified payload. + lastPresence.p = p(1); + lastPresence.v++; // +1 to account for the op above + if (reduceSequence) { + lastPresence.seq--; + } + this.connection.handleMessage(lastPresence); + process.nextTick(done); + }.bind(this), + function(done) { + expect(this.doc.presence.current[this.connection2.id]).to.eql(expireCache ? p(1) : p(0)); + process.nextTick(done); + }.bind(this) + ], allDone); + }; + } + + it('ignores an old message (cache not expired, presence.seq === cachedPresence.seq)', testReceivedMessageExpiry(false, false)); + it('ignores an old message (cache not expired, presence.seq < cachedPresence.seq)', testReceivedMessageExpiry(false, true)); + it('processes an old message (cache expired, presence.seq === cachedPresence.seq)', testReceivedMessageExpiry(true, false)); + it('processes an old message (cache expired, presence.seq < cachedPresence.seq)', testReceivedMessageExpiry(true, true)); + }); +}); diff --git a/test/client/submit.js b/test/client/submit.js index 82cecbbe3..1314cfa89 100644 --- a/test/client/submit.js +++ b/test/client/submit.js @@ -1078,6 +1078,39 @@ describe('client submit', function() { }); }); + it('hasWritePending is false when create\'s callback is executed', function(done) { + var doc = this.backend.connect().get('dogs', 'fido'); + doc.create({age: 3}, function(err) { + if (err) return done(err); + expect(doc.hasWritePending()).equal(false); + done(); + }); + }); + + it('hasWritePending is false when submimtOp\'s callback is executed', function(done) { + var doc = this.backend.connect().get('dogs', 'fido'); + doc.create({age: 3}, function(err) { + if (err) return done(err); + doc.submitOp({p: ['age'], na: 2}, function(err) { + if (err) return done(err); + expect(doc.hasWritePending()).equal(false); + done(); + }); + }); + }); + + it('hasWritePending is false when del\'s callback is executed', function(done) { + var doc = this.backend.connect().get('dogs', 'fido'); + doc.create({age: 3}, function(err) { + if (err) return done(err); + doc.del(function(err) { + if (err) return done(err); + expect(doc.hasWritePending()).equal(false); + done(); + }); + }); + }); + describe('type.deserialize', function() { it('can create a new doc', function(done) { var doc = this.backend.connect().get('dogs', 'fido'); diff --git a/test/util.js b/test/util.js index 5f982ed6e..a7c58c38e 100644 --- a/test/util.js +++ b/test/util.js @@ -15,6 +15,12 @@ exports.pluck = function(docs, key) { return values; }; +exports.errorHandler = function(callback) { + return function(err) { + if (err) callback(err); + }; +}; + // Wrap a done function to call back only after a specified number of calls. // For example, `var callbackAfter = callAfter(1, callback)` means that if // `callbackAfter` is called once, it won't call back. If it is called twice