Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 206 additions & 69 deletions framework/fit/python/fit_cli/utils/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,23 @@

type_errors = []

def parse_type(annotation):
class ClassInfo:
"""存储自定义类的信息"""
def __init__(self, name:str):
self.name = name
self.fields: dict[str, ast.AST] = {} # 字段名 -> 类型注解
self.bases: list[str] = [] # 基类名

def add_field(self, field_name: str, annotation: ast.AST):
"""添加字段"""
self.fields[field_name] = annotation

def add_base(self, base_name: str):
"""添加基类"""
self.bases.append(base_name)


def parse_type(annotation, class_map):
"""解析参数类型"""
global type_errors

Expand All @@ -34,79 +50,133 @@ def parse_type(annotation):
return "invalid", None, True

elif isinstance(annotation, ast.Name):
if annotation.id in TYPE_MAP:
if annotation.id in TYPE_MAP: # 基础类型
return TYPE_MAP[annotation.id], None, True
else:
elif annotation.id in class_map: # 自定义类型
class_info = class_map[annotation.id]
properties = {}
for field_name, field_annotation in class_info.fields.items():
field_type, field_items, _ = parse_type(field_annotation, class_map)

if field_type == "invalid":
type_errors.append(f"类 {annotation.id} 的字段 {field_name} 类型无效")
return "invalid", None, True

field_schema = {"type": field_type}
if field_type == "array":
field_schema["items"] = field_items if field_items else {}
elif field_type == "object" and field_items:
if "properties" in field_items:
field_schema["properties"] = field_items["properties"]
if "anyOf" in field_items:
field_schema["anyOf"] = field_items["anyOf"]
if "additionalProperties" in field_items:
field_schema["additionalProperties"] = field_items["additionalProperties"]

properties[field_name] = field_schema
return "object", {"type":"object", "properties":properties}, True
else: # 未知类型
type_errors.append(f"不支持的类型: {annotation.id}")
return "invalid", None, True

elif isinstance(annotation, ast.Constant) and annotation.value is None:
elif isinstance(annotation, ast.Constant) and annotation.value is None: # None
return "null", None, False

elif isinstance(annotation, ast.Subscript):
if isinstance(annotation.value, ast.Name):
container = annotation.value.id
elif isinstance(annotation, ast.Subscript) and isinstance(annotation.value, ast.Name): # 容器类型
container = annotation.value.id

# List[int]
if container in ("list", "List"):
item_type, _, _ = parse_type(annotation.slice)
if item_type == "invalid":
type_errors.append(f"不支持的列表元素类型: {annotation.slice}")
# List[int]
if container in ("list", "List"):
item_type, item_schema, _ = parse_type(annotation.slice, class_map)
if item_type == "invalid":
type_errors.append(f"不支持的列表元素类型: {annotation.slice}")
return "invalid", None, True
items = item_schema if item_schema else {"type": item_type}
return "array", items, True

# Dict[str, int] → object
elif container in ("dict", "Dict"):
if isinstance(annotation.slice, ast.Tuple) and len(annotation.slice.elts) == 2:
key_annot, value_annot = annotation.slice.elts
key_type, _, _ = parse_type(key_annot, class_map)
if key_type != "string":
type_errors.append(f"Dict 的键类型必须是 string,实际是 {key_type}")
return "invalid", None, True
return "array", {"type": item_type}, True
value_type, value_schema, _ = parse_type(value_annot, class_map)
items = value_schema if value_schema else {"type": value_type}
return "object", {"additionalProperties": items}, True

# Dict[str, int] → object
elif container in ("dict", "Dict"):
return "object", None, True
# Optional[int]
elif container == "Optional":
inner_type, inner_items, _ = parse_type(annotation.slice, class_map)

# Optional[int]
elif container == "Optional":
inner_type, inner_items, _ = parse_type(annotation.slice)
if inner_type == "invalid":
type_errors.append(f"不支持的Optional类型: {annotation.slice}")
return "invalid", None, False
return inner_type, inner_items, False

