Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def register_all_fit_services() -> None:
fitable_aliases_infos = reduce(list.__add__, list(_plugin_fitable_dict.values()))
local_fitable_aliases_infos = []
for fitable_aliases_info in fitable_aliases_infos:
if not _local_only_invoke(fitable_aliases_info.fitable.genericable_id):
if not _local_only_invoke(fitable_aliases_info.fitable.genericableId):
local_fitable_aliases_infos.append(fitable_aliases_info)
online_fit_services(local_fitable_aliases_infos)
except FitException:
Expand Down
33 changes: 32 additions & 1 deletion framework/fit/python/conf/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,35 @@ debug-console: true
terminate-main:
enabled: false
local_ip: "localhost"
context-path: ""
context-path: ""
http:
server:
enabled: true
address:
use-random-port: false
port: 9666
port-to-register:
protocol: 2
formats:
- 1
- 2
https:
client:
verify_enabled: false # 是否需要验证服务端身份
ca_path: "plugin/fit_py_http_client/resources/ca.crt"
assert_host_name: false # 是否在校验时校验主机名,仅当 verify_enabled 为 true 时有意义
cert_enabled: false # 是否服务端对自身身份校验
crt_path: "plugin/fit_py_http_client/resources/global.crt"
key_path: "plugin/fit_py_http_client/resources/global.key"
key_file_encrypted: false # 私钥是否被加密,仅当 cert_enabled 为 true 时有意义
key_file_password: "" # 私钥的密码,仅当 cert_enabled 和 key_file_encrypted 为 true 时有意义
key_file_password_encrypted: false # 私钥的密码是否被加密,仅当 cert_enabled 和 key_file_encrypted 为 true 时有意义
registry-center:
server:
mode: 'DIRECT' # DIRECT 表示直连,直接连接内存注册中心;PROXY 表示代理模式,通过本地代理服务连接 Nacos 注册中心
addresses:
- "localhost:8848"
protocol: 2
formats:
- 1
context-path: ""
10 changes: 8 additions & 2 deletions framework/fit/python/conf/fit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,17 @@ fit.public.genericables.2ac926e6e40245b78b7bdda23bcb727b:
route:
default: "ONLINE_FIT_SERVICE_FITABLE_ID"
fit.public.genericables.modelengine.fit.registry.registry-service.query-running-fitables:
name: "QUERY_FITABLE_METAS_GEN_ID"
name: "query_fitable_metas_gen_id"
tags:
- "nonTraceable"
route:
default: "query-running-fitables"
fit.public.genericables.modelengine.fit.registry.registry-service.register-fitables:
name: 'register_fitables_gen_id'
tags:
- 'nonTraceable'
route:
default: 'register-fitables'
fit.public.genericables.GET_FITABLES_OF_GENERICABLE_GEN_ID:
name: "get_fitables_of_genericable"
tags:
Expand Down Expand Up @@ -499,4 +505,4 @@ fit.public.genericables.modelengine.fit.get.earliest.start.time:
default: "local-worker"
tags:
- "localOnly"
- "nonTraceable"
- "nonTraceable"
16 changes: 8 additions & 8 deletions framework/fit/python/fit_common_struct/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

class Genericable(object):

def __init__(self, genericable_id: str, genericable_version: str):
self.genericable_id = genericable_id
self.genericable_version = genericable_version
def __init__(self, genericableId: str, genericableVersion: str):
self.genericableId = genericableId
self.genericableVersion = genericableVersion

def __eq__(self, other):
if not isinstance(other, self.__class__):
Expand All @@ -28,11 +28,11 @@ def __repr__(self):

class Fitable(object):

def __init__(self, genericable_id: str, genericable_version: str, fitable_id: str, fitable_version: str):
self.genericable_id = genericable_id
self.genericable_version = genericable_version
self.fitable_id = fitable_id
self.fitable_version = fitable_version
def __init__(self, genericableId: str, genericableVersion: str, fitableId: str, fitableVersion: str):
self.genericableId = genericableId
self.genericableVersion = genericableVersion
self.fitableId = fitableId
self.fitableVersion = fitableVersion

def __eq__(self, other):
if not isinstance(other, self.__class__):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,49 +10,25 @@
from numpy import int32

