regression: downgrade to tiptap v2 (#7982)
* chore: downgrade to tiptap v2 * fix: revert back to hocuspocus * fix: collaboration events added * fix: lock unlock issues * fix: build errors * fix: type errors * fix: graceful shutdown --------- Co-authored-by: Palanikannan M <akashmalinimurugu@gmail.com>
This commit is contained in:
parent
59022b6beb
commit
64781be7d2
48 changed files with 2123 additions and 824 deletions
|
|
@ -6,22 +6,17 @@ import {
|
|||
} from "@plane/editor";
|
||||
// logger
|
||||
import { logger } from "@plane/logger";
|
||||
// lib
|
||||
import { AppError } from "@/lib/errors";
|
||||
// services
|
||||
import { getPageService } from "@/services/page/handler";
|
||||
// type
|
||||
import type { FetchPayloadWithContext, StorePayloadWithContext } from "@/types";
|
||||
import { ForceCloseReason, CloseCode } from "@/types/admin-commands";
|
||||
import { broadcastError } from "@/utils/broadcast-error";
|
||||
// force close utility
|
||||
import { forceCloseDocumentAcrossServers } from "./force-close-handler";
|
||||
|
||||
const normalizeToError = (error: unknown, fallbackMessage: string) => {
|
||||
if (error instanceof Error) {
|
||||
return error;
|
||||
}
|
||||
|
||||
const message = typeof error === "string" && error.trim().length > 0 ? error : fallbackMessage;
|
||||
|
||||
return new Error(message);
|
||||
};
|
||||
|
||||
const fetchDocument = async ({ context, documentName: pageId }: FetchPayloadWithContext) => {
|
||||
const fetchDocument = async ({ context, documentName: pageId, instance }: FetchPayloadWithContext) => {
|
||||
try {
|
||||
const service = getPageService(context.documentType, context);
|
||||
// fetch details
|
||||
|
|
@ -38,12 +33,22 @@ const fetchDocument = async ({ context, documentName: pageId }: FetchPayloadWith
|
|||
// return binary data
|
||||
return binaryData;
|
||||
} catch (error) {
|
||||
logger.error("DATABASE_EXTENSION: Error in fetching document", error);
|
||||
throw normalizeToError(error, `Failed to fetch document: ${pageId}`);
|
||||
const appError = new AppError(error, { context: { pageId } });
|
||||
logger.error("Error in fetching document", appError);
|
||||
|
||||
// Broadcast error to frontend for user document types
|
||||
await broadcastError(instance, pageId, "Unable to load the page. Please try refreshing.", "fetch", context);
|
||||
|
||||
throw appError;
|
||||
}
|
||||
};
|
||||
|
||||
const storeDocument = async ({ context, state: pageBinaryData, documentName: pageId }: StorePayloadWithContext) => {
|
||||
const storeDocument = async ({
|
||||
context,
|
||||
state: pageBinaryData,
|
||||
documentName: pageId,
|
||||
instance,
|
||||
}: StorePayloadWithContext) => {
|
||||
try {
|
||||
const service = getPageService(context.documentType, context);
|
||||
// convert binary data to all formats
|
||||
|
|
@ -57,8 +62,46 @@ const storeDocument = async ({ context, state: pageBinaryData, documentName: pag
|
|||
};
|
||||
await service.updateDescriptionBinary(pageId, payload);
|
||||
} catch (error) {
|
||||
logger.error("DATABASE_EXTENSION: Error in updating document:", error);
|
||||
throw normalizeToError(error, `Failed to update document: ${pageId}`);
|
||||
const appError = new AppError(error, { context: { pageId } });
|
||||
logger.error("Error in updating document:", appError);
|
||||
|
||||
// Check error types
|
||||
const isContentTooLarge = appError.statusCode === 413;
|
||||
|
||||
// Determine if we should disconnect and unload
|
||||
const shouldDisconnect = isContentTooLarge;
|
||||
|
||||
// Determine error message and code
|
||||
let errorMessage: string;
|
||||
let errorCode: "content_too_large" | "page_locked" | "page_archived" | undefined;
|
||||
|
||||
if (isContentTooLarge) {
|
||||
errorMessage = "Document is too large to save. Please reduce the content size.";
|
||||
errorCode = "content_too_large";
|
||||
} else {
|
||||
errorMessage = "Unable to save the page. Please try again.";
|
||||
}
|
||||
|
||||
// Broadcast error to frontend for user document types
|
||||
await broadcastError(instance, pageId, errorMessage, "store", context, errorCode, shouldDisconnect);
|
||||
|
||||
// If we should disconnect, close connections and unload document
|
||||
if (shouldDisconnect) {
|
||||
// Map error code to ForceCloseReason with proper types
|
||||
const reason =
|
||||
errorCode === "content_too_large" ? ForceCloseReason.DOCUMENT_TOO_LARGE : ForceCloseReason.CRITICAL_ERROR;
|
||||
|
||||
const closeCode = errorCode === "content_too_large" ? CloseCode.DOCUMENT_TOO_LARGE : CloseCode.FORCE_CLOSE;
|
||||
|
||||
// force close connections and unload document
|
||||
await forceCloseDocumentAcrossServers(instance, pageId, reason, closeCode);
|
||||
|
||||
// Don't throw after force close - document is already unloaded
|
||||
// Throwing would cause hocuspocus's finally block to access the null document
|
||||
return;
|
||||
}
|
||||
|
||||
throw appError;
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
|||
203
apps/live/src/extensions/force-close-handler.ts
Normal file
203
apps/live/src/extensions/force-close-handler.ts
Normal file
|
|
@ -0,0 +1,203 @@
|
|||
import type { Connection, Extension, Hocuspocus, onConfigurePayload } from "@hocuspocus/server";
|
||||
import { logger } from "@plane/logger";
|
||||
import { Redis } from "@/extensions/redis";
|
||||
import {
|
||||
AdminCommand,
|
||||
CloseCode,
|
||||
ForceCloseReason,
|
||||
getForceCloseMessage,
|
||||
isForceCloseCommand,
|
||||
type ClientForceCloseMessage,
|
||||
type ForceCloseCommandData,
|
||||
} from "@/types/admin-commands";
|
||||
|
||||
/**
|
||||
* Extension to handle force close commands from other servers via Redis admin channel
|
||||
*/
|
||||
export class ForceCloseHandler implements Extension {
|
||||
name = "ForceCloseHandler";
|
||||
priority = 999;
|
||||
|
||||
async onConfigure({ instance }: onConfigurePayload) {
|
||||
const redisExt = instance.configuration.extensions.find((ext) => ext instanceof Redis) as Redis | undefined;
|
||||
|
||||
if (!redisExt) {
|
||||
logger.warn("[FORCE_CLOSE_HANDLER] Redis extension not found");
|
||||
return;
|
||||
}
|
||||
|
||||
// Register handler for force_close admin command
|
||||
redisExt.onAdminCommand<ForceCloseCommandData>(AdminCommand.FORCE_CLOSE, async (data) => {
|
||||
// Type guard for safety
|
||||
if (!isForceCloseCommand(data)) {
|
||||
logger.error("[FORCE_CLOSE_HANDLER] Received invalid force close command");
|
||||
return;
|
||||
}
|
||||
|
||||
const { docId, reason, code } = data;
|
||||
|
||||
const document = instance.documents.get(docId);
|
||||
if (!document) {
|
||||
// Not our document, ignore
|
||||
return;
|
||||
}
|
||||
|
||||
const connectionCount = document.getConnectionsCount();
|
||||
logger.info(`[FORCE_CLOSE_HANDLER] Sending force close message to ${connectionCount} clients...`);
|
||||
|
||||
// Step 1: Send force close message to ALL clients first
|
||||
const forceCloseMessage: ClientForceCloseMessage = {
|
||||
type: "force_close",
|
||||
reason,
|
||||
code,
|
||||
message: getForceCloseMessage(reason),
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
let messageSent = 0;
|
||||
document.connections.forEach(({ connection }: { connection: Connection }) => {
|
||||
try {
|
||||
connection.sendStateless(JSON.stringify(forceCloseMessage));
|
||||
messageSent++;
|
||||
} catch (error) {
|
||||
logger.error("[FORCE_CLOSE_HANDLER] Failed to send message:", error);
|
||||
}
|
||||
});
|
||||
|
||||
logger.info(`[FORCE_CLOSE_HANDLER] Sent force close message to ${messageSent}/${connectionCount} clients`);
|
||||
|
||||
// Wait a moment for messages to be delivered
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// Step 2: Close connections
|
||||
logger.info(`[FORCE_CLOSE_HANDLER] Closing ${connectionCount} connections...`);
|
||||
|
||||
let closed = 0;
|
||||
document.connections.forEach(({ connection }: { connection: Connection }) => {
|
||||
try {
|
||||
connection.close({ code, reason });
|
||||
closed++;
|
||||
} catch (error) {
|
||||
logger.error("[FORCE_CLOSE_HANDLER] Failed to close connection:", error);
|
||||
}
|
||||
});
|
||||
|
||||
logger.info(`[FORCE_CLOSE_HANDLER] Closed ${closed}/${connectionCount} connections for ${docId}`);
|
||||
});
|
||||
|
||||
logger.info("[FORCE_CLOSE_HANDLER] Registered with Redis extension");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Force close all connections to a document across all servers and unload it from memory.
|
||||
* Used for critical errors or admin operations.
|
||||
*
|
||||
* @param instance - The Hocuspocus server instance
|
||||
* @param pageId - The document ID to force close
|
||||
* @param reason - The reason for force closing
|
||||
* @param code - Optional WebSocket close code (defaults to FORCE_CLOSE)
|
||||
* @returns Promise that resolves when document is closed and unloaded
|
||||
* @throws Error if document not found in memory
|
||||
*/
|
||||
export const forceCloseDocumentAcrossServers = async (
|
||||
instance: Hocuspocus,
|
||||
pageId: string,
|
||||
reason: ForceCloseReason,
|
||||
code: CloseCode = CloseCode.FORCE_CLOSE
|
||||
): Promise<void> => {
|
||||
// STEP 1: VERIFY DOCUMENT EXISTS
|
||||
const document = instance.documents.get(pageId);
|
||||
|
||||
if (!document) {
|
||||
logger.info(`[FORCE_CLOSE] Document ${pageId} already unloaded - no action needed`);
|
||||
return; // Document already cleaned up, nothing to do
|
||||
}
|
||||
|
||||
const connectionsBefore = document.getConnectionsCount();
|
||||
logger.info(`[FORCE_CLOSE] Sending force close message to ${connectionsBefore} local clients...`);
|
||||
|
||||
const forceCloseMessage: ClientForceCloseMessage = {
|
||||
type: "force_close",
|
||||
reason,
|
||||
code,
|
||||
message: getForceCloseMessage(reason),
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
let messageSentCount = 0;
|
||||
document.connections.forEach(({ connection }: { connection: Connection }) => {
|
||||
try {
|
||||
connection.sendStateless(JSON.stringify(forceCloseMessage));
|
||||
messageSentCount++;
|
||||
} catch (error) {
|
||||
logger.error("[FORCE_CLOSE] Failed to send message to client:", error);
|
||||
}
|
||||
});
|
||||
|
||||
logger.info(`[FORCE_CLOSE] Sent force close message to ${messageSentCount}/${connectionsBefore} clients`);
|
||||
|
||||
// Wait a moment for messages to be delivered
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// STEP 3: CLOSE LOCAL CONNECTIONS
|
||||
logger.info(`[FORCE_CLOSE] Closing ${connectionsBefore} local connections...`);
|
||||
|
||||
let closedCount = 0;
|
||||
document.connections.forEach(({ connection }: { connection: Connection }) => {
|
||||
try {
|
||||
connection.close({ code, reason });
|
||||
closedCount++;
|
||||
} catch (error) {
|
||||
logger.error("[FORCE_CLOSE] Failed to close local connection:", error);
|
||||
}
|
||||
});
|
||||
|
||||
logger.info(`[FORCE_CLOSE] Closed ${closedCount}/${connectionsBefore} local connections`);
|
||||
|
||||
// STEP 4: BROADCAST TO OTHER SERVERS
|
||||
const redisExt = instance.configuration.extensions.find((ext) => ext instanceof Redis) as Redis | undefined;
|
||||
|
||||
if (redisExt) {
|
||||
const commandData: ForceCloseCommandData = {
|
||||
command: AdminCommand.FORCE_CLOSE,
|
||||
docId: pageId,
|
||||
reason,
|
||||
code,
|
||||
originServer: instance.configuration.name || "unknown",
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
const receivers = await redisExt.publishAdminCommand(commandData);
|
||||
logger.info(`[FORCE_CLOSE] Notified ${receivers} other server(s)`);
|
||||
} else {
|
||||
logger.warn("[FORCE_CLOSE] Redis extension not found, cannot notify other servers");
|
||||
}
|
||||
|
||||
// STEP 5: WAIT FOR OTHER SERVERS
|
||||
const waitTime = 800;
|
||||
logger.info(`[FORCE_CLOSE] Waiting ${waitTime}ms for other servers to close connections...`);
|
||||
await new Promise((resolve) => setTimeout(resolve, waitTime));
|
||||
|
||||
// STEP 6: UNLOAD DOCUMENT after closing all the connections
|
||||
logger.info(`[FORCE_CLOSE] Unloading document from memory...`);
|
||||
|
||||
try {
|
||||
await instance.unloadDocument(document);
|
||||
logger.info(`[FORCE_CLOSE] Document unloaded successfully ✅`);
|
||||
} catch (unloadError: unknown) {
|
||||
logger.error("[FORCE_CLOSE] UNLOAD FAILED:", unloadError);
|
||||
logger.error(` Error: ${unloadError instanceof Error ? unloadError.message : "unknown"}`);
|
||||
}
|
||||
|
||||
// STEP 7: VERIFY UNLOAD
|
||||
const documentAfterUnload = instance.documents.get(pageId);
|
||||
|
||||
if (documentAfterUnload) {
|
||||
logger.error(
|
||||
`❌ [FORCE_CLOSE] Document still in memory!, Document ID: ${pageId}, Connections: ${documentAfterUnload.getConnectionsCount()}`
|
||||
);
|
||||
} else {
|
||||
logger.info(`✅ [FORCE_CLOSE] COMPLETE, Document: ${pageId}, Status: Successfully closed and unloaded`);
|
||||
}
|
||||
};
|
||||
|
|
@ -1,30 +1,134 @@
|
|||
import { Redis as HocuspocusRedis } from "@hocuspocus/extension-redis";
|
||||
import { OutgoingMessage } from "@hocuspocus/server";
|
||||
// redis
|
||||
import { OutgoingMessage, type onConfigurePayload } from "@hocuspocus/server";
|
||||
import { logger } from "@plane/logger";
|
||||
import { AppError } from "@/lib/errors";
|
||||
import { redisManager } from "@/redis";
|
||||
import { AdminCommand } from "@/types/admin-commands";
|
||||
import type { AdminCommandData, AdminCommandHandler } from "@/types/admin-commands";
|
||||
|
||||
const getRedisClient = () => {
|
||||
const redisClient = redisManager.getClient();
|
||||
if (!redisClient) {
|
||||
throw new Error("Redis client not initialized");
|
||||
throw new AppError("Redis client not initialized");
|
||||
}
|
||||
return redisClient;
|
||||
};
|
||||
|
||||
export class Redis extends HocuspocusRedis {
|
||||
private adminHandlers = new Map<AdminCommand, AdminCommandHandler>();
|
||||
private readonly ADMIN_CHANNEL = "hocuspocus:admin";
|
||||
|
||||
constructor() {
|
||||
super({ redis: getRedisClient() });
|
||||
}
|
||||
|
||||
public broadcastToDocument(documentName: string, payload: any): Promise<number> {
|
||||
async onConfigure(payload: onConfigurePayload) {
|
||||
await super.onConfigure(payload);
|
||||
|
||||
// Subscribe to admin channel
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
this.sub.subscribe(this.ADMIN_CHANNEL, (error: Error) => {
|
||||
if (error) {
|
||||
logger.error(`[Redis] Failed to subscribe to admin channel:`, error);
|
||||
reject(error);
|
||||
} else {
|
||||
logger.info(`[Redis] Subscribed to admin channel: ${this.ADMIN_CHANNEL}`);
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Listen for admin messages
|
||||
this.sub.on("message", this.handleAdminMessage);
|
||||
logger.info(`[Redis] Attached admin message listener`);
|
||||
}
|
||||
|
||||
private handleAdminMessage = async (channel: string, message: string) => {
|
||||
if (channel !== this.ADMIN_CHANNEL) return;
|
||||
|
||||
try {
|
||||
const data = JSON.parse(message) as AdminCommandData;
|
||||
|
||||
// Validate command
|
||||
if (!data.command || !Object.values(AdminCommand).includes(data.command as AdminCommand)) {
|
||||
logger.warn(`[Redis] Invalid admin command received: ${data.command}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const handler = this.adminHandlers.get(data.command);
|
||||
|
||||
if (handler) {
|
||||
await handler(data);
|
||||
} else {
|
||||
logger.warn(`[Redis] No handler registered for admin command: ${data.command}`);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error("[Redis] Error handling admin message:", error);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Register handler for an admin command
|
||||
*/
|
||||
public onAdminCommand<T extends AdminCommandData = AdminCommandData>(
|
||||
command: AdminCommand,
|
||||
handler: AdminCommandHandler<T>
|
||||
) {
|
||||
this.adminHandlers.set(command, handler as AdminCommandHandler);
|
||||
logger.info(`[Redis] Registered admin command: ${command}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish admin command to global channel
|
||||
*/
|
||||
public async publishAdminCommand<T extends AdminCommandData>(data: T): Promise<number> {
|
||||
// Validate command data
|
||||
if (!data.command || !Object.values(AdminCommand).includes(data.command)) {
|
||||
throw new AppError(`Invalid admin command: ${data.command}`);
|
||||
}
|
||||
|
||||
const message = JSON.stringify(data);
|
||||
const receivers = await this.pub.publish(this.ADMIN_CHANNEL, message);
|
||||
|
||||
logger.info(`[Redis] Published "${data.command}" command, received by ${receivers} server(s)`);
|
||||
return receivers;
|
||||
}
|
||||
|
||||
async onDestroy() {
|
||||
// Unsubscribe from admin channel
|
||||
await new Promise<void>((resolve) => {
|
||||
this.sub.unsubscribe(this.ADMIN_CHANNEL, (error: Error) => {
|
||||
if (error) {
|
||||
logger.error(`[Redis] Error unsubscribing from admin channel:`, error);
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
|
||||
// Remove the message listener to prevent memory leaks
|
||||
this.sub.removeListener("message", this.handleAdminMessage);
|
||||
logger.info(`[Redis] Removed admin message listener`);
|
||||
|
||||
await super.onDestroy();
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast a message to a document across all servers via Redis.
|
||||
* Uses empty identifier so ALL servers process the message.
|
||||
*/
|
||||
public async broadcastToDocument(documentName: string, payload: unknown): Promise<number> {
|
||||
const stringPayload = typeof payload === "string" ? payload : JSON.stringify(payload);
|
||||
|
||||
const message = new OutgoingMessage(documentName).writeBroadcastStateless(stringPayload);
|
||||
|
||||
return this.pub.publish(
|
||||
// we're accessing the private method of the hocuspocus redis extension
|
||||
this["pubKey"](documentName),
|
||||
// we're accessing the private method of the hocuspocus redis extension to encode the message
|
||||
this["encodeMessage"](message.toUint8Array())
|
||||
);
|
||||
const emptyPrefix = Buffer.concat([Buffer.from([0])]);
|
||||
const channel = this["pubKey"](documentName);
|
||||
const encodedMessage = Buffer.concat([emptyPrefix, Buffer.from(message.toUint8Array())]);
|
||||
|
||||
const result = await this.pub.publishBuffer(channel, encodedMessage);
|
||||
|
||||
logger.info(`REDIS_EXTENSION: Published to ${documentName}, ${result} subscribers`);
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue