diff --git a/cmd/drive/main.go b/cmd/drive/main.go index ef812a40..7d371df3 100644 --- a/cmd/drive/main.go +++ b/cmd/drive/main.go @@ -20,6 +20,7 @@ import ( "flag" "fmt" "io" + "log" "os" "os/signal" "path/filepath" @@ -121,6 +122,7 @@ func main() { bindCommandWithAliases(drive.ClashesKey, drive.DescFixClashes, &clashesCmd{}, []string{}) bindCommandWithAliases(drive.IdKey, drive.DescId, &idCmd{}, []string{}) bindCommandWithAliases(drive.ReportIssueKey, drive.DescReportIssue, &issueCmd{}, []string{}) + bindCommandWithAliases("watch", "watch files", &watchCmd{}, []string{}) command.DefineHelp(&helpCmd{}) command.ParseAndRun() @@ -798,6 +800,77 @@ func (pCmd *pullCmd) Run(args []string, definedFlags map[string]*flag.Flag) { } } +type watchCmd struct { + HostAddress *string `json:"host-address"` + ById *bool `json:"by-id"` +} + +func (cmd *watchCmd) Flags(fs *flag.FlagSet) *flag.FlagSet { + cmd.HostAddress = fs.String("host-address", "http://localhost:8333", "the host to run the watch server on") + cmd.ById = fs.Bool("by-id", false, "resolve paths by id") + + return fs +} + +func (wcmd *watchCmd) Run(args []string, definedFlags map[string]*flag.Flag) { + sources, context, path := preprocessArgsByToggle(args, false) + cmd := watchCmd{} + df := defaultsFiller{ + command: "watch", + from: *wcmd, to: &cmd, + rcSourcePath: context.AbsPathOf(path), + definedFlags: definedFlags, + } + + if err := fillWithDefaults(df); err != nil { + exitWithError(err) + } + + options := &drive.Options{ + Path: path, + Sources: sources, + } + + watchReq := &drive.WatchChannel{ + Address: *wcmd.HostAddress, + } + + watchesMap, err := drive.New(context, options).FileWatch(watchReq) + exitWithError(err) + + cleanUp := func() error { + for key, watch := range watchesMap { + watch.Cancel <- struct{}{} + log.Printf("%s cancelled..", key) + } + return nil + } + + // In the case of Ctrl-C, let's ensure that we cleanup + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + go func() { + _ = <-c + cleanUp() + }() + + working := true + for working { + for key, watch := range watchesMap { + select { + case wres, stillAlive := <-watch.ResponseChan: + if !stillAlive { + working = false + goto nextCycle + } + log.Printf("key: %s wres: %#v\n", key, wres) + } + } + + nextCycle: + } +} + type pushCmd struct { NoClobber *bool `json:"no-clobber"` Hidden *bool `json:"hidden"` diff --git a/drive-server/main.go b/drive-server/main.go index 276b5439..6aa69531 100644 --- a/drive-server/main.go +++ b/drive-server/main.go @@ -16,6 +16,7 @@ package main import ( "fmt" + "log" "net/http" "os" "time" @@ -73,6 +74,8 @@ func (ai *addressInfo) ConnectionString() string { return fmt.Sprintf("%s:%s", ai.host, ai.port) } +type fileWatchInfo map[string]interface{} + func main() { if envKeySet.PublicKey == "" { errorPrint("publicKey not set. Please set %s in your env.\n", envKeyAlias.PubKeyAlias) @@ -87,11 +90,15 @@ func main() { m := martini.Classic() m.Get("/qr", binding.Bind(meddler.Payload{}), presentQRCode) - m.Post("/qr", binding.Bind(meddler.Payload{}), presentQRCode) + m.Post("/watch-files", binding.Bind(fileWatchInfo{}), handleFileChanges) m.RunOnAddr(envAddrInfo.ConnectionString()) } +func handleFileChanges(fwi fileWatchInfo, res http.ResponseWriter, req *http.Request) { + log.Printf("fwi: %#v", fwi) +} + func presentQRCode(pl meddler.Payload, res http.ResponseWriter, req *http.Request) { if pl.PublicKey != envKeySet.PublicKey { http.Error(res, "invalid publickey", 405) diff --git a/src/realtime.go b/src/realtime.go new file mode 100644 index 00000000..e2ee6a2f --- /dev/null +++ b/src/realtime.go @@ -0,0 +1,86 @@ +// Copyright 2017 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package drive + +import ( + uuid "github.com/odeke-em/go-uuid" + + drive "google.golang.org/api/drive/v2" +) + +type Watch struct { + Cancel chan<- struct{} + ResponseChan chan *WatchResponse + Filepath string + File *File +} + +type WatchChannel drive.Channel + +func (c *Commands) FileWatch(wreq *WatchChannel) (map[string]*Watch, error) { + watchesMapping := make(map[string]*Watch) + + // cleanUp is invoked when we encounter errors. + cleanUp := func() error { + for _, watch := range watchesMapping { + watch.Cancel <- struct{}{} + } + + // Now explicitly set it to nil to discard any content. + watchesMapping = nil + return nil + } + + for _, relToRootPath := range c.opts.Sources { + f, err := c.rem.FindByPath(relToRootPath) + if err != nil { + if err == ErrPathNotExists { + // non-existent files are excusible + continue + } + cleanUp() + return nil, err + } + + castReq := drive.Channel(*wreq) + if castReq.Id == "" { + castReq.Id = uuid.NewRandom().String() + } + + // Otherwise now let's set up this fileId for watching + cancel := make(chan struct{}) + wreq := &WatchRequest{ + FileId: f.Id, + Cancel: cancel, + Request: &castReq, + } + + reschan, err := c.rem.watchForChanges(wreq) + if err != nil { + cleanUp() + return nil, err + } + + watchesMapping[f.Id] = &Watch{ + Cancel: cancel, + File: f, + Filepath: relToRootPath, + + ResponseChan: reschan, + } + } + + return watchesMapping, nil +} diff --git a/src/remote.go b/src/remote.go index 6653e62d..1c18feaa 100644 --- a/src/remote.go +++ b/src/remote.go @@ -17,6 +17,7 @@ package drive import ( "fmt" "io" + "log" "math/rand" "net/http" "net/url" @@ -1189,3 +1190,63 @@ func newOAuthClient(configContext *config.Context) *http.Client { return config.Client(context.Background(), &token) } + +type WatchRequest struct { + FileId string + Request *drive.Channel + + Cancel chan struct{} +} + +type WatchResponse struct { + Channel *drive.Channel + + Err error + Cancel chan struct{} +} + +func (rem *Remote) watchForChanges(wreq *WatchRequest) (chan *WatchResponse, error) { + watchChan := make(chan *WatchResponse) + + go func() { + defer close(watchChan) + + if wreq.Request.Kind == "" { + wreq.Request.Kind = "api#channel" + } + if wreq.Request.Type == "" { + wreq.Request.Type = "web_hook" + } + + watchCall := rem.service.Files.Watch(wreq.FileId, wreq.Request) + cancel := wreq.Cancel + + working := true + var pageToken string + throttle := time.NewTicker(1e8) + for working { + select { + case <-cancel: + working = false + break + default: + wchan, err := watchCall.Do() + fmt.Printf("wchan: %#v err: %v\n", wchan, err) + watchChan <- &WatchResponse{Err: err, Channel: wchan} + + log.Printf("token: %s\n", pageToken) + if wchan != nil { + pageToken = wchan.Token + } + + if pageToken == "" { + working = false + break + } + <-throttle.C + } + } + }() + + return watchChan, nil +}