-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanager.ts
417 lines (372 loc) · 12 KB
/
manager.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
import fs from "fs";
import { appendFile } from "fs/promises";
import path from "path";
import { generateEncryptionKeyHex } from "@helpers/client";
import { defaultValues, sdkVersionOptions, sdkVersions } from "@helpers/tests";
import { type Client, type XmtpEnv } from "@xmtp/node-sdk";
import { generatePrivateKey, privateKeyToAccount } from "viem/accounts";
import { WorkerClient } from "./main";
export type typeofStream = "message" | "conversation" | "consent" | "none";
export type typeOfResponse = "gm" | "gpt" | "none";
export interface WorkerBase {
name: string;
folder: string;
walletKey: string;
encryptionKey: string;
testName: string;
sdkVersion: string;
libXmtpVersion: string;
}
export interface Worker extends WorkerBase {
worker: WorkerClient;
dbPath: string;
client: Client;
sdkVersion: string;
libXmtpVersion: string;
installationId: string;
inboxId: string;
env: XmtpEnv;
folder: string;
address: string;
}
/**
* WorkerManager: A unified class for managing workers and their lifecycle
* Combines the functionality of both WorkerManager and WorkerFactory
*/
export class WorkerManager {
private workers: Record<string, Record<string, Worker>>;
private testName: string;
private activeWorkers: WorkerClient[] = [];
private typeofStream: typeofStream = "message";
private typeOfResponse: typeOfResponse = "gm";
private env: XmtpEnv;
private keysCache: Record<
string,
{ walletKey: string; encryptionKey: string }
> = {};
/**
* Constructor creates an empty manager or populates it with existing workers
*/
constructor(
testName: string,
typeofStream: typeofStream = "message",
typeOfResponse: typeOfResponse = "gm",
env: XmtpEnv,
) {
this.testName = testName;
this.typeofStream = typeofStream;
this.typeOfResponse = typeOfResponse;
this.env = env;
this.workers = {};
}
/**
* Terminates all active workers and cleans up resources
*/
public async terminateAll(deleteDbs: boolean = false): Promise<void> {
const terminationPromises = this.activeWorkers.map(async (worker) => {
try {
await worker.terminate();
if (deleteDbs) {
await worker.clearDB();
}
} catch (error) {
console.warn(`Error terminating worker:`, error);
}
});
await Promise.all(terminationPromises);
this.activeWorkers = [];
// Clear the workers object
this.workers = {};
}
/**
* Gets the total number of workers
*/
public getLength(): number {
let count = 0;
for (const baseName in this.workers) {
count += Object.keys(this.workers[baseName]).length;
}
return count;
}
/**
* Gets a random subset of workers
*/
public getRandomWorkers(count: number): Worker[] {
const allWorkers = this.getWorkers();
return allWorkers.sort(() => 0.5 - Math.random()).slice(0, count);
}
/**
* Gets the version of the first worker (as a representative version)
*/
public getVersion(): string {
const firstBaseName = Object.keys(this.workers)[0];
if (!firstBaseName) return "unknown";
const firstInstallId = Object.keys(this.workers[firstBaseName])[0];
if (!firstInstallId) return "unknown";
return this.workers[firstBaseName][firstInstallId].sdkVersion;
}
public printWorkers() {
try {
let workersToPrint = [];
for (const baseName in this.workers) {
for (const installationId in this.workers[baseName]) {
const currentWorker = this.workers[baseName][installationId];
workersToPrint.push(
`${baseName}-${installationId}-${currentWorker.address}-${currentWorker.sdkVersion}-${currentWorker.libXmtpVersion}`,
);
}
}
} catch (error) {
console.error("Error printing workers:", error);
}
}
/**
* Gets all workers as a flat array
*/
public getWorkers(): Worker[] {
const allWorkers: Worker[] = [];
for (const baseName in this.workers) {
for (const installationId in this.workers[baseName]) {
allWorkers.push(this.workers[baseName][installationId]);
}
}
return allWorkers;
}
/**
* Gets a specific worker by name and optional installation ID
*/
public get(
baseName: string,
installationId: string = "a",
): Worker | undefined {
if (baseName.includes("-")) {
const parts = baseName.split("-");
const name = parts[0];
const id = parts[1];
return this.workers[name]?.[id];
}
return this.workers[baseName]?.[installationId];
}
/**
* Adds a worker to the manager
*/
public addWorker(
baseName: string,
installationId: string,
worker: Worker,
): void {
if (!this.workers[baseName]) {
this.workers[baseName] = {};
}
this.workers[baseName][installationId] = worker;
}
/**
* Ensures a worker has wallet and encryption keys
* Either retrieves from env vars or generates new ones
*/
private ensureKeys(name: string): {
walletKey: string;
encryptionKey: string;
} {
// Extract the base name without installation ID for key lookup
const baseName = name.split("-")[0];
if (baseName in this.keysCache) {
console.log(`Using cached keys for ${baseName}`);
return this.keysCache[baseName];
}
const walletKeyEnv = `WALLET_KEY_${baseName.toUpperCase()}`;
const encryptionKeyEnv = `ENCRYPTION_KEY_${baseName.toUpperCase()}`;
// Check if keys exist in environment variables
if (
process.env[walletKeyEnv] !== undefined &&
process.env[encryptionKeyEnv] !== undefined
) {
this.keysCache[baseName] = {
walletKey: process.env[walletKeyEnv],
encryptionKey: process.env[encryptionKeyEnv],
};
return this.keysCache[baseName];
}
// Keys don't exist, generate new ones
console.log(`Generating new keys for ${baseName}`);
const walletKey = generatePrivateKey();
const account = privateKeyToAccount(walletKey);
const encryptionKey = generateEncryptionKeyHex();
const publicKey = account.address;
// Store in cache
this.keysCache[baseName] = {
walletKey,
encryptionKey,
};
// Update process.env directly so subsequent calls in the same process will find the keys
process.env[walletKeyEnv] = walletKey;
process.env[encryptionKeyEnv] = encryptionKey;
if (!name.includes("random")) {
// Append to .env file for persistence across runs
const filePath =
process.env.CURRENT_ENV_PATH || path.resolve(process.cwd(), ".env");
void appendFile(
filePath,
`\n${walletKeyEnv}=${walletKey}\n${encryptionKeyEnv}=${encryptionKey}\n# public key is ${publicKey}\n`,
);
}
return this.keysCache[baseName];
}
/**
* Creates a new worker with all necessary initialization
*/
public async createWorker(descriptor: string): Promise<Worker> {
const parts = descriptor.split("-");
const baseName = parts[0];
const providedInstallId = parts.length > 1 ? parts[1] : undefined;
// Check if the worker already exists in our internal storage
if (providedInstallId && this.workers[baseName]?.[providedInstallId]) {
console.log(`Reusing existing worker for ${descriptor}`);
return this.workers[baseName][providedInstallId];
}
// Determine folder/installation ID
const folder = providedInstallId || getNextFolderName();
const sdkVersion = parts.length > 2 ? parts[2] : getLatestVersion();
const libXmtpVersion = getLibxmtpVersion(sdkVersion);
// Get or generate keys
const { walletKey, encryptionKey } = this.ensureKeys(baseName);
// Create the base worker data
const workerData: WorkerBase = {
name: baseName,
folder,
testName: this.testName,
walletKey,
encryptionKey,
sdkVersion: sdkVersion,
libXmtpVersion: libXmtpVersion,
};
// console.debug("Worker data created", workerData);
// Create and initialize the worker
const workerClient = new WorkerClient(
workerData,
this.typeofStream,
this.typeOfResponse,
this.env,
);
const initializedWorker = await workerClient.initialize();
// Create the complete worker
const worker: Worker = {
...workerData,
client: initializedWorker.client,
inboxId: initializedWorker.client.inboxId,
dbPath: initializedWorker.dbPath,
sdkVersion: sdkVersion,
libXmtpVersion: libXmtpVersion,
address: initializedWorker.address,
installationId: initializedWorker.client.installationId,
env: this.env,
folder,
worker: workerClient,
};
// Store the new worker for potential cleanup later
this.activeWorkers.push(workerClient);
// Add to our internal storage
this.addWorker(baseName, folder, worker);
return worker;
}
/**
* Creates multiple workers at once from descriptors
*/
public async createWorkers(
descriptorsOrAmount: string[] | number,
randomVersions: boolean = false,
): Promise<Worker[]> {
let descriptors: string[];
const randomSdkVersionReversed =
sdkVersionOptions[sdkVersionOptions.length - 1];
// Handle numeric input (create N default workers)
if (typeof descriptorsOrAmount === "number") {
const workerNames = defaultValues.defaultNames;
descriptors = workerNames.slice(0, descriptorsOrAmount);
// If we need to create multiple workers with random SDK versions
// Generate workers with random SDK versions (100, 105, or 202)
// Create descriptors with random SDK versions
descriptors = [];
for (let i = 0; i < descriptorsOrAmount; i++) {
const workerName =
defaultValues.defaultNames[i % defaultValues.defaultNames.length];
if (randomVersions) {
const randomSdkVersion =
sdkVersionOptions[
Math.floor(Math.random() * sdkVersionOptions.length)
];
descriptors.push(`${workerName}-a-${randomSdkVersion}`);
} else {
descriptors.push(`${workerName}-a-${randomSdkVersionReversed}`);
}
}
} else {
descriptors = [];
for (const descriptor of descriptorsOrAmount) {
if (!sdkVersionOptions.includes(descriptor.split("-")[2])) {
const name = descriptor.split("-")[0];
const installId = descriptor.split("-")[1] ?? "a";
descriptors.push(`${name}-${installId}-${randomSdkVersionReversed}`);
} else {
descriptors.push(descriptor);
}
}
}
// Process descriptors in parallel
const workerPromises = descriptors.map((descriptor) =>
this.createWorker(descriptor),
);
return Promise.all(workerPromises);
}
}
/**
* Factory function to create a WorkerManager with initialized workers
*/
export async function getWorkers(
descriptorsOrAmount: string[] | number,
testName: string,
typeofStream: typeofStream = "message",
typeOfResponse: typeOfResponse = "gm",
env: XmtpEnv = process.env.XMTP_ENV as XmtpEnv,
randomVersions: boolean = false,
): Promise<WorkerManager> {
const manager = new WorkerManager(
testName,
typeofStream,
typeOfResponse,
env,
);
await manager.createWorkers(descriptorsOrAmount, randomVersions);
manager.printWorkers();
return manager;
}
/**
* Helper function to get the next available folder name
*/
function getNextFolderName(): string {
const dataPath = path.resolve(process.cwd(), ".data");
let folder = "a";
if (fs.existsSync(dataPath)) {
const existingFolders = fs
.readdirSync(dataPath)
.filter((f) => /^[a-z]$/.test(f));
folder = String.fromCharCode(
"a".charCodeAt(0) + (existingFolders.length % 26),
);
}
return folder;
}
/**
* Helper function to count data subfolders
*/
export function getDataSubFolderCount() {
const preBasePath = process.cwd();
return fs.readdirSync(`${preBasePath}/.data`).length;
}
export function getLatestVersion(): string {
return Object.keys(sdkVersions).pop() as string;
}
export function getLibxmtpVersion(sdkVersion: string): string {
return sdkVersions[Number(sdkVersion) as keyof typeof sdkVersions]
.libXmtpVersion;
}