Skip to content

Commit 951dc30

Browse files
committed
recent_senders: Handle moved messages
This is similar to how we add support to handling moves for unreads (commit 34e6201), especially the optimizations to avoid making unnecessary copies of message IDs when the entire conversation is moved (e.g. resolve/unresolve topic). An alternative approach to this is extracting helpers from handleMessages and handleDeleteMessageEvent and combining the two to handle moves, like web does (https://github.com/zulip/zulip/blob/bd04a30b/web/src/recent_senders.ts#L165-L190). Compared to that, creating a dedicated helper (this commit) makes it more straightforward to optimize for our performance needs. (The tests do not have to use PerAccountStore, but this setup makes it a bit more integrated.) Fixes: #901
1 parent 5d9852a commit 951dc30

File tree

3 files changed

+245
-0
lines changed

3 files changed

+245
-0
lines changed

lib/model/recent_senders.dart

+91
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,64 @@ class RecentSenders {
6868
[senderId] ??= MessageIdTracker()).add(messageId);
6969
}
7070

71+
/// Handles channel/topic updates when messages are moved.
72+
///
73+
/// [cachedMessages] should just be a map of messages we know about, i.e.
74+
/// [MessageStore.messages]. It doesn't matter whether the same
75+
/// [UpdateMessageEvent] has been handled by the [MessageStore],
76+
/// since only the sender IDs, which do not change, are looked at.
77+
///
78+
/// This is a no-op if no message move happened.
79+
void handleUpdateMessageEvent(UpdateMessageEvent event, Map<int, Message> cachedMessages) {
80+
if (event.moveData == null) {
81+
return;
82+
}
83+
final UpdateMessageMoveData(
84+
:origStreamId, :newStreamId, :origTopic, :newTopic) = event.moveData!;
85+
86+
final messagesBySender = _groupStreamMessageIdsBySender(event.messageIds, cachedMessages);
87+
final sendersInStream = streamSenders[origStreamId];
88+
final topicsInStream = topicSenders[origStreamId];
89+
final sendersInTopic = topicsInStream?[origTopic];
90+
for (final MapEntry(key: senderId, value: messages) in messagesBySender.entries) {
91+
// The later `popAll` calls require the message IDs to be sorted in
92+
// ascending order. Only sort as many as we need: the message IDs
93+
// with the same sender, instead of all of them in `event.messageIds`.
94+
// TOOD(server) make this an API guarantee. CZO discussion:
95+
// https://chat.zulip.org/#narrow/channel/412-api-documentation/topic/Make.20message_ids.20from.20message.20update.20event.20sorted/near/2143785
96+
messages.sort();
97+
98+
if (newStreamId != origStreamId) {
99+
final streamTracker = sendersInStream?[senderId];
100+
// All messages from both `messages` and `streamTracker` are from the
101+
// same sender and the same channel. `messages` contain only messages
102+
// known to `store.messages`; all of them should have made there way
103+
// to the recent senders data structure as well.
104+
assert(messages.every((id) => streamTracker!.ids.contains(id)));
105+
streamTracker?.removeAll(messages);
106+
if (streamTracker?.maxId == null) sendersInStream?.remove(senderId);
107+
if (messages.isNotEmpty) {
108+
((streamSenders[newStreamId] ??= {})
109+
[senderId] ??= MessageIdTracker()).addAll(messages);
110+
}
111+
}
112+
113+
// This does not need a check like the stream trackers one above,
114+
// because the conversation is guaranteed to have moved. This is an
115+
// invariant [UpdateMessageMoveData] offers.
116+
final topicTracker = sendersInTopic?[senderId];
117+
final movedMessagesInTopicTracker = topicTracker?.popAll(messages);
118+
if (topicTracker?.maxId == null) sendersInTopic?.remove(senderId);
119+
if (movedMessagesInTopicTracker != null) {
120+
(((topicSenders[newStreamId] ??= {})[newTopic] ??= {})
121+
[senderId] ??= MessageIdTracker()).addAll(movedMessagesInTopicTracker);
122+
}
123+
}
124+
if (sendersInStream?.isEmpty ?? false) streamSenders.remove(origStreamId);
125+
if (sendersInTopic?.isEmpty ?? false) topicsInStream?.remove(origTopic);
126+
if (topicsInStream?.isEmpty ?? false) topicSenders.remove(origStreamId);
127+
}
128+
71129
void handleDeleteMessageEvent(DeleteMessageEvent event, Map<int, Message> cachedMessages) {
72130
if (event.messageType != MessageType.stream) return;
73131

@@ -153,6 +211,39 @@ class MessageIdTracker {
153211
ids.removeWhere((id) => binarySearch(idsToRemove, id) != -1);
154212
}
155213

214+
/// Remove message IDs found in [idsToRemove] from the tracker list.
215+
///
216+
/// Returns the removed message IDs sorted in ascending order, or `null` if
217+
/// nothing is removed.
218+
///
219+
/// [idsToRemove] should be sorted ascending.
220+
///
221+
/// Consider using [removeAll] if the returned message IDs are not needed.
222+
// Part of this is adapted from [ListBase.removeWhere].
223+
QueueList<int>? popAll(List<int> idsToRemove) {
224+
assert(isSortedWithoutDuplicates(idsToRemove));
225+
final retainedMessageIds =
226+
ids.where((id) => binarySearch(idsToRemove, id) == -1).toList();
227+
228+
if (retainedMessageIds.isEmpty) {
229+
// All message IDs in this tracker are removed; this is an optimization
230+
// to clear all ids and return the removed ones without making a new copy.
231+
final result = ids;
232+
ids = QueueList();
233+
return result;
234+
}
235+
236+
QueueList<int>? poppedMessageIds;
237+
if (retainedMessageIds.length != ids.length) {
238+
poppedMessageIds = QueueList.from(
239+
ids.where((id) => binarySearch(idsToRemove, id) != -1));
240+
ids.setRange(0, retainedMessageIds.length, retainedMessageIds);
241+
ids.length = retainedMessageIds.length;
242+
assert(isSortedWithoutDuplicates(poppedMessageIds));
243+
}
244+
return poppedMessageIds;
245+
}
246+
156247
@override
157248
String toString() => ids.toString();
158249
}

