diff --git a/.gitignore b/.gitignore index f97f05e..834e4cf 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,4 @@ design-docs/ scrapeless-actor-sdk-node/ examples/*.png scrapeless-demo-*.json +/storage \ No newline at end of file diff --git a/src/services/storage/memory/dataset.ts b/src/services/storage/memory/dataset.ts index 8e39467..95ef9dd 100644 --- a/src/services/storage/memory/dataset.ts +++ b/src/services/storage/memory/dataset.ts @@ -91,28 +91,32 @@ export class LocalDatasetStorage extends MemoryService implements IDatasetStorag * @param name Dataset name */ async createDataset(name: string): Promise { - if (!name) { - throw new Error('name must not be empty'); + try { + if (!name) { + throw new Error('name must not be empty'); + } + const exist = await this.isNameExists(this.getStoragePath(this.datasetDir), name); + if (exist) { + throw new Error('The name of the dataset already exists'); + } + const id = uuidv4(); + const dirPath = this.getStoragePath(path.join(this.datasetDir, id)); + await this.mkdir(dirPath); + const now = new Date(); + const meta: IDataset = { + id, + name, + createdAt: now, + updatedAt: now, + fields: [], + stats: { count: 0, size: 0 } + }; + const metaPath = path.join(dirPath, 'metadata.json'); + await this.writeJsonFile(metaPath, meta); + return meta; + } catch { + throw new Error('Create dataset failed'); } - const exist = await this.isNameExists(this.getStoragePath(this.datasetDir), name); - if (exist) { - throw new Error('The name of the dataset already exists'); - } - const id = uuidv4(); - const dirPath = this.getStoragePath(path.join(this.datasetDir, id)); - await this.mkdir(dirPath); - const now = new Date(); - const meta: IDataset = { - id, - name, - createdAt: now, - updatedAt: now, - fields: [], - stats: { count: 0, size: 0 } - }; - const metaPath = path.join(dirPath, 'metadata.json'); - await this.writeJsonFile(metaPath, meta); - return meta; } /** @@ -133,13 +137,13 @@ export class LocalDatasetStorage extends MemoryService implements IDatasetStorag try { const file = await this.readFile(metaPath); meta = JSON.parse(file); + meta.name = name; + meta.updatedAt = new Date(); + await this.writeJsonFile(metaPath, meta); + return meta; } catch { throw new Error('Dataset not found'); } - meta.name = name; - meta.updatedAt = new Date(); - await this.writeJsonFile(metaPath, meta); - return meta; } /** @@ -177,31 +181,36 @@ export class LocalDatasetStorage extends MemoryService implements IDatasetStorag } catch { return { success: false, message: 'Dataset not found' }; } - // Get current max index - const files = await fs.promises.readdir(dirPath); - let maxIndex = 0; - files.forEach(f => { - if (/^\d{8}\.json$/.test(f)) { - const idx = parseInt(f.slice(0, 8), 10); - if (idx > maxIndex) maxIndex = idx; + + try { + // Get current max index + const files = await fs.promises.readdir(dirPath); + let maxIndex = 0; + files.forEach(f => { + if (/^\d{8}\.json$/.test(f)) { + const idx = parseInt(f.slice(0, 8), 10); + if (idx > maxIndex) maxIndex = idx; + } + }); + const fieldsSet = new Set(meta.fields || []); + for (let i = 0; i < items.length; i++) { + const item = items[i]; + Object.keys(item).forEach(key => fieldsSet.add(key)); + const index = maxIndex + i + 1; + const fileName = `${index.toString().padStart(8, '0')}.json`; + const filePath = path.join(dirPath, fileName); + await this.writeJsonFile(filePath, item); } - }); - const fieldsSet = new Set(meta.fields || []); - for (let i = 0; i < items.length; i++) { - const item = items[i]; - Object.keys(item).forEach(key => fieldsSet.add(key)); - const index = maxIndex + i + 1; - const fileName = `${index.toString().padStart(8, '0')}.json`; - const filePath = path.join(dirPath, fileName); - await this.writeJsonFile(filePath, item); - } - meta.fields = Array.from(fieldsSet); - // Update metadata - meta.updatedAt = new Date(); - meta.stats = meta.stats || { count: 0, size: 0 }; - meta.stats.count += items.length; - await this.writeJsonFile(metaPath, meta); - return { success: true, message: 'Items added' }; + meta.fields = Array.from(fieldsSet); + // Update metadata + meta.updatedAt = new Date(); + meta.stats = meta.stats || { count: 0, size: 0 }; + meta.stats.count += items.length; + await this.writeJsonFile(metaPath, meta); + return { success: true, message: 'Items added' }; + } catch { + throw new Error('Add to dataset failed'); + } } /** @@ -217,6 +226,15 @@ export class LocalDatasetStorage extends MemoryService implements IDatasetStorag let files: string[]; try { files = await fs.promises.readdir(dirPath); + files = files.filter(f => /^\d{8}\.json$/.test(f)); + files.sort(); + const items: T[] = []; + for (const file of files) { + const filePath = path.join(dirPath, file); + const data = await this.readFile(filePath); + items.push(JSON.parse(data)); + } + return this.paginateArray(items, params.page, params.pageSize); } catch { return { items: [], @@ -226,14 +244,5 @@ export class LocalDatasetStorage extends MemoryService implements IDatasetStorag totalPage: 0 }; } - files = files.filter(f => /^\d{8}\.json$/.test(f)); - files.sort(); - const items: T[] = []; - for (const file of files) { - const filePath = path.join(dirPath, file); - const data = await this.readFile(filePath); - items.push(JSON.parse(data)); - } - return this.paginateArray(items, params.page, params.pageSize); } } diff --git a/src/services/storage/memory/kv.ts b/src/services/storage/memory/kv.ts index 9cf0954..65cfa47 100644 --- a/src/services/storage/memory/kv.ts +++ b/src/services/storage/memory/kv.ts @@ -121,11 +121,15 @@ export class LocalKVStorage extends MemoryService implements IKVStorage { runId: '' }; const metaPath = path.join(dirPath, 'metadata.json'); - await this.writeJsonFile(metaPath, meta); - return { - ...meta, - stats: { size: 0, count: 0 } - }; + try { + await this.writeJsonFile(metaPath, meta); + return { + ...meta, + stats: { size: 0, count: 0 } + }; + } catch { + throw new Error('Create namespace failed'); + } } /** @@ -197,13 +201,13 @@ export class LocalKVStorage extends MemoryService implements IKVStorage { try { const file = await this.readFile(metaPath); meta = JSON.parse(file); + meta.name = name; + meta.updatedAt = new Date(); + await this.writeJsonFile(metaPath, meta); + return { success: true }; } catch { return { success: false }; } - meta.name = name; - meta.updatedAt = new Date(); - await this.writeJsonFile(metaPath, meta); - return { success: true }; } /** @@ -232,11 +236,15 @@ export class LocalKVStorage extends MemoryService implements IKVStorage { files.sort(); const items: IKVItem[] = []; for (const file of files) { - const filePath = path.join(dirPath, file); - const data = await this.readFile(filePath); - const kv = JSON.parse(data) as IKVKey; - if (kv.expireAt && new Date(kv.expireAt) < new Date()) continue; - items.push({ key: kv.key, size: kv.size }); + try { + const filePath = path.join(dirPath, file); + const data = await this.readFile(filePath); + const kv = JSON.parse(data) as IKVKey; + if (kv.expireAt && new Date(kv.expireAt) < new Date()) continue; + items.push({ key: kv.key, size: kv.size }); + } catch { + continue; + } } return this.paginateArray(items, params.page, params.pageSize); } @@ -278,11 +286,15 @@ export class LocalKVStorage extends MemoryService implements IKVStorage { let successfulKeyCount = 0; const unsuccessfulKeys = []; for (const item of data) { - const res = await this.setValue(namespaceId, item); - if (res.success) { - successfulKeyCount++; - } else { - unsuccessfulKeys.push(item.key); + try { + const res = await this.setValue(namespaceId, item); + if (res.success) { + successfulKeyCount++; + } else { + unsuccessfulKeys.push(item.key); + } + } catch { + continue; } } return { successfulKeyCount, unsuccessfulKeys }; @@ -301,7 +313,11 @@ export class LocalKVStorage extends MemoryService implements IKVStorage { return { success: false }; } for (const key of keys) { - await this.delValue(namespaceId, key); + try { + await this.delValue(namespaceId, key); + } catch { + continue; + } } return { success: true }; } @@ -337,8 +353,12 @@ export class LocalKVStorage extends MemoryService implements IKVStorage { expireAt, size: Buffer.byteLength(data.value) }; - await this.writeJsonFile(filePath, kv); - return { success: true }; + try { + await this.writeJsonFile(filePath, kv); + return { success: true }; + } catch { + throw new Error('Set kv failed'); + } } /** diff --git a/src/services/storage/memory/queue.ts b/src/services/storage/memory/queue.ts index 1e412f9..8d7baac 100644 --- a/src/services/storage/memory/queue.ts +++ b/src/services/storage/memory/queue.ts @@ -99,8 +99,12 @@ export class LocalQueueStorage extends MemoryService implements IQueueStorage { } }; const metaPath = path.join(dirPath, 'metadata.json'); - await this.writeJsonFile(metaPath, meta); - return { id, name: data.name }; + try { + await this.writeJsonFile(metaPath, meta); + return { id, name: data.name }; + } catch { + throw new Error('Create queue failed'); + } } /** @@ -143,14 +147,14 @@ export class LocalQueueStorage extends MemoryService implements IQueueStorage { try { const file = await this.readFile(metaPath); meta = JSON.parse(file); + if (data.name) meta.name = data.name; + if (data.description) meta.description = data.description; + meta.updatedAt = new Date(); + await this.writeJsonFile(metaPath, meta); + return null; } catch { throw new Error('Queue not found'); } - if (data.name) meta.name = data.name; - if (data.description) meta.description = data.description; - meta.updatedAt = new Date(); - await this.writeJsonFile(metaPath, meta); - return null; } /** @@ -199,8 +203,12 @@ export class LocalQueueStorage extends MemoryService implements IQueueStorage { reenterTime: '' }; const msgPath = path.join(dirPath, `${id}.json`); - await this.writeJsonFile(msgPath, msg); - return { msgId: id }; + try { + await this.writeJsonFile(msgPath, msg); + return { msgId: id }; + } catch { + throw new Error('Queue push failed'); + } } /** @@ -226,24 +234,24 @@ export class LocalQueueStorage extends MemoryService implements IQueueStorage { try { const data = await this.readFile(filePath); msg = JSON.parse(data); + // Filter out completed, failed, expired, or over-retried messages + if ( + msg.successAt > 0 || + msg.failedAt > 0 || + msg.deadline < Math.floor(now.getTime() / 1000) || + (msg.reenterTime && new Date(msg.reenterTime) > now) || + msg.retried >= msg.retry + ) { + await this.rm(filePath); + continue; + } + msg.reenterTime = new Date(now.getTime() + (msg.timeout || 0) * 1000); + msg.retried = (msg.retried || 0) + 1; + await this.writeJsonFile(filePath, msg); + msgs.push(msg); } catch { continue; } - // Filter out completed, failed, expired, or over-retried messages - if ( - msg.successAt > 0 || - msg.failedAt > 0 || - msg.deadline < Math.floor(now.getTime() / 1000) || - (msg.reenterTime && new Date(msg.reenterTime) > now) || - msg.retried >= msg.retry - ) { - await this.rm(filePath); - continue; - } - msg.reenterTime = new Date(now.getTime() + (msg.timeout || 0) * 1000); - msg.retried = (msg.retried || 0) + 1; - await this.writeJsonFile(filePath, msg); - msgs.push(msg); } // Sort by updateTime if present if (msgs.length > 0 && 'updateTime' in msgs[0]) { diff --git a/src/utils/memory.ts b/src/utils/memory.ts index 6e4d323..517ce76 100644 --- a/src/utils/memory.ts +++ b/src/utils/memory.ts @@ -35,28 +35,32 @@ const metadataStore = { }; export async function createRoot() { - const rootDir = 'storage'; - const rootPath = path.resolve(rootDir); - if (!fs.existsSync(rootPath)) { - await fs.promises.mkdir(rootPath); - } + try { + const rootDir = 'storage'; + const rootPath = path.resolve(rootDir); + if (!fs.existsSync(rootPath)) { + await fs.promises.mkdir(rootPath); + } + } catch {} } export async function createDir(dir: keyof typeof metadataStore) { - const storagePath = path.resolve(`storage/${dir}`); - if (!fs.existsSync(storagePath)) { - await fs.promises.mkdir(storagePath, { recursive: true }); - await fs.promises.mkdir(path.join(storagePath, 'default')); - const metadata = metadataStore[dir]; + try { + const storagePath = path.resolve(`storage/${dir}`); + if (!fs.existsSync(storagePath)) { + await fs.promises.mkdir(storagePath, { recursive: true }); + await fs.promises.mkdir(path.join(storagePath, 'default')); + const metadata = metadataStore[dir]; - await fs.promises.writeFile( - path.join(storagePath, 'default', 'metadata.json'), - JSON.stringify({ - ...metadata, - createdAt: new Date().toISOString(), - updatedAt: new Date().toISOString() - }) - ); - if (dir === 'kv_stores') await fs.promises.writeFile(path.join(storagePath, 'default', 'INPUT.json'), ''); - } + await fs.promises.writeFile( + path.join(storagePath, 'default', 'metadata.json'), + JSON.stringify({ + ...metadata, + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString() + }) + ); + if (dir === 'kv_stores') await fs.promises.writeFile(path.join(storagePath, 'default', 'INPUT.json'), ''); + } + } catch {} }