Skip to content

Commit 34687dc

Browse files
authored
Merge pull request #1 from evolv3ai/feat/mattermost-channel
feat(channels): add Mattermost channel support
2 parents 8de36d3 + 4268d37 commit 34687dc

File tree

3 files changed

+362
-0
lines changed

3 files changed

+362
-0
lines changed

nanobot/channels/manager.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,17 @@ def _init_channels(self) -> None:
125125
except ImportError as e:
126126
logger.warning(f"Slack channel not available: {e}")
127127

128+
# Mattermost channel
129+
if self.config.channels.mattermost.enabled:
130+
try:
131+
from nanobot.channels.mattermost import MattermostChannel
132+
self.channels["mattermost"] = MattermostChannel(
133+
self.config.channels.mattermost, self.bus
134+
)
135+
logger.info("Mattermost channel enabled")
136+
except ImportError as e:
137+
logger.warning(f"Mattermost channel not available: {e}")
138+
128139
# QQ channel
129140
if self.config.channels.qq.enabled:
130141
try:

nanobot/channels/mattermost.py

Lines changed: 339 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,339 @@
1+
"""Mattermost channel implementation using async WebSocket + httpx.
2+
3+
Connects to a Mattermost server using:
4+
- REST API v4 (httpx) for sending posts and resolving user info
5+
- WebSocket API (websockets) for real-time event streaming
6+
7+
No external Mattermost SDK is required; only websockets and httpx (both
8+
already project dependencies) are used.
9+
10+
Ported from picoclaw's Go implementation.
11+
"""
12+
13+
from __future__ import annotations
14+
15+
import asyncio
16+
import json
17+
from urllib.parse import urlparse, urlunparse
18+
19+
import httpx
20+
import websockets
21+
from loguru import logger
22+
23+
from nanobot.bus.events import OutboundMessage
24+
from nanobot.bus.queue import MessageBus
25+
from nanobot.channels.base import BaseChannel
26+
from nanobot.config.schema import MattermostConfig
27+
28+
29+
class MattermostChannel(BaseChannel):
30+
"""Mattermost channel using async WebSocket + REST API v4."""
31+
32+
name = "mattermost"
33+
34+
def __init__(self, config: MattermostConfig, bus: MessageBus):
35+
super().__init__(config, bus)
36+
self.config: MattermostConfig = config
37+
self._http: httpx.AsyncClient | None = None
38+
self._ws: websockets.WebSocketClientProtocol | None = None
39+
self._bot_user_id: str | None = None
40+
self._bot_username: str | None = None
41+
self._reconnect_task: asyncio.Task | None = None
42+
43+
# ── Lifecycle ────────────────────────────────────────────────────────
44+
45+
async def start(self) -> None:
46+
"""Start the Mattermost channel."""
47+
if not self.config.url or not self.config.token:
48+
logger.error("Mattermost url and token must be configured")
49+
return
50+
51+
self._http = httpx.AsyncClient(
52+
base_url=self._api_base(),
53+
headers={
54+
"Authorization": f"Bearer {self.config.token}",
55+
"Content-Type": "application/json",
56+
},
57+
timeout=30.0,
58+
)
59+
60+
# Verify credentials and get bot user info
61+
try:
62+
resp = await self._http.get("/users/me")
63+
resp.raise_for_status()
64+
me = resp.json()
65+
self._bot_user_id = me["id"]
66+
self._bot_username = me.get("username", "")
67+
logger.info(f"Mattermost bot @{self._bot_username} authenticated (id={self._bot_user_id})")
68+
except Exception as e:
69+
logger.error(f"Mattermost auth failed: {e}")
70+
return
71+
72+
# Connect WebSocket
73+
try:
74+
await self._connect_ws()
75+
except Exception as e:
76+
logger.error(f"Mattermost WebSocket connect failed: {e}")
77+
return
78+
79+
self._running = True
80+
81+
# Spawn reconnect watcher
82+
self._reconnect_task = asyncio.create_task(self._reconnect_loop())
83+
84+
# Listen for events (blocks until stopped)
85+
await self._listen_ws()
86+
87+
async def stop(self) -> None:
88+
"""Stop the Mattermost channel."""
89+
self._running = False
90+
91+
if self._reconnect_task:
92+
self._reconnect_task.cancel()
93+
try:
94+
await self._reconnect_task
95+
except asyncio.CancelledError:
96+
pass
97+
self._reconnect_task = None
98+
99+
if self._ws:
100+
try:
101+
await self._ws.close()
102+
except Exception:
103+
pass
104+
self._ws = None
105+
106+
if self._http:
107+
await self._http.aclose()
108+
self._http = None
109+
110+
logger.info("Mattermost channel stopped")
111+
112+
async def send(self, msg: OutboundMessage) -> None:
113+
"""Send a message through Mattermost."""
114+
if not self._http:
115+
logger.warning("Mattermost client not running")
116+
return
117+
118+
channel_id, root_id = _parse_chat_id(msg.chat_id)
119+
if not channel_id:
120+
logger.error(f"Invalid mattermost chat_id: {msg.chat_id!r}")
121+
return
122+
123+
for chunk in _split_message(msg.content, 4000):
124+
try:
125+
body: dict[str, str] = {
126+
"channel_id": channel_id,
127+
"message": chunk,
128+
}
129+
if root_id:
130+
body["root_id"] = root_id
131+
132+
resp = await self._http.post("/posts", json=body)
133+
resp.raise_for_status()
134+
except Exception as e:
135+
logger.error(f"Error sending Mattermost message: {e}")
136+
return
137+
138+
# ── WebSocket connection ─────────────────────────────────────────────
139+
140+
async def _connect_ws(self) -> None:
141+
"""Open a WebSocket connection and authenticate."""
142+
ws_url = self._ws_url()
143+
logger.info(f"Connecting Mattermost WebSocket to {ws_url}")
144+
145+
self._ws = await websockets.connect(ws_url, ping_interval=30, ping_timeout=10)
146+
147+
# Send authentication challenge
148+
auth_msg = {
149+
"seq": 1,
150+
"action": "authentication_challenge",
151+
"data": {"token": self.config.token},
152+
}
153+
await self._ws.send(json.dumps(auth_msg))
154+
logger.info("Mattermost WebSocket connected")
155+
156+
async def _listen_ws(self) -> None:
157+
"""Read WebSocket events in a loop."""
158+
while self._running:
159+
if not self._ws:
160+
await asyncio.sleep(1)
161+
continue
162+
163+
try:
164+
raw = await self._ws.recv()
165+
await self._handle_ws_message(raw)
166+
except websockets.ConnectionClosed:
167+
if not self._running:
168+
return
169+
logger.warning("Mattermost WebSocket connection closed")
170+
self._ws = None
171+
except Exception as e:
172+
if not self._running:
173+
return
174+
logger.error(f"Mattermost WebSocket read error: {e}")
175+
self._ws = None
176+
177+
async def _reconnect_loop(self) -> None:
178+
"""Reconnect WebSocket with exponential backoff."""
179+
delay = 5.0
180+
max_delay = 60.0
181+
182+
while self._running:
183+
await asyncio.sleep(delay)
184+
185+
if self._ws is not None:
186+
delay = 5.0
187+
continue
188+
189+
logger.info("Attempting Mattermost WebSocket reconnect...")
190+
try:
191+
await self._connect_ws()
192+
delay = 5.0
193+
logger.info("Mattermost WebSocket reconnected")
194+
except Exception as e:
195+
logger.error(f"Mattermost WebSocket reconnect failed: {e}")
196+
delay = min(delay * 2, max_delay)
197+
198+
# ── Event handling ──────────────────────────────────────────────────
199+
200+
async def _handle_ws_message(self, raw: str | bytes) -> None:
201+
"""Parse and dispatch a WebSocket event."""
202+
try:
203+
evt = json.loads(raw)
204+
except json.JSONDecodeError:
205+
logger.warning("Mattermost: failed to parse WS frame")
206+
return
207+
208+
event_type = evt.get("event", "")
209+
210+
if event_type == "posted":
211+
await self._handle_posted(evt)
212+
elif event_type == "hello":
213+
logger.info("Mattermost WebSocket hello (server ready)")
214+
elif event_type == "":
215+
pass # acknowledgement frame
216+
else:
217+
logger.debug(f"Mattermost: unhandled WS event: {event_type}")
218+
219+
async def _handle_posted(self, evt: dict) -> None:
220+
"""Handle a 'posted' event (new message)."""
221+
data = evt.get("data", {})
222+
223+
# The post field is double-encoded JSON
224+
post_str = data.get("post", "{}")
225+
try:
226+
post = json.loads(post_str)
227+
except json.JSONDecodeError:
228+
logger.warning("Mattermost: failed to parse post JSON")
229+
return
230+
231+
user_id = post.get("user_id", "")
232+
channel_id = post.get("channel_id", "")
233+
message = post.get("message", "").strip()
234+
post_id = post.get("id", "")
235+
root_id = post.get("root_id", "")
236+
channel_type = data.get("channel_type", "")
237+
sender_name = data.get("sender_name", "")
238+
239+
# Ignore our own messages
240+
if user_id == self._bot_user_id:
241+
return
242+
243+
# Check allowlist
244+
if not self.is_allowed(user_id):
245+
logger.debug(f"Mattermost: message rejected by allowlist (user={user_id})")
246+
return
247+
248+
# Strip bot @mention
249+
message = self._strip_bot_mention(message)
250+
if not message:
251+
return
252+
253+
# Build chat_id with threading context:
254+
# DMs (channel_type "D"): just channelID (no threading)
255+
# Channel, existing thread: channelID/rootID (always stay in thread)
256+
# Channel, new message + reply_in_thread: channelID/postID (start new thread)
257+
# Channel, new message + no threading: just channelID (flat reply)
258+
chat_id = channel_id
259+
if root_id:
260+
chat_id = f"{channel_id}/{root_id}"
261+
elif channel_type != "D" and self.config.reply_in_thread:
262+
chat_id = f"{channel_id}/{post_id}"
263+
264+
logger.debug(
265+
f"Mattermost message from {sender_name} ({user_id}): {message[:50]}"
266+
)
267+
268+
await self._handle_message(
269+
sender_id=user_id,
270+
chat_id=chat_id,
271+
content=message,
272+
metadata={
273+
"mattermost": {
274+
"post_id": post_id,
275+
"channel_id": channel_id,
276+
"root_id": root_id,
277+
"channel_type": channel_type,
278+
"sender_name": sender_name,
279+
"team_id": data.get("team_id", ""),
280+
}
281+
},
282+
)
283+
284+
# ── Helpers ──────────────────────────────────────────────────────────
285+
286+
def _api_base(self) -> str:
287+
"""Build the REST API v4 base URL."""
288+
return self.config.url.rstrip("/") + "/api/v4"
289+
290+
def _ws_url(self) -> str:
291+
"""Build the WebSocket URL from the server URL."""
292+
parsed = urlparse(self.config.url)
293+
scheme = "wss" if parsed.scheme == "https" else "ws"
294+
ws_parsed = parsed._replace(scheme=scheme, path="/api/v4/websocket")
295+
return urlunparse(ws_parsed)
296+
297+
def _strip_bot_mention(self, text: str) -> str:
298+
"""Strip @botusername mention from message text."""
299+
username = self.config.username or self._bot_username
300+
if username:
301+
text = text.replace(f"@{username}", "")
302+
return text.strip()
303+
304+
305+
# ── Module-level utilities ───────────────────────────────────────────────────
306+
307+
308+
def _parse_chat_id(chat_id: str) -> tuple[str, str]:
309+
"""Split 'channelID' or 'channelID/rootID' into components."""
310+
parts = chat_id.split("/", 1)
311+
channel_id = parts[0]
312+
root_id = parts[1] if len(parts) > 1 else ""
313+
return channel_id, root_id
314+
315+
316+
def _split_message(content: str, limit: int = 4000) -> list[str]:
317+
"""Split long content into chunks, preferring newline then word boundaries."""
318+
if len(content) <= limit:
319+
return [content]
320+
321+
chunks: list[str] = []
322+
while content:
323+
if len(content) <= limit:
324+
chunks.append(content)
325+
break
326+
327+
# Try to split at a newline within the last 200 chars of the limit
328+
segment = content[:limit]
329+
split = segment.rfind("\n", limit - 200)
330+
if split <= 0:
331+
# Fall back to last space within the last 100 chars
332+
split = segment.rfind(" ", limit - 100)
333+
if split <= 0:
334+
split = limit
335+
336+
chunks.append(content[:split])
337+
content = content[split:].strip()
338+
339+
return chunks

nanobot/config/schema.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,17 @@ class QQConfig(Base):
164164
allow_from: list[str] = Field(default_factory=list) # Allowed user openids (empty = public access)
165165

166166

167+
class MattermostConfig(Base):
168+
"""Mattermost channel configuration."""
169+
170+
enabled: bool = False
171+
url: str = "" # Server URL, e.g. "https://mattermost.example.com"
172+
token: str = "" # Personal Access Token or Bot Token
173+
username: str = "" # Bot username for mention stripping (optional)
174+
reply_in_thread: bool = True # Reply as thread in channels (DMs always flat)
175+
allow_from: list[str] = Field(default_factory=list) # Allowed user IDs (empty = allow all)
176+
177+
167178
class ChannelsConfig(Base):
168179
"""Configuration for chat channels."""
169180

@@ -176,6 +187,7 @@ class ChannelsConfig(Base):
176187
email: EmailConfig = Field(default_factory=EmailConfig)
177188
slack: SlackConfig = Field(default_factory=SlackConfig)
178189
qq: QQConfig = Field(default_factory=QQConfig)
190+
mattermost: MattermostConfig = Field(default_factory=MattermostConfig)
179191

180192

181193
class AgentDefaults(Base):

0 commit comments

Comments
 (0)