Skip to content

Commit e865fd0

Browse files
authored
Merge pull request #728 from ex-takashima/feat/media-cleanup-ttl
feat(media): integrate TTL cleanup into FileMediaStore
2 parents d917140 + 8979683 commit e865fd0

File tree

5 files changed

+491
-17
lines changed

5 files changed

+491
-17
lines changed

cmd/picoclaw/cmd_gateway.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,17 @@ func gatewayCmd() {
122122
return tools.SilentResult(response)
123123
})
124124

125-
// Create media store for file lifecycle management
126-
mediaStore := media.NewFileMediaStore()
125+
// Create media store for file lifecycle management with TTL cleanup
126+
mediaStore := media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{
127+
Enabled: cfg.Tools.MediaCleanup.Enabled,
128+
MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute,
129+
Interval: time.Duration(cfg.Tools.MediaCleanup.Interval) * time.Minute,
130+
})
131+
mediaStore.Start()
127132

128133
channelManager, err := channels.NewManager(cfg, msgBus, mediaStore)
129134
if err != nil {
135+
mediaStore.Stop()
130136
fmt.Printf("Error creating channel manager: %v\n", err)
131137
os.Exit(1)
132138
}
@@ -200,6 +206,7 @@ func gatewayCmd() {
200206
deviceService.Stop()
201207
heartbeatService.Stop()
202208
cronService.Stop()
209+
mediaStore.Stop()
203210
agentLoop.Stop()
204211
fmt.Println("✓ Gateway stopped")
205212
}

pkg/config/config.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -504,11 +504,18 @@ type ExecConfig struct {
504504
CustomDenyPatterns []string `json:"custom_deny_patterns" env:"PICOCLAW_TOOLS_EXEC_CUSTOM_DENY_PATTERNS"`
505505
}
506506

507+
type MediaCleanupConfig struct {
508+
Enabled bool `json:"enabled" env:"PICOCLAW_MEDIA_CLEANUP_ENABLED"`
509+
MaxAge int `json:"max_age_minutes" env:"PICOCLAW_MEDIA_CLEANUP_MAX_AGE"`
510+
Interval int `json:"interval_minutes" env:"PICOCLAW_MEDIA_CLEANUP_INTERVAL"`
511+
}
512+
507513
type ToolsConfig struct {
508-
Web WebToolsConfig `json:"web"`
509-
Cron CronToolsConfig `json:"cron"`
510-
Exec ExecConfig `json:"exec"`
511-
Skills SkillsToolsConfig `json:"skills"`
514+
Web WebToolsConfig `json:"web"`
515+
Cron CronToolsConfig `json:"cron"`
516+
Exec ExecConfig `json:"exec"`
517+
Skills SkillsToolsConfig `json:"skills"`
518+
MediaCleanup MediaCleanupConfig `json:"media_cleanup"`
512519
}
513520

