Skip to content

Commit eb70a49

Browse files
Merge pull request #4 from opea-project/letong/microservice
Add HTTPService and AsyncLoop for microservice
2 parents 69de47b + cd32055 commit eb70a49

5 files changed

Lines changed: 601 additions & 1 deletion

File tree

GenAIComps/mega/async_loop.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import asyncio
2+
import signal
3+
import time
4+
from typing import Optional, Dict
5+
from logger import CustomLogger
6+
from utils import check_ports_availability
7+
8+
9+
# Define the signals that will be handled by the AsyncLoop class
10+
HANDLED_SIGNALS = (
11+
signal.SIGINT, # Unix signal 2. Sent by Ctrl+C.
12+
signal.SIGTERM, # Unix signal 15. Sent by `kill <pid>`.
13+
signal.SIGSEGV, # Unix signal 11. Caused by an invalid memory reference.
14+
)
15+
16+
17+
class AsyncLoop:
18+
"""
19+
Async loop to run a microservice asynchronously.
20+
This class is designed to handle the running of a microservice in an asynchronous manner.
21+
It sets up an event loop and handles certain signals to gracefully stop the service.
22+
"""
23+
24+
def __init__(self, args: Optional[Dict] = None) -> None:
25+
"""
26+
Initialize the AsyncLoop class.
27+
This method sets up the initial state of the AsyncLoop, including setting up the event loop and signal handlers.
28+
"""
29+
self.args = args
30+
if args.get('name', None):
31+
self.name = f'{args.get("name")}/{self.__class__.__name__}'
32+
else:
33+
self.name = self.__class__.__name__
34+
self.protocol = args.get('protocol', 'http')
35+
self.host = args.get('host', 'localhost')
36+
self.port = args.get('port', 8080)
37+
self.quiet_error = args.get('quiet_error', False)
38+
self.logger = CustomLogger(self.name)
39+
self._loop = asyncio.new_event_loop()
40+
asyncio.set_event_loop(self._loop)
41+
self.is_cancel = asyncio.Event()
42+
self.logger.info(f'Setting signal handlers')
43+
44+
def _cancel(signum, frame):
45+
"""
46+
Signal handler for the AsyncLoop class.
47+
This method is called when a signal is received. It sets the is_cancel event to stop the event loop.
48+
"""
49+
self.logger.info(f'Received signal {signum}')
50+
self.is_cancel.set(),
51+
52+
for sig in HANDLED_SIGNALS:
53+
signal.signal(sig, _cancel)
54+
55+
self._start_time = time.time()
56+
self._loop.run_until_complete(self.async_setup())
57+
58+
def run_forever(self):
59+
"""
60+
Running method to block the main thread.
61+
This method runs the event loop until a Future is done. It is designed to be called in the main thread to keep it busy.
62+
"""
63+
self._loop.run_until_complete(self._loop_body())
64+
65+
def teardown(self):
66+
"""
67+
Call async_teardown() and stop and close the event loop.
68+
This method is responsible for tearing down the event loop. It first calls the async_teardown method to perform
69+
any necessary cleanup, then it stops and closes the event loop. It also logs the duration of the event loop.
70+
"""
71+
self._loop.run_until_complete(self.async_teardown())
72+
self._loop.stop()
73+
self._loop.close()
74+
self._stop_time = time.time()
75+
self.logger.info(f"Async loop is tore down. Duration: {self._stop_time - self._start_time}")
76+
77+
def _get_server(self):
78+
"""
79+
Get the server instance based on the protocol.
80+
This method currently only supports HTTP services. It creates an instance of the HTTPService class with the
81+
necessary arguments.
82+
In the future, it will also support gRPC services.
83+
"""
84+
if self.protocol.lower() == 'http':
85+
from http_service import HTTPService
86+
87+
runtime_args = self.args.get('runtime_args', None)
88+
runtime_args['protocol'] = self.protocol
89+
runtime_args['host'] = self.host
90+
runtime_args['port'] = self.port
91+
return HTTPService(
92+
uvicorn_kwargs=self.args.get('uvicorn_kwargs', None),
93+
runtime_args=runtime_args,
94+
cors=self.args.get('cors', None),
95+
)
96+
97+
async def async_setup(self):
98+
"""
99+
The async method setup the runtime.
100+
This method is responsible for setting up the server. It first checks if the port is available, then it gets
101+
the server instance and initializes it.
102+
"""
103+
if not (check_ports_availability(self.host, self.port)):
104+
raise RuntimeError(f'port:{self.port}')
105+
106+
self.server = self._get_server()
107+
await self.server.initialize_server()
108+
109+
async def async_run_forever(self):
110+
"""
111+
Running method of the server.
112+
"""
113+
await self.server.execute_server()
114+
115+
async def async_teardown(self):
116+
"""
117+
Terminate the server.
118+
"""
119+
await self.server.terminate_server()
120+
121+
async def _wait_for_cancel(self):
122+
"""
123+
Wait for the cancellation event.
124+
This method waits for the is_cancel event to be set. If the server has a _should_exit attribute, it will also
125+
wait for that to be True. Once either of these conditions is met, it will call the async_teardown method.
126+
"""
127+
if isinstance(self.is_cancel, asyncio.Event) and not hasattr(
128+
self.server, '_should_exit'
129+
):
130+
await self.is_cancel.wait()
131+
else:
132+
while not self.is_cancel.is_set() and not getattr(
133+
self.server, '_should_exit', False
134+
):
135+
await asyncio.sleep(0.1)
136+
137+
await self.async_teardown()
138+
139+
async def _loop_body(self):
140+
"""
141+
The main body of the event loop.
142+
This method runs the async_run_forever and _wait_for_cancel methods concurrently. If a CancelledError is raised,
143+
it logs a warning message.
144+
"""
145+
try:
146+
await asyncio.gather(self.async_run_forever(), self._wait_for_cancel())
147+
except asyncio.CancelledError:
148+
self.logger.warning('received terminate ctrl message from main process')
149+
150+
def __enter__(self):
151+
"""
152+
Enter method for the context manager.
153+
This method simply returns the instance itself.
154+
"""
155+
return self
156+
157+
def __exit__(self, exc_type, exc_val, exc_tb):
158+
"""
159+
Exit method for the context manager.
160+
This method handles any exceptions that occurred within the context. If a KeyboardInterrupt was raised, it logs
161+
an info message. If any other exception was raised, it logs an error message. Finally, it attempts to call the
162+
teardown method. If an OSError is raised during this, it is ignored. Any other exceptions are logged as errors.
163+
"""
164+
if exc_type == KeyboardInterrupt:
165+
self.logger.info(f'{self!r} is interrupted by user')
166+
elif exc_type and issubclass(exc_type, Exception):
167+
self.logger.error(
168+
(
169+
f'{exc_val!r} during {self.run_forever!r}'
170+
+ f'\n add "--quiet-error" to suppress the exception details'
171+
if not self.quiet_error
172+
else ''
173+
),
174+
exc_info=not self.quiet_error,
175+
)
176+
else:
177+
self.logger.info(f'{self!r} is ended')
178+
179+
return True

