Skip to content

Commit

Permalink
Add support for truncate operations
Browse files Browse the repository at this point in the history
  • Loading branch information
junjieqian authored and colinmarc committed Feb 9, 2022
1 parent 50243c4 commit 2f11406
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 0 deletions.
3 changes: 3 additions & 0 deletions cmd/hdfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Valid commands:
getmerge SOURCE DEST
put SOURCE DEST
df [-h]
truncate SIZE FILE
`, os.Args[0])

lsOpts = getopt.New()
Expand Down Expand Up @@ -147,6 +148,8 @@ func main() {
case "df":
dfOpts.Parse(argv)
df(*dfh)
case "truncate":
truncate(argv[1:])
// it's a seeeeecret command
case "complete":
complete(argv)
Expand Down
28 changes: 28 additions & 0 deletions cmd/hdfs/test/truncate.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env bats

load helper

setup() {
$HDFS mkdir -p /_test_cmd/truncate
$HDFS touch /_test_cmd/truncate/a
}

@test "truncate larger" {
run $HDFS truncate 10 /_test_cmd/a
assert_failure
}

@test "truncate nonexistent" {
run $HDSF truncate 10 /_test_cmd/nonexistent
assert_failure
}

@test "truncate" {
run $HDFS put $ROOT_TEST_DIR/testdata/foo.txt /_test_cmd/truncate/1
run $HDFS truncate 2 /_test_cmd/truncate/1
assert_success
}

teardown() {
$HDFS rm -r /_test_cmd/truncate
}
26 changes: 26 additions & 0 deletions cmd/hdfs/truncate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package main

import (
"strconv"
)

func truncate(args []string) {
if len(args) != 2 {
fatalWithUsage()
}

size, err := strconv.ParseInt(args[0], 10, 64)
if err != nil {
fatal(err)
}

client, err := getClient("")
if err != nil {
fatal(err)
}

_, err = client.Truncate(args[1], size)
if err != nil {
fatal(err)
}
}
3 changes: 3 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
pathIsNotEmptyDirException = "org.apache.hadoop.fs.PathIsNotEmptyDirectoryException"
fileAlreadyExistsException = "org.apache.hadoop.fs.FileAlreadyExistsException"
alreadyBeingCreatedException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException"
illegalArgumentException = "org.apache.hadoop.HadoopIllegalArgumentException"
)

// Error represents a remote java exception from an HDFS namenode or datanode.
Expand Down Expand Up @@ -50,6 +51,8 @@ func interpretException(err error) error {
return syscall.ENOTEMPTY
case fileAlreadyExistsException:
return os.ErrExist
case illegalArgumentException:
return os.ErrInvalid
default:
return err
}
Expand Down
31 changes: 31 additions & 0 deletions truncate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package hdfs

import (
"errors"
"os"

hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
"google.golang.org/protobuf/proto"
)

// Truncate truncates the file specified by name to the given size, and returns
// the status any error encountered. The returned status will false in the case
// of any error or, if the error is nil, if HDFS indicated that the operation
// will be performed asynchronously and is not yet complete.
func (c *Client) Truncate(name string, size int64) (bool, error) {
req := &hdfs.TruncateRequestProto{
Src: proto.String(name),
NewLength: proto.Uint64(uint64(size)),
ClientName: proto.String(c.namenode.ClientName),
}
resp := &hdfs.TruncateResponseProto{}

err := c.namenode.Execute("truncate", req, resp)
if err != nil {
return false, &os.PathError{"truncate", name, interpretException(err)}
} else if resp.Result == nil {
return false, &os.PathError{"truncate", name, errors.New("unexpected empty response")}
}

return resp.GetResult(), nil
}
152 changes: 152 additions & 0 deletions truncate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package hdfs

import (
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func waitTruncate(t *testing.T, client *Client, name string, size int64) {
done, err := client.Truncate(name, size)
require.NoError(t, err)

var stat os.FileInfo
if !done {
for i := 0; i < 5; i++ {
stat, err = client.Stat(name)
require.NoError(t, err)

if stat.Size() == size {
break
}

time.Sleep(500 * time.Millisecond)
t.Log("Waiting for truncate to finish")
}
} else {
stat, err = client.Stat(name)
require.NoError(t, err)
}

assert.EqualValues(t, size, stat.Size())
}

func TestTruncate(t *testing.T) {
client := getClient(t)

baleet(t, "/_test/truncate/1.txt")
mkdirp(t, "/_test/truncate")
f, err := client.Create("/_test/truncate/1.txt")
require.NoError(t, err)

n, err := f.Write([]byte("foobar\nfoobar\n"))
assert.Equal(t, 14, n)
require.NoError(t, err)

assertClose(t, f)

waitTruncate(t, client, "/_test/truncate/1.txt", 4)
}

func TestTruncateToZero(t *testing.T) {
client := getClient(t)

baleet(t, "/_test/truncate/2.txt")
mkdirp(t, "/_test/truncate")
f, err := client.Create("/_test/truncate/2.txt")
require.NoError(t, err)

n, err := f.Write([]byte("foobarbaz"))
assert.Equal(t, 9, n)
require.NoError(t, err)

assertClose(t, f)

waitTruncate(t, client, "/_test/truncate/2.txt", 0)
}

func TestTruncateSizeTooBig(t *testing.T) {
client := getClient(t)

baleet(t, "/_test/truncate/3.txt")
mkdirp(t, "/_test/truncate")

f, err := client.Create("/_test/truncate/3.txt")
require.NoError(t, err)

n, err := f.Write([]byte("foo"))
assert.Equal(t, 3, n)
require.NoError(t, err)

assertClose(t, f)

done, err := client.Truncate("/_test/truncate/3.txt", 100)
assert.False(t, done)
assertPathError(t, err, "truncate", "/_test/truncate/3.txt", os.ErrInvalid)
}

func TestTruncateSizeNegative(t *testing.T) {
client := getClient(t)

baleet(t, "/_test/truncate/4.txt")
mkdirp(t, "/_test/truncate")

f, err := client.Create("/_test/truncate/4.txt")
require.NoError(t, err)

n, err := f.Write([]byte("foo"))
assert.Equal(t, 3, n)
require.NoError(t, err)

assertClose(t, f)

done, err := client.Truncate("/_test/truncate/4.txt", -10)
assert.False(t, done)
assertPathError(t, err, "truncate", "/_test/truncate/4.txt", os.ErrInvalid)
}

func TestTruncateNoExist(t *testing.T) {
client := getClient(t)

done, err := client.Truncate("/_test/nonexistent", 100)
assert.False(t, done)
assertPathError(t, err, "truncate", "/_test/nonexistent", os.ErrNotExist)
}

func TestTruncateDir(t *testing.T) {
client := getClient(t)

mkdirp(t, "/_test/truncate")

done, err := client.Truncate("/_test/truncate", 100)
assert.False(t, done)
assertPathError(t, err, "truncate", "/_test/truncate", os.ErrNotExist)
}

func TestTruncateWithoutPermission(t *testing.T) {
client := getClient(t)
client2 := getClientForUser(t, "gohdfs2")

baleet(t, "/_test/truncate/5.txt")
mkdirp(t, "/_test/truncate")

f, err := client.Create("/_test/truncate/5.txt")
require.NoError(t, err)

n, err := f.Write([]byte("barbar"))
assert.Equal(t, 6, n)
require.NoError(t, err)

assertClose(t, f)

done, err := client2.Truncate("/_test/truncate/5.txt", 1)
assert.False(t, done)
assertPathError(t, err, "truncate", "/_test/truncate/5.txt", os.ErrPermission)

stat, err := client.Stat("/_test/truncate/5.txt")
require.NoError(t, err)
assert.EqualValues(t, 6, stat.Size())
}

0 comments on commit 2f11406

Please sign in to comment.