[[index-admin]]
Index Management
----------------
[[reindex]]
Reindexing
~~~~~~~~~~
<<graph-indexes>> and <<vertex-indexes>> describe how to build graph-global and vertex-centric indexes to improve query performance. These indexes are immediately available if the indexed keys or labels have been newly defined in the same management transaction. In this case, there is no need to reindex the graph and this section can be skipped. If the indexed keys or labels already existed prior to index construction, it is necessary to reindex the entire graph to ensure that the index contains all previously added elements. This section describes the reindexing process.
[WARNING]
Reindexing is a manual process comprised of multiple steps. These steps must be carefully followed in the right order to avoid index inconsistencies.
Overview
^^^^^^^^
Titan can begin writing incremental index updates right after an index is defined. However, before the index is complete and usable, Titan must also take a one-time read pass over all existing graph elements associated with the newly indexed schema type(s). Once this reindexing job has completed, the index is fully populated and ready to be used. The index must then be enabled to be used during query processing.
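In terms of the management API, this lifecycle corresponds roughly to the following calls (a minimal sketch only; `indexName` is a placeholder, and complete, runnable examples follow in the sections below):

[source,gremlin]
----
// Assumes the index "indexName" has already been defined and registered
mgmt = graph.openManagement()
// One-time read pass over existing elements to populate the index
mgmt.updateIndex(mgmt.getGraphIndex("indexName"), SchemaAction.REINDEX).get()
mgmt.commit()
// Enable the index so Titan will use it to answer queries
mgmt = graph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("indexName"), SchemaAction.ENABLE_INDEX).get()
mgmt.commit()
----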
Prior to Reindex
^^^^^^^^^^^^^^^^
The starting point of the reindexing process is the construction of an index. Refer to <<indexes>> for a complete discussion of global graph and vertex-centric indexes. Note that a global graph index is uniquely identified by its name. A vertex-centric index is uniquely identified by the combination of its name and the edge label or property key on which the index is defined; the name of the latter is referred to as the *index type* in this section and applies only to vertex-centric indexes.
After building a new index against existing schema elements, it is recommended to wait a few minutes for the index to be announced to the cluster. Note the index name (and the index type, in the case of a vertex-centric index), since this information is needed when reindexing.
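For example, the following sketch (assuming a pre-existing `battled` edge label with a `time` property key and a `name` vertex property) builds one index of each kind; the names to note for reindexing would be `battlesByTime` with index type `battled`, and `byName` with no index type:

[source,gremlin]
----
mgmt = graph.openManagement()
// Vertex-centric index: identified by its name plus the edge label it is built on
battled = mgmt.getEdgeLabel("battled")
time = mgmt.getPropertyKey("time")
mgmt.buildEdgeIndex(battled, "battlesByTime", Direction.BOTH, Order.decr, time)
// Global graph index: identified by its name alone
mgmt.buildIndex("byName", Vertex.class).addKey(mgmt.getPropertyKey("name")).buildCompositeIndex()
mgmt.commit()
----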
Preparing to Reindex
^^^^^^^^^^^^^^^^^^^^
There is a choice between two execution frameworks for reindex jobs:
* MapReduce
* TitanManagement
Reindex on MapReduce supports large, horizontally-distributed databases. Reindex on TitanManagement spawns a single-machine OLAP job. This is intended for convenience and speed on those databases small enough to be handled by one machine.
Reindexing requires:
* The index name (a string -- the user provides this to Titan when building a new index)
* The index type (a string -- the name of the edge label or property key on which the vertex-centric index is built). This applies only to vertex-centric indexes - leave blank for global graph indexes.
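In the management API these two pieces of information translate into the index lookups used by both execution frameworks, roughly as follows (`byName` and `battlesByTime` are example names):

[source,gremlin]
----
mgmt = graph.openManagement()
// Global graph index: looked up by index name only
gindex = mgmt.getGraphIndex("byName")
// Vertex-centric index: looked up by index name plus index type (the edge label or property key)
rindex = mgmt.getRelationIndex(mgmt.getRelationType("battled"), "battlesByTime")
mgmt.rollback()
----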
Executing a Reindex Job on MapReduce
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The recommended way to generate and run a reindex job on MapReduce is through the `MapReduceIndexManagement` class. Here is a rough outline of the steps to run a reindex job using this class:
* Open a `TitanGraph` instance
* Pass the graph instance into `MapReduceIndexManagement`'s constructor
* Call `updateIndex(<index>, SchemaAction.REINDEX)` on the `MapReduceIndexManagement` instance
* If the index has not yet been enabled, enable it through `TitanManagement`
This class implements an `updateIndex` method that supports only the `REINDEX` and `REMOVE_INDEX` actions for its `SchemaAction` parameter. The class starts a Hadoop MapReduce job using the Hadoop configuration and jars on the classpath. Both Hadoop 1 and 2 are supported. This class gets metadata about the index and storage backend (e.g. the Cassandra partitioner) from the `TitanGraph` instance given to its constructor.
[source,gremlin]
graph = TitanFactory.open(...)
mgmt = graph.openManagement()
mr = new MapReduceIndexManagement(graph)
mr.updateIndex(mgmt.getRelationIndex(mgmt.getRelationType("battled"), "battlesByTime"), SchemaAction.REINDEX).get()
mgmt.commit()
Reindex Example on MapReduce
++++++++++++++++++++++++++++
The following Gremlin snippet outlines all steps of the MapReduce reindex process in one self-contained example using minimal dummy data against the Cassandra storage backend.
[source,gremlin]
----
// Open a graph
graph = TitanFactory.open("conf/titan-cassandra-es.properties")
g = graph.traversal()
// Define a property
mgmt = graph.openManagement()
desc = mgmt.makePropertyKey("desc").dataType(String.class).make()
mgmt.commit()
// Insert some data
graph.addVertex("desc", "foo bar")
graph.addVertex("desc", "foo baz")
graph.tx().commit()
// Run a query -- note the planner warning recommending the use of an index
g.V().has("desc", containsText("baz"))
// Create an index
mgmt = graph.openManagement()
desc = mgmt.getPropertyKey("desc")
mixedIndex = mgmt.buildIndex("mixedExample", Vertex.class).addKey(desc).buildMixedIndex("search")
mgmt.commit()
// Rollback or commit transactions on the graph which predate the index definition
graph.tx().rollback()
// Block until the SchemaStatus transitions from INSTALLED to REGISTERED
report = mgmt.awaitGraphIndexStatus(graph, "mixedExample").call()
// Run a Titan-Hadoop job to reindex
mgmt = graph.openManagement()
mr = new MapReduceIndexManagement(graph)
mr.updateIndex(mgmt.getGraphIndex("mixedExample"), SchemaAction.REINDEX).get()
// Enable the index
mgmt = graph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("mixedExample"), SchemaAction.ENABLE_INDEX).get()
mgmt.commit()
// Block until the SchemaStatus is ENABLED
mgmt = graph.openManagement()
report = mgmt.awaitGraphIndexStatus(graph, "mixedExample").status(SchemaStatus.ENABLED).call()
mgmt.rollback()
// Run a query -- Titan will use the new index, no planner warning
g.V().has("desc", containsText("baz"))
// Concerned that Titan could have read cache in that last query, instead of relying on the index?
// Start a new instance to rule out cache hits. Now we're definitely using the index.
graph.close()
graph = TitanFactory.open("conf/titan-cassandra-es.properties")
g.V().has("desc", containsText("baz"))
----
Executing a Reindex job on TitanManagement
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To run a reindex job on TitanManagement, invoke `TitanManagement.updateIndex` with the `SchemaAction.REINDEX` argument. For example:
[source,gremlin]
m = graph.openManagement()
i = m.getGraphIndex('indexName')
m.updateIndex(i, SchemaAction.REINDEX).get()
m.commit()
Example for TitanManagement
+++++++++++++++++++++++++++
The following loads some sample data into a BerkeleyDB-backed Titan database, defines an index after the fact, reindexes using TitanManagement, and finally enables and uses the index:
[source,gremlin]
----
import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem
// Load some data from a file without any predefined schema
graph = TitanFactory.open('conf/titan-berkeleyje.properties')
g = graph.traversal()
m = graph.openManagement()
m.makePropertyKey('name').dataType(String.class).cardinality(Cardinality.LIST).make()
m.makePropertyKey('lang').dataType(String.class).cardinality(Cardinality.LIST).make()
m.makePropertyKey('age').dataType(Integer.class).cardinality(Cardinality.LIST).make()
m.commit()
graph.io(IoCore.gryo()).readGraph('data/tinkerpop-modern.gio')
graph.tx().commit()
// Run a query -- note the planner warning recommending the use of an index
g.V().has('name', 'lop')
graph.tx().rollback()
// Create an index
m = graph.openManagement()
m.buildIndex('names', Vertex.class).addKey(m.getPropertyKey('name')).buildCompositeIndex()
m.commit()
graph.tx().commit()
// Block until the SchemaStatus transitions from INSTALLED to REGISTERED
ManagementSystem.awaitGraphIndexStatus(graph, 'names').status(SchemaStatus.REGISTERED).call()
// Reindex using TitanManagement
m = graph.openManagement()
i = m.getGraphIndex('names')
m.updateIndex(i, SchemaAction.REINDEX)
m.commit()
// Block until the reindex job completes and the SchemaStatus transitions to ENABLED
ManagementSystem.awaitGraphIndexStatus(graph, 'names').status(SchemaStatus.ENABLED).call()
// Run a query -- Titan will use the new index, no planner warning
g.V().has('name', 'lop')
graph.tx().rollback()
// Concerned that Titan could have read cache in that last query, instead of relying on the index?
// Start a new instance to rule out cache hits. Now we're definitely using the index.
graph.close()
graph = TitanFactory.open("conf/titan-berkeleyje.properties")
g = graph.traversal()
g.V().has('name', 'lop')
----
[[mr-index-removal]]
Index Removal
~~~~~~~~~~~~~
[WARNING]
Index removal is a manual process comprised of multiple steps. These steps must be carefully followed in the right order to avoid index inconsistencies.
Overview
^^^^^^^^
Index removal is a two-stage process. In the first stage, one Titan instance signals to all others via the storage backend that the index is slated for deletion. This changes the index's state to `DISABLED`. At that point, Titan stops using the index to answer queries and stops incrementally updating it. Index-related data in the storage backend remains present but is ignored.
The second stage depends on whether the index is mixed or composite. A composite index can be deleted via Titan. As with reindexing, removal can be done through either MapReduce or TitanManagement. However, a mixed index must be manually dropped in the index backend; Titan does not provide an automated mechanism to delete an index from its index backend.
Index removal deletes everything associated with the index except its schema definition and its `DISABLED` state. This schema stub for the index remains even after deletion, though its storage footprint is negligible and fixed.
Preparing for Index Removal
^^^^^^^^^^^^^^^^^^^^^^^^^^^
If the index is currently enabled, it should first be disabled. This is done through the `ManagementSystem`.
[source,gremlin]
mgmt = graph.openManagement()
rindex = mgmt.getRelationIndex(mgmt.getRelationType("battled"), "battlesByTime")
mgmt.updateIndex(rindex, SchemaAction.DISABLE_INDEX).get()
gindex = mgmt.getGraphIndex("byName")
mgmt.updateIndex(gindex, SchemaAction.DISABLE_INDEX).get()
mgmt.commit()
Once the status of all keys on the index changes to `DISABLED`, the index is ready to be removed. A utility in ManagementSystem can automate the wait-for-`DISABLED` step:
[source,gremlin]
ManagementSystem.awaitGraphIndexStatus(graph, 'byName').status(SchemaStatus.DISABLED).call()
After a composite index is `DISABLED`, there is a choice between two execution frameworks for its removal:
* MapReduce
* TitanManagement
Index removal on MapReduce supports large, horizontally-distributed databases. Index removal on TitanManagement spawns a single-machine OLAP job. This is intended for convenience and speed on those databases small enough to be handled by one machine.
Index removal requires:
* The index name (a string -- the user provides this to Titan when building a new index)
* The index type (a string -- the name of the edge label or property key on which the vertex-centric index is built). This applies only to vertex-centric indexes - leave blank for global graph indexes.
As noted in the overview, a mixed index must be manually dropped from the indexing backend. Neither the MapReduce framework nor the TitanManagement framework will delete a mixed index from the indexing backend.
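For illustration, here is a removal sketch for one index of each kind, assuming both have already reached the `DISABLED` state (and that `byName` is a composite index, since mixed indexes cannot be removed this way):

[source,gremlin]
----
mgmt = graph.openManagement()
// Vertex-centric index: supply the index name and the index type
rindex = mgmt.getRelationIndex(mgmt.getRelationType("battled"), "battlesByTime")
mgmt.updateIndex(rindex, SchemaAction.REMOVE_INDEX).get()
// Composite graph index: the index name alone suffices
gindex = mgmt.getGraphIndex("byName")
mgmt.updateIndex(gindex, SchemaAction.REMOVE_INDEX).get()
mgmt.commit()
----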
Executing an Index Removal Job on MapReduce
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
As with reindexing, the recommended way to generate and run an index removal job on MapReduce is through the `MapReduceIndexManagement` class. Here is a rough outline of the steps to run an index removal job using this class:
* Open a `TitanGraph` instance
* If the index has not yet been disabled, disable it through `TitanManagement`
* Pass the graph instance into `MapReduceIndexManagement`'s constructor
* Call `updateIndex(<index>, SchemaAction.REMOVE_INDEX)`
A commented code example follows in the next subsection.
Example for MapReduce
+++++++++++++++++++++
[source,gremlin]
----
import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem
// Load the "Graph of the Gods" sample data
graph = TitanFactory.open('conf/titan-cassandra-es.properties')
g = graph.traversal()
GraphOfTheGodsFactory.load(graph)
g.V().has('name', 'jupiter')
// Disable the "name" composite index
m = graph.openManagement()
nameIndex = m.getGraphIndex('name')
m.updateIndex(nameIndex, SchemaAction.DISABLE_INDEX).get()
m.commit()
graph.tx().commit()
// Block until the SchemaStatus of the index transitions to DISABLED
ManagementSystem.awaitGraphIndexStatus(graph, 'name').status(SchemaStatus.DISABLED).call()
// Delete the index using MapReduceIndexManagement
m = graph.openManagement()
mr = new MapReduceIndexManagement(graph)
future = mr.updateIndex(m.getGraphIndex('name'), SchemaAction.REMOVE_INDEX)
m.commit()
graph.tx().commit()
future.get()
// Index still shows up in management interface as DISABLED -- this is normal
m = graph.openManagement()
idx = m.getGraphIndex('name')
idx.getIndexStatus(m.getPropertyKey('name'))
m.rollback()
// Titan should issue a warning about this query requiring a full scan
g.V().has('name', 'jupiter')
----
Executing an Index Removal job on TitanManagement
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To run an index removal job on TitanManagement, invoke `TitanManagement.updateIndex` with the `SchemaAction.REMOVE_INDEX` argument. For example:
[source,gremlin]
m = graph.openManagement()
i = m.getGraphIndex('indexName')
m.updateIndex(i, SchemaAction.REMOVE_INDEX).get()
m.commit()
Example for TitanManagement
+++++++++++++++++++++++++++
The following loads some indexed sample data into a Cassandra-backed Titan database, then disables and removes the index through TitanManagement:
[source,gremlin]
----
import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem
// Load the "Graph of the Gods" sample data
graph = TitanFactory.open('conf/titan-cassandra-es.properties')
g = graph.traversal()
GraphOfTheGodsFactory.load(graph)
g.V().has('name', 'jupiter')
// Disable the "name" composite index
m = graph.openManagement()
nameIndex = m.getGraphIndex('name')
m.updateIndex(nameIndex, SchemaAction.DISABLE_INDEX).get()
m.commit()
graph.tx().commit()
// Block until the SchemaStatus of the index transitions to DISABLED
ManagementSystem.awaitGraphIndexStatus(graph, 'name').status(SchemaStatus.DISABLED).call()
// Delete the index using TitanManagement
m = graph.openManagement()
nameIndex = m.getGraphIndex('name')
future = m.updateIndex(nameIndex, SchemaAction.REMOVE_INDEX)
m.commit()
graph.tx().commit()
future.get()
// Index still shows up in management interface as DISABLED -- this is normal
m = graph.openManagement()
nameIndex = m.getGraphIndex('name')
nameIndex.getIndexStatus(m.getPropertyKey('name'))
m.rollback()
// Titan should issue a warning about this query requiring a full scan
g.V().has('name', 'jupiter')
----
Common Problems with Index Management
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
IllegalArgumentException when starting job
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
When a reindexing job is started shortly after the index has been built, the job might fail with an exception like one of the following:
[source,txt]
The index mixedExample is in an invalid state and cannot be indexed.
The following index keys have invalid status: desc has status INSTALLED
(status must be one of [REGISTERED, ENABLED])
[source,txt]
The index mixedExample is in an invalid state and cannot be indexed.
The index has status INSTALLED, but one of [REGISTERED, ENABLED] is required
When an index is built, its existence is broadcast to all other Titan instances in the cluster. Those instances must acknowledge the existence of the index before the reindexing process can be started. The acknowledgements can take a while to come in depending on the size of the cluster and the connection speed. Hence, one should wait a few minutes after building the index before starting the reindex process.
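Rather than waiting a fixed amount of time, it is safer to block until the registration has been acknowledged, as the earlier examples do (`indexName` is a placeholder for the actual index name):

[source,gremlin]
ManagementSystem.awaitGraphIndexStatus(graph, 'indexName').status(SchemaStatus.REGISTERED).call()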
Note that the acknowledgement might fail due to Titan instance failure. In other words, the cluster might wait indefinitely on the acknowledgement of a failed instance. In this case, the user must manually remove the failed instance from the cluster registry as described in <<failure-recovery>>. After the cluster state has been restored, the acknowledgement process must be reinitiated by manually registering the index again in the management system.
[source,gremlin]
mgmt = graph.openManagement()
rindex = mgmt.getRelationIndex(mgmt.getRelationType("battled"),"battlesByTime")
mgmt.updateIndex(rindex, SchemaAction.REGISTER_INDEX).get()
gindex = mgmt.getGraphIndex("byName")
mgmt.updateIndex(gindex, SchemaAction.REGISTER_INDEX).get()
mgmt.commit()
After waiting a few minutes for the acknowledgement to arrive the reindex job should start successfully.
Could not find index
^^^^^^^^^^^^^^^^^^^^
This exception in the reindexing job indicates that an index with the given name does not exist, or that the name has not been specified correctly. When reindexing a global graph index, only the name of the index as defined when building the index should be specified. When reindexing a vertex-centric index, the name of the index must be given in addition to the name of the edge label or property key on which the vertex-centric index is defined.
Cassandra Mappers Fail with "Too many open files"
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The end of the exception stacktrace may look like this:
----
java.net.SocketException: Too many open files
at java.net.Socket.createImpl(Socket.java:447)
at java.net.Socket.getImpl(Socket.java:510)
at java.net.Socket.setSoLinger(Socket.java:988)
at org.apache.thrift.transport.TSocket.initSocket(TSocket.java:118)
at org.apache.thrift.transport.TSocket.<init>(TSocket.java:109)
----
When running Cassandra with virtual nodes enabled, the number of virtual nodes seems to set a floor under the number of mappers. Cassandra may generate more mappers than virtual nodes for clusters with lots of data, but it seems to generate at least as many mappers as there are virtual nodes even though the cluster might be empty or close to empty. The default number of virtual nodes is 256 as of this writing.
Each mapper opens and quickly closes several sockets to Cassandra. The kernel on the client side of those closed sockets goes into asynchronous TIME_WAIT, since Thrift uses SO_LINGER. Only a small number of sockets are open at any one time -- usually low single digits -- but potentially many lingering sockets can accumulate in TIME_WAIT. This accumulation is most pronounced when running a reindex job locally (not on a distributed MapReduce cluster), since all of those client-side TIME_WAIT sockets are lingering on a single client machine instead of being spread out across many machines in a cluster. Combined with the floor of 256 mappers, a reindex job can open thousands of sockets over the course of its execution. When these sockets all linger in TIME_WAIT on the same client, they have the potential to reach the open-files ulimit, which also controls the number of open sockets. The open-files ulimit is often set to 1024.
Here are a few suggestions for dealing with the "Too many open files" problem during reindexing on a single machine:
* Reduce the maximum size of the Cassandra connection pool. For example, consider setting the cassandrathrift storage backend's `max-active` and `max-idle` options to 1 each, and setting `max-total` to -1. See <<titan-config-ref>> for full listings of connection pool settings on the Cassandra storage backends.
* Increase the `nofile` ulimit. The ideal value depends on the size of the Cassandra dataset and the throughput of the reindex mappers; if starting at 1024, try an order of magnitude larger: 10000. This higher limit is only needed to absorb lingering TIME_WAIT sockets; the reindex job won't try to open nearly that many sockets at once.
* Run the reindex task on a multi-node MapReduce cluster to spread out the socket load.