Skip to content

Commit

Permalink
write out any attachment preview to the fetcher cache on upload CORE-…
Browse files Browse the repository at this point in the history
…8361 (keybase#13039)

* wip

* test

* run with fake S3 params
  • Loading branch information
mmaxim authored Jul 30, 2018
1 parent 21b4125 commit 59597df
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 49 deletions.
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ services:
- INSECURE_TLS_MODE=1
- GREGOR_TLFAUTH_PRIVATE_SIGNING_KEY=e20589b8cd66d447aaee44b587305bd521f34f3085709b32b4e3bd479b20253e59ea153c88a8ea524d39e0ae58fa195749214b38a28fdb4229ba3390b2d33e86
- GREGOR_TLFAUTH_PUBLIC_SIGNING_KEY=012059ea153c88a8ea524d39e0ae58fa195749214b38a28fdb4229ba3390b2d33e860a
- CHAT_S3_BUCKET=test
- CHAT_S3_ACCESS_KEY=test
- CHAT_S3_SECRET_KEY=test

logging:
driver: json-file
options:
Expand Down
45 changes: 15 additions & 30 deletions go/chat/attachment_httpsrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,6 @@ import (
"golang.org/x/net/context"
)

type DummyAttachmentFetcher struct{}

func (d DummyAttachmentFetcher) FetchAttachment(ctx context.Context, w io.Writer,
convID chat1.ConversationID, asset chat1.Asset, r func() chat1.RemoteInterface, signer s3.Signer,
progress types.ProgressReporter) error {
return nil
}

func (d DummyAttachmentFetcher) DeleteAssets(ctx context.Context,
convID chat1.ConversationID, assets []chat1.Asset, ri func() chat1.RemoteInterface, signer s3.Signer) (err error) {
return nil
}

type DummyAttachmentHTTPSrv struct{}

func (d DummyAttachmentHTTPSrv) GetURL(ctx context.Context, convID chat1.ConversationID, msgID chat1.MessageID,
preview bool) string {
return ""
}

func (d DummyAttachmentHTTPSrv) GetPendingPreviewURL(ctx context.Context, outboxID chat1.OutboxID) string {
return ""
}

func (d DummyAttachmentHTTPSrv) GetAttachmentFetcher() types.AttachmentFetcher {
return DummyAttachmentFetcher{}
}

var blankProgress = func(bytesComplete, bytesTotal int64) {}

type AttachmentHTTPSrv struct {
Expand Down Expand Up @@ -286,6 +258,10 @@ func (r *RemoteAttachmentFetcher) DeleteAssets(ctx context.Context,
return nil
}

func (r *RemoteAttachmentFetcher) PutUploadedAsset(ctx context.Context, filename string, asset chat1.Asset) error {
return nil
}

type attachmentRemoteStore interface {
DecryptAsset(ctx context.Context, w io.Writer, body io.Reader, asset chat1.Asset,
progress types.ProgressReporter) error
Expand Down Expand Up @@ -418,20 +394,29 @@ func (c *CachingAttachmentFetcher) FetchAttachment(ctx context.Context, w io.Wri
return err
}

// commit to the on disk LRU
return c.putFileInLRU(ctx, fileWriter.Name(), asset)
}

func (c *CachingAttachmentFetcher) putFileInLRU(ctx context.Context, filename string, asset chat1.Asset) error {
// Add an entry to the disk LRU mapping the asset path to the local path, and remove
// the remnants of any evicted attachments.
evicted, err := c.diskLRU.Put(ctx, c.G(), c.cacheKey(asset), fileWriter.Name())
evicted, err := c.diskLRU.Put(ctx, c.G(), c.cacheKey(asset), filename)
if err != nil {
return err
}
if evicted != nil {
path := evicted.Value.(string)
os.Remove(path)
}

return nil
}

func (c *CachingAttachmentFetcher) PutUploadedAsset(ctx context.Context, filename string, asset chat1.Asset) (err error) {
defer c.Trace(ctx, func() error { return err }, "PutUploadedAsset")()
return c.putFileInLRU(ctx, filename, asset)
}

func (c *CachingAttachmentFetcher) DeleteAssets(ctx context.Context,
convID chat1.ConversationID, assets []chat1.Asset, ri func() chat1.RemoteInterface, signer s3.Signer) (err error) {
defer c.Trace(ctx, func() error { return err }, "DeleteAssets")()
Expand Down
58 changes: 58 additions & 0 deletions go/chat/attachment_httpsrv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
"testing"
"time"

"github.com/keybase/client/go/chat/attachments"
"github.com/keybase/client/go/chat/utils"
"github.com/keybase/client/go/libkb"
"github.com/keybase/client/go/logger"

"github.com/keybase/client/go/chat/s3"
"github.com/keybase/client/go/chat/types"
"github.com/keybase/client/go/protocol/chat1"
"github.com/keybase/client/go/protocol/gregor1"
"github.com/keybase/client/go/protocol/keybase1"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -170,3 +173,58 @@ func TestChatSrvAttachmentHTTPSrv(t *testing.T) {
_, err = os.Stat(localPath)
require.True(t, os.IsNotExist(err))
}

func TestChatSrvAttachmentUploadPreviewCached(t *testing.T) {
ctc := makeChatTestContext(t, "TestChatSrvAttachmentUploadPreviewCached", 1)
defer ctc.cleanup()
users := ctc.users()

defer func() {
useRemoteMock = true
}()
useRemoteMock = false
tc := ctc.world.Tcs[users[0].Username]
store := attachments.NewStoreTesting(logger.NewTestLogger(t), nil)
fetcher := NewCachingAttachmentFetcher(tc.Context(), store, 1)
ri := ctc.as(t, users[0]).ri
d, err := libkb.RandHexString("", 8)
require.NoError(t, err)
fetcher.tempDir = filepath.Join(os.TempDir(), d)

conv := mustCreateConversationForTest(t, ctc, users[0], chat1.TopicType_CHAT,
chat1.ConversationMembersType_IMPTEAMNATIVE)
tc.ChatG.AttachmentURLSrv = NewAttachmentHTTPSrv(tc.Context(),
fetcher, func() chat1.RemoteInterface { return mockSigningRemote{} })
uploader := attachments.NewUploader(tc.Context(), store, mockSigningRemote{},
func() chat1.RemoteInterface { return ri })
uploader.SetPreviewTempDir(fetcher.tempDir)
tc.ChatG.AttachmentUploader = uploader

res, err := ctc.as(t, users[0]).chatLocalHandler().PostFileAttachmentLocal(context.TODO(),
chat1.PostFileAttachmentLocalArg{
Arg: chat1.PostFileAttachmentArg{
ConversationID: conv.Id,
TlfName: conv.TlfName,
Visibility: keybase1.TLFVisibility_PRIVATE,
Filename: "testdata/ship.jpg",
Title: "SHIP",
},
})
require.NoError(t, err)

msgRes, err := ctc.as(t, users[0]).chatLocalHandler().GetMessagesLocal(context.TODO(),
chat1.GetMessagesLocalArg{
ConversationID: conv.Id,
MessageIDs: []chat1.MessageID{res.MessageID},
})
require.NoError(t, err)
require.Equal(t, 1, len(msgRes.Messages))
require.True(t, msgRes.Messages[0].IsValid())
body := msgRes.Messages[0].Valid().MessageBody
require.NotNil(t, body.Attachment().Preview)

found, path, err := fetcher.localAssetPath(context.TODO(), *body.Attachment().Preview)
require.NoError(t, err)
require.True(t, found)
t.Logf("found path: %s", path)
}
20 changes: 12 additions & 8 deletions go/chat/attachments/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (u *UploadTask) Nonce() signencrypt.Nonce {
}

type Store interface {
UploadAsset(ctx context.Context, task *UploadTask) (chat1.Asset, error)
UploadAsset(ctx context.Context, task *UploadTask, encryptedOut io.Writer) (chat1.Asset, error)
DownloadAsset(ctx context.Context, params chat1.S3Params, asset chat1.Asset, w io.Writer,
signer s3.Signer, progress types.ProgressReporter) error
DeleteAsset(ctx context.Context, params chat1.S3Params, signer s3.Signer, asset chat1.Asset) error
Expand All @@ -79,7 +79,7 @@ type S3Store struct {
blockLimit int // max number of blocks to upload
}

// NewStore creates a standard Store that uses a real
// NewS3Store creates a standard Store that uses a real
// S3 connection.
func NewS3Store(logger logger.Logger, runtimeDir string) *S3Store {
return &S3Store{
Expand All @@ -89,11 +89,11 @@ func NewS3Store(logger logger.Logger, runtimeDir string) *S3Store {
}
}

// newStoreTesting creates an Store suitable for testing
// NewStoreTesting creates an Store suitable for testing
// purposes. It is not exposed outside this package.
// It uses an in-memory s3 interface, reports enc/sig keys, and allows limiting
// the number of blocks uploaded.
func newStoreTesting(logger logger.Logger, kt func(enc, sig []byte)) *S3Store {
func NewStoreTesting(logger logger.Logger, kt func(enc, sig []byte)) *S3Store {
return &S3Store{
DebugLabeler: utils.NewDebugLabeler(logger, "Attachments.Store", false),
s3c: &s3.Mem{},
Expand All @@ -103,7 +103,7 @@ func newStoreTesting(logger logger.Logger, kt func(enc, sig []byte)) *S3Store {
}
}

func (a *S3Store) UploadAsset(ctx context.Context, task *UploadTask) (res chat1.Asset, err error) {
func (a *S3Store) UploadAsset(ctx context.Context, task *UploadTask, encryptedOut io.Writer) (res chat1.Asset, err error) {
defer a.Trace(ctx, func() error { return err }, "UploadAsset")()
// compute plaintext hash
if task.plaintextHash == nil {
Expand All @@ -128,7 +128,7 @@ func (a *S3Store) UploadAsset(ctx context.Context, task *UploadTask) (res chat1.
previous = a.previousUpload(ctx, task)
}

res, err = a.uploadAsset(ctx, task, enc, previous, resumable)
res, err = a.uploadAsset(ctx, task, enc, previous, resumable, encryptedOut)

// if the upload is aborted, reset the stream and start over to get new keys
if err == ErrAbortOnPartMismatch && previous != nil {
Expand All @@ -140,13 +140,14 @@ func (a *S3Store) UploadAsset(ctx context.Context, task *UploadTask) (res chat1.
if err := task.computePlaintextHash(); err != nil {
return res, err
}
return a.uploadAsset(ctx, task, enc, nil, resumable)
return a.uploadAsset(ctx, task, enc, nil, resumable, encryptedOut)
}

return res, err
}

func (a *S3Store) uploadAsset(ctx context.Context, task *UploadTask, enc *SignEncrypter, previous *AttachmentInfo, resumable bool) (asset chat1.Asset, err error) {
func (a *S3Store) uploadAsset(ctx context.Context, task *UploadTask, enc *SignEncrypter,
previous *AttachmentInfo, resumable bool, encryptedOut io.Writer) (asset chat1.Asset, err error) {
defer a.Trace(ctx, func() error { return err }, "uploadAsset")()
var encReader io.Reader
if previous != nil {
Expand Down Expand Up @@ -174,6 +175,9 @@ func (a *S3Store) uploadAsset(ctx context.Context, task *UploadTask, enc *SignEn
// compute ciphertext hash
hash := sha256.New()
tee := io.TeeReader(encReader, hash)
if encryptedOut != nil {
tee = io.TeeReader(tee, encryptedOut)
}

// post to s3
length := int64(enc.EncryptedLen(task.FileSize))
Expand Down
10 changes: 5 additions & 5 deletions go/chat/attachments/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestSignEncrypter(t *testing.T) {
}

func makeTestStore(t *testing.T, kt func(enc, sig []byte)) *S3Store {
return newStoreTesting(logger.NewTestLogger(t), kt)
return NewStoreTesting(logger.NewTestLogger(t), kt)
}

func testStoreMultis(t *testing.T, s *S3Store) []*s3.MemMulti {
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestUploadAssetSmall(t *testing.T) {
s := makeTestStore(t, nil)
ctx := context.Background()
plaintext, task := makeUploadTask(t, 1*MB)
a, err := s.UploadAsset(ctx, task)
a, err := s.UploadAsset(ctx, task, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -218,7 +218,7 @@ func TestUploadAssetLarge(t *testing.T) {
s := makeTestStore(t, nil)
ctx := context.Background()
plaintext, task := makeUploadTask(t, 12*MB)
a, err := s.UploadAsset(ctx, task)
a, err := s.UploadAsset(ctx, task, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -263,7 +263,7 @@ func (u *uploader) keyTracker(e, s []byte) {

func (u *uploader) UploadResume() chat1.Asset {
u.s.blockLimit = 0
a, err := u.s.UploadAsset(context.Background(), u.task)
a, err := u.s.UploadAsset(context.Background(), u.task, nil)
if err != nil {
u.t.Fatalf("expected second UploadAsset call to work, got: %s", err)
}
Expand All @@ -286,7 +286,7 @@ func (u *uploader) UploadResume() chat1.Asset {
func (u *uploader) UploadPartial(blocks int) {
u.s.blockLimit = blocks

_, err := u.s.UploadAsset(context.Background(), u.task)
_, err := u.s.UploadAsset(context.Background(), u.task, nil)
if err == nil {
u.t.Fatal("expected incomplete upload to have error")
}
Expand Down
40 changes: 38 additions & 2 deletions go/chat/attachments/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package attachments

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"

"github.com/keybase/client/go/chat/globals"
Expand Down Expand Up @@ -39,6 +41,9 @@ type Uploader struct {
ri func() chat1.RemoteInterface
s3signer s3.Signer
uploads map[string]chan types.AttachmentUploadResult

// testing
tempDir string
}

var _ types.AttachmentUploader = (*Uploader)(nil)
Expand All @@ -54,6 +59,10 @@ func NewUploader(g *globals.Context, store Store, s3signer s3.Signer, ri func()
}
}

func (u *Uploader) SetPreviewTempDir(dir string) {
u.tempDir = dir
}

func (u *Uploader) dbStatusKey(outboxID chat1.OutboxID) libkb.DbKey {
return libkb.DbKey{
Typ: libkb.DBAttachmentUploader,
Expand Down Expand Up @@ -198,6 +207,18 @@ func (u *Uploader) doneUploading(outboxID chat1.OutboxID) {
delete(u.uploads, outboxID.String())
}

func (u *Uploader) uploadPreviewFile(ctx context.Context) (f *os.File, err error) {
baseDir := u.G().GetCacheDir()
if u.tempDir != "" {
baseDir = u.tempDir
}
dir := filepath.Join(baseDir, "uploadedpreviews")
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return nil, err
}
return ioutil.TempFile(dir, "up")
}

func (u *Uploader) upload(ctx context.Context, uid gregor1.UID, convID chat1.ConversationID,
outboxID chat1.OutboxID, title, filename string, metadata []byte, callerPreview *chat1.MakePreviewRes) (res chan types.AttachmentUploadResult, err error) {
// Check to see if we are already uploading this message and set upload status if not
Expand Down Expand Up @@ -278,7 +299,7 @@ func (u *Uploader) upload(ctx context.Context, uid gregor1.UID, convID chat1.Con
UserID: uid,
Progress: progress,
}
ures.Object, err = u.store.UploadAsset(bgctx, &task)
ures.Object, err = u.store.UploadAsset(bgctx, &task, nil)
if err != nil {
u.Debug(bgctx, "upload: error uploading primary asset to s3: %s", err)
} else {
Expand All @@ -300,6 +321,15 @@ func (u *Uploader) upload(ctx context.Context, uid gregor1.UID, convID chat1.Con
// copy the params so as not to mess with the main params above
previewParams := s3params

// set up file to write out encrypted preview to
encryptedOut, err := u.uploadPreviewFile(ctx)
if err != nil {
u.Debug(bgctx, "upload: failed to create uploaded preview file: %s", err)
encryptedOut = nil
} else {
defer encryptedOut.Close()
}

// add preview suffix to object key (P in hex)
// the s3path in gregor is expecting hex here
previewParams.ObjectKey += "50"
Expand All @@ -312,12 +342,18 @@ func (u *Uploader) upload(ctx context.Context, uid gregor1.UID, convID chat1.Con
ConversationID: convID,
UserID: uid,
}
preview, err := u.store.UploadAsset(bgctx, &task)
preview, err := u.store.UploadAsset(bgctx, &task, encryptedOut)
if err == nil {
ures.Preview = &preview
ures.Preview.MimeType = pre.ContentType
ures.Preview.Metadata = pre.PreviewMetadata()
ures.Preview.Tag = chat1.AssetTag_PRIMARY
if encryptedOut != nil {
if err := u.G().AttachmentURLSrv.GetAttachmentFetcher().PutUploadedAsset(ctx,
encryptedOut.Name(), preview); err != nil {
u.Debug(bgctx, "upload: failed to put uploaded asset into fetcher: %s", err)
}
}
} else {
u.Debug(bgctx, "upload: error uploading preview asset to s3: %s", err)
}
Expand Down
Loading

0 comments on commit 59597df

Please sign in to comment.