GenAIComps/mega/base_service.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import abc
2+
from logger import CustomLogger
3+
from types import SimpleNamespace
4+
from typing import Dict, Optional, TYPE_CHECKING
5+
6+
7+
__all__ = ['BaseServer']
8+
9+
10+
class BaseService():
11+
"""
12+
BaseService creates a HTTP/gRPC server as a microservice.
13+
"""
14+
15+
def __init__(
16+
self,
17+
name: Optional[str] = 'Base service',
18+
runtime_args: Optional[Dict] = None,
19+
**kwargs,
20+
):
21+
"""
22+
Initialize the BaseService with a name, runtime arguments, and any additional arguments.
23+
"""
24+
self.name = name
25+
self.runtime_args = runtime_args
26+
self._process_runtime_args()
27+
self.title = self.runtime_args.title
28+
self.description = self.runtime_args.description
29+
self.logger = CustomLogger(self.name)
30+
self.server = None
31+
32+
def _process_runtime_args(self):
33+
"""
34+
Process the runtime arguments to ensure they are in the correct format.
35+
"""
36+
_runtime_args = (
37+
self.runtime_args
38+
if isinstance(self.runtime_args, dict)
39+
else vars(self.runtime_args or {})
40+
)
41+
self.runtime_args = SimpleNamespace(**_runtime_args)
42+
43+
@property
44+
def primary_port(self):
45+
"""
46+
Gets the first port of the port list argument.
47+
:return: The first port to be exposed
48+
"""
49+
return (
50+
self.runtime_args.port[0]
51+
if isinstance(self.runtime_args.port, list)
52+
else self.runtime_args.port
53+
)
54+
55+
@property
56+
def all_ports(self):
57+
"""
58+
Gets all the list of ports from the runtime_args as a list.
59+
:return: The lists of ports to be exposed
60+
"""
61+
return (
62+
self.runtime_args.port
63+
if isinstance(self.runtime_args.port, list)
64+
else [self.runtime_args.port]
65+
)
66+
67+
@property
68+
def protocols(self):
69+
"""
70+
Gets all the list of protocols from the runtime_args as a list.
71+
:return: The lists of protocols to be exposed
72+
"""
73+
return (
74+
self.runtime_args.protocol
75+
if isinstance(self.runtime_args.protocol, list)
76+
else [self.runtime_args.protocol]
77+
)
78+
79+
@property
80+
def host_address(self):
81+
"""
82+
Gets the host from the runtime_args
83+
:return: The host where to bind the gateway
84+
"""
85+
return self.runtime_args.host or '127.0.0.1'
86+
87+
@abc.abstractmethod
88+
async def initialize_server(self):
89+
"""
90+
Abstract method to setup the server. This should be implemented in the child class.
91+
"""
92+
...
93+
94+
@abc.abstractmethod
95+
async def execute_server(self):
96+
"""
97+
Abstract method to run the server indefinitely. This should be implemented in the child class.
98+
"""
99+
...
100+
101+
@abc.abstractmethod
102+
async def terminate_server(self):
103+
"""
104+
Abstract method to shutdown the server and free other allocated resources, e.g, health check service, etc.
105+
This should be implemented in the child class.
106+
"""
107+
...
108+
109+
@staticmethod
110+
def check_server_readiness(
111+
ctrl_address: str,
112+
protocol: Optional[str] = 'http',
113+
**kwargs,
114+
) -> bool:
115+
"""
116+
Check if server status is ready.
117+
:param ctrl_address: the address where the control request needs to be sent.
118+
:param protocol: protocol of the service.
119+
:param kwargs: extra keyword arguments.
120+
:return: True if status is ready else False.
121+
"""
122+
from http_service import HTTPService
123+
res = False
124+
if protocol is None or protocol == 'http':
125+
res = HTTPService.check_readiness(ctrl_address)
126+
return res
127+
128+
129+
@staticmethod
130+
async def async_check_server_readiness(
131+
ctrl_address: str,
132+
protocol: Optional[str] = 'grpc',
133+
**kwargs,
134+
) -> bool:
135+
"""
136+
Asynchronously check if server status is ready.
137+
:param ctrl_address: the address where the control request needs to be sent.
138+
:param protocol: protocol of the service.
139+
:param kwargs: extra keyword arguments.
140+
:return: True if status is ready else False.
141+
"""
142+
if TYPE_CHECKING:
143+
from http_service import HTTPService
144+
res = False
145+
if protocol is None or protocol == 'http':
146+
res = await HTTPService.async_check_readiness(ctrl_address)
147+
return res
148+
149+

