chore: realtime updates fix
This commit is contained in:
parent
be722f708d
commit
b53016b449
32 changed files with 3929 additions and 1969 deletions
|
|
@ -27,6 +27,17 @@ const fetchDocument = async ({ context, documentName: pageId, instance }: FetchP
|
|||
const pageDetails = await service.fetchDetails(pageId);
|
||||
const convertedBinaryData = getBinaryDataFromDocumentEditorHTMLString(pageDetails.description_html ?? "<p></p>");
|
||||
if (convertedBinaryData) {
|
||||
// save the converted binary data back to the database
|
||||
const { contentBinaryEncoded, contentHTML, contentJSON } = getAllDocumentFormatsFromDocumentEditorBinaryData(
|
||||
convertedBinaryData,
|
||||
true
|
||||
);
|
||||
const payload = {
|
||||
description_binary: contentBinaryEncoded,
|
||||
description_html: contentHTML,
|
||||
description: contentJSON,
|
||||
};
|
||||
await service.updateDescriptionBinary(pageId, payload);
|
||||
return convertedBinaryData;
|
||||
}
|
||||
}
|
||||
|
|
@ -52,8 +63,10 @@ const storeDocument = async ({
|
|||
try {
|
||||
const service = getPageService(context.documentType, context);
|
||||
// convert binary data to all formats
|
||||
const { contentBinaryEncoded, contentHTML, contentJSON } =
|
||||
getAllDocumentFormatsFromDocumentEditorBinaryData(pageBinaryData);
|
||||
const { contentBinaryEncoded, contentHTML, contentJSON } = getAllDocumentFormatsFromDocumentEditorBinaryData(
|
||||
pageBinaryData,
|
||||
true
|
||||
);
|
||||
// create payload
|
||||
const payload = {
|
||||
description_binary: contentBinaryEncoded,
|
||||
|
|
|
|||
175
apps/live/src/extensions/title-sync.ts
Normal file
175
apps/live/src/extensions/title-sync.ts
Normal file
|
|
@ -0,0 +1,175 @@
|
|||
// hocuspocus
|
||||
import type { Extension, Hocuspocus, Document } from "@hocuspocus/server";
|
||||
import { TiptapTransformer } from "@hocuspocus/transformer";
|
||||
import type * as Y from "yjs";
|
||||
// editor extensions
|
||||
import { TITLE_EDITOR_EXTENSIONS, createRealtimeEvent } from "@plane/editor";
|
||||
import { logger } from "@plane/logger";
|
||||
import { AppError } from "@/lib/errors";
|
||||
// helpers
|
||||
import { getPageService } from "@/services/page/handler";
|
||||
import type { HocusPocusServerContext, OnLoadDocumentPayloadWithContext } from "@/types";
|
||||
import { generateTitleProsemirrorJson } from "@/utils";
|
||||
import { broadcastMessageToPage } from "@/utils/broadcast-message";
|
||||
import { TitleUpdateManager } from "./title-update/title-update-manager";
|
||||
import { extractTextFromHTML } from "./title-update/title-utils";
|
||||
|
||||
/**
|
||||
* Hocuspocus extension for synchronizing document titles
|
||||
*/
|
||||
export class TitleSyncExtension implements Extension {
|
||||
// Maps document names to their observers and update managers
|
||||
private titleObservers: Map<string, (events: Y.YEvent<any>[]) => void> = new Map();
|
||||
private titleUpdateManagers: Map<string, TitleUpdateManager> = new Map();
|
||||
// Store minimal data needed for each document's title observer (prevents closure memory leaks)
|
||||
private titleObserverData: Map<
|
||||
string,
|
||||
{
|
||||
parentId?: string | null;
|
||||
userId: string;
|
||||
workspaceSlug: string | null;
|
||||
instance: Hocuspocus;
|
||||
}
|
||||
> = new Map();
|
||||
|
||||
/**
|
||||
* Handle document loading - migrate old titles if needed
|
||||
*/
|
||||
async onLoadDocument({ context, document, documentName }: OnLoadDocumentPayloadWithContext) {
|
||||
try {
|
||||
// initially for on demand migration of old titles to a new title field
|
||||
// in the yjs binary
|
||||
if (document.isEmpty("title")) {
|
||||
const service = getPageService(context.documentType, context);
|
||||
// const title = await service.fe
|
||||
const title = (await service.fetchDetails?.(documentName)).name;
|
||||
if (title == null) return;
|
||||
const titleField = TiptapTransformer.toYdoc(
|
||||
generateTitleProsemirrorJson(title),
|
||||
"title",
|
||||
// editor
|
||||
TITLE_EDITOR_EXTENSIONS as any
|
||||
);
|
||||
document.merge(titleField);
|
||||
}
|
||||
} catch (error) {
|
||||
const appError = new AppError(error, {
|
||||
context: { operation: "onLoadDocument", documentName },
|
||||
});
|
||||
logger.error("Error loading document title", appError);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Set up title synchronization for a document after it's loaded
|
||||
*/
|
||||
async afterLoadDocument({
|
||||
document,
|
||||
documentName,
|
||||
context,
|
||||
instance,
|
||||
}: {
|
||||
document: Document;
|
||||
documentName: string;
|
||||
context: HocusPocusServerContext;
|
||||
instance: Hocuspocus;
|
||||
}) {
|
||||
// Create a title update manager for this document
|
||||
const updateManager = new TitleUpdateManager(documentName, context);
|
||||
|
||||
// Store the manager
|
||||
this.titleUpdateManagers.set(documentName, updateManager);
|
||||
|
||||
// Store minimal data needed for the observer (prevents closure memory leak)
|
||||
this.titleObserverData.set(documentName, {
|
||||
userId: context.userId,
|
||||
workspaceSlug: context.workspaceSlug,
|
||||
instance: instance,
|
||||
});
|
||||
|
||||
// Create observer using bound method to avoid closure capturing heavy objects
|
||||
const titleObserver = this.handleTitleChange.bind(this, documentName);
|
||||
|
||||
// Observe the title field
|
||||
document.getXmlFragment("title").observeDeep(titleObserver);
|
||||
this.titleObservers.set(documentName, titleObserver);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle title changes for a document
|
||||
* This is a separate method to avoid closure memory leaks
|
||||
*/
|
||||
private handleTitleChange(documentName: string, events: Y.YEvent<any>[]) {
|
||||
let title = "";
|
||||
events.forEach((event) => {
|
||||
title = extractTextFromHTML(event.currentTarget.toJSON() as string);
|
||||
});
|
||||
|
||||
// Get the manager for this document
|
||||
const manager = this.titleUpdateManagers.get(documentName);
|
||||
|
||||
// Get the stored data for this document
|
||||
const data = this.titleObserverData.get(documentName);
|
||||
|
||||
// Broadcast to parent page if it exists
|
||||
if (data?.parentId && data.workspaceSlug && data.instance) {
|
||||
const event = createRealtimeEvent({
|
||||
user_id: data.userId,
|
||||
workspace_slug: data.workspaceSlug,
|
||||
action: "property_updated",
|
||||
page_id: documentName,
|
||||
data: { name: title },
|
||||
descendants_ids: [],
|
||||
});
|
||||
|
||||
// Use the instance from stored data (guaranteed to be set)
|
||||
broadcastMessageToPage(data.instance, data.parentId, event);
|
||||
}
|
||||
|
||||
// Schedule the title update
|
||||
if (manager) {
|
||||
manager.scheduleUpdate(title);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Force save title before unloading the document
|
||||
*/
|
||||
async beforeUnloadDocument({ documentName }: { documentName: string }) {
|
||||
const updateManager = this.titleUpdateManagers.get(documentName);
|
||||
if (updateManager) {
|
||||
// Force immediate save and wait for it to complete
|
||||
await updateManager.forceSave();
|
||||
// Clean up the manager
|
||||
this.titleUpdateManagers.delete(documentName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove observers after document unload
|
||||
*/
|
||||
async afterUnloadDocument({ documentName, document }: { documentName: string; document?: Document }) {
|
||||
// Clean up observer when document is unloaded
|
||||
const observer = this.titleObservers.get(documentName);
|
||||
if (observer) {
|
||||
// unregister observer from Y.js document to prevent memory leak
|
||||
if (document) {
|
||||
try {
|
||||
document.getXmlFragment("title").unobserveDeep(observer);
|
||||
} catch (error) {
|
||||
logger.error("Failed to unobserve title field", new AppError(error, { context: { documentName } }));
|
||||
}
|
||||
}
|
||||
this.titleObservers.delete(documentName);
|
||||
}
|
||||
|
||||
// Clean up the observer data map to prevent memory leak
|
||||
this.titleObserverData.delete(documentName);
|
||||
|
||||
// Ensure manager is cleaned up if beforeUnloadDocument somehow didn't run
|
||||
if (this.titleUpdateManagers.has(documentName)) {
|
||||
const manager = this.titleUpdateManagers.get(documentName)!;
|
||||
manager.cancel();
|
||||
this.titleUpdateManagers.delete(documentName);
|
||||
}
|
||||
}
|
||||
}
|
||||
277
apps/live/src/extensions/title-update/debounce.ts
Normal file
277
apps/live/src/extensions/title-update/debounce.ts
Normal file
|
|
@ -0,0 +1,277 @@
|
|||
import { logger } from "@plane/logger";
|
||||
|
||||
/**
|
||||
* DebounceState - Tracks the state of a debounced function
|
||||
*/
|
||||
export interface DebounceState {
|
||||
lastArgs: any[] | null;
|
||||
timerId: ReturnType<typeof setTimeout> | null;
|
||||
lastCallTime: number | undefined;
|
||||
lastExecutionTime: number;
|
||||
inProgress: boolean;
|
||||
abortController: AbortController | null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new DebounceState object
|
||||
*/
|
||||
export const createDebounceState = (): DebounceState => ({
|
||||
lastArgs: null,
|
||||
timerId: null,
|
||||
lastCallTime: undefined,
|
||||
lastExecutionTime: 0,
|
||||
inProgress: false,
|
||||
abortController: null,
|
||||
});
|
||||
|
||||
/**
|
||||
* DebounceOptions - Configuration options for debounce
|
||||
*/
|
||||
export interface DebounceOptions {
|
||||
/** The wait time in milliseconds */
|
||||
wait: number;
|
||||
|
||||
/** Optional logging prefix for debug messages */
|
||||
logPrefix?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enhanced debounce manager with abort support
|
||||
* Manages the state and timing of debounced function calls
|
||||
*/
|
||||
export class DebounceManager {
|
||||
private state: DebounceState;
|
||||
private wait: number;
|
||||
private logPrefix: string;
|
||||
|
||||
/**
|
||||
* Creates a new DebounceManager
|
||||
* @param options Debounce configuration options
|
||||
*/
|
||||
constructor(options: DebounceOptions) {
|
||||
this.state = createDebounceState();
|
||||
this.wait = options.wait;
|
||||
this.logPrefix = options.logPrefix || "";
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule a debounced function call
|
||||
* @param func The function to call
|
||||
* @param args The arguments to pass to the function
|
||||
*/
|
||||
schedule(func: (...args: any[]) => Promise<void>, ...args: any[]): void {
|
||||
// Always update the last arguments
|
||||
this.state.lastArgs = args;
|
||||
|
||||
const time = Date.now();
|
||||
this.state.lastCallTime = time;
|
||||
|
||||
// If an operation is in progress, just store the new args and start the timer
|
||||
if (this.state.inProgress) {
|
||||
// Always restart the timer for the new call, even if an operation is in progress
|
||||
if (this.state.timerId) {
|
||||
clearTimeout(this.state.timerId);
|
||||
}
|
||||
|
||||
this.state.timerId = setTimeout(() => {
|
||||
this.timerExpired(func);
|
||||
}, this.wait);
|
||||
return;
|
||||
}
|
||||
|
||||
// If already scheduled, update the args and restart the timer
|
||||
if (this.state.timerId) {
|
||||
clearTimeout(this.state.timerId);
|
||||
this.state.timerId = setTimeout(() => {
|
||||
this.timerExpired(func);
|
||||
}, this.wait);
|
||||
return;
|
||||
}
|
||||
|
||||
// Start the timer for the trailing edge execution
|
||||
this.state.timerId = setTimeout(() => {
|
||||
this.timerExpired(func);
|
||||
}, this.wait);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the timer expires
|
||||
*/
|
||||
private timerExpired(func: (...args: any[]) => Promise<void>): void {
|
||||
const time = Date.now();
|
||||
|
||||
// Check if this timer expiration represents the end of the debounce period
|
||||
if (this.shouldInvoke(time)) {
|
||||
// Execute the function
|
||||
this.executeFunction(func, time);
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise restart the timer
|
||||
this.state.timerId = setTimeout(() => {
|
||||
this.timerExpired(func);
|
||||
}, this.remainingWait(time));
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the debounced function
|
||||
*/
|
||||
private executeFunction(func: (...args: any[]) => Promise<void>, time: number): void {
|
||||
this.state.timerId = null;
|
||||
this.state.lastExecutionTime = time;
|
||||
|
||||
// Execute the function asynchronously
|
||||
this.performFunction(func).catch((error) => {
|
||||
logger.error(`${this.logPrefix}: Error in execution:`, error);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the actual function call, handling any in-progress operations
|
||||
*/
|
||||
private async performFunction(func: (...args: any[]) => Promise<void>): Promise<void> {
|
||||
const args = this.state.lastArgs;
|
||||
if (!args) return;
|
||||
|
||||
// Store the args we're about to use
|
||||
const currentArgs = [...args];
|
||||
|
||||
// If another operation is in progress, abort it
|
||||
await this.abortOngoingOperation();
|
||||
|
||||
// Mark that we're starting a new operation
|
||||
this.state.inProgress = true;
|
||||
this.state.abortController = new AbortController();
|
||||
|
||||
try {
|
||||
// Add the abort signal to the arguments if the function can use it
|
||||
const execArgs = [...currentArgs];
|
||||
execArgs.push(this.state.abortController.signal);
|
||||
|
||||
await func(...execArgs);
|
||||
|
||||
// Only clear lastArgs if they haven't been changed during this operation
|
||||
if (this.state.lastArgs && this.arraysEqual(this.state.lastArgs, currentArgs)) {
|
||||
this.state.lastArgs = null;
|
||||
|
||||
// Clear any timer as we've successfully processed the latest args
|
||||
if (this.state.timerId) {
|
||||
clearTimeout(this.state.timerId);
|
||||
this.state.timerId = null;
|
||||
}
|
||||
} else if (this.state.lastArgs) {
|
||||
// If lastArgs have changed during this operation, the timer should already be running
|
||||
// but let's make sure it is
|
||||
if (!this.state.timerId) {
|
||||
this.state.timerId = setTimeout(() => {
|
||||
this.timerExpired(func);
|
||||
}, this.wait);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
if (error instanceof Error && error.name === "AbortError") {
|
||||
// Nothing to do here, the new operation will be triggered by the timer expiration
|
||||
} else {
|
||||
logger.error(`${this.logPrefix}: Error during operation:`, error);
|
||||
|
||||
// On error (not abort), make sure we have a timer running to retry
|
||||
if (!this.state.timerId && this.state.lastArgs) {
|
||||
this.state.timerId = setTimeout(() => {
|
||||
this.timerExpired(func);
|
||||
}, this.wait);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.state.inProgress = false;
|
||||
this.state.abortController = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort any ongoing operation
|
||||
*/
|
||||
private async abortOngoingOperation(): Promise<void> {
|
||||
if (this.state.inProgress && this.state.abortController) {
|
||||
this.state.abortController.abort();
|
||||
|
||||
// Small delay to ensure the abort has had time to propagate
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
|
||||
// Double-check that state has been reset, force it if not
|
||||
if (this.state.inProgress || this.state.abortController) {
|
||||
this.state.inProgress = false;
|
||||
this.state.abortController = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if we should invoke the function now
|
||||
*/
|
||||
private shouldInvoke(time: number): boolean {
|
||||
// Either this is the first call, or we've waited long enough since the last call
|
||||
return this.state.lastCallTime === undefined || time - this.state.lastCallTime >= this.wait;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate how much longer we should wait
|
||||
*/
|
||||
private remainingWait(time: number): number {
|
||||
const timeSinceLastCall = time - (this.state.lastCallTime || 0);
|
||||
return Math.max(0, this.wait - timeSinceLastCall);
|
||||
}
|
||||
|
||||
/**
|
||||
* Force immediate execution
|
||||
*/
|
||||
async flush(func: (...args: any[]) => Promise<void>): Promise<void> {
|
||||
// Clear any pending timeout
|
||||
if (this.state.timerId) {
|
||||
clearTimeout(this.state.timerId);
|
||||
this.state.timerId = null;
|
||||
}
|
||||
|
||||
// Reset timing state
|
||||
this.state.lastCallTime = undefined;
|
||||
|
||||
// Perform the function immediately
|
||||
if (this.state.lastArgs) {
|
||||
await this.performFunction(func);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel any pending operations without executing
|
||||
*/
|
||||
cancel(): void {
|
||||
// Clear any pending timeout
|
||||
if (this.state.timerId) {
|
||||
clearTimeout(this.state.timerId);
|
||||
this.state.timerId = null;
|
||||
}
|
||||
|
||||
// Reset timing state
|
||||
this.state.lastCallTime = undefined;
|
||||
|
||||
// Abort any in-progress operation
|
||||
if (this.state.inProgress && this.state.abortController) {
|
||||
this.state.abortController.abort();
|
||||
this.state.inProgress = false;
|
||||
this.state.abortController = null;
|
||||
}
|
||||
|
||||
// Clear args
|
||||
this.state.lastArgs = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare two arrays for equality
|
||||
*/
|
||||
private arraysEqual(a: any[], b: any[]): boolean {
|
||||
if (a.length !== b.length) return false;
|
||||
for (let i = 0; i < a.length; i++) {
|
||||
if (a[i] !== b[i]) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,90 @@
|
|||
import { logger } from "@plane/logger";
|
||||
import { AppError } from "@/lib/errors";
|
||||
import { getPageService } from "@/services/page/handler";
|
||||
import type { HocusPocusServerContext } from "@/types";
|
||||
import { DebounceManager } from "./debounce";
|
||||
|
||||
/**
|
||||
* Manages title update operations for a single document
|
||||
* Handles debouncing, aborting, and force saving title updates
|
||||
*/
|
||||
export class TitleUpdateManager {
|
||||
private documentName: string;
|
||||
private context: HocusPocusServerContext;
|
||||
private debounceManager: DebounceManager;
|
||||
private lastTitle: string | null = null;
|
||||
|
||||
/**
|
||||
* Create a new TitleUpdateManager instance
|
||||
*/
|
||||
constructor(documentName: string, context: HocusPocusServerContext, wait: number = 5000) {
|
||||
this.documentName = documentName;
|
||||
this.context = context;
|
||||
|
||||
// Set up debounce manager with logging
|
||||
this.debounceManager = new DebounceManager({
|
||||
wait,
|
||||
logPrefix: `TitleManager[${documentName.substring(0, 8)}]`,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule a debounced title update
|
||||
*/
|
||||
scheduleUpdate(title: string): void {
|
||||
// Store the latest title
|
||||
this.lastTitle = title;
|
||||
|
||||
// Schedule the update with the debounce manager
|
||||
this.debounceManager.schedule(this.updateTitle.bind(this), title);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the title - will be called by the debounce manager
|
||||
*/
|
||||
private async updateTitle(title: string, signal?: AbortSignal): Promise<void> {
|
||||
const service = getPageService(this.context.documentType, this.context);
|
||||
if (!service.updatePageProperties) {
|
||||
logger.warn(`No updateTitle method found for document ${this.documentName}`);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await service.updatePageProperties(this.documentName, {
|
||||
data: { name: title },
|
||||
abortSignal: signal,
|
||||
});
|
||||
|
||||
// Clear last title only if it matches what we just updated
|
||||
if (this.lastTitle === title) {
|
||||
this.lastTitle = null;
|
||||
}
|
||||
} catch (error) {
|
||||
const appError = new AppError(error, {
|
||||
context: { operation: "updateTitle", documentName: this.documentName },
|
||||
});
|
||||
logger.error("Error updating title", appError);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Force save the current title immediately
|
||||
*/
|
||||
async forceSave(): Promise<void> {
|
||||
// Ensure we have the current title
|
||||
if (!this.lastTitle) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Use the debounce manager to flush the operation
|
||||
await this.debounceManager.flush(this.updateTitle.bind(this));
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel any pending updates
|
||||
*/
|
||||
cancel(): void {
|
||||
this.debounceManager.cancel();
|
||||
this.lastTitle = null;
|
||||
}
|
||||
}
|
||||
11
apps/live/src/extensions/title-update/title-utils.ts
Normal file
11
apps/live/src/extensions/title-update/title-utils.ts
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
import { sanitizeHTML } from "@plane/utils";
|
||||
|
||||
/**
|
||||
* Utility function to extract text from HTML content
|
||||
*/
|
||||
export const extractTextFromHTML = (html: string): string => {
|
||||
// Use sanitizeHTML to safely extract text and remove all HTML tags
|
||||
// This is more secure than regex as it handles edge cases and prevents injection
|
||||
// Note: sanitizeHTML trims whitespace, which is acceptable for title extraction
|
||||
return sanitizeHTML(html) || "";
|
||||
};
|
||||
Loading…
Add table
Add a link
Reference in a new issue