From d28fa503b4cdb59705100e39d77f1fb04a7d7426 Mon Sep 17 00:00:00 2001 From: Srinarayan Srikanthan Date: Tue, 18 Mar 2025 11:01:33 +0800 Subject: [PATCH 1/5] remote endpoint support Signed-off-by: Srinarayan Srikanthan --- comps/cores/mega/micro_service.py | 12 ++++++++++-- comps/cores/mega/orchestrator.py | 9 +++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/comps/cores/mega/micro_service.py b/comps/cores/mega/micro_service.py index 2d79d6414f..dce33d5bb6 100644 --- a/comps/cores/mega/micro_service.py +++ b/comps/cores/mega/micro_service.py @@ -30,6 +30,7 @@ def __init__( protocol: str = "http", host: str = "localhost", port: int = 8080, + api_key : str = None, ssl_keyfile: Optional[str] = None, ssl_certfile: Optional[str] = None, endpoint: Optional[str] = "/", @@ -49,6 +50,7 @@ def __init__( self.protocol = protocol self.host = host self.port = port + self.api_key= api_key self.endpoint = endpoint self.input_datatype = input_datatype self.output_datatype = output_datatype @@ -137,8 +139,14 @@ def _validate_env(self): @property def endpoint_path(self): - return f"{self.protocol}://{self.host}:{self.port}{self.endpoint}" - + if self.api_key: + return f"{self.host}{self.endpoint}" + else: + return f"{self.protocol}://{self.host}:{self.port}{self.endpoint}" + + @property + def api_key_value(self): + return self.api_key def register_microservice( name: str, diff --git a/comps/cores/mega/orchestrator.py b/comps/cores/mega/orchestrator.py index 4532097e30..9f3a984bb6 100644 --- a/comps/cores/mega/orchestrator.py +++ b/comps/cores/mega/orchestrator.py @@ -239,6 +239,7 @@ async def execute( ): # send the cur_node request/reply endpoint = self.services[cur_node].endpoint_path + access_token = self.services[cur_node].api_key_value llm_parameters_dict = llm_parameters.dict() is_llm_vlm = self.services[cur_node].service_type in (ServiceType.LLM, ServiceType.LVM) @@ -262,7 +263,11 @@ async def execute( response = requests.post( url=endpoint, data=json.dumps(inputs), + headers={ headers={"Content-type": "application/json"}, + "Content-type": "application/json", + "Authorization": f"Bearer {access_token}" + }, proxies={"http": None}, stream=True, timeout=1000, @@ -290,6 +295,10 @@ def generate(): res = requests.post( url=downstream_endpoint, data=json.dumps({"text": buffered_chunk_str}), + headers={ + "Content-type": "application/json", + "Authorization": f"Bearer {access_token}" # Replace access_token with your actual token + }, proxies={"http": None}, ) res_json = res.json() From cbc97fd5655a4b28c4f195334674890c5fd77718 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 18 Mar 2025 03:00:52 +0000 Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- comps/cores/mega/micro_service.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/comps/cores/mega/micro_service.py b/comps/cores/mega/micro_service.py index dce33d5bb6..07cf0149a2 100644 --- a/comps/cores/mega/micro_service.py +++ b/comps/cores/mega/micro_service.py @@ -30,7 +30,7 @@ def __init__( protocol: str = "http", host: str = "localhost", port: int = 8080, - api_key : str = None, + api_key: str = None, ssl_keyfile: Optional[str] = None, ssl_certfile: Optional[str] = None, endpoint: Optional[str] = "/", @@ -50,7 +50,7 @@ def __init__( self.protocol = protocol self.host = host self.port = port - self.api_key= api_key + self.api_key = api_key self.endpoint = endpoint self.input_datatype = input_datatype self.output_datatype = output_datatype @@ -140,14 +140,15 @@ def _validate_env(self): @property def endpoint_path(self): if self.api_key: - return f"{self.host}{self.endpoint}" + return f"{self.host}{self.endpoint}" else: - return f"{self.protocol}://{self.host}:{self.port}{self.endpoint}" - + return f"{self.protocol}://{self.host}:{self.port}{self.endpoint}" + @property def api_key_value(self): return self.api_key + def register_microservice( name: str, service_role: ServiceRoleType = ServiceRoleType.MICROSERVICE, From 6469dc02168035f6504b4f034a5db58978c2b867 Mon Sep 17 00:00:00 2001 From: Srinarayan Srikanthan Date: Thu, 20 Mar 2025 23:56:39 +0800 Subject: [PATCH 3/5] typo fix Signed-off-by: Srinarayan Srikanthan --- comps/cores/mega/orchestrator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/comps/cores/mega/orchestrator.py b/comps/cores/mega/orchestrator.py index 9f3a984bb6..c63ec7952c 100644 --- a/comps/cores/mega/orchestrator.py +++ b/comps/cores/mega/orchestrator.py @@ -264,7 +264,6 @@ async def execute( url=endpoint, data=json.dumps(inputs), headers={ - headers={"Content-type": "application/json"}, "Content-type": "application/json", "Authorization": f"Bearer {access_token}" }, From cab05cbc4255df0bc3547ab48dff152b4e4ce37d Mon Sep 17 00:00:00 2001 From: Srinarayan Srikanthan Date: Tue, 25 Mar 2025 10:49:24 +0800 Subject: [PATCH 4/5] added new test case Signed-off-by: Srinarayan Srikanthan --- comps/cores/mega/orchestrator.py | 49 +++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/comps/cores/mega/orchestrator.py b/comps/cores/mega/orchestrator.py index c63ec7952c..1fc7d79d58 100644 --- a/comps/cores/mega/orchestrator.py +++ b/comps/cores/mega/orchestrator.py @@ -260,17 +260,30 @@ async def execute( if ENABLE_OPEA_TELEMETRY else contextlib.nullcontext() ): - response = requests.post( - url=endpoint, - data=json.dumps(inputs), - headers={ - "Content-type": "application/json", - "Authorization": f"Bearer {access_token}" - }, - proxies={"http": None}, - stream=True, - timeout=1000, - ) + if access_token: + response = requests.post( + url=endpoint, + data=json.dumps(inputs), + headers={ + "Content-type": "application/json", + "Authorization": f"Bearer {access_token}" + }, + proxies={"http": None}, + stream=True, + timeout=1000, + ) + else: + response = requests.post( + url=endpoint, + data=json.dumps(inputs), + headers={ + "Content-type": "application/json", + }, + proxies={"http": None}, + stream=True, + timeout=1000, + ) + downstream = runtime_graph.downstream(cur_node) if downstream: assert len(downstream) == 1, "Not supported multiple stream downstreams yet!" @@ -291,12 +304,22 @@ def generate(): buffered_chunk_str += self.extract_chunk_str(chunk) is_last = chunk.endswith("[DONE]\n\n") if (buffered_chunk_str and buffered_chunk_str[-1] in hitted_ends) or is_last: - res = requests.post( + if access_token: + res = requests.post( + url=downstream_endpoint, + data=json.dumps({"text": buffered_chunk_str}), + headers={ + "Content-type": "application/json", + "Authorization": f"Bearer {access_token}" + }, + proxies={"http": None}, + ) + else: + res = requests.post( url=downstream_endpoint, data=json.dumps({"text": buffered_chunk_str}), headers={ "Content-type": "application/json", - "Authorization": f"Bearer {access_token}" # Replace access_token with your actual token }, proxies={"http": None}, ) From d9b49cf56e4abc1bedac031e21de138c629bd999 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 25 Mar 2025 03:03:44 +0000 Subject: [PATCH 5/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- comps/cores/mega/orchestrator.py | 37 +++++++++++++++----------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/comps/cores/mega/orchestrator.py b/comps/cores/mega/orchestrator.py index 1fc7d79d58..b74cb18901 100644 --- a/comps/cores/mega/orchestrator.py +++ b/comps/cores/mega/orchestrator.py @@ -264,10 +264,7 @@ async def execute( response = requests.post( url=endpoint, data=json.dumps(inputs), - headers={ - "Content-type": "application/json", - "Authorization": f"Bearer {access_token}" - }, + headers={"Content-type": "application/json", "Authorization": f"Bearer {access_token}"}, proxies={"http": None}, stream=True, timeout=1000, @@ -283,7 +280,7 @@ async def execute( stream=True, timeout=1000, ) - + downstream = runtime_graph.downstream(cur_node) if downstream: assert len(downstream) == 1, "Not supported multiple stream downstreams yet!" @@ -306,23 +303,23 @@ def generate(): if (buffered_chunk_str and buffered_chunk_str[-1] in hitted_ends) or is_last: if access_token: res = requests.post( - url=downstream_endpoint, - data=json.dumps({"text": buffered_chunk_str}), - headers={ - "Content-type": "application/json", - "Authorization": f"Bearer {access_token}" - }, - proxies={"http": None}, - ) + url=downstream_endpoint, + data=json.dumps({"text": buffered_chunk_str}), + headers={ + "Content-type": "application/json", + "Authorization": f"Bearer {access_token}", + }, + proxies={"http": None}, + ) else: res = requests.post( - url=downstream_endpoint, - data=json.dumps({"text": buffered_chunk_str}), - headers={ - "Content-type": "application/json", - }, - proxies={"http": None}, - ) + url=downstream_endpoint, + data=json.dumps({"text": buffered_chunk_str}), + headers={ + "Content-type": "application/json", + }, + proxies={"http": None}, + ) res_json = res.json() if "text" in res_json: res_txt = res_json["text"]