fix: async loading of the redis extension (#5537)

* fix: async loading of the redis extension

* fix: initialize redis connection and hocuspocusserver only during server start

* fix: removed console logs

* fix: remove async

* fix: error handling and shutting down gracefully in unhandled errors

* feat: added compression library

* fix: added helmet for security headers
This commit is contained in:
M. Palanikannan 2024-09-07 14:24:20 +05:30 committed by GitHub
parent 8154a190d2
commit 70ea1459cd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 199 additions and 85 deletions

View file

@ -24,10 +24,12 @@
"@tiptap/core": "^2.4.0",
"@tiptap/html": "^2.3.0",
"axios": "^1.7.2",
"compression": "^1.7.4",
"cors": "^2.8.5",
"dotenv": "^16.4.5",
"express": "^4.19.2",
"express-ws": "^5.0.2",
"helmet": "^7.1.0",
"ioredis": "^5.4.1",
"lodash": "^4.17.21",
"morgan": "^1.10.0",
@ -42,6 +44,7 @@
"@babel/core": "^7.25.2",
"@babel/preset-env": "^7.25.4",
"@babel/preset-typescript": "^7.24.7",
"@types/compression": "^1.7.5",
"@types/cors": "^2.8.17",
"@types/dotenv": "^8.2.0",
"@types/express": "^4.17.21",

View file

@ -24,7 +24,7 @@ import { TDocumentTypes } from "@/core/types/common.js";
import { fetchDocument } from "@/plane-live/lib/fetch-document.js";
import { updateDocument } from "@/plane-live/lib/update-document.js";
export const getExtensions: () => Extension[] = () => {
export const getExtensions: () => Promise<Extension[]> = async () => {
const extensions: Extension[] = [
new Logger({
onChange: false,
@ -65,7 +65,7 @@ export const getExtensions: () => Extension[] = () => {
}
resolve(fetchedData);
} catch (error) {
console.error("Error in fetching document", error);
manualLogger.error("Error in fetching document", error);
}
});
},
@ -97,7 +97,7 @@ export const getExtensions: () => Extension[] = () => {
});
}
} catch (error) {
console.error("Error in updating document", error);
manualLogger.error("Error in updating document:", error);
}
});
},
@ -106,34 +106,42 @@ export const getExtensions: () => Extension[] = () => {
const redisUrl = getRedisUrl();
// Add the Redis extension only if configured
if (redisUrl) {
try {
const redisClient = new Redis(redisUrl);
redisClient.on("error", (error: any) => {
// if auth fails or the server is down, disconnect redis
if (
error?.code === "ENOTFOUND" ||
error.message.includes("WRONGPASS") ||
error.message.includes("NOAUTH")
) {
redisClient.disconnect();
}
manualLogger.error(
`Redis Client wasn't able to connect, continuing without Redis (you won't be able to sync data betwen multiple plane live servers)`,
);
manualLogger.error(error);
await new Promise<void>((resolve, reject) => {
redisClient.on("error", (error: any) => {
if (
error?.code === "ENOTFOUND" ||
error.message.includes("WRONGPASS") ||
error.message.includes("NOAUTH")
) {
redisClient.disconnect();
}
manualLogger.warn(
`Redis Client wasn't able to connect, continuing without Redis (you won't be able to sync data between multiple plane live servers)`,
error,
);
reject(error);
});
redisClient.on("ready", () => {
extensions.push(new HocusPocusRedis({ redis: redisClient }));
manualLogger.info("Redis Client connected ✅");
resolve();
});
});
redisClient.on("ready", () => {
manualLogger.info("Redis Client connected");
});
if (!redisClient) {
throw new Error("Redis client is not defined");
}
extensions.push(new HocusPocusRedis({ redis: redisClient }));
} catch (error) {
manualLogger.error("Failed to connect to Redis:", error);
manualLogger.warn(
`Redis Client wasn't able to connect, continuing without Redis (you won't be able to sync data between multiple plane live servers)`,
error,
);
}
} else {
manualLogger.warn(
"Redis URL is not set, continuing without Redis (you won't be able to sync data between multiple plane live servers)",
);
}
return extensions;

View file

@ -7,9 +7,32 @@ const transport = {
},
};
const hooks = {
logMethod(inputArgs: any, method: any): any {
if (inputArgs.length >= 2) {
const arg1 = inputArgs.shift();
const arg2 = inputArgs.shift();
return method.apply(this, [arg2, arg1, ...inputArgs]);
}
return method.apply(this, inputArgs);
},
};
export const logger = pinoHttp({
level: "info",
transport: transport,
hooks: hooks,
serializers: {
req(req) {
return `${req.method} ${req.url}`;
},
res(res) {
return `${res.statusCode} ${res?.statusMessage || ""}`;
},
responseTime(time) {
return `${time}ms`;
},
},
});
export const manualLogger = logger.logger;

View file

@ -3,33 +3,36 @@ import { Server } from "@hocuspocus/server";
import { handleAuthentication } from "@/core/lib/authentication.js";
import { getExtensions } from "@/core/extensions/index.js";
export const HocusPocusServer = Server.configure({
onAuthenticate: async ({
requestHeaders,
requestParameters,
connection,
// user id used as token for authentication
token,
}) => {
// request headers
const cookie = requestHeaders.cookie?.toString();
// params
const params = requestParameters;
export const getHocusPocusServer = async () => {
const extensions = await getExtensions();
return Server.configure({
onAuthenticate: async ({
requestHeaders,
requestParameters,
connection,
// user id used as token for authentication
token,
}) => {
// request headers
const cookie = requestHeaders.cookie?.toString();
// params
const params = requestParameters;
if (!cookie) {
throw Error("Credentials not provided");
}
if (!cookie) {
throw Error("Credentials not provided");
}
try {
await handleAuthentication({
connection,
cookie,
params,
token,
});
} catch (error) {
throw Error("Authentication unsuccessful!");
}
},
extensions: getExtensions(),
});
try {
await handleAuthentication({
connection,
cookie,
params,
token,
});
} catch (error) {
throw Error("Authentication unsuccessful!");
}
},
extensions,
});
};

View file

@ -5,6 +5,8 @@ import { UserService } from "@/core/services/user.service.js";
import { TDocumentTypes } from "@/core/types/common.js";
// plane live lib
import { authenticateUser } from "@/plane-live/lib/authentication.js";
// core helpers
import { manualLogger } from "@/core/helpers/logger.js";
const userService = new UserService();
@ -26,7 +28,7 @@ export const handleAuthentication = async (props: Props) => {
try {
response = await userService.currentUser(cookie);
} catch (error) {
console.error("Failed to fetch current user:", error);
manualLogger.error("Failed to fetch current user:", error);
throw error;
}
if (response.id !== token) {
@ -43,22 +45,27 @@ export const handleAuthentication = async (props: Props) => {
);
}
// fetch current user's project membership info
const projectMembershipInfo = await userService.getUserProjectMembership(
workspaceSlug,
projectId,
cookie
);
const projectRole = projectMembershipInfo.role;
// make the connection read only for roles lower than a member
if (projectRole < 15) {
connection.readOnly = true;
try {
const projectMembershipInfo = await userService.getUserProjectMembership(
workspaceSlug,
projectId,
cookie
);
const projectRole = projectMembershipInfo.role;
// make the connection read only for roles lower than a member
if (projectRole < 15) {
connection.readOnly = true;
}
} catch (error) {
manualLogger.error("Failed to fetch project membership info:", error);
throw error;
}
} else {
await authenticateUser({
connection,
cookie,
documentType,
params
params,
});
}

View file

@ -1,18 +1,22 @@
// helpers
import { getAllDocumentFormatsFromBinaryData, getBinaryDataFromHTMLString } from "@/core/helpers/page.js";
import {
getAllDocumentFormatsFromBinaryData,
getBinaryDataFromHTMLString,
} from "@/core/helpers/page.js";
// services
import { PageService } from "@/core/services/page.service.js";
import { manualLogger } from "../helpers/logger.js";
const pageService = new PageService();
export const updatePageDescription = async (
params: URLSearchParams,
pageId: string,
updatedDescription: Uint8Array,
cookie: string | undefined
cookie: string | undefined,
) => {
if (!(updatedDescription instanceof Uint8Array)) {
throw new Error(
"Invalid updatedDescription: must be an instance of Uint8Array"
"Invalid updatedDescription: must be an instance of Uint8Array",
);
}
@ -20,11 +24,8 @@ export const updatePageDescription = async (
const projectId = params.get("projectId")?.toString();
if (!workspaceSlug || !projectId || !cookie) return;
const {
contentBinaryEncoded,
contentHTML,
contentJSON
} = getAllDocumentFormatsFromBinaryData(updatedDescription);
const { contentBinaryEncoded, contentHTML, contentJSON } =
getAllDocumentFormatsFromBinaryData(updatedDescription);
try {
const payload = {
description_binary: contentBinaryEncoded,
@ -37,10 +38,10 @@ export const updatePageDescription = async (
projectId,
pageId,
payload,
cookie
cookie,
);
} catch (error) {
console.error("Update error:", error);
manualLogger.error("Update error:", error);
throw error;
}
};
@ -49,7 +50,7 @@ const fetchDescriptionHTMLAndTransform = async (
workspaceSlug: string,
projectId: string,
pageId: string,
cookie: string
cookie: string,
) => {
if (!workspaceSlug || !projectId || !cookie) return;
@ -58,12 +59,17 @@ const fetchDescriptionHTMLAndTransform = async (
workspaceSlug,
projectId,
pageId,
cookie
cookie,
);
const { contentBinary } = getBinaryDataFromHTMLString(
pageDetails.description_html ?? "<p></p>",
);
const { contentBinary } = getBinaryDataFromHTMLString(pageDetails.description_html ?? "<p></p>")
return contentBinary;
} catch (error) {
console.error("Error while transforming from HTML to Uint8Array", error);
manualLogger.error(
"Error while transforming from HTML to Uint8Array",
error,
);
throw error;
}
};
@ -71,7 +77,7 @@ const fetchDescriptionHTMLAndTransform = async (
export const fetchPageDescriptionBinary = async (
params: URLSearchParams,
pageId: string,
cookie: string | undefined
cookie: string | undefined,
) => {
const workspaceSlug = params.get("workspaceSlug")?.toString();
const projectId = params.get("projectId")?.toString();
@ -82,7 +88,7 @@ export const fetchPageDescriptionBinary = async (
workspaceSlug,
projectId,
pageId,
cookie
cookie,
);
const binaryData = new Uint8Array(response);
@ -91,7 +97,7 @@ export const fetchPageDescriptionBinary = async (
workspaceSlug,
projectId,
pageId,
cookie
cookie,
);
if (binary) {
return binary;
@ -100,7 +106,7 @@ export const fetchPageDescriptionBinary = async (
return binaryData;
} catch (error) {
console.error("Fetch error:", error);
manualLogger.error("Fetch error:", error);
throw error;
}
};

View file

@ -3,12 +3,14 @@ import "@/core/config/sentry-config.js";
import express from "express";
import expressWs from "express-ws";
import * as Sentry from "@sentry/node";
import compression from "compression";
import helmet from "helmet";
// cors
import cors from "cors";
// core hocuspocus server
import { HocusPocusServer } from "@/core/hocuspocus-server.js";
import { getHocusPocusServer } from "@/core/hocuspocus-server.js";
// helpers
import { logger, manualLogger } from "@/core/helpers/logger.js";
@ -19,6 +21,17 @@ expressWs(app);
app.set("port", process.env.PORT || 3000);
// Security middleware
app.use(helmet());
// Middleware for response compression
app.use(
compression({
level: 6,
threshold: 5 * 1000,
}),
);
// Logging middleware
app.use(logger);
@ -31,12 +44,22 @@ app.use(cors());
const router = express.Router();
const HocusPocusServer = await getHocusPocusServer().catch((err) => {
manualLogger.error("Failed to initialize HocusPocusServer:", err);
process.exit(1);
});
router.get("/health", (_req, res) => {
res.status(200).json({ status: "OK" });
});
router.ws("/collaboration", (ws, req) => {
HocusPocusServer.handleConnection(ws, req);
try {
HocusPocusServer.handleConnection(ws, req);
} catch (err) {
manualLogger.error("WebSocket connection error:", err);
ws.close();
}
});
app.use(process.env.LIVE_BASE_PATH || "/live", router);
@ -49,6 +72,47 @@ Sentry.setupExpressErrorHandler(app);
app.use(errorHandler);
app.listen(app.get("port"), () => {
const liveServer = app.listen(app.get("port"), () => {
manualLogger.info(`Plane Live server has started at port ${app.get("port")}`);
});
const gracefulShutdown = async () => {
manualLogger.info("Starting graceful shutdown...");
try {
// Close the HocusPocus server WebSocket connections
await HocusPocusServer.destroy();
manualLogger.info(
"HocusPocus server WebSocket connections closed gracefully.",
);
// Close the Express server
liveServer.close(() => {
manualLogger.info("Express server closed gracefully.");
process.exit(1);
});
} catch (err) {
manualLogger.error("Error during shutdown:", err);
process.exit(1);
}
// Forcefully shut down after 10 seconds if not closed
setTimeout(() => {
manualLogger.error("Forcing shutdown...");
process.exit(1);
}, 10000);
};
// Graceful shutdown on unhandled rejection
process.on("unhandledRejection", (err: any) => {
manualLogger.info("Unhandled Rejection: ", err);
manualLogger.info(`UNHANDLED REJECTION! 💥 Shutting down...`);
gracefulShutdown();
});
// Graceful shutdown on uncaught exception
process.on("uncaughtException", (err: any) => {
manualLogger.info("Uncaught Exception: ", err);
manualLogger.info(`UNCAUGHT EXCEPTION! 💥 Shutting down...`);
gracefulShutdown();
});