forked from googleapis/google-cloud-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.go
280 lines (256 loc) · 7.72 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
// Copyright 2017 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package firestore
import (
"errors"
"fmt"
"io"
"strings"
"time"
vkit "cloud.google.com/go/firestore/apiv1beta1"
"cloud.google.com/go/internal/version"
"github.com/golang/protobuf/ptypes"
gax "github.com/googleapis/gax-go"
"golang.org/x/net/context"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
pb "google.golang.org/genproto/googleapis/firestore/v1beta1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// resourcePrefixHeader is the name of the metadata header used to indicate
// the resource being operated on.
const resourcePrefixHeader = "google-cloud-resource-prefix"
// A Client provides access to the Firestore service.
type Client struct {
c *vkit.Client
projectID string
databaseID string // A client is tied to a single database.
}
// NewClient creates a new Firestore client that uses the given project.
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
vc, err := vkit.NewClient(ctx, opts...)
if err != nil {
return nil, err
}
vc.SetGoogleClientInfo("gccl", version.Repo)
c := &Client{
c: vc,
projectID: projectID,
databaseID: "(default)", // always "(default)", for now
}
return c, nil
}
// Close closes any resources held by the client.
//
// Close need not be called at program exit.
func (c *Client) Close() error {
return c.c.Close()
}
func (c *Client) path() string {
return fmt.Sprintf("projects/%s/databases/%s", c.projectID, c.databaseID)
}
func withResourceHeader(ctx context.Context, resource string) context.Context {
md, _ := metadata.FromOutgoingContext(ctx)
md = md.Copy()
md[resourcePrefixHeader] = []string{resource}
return metadata.NewOutgoingContext(ctx, md)
}
// Collection creates a reference to a collection with the given path.
// A path is a sequence of IDs separated by slashes.
//
// Collection returns nil if path contains an even number of IDs or any ID is empty.
func (c *Client) Collection(path string) *CollectionRef {
coll, _ := c.idsToRef(strings.Split(path, "/"), c.path())
return coll
}
// Doc creates a reference to a document with the given path.
// A path is a sequence of IDs separated by slashes.
//
// Doc returns nil if path contains an odd number of IDs or any ID is empty.
func (c *Client) Doc(path string) *DocumentRef {
_, doc := c.idsToRef(strings.Split(path, "/"), c.path())
return doc
}
func (c *Client) idsToRef(IDs []string, dbPath string) (*CollectionRef, *DocumentRef) {
if len(IDs) == 0 {
return nil, nil
}
for _, id := range IDs {
if id == "" {
return nil, nil
}
}
coll := newTopLevelCollRef(c, dbPath, IDs[0])
i := 1
for i < len(IDs) {
doc := newDocRef(coll, IDs[i])
i++
if i == len(IDs) {
return nil, doc
}
coll = newCollRefWithParent(c, doc, IDs[i])
i++
}
return coll, nil
}
// GetAll retrieves multiple documents with a single call. The DocumentSnapshots are
// returned in the order of the given DocumentRefs.
//
// If a document is not present, the corresponding DocumentSnapshot's Exists method will return false.
func (c *Client) GetAll(ctx context.Context, docRefs []*DocumentRef) ([]*DocumentSnapshot, error) {
if err := checkTransaction(ctx); err != nil {
return nil, err
}
return c.getAll(ctx, docRefs, nil)
}
func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte) ([]*DocumentSnapshot, error) {
var docNames []string
docIndex := map[string]int{} // doc name to position in docRefs
for i, dr := range docRefs {
if dr == nil {
return nil, errNilDocRef
}
docNames = append(docNames, dr.Path)
docIndex[dr.Path] = i
}
req := &pb.BatchGetDocumentsRequest{
Database: c.path(),
Documents: docNames,
}
if tid != nil {
req.ConsistencySelector = &pb.BatchGetDocumentsRequest_Transaction{tid}
}
streamClient, err := c.c.BatchGetDocuments(withResourceHeader(ctx, req.Database), req)
if err != nil {
return nil, err
}
// Read and remember all results from the stream.
var resps []*pb.BatchGetDocumentsResponse
for {
resp, err := streamClient.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
resps = append(resps, resp)
}
// Results may arrive out of order. Put each at the right index.
docs := make([]*DocumentSnapshot, len(docNames))
for _, resp := range resps {
var (
i int
doc *pb.Document
err error
)
switch r := resp.Result.(type) {
case *pb.BatchGetDocumentsResponse_Found:
i = docIndex[r.Found.Name]
doc = r.Found
case *pb.BatchGetDocumentsResponse_Missing:
i = docIndex[r.Missing]
doc = nil
default:
return nil, errors.New("firestore: unknown BatchGetDocumentsResponse result type")
}
if docs[i] != nil {
return nil, fmt.Errorf("firestore: %q seen twice", docRefs[i].Path)
}
docs[i], err = newDocumentSnapshot(docRefs[i], doc, c, resp.ReadTime)
if err != nil {
return nil, err
}
}
return docs, nil
}
// Collections returns an interator over the top-level collections.
func (c *Client) Collections(ctx context.Context) *CollectionIterator {
it := &CollectionIterator{
err: checkTransaction(ctx),
client: c,
it: c.c.ListCollectionIds(
withResourceHeader(ctx, c.path()),
&pb.ListCollectionIdsRequest{Parent: c.path()}),
}
it.pageInfo, it.nextFunc = iterator.NewPageInfo(
it.fetch,
func() int { return len(it.items) },
func() interface{} { b := it.items; it.items = nil; return b })
return it
}
// Batch returns a WriteBatch.
func (c *Client) Batch() *WriteBatch {
return &WriteBatch{c: c}
}
// commit calls the Commit RPC outside of a transaction.
func (c *Client) commit(ctx context.Context, ws []*pb.Write) ([]*WriteResult, error) {
if err := checkTransaction(ctx); err != nil {
return nil, err
}
req := &pb.CommitRequest{
Database: c.path(),
Writes: ws,
}
res, err := c.c.Commit(withResourceHeader(ctx, req.Database), req)
if err != nil {
return nil, err
}
if len(res.WriteResults) == 0 {
return nil, errors.New("firestore: missing WriteResult")
}
var wrs []*WriteResult
for _, pwr := range res.WriteResults {
wr, err := writeResultFromProto(pwr)
if err != nil {
return nil, err
}
wrs = append(wrs, wr)
}
return wrs, nil
}
func (c *Client) commit1(ctx context.Context, ws []*pb.Write) (*WriteResult, error) {
wrs, err := c.commit(ctx, ws)
if err != nil {
return nil, err
}
return wrs[0], nil
}
// A WriteResult is returned by methods that write documents.
type WriteResult struct {
// The time at which the document was updated, or created if it did not
// previously exist. Writes that do not actually change the document do
// not change the update time.
UpdateTime time.Time
}
func writeResultFromProto(wr *pb.WriteResult) (*WriteResult, error) {
t, err := ptypes.Timestamp(wr.UpdateTime)
if err != nil {
t = time.Time{}
// TODO(jba): Follow up if Delete is supposed to return a nil timestamp.
}
return &WriteResult{UpdateTime: t}, nil
}
func sleep(ctx context.Context, dur time.Duration) error {
switch err := gax.Sleep(ctx, dur); err {
case context.Canceled:
return status.Error(codes.Canceled, "context canceled")
case context.DeadlineExceeded:
return status.Error(codes.DeadlineExceeded, "context deadline exceeded")
default:
return err
}
}