Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ design-docs/
scrapeless-actor-sdk-node/
examples/*.png
scrapeless-demo-*.json
/storage
125 changes: 67 additions & 58 deletions src/services/storage/memory/dataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,28 +91,32 @@ export class LocalDatasetStorage extends MemoryService implements IDatasetStorag
* @param name Dataset name
*/
async createDataset(name: string): Promise<IDataset> {
if (!name) {
throw new Error('name must not be empty');
try {
if (!name) {
throw new Error('name must not be empty');
}
const exist = await this.isNameExists(this.getStoragePath(this.datasetDir), name);
if (exist) {
throw new Error('The name of the dataset already exists');
}
const id = uuidv4();
const dirPath = this.getStoragePath(path.join(this.datasetDir, id));
await this.mkdir(dirPath);
const now = new Date();
const meta: IDataset = {
id,
name,
createdAt: now,
updatedAt: now,
fields: [],
stats: { count: 0, size: 0 }
};
const metaPath = path.join(dirPath, 'metadata.json');
await this.writeJsonFile(metaPath, meta);
return meta;
} catch {
throw new Error('Create dataset failed');
}
const exist = await this.isNameExists(this.getStoragePath(this.datasetDir), name);
if (exist) {
throw new Error('The name of the dataset already exists');
}
const id = uuidv4();
const dirPath = this.getStoragePath(path.join(this.datasetDir, id));
await this.mkdir(dirPath);
const now = new Date();
const meta: IDataset = {
id,
name,
createdAt: now,
updatedAt: now,
fields: [],
stats: { count: 0, size: 0 }
};
const metaPath = path.join(dirPath, 'metadata.json');
await this.writeJsonFile(metaPath, meta);
return meta;
}

/**
Expand All @@ -133,13 +137,13 @@ export class LocalDatasetStorage extends MemoryService implements IDatasetStorag
try {
const file = await this.readFile(metaPath);
meta = JSON.parse(file);
meta.name = name;
meta.updatedAt = new Date();
await this.writeJsonFile(metaPath, meta);
return meta;
} catch {
throw new Error('Dataset not found');
}
meta.name = name;
meta.updatedAt = new Date();
await this.writeJsonFile(metaPath, meta);
return meta;
}

/**
Expand Down Expand Up @@ -177,31 +181,36 @@ export class LocalDatasetStorage extends MemoryService implements IDatasetStorag
} catch {
return { success: false, message: 'Dataset not found' };
}
// Get current max index
const files = await fs.promises.readdir(dirPath);
let maxIndex = 0;
files.forEach(f => {
if (/^\d{8}\.json$/.test(f)) {
const idx = parseInt(f.slice(0, 8), 10);
if (idx > maxIndex) maxIndex = idx;

try {
// Get current max index
const files = await fs.promises.readdir(dirPath);
let maxIndex = 0;
files.forEach(f => {
if (/^\d{8}\.json$/.test(f)) {
const idx = parseInt(f.slice(0, 8), 10);
if (idx > maxIndex) maxIndex = idx;
}
});
const fieldsSet = new Set<string>(meta.fields || []);
for (let i = 0; i < items.length; i++) {
const item = items[i];
Object.keys(item).forEach(key => fieldsSet.add(key));
const index = maxIndex + i + 1;
const fileName = `${index.toString().padStart(8, '0')}.json`;
const filePath = path.join(dirPath, fileName);
await this.writeJsonFile(filePath, item);
}
});
const fieldsSet = new Set<string>(meta.fields || []);
for (let i = 0; i < items.length; i++) {
const item = items[i];
Object.keys(item).forEach(key => fieldsSet.add(key));
const index = maxIndex + i + 1;
const fileName = `${index.toString().padStart(8, '0')}.json`;
const filePath = path.join(dirPath, fileName);
await this.writeJsonFile(filePath, item);
}
meta.fields = Array.from(fieldsSet);
// Update metadata
meta.updatedAt = new Date();
meta.stats = meta.stats || { count: 0, size: 0 };
meta.stats.count += items.length;
await this.writeJsonFile(metaPath, meta);
return { success: true, message: 'Items added' };
meta.fields = Array.from(fieldsSet);
// Update metadata
meta.updatedAt = new Date();
meta.stats = meta.stats || { count: 0, size: 0 };
meta.stats.count += items.length;
await this.writeJsonFile(metaPath, meta);
return { success: true, message: 'Items added' };
} catch {
throw new Error('Add to dataset failed');
}
}

/**
Expand All @@ -217,6 +226,15 @@ export class LocalDatasetStorage extends MemoryService implements IDatasetStorag
let files: string[];
try {
files = await fs.promises.readdir(dirPath);
files = files.filter(f => /^\d{8}\.json$/.test(f));
files.sort();
const items: T[] = [];
for (const file of files) {
const filePath = path.join(dirPath, file);
const data = await this.readFile(filePath);
items.push(JSON.parse(data));
}
return this.paginateArray(items, params.page, params.pageSize);
} catch {
return {
items: [],
Expand All @@ -226,14 +244,5 @@ export class LocalDatasetStorage extends MemoryService implements IDatasetStorag
totalPage: 0
};
}
files = files.filter(f => /^\d{8}\.json$/.test(f));
files.sort();
const items: T[] = [];
for (const file of files) {
const filePath = path.join(dirPath, file);
const data = await this.readFile(filePath);
items.push(JSON.parse(data));
}
return this.paginateArray(items, params.page, params.pageSize);
}
}
64 changes: 42 additions & 22 deletions src/services/storage/memory/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,15 @@ export class LocalKVStorage extends MemoryService implements IKVStorage {
runId: ''
};
const metaPath = path.join(dirPath, 'metadata.json');
await this.writeJsonFile(metaPath, meta);
return {
...meta,
stats: { size: 0, count: 0 }
};
try {
await this.writeJsonFile(metaPath, meta);
return {
...meta,
stats: { size: 0, count: 0 }
};
} catch {
throw new Error('Create namespace failed');
}
}

/**
Expand Down Expand Up @@ -197,13 +201,13 @@ export class LocalKVStorage extends MemoryService implements IKVStorage {
try {
const file = await this.readFile(metaPath);
meta = JSON.parse(file);
meta.name = name;
meta.updatedAt = new Date();
await this.writeJsonFile(metaPath, meta);
return { success: true };
} catch {
return { success: false };
}
meta.name = name;
meta.updatedAt = new Date();
await this.writeJsonFile(metaPath, meta);
return { success: true };
}

/**
Expand Down Expand Up @@ -232,11 +236,15 @@ export class LocalKVStorage extends MemoryService implements IKVStorage {
files.sort();
const items: IKVItem[] = [];
for (const file of files) {
const filePath = path.join(dirPath, file);
const data = await this.readFile(filePath);
const kv = JSON.parse(data) as IKVKey;
if (kv.expireAt && new Date(kv.expireAt) < new Date()) continue;
items.push({ key: kv.key, size: kv.size });
try {
const filePath = path.join(dirPath, file);
const data = await this.readFile(filePath);
const kv = JSON.parse(data) as IKVKey;
if (kv.expireAt && new Date(kv.expireAt) < new Date()) continue;
items.push({ key: kv.key, size: kv.size });
} catch {
continue;
}
}
return this.paginateArray(items, params.page, params.pageSize);
}
Expand Down Expand Up @@ -278,11 +286,15 @@ export class LocalKVStorage extends MemoryService implements IKVStorage {
let successfulKeyCount = 0;
const unsuccessfulKeys = [];
for (const item of data) {
const res = await this.setValue(namespaceId, item);
if (res.success) {
successfulKeyCount++;
} else {
unsuccessfulKeys.push(item.key);
try {
const res = await this.setValue(namespaceId, item);
if (res.success) {
successfulKeyCount++;
} else {
unsuccessfulKeys.push(item.key);
}
} catch {
continue;
}
}
return { successfulKeyCount, unsuccessfulKeys };
Expand All @@ -301,7 +313,11 @@ export class LocalKVStorage extends MemoryService implements IKVStorage {
return { success: false };
}
for (const key of keys) {
await this.delValue(namespaceId, key);
try {
await this.delValue(namespaceId, key);
} catch {
continue;
}
}
return { success: true };
}
Expand Down Expand Up @@ -337,8 +353,12 @@ export class LocalKVStorage extends MemoryService implements IKVStorage {
expireAt,
size: Buffer.byteLength(data.value)
};
await this.writeJsonFile(filePath, kv);
return { success: true };
try {
await this.writeJsonFile(filePath, kv);
return { success: true };
} catch {
throw new Error('Set kv failed');
}
}

/**
Expand Down
56 changes: 32 additions & 24 deletions src/services/storage/memory/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,12 @@ export class LocalQueueStorage extends MemoryService implements IQueueStorage {
}
};
const metaPath = path.join(dirPath, 'metadata.json');
await this.writeJsonFile(metaPath, meta);
return { id, name: data.name };
try {
await this.writeJsonFile(metaPath, meta);
return { id, name: data.name };
} catch {
throw new Error('Create queue failed');
}
}

/**
Expand Down Expand Up @@ -143,14 +147,14 @@ export class LocalQueueStorage extends MemoryService implements IQueueStorage {
try {
const file = await this.readFile(metaPath);
meta = JSON.parse(file);
if (data.name) meta.name = data.name;
if (data.description) meta.description = data.description;
meta.updatedAt = new Date();
await this.writeJsonFile(metaPath, meta);
return null;
} catch {
throw new Error('Queue not found');
}
if (data.name) meta.name = data.name;
if (data.description) meta.description = data.description;
meta.updatedAt = new Date();
await this.writeJsonFile(metaPath, meta);
return null;
}

/**
Expand Down Expand Up @@ -199,8 +203,12 @@ export class LocalQueueStorage extends MemoryService implements IQueueStorage {
reenterTime: ''
};
const msgPath = path.join(dirPath, `${id}.json`);
await this.writeJsonFile(msgPath, msg);
return { msgId: id };
try {
await this.writeJsonFile(msgPath, msg);
return { msgId: id };
} catch {
throw new Error('Queue push failed');
}
}

/**
Expand All @@ -226,24 +234,24 @@ export class LocalQueueStorage extends MemoryService implements IQueueStorage {
try {
const data = await this.readFile(filePath);
msg = JSON.parse(data);
// Filter out completed, failed, expired, or over-retried messages
if (
msg.successAt > 0 ||
msg.failedAt > 0 ||
msg.deadline < Math.floor(now.getTime() / 1000) ||
(msg.reenterTime && new Date(msg.reenterTime) > now) ||
msg.retried >= msg.retry
) {
await this.rm(filePath);
continue;
}
msg.reenterTime = new Date(now.getTime() + (msg.timeout || 0) * 1000);
msg.retried = (msg.retried || 0) + 1;
await this.writeJsonFile(filePath, msg);
msgs.push(msg);
} catch {
continue;
}
// Filter out completed, failed, expired, or over-retried messages
if (
msg.successAt > 0 ||
msg.failedAt > 0 ||
msg.deadline < Math.floor(now.getTime() / 1000) ||
(msg.reenterTime && new Date(msg.reenterTime) > now) ||
msg.retried >= msg.retry
) {
await this.rm(filePath);
continue;
}
msg.reenterTime = new Date(now.getTime() + (msg.timeout || 0) * 1000);
msg.retried = (msg.retried || 0) + 1;
await this.writeJsonFile(filePath, msg);
msgs.push(msg);
}
// Sort by updateTime if present
if (msgs.length > 0 && 'updateTime' in msgs[0]) {
Expand Down
Loading