Skip to content

Commit 1d0091f

Browse files
committed
add recover model from checkpoint first if it exists, store is local
1 parent fde8461 commit 1d0091f

File tree

13 files changed

+541
-31
lines changed

13 files changed

+541
-31
lines changed

cmd/craned/app/manager.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package app
33
import (
44
"context"
55
"flag"
6+
"fmt"
67
"os"
78
"strings"
89

@@ -25,6 +26,7 @@ import (
2526
predictionapi "github.com/gocrane/api/prediction/v1alpha1"
2627

2728
"github.com/gocrane/crane/cmd/craned/app/options"
29+
"github.com/gocrane/crane/pkg/checkpoint"
2830
"github.com/gocrane/crane/pkg/controller/analytics"
2931
"github.com/gocrane/crane/pkg/controller/cnp"
3032
"github.com/gocrane/crane/pkg/controller/ehpa"
@@ -108,7 +110,11 @@ func Run(ctx context.Context, opts *options.Options) error {
108110
}
109111
// initialize data sources and predictor
110112
realtimeDataSources, histroyDataSources, _ := initDataSources(mgr, opts)
111-
predictorMgr := initPredictorManager(opts, realtimeDataSources, histroyDataSources)
113+
predictorMgr, err := initPredictorManager(opts, realtimeDataSources, histroyDataSources)
114+
if err != nil {
115+
klog.Error(err, "failed to init predictor mgr")
116+
return err
117+
}
112118

113119
initScheme()
114120
initWebhooks(mgr, opts)
@@ -197,8 +203,27 @@ func initDataSources(mgr ctrl.Manager, opts *options.Options) (map[providers.Dat
197203
return realtimeDataSources, historyDataSources, hybridDataSources
198204
}
199205

200-
func initPredictorManager(opts *options.Options, realtimeDataSources map[providers.DataSourceType]providers.RealTime, historyDataSources map[providers.DataSourceType]providers.History) predictor.Manager {
201-
return predictor.NewManager(realtimeDataSources, historyDataSources, predictor.DefaultPredictorsConfig(opts.AlgorithmModelConfig))
206+
func initPredictorManager(opts *options.Options, realtimeDataSources map[providers.DataSourceType]providers.RealTime, historyDataSources map[providers.DataSourceType]providers.History) (predictor.Manager, error) {
207+
cpStoreType := checkpoint.StoreType(opts.CheckpointerStore)
208+
var checkpointer checkpoint.Checkpointer
209+
var err error
210+
if opts.EnableCheckpointer {
211+
switch cpStoreType {
212+
case checkpoint.StoreTypeLocal:
213+
checkpointer, err = checkpoint.InitCheckpointer(checkpoint.StoreType(opts.CheckpointerStore), opts.CheckpointerLocalConfig)
214+
if err != nil {
215+
return nil, err
216+
}
217+
default:
218+
return nil, fmt.Errorf("not supported checkpointer store type %v", cpStoreType)
219+
}
220+
}
221+
222+
ctx := predictor.CheckPointerContext{
223+
Enable: opts.EnableCheckpointer,
224+
Checkpointer: checkpointer,
225+
}
226+
return predictor.NewManager(realtimeDataSources, historyDataSources, predictor.DefaultPredictorsConfig(opts.AlgorithmModelConfig), ctx), nil
202227
}
203228

204229
// initControllers setup controllers with manager

cmd/craned/app/options/options.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/spf13/pflag"
77
componentbaseconfig "k8s.io/component-base/config"
88

9+
"github.com/gocrane/crane/pkg/checkpoint"
910
"github.com/gocrane/crane/pkg/controller/ehpa"
1011
"github.com/gocrane/crane/pkg/prediction/config"
1112
"github.com/gocrane/crane/pkg/providers"
@@ -37,7 +38,10 @@ type Options struct {
3738
DataSourceGrpcConfig providers.GrpcConfig
3839

3940
// AlgorithmModelConfig
40-
AlgorithmModelConfig config.AlgorithmModelConfig
41+
AlgorithmModelConfig config.AlgorithmModelConfig
42+
EnableCheckpointer bool
43+
CheckpointerStore string
44+
CheckpointerLocalConfig checkpoint.LocalStoreConfig
4145

4246
// WebhookConfig
4347
WebhookConfig webhooks.WebhookConfig
@@ -107,6 +111,12 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
107111
flags.StringVar(&o.DataSourceGrpcConfig.Address, "grpc-ds-address", "localhost:50051", "grpc data source server address")
108112
flags.DurationVar(&o.DataSourceGrpcConfig.Timeout, "grpc-ds-timeout", time.Minute, "grpc timeout")
109113
flags.DurationVar(&o.AlgorithmModelConfig.UpdateInterval, "model-update-interval", 12*time.Hour, "algorithm model update interval, now used for dsp model update interval")
114+
115+
flags.BoolVar(&o.EnableCheckpointer, "enable-checkpointer", false, "algorithm model checkpointer, if you want to do checkpoint, you can enable it")
116+
flags.StringVar(&o.CheckpointerStore, "checkpointer-store", "local", "type of the checkpointer, different checkpointer has different storage type. default is local")
117+
flags.StringVar(&o.CheckpointerLocalConfig.Root, "checkpointer-local-root", ".", "local checkpointer root path which checkpoint data stored in, make sure your app has permission to read/write")
118+
flags.IntVar(&o.CheckpointerLocalConfig.MaxWorkers, "checkpointer-local-max-workers", 4, "local checkpointer max workers to do read/write")
119+
110120
flags.BoolVar(&o.WebhookConfig.Enabled, "webhook-enabled", true, "whether enable webhook or not, default to true")
111121
flags.StringVar(&o.RecommendationConfigFile, "recommendation-config-file", "", "recommendation configuration file")
112122
flags.StringSliceVar(&o.EhpaControllerConfig.PropagationConfig.LabelPrefixes, "ehpa-propagation-label-prefixes", []string{}, "propagate labels whose key has the prefix to hpa")

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ go 1.17
44

55
require (
66
github.com/go-echarts/go-echarts/v2 v2.2.4
7-
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac
87
github.com/google/cadvisor v0.39.2
98
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
109
github.com/prometheus/client_golang v1.11.0
@@ -149,7 +148,6 @@ require (
149148
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
150149
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
151150
google.golang.org/appengine v1.6.7 // indirect
152-
google.golang.org/protobuf v1.27.1 // indirect
153151
gopkg.in/inf.v0 v0.9.1 // indirect
154152
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
155153
gopkg.in/warnings.v0 v0.1.1 // indirect
@@ -169,7 +167,8 @@ require (
169167
require (
170168
cloud.google.com/go v0.84.0 // indirect
171169
github.com/cespare/xxhash/v2 v2.1.2 // indirect
172-
github.com/fsnotify/fsnotify v1.5.1 // indirect
170+
github.com/fsnotify/fsnotify v1.5.1
171+
github.com/gocrane/api v0.5.0
173172
github.com/json-iterator/go v1.1.12 // indirect
174173
github.com/mattn/go-isatty v0.0.14 // indirect
175174
github.com/robfig/cron/v3 v3.0.1
@@ -181,6 +180,7 @@ require (
181180
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
182181
golang.org/x/tools v0.1.8 // indirect
183182
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
183+
google.golang.org/protobuf v1.27.1
184184
)
185185

186186
replace (

go.sum

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,10 +310,8 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
310310
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
311311
github.com/gobwas/ws v1.1.0-rc.5 h1:QOAag7FoBaBYYHRqzqkhhd8fq5RTubvI4v3Ft/gDVVQ=
312312
github.com/gobwas/ws v1.1.0-rc.5/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0=
313-
github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4 h1:vGDg3G6y661KAlhjf/8/r8JCjaIi6aV8szCP+MZRU3Y=
314-
github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
315-
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac h1:lBKVVOA4del0Plj80PCE+nglxaJxaXanCv5N6a3laVY=
316-
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
313+
github.com/gocrane/api v0.5.0 h1:hKPt1T8T/vBEtMyWhz976ZHG8w+Z4NuHpp5+eixcw1A=
314+
github.com/gocrane/api v0.5.0/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
317315
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
318316
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
319317
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=

pkg/checkpoint/checkpoint.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package checkpoint
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/gocrane/crane/pkg/internal"
10+
"github.com/gocrane/crane/pkg/metricnaming"
11+
)
12+
13+
// Checkpointer is used to do checkpoint for metric namer. this package is only responsible for executing store and load of checkpoint data.
14+
// You can implement other checkpoint writer and reader backed by different storages such as localfs、s3、etcd(Custom Resource Definition)
15+
// the caller decides when to do checkpoint, checkpoint frequency is depending on the caller.
16+
// there are multiple ways to decide when to do checkpoint.
17+
// 1. predictor checkpoints all metric namers together periodically by a independent routine. but this will not guarantee the checkpoint data is consistent with the latest updated model in memory
18+
// 2. predictor checkpoints the metric namer each time after model is updating, so the checkpoint is always latest. for example, the percentile to do checkpoint after add sample for each metric namer.
19+
// 3. application caller such as evpa triggers the metric namer to do checkpoint. delegate the trigger to application caller
20+
type Checkpointer interface {
21+
Start(stopCh <-chan struct{})
22+
Writer
23+
Reader
24+
}
25+
26+
type Writer interface {
27+
// store metricNamer checkpoints. each time call will override original checkpoint data of the same metric namer if it exists.
28+
// each metric namer model checkpoint only store one replica.
29+
// this is sync way, it block until the checkpoint stored operation finished
30+
StoreMetricModelCheckpoint(ctx context.Context, checkpoint *internal.CheckpointContext, now time.Time) error
31+
// this is async way, it send the checkpoint to a channel and return immediately
32+
AsyncStoreMetricModelCheckpoint(ctx context.Context, checkpoint *internal.CheckpointContext, now time.Time) error
33+
// close checkpointer, close the queue && wait until all requests pending in queue done
34+
Flush()
35+
}
36+
37+
type Reader interface {
38+
// load metricNamer checkpoints
39+
LoadMetricModelCheckpoint(ctx context.Context, namer metricnaming.MetricNamer) (*internal.MetricNamerModelCheckpoint, error)
40+
}
41+
42+
type StoreType string
43+
44+
const (
45+
StoreTypeLocal StoreType = "local"
46+
StoreTypeK8s StoreType = "k8s"
47+
)
48+
49+
type Factory func(cfg interface{}) (Checkpointer, error)
50+
51+
var (
52+
checkpointFactorys = make(map[StoreType]Factory)
53+
lock sync.Mutex
54+
)
55+
56+
func RegisterFactory(storeType StoreType, factory Factory) {
57+
lock.Lock()
58+
defer lock.Unlock()
59+
checkpointFactorys[storeType] = factory
60+
}
61+
62+
func InitCheckpointer(storeType StoreType, cfg interface{}) (Checkpointer, error) {
63+
lock.Lock()
64+
defer lock.Unlock()
65+
if factory, ok := checkpointFactorys[storeType]; ok {
66+
return factory(cfg)
67+
} else {
68+
return nil, fmt.Errorf("not registered checkpoint store type %v", storeType)
69+
}
70+
}

pkg/checkpoint/k8s.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package checkpoint
2+
3+
// todo: define the k8s crd for metricnamer model checkpoint

0 commit comments

Comments
 (0)