mirror of
https://github.com/docmost/docmost.git
synced 2026-06-22 12:11:51 +10:00
refactor(base): rename baseId to pageId across WS, processors, tasks, formula, events
Renames baseId → pageId in: - Domain event types (BaseEventBase) and all derived event types - WS Zod schemas (wire-protocol field names now emit pageId) - BaseWsService, BaseWsConsumers, BasePresenceService - FormulaLockService, FormulaService - BullMQ job interfaces (IBaseTypeConversionJob, IBaseCellGcJob, IBaseFormulaRecomputeJob) - BaseQueueProcessor and all task functions - Service emit-payload keys in base.service, base-property.service, base-row.service, base-view.service, base-csv-export.service - Formula spec test fixtures
This commit is contained in:
@@ -8,7 +8,7 @@ import { BaseProperty, BaseRow, BaseView } from '@docmost/db/types/entity.types'
|
||||
*/
|
||||
|
||||
type BaseEventBase = {
|
||||
baseId: string;
|
||||
pageId: string;
|
||||
workspaceId: string;
|
||||
actorId?: string | null;
|
||||
requestId?: string | null;
|
||||
|
||||
@@ -5,7 +5,7 @@ const mkProp = (
|
||||
id: string, type: string, typeOptions: any = {},
|
||||
name = id,
|
||||
): BaseProperty => ({
|
||||
id, baseId: "base_1", name, type: type as any, position: "a",
|
||||
id, pageId: "base_1", name, type: type as any, position: "a",
|
||||
typeOptions, isPrimary: false, workspaceId: "ws_1",
|
||||
createdAt: new Date(), updatedAt: new Date(),
|
||||
schemaVersion: 0, pendingType: null, pendingTypeOptions: null,
|
||||
|
||||
@@ -18,10 +18,10 @@ export class FormulaLockService {
|
||||
* Returns a release token on success, or null if the lock is held. Callers
|
||||
* must pass the token back to release() to prevent cross-holder releases.
|
||||
*/
|
||||
async acquire(baseId: string): Promise<string | null> {
|
||||
async acquire(pageId: string): Promise<string | null> {
|
||||
const token = `${Date.now()}-${Math.random()}`;
|
||||
const ok = await this.redis.set(
|
||||
LOCK_PREFIX + baseId,
|
||||
LOCK_PREFIX + pageId,
|
||||
token,
|
||||
"PX",
|
||||
LOCK_TTL_MS,
|
||||
@@ -30,7 +30,7 @@ export class FormulaLockService {
|
||||
return ok === "OK" ? token : null;
|
||||
}
|
||||
|
||||
async release(baseId: string, token: string): Promise<void> {
|
||||
async release(pageId: string, token: string): Promise<void> {
|
||||
const lua = `
|
||||
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||
return redis.call("DEL", KEYS[1])
|
||||
@@ -38,7 +38,7 @@ export class FormulaLockService {
|
||||
return 0
|
||||
end
|
||||
`;
|
||||
await this.redis.eval(lua, 1, LOCK_PREFIX + baseId, token);
|
||||
await this.redis.eval(lua, 1, LOCK_PREFIX + pageId, token);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -46,11 +46,11 @@ export class FormulaLockService {
|
||||
* on timeout. Workers call this at job start — if acquisition times out
|
||||
* the job is retried by BullMQ.
|
||||
*/
|
||||
async acquireWait(baseId: string, opts: { timeoutMs: number; pollMs?: number }): Promise<string | null> {
|
||||
async acquireWait(pageId: string, opts: { timeoutMs: number; pollMs?: number }): Promise<string | null> {
|
||||
const deadline = Date.now() + opts.timeoutMs;
|
||||
const poll = opts.pollMs ?? 500;
|
||||
while (Date.now() < deadline) {
|
||||
const t = await this.acquire(baseId);
|
||||
const t = await this.acquire(pageId);
|
||||
if (t) return t;
|
||||
await new Promise((r) => setTimeout(r, poll));
|
||||
}
|
||||
|
||||
@@ -113,13 +113,13 @@ export class FormulaService {
|
||||
|
||||
/*
|
||||
* Enqueue a full recompute for the given formula property IDs on the given
|
||||
* base. Reasons let the worker log why the job ran. Job ID includes baseId
|
||||
* base. Reasons let the worker log why the job ran. Job ID includes pageId
|
||||
* so BullMQ will dedupe when the same base has multiple edits in flight —
|
||||
* see FormulaLock for the per-base Redis serialization.
|
||||
*/
|
||||
async enqueueRecompute(args: IBaseFormulaRecomputeJob): Promise<void> {
|
||||
await this.queue.add(QueueJob.BASE_FORMULA_RECOMPUTE, args, {
|
||||
jobId: `formula-recompute:${args.baseId}:${Date.now()}`,
|
||||
jobId: `formula-recompute:${args.pageId}:${Date.now()}`,
|
||||
removeOnComplete: 1000,
|
||||
removeOnFail: 1000,
|
||||
});
|
||||
|
||||
@@ -69,7 +69,7 @@ export class BaseQueueProcessor
|
||||
trx,
|
||||
);
|
||||
await this.basePropertyRepo.bumpSchemaVersion(data.propertyId, trx);
|
||||
const v = await this.baseRepo.bumpSchemaVersion(data.baseId, trx);
|
||||
const v = await this.baseRepo.bumpSchemaVersion(data.pageId, trx);
|
||||
return { summary: s, schemaVersion: v };
|
||||
},
|
||||
);
|
||||
@@ -80,7 +80,7 @@ export class BaseQueueProcessor
|
||||
const updated = await this.basePropertyRepo.findById(data.propertyId);
|
||||
if (updated) {
|
||||
const event: BasePropertyUpdatedEvent = {
|
||||
baseId: data.baseId,
|
||||
pageId: data.pageId,
|
||||
workspaceId: data.workspaceId,
|
||||
actorId: data.actorId ?? null,
|
||||
requestId: null,
|
||||
@@ -90,7 +90,7 @@ export class BaseQueueProcessor
|
||||
this.eventEmitter.emit(EventName.BASE_PROPERTY_UPDATED, event);
|
||||
}
|
||||
this.emitSchemaBumped(
|
||||
data.baseId,
|
||||
data.pageId,
|
||||
data.workspaceId,
|
||||
schemaVersion,
|
||||
data.actorId,
|
||||
@@ -106,24 +106,24 @@ export class BaseQueueProcessor
|
||||
data,
|
||||
);
|
||||
const schemaVersion = await this.baseRepo.bumpSchemaVersion(
|
||||
data.baseId,
|
||||
data.pageId,
|
||||
);
|
||||
this.emitSchemaBumped(data.baseId, data.workspaceId, schemaVersion);
|
||||
this.emitSchemaBumped(data.pageId, data.workspaceId, schemaVersion);
|
||||
return;
|
||||
}
|
||||
case QueueJob.BASE_FORMULA_RECOMPUTE: {
|
||||
const data = job.data as IBaseFormulaRecomputeJob;
|
||||
const token = await this.formulaLock.acquireWait(data.baseId, {
|
||||
const token = await this.formulaLock.acquireWait(data.pageId, {
|
||||
timeoutMs: 30_000,
|
||||
});
|
||||
if (!token) {
|
||||
throw new Error(
|
||||
`formula recompute: lock acquire timeout for base ${data.baseId}`,
|
||||
`formula recompute: lock acquire timeout for base ${data.pageId}`,
|
||||
);
|
||||
}
|
||||
try {
|
||||
this.eventEmitter.emit(EventName.BASE_FORMULA_RECOMPUTE_STARTED, {
|
||||
baseId: data.baseId,
|
||||
pageId: data.pageId,
|
||||
workspaceId: data.workspaceId,
|
||||
actorId: data.actorId ?? null,
|
||||
requestId: null,
|
||||
@@ -140,7 +140,7 @@ export class BaseQueueProcessor
|
||||
progress: (processed) => job.updateProgress({ processed }),
|
||||
onBatch: async (batch) => {
|
||||
this.eventEmitter.emit(EventName.BASE_ROWS_UPDATED, {
|
||||
baseId: data.baseId,
|
||||
pageId: data.pageId,
|
||||
workspaceId: data.workspaceId,
|
||||
actorId: null,
|
||||
requestId: null,
|
||||
@@ -152,10 +152,10 @@ export class BaseQueueProcessor
|
||||
);
|
||||
|
||||
const schemaVersion = await this.baseRepo.bumpSchemaVersion(
|
||||
data.baseId,
|
||||
data.pageId,
|
||||
);
|
||||
this.eventEmitter.emit(EventName.BASE_SCHEMA_BUMPED, {
|
||||
baseId: data.baseId,
|
||||
pageId: data.pageId,
|
||||
workspaceId: data.workspaceId,
|
||||
actorId: data.actorId ?? null,
|
||||
requestId: null,
|
||||
@@ -163,7 +163,7 @@ export class BaseQueueProcessor
|
||||
} satisfies BaseSchemaBumpedEvent);
|
||||
|
||||
this.eventEmitter.emit(EventName.BASE_FORMULA_RECOMPUTE_COMPLETED, {
|
||||
baseId: data.baseId,
|
||||
pageId: data.pageId,
|
||||
workspaceId: data.workspaceId,
|
||||
actorId: data.actorId ?? null,
|
||||
requestId: null,
|
||||
@@ -175,7 +175,7 @@ export class BaseQueueProcessor
|
||||
|
||||
return result;
|
||||
} finally {
|
||||
await this.formulaLock.release(data.baseId, token);
|
||||
await this.formulaLock.release(data.pageId, token);
|
||||
}
|
||||
}
|
||||
default:
|
||||
@@ -184,13 +184,13 @@ export class BaseQueueProcessor
|
||||
}
|
||||
|
||||
private emitSchemaBumped(
|
||||
baseId: string,
|
||||
pageId: string,
|
||||
workspaceId: string,
|
||||
schemaVersion: number,
|
||||
actorId?: string,
|
||||
): void {
|
||||
const event: BaseSchemaBumpedEvent = {
|
||||
baseId,
|
||||
pageId,
|
||||
workspaceId,
|
||||
actorId: actorId ?? null,
|
||||
requestId: null,
|
||||
@@ -220,7 +220,7 @@ export class BaseQueueProcessor
|
||||
const reverted = await this.basePropertyRepo.findById(data.propertyId);
|
||||
if (reverted) {
|
||||
const event: BasePropertyUpdatedEvent = {
|
||||
baseId: data.baseId,
|
||||
pageId: data.pageId,
|
||||
workspaceId: data.workspaceId,
|
||||
actorId: data.actorId ?? null,
|
||||
requestId: null,
|
||||
|
||||
@@ -14,7 +14,7 @@ export type PresenceEntry = {
|
||||
};
|
||||
|
||||
/*
|
||||
* Ephemeral per-base presence. No DB. `presence:base:{baseId}` is a Redis
|
||||
* Ephemeral per-base presence. No DB. `presence:base:{pageId}` is a Redis
|
||||
* HASH keyed by userId with a JSON-serialised entry. Entries older than
|
||||
* PRESENCE_ENTRY_TTL_MS are filtered on read; the key itself is refreshed
|
||||
* with a longer Redis EXPIRE on every write so unused rooms drain on
|
||||
@@ -30,10 +30,10 @@ export class BasePresenceService {
|
||||
}
|
||||
|
||||
async setPresence(
|
||||
baseId: string,
|
||||
pageId: string,
|
||||
entry: PresenceEntry,
|
||||
): Promise<void> {
|
||||
const key = PRESENCE_KEY_PREFIX + baseId;
|
||||
const key = PRESENCE_KEY_PREFIX + pageId;
|
||||
await this.redis
|
||||
.multi()
|
||||
.hset(key, entry.userId, JSON.stringify(entry))
|
||||
@@ -41,13 +41,13 @@ export class BasePresenceService {
|
||||
.exec();
|
||||
}
|
||||
|
||||
async leave(baseId: string, userId: string): Promise<void> {
|
||||
const key = PRESENCE_KEY_PREFIX + baseId;
|
||||
async leave(pageId: string, userId: string): Promise<void> {
|
||||
const key = PRESENCE_KEY_PREFIX + pageId;
|
||||
await this.redis.hdel(key, userId);
|
||||
}
|
||||
|
||||
async snapshot(baseId: string): Promise<PresenceEntry[]> {
|
||||
const key = PRESENCE_KEY_PREFIX + baseId;
|
||||
async snapshot(pageId: string): Promise<PresenceEntry[]> {
|
||||
const key = PRESENCE_KEY_PREFIX + pageId;
|
||||
const raw = await this.redis.hgetall(key);
|
||||
const now = Date.now();
|
||||
const out: PresenceEntry[] = [];
|
||||
|
||||
@@ -23,7 +23,7 @@ import {
|
||||
|
||||
/*
|
||||
* In-process listeners that forward base domain events onto the
|
||||
* `base-{baseId}` socket.io room. Originating clients suppress their own
|
||||
* `base-{pageId}` socket.io room. Originating clients suppress their own
|
||||
* echoes via `requestId`.
|
||||
*/
|
||||
@Injectable()
|
||||
@@ -34,9 +34,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_ROW_CREATED)
|
||||
onRowCreated(e: BaseRowCreatedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:row:created',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
row: e.row,
|
||||
actorId: e.actorId ?? null,
|
||||
requestId: e.requestId ?? null,
|
||||
@@ -45,9 +45,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_ROW_UPDATED)
|
||||
onRowUpdated(e: BaseRowUpdatedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:row:updated',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
rowId: e.rowId,
|
||||
patch: e.patch,
|
||||
updatedCells: e.updatedCells,
|
||||
@@ -58,9 +58,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_ROW_DELETED)
|
||||
onRowDeleted(e: BaseRowDeletedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:row:deleted',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
rowId: e.rowId,
|
||||
actorId: e.actorId ?? null,
|
||||
requestId: e.requestId ?? null,
|
||||
@@ -69,9 +69,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_ROWS_DELETED)
|
||||
onRowsDeleted(e: BaseRowsDeletedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:rows:deleted',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
rowIds: e.rowIds,
|
||||
actorId: e.actorId ?? null,
|
||||
requestId: e.requestId ?? null,
|
||||
@@ -80,9 +80,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_ROW_REORDERED)
|
||||
onRowReordered(e: BaseRowReorderedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:row:reordered',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
rowId: e.rowId,
|
||||
position: e.position,
|
||||
actorId: e.actorId ?? null,
|
||||
@@ -92,9 +92,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_PROPERTY_CREATED)
|
||||
onPropertyCreated(e: BasePropertyCreatedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:property:created',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
property: e.property,
|
||||
actorId: e.actorId ?? null,
|
||||
requestId: e.requestId ?? null,
|
||||
@@ -103,9 +103,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_PROPERTY_UPDATED)
|
||||
onPropertyUpdated(e: BasePropertyUpdatedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:property:updated',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
property: e.property,
|
||||
schemaVersion: e.schemaVersion,
|
||||
actorId: e.actorId ?? null,
|
||||
@@ -115,9 +115,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_PROPERTY_DELETED)
|
||||
onPropertyDeleted(e: BasePropertyDeletedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:property:deleted',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
propertyId: e.propertyId,
|
||||
actorId: e.actorId ?? null,
|
||||
requestId: e.requestId ?? null,
|
||||
@@ -126,9 +126,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_PROPERTY_REORDERED)
|
||||
onPropertyReordered(e: BasePropertyReorderedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:property:reordered',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
propertyId: e.propertyId,
|
||||
position: e.position,
|
||||
actorId: e.actorId ?? null,
|
||||
@@ -138,9 +138,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_VIEW_CREATED)
|
||||
onViewCreated(e: BaseViewCreatedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:view:created',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
view: e.view,
|
||||
actorId: e.actorId ?? null,
|
||||
requestId: e.requestId ?? null,
|
||||
@@ -149,9 +149,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_VIEW_UPDATED)
|
||||
onViewUpdated(e: BaseViewUpdatedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:view:updated',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
view: e.view,
|
||||
actorId: e.actorId ?? null,
|
||||
requestId: e.requestId ?? null,
|
||||
@@ -160,9 +160,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_VIEW_DELETED)
|
||||
onViewDeleted(e: BaseViewDeletedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:view:deleted',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
viewId: e.viewId,
|
||||
actorId: e.actorId ?? null,
|
||||
requestId: e.requestId ?? null,
|
||||
@@ -171,18 +171,18 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_SCHEMA_BUMPED)
|
||||
onSchemaBumped(e: BaseSchemaBumpedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:schema:bumped',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
schemaVersion: e.schemaVersion,
|
||||
});
|
||||
}
|
||||
|
||||
@OnEvent(EventName.BASE_ROWS_UPDATED)
|
||||
onRowsUpdated(e: BaseRowsUpdatedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:rows:updated',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
rowIds: e.rowIds,
|
||||
propertyIds: e.propertyIds,
|
||||
actorId: e.actorId ?? null,
|
||||
@@ -192,9 +192,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_FORMULA_RECOMPUTE_STARTED)
|
||||
onFormulaRecomputeStarted(e: BaseFormulaRecomputeStartedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:formula:recompute:started',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
propertyIds: e.propertyIds,
|
||||
jobId: e.jobId,
|
||||
actorId: e.actorId ?? null,
|
||||
@@ -203,9 +203,9 @@ export class BaseWsConsumers {
|
||||
|
||||
@OnEvent(EventName.BASE_FORMULA_RECOMPUTE_COMPLETED)
|
||||
onFormulaRecomputeCompleted(e: BaseFormulaRecomputeCompletedEvent) {
|
||||
this.ws.emitToBase(e.baseId, {
|
||||
this.ws.emitToBase(e.pageId, {
|
||||
operation: 'base:formula:recompute:completed',
|
||||
baseId: e.baseId,
|
||||
pageId: e.pageId,
|
||||
propertyIds: e.propertyIds,
|
||||
jobId: e.jobId,
|
||||
processed: e.processed,
|
||||
|
||||
@@ -9,29 +9,29 @@ import { BasePresenceService, PresenceEntry } from './base-presence.service';
|
||||
|
||||
/*
|
||||
* Inbound shapes from untrusted socket clients. Zod-validated at the
|
||||
* boundary so malformed payloads (non-uuid baseId, missing fields,
|
||||
* boundary so malformed payloads (non-uuid pageId, missing fields,
|
||||
* oversized selection blobs) never reach the permission check or Redis.
|
||||
*/
|
||||
const baseSubscribeSchema = z.object({
|
||||
operation: z.literal('base:subscribe'),
|
||||
baseId: z.uuid(),
|
||||
pageId: z.uuid(),
|
||||
});
|
||||
|
||||
const baseUnsubscribeSchema = z.object({
|
||||
operation: z.literal('base:unsubscribe'),
|
||||
baseId: z.uuid(),
|
||||
pageId: z.uuid(),
|
||||
});
|
||||
|
||||
const basePresenceSchema = z.object({
|
||||
operation: z.literal('base:presence'),
|
||||
baseId: z.uuid(),
|
||||
pageId: z.uuid(),
|
||||
cellId: z.string().max(200).optional().nullable(),
|
||||
selection: z.unknown().optional(),
|
||||
});
|
||||
|
||||
const basePresenceLeaveSchema = z.object({
|
||||
operation: z.literal('base:presence:leave'),
|
||||
baseId: z.uuid(),
|
||||
pageId: z.uuid(),
|
||||
});
|
||||
|
||||
const inboundSchema = z.union([
|
||||
@@ -77,23 +77,23 @@ export class BaseWsService {
|
||||
const data = parsed.data;
|
||||
switch (data.operation) {
|
||||
case 'base:subscribe':
|
||||
await this.subscribe(client, data.baseId);
|
||||
await this.subscribe(client, data.pageId);
|
||||
return;
|
||||
case 'base:unsubscribe':
|
||||
await this.unsubscribe(client, data.baseId);
|
||||
await this.unsubscribe(client, data.pageId);
|
||||
return;
|
||||
case 'base:presence':
|
||||
await this.handlePresence(client, data);
|
||||
return;
|
||||
case 'base:presence:leave':
|
||||
await this.handlePresenceLeave(client, data.baseId);
|
||||
await this.handlePresenceLeave(client, data.pageId);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
emitToBase(baseId: string, payload: BaseOutbound): void {
|
||||
emitToBase(pageId: string, payload: BaseOutbound): void {
|
||||
if (!this.server) return;
|
||||
this.server.to(getBaseRoomName(baseId)).emit('message', payload);
|
||||
this.server.to(getBaseRoomName(pageId)).emit('message', payload);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -106,11 +106,11 @@ export class BaseWsService {
|
||||
const userId = client.data?.userId as string | undefined;
|
||||
const subs = this.subscriptionsFor(client);
|
||||
if (!userId || subs.size === 0) return;
|
||||
for (const baseId of subs) {
|
||||
await this.presence.leave(baseId, userId);
|
||||
this.emitToBase(baseId, {
|
||||
for (const pageId of subs) {
|
||||
await this.presence.leave(pageId, userId);
|
||||
this.emitToBase(pageId, {
|
||||
operation: 'base:presence:leave',
|
||||
baseId,
|
||||
pageId,
|
||||
userId,
|
||||
});
|
||||
}
|
||||
@@ -119,22 +119,22 @@ export class BaseWsService {
|
||||
|
||||
// --- private -------------------------------------------------------
|
||||
|
||||
private async subscribe(client: Socket, baseId: string): Promise<void> {
|
||||
private async subscribe(client: Socket, pageId: string): Promise<void> {
|
||||
const userId = client.data?.userId as string | undefined;
|
||||
if (!userId) {
|
||||
client.emit('message', {
|
||||
operation: 'base:subscribe:error',
|
||||
baseId,
|
||||
pageId,
|
||||
reason: 'unauthenticated',
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const base = await this.baseRepo.findById(baseId);
|
||||
const base = await this.baseRepo.findById(pageId);
|
||||
if (!base) {
|
||||
client.emit('message', {
|
||||
operation: 'base:subscribe:error',
|
||||
baseId,
|
||||
pageId,
|
||||
reason: 'not_found',
|
||||
});
|
||||
return;
|
||||
@@ -144,36 +144,36 @@ export class BaseWsService {
|
||||
if (!canRead) {
|
||||
client.emit('message', {
|
||||
operation: 'base:subscribe:error',
|
||||
baseId,
|
||||
pageId,
|
||||
reason: 'forbidden',
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
client.join(getBaseRoomName(baseId));
|
||||
this.subscriptionsFor(client).add(baseId);
|
||||
client.join(getBaseRoomName(pageId));
|
||||
this.subscriptionsFor(client).add(pageId);
|
||||
|
||||
// Send the current presence snapshot to just this client so their UI
|
||||
// can paint who's already editing what.
|
||||
const snapshot = await this.presence.snapshot(baseId);
|
||||
const snapshot = await this.presence.snapshot(pageId);
|
||||
client.emit('message', {
|
||||
operation: 'base:presence:snapshot',
|
||||
baseId,
|
||||
pageId,
|
||||
entries: snapshot,
|
||||
});
|
||||
}
|
||||
|
||||
private async unsubscribe(client: Socket, baseId: string): Promise<void> {
|
||||
private async unsubscribe(client: Socket, pageId: string): Promise<void> {
|
||||
const userId = client.data?.userId as string | undefined;
|
||||
if (!userId) return;
|
||||
|
||||
client.leave(getBaseRoomName(baseId));
|
||||
this.subscriptionsFor(client).delete(baseId);
|
||||
client.leave(getBaseRoomName(pageId));
|
||||
this.subscriptionsFor(client).delete(pageId);
|
||||
|
||||
await this.presence.leave(baseId, userId);
|
||||
this.emitToBase(baseId, {
|
||||
await this.presence.leave(pageId, userId);
|
||||
this.emitToBase(pageId, {
|
||||
operation: 'base:presence:leave',
|
||||
baseId,
|
||||
pageId,
|
||||
userId,
|
||||
});
|
||||
}
|
||||
@@ -184,7 +184,7 @@ export class BaseWsService {
|
||||
): Promise<void> {
|
||||
const userId = client.data?.userId as string | undefined;
|
||||
if (!userId) return;
|
||||
if (!client.rooms.has(getBaseRoomName(data.baseId))) return;
|
||||
if (!client.rooms.has(getBaseRoomName(data.pageId))) return;
|
||||
|
||||
const entry: PresenceEntry = {
|
||||
userId,
|
||||
@@ -192,25 +192,25 @@ export class BaseWsService {
|
||||
selection: data.selection ?? null,
|
||||
ts: Date.now(),
|
||||
};
|
||||
await this.presence.setPresence(data.baseId, entry);
|
||||
await this.presence.setPresence(data.pageId, entry);
|
||||
|
||||
this.emitToBase(data.baseId, {
|
||||
this.emitToBase(data.pageId, {
|
||||
operation: 'base:presence',
|
||||
baseId: data.baseId,
|
||||
pageId: data.pageId,
|
||||
...entry,
|
||||
});
|
||||
}
|
||||
|
||||
private async handlePresenceLeave(
|
||||
client: Socket,
|
||||
baseId: string,
|
||||
pageId: string,
|
||||
): Promise<void> {
|
||||
const userId = client.data?.userId as string | undefined;
|
||||
if (!userId) return;
|
||||
await this.presence.leave(baseId, userId);
|
||||
this.emitToBase(baseId, {
|
||||
await this.presence.leave(pageId, userId);
|
||||
this.emitToBase(pageId, {
|
||||
operation: 'base:presence:leave',
|
||||
baseId,
|
||||
pageId,
|
||||
userId,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -31,16 +31,16 @@ export class BaseCsvExportService {
|
||||
) {}
|
||||
|
||||
async streamBaseAsCsv(
|
||||
baseId: string,
|
||||
pageId: string,
|
||||
workspaceId: string,
|
||||
reply: FastifyReply,
|
||||
): Promise<void> {
|
||||
const base = await this.baseRepo.findById(baseId);
|
||||
const base = await this.baseRepo.findById(pageId);
|
||||
if (!base || base.workspaceId !== workspaceId) {
|
||||
throw new NotFoundException('Base not found');
|
||||
}
|
||||
|
||||
const properties = await this.basePropertyRepo.findByPageId(baseId);
|
||||
const properties = await this.basePropertyRepo.findByPageId(pageId);
|
||||
|
||||
const fileName = sanitize(base.title || 'base') + '.csv';
|
||||
|
||||
@@ -85,7 +85,7 @@ export class BaseCsvExportService {
|
||||
});
|
||||
|
||||
try {
|
||||
for await (const chunk of this.baseRowRepo.streamByPageId(baseId, {
|
||||
for await (const chunk of this.baseRowRepo.streamByPageId(pageId, {
|
||||
workspaceId,
|
||||
chunkSize: CHUNK_SIZE,
|
||||
})) {
|
||||
@@ -129,7 +129,7 @@ export class BaseCsvExportService {
|
||||
// trigger Nest's exception filter to try to send another
|
||||
// response, which Fastify rejects. Destroying the stringifier
|
||||
// cascades to `out` and signals EOF to the client.
|
||||
this.logger.error(`csv export failed base=${baseId}`, err);
|
||||
this.logger.error(`csv export failed base=${pageId}`, err);
|
||||
stringifier.destroy(err as Error);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,7 +134,7 @@ export class BasePropertyService {
|
||||
});
|
||||
|
||||
const event: BasePropertyCreatedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: actorId ?? null,
|
||||
requestId: null,
|
||||
@@ -144,7 +144,7 @@ export class BasePropertyService {
|
||||
|
||||
if (created.type === 'formula') {
|
||||
await this.formulaService.enqueueRecompute({
|
||||
baseId: created.pageId,
|
||||
pageId: created.pageId,
|
||||
workspaceId,
|
||||
propertyIds: [created.id],
|
||||
reason: 'formula_created',
|
||||
@@ -278,7 +278,7 @@ export class BasePropertyService {
|
||||
|
||||
if (newType === 'formula' && (isTypeChange || sourceChanged)) {
|
||||
await this.formulaService.enqueueRecompute({
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
propertyIds: [dto.propertyId],
|
||||
reason: isTypeChange ? 'formula_created' : 'formula_edited',
|
||||
@@ -292,7 +292,7 @@ export class BasePropertyService {
|
||||
const affected = graph.affectedFormulas([dto.propertyId]);
|
||||
if (affected.length > 0) {
|
||||
await this.formulaService.enqueueRecompute({
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
propertyIds: affected,
|
||||
reason: 'dep_type_changed',
|
||||
@@ -306,7 +306,7 @@ export class BasePropertyService {
|
||||
|
||||
// --- Path 2 or 3: cell rewrite needed -------------------------------
|
||||
const conversionPayload: IBaseTypeConversionJob = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
propertyId: dto.propertyId,
|
||||
workspaceId,
|
||||
fromType: oldType,
|
||||
@@ -360,7 +360,7 @@ export class BasePropertyService {
|
||||
});
|
||||
tick('inline-tx-done');
|
||||
const bumpEvent: BaseSchemaBumpedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: actorId ?? null,
|
||||
requestId: null,
|
||||
@@ -454,7 +454,7 @@ export class BasePropertyService {
|
||||
const updated = await this.basePropertyRepo.findById(dto.propertyId);
|
||||
if (updated) {
|
||||
const event: BasePropertyUpdatedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: actorId ?? null,
|
||||
requestId: dto.requestId ?? null,
|
||||
@@ -515,7 +515,7 @@ export class BasePropertyService {
|
||||
});
|
||||
|
||||
const payload: IBaseCellGcJob = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
propertyId: dto.propertyId,
|
||||
workspaceId,
|
||||
};
|
||||
@@ -542,7 +542,7 @@ export class BasePropertyService {
|
||||
}
|
||||
|
||||
const event: BasePropertyDeletedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: actorId ?? null,
|
||||
requestId: dto.requestId ?? null,
|
||||
@@ -552,7 +552,7 @@ export class BasePropertyService {
|
||||
|
||||
if (affected.length > 0) {
|
||||
await this.formulaService.enqueueRecompute({
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
propertyIds: affected,
|
||||
reason: 'dep_deleted',
|
||||
@@ -580,7 +580,7 @@ export class BasePropertyService {
|
||||
});
|
||||
|
||||
const event: BasePropertyReorderedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: actorId ?? null,
|
||||
requestId: dto.requestId ?? null,
|
||||
|
||||
@@ -112,7 +112,7 @@ export class BaseRowService {
|
||||
});
|
||||
|
||||
const event: BaseRowCreatedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: userId,
|
||||
requestId: dto.requestId ?? null,
|
||||
@@ -162,7 +162,7 @@ export class BaseRowService {
|
||||
}
|
||||
|
||||
const event: BaseRowUpdatedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: userId ?? null,
|
||||
requestId: dto.requestId ?? null,
|
||||
@@ -187,7 +187,7 @@ export class BaseRowService {
|
||||
});
|
||||
|
||||
const event: BaseRowDeletedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: userId ?? null,
|
||||
requestId: dto.requestId ?? null,
|
||||
@@ -215,7 +215,7 @@ export class BaseRowService {
|
||||
});
|
||||
|
||||
const event: BaseRowsDeletedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: userId ?? null,
|
||||
requestId: dto.requestId ?? null,
|
||||
@@ -310,7 +310,7 @@ export class BaseRowService {
|
||||
});
|
||||
|
||||
const event: BaseRowReorderedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: userId ?? null,
|
||||
requestId: dto.requestId ?? null,
|
||||
|
||||
@@ -52,7 +52,7 @@ export class BaseViewService {
|
||||
});
|
||||
|
||||
const event: BaseViewCreatedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: userId,
|
||||
requestId: null,
|
||||
@@ -101,7 +101,7 @@ export class BaseViewService {
|
||||
|
||||
if (updated) {
|
||||
const event: BaseViewUpdatedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: userId ?? null,
|
||||
requestId: null,
|
||||
@@ -133,7 +133,7 @@ export class BaseViewService {
|
||||
await this.baseViewRepo.deleteView(dto.viewId, { workspaceId });
|
||||
|
||||
const event: BaseViewDeletedEvent = {
|
||||
baseId: dto.pageId,
|
||||
pageId: dto.pageId,
|
||||
workspaceId,
|
||||
actorId: userId ?? null,
|
||||
requestId: null,
|
||||
|
||||
@@ -107,13 +107,13 @@ export class BaseService {
|
||||
return this.baseRepo.findById(dto.pageId);
|
||||
}
|
||||
|
||||
async delete(baseId: string) {
|
||||
const base = await this.baseRepo.findById(baseId);
|
||||
async delete(pageId: string) {
|
||||
const base = await this.baseRepo.findById(pageId);
|
||||
if (!base) {
|
||||
throw new NotFoundException('Base not found');
|
||||
}
|
||||
|
||||
await this.baseRepo.softDelete(baseId);
|
||||
await this.baseRepo.softDelete(pageId);
|
||||
}
|
||||
|
||||
async listBySpaceId(spaceId: string, pagination: PaginationOptions) {
|
||||
|
||||
@@ -21,15 +21,15 @@ export async function processBaseCellGc(
|
||||
basePropertyRepo: BasePropertyRepo,
|
||||
data: IBaseCellGcJob,
|
||||
): Promise<void> {
|
||||
const { baseId, propertyId, workspaceId } = data;
|
||||
const { pageId, propertyId, workspaceId } = data;
|
||||
|
||||
await executeTx(db, async (trx) => {
|
||||
await baseRowRepo.removeCellKey(baseId, propertyId, {
|
||||
await baseRowRepo.removeCellKey(pageId, propertyId, {
|
||||
workspaceId,
|
||||
trx,
|
||||
});
|
||||
await basePropertyRepo.hardDelete(propertyId, trx);
|
||||
});
|
||||
|
||||
logger.log(`cell-gc complete base=${baseId} prop=${propertyId}`);
|
||||
logger.log(`cell-gc complete base=${pageId} prop=${propertyId}`);
|
||||
}
|
||||
|
||||
@@ -30,8 +30,8 @@ export async function processBaseFormulaRecompute(
|
||||
trx?: KyselyTransaction;
|
||||
},
|
||||
): Promise<{ processed: number; errored: number }> {
|
||||
const { baseId, workspaceId, propertyIds, rowIds } = data;
|
||||
const properties = await basePropertyRepo.findByPageId(baseId);
|
||||
const { pageId, workspaceId, propertyIds, rowIds } = data;
|
||||
const properties = await basePropertyRepo.findByPageId(pageId);
|
||||
const targets = properties.filter(
|
||||
(p) => p.type === "formula" && propertyIds.includes(p.id),
|
||||
);
|
||||
@@ -46,7 +46,7 @@ export async function processBaseFormulaRecompute(
|
||||
let processed = 0;
|
||||
let errored = 0;
|
||||
|
||||
for await (const chunk of baseRowRepo.streamByPageId(baseId, {
|
||||
for await (const chunk of baseRowRepo.streamByPageId(pageId, {
|
||||
workspaceId,
|
||||
chunkSize: CHUNK_SIZE,
|
||||
trx: opts?.trx,
|
||||
@@ -90,7 +90,7 @@ export async function processBaseFormulaRecompute(
|
||||
// so passing actorId: undefined preserves last_updated_by_id while still
|
||||
// bumping updated_at — matches spec "only lastEditedAt moves".
|
||||
await baseRowRepo.batchUpdateCells(updates, {
|
||||
pageId: baseId,
|
||||
pageId,
|
||||
workspaceId,
|
||||
actorId: undefined,
|
||||
trx: opts?.trx,
|
||||
@@ -102,7 +102,7 @@ export async function processBaseFormulaRecompute(
|
||||
}
|
||||
|
||||
logger.log(
|
||||
`formula-recompute base=${baseId} props=${propertyIds.join(",")} processed=${processed} errored=${errored}`,
|
||||
`formula-recompute base=${pageId} props=${propertyIds.join(",")} processed=${processed} errored=${errored}`,
|
||||
);
|
||||
return { processed, errored };
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ export async function processBaseTypeConversion(
|
||||
},
|
||||
): Promise<{ converted: number; cleared: number; total: number }> {
|
||||
const {
|
||||
baseId,
|
||||
pageId,
|
||||
propertyId,
|
||||
workspaceId,
|
||||
fromType,
|
||||
@@ -59,7 +59,7 @@ export async function processBaseTypeConversion(
|
||||
// rewriting — everything else is already consistent with the new type
|
||||
// (empty value → empty value). Skips the full-table scan on bases
|
||||
// where the property was only ever set on a few rows.
|
||||
for await (const chunk of baseRowRepo.streamByPageId(baseId, {
|
||||
for await (const chunk of baseRowRepo.streamByPageId(pageId, {
|
||||
workspaceId,
|
||||
chunkSize: CHUNK_SIZE,
|
||||
trx,
|
||||
@@ -105,7 +105,7 @@ export async function processBaseTypeConversion(
|
||||
|
||||
if (updates.length > 0) {
|
||||
await baseRowRepo.batchUpdateCells(updates, {
|
||||
pageId: baseId,
|
||||
pageId,
|
||||
workspaceId,
|
||||
actorId,
|
||||
trx,
|
||||
@@ -116,7 +116,7 @@ export async function processBaseTypeConversion(
|
||||
}
|
||||
|
||||
logger.log(
|
||||
`type-conversion ${fromType}→${toType} base=${baseId} prop=${propertyId} total=${total} converted=${converted} cleared=${cleared}`,
|
||||
`type-conversion ${fromType}→${toType} base=${pageId} prop=${propertyId} total=${total} converted=${converted} cleared=${cleared}`,
|
||||
);
|
||||
|
||||
return { converted, cleared, total };
|
||||
|
||||
@@ -115,7 +115,7 @@ export interface IApprovalRejectedNotificationJob {
|
||||
}
|
||||
|
||||
export interface IBaseTypeConversionJob {
|
||||
baseId: string;
|
||||
pageId: string;
|
||||
propertyId: string;
|
||||
workspaceId: string;
|
||||
fromType: string;
|
||||
@@ -133,13 +133,13 @@ export interface IBaseTypeConversionJob {
|
||||
}
|
||||
|
||||
export interface IBaseCellGcJob {
|
||||
baseId: string;
|
||||
pageId: string;
|
||||
propertyId: string;
|
||||
workspaceId: string;
|
||||
}
|
||||
|
||||
export interface IBaseFormulaRecomputeJob {
|
||||
baseId: string;
|
||||
pageId: string;
|
||||
workspaceId: string;
|
||||
propertyIds: string[]; // formula properties to recompute
|
||||
reason:
|
||||
|
||||
Reference in New Issue
Block a user