# Union[str, int]
elif container == "Union":
if isinstance(annotation.slice, ast.Tuple):
schemas = []
for elt in annotation.slice.elts:
elt_type, elt_items, _ = parse_type(elt, class_map)
if elt_type == "invalid":
type_errors.append(f"不支持的 Union 元素类型: {ast.dump(elt)}")
return "invalid", None, True
schema = {"type": elt_type}
if elt_items:
schema.update(elt_items)
schemas.append(schema)
return "object", {"anyOf": schemas}, True
else:
inner_type, inner_items, _ = parse_type(annotation.slice, class_map)
if inner_type == "invalid":
type_errors.append(f"不支持的Optional类型: {annotation.slice}")
return "invalid", None, False
return inner_type, inner_items, False

# Union[str, int]
elif container == "Union":
return "object", None, True

# Tuple[str]
elif container in ("tuple", "Tuple"):
items = []
if isinstance(annotation.slice, ast.Tuple):
for elt in annotation.slice.elts:
item_type, _, _ = parse_type(elt)
if item_type == "invalid":
type_errors.append(f"不支持的元组元素类型: {ast.dump(elt)}")
return "invalid", None, True
items.append({"type":item_type})
return "array", f"{items}", True
else:
item_type, _, _ = parse_type(annotation.slice)
type_errors.append(f"不支持的 Union 类型: {ast.dump(annotation.slice)}")
return "invalid", None, True
schema = {"type": inner_type}
if inner_items:
schema.update(inner_items)
return "object", {"anyOf": [schema]}, True

# Tuple[str]
elif container in ("tuple", "Tuple"):
if isinstance(annotation.slice, ast.Tuple):
tuple_items = []
for elt in annotation.slice.elts:
item_type, item_schema, _ = parse_type(elt, class_map)
if item_type == "invalid":
type_errors.append(f"不支持的元组元素类型: {ast.dump(annotation.slice)}")
type_errors.append(f"不支持的元组元素类型: {ast.dump(elt)}")
return "invalid", None, True
return "array", {"type":item_type}, True

# Set[int]
elif container in ("set", "Set"):
item_type, _, _ = parse_type(annotation.slice)
tuple_items.append(item_schema if item_schema else {"type": item_type})
# 返回固定长度 tuple 的 items 列表
return "array", {"items": tuple_items}, True
else:
# 单元素 Tuple
item_type, item_schema, _ = parse_type(annotation.slice, class_map)
if item_type == "invalid":
type_errors.append(f"不支持的集合元素类型: {annotation.slice}")
type_errors.append(f"不支持的元组元素类型: {ast.dump(annotation.slice)}")
return "invalid", None, True
return "array", {"type": item_type}, True


else:
type_errors.append(f"不支持的容器类型: {container}")
return "array", {"items": item_schema if item_schema else {"type": item_type}}, True

# Set[int]
elif container in ("set", "Set"):
item_type, item_schema, _ = parse_type(annotation.slice, class_map)
if item_type == "invalid":
type_errors.append(f"不支持的集合元素类型: {annotation.slice}")
return "invalid", None, True
items = item_schema if item_schema else {"type": item_type}
return "array", items, True

else:
type_errors.append(f"不支持的容器类型: {container}")
return "invalid", None, True

type_errors.append(f"无法识别的类型: {ast.dump(annotation)}")
return "invalid", None, True


def parse_parameters(args):
def parse_parameters(args, class_map):
"""解析函数参数"""
properties = {}
order = []
Expand All @@ -115,35 +185,81 @@ def parse_parameters(args):
for arg in args.args:
arg_name = arg.arg
order.append(arg_name)
arg_type, items, is_required = parse_type(arg.annotation)
arg_type, items, is_required = parse_type(arg.annotation, class_map)
# 定义参数
prop_def = {
"defaultValue": "",
"description": f"参数 {arg_name}",
"name": arg_name,
"type": arg_type,
**({"items": items} if items else {}),
"examples": "",
"required": is_required,
}
if arg_type == "array" and items:
if "items" in items:
prop_def["items"] = items["items"]
else:
arr_items = {"type": items.get("type", "object")}
if "properties" in items:
arr_items["properties"] = items["properties"]
if "anyOf" in items:
arr_items["anyOf"] = items["anyOf"]
if "additionalProperties" in items:
arr_items["additionalProperties"] = items["additionalProperties"]
prop_def["items"] = arr_items

