forked from minio/minio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlock-commands.go
145 lines (119 loc) · 4.01 KB
/
lock-commands.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package madmin
import (
"encoding/json"
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
)
type statusType string
type lockType string
// OpsLockState - represents lock specific details.
type OpsLockState struct {
OperationID string `json:"id"` // String containing operation ID.
LockSource string `json:"source"` // Operation type (GetObject, PutObject...)
LockType lockType `json:"type"` // Lock type (RLock, WLock)
Status statusType `json:"status"` // Status can be Running/Ready/Blocked.
Since time.Time `json:"since"` // Time when the lock was initially held.
}
// VolumeLockInfo - represents summary and individual lock details of all
// locks held on a given bucket, object.
type VolumeLockInfo struct {
Bucket string `json:"bucket"`
Object string `json:"object"`
// All locks blocked + running for given <volume,path> pair.
LocksOnObject int64 `json:"-"`
// Count of operations which has successfully acquired the lock
// but hasn't unlocked yet( operation in progress).
LocksAcquiredOnObject int64 `json:"-"`
// Count of operations which are blocked waiting for the lock
// to be released.
TotalBlockedLocks int64 `json:"-"`
// Count of all read locks
TotalReadLocks int64 `json:"readLocks"`
// Count of all write locks
TotalWriteLocks int64 `json:"writeLocks"`
// State information containing state of the locks for all operations
// on given <volume,path> pair.
LockDetailsOnObject []OpsLockState `json:"lockOwners"`
}
// getLockInfos - unmarshal []VolumeLockInfo from a reader.
func getLockInfos(body io.Reader) ([]VolumeLockInfo, error) {
respBytes, err := ioutil.ReadAll(body)
if err != nil {
return nil, err
}
var lockInfos []VolumeLockInfo
err = json.Unmarshal(respBytes, &lockInfos)
if err != nil {
return nil, err
}
return lockInfos, nil
}
// ListLocks - Calls List Locks Management API to fetch locks matching
// bucket, prefix and held before the duration supplied.
func (adm *AdminClient) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
queryVal := make(url.Values)
queryVal.Set("lock", "")
queryVal.Set("bucket", bucket)
queryVal.Set("prefix", prefix)
queryVal.Set("duration", duration.String())
hdrs := make(http.Header)
hdrs.Set(minioAdminOpHeader, "list")
reqData := requestData{
queryValues: queryVal,
customHeaders: hdrs,
}
// Execute GET on /?lock to list locks.
resp, err := adm.executeMethod("GET", reqData)
defer closeResponse(resp)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, httpRespToErrorResponse(resp)
}
return getLockInfos(resp.Body)
}
// ClearLocks - Calls Clear Locks Management API to clear locks held
// on bucket, matching prefix older than duration supplied.
func (adm *AdminClient) ClearLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
queryVal := make(url.Values)
queryVal.Set("lock", "")
queryVal.Set("bucket", bucket)
queryVal.Set("prefix", prefix)
queryVal.Set("duration", duration.String())
hdrs := make(http.Header)
hdrs.Set(minioAdminOpHeader, "clear")
reqData := requestData{
queryValues: queryVal,
customHeaders: hdrs,
}
// Execute POST on /?lock to clear locks.
resp, err := adm.executeMethod("POST", reqData)
defer closeResponse(resp)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, httpRespToErrorResponse(resp)
}
return getLockInfos(resp.Body)
}