diff --git a/bdd/data/sequences/event-sequence/index.js b/bdd/data/sequences/event-sequence/index.js new file mode 100644 index 000000000..265cfc93e --- /dev/null +++ b/bdd/data/sequences/event-sequence/index.js @@ -0,0 +1,28 @@ +/* eslint-disable no-console */ + +// eslint-disable-next-line valid-jsdoc +/** + * Simple test event sequence. + * + * @param {never} _input - unused + * @param {string} inputEvent - input + * @param {string} outputEvent - output + * @returns {void} + * @this {import("@scramjet/types").AppContext<{}, {}>} - context + */ +module.exports = async function(_input, inputEvent = "in", outputEvent = "out") { + this.logger.info("started"); + return new Promise((res) => { + this.on(inputEvent, async (msg) => { + const ev = JSON.parse(msg); + + console.log("event", JSON.stringify(ev)); + this.emit(outputEvent, JSON.stringify({ test: ev.test + 1 })); + + await new Promise(res2 => setTimeout(res2, 100)); + + res(); + }); + }); +}; + diff --git a/bdd/data/sequences/event-sequence/package.json b/bdd/data/sequences/event-sequence/package.json new file mode 100644 index 000000000..dc74485a9 --- /dev/null +++ b/bdd/data/sequences/event-sequence/package.json @@ -0,0 +1,25 @@ +{ + "name": "@scramjet/event-sequence", + "version": "1.0.0", + "description": "", + "main": "index.js", + "scripts": { + "predeploy": "mkdir -p dist/ && cp index.js package.json dist/ && (cd dist && npm i --omit=dev)" + }, + "engines": { + "node": ">=16" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/scramjetorg/create-sequence.git" + }, + "bugs": { + "url": "https://github.com/scramjetorg/create-sequence/issues" + }, + "homepage": "https://github.com/scramjetorg/create-sequence#readme", + "devDependencies": { + "@scramjet/types": "^0.34.4" + }, + "author": "", + "license": "ISC" +} diff --git a/bdd/features/hub/HUB-002-host-iac.feature b/bdd/features/hub/HUB-002-host-iac.feature index fc2d5529f..c01a60bd3 100644 --- a/bdd/features/hub/HUB-002-host-iac.feature +++ b/bdd/features/hub/HUB-002-host-iac.feature @@ -28,3 +28,21 @@ Feature: HUB-002 Host started in Infrastructure as Code mode And wait for "500" ms And host is running * exit hub process + + @ci-hub @starts-host + Scenario: HUB-002 TC-004 Event forwarding works between sequences + When hub process is started with random ports and parameters "--sequences-root=data/sequences/ --instance-lifetime-extension-delay=10 --identify-existing --runtime-adapter=process" + And host is running + And I get list of instances + And start Instance by name "event-sequence" with JSON arguments '["event-one", "event-two"]' + * remember last instance as "first" + And start Instance by name "event-sequence" with JSON arguments '["event-two", "event-three"]' + * remember last instance as "second" + * switch to instance "first" + And send event "event-one" to instance with message '{"test": 1}' + # * wait for "100" ms + Then "stdout" starts with 'event {"test":1}' + * switch to instance "second" + Then "stdout" starts with 'event {"test":2}' + And host is running + * exit hub process diff --git a/bdd/lib/utils.ts b/bdd/lib/utils.ts index a9053b5c7..1a1b60dcd 100644 --- a/bdd/lib/utils.ts +++ b/bdd/lib/utils.ts @@ -218,23 +218,25 @@ export async function waitUntilStreamContains(stream: Readable, expected: string ]); } -export async function waitUntilStreamEquals(stream: Readable, expected: string): Promise { +export async function waitUntilStreamEquals(stream: Readable, expected: string, timeout = 10000): Promise { let response = ""; await Promise.race([ (async () => { - for await (const chunk of stream.pipe(new PassThrough({ encoding: undefined }))) { - response += chunk.toString(); + for await (const chunk of stream.pipe(new PassThrough({ encoding: "utf-8" }))) { + response += chunk; + + // eslint-disable-next-line no-console + console.log(response, chunk); if (response === expected) return expected; if (response.length >= expected.length) { - assert.equal(response, expected); + return assert.equal(response, expected); } } - assert.equal(response, expected, "End of stream reached"); - - return "passed"; - })() + throw new Error("End of stream reached"); + })(), + defer(timeout).then(() => { assert.equal(response, expected, "timeout"); }) ]); return response; @@ -361,6 +363,27 @@ export function spawnSiInit( }); } +export async function waitUntilStreamStartsWith(stream: Readable, expected: string, timeout = 10000): Promise { + let response = ""; + + await Promise.race([ + (async () => { + for await (const chunk of stream.pipe(new PassThrough({ encoding: undefined }))) { + response += chunk.toString(); + + if (response === expected) return expected; + if (response.length >= expected.length) { + return assert.equal(response.substring(0, expected.length), expected); + } + } + throw new Error("End of stream reached"); + })(), + defer(timeout).then(() => { assert.equal(response, expected, "timeout"); }) + ]); + + return response; +} + export function isTemplateCreated(templateType: string, workingDirectory: string) { return new Promise((resolve, reject) => { // eslint-disable-next-line complexity diff --git a/bdd/step-definitions/e2e/host-steps.ts b/bdd/step-definitions/e2e/host-steps.ts index 9033b6483..594fd5b21 100644 --- a/bdd/step-definitions/e2e/host-steps.ts +++ b/bdd/step-definitions/e2e/host-steps.ts @@ -7,6 +7,7 @@ import { removeBoundaryQuotes, defer, waitUntilStreamEquals, + waitUntilStreamStartsWith, waitUntilStreamContains, removeProfile, createProfile, @@ -307,6 +308,37 @@ When( When("instance started with arguments {string}", { timeout: 25000 }, startWith); +When("start Instance by name {string}", async function(this: CustomWorld, name: string) { + this.resources.sequence = hostClient.getSequenceClient(name); + this.resources.instance = await this.resources.sequence!.start({ + appConfig: {} + }); +}); + +When("start Instance by name {string} with JSON arguments {string}", async function(this: CustomWorld, name: string, args: string) { + const instanceArgs: any = JSON.parse(args); + + if (!Array.isArray(instanceArgs)) throw new Error("Args must be an array"); + + this.resources.sequence = hostClient.getSequenceClient(name); + this.resources.instance = await this.resources.sequence!.start({ + appConfig: {}, + args: instanceArgs + }); +}); + +When("remember last instance as {string}", function(this: CustomWorld, seq: string) { + if (!this.resources.instance) throw new Error("No instance client set"); + + this.resources.instanceList[seq] = this.resources.instance; +}); + +When("switch to instance {string}", function(this: CustomWorld, seq: string) { + if (!this.resources.instanceList[seq]) throw new Error(`No instance "${seq}"`); + + this.resources.instance = this.resources.instanceList[seq]; +}); + When("start Instance with output topic name {string}", async function(this: CustomWorld, topicOut: string) { this.resources.instance = await this.resources.sequence!.start({ appConfig: {}, @@ -754,6 +786,13 @@ When("send {string} to stdin", async function(this: CustomWorld, str) { await this.resources.instance?.sendStream("stdin", Readable.from(str)); }); +Then("{string} starts with {string}", async function(this: CustomWorld, stream, text) { + const result = await this.resources.instance?.getStream(stream); + + await waitUntilStreamStartsWith(result!, text); + if (!result) assert.fail(`No data in ${stream}!`); +}); + Then("{string} is {string}", async function(this: CustomWorld, stream, text) { const result = await this.resources.instance?.getStream(stream); const response = await waitUntilStreamEquals(result!, text); diff --git a/bdd/step-definitions/world.ts b/bdd/step-definitions/world.ts index 1012de25d..344160009 100644 --- a/bdd/step-definitions/world.ts +++ b/bdd/step-definitions/world.ts @@ -16,6 +16,7 @@ export class CustomWorld implements World { resources: { [key: string]: any; hub?: ChildProcess; + instanceList: {[key: string]: InstanceClient}; instance?: InstanceClient; instance1?: InstanceClient; instance2?: InstanceClient; @@ -23,7 +24,9 @@ export class CustomWorld implements World { sequence1?: SequenceClient; sequence2?: SequenceClient; outStream?: Readable; - } = {}; + } = { + instanceList: {} + }; cliResources: { stdio?: [stdout: string, stderr: string, statusCode: any]; @@ -48,7 +51,6 @@ export class CustomWorld implements World { if (setDefaultResultOrder) { setDefaultResultOrder("ipv4first"); } - this.attach = attach; this.log = log; this.parameters = parameters; diff --git a/packages/host/src/lib/csi-controller.ts b/packages/host/src/lib/csi-controller.ts index 88aebb695..abb572933 100644 --- a/packages/host/src/lib/csi-controller.ts +++ b/packages/host/src/lib/csi-controller.ts @@ -56,6 +56,7 @@ const runnerExitDelay = 15000; type Events = { pang: (payload: MessageDataType) => void; + event: (payload: EventMessageData) => void; hourChime: () => void; error: (error: any) => void; stop: (code: number) => void; @@ -164,6 +165,8 @@ export class CSIController extends TypedEmitter { private downStreams?: DownstreamStreamsConfig; private upStreams: PassThroughStreamsConfig; + public localEmitter: EventEmitter & { lastEvents: { [evname: string]: any } }; + communicationHandler: ICommunicationHandler; constructor( @@ -195,6 +198,10 @@ export class CSIController extends TypedEmitter { this.communicationHandler = communicationHandler; this.logger = new ObjLogger(this, { id }); + this.localEmitter = Object.assign( + new EventEmitter(), + { lastEvents: {} } + ); this.logger.debug("Constructor executed"); this.info.created = new Date(); @@ -652,18 +659,15 @@ export class CSIController extends TypedEmitter { // We are not able to obtain all necessary information for this endpoint yet, disabling it for now // router.get("/status", RunnerMessageCode.STATUS, this.communicationHandler); - const localEmitter = Object.assign( - new EventEmitter(), - { lastEvents: {} } as { lastEvents: { [evname: string]: any } } - ); - this.communicationHandler.addMonitoringHandler(RunnerMessageCode.EVENT, (data) => { const event = data[1]; if (!event.eventName) return; - localEmitter.lastEvents[event.eventName] = event.message; - localEmitter.emit(event.eventName, event); + this.emit("event", event); + + this.localEmitter.lastEvents[event.eventName] = event.message; + this.localEmitter.emit(event.eventName, event); }); this.router.upstream("/events/:name", async (req: ParsedMessage, res: ServerResponse) => { @@ -688,12 +692,12 @@ export class CSIController extends TypedEmitter { const clean = () => { this.logger.debug(`Event stream "${name}" disconnected`); - localEmitter.off(name, handler); + this.localEmitter.off(name, handler); }; this.logger.debug("Event stream connected", name); - localEmitter.on(name, handler); + this.localEmitter.on(name, handler); res.on("error", clean); res.on("end", clean); @@ -704,16 +708,15 @@ export class CSIController extends TypedEmitter { const awaitEvent = async (req: ParsedMessage): Promise => new Promise((res) => { const name = req.params?.name; - if (!name) { + if (!name) throw new HostError("EVENT_NAME_MISSING"); - } - localEmitter.once(name, (data) => res(data.message)); + this.localEmitter.once(name, (data) => res(data.message)); }); this.router.get("/event/:name", async (req) => { - if (req.params?.name && localEmitter.lastEvents[req.params.name]) { - return localEmitter.lastEvents[req.params.name]; + if (req.params?.name && this.localEmitter.lastEvents[req.params.name]) { + return this.localEmitter.lastEvents[req.params.name]; } return awaitEvent(req); @@ -722,12 +725,30 @@ export class CSIController extends TypedEmitter { // operations this.router.op("post", "/_monitoring_rate", RunnerMessageCode.MONITORING_RATE, this.communicationHandler); - this.router.op("post", "/_event", RunnerMessageCode.EVENT, this.communicationHandler); + this.router.op("post", "/_event", (req) => this.handleEvent(req), this.communicationHandler); this.router.op("post", "/_stop", (req) => this.handleStop(req), this.communicationHandler); this.router.op("post", "/_kill", (req) => this.handleKill(req), this.communicationHandler); } + private async handleEvent(event: ParsedMessage): Promise> { + const [, { eventName, message }] = event.body; + + if (typeof eventName !== "string") + return { opStatus: ReasonPhrases.BAD_REQUEST, error: "Invalid format, eventName missing." }; + + await this.emitEvent({ eventName, source: "api", message }); + return { opStatus: ReasonPhrases.OK, accepted: ReasonPhrases.OK }; + } + + public async emitEvent({ source, eventName, message }: EventMessageData) { + await this.communicationHandler.sendControlMessage(RunnerMessageCode.EVENT, { + eventName, + source, + message + }); + } + private async handleStop(req: ParsedMessage): Promise> { const { body: { timeout = 7000, canCallKeepalive = false } = { timeout: 7000, canCallKeepalive: false } } = req; diff --git a/packages/host/src/lib/host.ts b/packages/host/src/lib/host.ts index cbb9d52ae..bbeceda65 100644 --- a/packages/host/src/lib/host.ts +++ b/packages/host/src/lib/host.ts @@ -8,6 +8,7 @@ import { AddressInfo } from "net"; import { APIExpose, CPMConnectorOptions, + EventMessageData, HostProxy, IComponent, IMonitoringServerConstructor, @@ -1019,6 +1020,9 @@ export class Host implements IComponent { this.instancesStore[id] = csic; + csic.on("event", async (event: EventMessageData) => { + await this.eventBus({ source: id, ...event }); + }); csic.on("error", (err) => { this.pushTelemetry("Instance error", { ...err }, "error"); this.logger.error("CSIController errored", err.message, err.exitcode); @@ -1131,6 +1135,16 @@ export class Host implements IComponent { return csic; } + async eventBus(event: EventMessageData) { + this.logger.debug("Got event", event); + + // Send the event to all instances except the source of the event. + await Promise.all( + Object.values(this.instancesStore) + .map(inst => event.source !== inst.id ? inst.emitEvent(event) : true) + ); + } + /** * Returns list of all Sequences. * diff --git a/packages/types/src/messages/event.ts b/packages/types/src/messages/event.ts index a54420ad4..d6a5a8ef7 100644 --- a/packages/types/src/messages/event.ts +++ b/packages/types/src/messages/event.ts @@ -5,8 +5,10 @@ export type EventMessageData = { /** Name of the event. */ eventName: string; + source?: string; + /** TODO update Informs if keepAlive can be called to prolong the running of the Sequence. */ - message: any + message: any; } /**