Skip to content

Commit

Permalink
ssh write
Browse files Browse the repository at this point in the history
  • Loading branch information
jacokoo committed Feb 21, 2019
1 parent c9c303b commit a3ff949
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 7 deletions.
1 change: 1 addition & 0 deletions model/file_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func (dd *defaultDirOp) write(root string, item FileItem) ([]Task, error) {
eh <- err
return
}
defer w.Close()

buf := make([]byte, 4096)
pg := 0
Expand Down
271 changes: 264 additions & 7 deletions model/ssh_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -65,9 +67,9 @@ var (
// wheel // root // a1ed // 1512168297 // 11 // 'var' -> 'private/var'
"Darwin": func(sc *sshc, path string, dir bool) (io.Reader, error) {
if !dir {
return sc.exec(`stat -f "%Sg // %Su // %Xp // %m // %z // %N’ -> %Y’" ` + path)
return sc.exec(`stat -f "%Sg // %Su // %Xp // %m // %z // %N’ -> %Y’" ` + path)
}
return sc.exec("cd " + path + `; stat -f "%Sg // %Su // %Xp // %m // %z // %N’ -> %Y’" .* *`)
return sc.exec("cd " + path + `; stat -f "%Sg // %Su // %Xp // %m // %z // %N’ -> %Y’" .* *`)
},
}
)
Expand All @@ -94,7 +96,10 @@ func (sc *sshc) readDir(pp string) ([]FileItem, error) {
if !ok {
return nil, sc.error("target os is not supported")
}
buf, _ := fn(sc, pp, true)
buf, err := fn(sc, pp, true)
if buf == nil {
return nil, err
}
scan := bufio.NewScanner(buf)
its := make([]FileItem, 0)
for scan.Scan() {
Expand All @@ -120,7 +125,10 @@ func (sc *sshc) readFile(pp string) (FileItem, error) {
if !ok {
return nil, sc.error("target os is not supported")
}
buf, _ := fn(sc, pp, false)
buf, err := fn(sc, pp, false)
if buf == nil {
return nil, err
}
scan := bufio.NewScanner(buf)
its := make([]FileItem, 0)
for scan.Scan() {
Expand Down Expand Up @@ -326,15 +334,264 @@ func (sd *sshdir) Read() ([]FileItem, error) {
return sd.sshc.readDir(sd.ipath)
}

func (sd *sshdir) Write([]FileItem) (Task, error) {
return nil, nil
func (sd *sshdir) write(item FileItem, root string) ([]Task, error) {
if item.IsDir() {
return sd.writeDir(item, root)
}

if sdd, ok := item.(*sshfile); ok && sdd.sshc == sd.sshc {
return sd.writeSameHost(sdd, root)
}

return []Task{NewTask(item.Name(), func(progress chan<- int, quit <-chan bool, eh chan<- error) {
defer close(progress)
defer close(eh)

r, err := item.(FileOp).Reader()
if err != nil {
eh <- err
return
}
defer r.Close()

// TODO buggy when in windows, use different loader
rel, err := filepath.Rel(root, item.Path())
if err != nil {
eh <- err
return
}

err = sd.NewDir(filepath.Dir(filepath.Dir(rel)))
if err != nil {
eh <- err
return
}
nsd, err := sd.To(filepath.Dir(rel))
if err != nil {
eh <- err
return
}

err = nsd.(DirOp).NewFile(filepath.Base(rel))
if err != nil {
eh <- err
return
}

nf, err := nsd.(DirOp).To(filepath.Base(rel))
if err != nil {
eh <- err
return
}

w, err := nf.(FileOp).Writer(0)
if err != nil {
eh <- err
return
}
defer w.Close()

buf := make([]byte, 4096)
pg := 0
si := float64(item.Size())

var count int64
var quited = false

go func() {
<-quit
quited = true
}()

for !quited {
n, err := r.Read(buf)
if n > 0 {
_, err2 := w.Write(buf[:n])
if err2 != nil {
eh <- err2
return
}

count += int64(n)
pp := int(float64(count) / si * 100)
if pp > pg {
pg = pp
progress <- pg
}
}
if err == io.EOF {
break
}

if err != nil {
eh <- err
return
}
}
})}, nil
}

func (sd *sshdir) findPid(cmd, cmdstring string) (int, error) {
buf, err := sd.sshc.execf(`ps -eo pid,comm,args | grep "%s"`, cmdstring)
if err != nil {
return 0, nil
}

re := regexp.MustCompile(`^(\d+)\s+([^\s]+)`)
for _, v := range strings.Split(buf.String(), "\n") {
ss := re.FindStringSubmatch(v)
if len(ss) != 3 {
continue
}
if ss[2] == cmd {
return strconv.Atoi(ss[1])
}
}
return 0, errors.New("not found")
}

func (sd *sshdir) writeSameHost(item *sshfile, root string) ([]Task, error) {
return []Task{NewTask(item.Name(), func(progress chan<- int, quit <-chan bool, eh chan<- error) {
defer close(progress)
defer close(eh)

rel, err := filepath.Rel(root, item.Path())
if err != nil {
eh <- err
return
}
_, err = sd.sshc.execf("ls %s", filepath.Join(sd.ipath, rel))
if err == nil {
eh <- errors.New("file already exists")
return
}

err = sd.NewDir(filepath.Dir(rel))
if err != nil {
eh <- err
return
}

se, err := sd.sshc.conn.NewSession()
if err != nil {
eh <- err
return
}
defer se.Close()

out, err := se.StderrPipe()
if err != nil {
eh <- err
return
}

ended := false
go func() {
sc := bufio.NewScanner(out)
re := regexp.MustCompile(`^\s*(\d+)\s+bytes.*(?:copied|transferred)`)
size := float64(item.Size())
pg := 0
for sc.Scan() {
// linux: kill -USR1 ##### 2109121536 bytes (2.1 GB) copied, 23.7728 s, 88.7 MB/s
// drawin: kill -INFO ##### 707673088 bytes transferred in 10.532818 secs (67187442 bytes/sec)
subs := re.FindStringSubmatch(sc.Text())
if len(subs) != 2 {
continue
}

copied, err := strconv.ParseInt(subs[1], 10, 64)
if err != nil {
continue
}

pp := int(float64(copied) / size * 100)
if !ended && pp > pg {
pg = pp
progress <- pp
}
}
}()

cmds := fmt.Sprintf("dd if=%s of=%s", item.ipath, filepath.Join(sd.ipath, rel))
err = se.Start(cmds)
if err != nil {
eh <- err
return
}

pid, err := sd.findPid("dd", cmds)
if err != nil {
eh <- err
return
}

endch := make(chan bool)
go func() {
for {
select {
case <-quit:
sd.sshc.execf("kill -TERM %d", pid)
case <-time.After(2 * time.Second):
if sd.sshc.os == "Linux" {
sd.sshc.execf("kill -USR1 %d", pid)
} else {
sd.sshc.execf("kill -INFO %d", pid)
}
case <-endch:
return
}
}
}()

err = se.Wait()
if err != nil {
eh <- err
}
endch <- true
ended = true
})}, nil
}

func (sd *sshdir) writeDir(item FileItem, root string) ([]Task, error) {
its, err := item.(DirOp).Read()
if err != nil {
return nil, err
}

re := make([]Task, 0)
for _, v := range its {
ts, err := sd.write(v, root)
if err != nil {
return nil, err
}

re = append(re, ts...)
}
return re, nil
}

func (sd *sshdir) Write(items []FileItem) (Task, error) {
re := make([]Task, 0)
for _, v := range items {
ts, err := sd.write(v, filepath.Dir(v.Path()))
if err != nil {
return nil, err
}
re = append(re, ts...)
}
return NewBatchTask("Copy", re), nil
}

func (sd *sshdir) Move([]FileItem) error {
return sd.sshc.error("move is not supported")
}
func (sd *sshdir) NewFile(name string) error {
_, err := sd.sshc.execf("touch %s", path.Join(sd.ipath, name))
p := path.Join(sd.ipath, name)
_, err := sd.sshc.execf("ls %s", p)
if err == nil {
return errors.New("file already exists")
}
_, err = sd.sshc.execf("touch %s", path.Join(sd.ipath, name))
return err
}
func (sd *sshdir) NewDir(name string) error {
Expand Down

0 comments on commit a3ff949

Please sign in to comment.