if arg_type == "object" and items:
if "properties" in items:
prop_def["properties"] = items["properties"]
if "anyOf" in items:
prop_def["anyOf"] = items["anyOf"]
if "additionalProperties" in items:
prop_def["additionalProperties"] = items["additionalProperties"]

prop_def["examples"] = ""
prop_def["required"] = is_required

properties[arg_name] = prop_def
if is_required:
required.append(arg_name)
return properties, order, required


def parse_return(annotation):
def parse_return(annotation, custom_classes):
"""解析返回值类型"""
if not annotation:
return {"type": "string", "convertor": ""}

return_type, items, _ = parse_type(annotation)
ret = {
"type": return_type,
**({"items": items} if items else {}),
"convertor": ""
}
return ret
return_type, items, _ = parse_type(annotation, custom_classes)
if return_type == "array":
ret = {"type": "array"}
if items:
if "items" in items:
ret["items"] = items["items"]
else:
arr_items = {"type": items.get("type", "object")}
if isinstance(items, dict) and "properties" in items:
arr_items["properties"] = items["properties"]
if isinstance(items, dict) and "anyOf" in items:
arr_items["anyOf"] = items["anyOf"]
if isinstance(items, dict) and "additionalProperties" in items:
arr_items["additionalProperties"] = items["additionalProperties"]
ret["items"] = arr_items
ret["convertor"] = ""
return ret

elif return_type == "object":
ret = {"type": "object"}
if items and isinstance(items, dict):
if "properties" in items:
ret["properties"] = items["properties"]
if "anyOf" in items:
ret["anyOf"] = items["anyOf"]
if "additionalProperties" in items:
ret["additionalProperties"] = items["additionalProperties"]
ret["convertor"] = ""
return ret

else:
return {"type": return_type, "convertor": ""}


def parse_python_file(file_path: Path):
Expand All @@ -155,11 +271,26 @@ def parse_python_file(file_path: Path):
py_name = file_path.stem
definitions = []
tool_groups = []

class_map: dict[str, ClassInfo] = {} # 类名, 类信息
# 收集自定义类
for node in tree.body:
if isinstance(node, ast.ClassDef):
class_info = ClassInfo(node.name)
for base in node.bases:
if isinstance(base, ast.Name):
class_info.add_base(base.id)
for subnode in node.body:
if isinstance(subnode, ast.FunctionDef) and subnode.name == "__init__":
for arg in subnode.args.args[1:]: # 跳过 self
arg_name = arg.arg
class_info.add_field(arg_name, arg.annotation)
class_map[node.name] = class_info
# 解析函数定义
for node in tree.body:
if isinstance(node, ast.FunctionDef):
func_name = node.name
# 默认描述

# 获取描述
description = f"执行 {func_name} 方法"
if node.body and isinstance(node.body[0], ast.Expr):
expr_value = node.body[0].value
Expand All @@ -185,8 +316,8 @@ def parse_python_file(file_path: Path):
continue

# 解析参数和返回值
properties, order, required = parse_parameters(node.args)
return_schema = parse_return(node.returns)
properties, order, required = parse_parameters(node.args, class_map)
return_schema = parse_return(node.returns, class_map)

# definition schema
definition_schema = {
Expand All @@ -213,6 +344,9 @@ def parse_python_file(file_path: Path):
"name": v["name"],
"type": v["type"],
**({"items": v["items"]} if "items" in v else {}),
**({"properties": v["properties"]} if "properties" in v else {}),
**({"anyOf": v["anyOf"]} if "anyOf" in v else {}),
**({"additionalProperties": v["additionalProperties"]} if "additionalProperties" in v else {}),
"required": False, # 工具里参数默认非必填
}
for k, v in properties.items()
Expand All @@ -225,6 +359,9 @@ def parse_python_file(file_path: Path):
"description": f"{func_name} 函数的返回值",
"type": return_schema["type"],
**({"items": return_schema["items"]} if "items" in return_schema else {}),
**({"properties": return_schema["properties"]} if "properties" in return_schema else {}),
**({"anyOf": return_schema["anyOf"]} if "anyOf" in return_schema else {}),
**({"additionalProperties": return_schema["additionalProperties"]} if "additionalProperties" in return_schema else {}),
"convertor": "",
"examples": "",
},
Expand Down