Skip to content

Commit 2e4e9f8

Browse files
committed
Proper handle binary output
1 parent 24b5f50 commit 2e4e9f8

File tree

9 files changed

+33
-8
lines changed

9 files changed

+33
-8
lines changed

packages/api-server/src/handlers/stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ export function createStreamHandlers(router: SequentialCeroRouter) {
6767
? `${type}; charset=${encoding}`
6868
: type;
6969

70-
out.setEncoding(encoding);
70+
logger.debug("encoding, cType, readableEncoding", encoding, cType, data.readableEncoding);
7171

7272
res.setHeader("content-type", cType);
7373
res.setHeader("transfer-encoding", "chunked");

packages/host/src/lib/csi-controller.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,8 @@ export class CSIController extends TypedEmitter<Events> {
496496
this.apiInputEnabled = false;
497497
}
498498

499+
this.upStreams[CC.OUT].setDefaultEncoding(pangData.outputEncoding || "utf-8");
500+
499501
this.emit("pang", {
500502
provides: this.provides,
501503
requires: this.requires,
@@ -628,7 +630,8 @@ export class CSIController extends TypedEmitter<Events> {
628630
if (development()) {
629631
this.router.upstream("/monitoring", this.upStreams[CC.MONITORING]);
630632
}
631-
this.router.upstream("/output", this.upStreams[CC.OUT]);
633+
634+
this.router.upstream("/output", this.upStreams[CC.OUT], { encoding: "binary" });
632635

633636
this.router.downstream("/input", (req) => {
634637
if (this.apiInputEnabled) {

packages/host/src/lib/host.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -628,12 +628,15 @@ export class Host implements IComponent {
628628
spaceMiddleware(req: ParsedMessage, res: ServerResponse) {
629629
const url = req.url!.replace(`${this.apiBase}/cpm/api/v1/`, "");
630630

631-
this.logger.info("SPACE REQUEST", req.url, url, this.apiBase, req.body);
631+
this.logger.info("SPACE REQUEST", req.url, url, this.apiBase);
632632

633633
const clientRequest = this.cpmConnector?.makeHttpRequestToCpm(req.method!, url, req.headers);
634634

635635
if (clientRequest) {
636636
clientRequest.on("response", (response: IncomingMessage) => {
637+
response.on("end", () => {
638+
this.logger.info("Space response ended", url, response.statusCode);
639+
});
637640
response.pipe(res);
638641
}).on("error", (error) => {
639642
this.logger.error("Error requesting CPM", error);

packages/host/src/lib/socket-server.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import net from "net";
44

55
import { isDefined, TypedEmitter } from "@scramjet/utility";
66
import { ObjLogger } from "@scramjet/obj-logger";
7+
import { CommunicationChannel } from "@scramjet/symbols";
78

89
type MaybeSocket = net.Socket | null
910
type RunnerConnectionsInProgress = [
@@ -58,7 +59,9 @@ export class SocketServer extends TypedEmitter<Events> implements IComponent {
5859

5960
connection
6061
.on("error", (err) => this.logger.error("Error on Instance in stream", id, channel, err))
61-
.on("end", () => this.logger.debug(`Channel [${id}:${channel}] ended`));
62+
.on("end", () => this.logger.debug(
63+
`Channel [${id}:${channel}] ended. tx/rx: ${connection.bytesWritten}/${connection.bytesRead}`
64+
));
6265

6366
try {
6467
await this.handleConnection(id, channel, connection);
@@ -90,8 +93,10 @@ export class SocketServer extends TypedEmitter<Events> implements IComponent {
9093
} else {
9194
throw new Error(`Runner(${id}) wanted to connect on already initialized channel ${channel}`);
9295
}
96+
9397
if (runner.every(isDefined)) {
9498
this.runnerConnectionsInProgress.delete(id);
99+
runner[CommunicationChannel.OUT]?.setDefaultEncoding("binary");
95100
this.emit("connect", id, runner as RunnerOpenConnections);
96101
}
97102
}

packages/runner/src/host-client.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ class HostClient implements IHostClient {
6565
);
6666

6767
this._streams = openConnections as HostOpenConnections;
68+
this._streams[CC.OUT].on("end", () => {
69+
this.logger.info("Total data written to instance output", (this.streams[CC.OUT] as net.Socket).bytesWritten);
70+
});
6871

6972
try {
7073
this.bpmux = new BPMux(this._streams[CC.PACKAGE]);

packages/runner/src/runner.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,13 @@ export class Runner<X extends AppConfig> implements IComponent {
589589
stream instanceof StringStream || stream instanceof BufferStream
590590
);
591591

592+
if (!shouldSerialize) {
593+
this.hostClient.outputStream.setDefaultEncoding("binary");
594+
}
595+
596+
this.logger.info("Will Output be serialized?", shouldSerialize);
597+
this.logger.info("Stream encoding is", stream.readableEncoding);
598+
592599
stream
593600
.once("end", () => {
594601
this.logger.debug("Sequence stream ended");
@@ -602,7 +609,8 @@ export class Runner<X extends AppConfig> implements IComponent {
602609
MessageUtils.writeMessageOnStream(
603610
[RunnerMessageCode.PANG, {
604611
provides: intermediate.topic || "",
605-
contentType: intermediate.contentType || ""
612+
contentType: intermediate.contentType || "",
613+
outputEncoding: shouldSerialize ? "utf-8" : "binary"
606614
}],
607615
this.hostClient.monitorStream,
608616
);

packages/types/src/messages/handshake.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ export type HandshakeMessage = { msgCode: RunnerMessageCode.PING };
1010
export type PingMessageData = { ports?: Record<string, string> }
1111

1212
export type PangMessageData = {
13-
requires?: string,
14-
contentType?: string,
15-
provides?: string
13+
requires?: string;
14+
contentType?: string;
15+
provides?: string;
16+
outputEncoding?: BufferEncoding;
1617
}

packages/verser/src/lib/verser-client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ export class VerserClient extends TypedEmitter<Events> {
121121
resolve({ res, socket });
122122
});
123123

124+
connectRequest.socket?.setNoDelay(true);
124125
connectRequest.flushHeaders();
125126
});
126127
}

packages/verser/src/lib/verser.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export class Verser extends TypedEmitter<Events> {
2525
this.server = server;
2626

2727
this.server.on("connect", (req, socket: Socket) => {
28+
socket.setNoDelay(true);
2829
this.logger.info("New connection:", req.url);
2930

3031
const connection = new VerserConnection(req, socket);

0 commit comments

Comments
 (0)