Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions comps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
from comps.proto.docarray import TextDoc, EmbedDoc768, EmbedDoc1024, GenerateDoc

# Microservice
from comps.mega.service_builder import BaseService, ServiceBuilder
from comps.mega.micro_service import MicroService
from comps.mega.orchestrator import ServiceOrchestrator
from comps.mega.orchestrator_with_yaml import ServiceOrchestratorWithYaml
from comps.mega.micro_service import MicroService, register_microservice, opea_microservices
47 changes: 14 additions & 33 deletions comps/embeddings/langchain/embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio

from docarray.base_doc import DocArrayResponse
from http_service import HTTPService
from langchain_community.embeddings import HuggingFaceBgeEmbeddings

from comps import EmbedDoc1024, TextDoc


async def setup():
runtime_args = {
"title": "test_service",
"description": "this is a test.",
"protocol": "http",
"port": 8099,
"host": "localhost",
}
# breakpoint()
service = HTTPService(runtime_args=runtime_args, cors=False)
app = service.app
from comps import EmbedDoc1024, TextDoc, opea_microservices, register_microservice

@app.post(
path="/v1/embed",
response_model=EmbedDoc1024,
response_class=DocArrayResponse,
summary="Get the embedded vector of the input text",
tags=["Debug"],
)
def embedding(input: TextDoc) -> EmbedDoc1024:
embeddings = HuggingFaceBgeEmbeddings(model_name="BAAI/bge-large-en-v1.5")
embed_vector = embeddings.embed_query(input.text)
res = EmbedDoc1024(text=input.text, embedding=embed_vector)
return res

await service.initialize_server()
await service.execute_server()
@register_microservice(
name="opea_embedding_service",
expose_endpoint="/v1/embeddings",
port=9000,
input_datatype=TextDoc,
output_datatype=EmbedDoc1024,
)
def embedding(input: TextDoc) -> EmbedDoc1024:
embeddings = HuggingFaceBgeEmbeddings(model_name="BAAI/bge-large-en-v1.5")
embed_vector = embeddings.embed_query(input.text)
res = EmbedDoc1024(text=input.text, embedding=embed_vector)
return res


asyncio.run(setup())
opea_microservices["opea_embedding_service"].start()
10 changes: 7 additions & 3 deletions comps/mega/base_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from types import SimpleNamespace
from typing import TYPE_CHECKING, Dict, Optional

from logger import CustomLogger
from .logger import CustomLogger

__all__ = ["BaseService"]