GenAIComps/mega/logger.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import logging
2+
import functools
3+
from typing import Callable
4+
5+
6+
class CustomLogger:
7+
"""
8+
A custom logger class that adds additional logging levels.
9+
"""
10+
11+
def __init__(self, name: str = None):
12+
"""
13+
Initialize the logger with a name and custom levels.
14+
"""
15+
name = 'GenAIComps' if not name else name
16+
self.logger = logging.getLogger(name)
17+
18+
# Define custom log levels
19+
log_config = {
20+
'DEBUG': 10,
21+
'INFO': 20,
22+
'TRAIN': 21,
23+
'EVAL': 22,
24+
'WARNING': 30,
25+
'ERROR': 40,
26+
'CRITICAL': 50,
27+
'EXCEPTION': 100,
28+
}
29+
30+
# Add custom levels to logger
31+
for key, level in log_config.items():
32+
logging.addLevelName(level, key)
33+
if key == 'EXCEPTION':
34+
self.__dict__[key.lower()] = self.logger.exception
35+
else:
36+
self.__dict__[key.lower()] = functools.partial(self.log_message, level)
37+
38+
# Set up log format and handler
39+
self.format = logging.Formatter(fmt='[%(asctime)-15s] [%(levelname)8s] - %(message)s')
40+
self.handler = logging.StreamHandler()
41+
self.handler.setFormatter(self.format)
42+
43+
# Add handler to logger and set log level
44+
self.logger.addHandler(self.handler)
45+
self.logger.setLevel(logging.INFO)
46+
self.logger.propagate = False
47+
48+
def log_message(self, log_level: str, msg: str):
49+
"""
50+
Log a message at a given level.
51+
52+
:param log_level: The level at which to log the message.
53+
:param msg: The message to log.
54+
"""
55+
self.logger.log(log_level, msg)
56+
57+
# Define type hints for pylint check
58+
debug: Callable[[str], None]
59+
info: Callable[[str], None]
60+
train: Callable[[str], None]
61+
eval: Callable[[str], None]
62+
warning: Callable[[str], None]
63+
error: Callable[[str], None]
64+
critical: Callable[[str], None]
65+
exception: Callable[[str], None]
66+
67+
68+
logger = CustomLogger()

0 commit comments

Comments
 (0)