Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/dal-runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"@types/node": "^20.2.4",
"@types/nunjucks": "^3.2.6",
"cross-env": "^7.0.3",
"mm": "^3.2.1",
"mocha": "^10.2.0",
"ts-node": "^10.9.1",
"typescript": "^5.0.4"
Expand Down
24 changes: 23 additions & 1 deletion core/dal-runtime/src/MySqlDataSource.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { RDSClient } from '@eggjs/rds';
import type { RDSClientOptions } from '@eggjs/rds';
import Base from 'sdk-base';
import { Logger } from '@eggjs/tegg-types';

export interface DataSourceOptions extends RDSClientOptions {
name: string;
// default is select 1 + 1;
initSql?: string;
forkDb?: boolean;
initRetryTimes?: number;
logger?: Logger;
}

const DEFAULT_OPTIONS: RDSClientOptions = {
Expand All @@ -22,12 +25,16 @@ export class MysqlDataSource extends Base {
readonly timezone?: string;
readonly rdsOptions: RDSClientOptions;
readonly forkDb?: boolean;
readonly #initRetryTimes?: number;
readonly #logger?: Logger;

constructor(options: DataSourceOptions) {
super({ initMethod: '_init' });
const { name, initSql, forkDb, ...mysqlOptions } = options;
const { name, initSql, forkDb, initRetryTimes, logger, ...mysqlOptions } = options;
this.#logger = logger;
this.forkDb = forkDb;
this.initSql = initSql ?? 'SELECT 1 + 1';
this.#initRetryTimes = initRetryTimes;
this.name = name;
this.timezone = options.timezone;
this.rdsOptions = Object.assign({}, DEFAULT_OPTIONS, mysqlOptions);
Expand All @@ -36,7 +43,22 @@ export class MysqlDataSource extends Base {

protected async _init() {
if (this.initSql) {
await this.#doInit(1);
}
}

async #doInit(tryTimes: number): Promise<void> {
try {
this.#logger?.log(`${tryTimes} try to initialize dataSource ${this.name}`);
const st = Date.now();
await this.client.query(this.initSql);
this.#logger?.info(`dataSource initialization cost: ${Date.now() - st}, tryTimes: ${tryTimes}`);
} catch (e) {
this.#logger?.warn(`failed to initialize dataSource ${this.name}, tryTimes ${tryTimes}`, e);
if (!this.#initRetryTimes || tryTimes >= this.#initRetryTimes) {
throw e;
}
await this.#doInit(tryTimes + 1);
}
}

Expand Down
300 changes: 174 additions & 126 deletions core/dal-runtime/test/DataSource.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import assert from 'node:assert';
import path from 'node:path';
import mm from 'mm';
import { RDSClient } from '@eggjs/rds';
import { DeleteResult, InsertResult, UpdateResult } from '@eggjs/rds/lib/types';
import { TableModel } from '@eggjs/dal-decorator';
import { MysqlDataSource } from '../src/MySqlDataSource';
Expand All @@ -11,143 +13,189 @@ import { DatabaseForker } from '../src/DatabaseForker';
import { BaseFooDAO } from './fixtures/modules/dal/dal/dao/base/BaseFooDAO';

describe('test/Datasource.test.ts', () => {
let dataSource: DataSource<Foo>;
let tableModel: TableModel<Foo>;
let forker: DatabaseForker;

before(async () => {
const mysqlOptions = {
name: 'foo',
host: '127.0.0.1',
user: 'root',
database: 'test_runtime',
timezone: '+08:00',
initSql: 'SET GLOBAL time_zone = \'+08:00\';',
forkDb: true,
};
forker = new DatabaseForker('unittest', mysqlOptions);
await forker.forkDb(path.join(__dirname, './fixtures/modules/dal'));
const mysql = new MysqlDataSource(mysqlOptions);
await mysql.ready();

tableModel = TableModel.build(Foo);
const sqlMapLoader = new SqlMapLoader(tableModel, BaseFooDAO, console as any);
const sqlMap = sqlMapLoader.load();
dataSource = new DataSource(tableModel, mysql, sqlMap);
});
const mysqlOptions = {
name: 'foo',
host: '127.0.0.1',
user: 'root',
database: 'test_runtime',
timezone: '+08:00',
initSql: 'SET GLOBAL time_zone = \'+08:00\';',
forkDb: true,
};

describe('init', () => {
afterEach(() => {
mm.restore();
});

it('init failed should throw error', async () => {
mm.errorOnce(RDSClient.prototype, 'query', new Error('fake error'));
const query: any = RDSClient.prototype.query;

const mysql = new MysqlDataSource(mysqlOptions);
await assert.rejects(mysql.ready(), /fake error/);
assert.strictEqual(query.called, 1);
assert.deepStrictEqual(query.lastCalledArguments, [ mysqlOptions.initSql ]);
});

it('init should retry', async () => {
let i = 0;
mm(RDSClient.prototype, 'query', () => {
throw new Error(`fake error ${++i}`);
});
const query: any = RDSClient.prototype.query;

const mysql = new MysqlDataSource({ ...mysqlOptions, initRetryTimes: 3 });
await assert.rejects(mysql.ready(), /fake error 3/);
assert.strictEqual(query.called, 3);
});

after(async () => {
await forker.destroy();
it('should success after retry', async () => {
let i = 0;
mm(RDSClient.prototype, 'query', async () => {
if (i === 0) {
i++;
throw new Error('fake error');
}
});
const query: any = RDSClient.prototype.query;

const mysql = new MysqlDataSource({ ...mysqlOptions, initRetryTimes: 2 });
await assert.doesNotReject(mysql.ready());
assert.strictEqual(query.called, 2);
});
});

it('execute should work', async () => {
const foo = new Foo();
foo.name = 'name';
foo.col1 = 'col1';
foo.bitColumn = Buffer.from([ 0, 0 ]);
foo.boolColumn = 0;
foo.tinyIntColumn = 0;
foo.smallIntColumn = 1;
foo.mediumIntColumn = 3;
foo.intColumn = 3;
foo.bigIntColumn = '00099';
foo.decimalColumn = '00002.33333';
foo.floatColumn = 2.3;
foo.doubleColumn = 2.3;
foo.dateColumn = new Date('2024-03-16T16:00:00.000Z');
foo.dateTimeColumn = new Date('2024-03-16T01:26:58.677Z');
foo.timestampColumn = new Date('2024-03-16T01:26:58.677Z');
foo.timeColumn = '838:59:50.123';
foo.yearColumn = 2024;
foo.varCharColumn = 'var_char';
foo.binaryColumn = Buffer.from('b');
foo.varBinaryColumn = Buffer.from('var_binary');
foo.tinyBlobColumn = Buffer.from('tiny_blob');
foo.tinyTextColumn = 'text';
foo.blobColumn = Buffer.from('blob');
foo.textColumn = 'text';
foo.mediumBlobColumn = Buffer.from('medium_blob');
foo.longBlobColumn = Buffer.from('long_blob');
foo.mediumTextColumn = 'medium_text';
foo.longTextColumn = 'long_text';
foo.enumColumn = 'A';
foo.setColumn = 'B';
foo.geometryColumn = { x: 10, y: 10 };
foo.pointColumn = { x: 10, y: 10 };
foo.lineStringColumn = [
{ x: 15, y: 15 },
{ x: 20, y: 20 },
];
foo.polygonColumn = [
[
{ x: 0, y: 0 }, { x: 10, y: 0 }, { x: 10, y: 10 }, { x: 0, y: 10 }, { x: 0, y: 0 },
], [
{ x: 5, y: 5 }, { x: 7, y: 5 }, { x: 7, y: 7 }, { x: 5, y: 7 }, { x: 5, y: 5 },
],
];
foo.multipointColumn = [
{ x: 0, y: 0 }, { x: 20, y: 20 }, { x: 60, y: 60 },
];
foo.multiLineStringColumn = [
[
{ x: 10, y: 10 }, { x: 20, y: 20 },
], [
{ x: 15, y: 15 }, { x: 30, y: 15 },
],
];
foo.multiPolygonColumn = [
[
describe('execute', () => {
let dataSource: DataSource<Foo>;
let tableModel: TableModel<Foo>;
let forker: DatabaseForker;

before(async () => {
forker = new DatabaseForker('unittest', mysqlOptions);
await forker.forkDb(path.join(__dirname, './fixtures/modules/dal'));
const mysql = new MysqlDataSource(mysqlOptions);
await mysql.ready();

tableModel = TableModel.build(Foo);
const sqlMapLoader = new SqlMapLoader(tableModel, BaseFooDAO, console as any);
const sqlMap = sqlMapLoader.load();
dataSource = new DataSource(tableModel, mysql, sqlMap);
});

after(async () => {
await forker.destroy();
});

it('execute should work', async () => {
const foo = new Foo();
foo.name = 'name';
foo.col1 = 'col1';
foo.bitColumn = Buffer.from([ 0, 0 ]);
foo.boolColumn = 0;
foo.tinyIntColumn = 0;
foo.smallIntColumn = 1;
foo.mediumIntColumn = 3;
foo.intColumn = 3;
foo.bigIntColumn = '00099';
foo.decimalColumn = '00002.33333';
foo.floatColumn = 2.3;
foo.doubleColumn = 2.3;
foo.dateColumn = new Date('2024-03-16T16:00:00.000Z');
foo.dateTimeColumn = new Date('2024-03-16T01:26:58.677Z');
foo.timestampColumn = new Date('2024-03-16T01:26:58.677Z');
foo.timeColumn = '838:59:50.123';
foo.yearColumn = 2024;
foo.varCharColumn = 'var_char';
foo.binaryColumn = Buffer.from('b');
foo.varBinaryColumn = Buffer.from('var_binary');
foo.tinyBlobColumn = Buffer.from('tiny_blob');
foo.tinyTextColumn = 'text';
foo.blobColumn = Buffer.from('blob');
foo.textColumn = 'text';
foo.mediumBlobColumn = Buffer.from('medium_blob');
foo.longBlobColumn = Buffer.from('long_blob');
foo.mediumTextColumn = 'medium_text';
foo.longTextColumn = 'long_text';
foo.enumColumn = 'A';
foo.setColumn = 'B';
foo.geometryColumn = { x: 10, y: 10 };
foo.pointColumn = { x: 10, y: 10 };
foo.lineStringColumn = [
{ x: 15, y: 15 },
{ x: 20, y: 20 },
];
foo.polygonColumn = [
[
{ x: 0, y: 0 }, { x: 10, y: 0 }, { x: 10, y: 10 }, { x: 0, y: 10 }, { x: 0, y: 0 },
], [
{ x: 5, y: 5 }, { x: 7, y: 5 }, { x: 7, y: 7 }, { x: 5, y: 7 }, { x: 5, y: 5 },
],
],
[
];
foo.multipointColumn = [
{ x: 0, y: 0 }, { x: 20, y: 20 }, { x: 60, y: 60 },
];
foo.multiLineStringColumn = [
[
{ x: 5, y: 5 }, { x: 7, y: 5 }, { x: 7, y: 7 }, { x: 5, y: 7 }, { x: 5, y: 5 },
{ x: 10, y: 10 }, { x: 20, y: 20 },
], [
{ x: 15, y: 15 }, { x: 30, y: 15 },
],
],
];
foo.geometryCollectionColumn = [
{ x: 10, y: 10 },
{ x: 30, y: 30 },
[
{ x: 15, y: 15 }, { x: 20, y: 20 },
],
];
foo.jsonColumn = {
hello: 'json',
};
const rowValue = TableModelInstanceBuilder.buildRow(foo, tableModel);
const insertResult: InsertResult = await dataSource.executeRawScalar('insert', rowValue);
assert(insertResult.insertId);
foo.id = insertResult.insertId;

const updateResult: UpdateResult = await dataSource.executeRawScalar('update', {
primary: {
id: insertResult.insertId,
},
$name: 'update_name',
});
assert.equal(updateResult.affectedRows, 1);
foo.name = 'update_name';
];
foo.multiPolygonColumn = [
[
[
{ x: 0, y: 0 }, { x: 10, y: 0 }, { x: 10, y: 10 }, { x: 0, y: 10 }, { x: 0, y: 0 },
],
],
[
[
{ x: 5, y: 5 }, { x: 7, y: 5 }, { x: 7, y: 7 }, { x: 5, y: 7 }, { x: 5, y: 5 },
],
],
];
foo.geometryCollectionColumn = [
{ x: 10, y: 10 },
{ x: 30, y: 30 },
[
{ x: 15, y: 15 }, { x: 20, y: 20 },
],
];
foo.jsonColumn = {
hello: 'json',
};
const rowValue = TableModelInstanceBuilder.buildRow(foo, tableModel);
const insertResult: InsertResult = await dataSource.executeRawScalar('insert', rowValue);
assert(insertResult.insertId);
foo.id = insertResult.insertId;

const findRow = await dataSource.executeScalar('findByPrimary', {
$id: insertResult.insertId,
});
assert(findRow);
assert.deepStrictEqual(findRow, foo);
const updateResult: UpdateResult = await dataSource.executeRawScalar('update', {
primary: {
id: insertResult.insertId,
},
$name: 'update_name',
});
assert.equal(updateResult.affectedRows, 1);
foo.name = 'update_name';

const deleteRow: DeleteResult = await dataSource.executeRawScalar('delete', {
id: insertResult.insertId,
});
assert.equal(deleteRow.affectedRows, 1);
const findRow = await dataSource.executeScalar('findByPrimary', {
$id: insertResult.insertId,
});
assert(findRow);
assert.deepStrictEqual(findRow, foo);

const findRow2 = await dataSource.executeScalar('findByPrimary', {
$id: insertResult.insertId,
});
assert.equal(findRow2, null);
const deleteRow: DeleteResult = await dataSource.executeRawScalar('delete', {
id: insertResult.insertId,
});
assert.equal(deleteRow.affectedRows, 1);

const res = await dataSource.paginate('findByPrimary', {}, 1, 10);
assert(res.total === 0);
const findRow2 = await dataSource.executeScalar('findByPrimary', {
$id: insertResult.insertId,
});
assert.equal(findRow2, null);

const res = await dataSource.paginate('findByPrimary', {}, 1, 10);
assert(res.total === 0);
});
});
});
Loading