Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 89 additions & 26 deletions packages/cli/src/lib/commands/sequence.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable max-len */
import { CommandDefinition } from "../../types";
import { createWriteStream, lstatSync } from "fs";
import { createWriteStream, lstatSync, readFileSync } from "fs";
import { displayEntity, displayError, displayMessage, displayObject } from "../output";
import { getHostClient } from "../common";
import { getSequenceId, profileManager, sessionConfig } from "../config";
Expand All @@ -10,9 +10,12 @@ import { PassThrough, Writable } from "stream";
import { isDevelopment } from "../../utils/envs";

import { resolve } from "path";
import { sequenceDelete, sequencePack, sequenceParseArgs, sequenceSendPackage, sequenceStart } from "../helpers/sequence";
import { sequenceDelete, sequencePack, sequenceParseArgs, sequenceParseConfig, sequenceSendPackage, sequenceStart } from "../helpers/sequence";
import { ClientError } from "@scramjet/client-utils";
import { initPlatform } from "../platform";
import { AppConfig, DeepPartial } from "@scramjet/types";
import { isStartSequenceEndpointPayloadDTO, merge } from "@scramjet/utility";
import { SequenceDeployArgs, SequenceStartCLIArgs } from "../../types/params";

/**
* Initializes `sequence` command.
Expand Down Expand Up @@ -108,6 +111,28 @@ export const sequence: CommandDefinition = (program) => {
}
);

function validateStartupConfig(config: DeepPartial<SequenceDeployArgs>) {
return isStartSequenceEndpointPayloadDTO(config);
}

function loadStartupConfig(filename: string): DeepPartial<SequenceDeployArgs> {
if (!filename) return {};

let json = {};

try {
const data = readFileSync(filename, "utf8");

json = JSON.parse(data);
} catch (e) {
// eslint-disable-next-line no-console
console.error(e);
process.exit(1);
}

return json;
}

sequenceCmd
.command("start")
.argument("<id>", "Sequence id to start or '-' for the last uploaded")
Expand All @@ -119,28 +144,40 @@ export const sequence: CommandDefinition = (program) => {
.option("--output-topic <string>", "Topic to which the output stream should be routed")
.option("--input-topic <string>", "Topic to which the input stream should be routed")
.option("--args <json-string>", "Arguments to be passed to the first function in the Sequence")
.option("--startup-config <path-to-config>", "Path to startup config", loadStartupConfig)
.option("--limits <json-string>", "Instance limits")
.description("Start the Sequence with or without given arguments")
.action(async (id, { configFile, configString, outputTopic, inputTopic, args: argsStr, limits: limitsStr, instId }) => {
let args;

if (argsStr) args = sequenceParseArgs(argsStr);
.action(async (id, { startupConfig, configFile, configString, outputTopic, inputTopic, args: argsStr, limits: limitsStr, instId: instanceId }: SequenceStartCLIArgs) => {
const args = argsStr ? sequenceParseArgs(argsStr) : undefined;
const appConfig = await sequenceParseConfig(configFile, configString);
const limits = limitsStr ? JSON.parse(limitsStr) : {};

const instanceClient = await sequenceStart(
id, { configFile, configString, args, outputTopic, inputTopic, limits, instId });
startupConfig ||= {};
merge(startupConfig, {
appConfig,
args,
instanceId,
inputTopic,
outputTopic,
limits
});

if (!validateStartupConfig(startupConfig)) {
throw new Error("Invalid startup config",);
}

const instanceClient = await sequenceStart(id, {
appConfig: startupConfig.appConfig as AppConfig,
args: startupConfig.args,
limits: startupConfig.limits,
instanceId: startupConfig.instanceId,
outputTopic: startupConfig.outputTopic,
inputTopic: startupConfig.inputTopic
});

displayObject(instanceClient, profileManager.getProfileConfig().format);
});

type DeployArgs = {
output: string;
configFile: any;
configString: string;
instId?: string;
args?: string;
};

sequenceCmd
.command("deploy")
.alias("run")
Expand All @@ -150,38 +187,64 @@ export const sequence: CommandDefinition = (program) => {
.option("-s, --config-string <json-string>", "Configuration in JSON format to be passed to the Instance context")
.option("--inst-id <string>", "Start Sequence with a custom Instance Id. Should consist of 36 characters")
// TODO: check if output-topic and input-topic should be added after development
.option("--output-topic <string>", "Topic to which the output stream should be routed")
.option("--input-topic <string>", "Topic to which the input stream should be routed")
.option("--args <json-string>", "Arguments to be passed to the first function in the Sequence")
.option("--startup-config <path-to-config>", "Path to startup config", loadStartupConfig)
.option("--limits <json-string>", "Instance limits")
.description("Pack (if needed), send and start the Sequence")
.action(async (path: string, { output: fileoutput, configFile, configString, args: argsStr, instId }: DeployArgs) => {
let args;
.action(async (path: string, { startupConfig, output, configFile, configString, outputTopic, inputTopic, args: argsStr, limits: limitsStr, instId }: SequenceStartCLIArgs) => {
const args = argsStr ? sequenceParseArgs(argsStr) : undefined;
const appConfig = await sequenceParseConfig(configFile, configString);
const limits = limitsStr ? JSON.parse(limitsStr) : {};

if (argsStr) args = sequenceParseArgs(argsStr);
startupConfig ||= {};
merge(startupConfig, {
output,
appConfig,
args,
instanceId: instId,
inputTopic,
outputTopic,
limits
});

const output = new PassThrough();
if (!validateStartupConfig(startupConfig)) {
throw new Error("Invalid startup config",);
}

if (fileoutput) {
const outputPath = fileoutput ? resolve(fileoutput) : `${resolve(path)}.tar.gz`;
const compressedPackageStream = new PassThrough();

output.pipe(createWriteStream(outputPath));
if (startupConfig.output) {
const outputPath = startupConfig.output ? resolve(startupConfig.output) : `${resolve(path)}.tar.gz`;

compressedPackageStream.pipe(createWriteStream(outputPath));
sessionConfig.setLastPackagePath(outputPath);
}
const format = profileManager.getProfileConfig().format;

if (lstatSync(path).isDirectory()) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
const sendSeqPromise = getHostClient().sendSequence(output).then(seq => {
const sendSeqPromise = getHostClient().sendSequence(compressedPackageStream).then(seq => {
sessionConfig.setLastSequenceId(seq.id);
});

await sequencePack(path, { output });
await sequencePack(path, { output: compressedPackageStream });
await sendSeqPromise;
} else {
const sequenceClient = await sequenceSendPackage(path, {}, false, { progress: sequenceCmd.parent?.getOptionValue("progress") });

displayObject(sequenceClient, profileManager.getProfileConfig().format);
}

const instanceClient = await sequenceStart("-", { configFile, configString, args, instId });
const instanceClient = await sequenceStart("-", {
appConfig: startupConfig.appConfig as AppConfig,
args: startupConfig.args,
limits: startupConfig.limits,
instanceId: startupConfig.instanceId,
outputTopic: startupConfig.outputTopic,
inputTopic: startupConfig.inputTopic
});

displayObject(instanceClient, format);
});
Expand Down
34 changes: 19 additions & 15 deletions packages/cli/src/lib/helpers/sequence.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

import { GetSequenceResponse } from "@scramjet/types/src/rest-api-sth";
import { InstanceLimits } from "@scramjet/types";
import { AppConfig, InstanceLimits } from "@scramjet/types";
import { constants, createReadStream, createWriteStream, PathLike } from "fs";
import { readFile, readdir, access, lstat } from "fs/promises";
import { InstanceClient, SequenceClient } from "@scramjet/api-client";
Expand Down Expand Up @@ -148,35 +148,39 @@ export const sequenceSendPackage = async (
}
};

export const sequenceStart = async (
id: string, { configFile, configString, args, outputTopic, inputTopic, limits, instId }:
{
configFile: any,
configString: string,
args?: any[],
outputTopic?: string,
inputTopic?: string,
limits?: InstanceLimits,
instId?: string
}
): Promise<InstanceClient> => {
export const sequenceParseConfig = async (configFile: string = "", configString: string = ""): Promise<AppConfig> => {
if (configFile && configString) {
return Promise.reject(new Error("Provide one source of configuration"));
}

let appConfig = {};
let appConfig;

try {
if (configString) appConfig = JSON.parse(configString);
if (configFile) appConfig = JSON.parse(await readFile(configFile, "utf-8"));
} catch (_) {
return Promise.reject(new Error("Unable to read configuration"));
}

return appConfig;
};

export const sequenceStart = async (
id: string, { appConfig, args, outputTopic, inputTopic, limits, instanceId }:
{
appConfig: AppConfig,
args?: any[],
outputTopic?: string,
inputTopic?: string,
limits?: InstanceLimits,
instanceId?: string
}
): Promise<InstanceClient> => {
const sequenceClient = SequenceClient.from(getSequenceId(id), getHostClient());

try {
const instance = await sequenceClient.start({
appConfig, args, outputTopic, inputTopic, limits, instanceId: instId
appConfig, args: args?.length ? args : undefined, outputTopic, inputTopic, limits, instanceId
});

sessionConfig.setLastInstanceId(instance.id);
Expand Down
5 changes: 5 additions & 0 deletions packages/cli/src/types/params/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
* Arguments types
*/

export * from "./sequence";
17 changes: 17 additions & 0 deletions packages/cli/src/types/params/sequence.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { DeepPartial, STHRestAPI } from "@scramjet/types";

export type SequenceDeployArgs = STHRestAPI.StartSequencePayload & {
output?: string;
};

export type SequenceStartCLIArgs = {
args?: string;
configFile?: string;
configString?: string;
instId?: string;
inputTopic?: string;
limits?: string;
output?: string;
outputTopic?: string;
startupConfig: DeepPartial<SequenceDeployArgs>;
};
16 changes: 8 additions & 8 deletions packages/types/src/rest-api-sth/start-sequence.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { AppConfig } from "../app-config";
import { InstanceLimits } from "../instance-limits";

export type StartSequenceResponse = { id: string }
export type StartSequenceResponse = { id: string };

export type StartSequencePayload = {
appConfig: AppConfig,
args?: any[],
outputTopic?: string,
inputTopic?: string,
limits?: InstanceLimits,
instanceId?: string
}
appConfig: AppConfig;
args?: any[];
outputTopic?: string;
inputTopic?: string;
limits?: InstanceLimits;
instanceId?: string;
};