Expand All @@ -27,7 +27,7 @@ class BaseService:
def __init__(
self,
name: Optional[str] = "Base service",
runtime_args: Optional[Dict] = None,
runtime_args: Optional[Dict] = {},
**kwargs,
):
"""Initialize the BaseService with a name, runtime arguments, and any additional arguments."""
Expand All @@ -41,7 +41,7 @@ def __init__(

def _process_runtime_args(self):
"""Process the runtime arguments to ensure they are in the correct format."""
_runtime_args = self.runtime_args if isinstance(self.runtime_args, dict) else vars(self.runtime_args or {})
_runtime_args = self.runtime_args if isinstance(self.runtime_args, dict) else vars(self.runtime_args)
self.runtime_args = SimpleNamespace(**_runtime_args)

@property
Expand Down Expand Up @@ -118,6 +118,8 @@ def check_server_readiness(
res = False
if protocol is None or protocol == "http":
res = HTTPService.check_readiness(ctrl_address)
else:
raise ValueError(f"Unsupported protocol: {protocol}")
return res

@staticmethod
Expand All @@ -138,4 +140,6 @@ async def async_check_server_readiness(
res = False
if protocol is None or protocol == "http":
res = await HTTPService.async_check_readiness(ctrl_address)
else:
raise ValueError(f"Unsupported protocol: {protocol}")
return res
4 changes: 2 additions & 2 deletions comps/mega/http_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@

from typing import Optional

from base_service import BaseService
from fastapi import FastAPI
from uvicorn import Config, Server

from .base_service import BaseService


class HTTPService(BaseService):
"""FastAPI HTTP service based on BaseService class.
Expand Down Expand Up @@ -127,7 +128,6 @@ async def execute_server(self):
async def terminate_server(self):
"""Terminate the HTTP server and free resources allocated when setting up the server."""
self.logger.info("Initiating server termination")
await super().shutdown()
self.server.should_exit = True
await self.server.shutdown()
self.logger.info("Server termination completed")
Expand Down
5 changes: 5 additions & 0 deletions comps/mega/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ def log_message(self, log_level: str, msg: str):
"""
self.logger.log(log_level, msg)

def close(self):
"""Close all the handlers."""
for handler in self.logger.handlers:
handler.close()

# Define type hints for pylint check
debug: Callable[[str], None]
info: Callable[[str], None]
Expand Down
115 changes: 92 additions & 23 deletions comps/mega/micro_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,53 @@
# limitations under the License.

import asyncio
from typing import Dict, Optional
import multiprocessing
import os
import signal
from typing import Any, Optional, Type

from constants import ServiceRoleType
from utils import check_ports_availability
from ..proto.docarray import TextDoc
from .constants import ServiceRoleType
from .utils import check_ports_availability

opea_microservices = {}


class MicroService:
"""MicroService class to create a microservice."""

def __init__(self, args: Optional[Dict] = None):
def __init__(
self,
name: Optional[str] = None,
service_role: ServiceRoleType = ServiceRoleType.MICROSERVICE,
protocol: str = "http",
host: str = "localhost",
port: int = 8080,
expose_endpoint: Optional[str] = "/",
input_datatype: Type[Any] = TextDoc,
output_datatype: Type[Any] = TextDoc,
replicas: int = 1,
provider: Optional[str] = None,
provider_endpoint: Optional[str] = None,
):
"""Init the microservice."""
self.args = args
if args.get("name", None):
self.name = f'{args.get("name")}/{self.__class__.__name__}'
else:
self.name = self.__class__.__name__
self.service_role = args.get("service_role", ServiceRoleType.MICROSERVICE)
self.protocol = args.get("protocol", "http")

self.host = args.get("host", "localhost")
self.port = args.get("port", 8080)
self.replicas = args.get("replicas", 1)
self.provider = args.get("provider", None)
self.provider_endpoint = args.get("provider_endpoint", None)
self.name = f"{name}/{self.__class__.__name__}" if name else self.__class__.__name__
self.service_role = service_role
self.protocol = protocol
self.host = host
self.port = port
self.expose_endpoint = expose_endpoint
self.input_datatype = input_datatype
self.output_datatype = output_datatype
self.replicas = replicas
self.provider = provider
self.provider_endpoint = provider_endpoint
self.endpoints = []

self.server = self._get_server()
self.app = self.server.app
self.event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.event_loop)
self.event_loop.run_until_complete(self.async_setup())

def _get_server(self):
Expand All @@ -50,7 +69,7 @@ def _get_server(self):
necessary arguments.
In the future, it will also support gRPC services.
"""
from http_service import HTTPService
from .http_service import HTTPService

runtime_args = {
"protocol": self.protocol,
Expand All @@ -60,11 +79,7 @@ def _get_server(self):
"description": "OPEA Microservice Infrastructure",
}

return HTTPService(
uvicorn_kwargs=self.args.get("uvicorn_kwargs", None),
runtime_args=runtime_args,
cors=self.args.get("cors", None),
)
return HTTPService(runtime_args=runtime_args)

async def async_setup(self):
"""The async method setup the runtime.
Expand All @@ -88,3 +103,57 @@ def run(self):
This method runs the event loop until a Future is done. It is designed to be called in the main thread to keep it busy.
"""
self.event_loop.run_until_complete(self.async_run_forever())

def start(self):
self.process = multiprocessing.Process(target=self.run, daemon=False, name=self.name)
self.process.start()

async def async_teardown(self):
"""Shutdown the server."""
await self.server.terminate_server()

def stop(self):
self.event_loop.run_until_complete(self.async_teardown())
self.event_loop.stop()
self.event_loop.close()
self.server.logger.close()
if self.process.is_alive():
self.process.terminate()

@property
def endpoint_path(self):
return f"{self.protocol}://{self.host}:{self.port}{self.expose_endpoint}"


def register_microservice(
name: Optional[str] = None,
service_role: ServiceRoleType = ServiceRoleType.MICROSERVICE,
protocol: str = "http",
host: str = "localhost",
port: int = 8080,
expose_endpoint: Optional[str] = "/",
input_datatype: Type[Any] = TextDoc,
output_datatype: Type[Any] = TextDoc,
replicas: int = 1,
provider: Optional[str] = None,
provider_endpoint: Optional[str] = None,
):
def decorator(func):
micro_service = MicroService(
name=name,
service_role=service_role,
protocol=protocol,
host=host,
port=port,
expose_endpoint=expose_endpoint,
input_datatype=input_datatype,
output_datatype=output_datatype,
replicas=replicas,
provider=provider,
provider_endpoint=provider_endpoint,
)
micro_service.app.router.add_api_route(expose_endpoint, func, methods=["POST"])
opea_microservices[name] = micro_service
return func

return decorator
32 changes: 10 additions & 22 deletions comps/mega/service_builder.py → comps/mega/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,14 @@
# limitations under the License.

import json
import os
import re
import subprocess
import time
from collections import OrderedDict, defaultdict
from typing import Dict, List, Optional, Tuple
from typing import Dict, List

import requests
import yaml
from dag import DAG

from .dag import DAG

class BaseService:
def __init__(self, id, endpoint) -> None:
"""The base service object contains an id and an endpoint url."""
self.id = id
self.endpoint = endpoint


class ServiceBuilder(DAG):
class ServiceOrchestrator(DAG):
"""Manage 1 or N micro services in a DAG through Python API."""

def __init__(self, host="localhost", port=8000, hostfile=None) -> None:
Expand All @@ -41,16 +29,16 @@ def __init__(self, host="localhost", port=8000, hostfile=None) -> None:
super().__init__()

def add(self, service):
if service.id not in self.services:
self.services[service.id] = service
self.add_node_if_not_exists(service.id)
if service.name not in self.services:
self.services[service.name] = service
self.add_node_if_not_exists(service.name)
else:
raise Exception(f"Service {service.id} already exists!")
raise Exception(f"Service {service.name} already exists!")
return self

def flow_to(self, from_service, to_service):
try:
self.add_edge(from_service.id, to_service.id)
self.add_edge(from_service.name, to_service.name)
return True
except Exception as e:
print(e)
Expand All @@ -75,8 +63,8 @@ def process_outputs(self, prev_nodes: List) -> Dict:

def execute(self, cur_node: str, inputs: Dict):
# send the cur_node request/reply
endpoint = self.services[cur_node].endpoint
response = requests.post(url=endpoint, data=json.dumps({"number": inputs["number"]}), proxies={"http": None})
endpoint = self.services[cur_node].endpoint_path
response = requests.post(url=endpoint, data=json.dumps(inputs), proxies={"http": None})
print(response)
return response.json()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,17 @@
# limitations under the License.

import json
import os
import re
import subprocess
import time
from collections import OrderedDict, defaultdict
from typing import Dict, List, Optional, Tuple
from collections import OrderedDict
from typing import Dict, List, Tuple

import requests
import yaml
from dag import DAG

from .dag import DAG

class YAMLServiceBuilder(DAG):

class ServiceOrchestratorWithYaml(DAG):
"""Manage 1 or N micro services in a DAG defined by YAML."""

def __init__(self, yaml_file_path: str):
Expand All @@ -39,7 +37,7 @@ def __init__(self, yaml_file_path: str):
def execute(self, cur_node: str, inputs: Dict):
# send the cur_node request/reply
endpoint = self.docs["opea_micro_services"][cur_node]["endpoint"]
response = requests.post(url=endpoint, data=json.dumps({"number": inputs["number"]}), proxies={"http": None})
response = requests.post(url=endpoint, data=json.dumps(inputs), proxies={"http": None})
print(response)
return response.json()

Expand Down
4 changes: 4 additions & 0 deletions comps/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
docarray
httpx
pyyaml
requests
Loading