Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/cmd/paddlecloud/paddlecloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func main() {
subcommands.Register(&paddlecloud.LogsCommand{}, "")
subcommands.Register(&paddlecloud.GetCommand{}, "")
subcommands.Register(&paddlecloud.KillCommand{}, "")
subcommands.Register(&paddlecloud.SimpleFileCmd{}, "")

flag.Parse()
ctx := context.Background()
Expand Down
22 changes: 9 additions & 13 deletions go/paddlecloud/restclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func makeRequest(uri string, method string, body io.Reader,
for k, v := range authHeader {
req.Header.Set(k, v)
}

if query != nil {
req.URL.RawQuery = query.Encode()
}
Expand Down Expand Up @@ -97,41 +96,38 @@ func DeleteCall(targetURL string, jsonString []byte) ([]byte, error) {
}

// PostFile makes a POST call to an HTTP server to upload a file.
func PostFile(targetURL string, filename string) ([]byte, error) {
func PostFile(targetURL string, filename string, query url.Values) ([]byte, error) {
bodyBuf := &bytes.Buffer{}
bodyWriter := multipart.NewWriter(bodyBuf)

// this step is very important
fileWriter, err := bodyWriter.CreateFormFile("uploadfile", filename)
fileWriter, err := bodyWriter.CreateFormFile("file", filename)
if err != nil {
fmt.Fprintf(os.Stderr, "error writing to buffer: %v\n", err)
return []byte{}, err
}

// open file handle
fh, err := os.Open(filename)
defer fh.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "error opening file: %v\n", err)
return []byte{}, err
}

//iocopy
_, err = io.Copy(fileWriter, fh)
if err != nil {
return []byte{}, err
}

contentType := bodyWriter.FormDataContentType()
bodyWriter.Close()
if err = bodyWriter.Close(); err != nil {
return []byte{}, err
}

req, err := makeRequestToken(targetURL, "POST", bodyBuf, contentType, nil)
req, err := makeRequestToken(targetURL, "POST", bodyBuf, contentType, query)
if err != nil {
return []byte{}, err
}
return getResponse(req)
}

// PostChunkData makes a POST call to HTTP server to upload chunkdata.
// PostChunk makes a POST call to HTTP server to upload chunkdata.
func PostChunk(targetURL string,
chunkName string, reader io.Reader, len int64, boundary string) ([]byte, error) {
body := &bytes.Buffer{}
Expand Down Expand Up @@ -161,7 +157,7 @@ func PostChunk(targetURL string,
return getResponse(req)
}

// GetChunkData makes a GET call to HTTP server to download chunk data.
// GetChunk makes a GET call to HTTP server to download chunk data.
func GetChunk(targetURL string,
query url.Values) (*http.Response, error) {
req, err := makeRequestToken(targetURL, "GET", nil, "", query)
Expand Down
127 changes: 127 additions & 0 deletions go/paddlecloud/simplefile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package paddlecloud

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net/url"
"os"
"path"

"github.com/google/subcommands"
)

// SimpleFileCmd implements the "file" subcommand for simple file
// operations. It has no fields.
type SimpleFileCmd struct{}

// Name returns the subcommand's name.
func (*SimpleFileCmd) Name() string {
	return "file"
}

// Synopsis returns a one-line summary of the subcommand.
func (*SimpleFileCmd) Synopsis() string {
	return "Simple file operations."
}

// Usage returns the usage text of the subcommand.
func (*SimpleFileCmd) Usage() string {
	return "file [put|get] <src> <dst>:\nOptions:\n"
}

// SetFlags is a no-op: the subcommand registers no flags of its own.
func (*SimpleFileCmd) SetFlags(*flag.FlagSet) {}

// Execute runs the "file" subcommand. The first positional argument selects
// the operation ("put" or "get"); the second and third are the source and
// destination paths handed to putFile or getFile. It returns ExitFailure,
// after printing the usage or reporting the error on stderr, when the
// arguments are malformed or the operation fails, and ExitSuccess otherwise.
func (p *SimpleFileCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
	// An operation without a source path can never succeed, so require at
	// least "<op> <src>". A missing <dst> reaches the helpers as "" because
	// FlagSet.Arg returns the empty string for absent arguments.
	if f.NArg() < 2 || f.NArg() > 3 {
		f.Usage()
		return subcommands.ExitFailure
	}
	src, dst := f.Arg(1), f.Arg(2)
	switch f.Arg(0) {
	case "put":
		if err := putFile(src, dst); err != nil {
			fmt.Fprintf(os.Stderr, "put file error: %s\n", err)
			return subcommands.ExitFailure
		}
	case "get":
		if err := getFile(src, dst); err != nil {
			fmt.Fprintf(os.Stderr, "get file error: %s\n", err)
			return subcommands.ExitFailure
		}
	default:
		// Unknown operation name.
		f.Usage()
		return subcommands.ExitFailure
	}
	return subcommands.ExitSuccess
}

func putFile(src string, dest string) error {
query := url.Values{}
_, srcFile := path.Split(src)
destDir, destFile := path.Split(dest)
var destFullPath string
if len(destFile) == 0 {
destFullPath = path.Join(destDir, srcFile)
} else {
destFullPath = dest
}
query.Set("path", destFullPath)
respStr, err := PostFile(config.ActiveConfig.Endpoint+"/api/v1/file/", src, query)
if err != nil {
return err
}
var respObj interface{}
if err = json.Unmarshal(respStr, &respObj); err != nil {
return err
}
// FIXME: Print an error if error message is not empty. Use response code instead
Copy link
Collaborator

@gongweibao gongweibao Jun 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is response code? Is it a TODO?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an enhancement to unify the response JSON format; see issue #129.

errMsg := respObj.(map[string]interface{})["msg"].(string)
if len(errMsg) > 0 {
fmt.Fprintf(os.Stderr, "upload file error: %s\n", errMsg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return an error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning an error here would break the filepath.Walk call. I will fix this once the issue above is resolved.

}
return nil
}

// getFile downloads the remote file src from the /api/v1/file/ endpoint and
// stores it at the local path dest. When dest has no file-name component
// (it is empty or ends in "/"), the file is stored under dest using the
// base name of src. An existing destination is never overwritten; in that
// case a "file already exist" error is returned.
func getFile(src string, dest string) error {
	query := url.Values{}
	query.Set("path", src)
	req, err := makeRequestToken(config.ActiveConfig.Endpoint+"/api/v1/file/", "GET", nil, "", query)
	if err != nil {
		return err
	}
	resp, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// NOTE(review): HTTPOK is declared elsewhere; it is compared against the
	// textual status (resp.Status), not the numeric code — confirm its value.
	if resp.Status != HTTPOK {
		return errors.New("server error: " + resp.Status)
	}

	_, srcFile := path.Split(src)
	destDir, destFile := path.Split(dest)
	destFullPath := dest
	if len(destFile) == 0 {
		destFullPath = path.Join(destDir, srcFile)
	}

	// O_EXCL folds the "must not exist yet" check into the creation itself,
	// so a file that appears between a separate check and the create call
	// can no longer be truncated.
	out, err := os.OpenFile(destFullPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
	if err != nil {
		if os.IsExist(err) {
			return errors.New("file already exist: " + destFullPath)
		}
		return err
	}
	if _, err = io.Copy(out, resp.Body); err != nil {
		out.Close() // the copy error is the one worth reporting
		return err
	}
	// Close can surface a deferred write failure; do not drop it.
	return out.Close()
}
54 changes: 37 additions & 17 deletions go/paddlecloud/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package paddlecloud
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"os"
"path"
"path/filepath"

"github.com/golang/glog"
"github.com/google/subcommands"
)

// SubmitCmd define the subcommand of submitting paddle training jobs
// SubmitCmd define the subcommand of submitting paddle training jobs.
type SubmitCmd struct {
Jobname string `json:"name"`
Jobpackage string `json:"jobPackage"`
Expand All @@ -29,21 +31,21 @@ type SubmitCmd struct {
Passes int `json:"passes"`
}

// Name is subcommands name
// Name is subcommands name.
func (*SubmitCmd) Name() string { return "submit" }

// Synopsis is subcommands synopsis
// Synopsis is subcommands synopsis.
func (*SubmitCmd) Synopsis() string { return "Submit job to PaddlePaddle Cloud." }

// Usage is subcommands Usage
// Usage is subcommands Usage.
func (*SubmitCmd) Usage() string {
return `submit [options] <package path>:
Submit job to PaddlePaddle Cloud.
Options:
`
}

// SetFlags registers subcommands flags
// SetFlags registers subcommands flags.
func (p *SubmitCmd) SetFlags(f *flag.FlagSet) {
f.StringVar(&p.Jobname, "jobname", "paddle-cluster-job", "Cluster job name.")
f.IntVar(&p.Parallelism, "parallelism", 1, "Number of parrallel trainers. Defaults to 1.")
Expand All @@ -53,18 +55,18 @@ func (p *SubmitCmd) SetFlags(f *flag.FlagSet) {
f.IntVar(&p.Pservers, "pservers", 0, "Number of parameter servers. Defaults equal to -p")
f.IntVar(&p.PSCPU, "pscpu", 1, "Parameter server CPU resource. Defaults to 1.")
f.StringVar(&p.PSMemory, "psmemory", "1Gi", "Parameter server momory resource. Defaults to 1Gi.")
f.StringVar(&p.Entry, "entry", "paddle train", "Command of starting trainer process. Defaults to paddle train")
f.StringVar(&p.Entry, "entry", "", "Command of starting trainer process. Defaults to paddle train")
f.StringVar(&p.Topology, "topology", "", "Will Be Deprecated .py file contains paddle v1 job configs")
f.IntVar(&p.Passes, "passes", 1, "Pass count for training job")
}

// Execute submit command
// Execute submit command.
func (p *SubmitCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
if f.NArg() != 1 {
f.Usage()
return subcommands.ExitFailure
}
// default pservers count equals to trainers count
// default pservers count equals to trainers count.
if p.Pservers == 0 {
p.Pservers = p.Parallelism
}
Expand All @@ -81,32 +83,50 @@ func (p *SubmitCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}
return subcommands.ExitSuccess
}

// Submitter submit job to cloud
// Submitter submit job to cloud.
type Submitter struct {
args *SubmitCmd
}

// NewSubmitter returns a submitter object
// NewSubmitter returns a submitter object.
func NewSubmitter(cmd *SubmitCmd) *Submitter {
s := Submitter{cmd}
return &s
}

// Submit current job
// Submit current job.
func (s *Submitter) Submit(jobPackage string) error {
// 1. upload user job package to pfs
filepath.Walk(jobPackage, func(path string, info os.FileInfo, err error) error {
glog.V(10).Infof("Uploading %s...\n", path)
return nil
//return postFile(path, config.activeConfig.endpoint+"/api/v1/files")
err := filepath.Walk(jobPackage, func(filePath string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
glog.V(10).Infof("Uploading %s...\n", filePath)
dest := "/" + path.Join("pfs", config.ActiveConfig.Name, "home", config.ActiveConfig.Username, filePath)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dest := "/" + path.Join("pfs", config.ActiveConfig.Name, ...
==>
dest := path.Join("/pfs", config.ActiveConfig.Name, ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. Done.

fmt.Printf("uploading: %s...\n", filePath)
return putFile(filePath, dest)
})
if err != nil {
return err
}
// 2. call paddlecloud server to create kubernetes job
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is not useful.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talk offline. OK!

jsonString, err := json.Marshal(s.args)
if err != nil {
return err
}
glog.V(10).Infof("Submitting job: %s to %s\n", jsonString, config.ActiveConfig.Endpoint+"/api/v1/jobs")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to specify /api/v1/jobs to be constant?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

REST APIs are currently all v1, change this when we have v2

respBody, err := PostCall(config.ActiveConfig.Endpoint+"/api/v1/jobs/", jsonString)
glog.V(10).Infof("got return body size: %d", len(respBody))
return err
if err != nil {
return err
}
var respObj interface{}
if err = json.Unmarshal(respBody, &respObj); err != nil {
return err
}
// FIXME: Return an error if error message is not empty. Use response code instead
errMsg := respObj.(map[string]interface{})["msg"].(string)
if len(errMsg) > 0 {
return errors.New(errMsg)
}
return nil
}
1 change: 1 addition & 0 deletions paddlecloud/paddlecloud/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
url(r"^api/v1/logs/", paddlejob.views.LogsView.as_view()),
url(r"^api/v1/workers/", paddlejob.views.WorkersView.as_view()),
url(r"^api/v1/quota/", paddlejob.views.QuotaView.as_view()),
url(r"^api/v1/file/", paddlejob.views.SimpleFileView.as_view()),
]

urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
Expand Down
5 changes: 5 additions & 0 deletions paddlecloud/paddlejob/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from rest_framework import viewsets, generics, permissions
from rest_framework.response import Response
from rest_framework.views import APIView
import logging
first_cap_re = re.compile('(.)([A-Z][a-z]+)')
all_cap_re = re.compile('([a-z0-9])([A-Z])')
def simple_response(code, msg):
Expand All @@ -12,6 +13,10 @@ def simple_response(code, msg):
"msg": msg
})

def error_message_response(msg):
    """Log ``msg`` at error level and return it wrapped in a Response.

    The response body is a dict with the single key ``"msg"``.
    """
    logging.error("error: %s", msg)
    payload = {"msg": msg}
    return Response(payload)

def convert_camel2snake(data):
    """Convert a camelCase string to snake_case.

    Applies the module-level ``first_cap_re`` and ``all_cap_re`` patterns in
    turn, inserting an underscore at each match, then lower-cases the result
    (e.g. ``"jobPackage"`` becomes ``"job_package"``).
    """
    # Substitute on the argument itself; no variable called `name` exists
    # in this function's scope.
    s1 = first_cap_re.sub(r'\1_\2', data)
    return all_cap_re.sub(r'\1_\2', s1).lower()
Loading