Skip to content

Commit 28e951f

Browse files
Enable SLIM group chat pattern (#40)
* feat: enable group chat mode for broadcasting message Signed-off-by: samuyang <samuyang@cisco.com> * feat: refactoring A2A broadcast to enable groupchat. Adding groupchat test and refactoring test A2A agent-executor for groupchat. * feat: adding client-initiated group session cleanup protocol using message header * feat: refactor end-message-signal message to be uuid with closing handshake trigger * feat: move session close propagation to session_manager.close_session * feat: bump package version and add CHANGELOG.md --------- Signed-off-by: samuyang <samuyang@cisco.com> Co-authored-by: samuyang <samuyang@cisco.com>
1 parent a0c7c91 commit 28e951f

17 files changed

Lines changed: 463 additions & 336 deletions

CHANGELOG.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
11
# Changelog
22

3-
## 0.0.1
3+
All notable changes to this project will be documented in this file.
4+
5+
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
6+
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
7+
8+
## 0.2.3
9+
10+
### Added
11+
- SLIM multi-session lifecycle management
12+
- SLIM groupchat sessions, initiated with A2AClient.broadcast_message(group_chat=True)
13+
14+
### Changed
15+
- AgntcyFactory.create_transport requires a name field when the type is SLIM, in the form /org/namespace/service
16+
- A2AClient.broadcast_message, when created from factory, requires list of recipients to fulfill SLIM requirements
17+
18+
### Fixed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ from agntcy_app_sdk.factory import AgntcyFactory
8888
# Create factory and transport
8989
factory = AgntcyFactory()
9090
transport_instance = factory.create_transport(
91-
transport="SLIM", endpoint="http://localhost:46357"
91+
transport="SLIM", endpoint="http://localhost:46357", name="org/namespace/agent-foo"
9292
)
9393

9494
# Create MCP client

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "agntcy-app-sdk"
3-
version = "0.2.2"
3+
version = "0.2.3"
44
description = "Agntcy Application SDK for Python"
55
authors = [{ name = "Cody Hartsook", email = "codyhartsook@gmail.com" }]
66
requires-python = ">=3.13,<4.0"

src/agntcy_app_sdk/bridge.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -66,22 +66,17 @@ async def loop_forever(self):
6666
async def _process_message(self, message: Message):
6767
"""Process an incoming message through the handler and send response."""
6868

69-
try:
70-
# Handle the request - check if handler is async or sync
71-
if inspect.iscoroutinefunction(self.handler):
72-
response = await self.handler(message)
69+
if inspect.iscoroutinefunction(self.handler):
70+
response = await self.handler(message)
71+
else:
72+
result = self.handler(message)
73+
# If the result is a coroutine, await it
74+
if inspect.iscoroutine(result):
75+
response = await result
7376
else:
74-
result = self.handler(message)
75-
# If the result is a coroutine, await it
76-
if inspect.iscoroutine(result):
77-
response = await result
78-
else:
79-
response = result
80-
81-
if not response:
82-
logger.warning("Handler returned no response for message.")
77+
response = result
8378

84-
return response
79+
if not response:
80+
logger.warning("Handler returned no response for message.")
8581

86-
except Exception as e:
87-
logger.error(f"Error processing message: {e}")
82+
return response

src/agntcy_app_sdk/protocols/a2a/protocol.py

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,13 @@
1111
from a2a.client import A2AClient, A2ACardResolver
1212
from a2a.utils import AGENT_CARD_WELL_KNOWN_PATH, PREV_AGENT_CARD_WELL_KNOWN_PATH
1313
from a2a.server.apps import A2AStarletteApplication
14-
from a2a.types import AgentCard, SendMessageRequest, SendMessageResponse
15-
14+
from a2a.types import (
15+
AgentCard,
16+
SendMessageRequest,
17+
SendMessageResponse,
18+
JSONRPCSuccessResponse,
19+
MessageSendParams,
20+
)
1621
from agntcy_app_sdk.protocols.protocol import BaseAgentProtocol
1722
from agntcy_app_sdk.transports.transport import BaseTransport, ResponseMode
1823
from agntcy_app_sdk.protocols.message import Message
@@ -24,7 +29,9 @@
2429
logger = get_logger(__name__)
2530

2631

27-
async def get_client_from_agent_card_url(httpx_client: httpx.AsyncClient, base_url: str,
32+
async def get_client_from_agent_card_url(
33+
httpx_client: httpx.AsyncClient,
34+
base_url: str,
2835
http_kwargs: dict[str, Any] | None = None,
2936
) -> A2AClient:
3037
"""
@@ -33,20 +40,25 @@ async def get_client_from_agent_card_url(httpx_client: httpx.AsyncClient, base_u
3340
"""
3441
try:
3542
agent_card: AgentCard = await A2ACardResolver(
36-
httpx_client, base_url=base_url,
43+
httpx_client,
44+
base_url=base_url,
3745
agent_card_path=AGENT_CARD_WELL_KNOWN_PATH,
3846
).get_agent_card(http_kwargs=http_kwargs)
3947
except Exception as e:
40-
logger.info(f"Failed to get client from agent card url with v3 path, "
41-
f"falling back to v2 path: {e}")
48+
logger.info(
49+
f"Failed to get client from agent card url with v3 path, "
50+
f"falling back to v2 path: {e}"
51+
)
4252
try:
43-
agent_card: AgentCard = await (A2ACardResolver(
44-
httpx_client, base_url=base_url,
53+
agent_card: AgentCard = await A2ACardResolver(
54+
httpx_client,
55+
base_url=base_url,
4556
agent_card_path=PREV_AGENT_CARD_WELL_KNOWN_PATH,
46-
).get_agent_card(http_kwargs=http_kwargs))
57+
).get_agent_card(http_kwargs=http_kwargs)
4758
except Exception as e:
48-
logger.error(f"Failed to get client from agent card url with v2 "
49-
f"path: {e}")
59+
logger.error(
60+
f"Failed to get client from agent card url with v2 " f"path: {e}"
61+
)
5062
raise e
5163

5264
return A2AClient(httpx_client=httpx_client, agent_card=agent_card)
@@ -77,22 +89,28 @@ async def get_client_from_agent_card_topic(
7789
try:
7890
request = Message(
7991
type="A2ARequest",
80-
payload=json.dumps({"path": AGENT_CARD_WELL_KNOWN_PATH, "method": method}),
92+
payload=json.dumps(
93+
{"path": AGENT_CARD_WELL_KNOWN_PATH, "method": method}
94+
),
8195
route_path=AGENT_CARD_WELL_KNOWN_PATH,
82-
method=method
96+
method=method,
8397
)
8498
response = await transport.request(topic, request, ResponseMode.FIRST)
8599

86100
response.payload = json.loads(response.payload.decode("utf-8"))
87101
card = AgentCard.model_validate(response.payload)
88102
except Exception as e:
89-
logger.info(f"A2A v3 path failed or invalid payload, falling back to v2: {e}")
103+
logger.info(
104+
f"A2A v3 path failed or invalid payload, falling back to v2: {e}"
105+
)
90106

91107
request = Message(
92108
type="A2ARequest",
93-
payload=json.dumps({"path": PREV_AGENT_CARD_WELL_KNOWN_PATH, "method": method}),
109+
payload=json.dumps(
110+
{"path": PREV_AGENT_CARD_WELL_KNOWN_PATH, "method": method}
111+
),
94112
route_path=PREV_AGENT_CARD_WELL_KNOWN_PATH,
95-
method=method
113+
method=method,
96114
)
97115
response = await transport.request(topic, request, ResponseMode.FIRST)
98116

@@ -203,6 +221,8 @@ async def broadcast_message(
203221
recipients: List[str] | None = None,
204222
broadcast_topic: str = None,
205223
timeout: float = 30.0,
224+
group_chat: bool = False,
225+
end_message: str = "work-done",
206226
) -> List[SendMessageResponse]:
207227
"""
208228
Broadcast a request using the provided transport.
@@ -217,12 +237,16 @@ async def broadcast_message(
217237
if not broadcast_topic:
218238
broadcast_topic = topic
219239

240+
# determine response mode, either collect len(recipients) or group chat
241+
resp_mode = ResponseMode.GROUP if group_chat else ResponseMode.COLLECT_ALL
242+
220243
try:
221244
responses = await transport.request(
222245
broadcast_topic,
223246
msg,
224-
response_mode=ResponseMode.COLLECT_ALL,
247+
response_mode=resp_mode,
225248
recipients=recipients,
249+
end_message=end_message,
226250
timeout=timeout,
227251
)
228252
except Exception as e:
@@ -305,6 +329,19 @@ async def handle_message(self, message: Message) -> Message:
305329
)
306330
method = message.method
307331

332+
# check if the body is a JSONRPCSuccessResponse, and if so, convert it to a SendMessageRequest
333+
try:
334+
inner = JSONRPCSuccessResponse.model_validate_json(body)
335+
msg_params = {"message": inner.result}
336+
request = SendMessageRequest(
337+
id=str(uuid4()), params=MessageSendParams(**msg_params)
338+
)
339+
body = json.dumps(
340+
request.model_dump(mode="json", exclude_none=True)
341+
).encode("utf-8")
342+
except Exception:
343+
pass
344+
308345
headers = []
309346
for key, value in message.headers.items():
310347
if isinstance(value, str):

src/agntcy_app_sdk/protocols/message.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ def __init__(
1616
type: str,
1717
payload: bytes,
1818
reply_to: Optional[str] = None,
19-
route_path: Optional[str] = None,
20-
method: Optional[str] = None,
19+
route_path: Optional[str] = "/",
20+
method: Optional[str] = "POST",
2121
headers: Optional[dict] = None,
2222
status_code: Optional[int] = None,
2323
):

src/agntcy_app_sdk/transports/nats/transport.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,12 @@ async def _connect(self):
103103
async def close(self) -> None:
104104
"""Close the NATS connection."""
105105
if self._nc:
106-
await self._nc.drain()
107-
await self._nc.close()
108-
logger.info("NATS connection closed")
106+
try:
107+
await self._nc.drain()
108+
await self._nc.close()
109+
logger.info("NATS connection closed")
110+
except Exception as e:
111+
logger.error(f"Error closing NATS connection: {e}")
109112
else:
110113
logger.warning("No NATS connection to close")
111114

@@ -191,6 +194,7 @@ async def _request_all(
191194
message: Message,
192195
recipients: List[str] = None,
193196
timeout: float = 30.0,
197+
**kwargs,
194198
) -> List[Message]:
195199
"""
196200
Send a message to topic and wait for a response from all recipients

src/agntcy_app_sdk/transports/slim/common.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ def split_id(id):
1717
except ValueError as e:
1818
raise e
1919

20-
print(f"split_id: {id} -> {organization}, {namespace}, {app}")
21-
2220
return slim_bindings.PyName(organization, namespace, app)
2321

2422

src/agntcy_app_sdk/transports/slim/session_manager.py

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Copyright AGNTCY Contributors (https://github.com/agntcy)
22
# SPDX-License-Identifier: Apache-2.0
33

4+
import asyncio
45
from typing import Dict
56
import datetime
67
from agntcy_app_sdk.common.logging_config import configure_logging, get_logger
@@ -11,6 +12,7 @@
1112
PySessionConfiguration,
1213
PySessionDirection,
1314
)
15+
from agntcy_app_sdk.transports.transport import Message
1416
from threading import Lock
1517

1618
configure_logging()
@@ -42,12 +44,17 @@ async def request_reply_session(
4244
raise ValueError("SLIM client is not set")
4345

4446
# check if we already have a request-reply session
45-
session_key = "RequestReply"
46-
47-
with self._lock:
48-
if session_key in self._sessions:
49-
logger.info(f"Reusing existing session: {session_key}")
50-
return session_key, self._sessions[session_key]
47+
for session_id, (session, q) in self._slim.sessions.items():
48+
try:
49+
conf = await self._slim.get_session_config(session_id)
50+
# compare the type of conf to PySessionConfiguration.FireAndForget
51+
if isinstance(conf, PySessionConfiguration.FireAndForget):
52+
return session_id, session
53+
except Exception as e:
54+
logger.warning(
55+
f"could not retrieve SLIM session config for {session_id}: {e}"
56+
)
57+
continue
5158

5259
with self._lock:
5360
session = await self._slim.create_session(
@@ -58,9 +65,7 @@ async def request_reply_session(
5865
mls_enabled=mls_enabled,
5966
)
6067
)
61-
session_key = "RequestReply"
62-
self._sessions[session_key] = session
63-
return session_key, session
68+
return session.id, session
6469

6570
async def group_broadcast_session(
6671
self,
@@ -77,15 +82,15 @@ async def group_broadcast_session(
7782
raise ValueError("SLIM client is not set")
7883

7984
# check if we already have a group broadcast session for this channel and invitees
80-
session_key = f"GroupChannel:{channel}:" + ",".join(
85+
session_key = f"PySessionConfiguration.Streaming:{channel}:" + ",".join(
8186
[str(invitee) for invitee in invitees]
8287
)
8388
with self._lock:
8489
if session_key in self._sessions:
8590
logger.info(f"Reusing existing group broadcast session: {session_key}")
8691
return session_key, self._sessions[session_key]
8792

88-
logger.info(f"Creating new group broadcast session: {session_key}")
93+
logger.debug(f"Creating new group broadcast session: {session_key}")
8994
with self._lock:
9095
session_info = await self._slim.create_session(
9196
PySessionConfiguration.Streaming(
@@ -99,20 +104,61 @@ async def group_broadcast_session(
99104
)
100105

101106
for invitee in invitees:
107+
logger.debug(f"Inviting {invitee} to session {session_info.id}")
102108
await self._slim.set_route(invitee)
103109
await self._slim.invite(session_info, invitee)
104110

105111
# store the session info
106112
self._sessions[session_key] = session_info
107113
return session_key, session_info
108114

109-
def close_session(self, session_key: str):
115+
async def close_session(
116+
self, session: PySessionInfo, remote: PyName = None, end_signal: str = None
117+
):
110118
"""
111119
Close and remove a session by its key.
112120
"""
113-
session = self._sessions.pop(session_key, None)
114-
if session:
115-
logger.info(f"Closing session: {session_key}")
121+
if not self._slim:
122+
raise ValueError("SLIM client is not set")
123+
124+
try:
125+
# send the end signal to the remote if provided
126+
if remote is not None and end_signal is not None:
127+
logger.info(f"Sending end signal '{end_signal}' to remote {remote}")
128+
129+
end_msg = Message(
130+
type="text/plain",
131+
headers={"x-session-end-message": end_signal},
132+
payload=end_signal,
133+
)
134+
await self._slim.publish(session, end_msg.serialize(), remote)
135+
await asyncio.sleep(
136+
0.25
137+
) # give some time for the message to be sent before closing our side
138+
139+
await self._slim.delete_session(session.id)
140+
logger.debug(f"Closed session: {session.id}")
141+
142+
# remove from local store
143+
self._local_cache_cleanup(session.id)
144+
except Exception as e:
145+
logger.warning(f"Error closing SLIM session {session.id}: {e}")
146+
return
147+
148+
def _local_cache_cleanup(self, session_id: int):
149+
"""
150+
Perform local cleanup of a session without attempting to close it on the SLIM client.
151+
"""
152+
with self._lock:
153+
session_key = None
154+
for key, sess in self._sessions.items():
155+
if sess.id == session_id:
156+
session_key = key
157+
break
158+
159+
if session_key:
160+
del self._sessions[session_key]
161+
logger.debug(f"Locally cleaned up session: {session_id}")
116162

117163
def session_details(self, session_key: str):
118164
"""

0 commit comments

Comments
 (0)