diff --git a/package.json b/package.json index 0e8725e9c..e007b774e 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ "buildsite": "vite build --outDir dist", "updatePlugin": "tsc public/pluginApi.ts", "runserver": "node server/node/server.cjs", + "runserver:patch": "node server/node/server.cjs --patch-sync", "sync": "node electron/sync", "electron": "node electron/dist/electron" }, @@ -56,6 +57,7 @@ "eventsource-parser": "^1.1.2", "exifr": "^7.1.3", "express": "^4.18.2", + "fast-json-patch": "^3.1.1", "fflate": "^0.8.1", "gpt-3-encoder": "^1.1.4", "gpt3-tokenizer": "^1.1.5", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7d20db902..3f22c26ce 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -128,6 +128,9 @@ importers: express: specifier: ^4.18.2 version: 4.18.2 + fast-json-patch: + specifier: ^3.1.1 + version: 3.1.1 fflate: specifier: ^0.8.1 version: 0.8.1 @@ -2069,6 +2072,9 @@ packages: resolution: {integrity: sha512-oX2ruAFQwf/Orj8m737Y5adxDQO0LAB7/S5MnxCdTNDd4p6BsyIVsv9JQsATbTSq8KHRpLwIHbVlUNatxd+1Ow==} engines: {node: '>=8.6.0'} + fast-json-patch@3.1.1: + resolution: {integrity: sha512-vf6IHUX2SBcA+5/+4883dsIjpBTqmfBjmYiWK1savxQmFk4JfBMLa7ynTYOs1Rolp/T1betJxHiGD3g1Mn8lUQ==} + fastq@1.16.0: resolution: {integrity: sha512-ifCoaXsDrsdkWTtiNJX5uzHDsrck5TzfKKDcuFFTIrrc/BS076qgEIfoIy1VeZqViznfKiysPYTh/QeHtnIsYA==} @@ -5741,6 +5747,8 @@ snapshots: merge2: 1.4.1 micromatch: 4.0.5 + fast-json-patch@3.1.1: {} + fastq@1.16.0: dependencies: reusify: 1.0.4 diff --git a/server/node/server.cjs b/server/node/server.cjs index 0828d9f7f..1920efe1e 100644 --- a/server/node/server.cjs +++ b/server/node/server.cjs @@ -5,6 +5,9 @@ const htmlparser = require('node-html-parser'); const { existsSync, mkdirSync, readFileSync, writeFileSync } = require('fs'); const fs = require('fs/promises') const crypto = require('crypto') +const { applyPatch } = require('fast-json-patch') +const { Packr, Unpackr, decode } = require('msgpackr') +const fflate = require('fflate') app.use(express.static(path.join(process.cwd(), 'dist'), {index: false})); app.use(express.json({ limit: '50mb' })); app.use(express.raw({ type: 'application/octet-stream', limit: '50mb' })); @@ -15,6 +18,24 @@ const hubURL = 'https://sv.risuai.xyz'; let password = '' +// Configuration flags for patch-based sync +let enablePatchSync = process.env.RISU_PATCH_SYNC === '1' || process.argv.includes('--patch-sync') + +if (enablePatchSync) { + const [major, minor, patch] = process.version.slice(1).split('.').map(Number); + // v22.7.0, v23 and above have a bug with msgpackr that causes it to crash on encoding risu saves + if (major >= 23 || (major === 22 && minor === 7 && patch === 0)) { + console.log(`[Server] Detected problematic Node.js version ${process.version}. Disabling patch-based sync.`); + enablePatchSync = false; + } +} + +// In-memory database cache for patch-based sync with versioning +let dbCache = {} +let dbVersions = {} // Track version numbers for each file +let saveTimers = {} +const SAVE_INTERVAL = 5000 // Save to disk after 5 seconds of inactivity + const savePath = path.join(process.cwd(), "save") if(!existsSync(savePath)){ mkdirSync(savePath) @@ -29,6 +50,79 @@ function isHex(str) { return hexRegex.test(str.toUpperCase().trim()) || str === '__password'; } +// Encoding/decoding functions for RisuSave format +const packr = new Packr({ useRecords: false }); +const unpackr = new Unpackr({ int64AsType: 'number', useRecords: false }); + +const magicHeader = new Uint8Array([0, 82, 73, 83, 85, 83, 65, 86, 69, 0, 7]); +const magicCompressedHeader = new Uint8Array([0, 82, 73, 83, 85, 83, 65, 86, 69, 0, 8]); + +function checkHeader(data) { + let header = 'raw'; + + if (data.length < magicHeader.length) { + return 'none'; + } + + for (let i = 0; i < magicHeader.length; i++) { + if (data[i] !== magicHeader[i]) { + header = 'none'; + break; + } + } + + if (header === 'none') { + header = 'compressed'; + for (let i = 0; i < magicCompressedHeader.length; i++) { + if (data[i] !== magicCompressedHeader[i]) { + header = 'none'; + break; + } + } + } + + return header; +} + +async function decodeRisuSaveServer(data) { + try { + switch(checkHeader(data)){ + case "compressed": + data = data.slice(magicCompressedHeader.length) + return decode(fflate.decompressSync(data)) + case "raw": + data = data.slice(magicHeader.length) + return unpackr.decode(data) + } + return unpackr.decode(data) + } + catch (error) { + try { + console.log('risudecode') + const risuSaveHeader = new Uint8Array(Buffer.from("\u0000\u0000RISU",'utf-8')) + const realData = data.subarray(risuSaveHeader.length) + const dec = unpackr.decode(realData) + return dec + } catch (error) { + const buf = Buffer.from(fflate.decompressSync(Buffer.from(data))) + try { + return JSON.parse(buf.toString('utf-8')) + } catch (error) { + return unpackr.decode(buf) + } + } + } +} + +async function encodeRisuSaveServer(data) { + // Encode to legacy format (no compression for simplicity) + const encoded = packr.encode(data); + const result = new Uint8Array(encoded.length + magicHeader.length); + result.set(magicHeader, 0); + result.set(encoded, magicHeader.length); + return result; +} + app.get('/', async (req, res, next) => { const clientIP = req.headers['x-forwarded-for'] || req.ip || req.socket.remoteAddress || 'Unknown IP'; @@ -39,7 +133,7 @@ app.get('/', async (req, res, next) => { const mainIndex = await fs.readFile(path.join(process.cwd(), 'dist', 'index.html')) const root = htmlparser.parse(mainIndex) const head = root.querySelector('head') - head.innerHTML = `` + head.innerHTML + head.innerHTML = `` + head.innerHTML res.send(root.toString()) } catch (error) { @@ -262,6 +356,26 @@ app.get('/api/read', async (req, res, next) => { return; } try { + const fullPath = path.join(savePath, filePath); + + // Stop any pending save timer for this file + if (saveTimers[filePath]) { + clearTimeout(saveTimers[filePath]); + delete saveTimers[filePath]; + } + + // write to disk if available in cache + if (dbCache[filePath]) { + const decodedFilePath = Buffer.from(filePath, 'hex').toString('utf-8'); + let dataToSave = await encodeRisuSaveServer(dbCache[filePath]); + await fs.writeFile(fullPath, dataToSave); + } + + // Clear cache and reset version after read operation + if (dbCache[filePath]) delete dbCache[filePath]; + dbVersions[filePath] = 0; + + // read from disk if(!existsSync(path.join(savePath, filePath))){ res.send(); } @@ -315,9 +429,10 @@ app.get('/api/list', async (req, res, next) => { return } try { - const data = (await fs.readdir(path.join(savePath))).map((v) => { - return Buffer.from(v, 'hex').toString('utf-8') - }) + const data = (await fs.readdir(path.join(savePath))) + .map((v) => { return Buffer.from(v, 'hex').toString('utf-8') }) + .filter((v) => { return v.startsWith(req.headers['key-prefix'].trim()) }) + res.send({ success: true, content: data @@ -352,6 +467,15 @@ app.post('/api/write', async (req, res, next) => { try { await fs.writeFile(path.join(savePath, filePath), fileContent); + // Clear cache for this file since it was directly written + if (dbCache[filePath]) delete dbCache[filePath]; + // Clear any pending save timer for this file + if (saveTimers[filePath]) { + clearTimeout(saveTimers[filePath]); + delete saveTimers[filePath]; + } + // Reset version to 0 after direct write + dbVersions[filePath] = 0; res.send({ success: true }); @@ -360,6 +484,116 @@ app.post('/api/write', async (req, res, next) => { } }); +app.post('/api/patch', async (req, res, next) => { + // Check if patch sync is enabled + if (!enablePatchSync) { + res.status(404).send({ + error: 'Patch sync is not enabled' + }); + return; + } + + if(req.headers['risu-auth'].trim() !== password.trim()){ + console.log('incorrect') + res.status(400).send({ + error:'Password Incorrect' + }); + return + } + const filePath = req.headers['file-path']; + const patch = req.body.patch; + const clientVersion = parseInt(req.body.expectedVersion) || 0; + + if (!filePath || !patch) { + res.status(400).send({ + error:'File path and patch required' + }); + return; + } + if(!isHex(filePath)){ + res.status(400).send({ + error:'Invaild Path' + }); + return; + } + + try { + const decodedFilePath = Buffer.from(filePath, 'hex').toString('utf-8'); + + // Initialize version if not exists + if (!dbVersions[filePath]) dbVersions[filePath] = 0; + + // Check version mismatch + const serverVersion = dbVersions[filePath]; + if (clientVersion !== serverVersion) { + console.log(`[Patch] Version mismatch for ${decodedFilePath}: client=${clientVersion}, server=${serverVersion}`); + res.status(409).send({ + error: 'Version mismatch', + }); + return; + } + + // Load database into memory if not already cached + if (!dbCache[filePath]) { + const fullPath = path.join(savePath, filePath); + if (existsSync(fullPath)) { + const fileContent = await fs.readFile(fullPath); + dbCache[filePath] = await decodeRisuSaveServer(fileContent); + } + else { + dbCache[filePath] = {}; + } + } + + // Apply patch to in-memory database + const result = applyPatch(dbCache[filePath], patch, true); + + // Increment version after successful patch + dbVersions[filePath]++; + const newVersion = dbVersions[filePath]; + + // Schedule save to disk (debounced) + if (saveTimers[filePath]) { + clearTimeout(saveTimers[filePath]); + } + saveTimers[filePath] = setTimeout(async () => { + try { + const fullPath = path.join(savePath, filePath); + let dataToSave = await encodeRisuSaveServer(dbCache[filePath]); + await fs.writeFile(fullPath, dataToSave); + // Create backup for database files after successful save + if (decodedFilePath.includes('database/database.bin')) { + try { + const timestamp = Math.floor(Date.now() / 100).toString(); + const backupFileName = `database/dbbackup-${timestamp}.bin`; + const backupFilePath = Buffer.from(backupFileName, 'utf-8').toString('hex'); + const backupFullPath = path.join(savePath, backupFilePath); + // Create backup using the same data that was just saved + await fs.writeFile(backupFullPath, dataToSave); + } catch (backupError) { + console.error(`[Patch] Error creating backup:`, backupError); + } + } + } catch (error) { + console.error(`[Patch] Error saving ${filePath}:`, error); + } finally { + delete saveTimers[filePath]; + } + }, SAVE_INTERVAL); + + res.send({ + success: true, + appliedOperations: result.length, + newVersion: newVersion + }); + } catch (error) { + console.error(`[Patch] Error applying patch to ${filePath}:`, error); + res.status(500).send({ + error: 'Patch application failed: ' + (error && error.message ? error.message : error) + }); + } +}); + async function getHttpsOptions() { const keyPath = path.join(sslPath, 'server.key'); @@ -388,19 +622,19 @@ async function startServer() { try { const port = process.env.PORT || 6001; - const httpsOptions = await getHttpsOptions(); - - if (httpsOptions) { + const httpsOptions = await getHttpsOptions(); if (httpsOptions) { // HTTPS https.createServer(httpsOptions, app).listen(port, () => { console.log("[Server] HTTPS server is running."); console.log(`[Server] https://localhost:${port}/`); + console.log(`[Server] Patch sync: ${enablePatchSync ? 'ENABLED' : 'DISABLED'}`); }); } else { // HTTP app.listen(port, () => { console.log("[Server] HTTP server is running."); console.log(`[Server] http://localhost:${port}/`); + console.log(`[Server] Patch sync: ${enablePatchSync ? 'ENABLED' : 'DISABLED'}`); }); } } catch (error) { diff --git a/src/ts/globalApi.svelte.ts b/src/ts/globalApi.svelte.ts index 47281d36c..724529d34 100644 --- a/src/ts/globalApi.svelte.ts +++ b/src/ts/globalApi.svelte.ts @@ -44,11 +44,14 @@ import { fetch as TauriHTTPFetch } from '@tauri-apps/plugin-http'; import { moduleUpdate } from "./process/modules"; import type { AccountStorage } from "./storage/accountStorage"; import { makeColdData } from "./process/coldstorage.svelte"; +import { compare } from 'fast-json-patch' //@ts-ignore export const isTauri = !!window.__TAURI_INTERNALS__ //@ts-ignore export const isNodeServer = !!globalThis.__NODE__ +//@ts-ignore +export const supportsPatchSync = !!globalThis.__PATCH_SYNC__ export const forageStorage = new AutoStorage() export const googleBuild = false export const isMobile = navigator.userAgent.match(/(iPad)|(iPhone)|(iPod)|(android)|(webOS)/i) @@ -313,10 +316,48 @@ export async function loadAsset(id:string){ } let lastSave = '' +let lastSyncedDb: any = null +let dbVersion = 0 // Track local database version for patch sync export let saving = $state({ state: false }) +/** + * Attempts to save database changes using patch synchronization. + * @returns {Promise} Returns true if patch was successfully applied or no changes exist, false if full save is required. + */ +async function tryPatchSave(db: Database): Promise { + // Initial save cannot use patch, so return false to trigger full save. + if (lastSyncedDb === null) { + return false; + } + + try { + const serializedDb = $state.snapshot(db); + const patch = compare(lastSyncedDb, serializedDb); + + if (patch.length > 0) { + const success = await forageStorage.patchItem('database/database.bin', { + patch: patch, + expectedVersion: dbVersion + }); + + if (success) { + lastSyncedDb = serializedDb; + dbVersion++; + console.log(`[Patch] Successfully applied patch, new version: ${dbVersion}`); + return true; + } + console.warn('[Patch] Patch failed, falling back to full save'); + return false; + } + return true; // No changes detected, treat as success + } catch (error) { + console.error('[Patch] Error during patch attempt:', error); + return false; // Fall back to full save on error + } +} + /** * Saves the current state of the database. * @@ -398,11 +439,29 @@ export async function saveDb(){ await writeFile('database/database.bin', dbData, {baseDir: BaseDirectory.AppData}); await writeFile(`database/dbbackup-${(Date.now()/100).toFixed()}.bin`, dbData, {baseDir: BaseDirectory.AppData}); } - else{ - if(!forageStorage.isAccount){ - const dbData = encodeRisuSaveLegacy(db) - await forageStorage.setItem('database/database.bin', dbData) - await forageStorage.setItem(`database/dbbackup-${(Date.now()/100).toFixed()}.bin`, dbData) + else{ + if(!forageStorage.isAccount){ + // Patch-based sync for Node server + if (isNodeServer && supportsPatchSync) { + const patchSuccessful = await tryPatchSave(db); + + // If this is the first save or patch failed, fall back to full save. + if (!patchSuccessful) { + const dbData = encodeRisuSaveLegacy(db); + await forageStorage.setItem('database/database.bin', dbData); + await forageStorage.setItem(`database/dbbackup-${(Date.now()/100).toFixed()}.bin`, dbData); + + // (Re)initialize patch tracking state after full save. + lastSyncedDb = $state.snapshot(db); + dbVersion = 0; + console.log('[Patch] Full save completed, patch tracking (re)initialized.'); + } + } else { + // Standard save method for environments that don't support patches + const dbData = encodeRisuSaveLegacy(db); + await forageStorage.setItem('database/database.bin', dbData); + await forageStorage.setItem(`database/dbbackup-${(Date.now()/100).toFixed()}.bin`, dbData); + } } if(forageStorage.isAccount){ const dbData = await encodeRisuSave(db) @@ -480,7 +539,7 @@ async function getDbBackups() { return backups } else{ - const keys = await forageStorage.keys() + const keys = await forageStorage.keys("database/dbbackup-") const backups = keys .filter(key => key.startsWith('database/dbbackup-')) @@ -570,6 +629,8 @@ export async function loadData() { const decoded = await decodeRisuSave(gotStorage) console.log(decoded) setDatabase(decoded) + lastSyncedDb = $state.snapshot(decoded) + dbVersion = 0 // Initialize version tracking } catch (error) { console.error(error) const backups = await getDbBackups() @@ -2322,4 +2383,4 @@ export function getLanguageCodes(){ }).sort((a, b) => a.name.localeCompare(b.name)) return languageCodes -} \ No newline at end of file +} diff --git a/src/ts/storage/autoStorage.ts b/src/ts/storage/autoStorage.ts index deb432969..bbb080e22 100644 --- a/src/ts/storage/autoStorage.ts +++ b/src/ts/storage/autoStorage.ts @@ -1,5 +1,5 @@ import localforage from "localforage" -import { isNodeServer, replaceDbResources } from "../globalApi.svelte" +import { isNodeServer, replaceDbResources, supportsPatchSync } from "../globalApi.svelte" import { NodeStorage } from "./nodeStorage" import { OpfsStorage } from "./opfsStorage" import { alertInput, alertSelect, alertStore } from "../alert" @@ -28,16 +28,31 @@ export class AutoStorage{ return await this.realStorage.getItem(key) } - async keys():Promise{ + async keys(prefix:string = ""):Promise{ await this.Init() - return await this.realStorage.keys() - + let result: string[] + if(this.realStorage instanceof NodeStorage) { + result = await this.realStorage.keys(prefix) + } + else { + result = await this.realStorage.keys() + } + return result.filter((key) => key.startsWith(prefix.trim())) } async removeItem(key:string){ await this.Init() return await this.realStorage.removeItem(key) } + async patchItem(key: string, patchData: {patch: any[], expectedVersion: number}): Promise { + await this.Init() + // Only NodeStorage supports patching for now + if (this.realStorage instanceof NodeStorage && supportsPatchSync) { + return await (this.realStorage as NodeStorage).patchItem(key, patchData) + } + return false + } + async checkAccountSync(){ let db = getDatabase() if(this.isAccount){ @@ -178,4 +193,4 @@ export class AutoStorage{ } listItem = this.keys -} \ No newline at end of file +} diff --git a/src/ts/storage/nodeStorage.ts b/src/ts/storage/nodeStorage.ts index 71ac37ab1..dc93f0fa9 100644 --- a/src/ts/storage/nodeStorage.ts +++ b/src/ts/storage/nodeStorage.ts @@ -43,12 +43,13 @@ export class NodeStorage{ } return data } - async keys():Promise{ + async keys(prefix:string = ""):Promise{ await this.checkAuth() const da = await fetch('/api/list', { method: "GET", headers:{ - 'risu-auth': auth + 'risu-auth': auth, + 'key-prefix': prefix } }) const data = await da.json() @@ -59,7 +60,7 @@ export class NodeStorage{ throw data.error } return data.content - } + } async removeItem(key:string){ await this.checkAuth() const da = await fetch('/api/remove', { @@ -78,6 +79,34 @@ export class NodeStorage{ } } + async patchItem(key: string, patchData: {patch: any[], expectedVersion: number}): Promise { + await this.checkAuth() + + const da = await fetch('/api/patch', { + method: "POST", + body: JSON.stringify(patchData), + headers: { + 'content-type': 'application/json', + 'file-path': Buffer.from(key, 'utf-8').toString('hex'), + 'risu-auth': auth + } + }) + + if(da.status === 409) { + // Version mismatch - throw specific error for client to handle + throw new Error('Version mismatch') + } + + if(da.status < 200 || da.status >= 300){ + return false + } + const data = await da.json() + if(data.error){ + return false + } + return true + } + private async checkAuth(){ if(!auth){ auth = localStorage.getItem('risuauth') @@ -142,4 +171,4 @@ async function digestPassword(message:string) { })).text() return crypt; -} \ No newline at end of file +}