514521
type SkillsToolsConfig struct {

pkg/config/defaults.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,11 @@ func DefaultConfig() *Config {
291291
Port: 18790,
292292
},
293293
Tools: ToolsConfig{
294+
MediaCleanup: MediaCleanupConfig{
295+
Enabled: true,
296+
MaxAge: 30,
297+
Interval: 5,
298+
},
294299
Web: WebToolsConfig{
295300
Brave: BraveConfig{
296301
Enabled: false,

pkg/media/store.go

Lines changed: 158 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ import (
44
"fmt"
55
"os"
66
"sync"
7+
"time"
78

89
"github.com/google/uuid"
10+
11+
"github.com/sipeed/picoclaw/pkg/logger"
912
)
1013

1114
// MediaMeta holds metadata about a stored media file.
@@ -35,8 +38,16 @@ type MediaStore interface {
3538

3639
// mediaEntry holds the path and metadata for a stored media file.
3740
type mediaEntry struct {
38-
path string
39-
meta MediaMeta
41+
path string
42+
meta MediaMeta
43+
storedAt time.Time
44+
}
45+
46+
// MediaCleanerConfig configures the background TTL cleanup.
47+
type MediaCleanerConfig struct {
48+
Enabled bool
49+
MaxAge time.Duration
50+
Interval time.Duration
4051
}
4152

4253
// FileMediaStore is a pure in-memory implementation of MediaStore.
@@ -45,13 +56,34 @@ type FileMediaStore struct {
4556
mu sync.RWMutex
4657
refs map[string]mediaEntry
4758
scopeToRefs map[string]map[string]struct{}
59+
refToScope map[string]string
60+
61+
cleanerCfg MediaCleanerConfig
62+
stop chan struct{}
63+
startOnce sync.Once
64+
stopOnce sync.Once
65+
nowFunc func() time.Time // for testing
4866
}
4967

50-
// NewFileMediaStore creates a new FileMediaStore.
68+
// NewFileMediaStore creates a new FileMediaStore without background cleanup.
5169
func NewFileMediaStore() *FileMediaStore {
5270
return &FileMediaStore{
5371
refs: make(map[string]mediaEntry),
5472
scopeToRefs: make(map[string]map[string]struct{}),
73+
refToScope: make(map[string]string),
74+
nowFunc: time.Now,
75+
}
76+
}
77+
78+
// NewFileMediaStoreWithCleanup creates a FileMediaStore with TTL-based background cleanup.
79+
func NewFileMediaStoreWithCleanup(cfg MediaCleanerConfig) *FileMediaStore {
80+
return &FileMediaStore{
81+
refs: make(map[string]mediaEntry),
82+
scopeToRefs: make(map[string]map[string]struct{}),
83+
refToScope: make(map[string]string),
84+
cleanerCfg: cfg,
85+
stop: make(chan struct{}),
86+
nowFunc: time.Now,
5587
}
5688
}
5789

@@ -66,11 +98,12 @@ func (s *FileMediaStore) Store(localPath string, meta MediaMeta, scope string) (
6698
s.mu.Lock()
6799
defer s.mu.Unlock()
68100

69-
s.refs[ref] = mediaEntry{path: localPath, meta: meta}
101+
s.refs[ref] = mediaEntry{path: localPath, meta: meta, storedAt: s.nowFunc()}
70102
if s.scopeToRefs[scope] == nil {
71103
s.scopeToRefs[scope] = make(map[string]struct{})
72104
}
73105
s.scopeToRefs[scope][ref] = struct{}{}
106+
s.refToScope[ref] = scope
74107

75108
return ref, nil
76109
}
@@ -100,24 +133,139 @@ func (s *FileMediaStore) ResolveWithMeta(ref string) (string, MediaMeta, error)
100133
}
101134

102135
// ReleaseAll removes all files under the given scope and cleans up mappings.
136+
// Phase 1 (under lock): remove entries from maps.
137+
// Phase 2 (no lock): delete files from disk.
103138
func (s *FileMediaStore) ReleaseAll(scope string) error {
104-
s.mu.Lock()
105-
defer s.mu.Unlock()
139+
// Phase 1: collect paths and remove from maps under lock
140+
var paths []string
106141

142+
s.mu.Lock()
107143
refs, ok := s.scopeToRefs[scope]
108144
if !ok {
145+
s.mu.Unlock()
109146
return nil
110147
}
111148

112149
for ref := range refs {
113150
if entry, exists := s.refs[ref]; exists {
114-
if err := os.Remove(entry.path); err != nil && !os.IsNotExist(err) {
115-
// Log but continue — best effort cleanup
151+
paths = append(paths, entry.path)
152+
}
153+
delete(s.refs, ref)
154+
delete(s.refToScope, ref)
155+
}
156+
delete(s.scopeToRefs, scope)
157+
s.mu.Unlock()
158+
159+
// Phase 2: delete files without holding the lock
160+
for _, p := range paths {
161+
if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
162+
logger.WarnCF("media", "release: failed to remove file", map[string]any{
163+
"path": p,
164+
"error": err.Error(),
165+
})
166+
}
167+
}
168+
169+
return nil
170+
}
171+
172+
// CleanExpired removes all entries older than MaxAge.
173+
// Phase 1 (under lock): identify expired entries and remove from maps.
174+
// Phase 2 (no lock): delete files from disk to minimize lock contention.
175+
func (s *FileMediaStore) CleanExpired() int {
176+
if s.cleanerCfg.MaxAge <= 0 {
177+
return 0
178+
}
179+
180+
// Phase 1: collect expired entries under lock
181+
type expiredEntry struct {
182+
ref string
183+
path string
184+
}
185+
186+
s.mu.Lock()
187+
cutoff := s.nowFunc().Add(-s.cleanerCfg.MaxAge)
188+
var expired []expiredEntry
189+
190+
for ref, entry := range s.refs {
191+
if entry.storedAt.Before(cutoff) {
192+
expired = append(expired, expiredEntry{ref: ref, path: entry.path})
193+
194+
if scope, ok := s.refToScope[ref]; ok {
195+
if scopeRefs, ok := s.scopeToRefs[scope]; ok {
196+
delete(scopeRefs, ref)
197+
if len(scopeRefs) == 0 {
198+
delete(s.scopeToRefs, scope)
199+
}
200+
}
116201
}
202+
117203
delete(s.refs, ref)
204+
delete(s.refToScope, ref)
118205
}
119206
}
207+
s.mu.Unlock()
120208

121-
delete(s.scopeToRefs, scope)
122-
return nil
209+
// Phase 2: delete files without holding the lock
210+
for _, e := range expired {
211+
if err := os.Remove(e.path); err != nil && !os.IsNotExist(err) {
212+
logger.WarnCF("media", "cleanup: failed to remove file", map[string]any{
213+
"path": e.path,
214+
"error": err.Error(),
215+
})
216+
}
217+
}
218+
219+
return len(expired)
220+
}
221+
222+
// Start begins the background cleanup goroutine if cleanup is enabled.
223+
// Safe to call multiple times; only the first call starts the goroutine.
224+
func (s *FileMediaStore) Start() {
225+
if !s.cleanerCfg.Enabled || s.stop == nil {
226+
return
227+
}
228+
if s.cleanerCfg.Interval <= 0 || s.cleanerCfg.MaxAge <= 0 {
229+
logger.WarnCF("media", "cleanup: skipped due to invalid config", map[string]any{
230+
"interval": s.cleanerCfg.Interval.String(),
231+
"max_age": s.cleanerCfg.MaxAge.String(),
232+
})
233+
return
234+
}
235+
236+
s.startOnce.Do(func() {
237+
logger.InfoCF("media", "cleanup enabled", map[string]any{
238+
"interval": s.cleanerCfg.Interval.String(),
239+
"max_age": s.cleanerCfg.MaxAge.String(),
240+
})
241+
242+
go func() {
243+
ticker := time.NewTicker(s.cleanerCfg.Interval)
244+
defer ticker.Stop()
245+
246+
for {
247+
select {
248+
case <-ticker.C:
249+
if n := s.CleanExpired(); n > 0 {
250+
logger.InfoCF("media", "cleanup: removed expired entries", map[string]any{
251+
"count": n,
252+
})
253+
}
254+
case <-s.stop:
255+
return
256+
}
257+
}
258+
}()
259+
})
260+
}
261+
262+
// Stop terminates the background cleanup goroutine.
263+
// Safe to call multiple times; only the first call closes the channel.
264+
func (s *FileMediaStore) Stop() {
265+
if s.stop == nil {
266+
return
267+
}
268+
s.stopOnce.Do(func() {
269+
close(s.stop)
270+
})
123271
}

0 commit comments

Comments
 (0)