forked from apache/pulsar
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pulsar-managed-ledger-admin
executable file
·325 lines (293 loc) · 11.3 KB
/
pulsar-managed-ledger-admin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import argparse
import traceback
import sys
try:
from google.protobuf.text_format import Merge
from google.protobuf.text_format import MessageToString
except Exception as missingLib:
sys.exit("You need python protobuf library. Get it from: pip install protobuf")
try:
from proto import MLDataFormats_pb2
except Exception as missingLib:
sys.exit("Incompatible proto/MLDataFormats_pb2.py. Regenerate using: "+
"protoc -I=${PULSAR_PATH}/managed-ledger/src/main/proto --python_out=${PULSAR_PATH}/bin/proto/ "+
"${PULSAR_PATH}/managed-ledger/src/main/proto/MLDataFormats.proto")
try:
from kazoo.client import KazooClient
except Exception as missingLib:
sys.exit("You need Kazoo ZK client library. Get it from: pip install kazoo")
'''
This util provides API to access managed-ledger data and also
provides command line tool-access to execute these commands.
'''
managedLedgerPath = "/managed-ledgers/"
printMlCommand = "print-managed-ledger"
deleteMlLedgerIds = "delete-managed-ledger-ids"
printCursorsCommands = "print-cursor"
updateMakDeleteCursor = "update-mark-delete-cursor"
'''
Returns managed-ledger info for given managed-leger path
Parameters
----------
zk : KazooClient
Zookeeper-client instance to query zk-client.
mlPath : str
managed-ledger path
'''
def getManagedLedgerInfo(zk, mlPath):
try:
# get managed-ledger info
mlData = zk.get(mlPath)[0]
mlInfo = MLDataFormats_pb2.ManagedLedgerInfo()
try:
mlInfo.ParseFromString(mlData)
except Exception as formatException:
Merge(mlData, mlInfo)
return mlInfo
except Exception as e:
traceback.print_exc()
print('Failed to get data for {} due to {}'.format(mlPath, repr(e)))
'''
Delete specific ledgerIds from the managed-ledger info and updates into zk
Parameters
----------
zk : KazooClient
Zookeeper-client instance to query zk-client.
mlPath : str
managed-ledger path
deleteLedgerIds : str
comma separated deleting ledger-ids (eg: 123,124)
'''
def deleteLedgerIdsFromManagedLedgerInfo(zk, mlPath, deletLedgerIds):
try:
# get managed-ledger info
(mlData, mlStat) = zk.get(mlPath)
mlInfo = MLDataFormats_pb2.ManagedLedgerInfo()
isTextFormat = False
try:
mlInfo.ParseFromString(mlData)
except Exception as formatException:
Merge(mlData, mlInfo)
isTextFormat = True
ledgerInfoList = mlInfo.ledgerInfo
i = 0
while i < len(ledgerInfoList):
ledgerInfo = ledgerInfoList[i]
if ledgerInfo.ledgerId in deletLedgerIds:
ledgerInfoList.remove(ledgerInfo)
else:
i += 1
updatedMlInfo = None
if isTextFormat:
updatedMlInfo = MessageToString(mlInfo, True)
else:
updatedMlInfo = mlInfo.SerializeToString();
zk.set(mlPath, updatedMlInfo, version=mlStat.version)
print('Updated {} with value\n{}'.format(mlPath, str(mlInfo)))
except Exception as e:
traceback.print_exc()
print('Failed to delete ledgerIds for {} due to {}'.format(mlPath, repr(e)))
'''
Returns managed-ledger cursor info for given managed-cursor path
Parameters
----------
zk : KazooClient
Zookeeper-client instance to query zk-client.
mlPath : str
managed-ledger path
cursorName : str
managed-cursor path
'''
def getManagedCursorInfo(zk, mlPath):
try:
cursors = zk.get_children(mlPath)
cursorList = {}
for cursor in cursors:
cursorData = zk.get(mlPath + "/" + cursor)[0]
cursorInfo = MLDataFormats_pb2.ManagedCursorInfo()
try:
cursorInfo.ParseFromString(cursorData)
except Exception as formatException:
Merge(cursorData, cursorInfo)
cursorList[cursor] = cursorInfo
return cursorList
except Exception as e:
traceback.print_exc()
print('Failed to get ml-cursor {} due to {}'.format(mlPath, repr(e)))
'''
Update mark-delete position of the given managed-cursor into zk
Parameters
----------
zk : KazooClient
Zookeeper-client instance to query zk-client.
mlPath : str
managed-ledger path
cursorName : str
managed-cursor path
markDeletePosition: str
markDeletePosition combination of <ledgerId>:<entryId> (eg. 123:1)
'''
def updateCursorMarkDelete(zk, cursorPath, markDeleteLedgerId, markDeleteEntryId):
try:
(cursorData, cursorStat) = zk.get(cursorPath)
cursorInfo = MLDataFormats_pb2.ManagedCursorInfo()
isTextFormat = False
try:
cursorInfo.ParseFromString(cursorData)
except Exception as formatException:
Merge(cursorData, cursorInfo)
isTextFormat = True
cursorInfo.markDeleteLedgerId = markDeleteLedgerId
cursorInfo.markDeleteEntryId = markDeleteEntryId
sData = None
if isTextFormat:
sData = MessageToString(cursorInfo, True)
else:
sData = cursorInfo.SerializeToString()
zk.set(cursorPath, sData, version=cursorStat.version)
print('Updated {} with value \n{}'.format(cursorPath, cursorInfo))
except Exception as e:
traceback.print_exc()
print('Failed to update ml-cursor {} due to {}'.format(cursorPath, repr(e)))
'''
print managed-ledger info for given managed-leger path
Parameters
----------
zk : KazooClient
Zookeeper-client instance to query zk-client.
mlPath : str
managed-ledger path
eg:
print-managed-ledger --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test
'''
def printManagedLedgerCommand(zk, mlPath):
print(getManagedLedgerInfo(zk, mlPath))
'''
print managed-ledger cursor info for given managed-cursor path
Parameters
----------
zk : KazooClient
Zookeeper-client instance to query zk-client.
mlPath : str
managed-ledger path
cursorName : str
managed-cursor path
eg:
print-cursor --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test --cursorName s1
'''
def printManagedCursorCommand(zk, mlPath, cursorName):
try:
if cursorName:
print(getManagedCursorInfo(zk, mlPath)[cursorName])
else:
print('Usage: --command {} [--cursorName]'.format(printCursorsCommands))
except Exception as e:
traceback.print_exc()
print('No cursor found for {}/{}'.format(mlPath, cursorName))
'''
delete specific ledgerIds from the managed-ledger info
Parameters
----------
zk : KazooClient
Zookeeper-client instance to query zk-client.
mlPath : str
managed-ledger path
deleteLedgerIds : str
comma separated deleting ledger-ids (eg: 123,124)
eg:
delete-managed-ledger-ids --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test --ledgerIds 3
'''
def deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds):
try:
if deleteLedgerIds:
deletLedgerIds = set(deleteLedgerIds.split(","))
deletLedgerIdSet = set()
for id in deletLedgerIds:
deletLedgerIdSet.add(int(id))
deleteLedgerIdsFromManagedLedgerInfo(zk, mlPath, deletLedgerIdSet)
else:
print('Usage: --command {} [--ledgerIds]'.format(deleteMlLedgerIds))
except Exception as e:
traceback.print_exc()
print('Failed to delete ml-ledger_ids {} due to {}'.format(mlPath, repr(e)))
'''
Update mark-delete position of the given managed-cursor
Parameters
----------
zk : KazooClient
Zookeeper-client instance to query zk-client.
mlPath : str
managed-ledger path
cursorName : str
managed-cursor path
markDeletePosition: str
markDeletePosition combination of <ledgerId>:<entryId> (eg. 123:1)
eg:
update-mark-delete-cursor --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test --cursorName s1 --cursorMarkDelete 0:1
'''
def updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, markDeletePosition):
try:
if cursorName:
if markDeletePosition:
positionPair = markDeletePosition.split(":")
if len(positionPair) == 2:
updateCursorMarkDelete(zk, mlPath + "/" + cursorName, (int(positionPair[0])), int(positionPair[1]))
else:
print("markDeletePosition must be in format <ledger_id>:<entry_id>")
else:
print('Usage: --command {} [----cursorName] [--cursorMarkDelete]'.format(updateMakDeleteCursor))
else:
print('Usage: --command {} [--cursorName] [--cursorMarkDelete]'.format(updateMakDeleteCursor))
except Exception as e:
traceback.print_exc()
print('Failed to update ml-cursor {}/{} due to {}'.format(mlPath, cursorName, repr(e)))
if __name__ in '__main__':
commandHelpText = 'Managed-ledger command: \n{}, {}, {}, {}'.format(printMlCommand, deleteMlLedgerIds, printCursorsCommands, updateMakDeleteCursor)
try:
command = sys.argv[1]
except Exception as indexError:
print('ERROR: Pass command as a first argument, supported {}\n\n'.format(commandHelpText))
arguments = sys.argv[2:]
parser = argparse.ArgumentParser()
parser.add_argument("--zkServer", "-zk", required=True, help="ZooKeeperServer:port")
parser.add_argument("--managedLedgerPath", "-mlp", required=True, help="Managed-ledger path")
parser.add_argument("--ledgerIds", "-lid", required=False, help="Delete ledger ids: comma separated")
parser.add_argument("--cursorName", "-cn", required=False, help="Managed-ledger cursor name")
parser.add_argument("--cursorMarkDelete", "-cm", required=False, help="Cursor mark delete position: <ledger_id>:<entry_id>")
args = parser.parse_args(arguments)
zkSrvr = args.zkServer
mlPath = managedLedgerPath + args.managedLedgerPath
deleteLedgerIds = args.ledgerIds
cursorName = args.cursorName
cursorMarkDelete = args.cursorMarkDelete
zk = KazooClient(hosts=zkSrvr)
zk.start()
if command == printMlCommand:
printManagedLedgerCommand(zk, mlPath)
elif command == deleteMlLedgerIds:
deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds)
elif command == printCursorsCommands:
printManagedCursorCommand(zk, mlPath, cursorName)
elif command == updateMakDeleteCursor:
updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, cursorMarkDelete)
else:
print('{} command not found. supported {}, pass command as a first argument'.format(command, commandHelpText))