from fit_common_struct.core import Address as AddressInner
from fit_common_struct.core import Fitable


class FitableInfo(object):

def __init__(self, genericableId: str, genericableVersion: str, fitableId: str, fitableVersion: str):
self.genericableId = genericableId
self.genericableVersion = genericableVersion
self.fitableId = fitableId
self.fitableVersion = fitableVersion

def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return self.__dict__ == other.__dict__

def __hash__(self):
return hash(tuple(self.__dict__.values()))

def __repr__(self):
return str(tuple(self.__dict__.values()))


class GenericableInfo:

def __init__(self, genericableId: str, genericableVersion: str):
self.genericableId = genericableId
self.genericableVersion = genericableVersion

def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return self.__dict__ == other.__dict__

def __hash__(self):
return hash(tuple(self.__dict__.values()))

def __repr__(self):
return str(tuple(self.__dict__.values()))
def safe_hash_dict(obj_dict):
"""安全地计算包含列表的字典的哈希值"""
hashable_values = []
for value in obj_dict.values():
if isinstance(value, list):
hashable_values.append(tuple(value))
elif isinstance(value, dict):
hashable_values.append(tuple(sorted(value.items())))
else:
hashable_values.append(value)
return hash(tuple(hashable_values))


class FitableMeta(object):

def __init__(self, fitable: FitableInfo, aliases: List[str], formats: List[int32]):
def __init__(self, fitable: Fitable, aliases: List[str], formats: List[int32]):
self.fitable = fitable
self.aliases = aliases

Expand All @@ -68,7 +44,8 @@ def __eq__(self, other):
return self.__dict__ == other.__dict__

def __hash__(self):
return hash(tuple(self.__dict__.values()))
# 使用安全的哈希函数处理包含列表的对象
return safe_hash_dict(self.__dict__)

def __repr__(self):
return str(tuple(self.__dict__.values()))
Expand Down Expand Up @@ -131,7 +108,7 @@ def __eq__(self, other):
return self.__dict__ == other.__dict__

def __hash__(self):
return hash(tuple(self.__dict__.values()))
return safe_hash_dict(self.__dict__)

def __repr__(self):
return str(tuple(self.__dict__.values()))
Expand All @@ -154,7 +131,7 @@ def __eq__(self, other):
return self.__dict__ == other.__dict__

def __hash__(self):
return hash(tuple(self.__dict__.values()))
return safe_hash_dict(self.__dict__)

def __repr__(self):
return str(tuple(self.__dict__.values()))
Expand All @@ -178,15 +155,15 @@ def __eq__(self, other):
return self.__dict__ == other.__dict__

def __hash__(self):
return hash(tuple(self.__dict__.values()))
return safe_hash_dict(self.__dict__)

def __repr__(self):
return str(tuple(self.__dict__.values()))


class FitableAddressInstance(object):

def __init__(self, applicationInstances: List[ApplicationInstance], fitable: FitableInfo):
def __init__(self, applicationInstances: List[ApplicationInstance], fitable: Fitable):
self.applicationInstances = applicationInstances
self.fitable = fitable

Expand All @@ -196,7 +173,7 @@ def __eq__(self, other):
return self.__dict__ == other.__dict__

def __hash__(self):
return hash(tuple(self.__dict__.values()))
return safe_hash_dict(self.__dict__)

def __repr__(self):
return str(tuple(self.__dict__.values()))
Expand All @@ -214,9 +191,20 @@ def __eq__(self, other):
return self.__dict__ == other.__dict__

def __hash__(self):
return hash(tuple(self.__dict__.values()))
return safe_hash_dict(self.__dict__)

def __repr__(self):
return str(tuple(self.__dict__.values()))

class HeartBeatInfo:
def __init__(self, sceneType: str, aliveTime: int, initDelay: int):
self.sceneType: str = sceneType
self.aliveTime: int = aliveTime
self.initDelay: int = initDelay


class HeartBeatAddress:
def __init__(self, id_: str):
self.id = id_


13 changes: 12 additions & 1 deletion framework/fit/python/fitframework/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,23 @@