lib/model/store.dart

+1
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,7 @@ class PerAccountStore extends ChangeNotifier with EmojiStore, UserStore, Channel
767767

768768
case UpdateMessageEvent():
769769
assert(debugLog("server event: update_message ${event.messageId}"));
770+
recentSenders.handleUpdateMessageEvent(event, messages);
770771
_messages.handleUpdateMessageEvent(event);
771772
unreads.handleUpdateMessageEvent(event);
772773

test/model/recent_senders_test.dart

+153
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ import 'package:checks/checks.dart';
22
import 'package:flutter_test/flutter_test.dart';
33
import 'package:zulip/api/model/model.dart';
44
import 'package:zulip/model/recent_senders.dart';
5+
import 'package:zulip/model/store.dart';
56
import '../example_data.dart' as eg;
7+
import 'test_store.dart';
68

79
/// [messages] should be sorted by [id] ascending.
810
void checkMatchesMessages(RecentSenders model, List<Message> messages) {
911
final Map<int, Map<int, Set<int>>> messagesByUserInStream = {};
1012
final Map<int, Map<TopicName, Map<int, Set<int>>>> messagesByUserInTopic = {};
13+
messages.sort((a, b) => a.id - b.id);
1114
for (final message in messages) {
1215
if (message is! StreamMessage) {
1316
throw UnsupportedError('Message of type ${message.runtimeType} is not expected.');
@@ -142,6 +145,156 @@ void main() {
142145
});
143146
});
144147

