Skip to content

Commit 38114bd

Browse files
authored
feat: implement cork/uncork for eventbus (#60)
1 parent d5e18b9 commit 38114bd

File tree

10 files changed

+230
-21
lines changed

10 files changed

+230
-21
lines changed

core/eventbus-decorator/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,27 @@ class Foo {
2424
}
2525
```
2626

27+
### cork events
28+
29+
Cache events in memory until uncork.
30+
31+
```ts
32+
class Foo {
33+
@Inject()
34+
private readonly eventBus: ContextEventBus;
35+
36+
bar() {
37+
this.eventBus.cork();
38+
// ...do something
39+
this.eventBus.emit('hello', '01');
40+
// ...do other things
41+
42+
// emit all cached events
43+
this.eventBus.uncork();
44+
}
45+
}
46+
```
47+
2748
### handle event
2849

2950
```ts

core/eventbus-decorator/src/EventBus.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,15 @@ export type EventName = string | symbol;
99
* use `emit` to emit a event
1010
*/
1111
export interface EventBus extends Pick<TypedEventEmitter<Events>, 'emit'> {
12+
cork(corkId: string);
13+
uncork(corkId: string);
14+
}
15+
16+
export const CORK_ID = Symbol.for('eventBus#corkId');
17+
18+
export interface ContextEventBus extends EventBus {
19+
cork();
20+
uncork();
1221
}
1322

1423
export type EventKeys = keyof Events;

core/eventbus-runtime/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@
5454
"@eggjs/tegg-runtime": "^1.4.1",
5555
"coffee": "^5.4.0",
5656
"egg": "^2.29.3",
57-
"mm": "^3.2.0"
57+
"mm": "^3.2.0",
58+
"mz-modules": "^2.1.0"
5859
},
5960
"publishConfig": {
6061
"access": "public"

core/eventbus-runtime/src/SingletonEventBus.ts

Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { AccessLevel, Inject, SingletonProto } from '@eggjs/core-decorator';
2-
import { EventBus, Events, EventWaiter, EventName } from '@eggjs/eventbus-decorator';
2+
import { EventBus, Events, EventWaiter, EventName, CORK_ID } from '@eggjs/eventbus-decorator';
33
import { EggContext } from '@eggjs/tegg-runtime';
44
import type { EggLogger } from 'egg';
55
import { EventContextFactory } from './EventContextFactory';
@@ -9,10 +9,21 @@ import awaitEvent from 'await-event';
99
import awaitFirst from 'await-first';
1010

1111
// from typed-emitter
12-
type Arguments<T> = [ T ] extends [ (...args: infer U) => any ]
12+
type Array<T> = [ T ] extends [ (...args: infer U) => any ]
1313
? U
1414
: [ T ] extends [ void ] ? [] : [ T ];
1515

16+
export interface Event {
17+
name: EventName;
18+
args: Array<any>;
19+
context?: EggContext;
20+
}
21+
22+
export interface CorkEvents {
23+
times: number;
24+
events: Array<Event>;
25+
}
26+
1627
@SingletonProto({
1728
// TODO 需要考虑支持别名
1829
// SingletonEventBus 同时实现了两个接口
@@ -31,6 +42,10 @@ export class SingletonEventBus implements EventBus, EventWaiter {
3142
@Inject()
3243
private readonly logger: EggLogger;
3344

45+
private corkIdSequence = 0;
46+
47+
private readonly corkedEvents = new Map<string /* corkId */, CorkEvents>();
48+
3449
/**
3550
* only use for ensure event will happen
3651
*/
@@ -39,38 +54,88 @@ export class SingletonEventBus implements EventBus, EventWaiter {
3954
return this;
4055
}
4156

42-
async await<E extends keyof Events>(event: E): Promise<Arguments<Events[E]>> {
57+
async await<E extends keyof Events>(event: E): Promise<Array<Events[E]>> {
4358
return awaitEvent(this.emitter, event);
4459
}
4560

46-
awaitFirst<E extends keyof Events>(...e: Array<E>): Promise<{ event: EventName, args: Arguments<Events[E]> }> {
61+
awaitFirst<E extends keyof Events>(...e: Array<E>): Promise<{ event: EventName, args: Array<Events[E]> }> {
4762
return awaitFirst(this.emitter, e);
4863
}
4964

50-
emit<E extends keyof Events>(event: E, ...args: Arguments<Events[E]>): boolean {
65+
emit<E extends keyof Events>(event: E, ...args: Array<Events[E]>): boolean {
5166
const ctx = this.eventContextFactory.createContext();
5267
const hasListener = this.eventHandlerFactory.hasListeners(event);
5368
this.doEmit(ctx, event, args);
5469
return hasListener;
5570
}
5671

57-
emitWithContext<E extends keyof Events>(parentContext: EggContext, event: E, args: Arguments<Events[E]>): boolean {
58-
const ctx = this.eventContextFactory.createContext(parentContext);
72+
generateCorkId(): string {
73+
return String(++this.corkIdSequence);
74+
}
75+
76+
cork(corkId: string) {
77+
let corkEvents = this.corkedEvents.get(corkId);
78+
if (!corkEvents) {
79+
corkEvents = {
80+
times: 0,
81+
events: [],
82+
} as unknown as CorkEvents;
83+
this.corkedEvents.set(corkId, corkEvents);
84+
}
85+
corkEvents!.times++;
86+
}
87+
88+
uncork(corkId: string) {
89+
const corkEvents = this.corkedEvents.get(corkId);
90+
if (!corkEvents) {
91+
throw new Error(`eventbus corkId ${corkId} not found`);
92+
}
93+
if (--corkEvents.times !== 0) {
94+
return;
95+
}
96+
this.corkedEvents.delete(corkId);
97+
for (const event of corkEvents.events) {
98+
if (event.context) {
99+
this.doEmitWithContext(event.context, event.name, event.args);
100+
}
101+
}
102+
}
103+
104+
queueEvent(corkId: string, event: Event) {
105+
const corkdEvents = this.corkedEvents.get(corkId);
106+
if (!corkdEvents) {
107+
throw new Error(`eventbus corkId ${corkId} not found`);
108+
}
109+
corkdEvents.events.push(event);
110+
}
111+
112+
emitWithContext<E extends keyof Events>(parentContext: EggContext, event: E, args: Array<Events[E]>): boolean {
113+
const corkId = parentContext.get(CORK_ID);
114+
const hasListener = this.eventHandlerFactory.hasListeners(event);
115+
if (corkId) {
116+
this.queueEvent(corkId, { name: event, args, context: parentContext });
117+
return hasListener;
118+
}
119+
return this.doEmitWithContext(parentContext, event, args);
120+
}
121+
122+
private doEmitWithContext(parentContext: EggContext, event: EventName, args: Array<any>): boolean {
59123
const hasListener = this.eventHandlerFactory.hasListeners(event);
124+
const ctx = this.eventContextFactory.createContext(parentContext);
60125
this.doEmit(ctx, event, args);
61126
return hasListener;
62127
}
63128

64-
doOnceEmit<E extends keyof Events>(event: E, args: Arguments<Events[E]>) {
129+
private doOnceEmit(event: EventName, args: Array<any>) {
65130
try {
66131
this.emitter.emit(event, ...args);
67132
} catch (e) {
68-
e.message = `[EventBus] process once event ${event} failed: ${e.message}`;
133+
e.message = `[EventBus] process once event ${String(event)} failed: ${e.message}`;
69134
this.logger.error(e);
70135
}
71136
}
72137

73-
async doEmit<E extends keyof Events>(ctx: EggContext, event: E, args: Arguments<Events[E]>) {
138+
private async doEmit(ctx: EggContext, event: EventName, args: Array<any>) {
74139
const lifecycle = {};
75140
if (ctx.init) {
76141
await ctx.init(lifecycle);
@@ -82,12 +147,12 @@ export class SingletonEventBus implements EventBus, EventWaiter {
82147
await Reflect.apply(handler.handle, handler, args);
83148
} catch (e) {
84149
// should wait all handlers done then destroy ctx
85-
e.message = `[EventBus] process event ${event} failed: ${e.message}`;
150+
e.message = `[EventBus] process event ${String(event)} failed: ${e.message}`;
86151
this.logger.error(e);
87152
}
88153
}));
89154
} catch (e) {
90-
e.message = `[EventBus] process event ${event} failed: ${e.message}`;
155+
e.message = `[EventBus] process event ${String(event)} failed: ${e.message}`;
91156
this.logger.error(e);
92157
} finally {
93158
if (ctx.destroy) {

core/eventbus-runtime/test/EventBus.test.ts

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ import { EggProtoImplClass, PrototypeUtil } from '@eggjs/core-decorator';
77
import { LoaderFactory } from '../../loader';
88
import { EggTestContext } from '../../test-util';
99
import { EventContextFactory, EventHandlerFactory, SingletonEventBus } from '..';
10-
import { EventInfoUtil } from '@eggjs/eventbus-decorator';
10+
import { EventInfoUtil, CORK_ID } from '@eggjs/eventbus-decorator';
1111
import assert from 'assert';
1212
import { Timeout0Handler, Timeout100Handler, TimeoutProducer } from './fixtures/modules/event/MultiEvent';
13+
import sleep from 'mz-modules/sleep';
1314

1415
describe('test/EventBus.test.ts', () => {
1516
async function getLoadUnitInstance(moduleDir: string): Promise<LoadUnitInstance> {
@@ -121,4 +122,70 @@ describe('test/EventBus.test.ts', () => {
121122
await timeoutEvent;
122123
assert(Timeout100Handler.called === true);
123124
});
125+
126+
it('cork should work', async () => {
127+
const ctx = new EggTestContext();
128+
const eventContextFactory = await getObject(EventContextFactory);
129+
eventContextFactory.registerContextCreator(() => {
130+
return ctx;
131+
});
132+
const eventHandlerFactory = await getObject(EventHandlerFactory);
133+
eventHandlerFactory.registerHandler(
134+
EventInfoUtil.getEventName(HelloHandler)!,
135+
PrototypeUtil.getClazzProto(HelloHandler) as EggPrototype);
136+
137+
const eventBus = await getObject(SingletonEventBus);
138+
const corkId = eventBus.generateCorkId();
139+
ctx.set(CORK_ID, corkId);
140+
eventBus.cork(corkId);
141+
142+
const helloHandler = await getObject(HelloHandler, ctx);
143+
const helloEvent = eventBus.await('hello');
144+
let eventTime: number;
145+
mm(helloHandler, 'handle', async () => {
146+
eventTime = Date.now();
147+
});
148+
eventBus.emitWithContext(ctx, 'hello', [ '01' ]);
149+
const triggerTime = Date.now();
150+
151+
await sleep(100);
152+
eventBus.uncork(corkId);
153+
await helloEvent;
154+
assert(eventTime >= triggerTime + 100);
155+
});
156+
157+
it('multi cork should work', async () => {
158+
const ctx = new EggTestContext();
159+
const eventContextFactory = await getObject(EventContextFactory);
160+
eventContextFactory.registerContextCreator(() => {
161+
return ctx;
162+
});
163+
const eventHandlerFactory = await getObject(EventHandlerFactory);
164+
eventHandlerFactory.registerHandler(
165+
EventInfoUtil.getEventName(HelloHandler)!,
166+
PrototypeUtil.getClazzProto(HelloHandler) as EggPrototype);
167+
168+
const eventBus = await getObject(SingletonEventBus);
169+
const corkId = eventBus.generateCorkId();
170+
ctx.set(CORK_ID, corkId);
171+
eventBus.cork(corkId);
172+
eventBus.cork(corkId);
173+
174+
const helloHandler = await getObject(HelloHandler, ctx);
175+
const helloEvent = eventBus.await('hello');
176+
let eventTime: number;
177+
mm(helloHandler, 'handle', async () => {
178+
eventTime = Date.now();
179+
});
180+
eventBus.emitWithContext(ctx, 'hello', [ '01' ]);
181+
const triggerTime = Date.now();
182+
183+
await sleep(100);
184+
eventBus.uncork(corkId);
185+
await sleep(100);
186+
eventBus.uncork(corkId);
187+
188+
await helloEvent;
189+
assert(eventTime >= triggerTime + 200);
190+
});
124191
});

plugin/eventbus/lib/EggContextEventBus.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1+
import assert from 'assert';
12
import { Context } from 'egg';
2-
import { EventBus, Events, PrototypeUtil } from '@eggjs/tegg';
3+
import { Events, PrototypeUtil, CORK_ID, ContextEventBus } from '@eggjs/tegg';
34
import { SingletonEventBus } from '@eggjs/tegg-eventbus-runtime';
45
import { EggPrototype } from '@eggjs/tegg-metadata';
56
import { EggContext } from '@eggjs/tegg-runtime';
67

7-
export class EggContextEventBus implements EventBus {
8+
export class EggContextEventBus implements ContextEventBus {
89
private readonly eventBus: SingletonEventBus;
910
private readonly context: EggContext;
11+
private corkId?: string;
1012

1113
constructor(ctx: Context) {
1214
this.context = ctx.teggContext;
@@ -15,6 +17,19 @@ export class EggContextEventBus implements EventBus {
1517
this.eventBus = eggObject.obj as SingletonEventBus;
1618
}
1719

20+
cork() {
21+
if (!this.corkId) {
22+
this.corkId = this.eventBus.generateCorkId();
23+
this.context.set(CORK_ID, this.corkId);
24+
}
25+
this.eventBus.cork(this.corkId);
26+
}
27+
28+
uncork() {
29+
assert(this.corkId, 'eventbus uncork without cork');
30+
this.eventBus.uncork(this.corkId);
31+
}
32+
1833
emit<E extends keyof Events>(event: E, ...args: any): boolean {
1934
return this.eventBus.emitWithContext(this.context, event, args);
2035
}

plugin/eventbus/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@
5959
"@eggjs/tegg-plugin": "^1.4.1",
6060
"await-event": "^2.1.0",
6161
"egg": "^2.29.4",
62-
"egg-mock": "^4.0.1"
62+
"egg-mock": "^4.0.1",
63+
"mz-modules": "^2.1.0"
6364
},
6465
"publishConfig": {
6566
"access": "public"

plugin/eventbus/test/eventbus.test.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import assert from 'assert';
22
import path from 'path';
33
import mm from 'egg-mock';
4+
import sleep from 'mz-modules/sleep';
45
import { HelloService } from './fixtures/apps/event-app/app/event-module/HelloService';
56
import { HelloLogger } from './fixtures/apps/event-app/app/event-module/HelloLogger';
67

@@ -43,4 +44,26 @@ describe('test/eventbus.test.ts', () => {
4344
await helloEvent;
4445
assert(msg === '01');
4546
});
47+
48+
it('cork/uncork should work', async () => {
49+
ctx = await app.mockModuleContext();
50+
51+
const helloService = await ctx.getEggObject(HelloService);
52+
let helloTime: number;
53+
// helloLogger is in child context
54+
mm(HelloLogger.prototype, 'handle', () => {
55+
helloTime = Date.now();
56+
});
57+
helloService.cork();
58+
const triggerTime = Date.now();
59+
helloService.hello();
60+
61+
await sleep(100);
62+
helloService.uncork();
63+
64+
const eventWaiter = await app.getEventWaiter();
65+
const helloEvent = eventWaiter.await('helloEgg');
66+
await helloEvent;
67+
assert(helloTime >= triggerTime + 100);
68+
});
4669
});

plugin/eventbus/test/fixtures/apps/event-app/app/event-module/HelloService.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { AccessLevel, ContextProto, Inject, EventBus } from '@eggjs/tegg';
1+
import { AccessLevel, ContextProto, Inject, ContextEventBus } from '@eggjs/tegg';
22

33
declare module '@eggjs/tegg' {
44
interface Events {
@@ -12,7 +12,15 @@ declare module '@eggjs/tegg' {
1212
})
1313
export class HelloService {
1414
@Inject()
15-
private readonly eventBus: EventBus;
15+
private readonly eventBus: ContextEventBus;
16+
17+
cork() {
18+
this.eventBus.cork();
19+
}
20+
21+
uncork() {
22+
this.eventBus.uncork();
23+
}
1624

1725
hello() {
1826
this.eventBus.emit('helloEgg', '01');

0 commit comments

Comments
 (0)