# registry server
QUERY_FIT_SERVICE_GEN_ID = 'modelengine.fit.registry.registry-service.query-fitables-addresses'
QUERY_FIT_SERVICE_FIT_ID = 'query-fitables-addresses'
SUBSCRIBE_FIT_SERVICE_GEN_ID = 'modelengine.fit.registry.registry-service.subscribe-fitables'
SUBSCRIBE_FIT_SERVICE_FIT_ID = 'subscribe-fitables'
UNSUBSCRIBE_FIT_SERVICE_GEN_ID = 'modelengine.fit.registry.registry-service.unsubscribe-fitables'
UNSUBSCRIBE_FIT_SERVICE_FIT_ID = 'unsubscribe-fitables'
REGISTER_FIT_SERVICE_GEN_ID = 'modelengine.fit.registry.registry-service.register-fitables'
REGISTER_FIT_SERVICE_FIT_ID = 'register-fitables'
UNREGISTER_FIT_SERVICE_GEN_ID = 'modelengine.fit.registry.registry-service.unregister-fitables'
UNREGISTER_FIT_SERVICE_FIT_ID = 'unregister-fitables'
QUERY_FITABLE_METAS_GEN_ID = 'modelengine.fit.registry.registry-service.query-running-fitables'
QUERY_FITABLE_METAS_FIT_ID = 'query-running-fitables'

# heartbeat server
HEART_BEAT_GEN_ID = 'modelengine.fit.heartbeat.send-heartbeat'
SEND_HEART_BEAT_GEN_ID = 'modelengine.fit.heartbeat.send-heartbeat'
SEND_HEART_BEAT_FIT_ID = 'send-heartbeat'
STOP_HEART_BEAT_GEN_ID = 'modelengine.fit.heartbeat.stop-heartbeat'
STOP_HEART_BEAT_FIT_ID = 'stop-heartbeat'

# debugger
DEBUGGER_START_FIT_ID = 'debugger_start_fitable_id'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def default_load_balancing(self, generic_id, fitable_id, fitable: Fitable):
f"addresses count: {len(addresses)}")
if len(addresses) == 0:
fit_logger.warning(f"cannot get any address can use in this worker. "
f"[genericable_id={fitable.genericable_id}, fitable_id={fitable.fitable_id}]")
f"[genericable_id={fitable.genericableId}, fitable_id={fitable.fitableId}]")
return None
# no choice!
if len(addresses) == 1:
Expand All @@ -339,13 +339,13 @@ def get_fit_service_addresses(self, fitable: Fitable) -> List[Address]:
addresses: List[Address] = _get_fit_service_address_with_priorities(fitable)
if not addresses:
fit_logger.warning(f"cannot get any endpoint after checking format and protocol. "
f"[genericable_id={fitable.genericable_id}, fitable_id={fitable.fitable_id}]")
f"[genericable_id={fitable.genericableId}, fitable_id={fitable.fitableId}]")
return []

addresses: List[Address] = _load_balance_env_filtering(addresses)
if not addresses:
fit_logger.warning(f"cannot get any endpoint after filtering by environment. "
f"[genericable_id={fitable.genericable_id}, fitable_id={fitable.fitable_id}]")
f"[genericable_id={fitable.genericableId}, fitable_id={fitable.fitableId}]")
return []

return addresses
Expand Down Expand Up @@ -398,7 +398,7 @@ def custom_load_balancing(self, address_filter: Callable[[Address], bool],
addresses: List[Address] = self.get_fit_service_addresses(fitable)
if len(addresses) == 0:
fit_logger.warning(f"cannot get any address can use in this worker. "
f"[genericable_id={fitable.genericable_id}, fitable_id={fitable.fitable_id}]")
f"[genericable_id={fitable.genericableId}, fitable_id={fitable.fitableId}]")
return None
try:
addresses = [address for address in addresses if address_filter(address)]
Expand All @@ -407,15 +407,15 @@ def custom_load_balancing(self, address_filter: Callable[[Address], bool],
return None
if not addresses:
fit_logger.warning(f"cannot get any address after custom load balancing. "
f"[genericable_id={fitable.genericable_id}, fitable_id={fitable.fitable_id}]")
f"[genericable_id={fitable.genericableId}, fitable_id={fitable.fitableId}]")
return None
if len(addresses) > 1:
fit_logger.warning(f"get more than one address after custom load balancing. "
f"[genericable_id={fitable.genericable_id}, fitable_id={fitable.fitable_id}]")
f"[genericable_id={fitable.genericableId}, fitable_id={fitable.fitableId}]")
return addresses[0]

