Skip to content

Commit c831b5f

Browse files
pymilvus-botjac0626silas.jiang
authored
[Backport 2.6] feat:support async client list_persistent_segments (#3206) (#3207)
Backport of #3206 to `2.6`. Signed-off-by: silas.jiang <silas.jiang@zilliz.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: jac <jacllovey@qq.com> Co-authored-by: silas.jiang <silas.jiang@zilliz.com>
1 parent f76d5f9 commit c831b5f

3 files changed

Lines changed: 90 additions & 0 deletions

File tree

pymilvus/client/async_grpc_handler.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,18 @@ async def _prepare_row_insert_request(
628628
)
629629

630630
@retry_on_rpc_failure()
631+
async def get_persistent_segment_infos(
632+
self, collection_name: str, timeout: Optional[float] = None, **kwargs
633+
) -> List[milvus_types.PersistentSegmentInfo]:
634+
await self.ensure_channel_ready()
635+
check_pass_param(collection_name=collection_name, timeout=timeout)
636+
req = Prepare.get_persistent_segment_info_request(collection_name)
637+
response = await self._async_stub.GetPersistentSegmentInfo(
638+
req, timeout=timeout, metadata=_api_level_md(**kwargs)
639+
)
640+
check_status(response.status)
641+
return response.infos
642+
631643
async def delete(
632644
self,
633645
collection_name: str,

pymilvus/milvus_client/async_milvus_client.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
OmitZeroDict,
99
ResourceGroupConfig,
1010
RoleInfo,
11+
SegmentInfo,
1112
UserInfo,
1213
)
1314
from pymilvus.client.utils import convert_struct_fields_to_user_format, is_vector_type
@@ -1154,6 +1155,40 @@ async def get_flush_all_state(self, timeout: Optional[float] = None, **kwargs) -
11541155
conn = self._get_connection()
11551156
return await conn.get_flush_all_state(timeout=timeout, **kwargs)
11561157

1158+
async def list_persistent_segments(
1159+
self,
1160+
collection_name: str,
1161+
timeout: Optional[float] = None,
1162+
**kwargs,
1163+
) -> List[SegmentInfo]:
1164+
"""List persistent segments for a collection.
1165+
1166+
Args:
1167+
collection_name (str): The name of the collection.
1168+
timeout (Optional[float]): An optional duration of time in seconds to allow for the RPC.
1169+
**kwargs: Additional arguments.
1170+
1171+
Returns:
1172+
List[SegmentInfo]: A list of persistent segment information.
1173+
"""
1174+
validate_param("collection_name", collection_name, str)
1175+
infos = await self._get_connection().get_persistent_segment_infos(
1176+
collection_name, timeout=timeout, **kwargs
1177+
)
1178+
return [
1179+
SegmentInfo(
1180+
segment_id=info.segmentID,
1181+
collection_id=info.collectionID,
1182+
collection_name=collection_name,
1183+
num_rows=info.num_rows,
1184+
is_sorted=info.is_sorted,
1185+
state=info.state,
1186+
level=info.level,
1187+
storage_version=info.storage_version,
1188+
)
1189+
for info in infos
1190+
]
1191+
11571192
async def compact(
11581193
self,
11591194
collection_name: str,

tests/test_async_milvus_client.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,49 @@ async def test_describe_collection_with_struct_array_fields(self):
8686
assert len(result["fields"]) == 2 # original field + converted struct field
8787
mock_handler.describe_collection.assert_called_once()
8888

89+
@pytest.mark.asyncio
90+
async def test_list_persistent_segments(self):
91+
with patch('pymilvus.milvus_client.async_milvus_client.create_connection', return_value="test"), \
92+
patch('pymilvus.orm.connections.Connections._fetch_handler') as mock_fetch:
93+
# Mock connection and its get_persistent_segment_infos method
94+
mock_conn = AsyncMock()
95+
mock_fetch.return_value = mock_conn
96+
97+
# Create a mock segment info
98+
mock_segment_info = MagicMock()
99+
mock_segment_info.segmentID = 1001
100+
mock_segment_info.collectionID = 2001
101+
mock_segment_info.num_rows = 1000
102+
mock_segment_info.is_sorted = True
103+
mock_segment_info.state = 3 # FLUSHED
104+
mock_segment_info.level = 1
105+
mock_segment_info.storage_version = 1
106+
107+
mock_conn.get_persistent_segment_infos.return_value = [mock_segment_info]
108+
109+
# Initialize AsyncMilvusClient
110+
client = AsyncMilvusClient(uri="http://localhost:19530")
111+
112+
# Call list_persistent_segments
113+
result = await client.list_persistent_segments("test_collection")
114+
115+
# Verify the result
116+
assert len(result) == 1
117+
segment_info = result[0]
118+
assert segment_info.segment_id == 1001
119+
assert segment_info.collection_id == 2001
120+
assert segment_info.collection_name == "test_collection"
121+
assert segment_info.num_rows == 1000
122+
assert segment_info.is_sorted is True
123+
assert segment_info.state == 3
124+
assert segment_info.level == 1
125+
assert segment_info.storage_version == 1
126+
127+
# Verify call arguments
128+
mock_conn.get_persistent_segment_infos.assert_called_once_with(
129+
"test_collection", timeout=None
130+
)
131+
89132
@pytest.mark.asyncio
90133
async def test_describe_collection_without_struct_array_fields(self):
91134
"""Test describe_collection works normally when no struct_array_fields"""

0 commit comments

Comments
 (0)