148+
group('RecentSenders.handleUpdateMessageEvent', () {
149+
late PerAccountStore store;
150+
late RecentSenders model;
151+
152+
final origChannel = eg.stream(); final newChannel = eg.stream();
153+
final origTopic = 'origTopic'; final newTopic = 'newTopic';
154+
final userX = eg.user(); final userY = eg.user();
155+
156+
Future<void> prepare(List<Message> messages) async {
157+
store = eg.store();
158+
await store.addMessages(messages);
159+
await store.addStreams([origChannel, newChannel]);
160+
await store.addUsers([userX, userY]);
161+
model = store.recentSenders;
162+
}
163+
164+
List<StreamMessage> copyMessagesWith(Iterable<StreamMessage> messages, {
165+
ZulipStream? newChannel,
166+
String? newTopic,
167+
}) {
168+
assert(newChannel != null || newTopic != null);
169+
return messages.map((message) => StreamMessage.fromJson(
170+
message.toJson()
171+
..['stream_id'] = newChannel?.streamId ?? message.streamId
172+
// See [StreamMessage.displayRecipient] for why this is needed.
173+
..['display_recipient'] = newChannel?.name ?? message.displayRecipient!
174+
175+
..['subject'] = newTopic ?? message.topic
176+
)).toList();
177+
}
178+
179+
test('move a conversation entirely, with additional unknown messages', () async {
180+
final messages = List.generate(10, (i) => eg.streamMessage(
181+
stream: origChannel, topic: origTopic, sender: userX));
182+
await prepare(messages);
183+
final unknownMessages = List.generate(10, (i) => eg.streamMessage(
184+
stream: origChannel, topic: origTopic, sender: userX));
185+
checkMatchesMessages(model, messages);
186+
187+
final messageIdsByUserInTopicBefore =
188+
model.topicSenders[origChannel.streamId]![eg.t(origTopic)]![userX.userId]!.ids;
189+
190+
await store.handleEvent(eg.updateMessageEventMoveFrom(
191+
origMessages: messages + unknownMessages,
192+
newStreamId: newChannel.streamId));
193+
checkMatchesMessages(model, copyMessagesWith(
194+
messages, newChannel: newChannel));
195+
196+
// Check we avoided creating a new list for the moved message IDs.
197+
check(messageIdsByUserInTopicBefore).identicalTo(
198+
model.topicSenders[newChannel.streamId]![eg.t(origTopic)]![userX.userId]!.ids);
199+
});
200+
201+
test('move a conversation exactly', () async {
202+
final messages = List.generate(10, (i) => eg.streamMessage(
203+
stream: origChannel, topic: origTopic, sender: userX));
204+
await prepare(messages);
205+
206+
final messageIdsByUserInTopicBefore =
207+
model.topicSenders[origChannel.streamId]![eg.t(origTopic)]![userX.userId]!.ids;
208+
209+
await store.handleEvent(eg.updateMessageEventMoveFrom(
210+
origMessages: messages,
211+
newStreamId: newChannel.streamId,
212+
newTopicStr: newTopic));
213+
checkMatchesMessages(model, copyMessagesWith(
214+
messages, newChannel: newChannel, newTopic: newTopic));
215+
216+
// Check we avoided creating a new list for the moved message IDs.
217+
check(messageIdsByUserInTopicBefore).identicalTo(
218+
model.topicSenders[newChannel.streamId]![eg.t(newTopic)]![userX.userId]!.ids);
219+
});
220+
221+
test('move a conversation partially to a different channel', () async {
222+
final messages = List.generate(10, (i) => eg.streamMessage(
223+
stream: origChannel, topic: origTopic));
224+
final movedMessages = messages.take(5).toList();
225+
final otherMessages = messages.skip(5);
226+
await prepare(messages);
227+
228+
await store.handleEvent(eg.updateMessageEventMoveFrom(
229+
origMessages: movedMessages,
230+
newStreamId: newChannel.streamId));
231+
checkMatchesMessages(model, [
232+
...copyMessagesWith(movedMessages, newChannel: newChannel),
233+
...otherMessages,
234+
]);
235+
});
236+
237+
test('move a conversation partially to a different topic, within the same channel', () async {
238+
final messages = List.generate(10, (i) => eg.streamMessage(
239+
stream: origChannel, topic: origTopic, sender: userX));
240+
final movedMessages = messages.take(5).toList();
241+
final otherMessages = messages.skip(5);
242+
await prepare(messages);
243+
244+
final messageIdsByUserInStreamBefore =
245+
model.streamSenders[origChannel.streamId]![userX.userId]!.ids;
246+
247+
await store.handleEvent(eg.updateMessageEventMoveFrom(
248+
origMessages: movedMessages,
249+
newTopicStr: newTopic));
250+
checkMatchesMessages(model, [
251+
...copyMessagesWith(movedMessages, newTopic: newTopic),
252+
...otherMessages,
253+
]);
254+
255+
// Check that we did not touch stream message IDs tracker
256+
// when there wasn't a stream move.
257+
check(messageIdsByUserInStreamBefore).identicalTo(
258+
model.streamSenders[origChannel.streamId]![userX.userId]!.ids);
259+
});
260+
261+
test('move a conversation with multiple senders', () async {
262+
final messages = [
263+
eg.streamMessage(stream: origChannel, topic: origTopic, sender: userX),
264+
eg.streamMessage(stream: origChannel, topic: origTopic, sender: userX),
265+
eg.streamMessage(stream: origChannel, topic: origTopic, sender: userY),
266+
];
267+
await prepare(messages);
268+
269+
await store.handleEvent(eg.updateMessageEventMoveFrom(
270+
origMessages: messages,
271+
newStreamId: newChannel.streamId));
272+
checkMatchesMessages(model, copyMessagesWith(
273+
messages, newChannel: newChannel));
274+
});
275+
276+
test('move a converstion, but message IDs from the event are not sorted in ascending order', () async {
277+
final messages = List.generate(10, (i) => eg.streamMessage(
278+
id: 100-i, stream: origChannel, topic: origTopic));
279+
await prepare(messages);
280+
281+
await store.handleEvent(eg.updateMessageEventMoveFrom(
282+
origMessages: messages,
283+
newStreamId: newChannel.streamId));
284+
checkMatchesMessages(model,
285+
copyMessagesWith(messages, newChannel: newChannel));
286+
});
287+
288+
test('message edit update without move', () async {
289+
final messages = List.generate(10, (i) => eg.streamMessage(
290+
stream: origChannel, topic: origTopic));
291+
await prepare(messages);
292+
293+
await store.handleEvent(eg.updateMessageEditEvent(messages[0]));
294+
checkMatchesMessages(model, messages);
295+
});
296+
});
297+
145298
test('RecentSenders.handleDeleteMessageEvent', () {
146299
final model = RecentSenders();
147300
final stream = eg.stream();

0 commit comments

Comments
 (0)