if addresses[0].id == _worker_id():
return service_repo.get_fitable_ref(fitable.genericable_id, fitable.fitable_id)
return service_repo.get_fitable_ref(fitable.genericableId, fitable.fitableId)

return addresses[0]

Expand Down Expand Up @@ -466,9 +466,9 @@ def lb_call_template(fitable_info: Fitable, target_addresses: List[Address]) ->
pass

args = fitable, addresses
lb_fitable_id = get_fit_ffp_fitable_id(fitable.genericable_id, 'load_balance')
lb_fitable_id = get_fit_ffp_fitable_id(fitable.genericableId, 'load_balance')
if lb_fitable_id:
fit_invoke_info = (fitable.genericable_id, lb_fitable_id, lb_call_template)
fit_invoke_info = (fitable.genericableId, lb_fitable_id, lb_call_template)
return _ffp_invoke(fit_invoke_info, False, None, None, *args)
else:
fit_invoke_info = (const.LOAD_BALANCING_GEN_ID, const.LOAD_BALANCING_RANDOM_FIT_ID, lb_call_template)
Expand Down Expand Up @@ -591,7 +591,7 @@ def _get_fit_service_address_with_priorities(fitable: Fitable) -> List[Address]:

def _get_fit_service_address_and_convert(fitable: Fitable) -> List[Address]:
addresses: List[Address] = get_fit_service_address_list(fitable)
fit_logger.debug(f"got address, gid: {fitable.genericable_id}, count: {len(addresses)}")
fit_logger.debug(f"got address, gid: {fitable.genericableId}, count: {len(addresses)}")
return addresses


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from fitframework.api.decorators import register_event
from fitframework.api.enums import FrameworkEvent as Fit_Event
from fitframework.api.logging import sys_plugin_logger
from .heart_beat_utils import HeartBeatAddress, HeartBeatInfo
from fit_common_struct.entity import HeartBeatInfo, HeartBeatAddress

# 用于控制心跳任务退出的队列
_HEART_BEAT_FINISH_QUEUE = multiprocessing.Queue()
Expand Down Expand Up @@ -71,7 +71,7 @@ def get_runtime_worker_id() -> str:
pass


@fit(const.HEART_BEAT_GEN_ID)
@fit(const.SEND_HEART_BEAT_GEN_ID)
def heartbeat(beat_info: List[HeartBeatInfo], address: HeartBeatAddress) -> bool:
""" 可能返回 false,也可能抛出异常,也可能超时 """
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,6 @@
from fitframework.api.logging import sys_plugin_logger


class HeartBeatInfo:
def __init__(self, sceneType: str, aliveTime: int, initDelay: int):
self.sceneType: str = sceneType
self.aliveTime: int = aliveTime
self.initDelay: int = initDelay


class HeartBeatAddress:
def __init__(self, id_: str):
self.id = id_


def timeout_or_exception_retry(timeout: int = 3, a_exception=Exception, max_retry: int = 1):
"""

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +0,0 @@
https:
client:
verify_enabled: false # 是否需要验证服务端身份
ca_path: "plugin/fit_py_http_client/resources/ca.crt"
assert_host_name: false # 是否在校验时校验主机名,仅当 verify_enabled 为 true 时有意义
cert_enabled: false # 是否服务端对自身身份校验
crt_path: "plugin/fit_py_http_client/resources/global.crt"
key_path: "plugin/fit_py_http_client/resources/global.key"
key_file_encrypted: false # 私钥是否被加密,仅当 cert_enabled 为 true 时有意义
key_file_password: "" # 私钥的密码,仅当 cert_enabled 和 key_file_encrypted 为 true 时有意义
key_file_password_encrypted: false # 私钥的密码是否被加密,仅当 cert_enabled 和 key_file_encrypted 为 true 时有意义
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
category: "system"
level: 4
Loading