Chapter 26. Reindexing Process

Section 8.1, “Graph Index” and Section 8.2, “Vertex-centric Index” describe how to build graph-global and vertex-centric indexes to improve query performance. These indexes are immediately available if the indexed keys or labels have been newly defined in the same management transaction. In this case, there is no need to reindex the graph and this section can be skipped. If the indexed keys or labels already existed prior to index construction it is necessary to reindex the entire graph in order to ensure that the index contains previously added elements. This section describes the reindexing process.

[Warning]Warning

Reindexing is a manual process comprised of multiple steps. These steps must be carefully followed in the right order to avoid index inconsistencies.

26.1. Overview

Titan can begin writing incremental index updates right after an index is defined. However, before the index is complete and usable, Titan must also take a one-time read pass over all existing graph elements associated with the newly indexed schema type(s). Titan uses Hadoop MapReduce for this task in order to scale reindexing to very large graphs. Once this reindexing job has completed, the index is fully populated and ready to be used. The index must then be enabled to be used during query processing.

26.2. Prior to Reindex

The starting point of the reindexing process is the construction of an index. Refer to Chapter 8, Indexing for better Performance for a complete discussion of global graph and vertex-centric indexes. Note, that a global graph index is uniquely identified by its name. A vertex-centric index is uniquely identified by the combination of its name and the edge label or property key on which the index is defined - the name of the latter is referred to as the index type in this section and only applies to vertex-centric indexes.

After building a new index against existing schema elements it is recommended to wait a few minutes for the index to be announced to the cluster. Note the index name (and the index type in case of a vertex-centric index) since this information is needed when reindexing.

26.3. Preparing to Reindex

Reindexing large, horizontally-distributed databases generally proceeds most quickly on a Hadoop MapReduce cluster. However, it’s also possible to reindex from a single machine using Hadoop’s "local" job runner. Either method will work from a correctness standpoint. The only differentiator is reindex throughput and setup complexity.

Reindexing requires:

  • The index name (a string — the user provides this to Titan when building a new index)
  • The index type (a string — the name of the edge label or property key on which the vertex-centric index is built). This applies only to vertex-centric indexes - leave blank for global graph indexes.
  • An TitanGraph configuration file (a file whose path can be passed to TitanFactory.open)

26.4. Executing a Reindex Job

The recommended way to generate and run a reindex job is through the TitanIndexRepair helper class. It has a pair of helper methods to generate a MapReduce configuration, configure a MR Job with a TitanIndexRepairMapper, and then run the job using the default job runner.

The two recommended entry points on the TitanIndexRepair helper class are these methods:

  • cassandraRepair
  • hbaseRepair

26.4.1. Cassandra Helper

The static helper method TitanIndexRepair.cassandraRepair takes a path to a TitanGraph properties file (the same file that one would pass to TitanFactory.open), an index name, an (optional) index type, and the name of the Cassandra keyspace partitioner. It generates a reindexing config file in memory and starts a job using the generated config.

The following starts the reindexing process for the global graph index byName defined in Section 8.1, “Graph Index”:

TitanIndexRepair.cassandraRepair("conf/titan-cassandra-es.properties", "byName", "",
   /* Replace with your actual partitioner when not using Murmur3 */
   "org.apache.cassandra.dht.Murmur3Partitioner")

The following starts the reindexing process for the vertex-centric index battlesByTime build over battled edges as defined in Section 8.2, “Vertex-centric Index”:

TitanIndexRepair.cassandraRepair("conf/titan-cassandra-es.properties", "battlesByTime", "battled",
   /* Replace with your actual partitioner when not using Murmur3 */
   "org.apache.cassandra.dht.Murmur3Partitioner")

26.4.2. HBase Helper

The static helper method TitanIndexRepair.hbaseRepair takes a path to a TitanGraph properties file (the same file that one would pass to TitanFactory.open), an index name, and an (optional) index type. Unlike its Cassandra counterpart, there is no need to pass a partitioner name to the HBase helper method. It generates a reindexing config file in memory and starts a job using the generated config.

The following starts the reindexing process for the global graph index byName defined in Section 8.1, “Graph Index”:

TitanIndexRepair.hbaseRepair("conf/titan-hbase-es.properties", "byName", "")

The following starts the reindexing process for the vertex-centric index battlesByTime build over battled edges as defined in Section 8.2, “Vertex-centric Index”:

TitanIndexRepair.hbaseRepair("conf/titan-hbase-es.properties", "battlesByTime", "battled")

26.5. Enabling Index

When the reindex job completes successfully, the index is fully populated and ready to be used. To alert all Titan instances that the index can be used for query answering, the index must be enabled in the management system.

mgmt = g.getManagementSystem()
rindex = mgmt.getRelationIndex(mgmt.getRelationType("battled"),"battlesByTime")
mgmt.updateIndex(rindex, SchemaAction.ENABLE_INDEX);
gindex = mgmt.getGraphIndex("byName")
mgmt.updateIndex(gindex, SchemaAction.ENABLE_INDEX);
mgmt.commit()

26.6. Self-contained Example

The following Gremlin snippet outlines all steps of the reindex process in one self-contained example using minimal dummy data against the Cassandra storage backend.

// Open a graph
g = TitanFactory.open("conf/titan-cassandra-es.properties")

// Define a property
mgmt = g.getManagementSystem()
desc = mgmt.makePropertyKey("desc").dataType(String.class).make()
mgmt.commit()

// Insert some data
v = g.addVertex(null)
v.desc = "foo bar"
v = g.addVertex(null)
v.desc = "foo baz"
g.commit()

// Run a query -- note the planner warning recommending the use of an index
g.V.has("desc", com.thinkaurelius.titan.core.attribute.Text.CONTAINS, "baz").map

// Create an index
mgmt = g.getManagementSystem()
desc = mgmt.getPropertyKey("desc")
mixedIndex = mgmt.buildIndex("mixedExample",Vertex.class).addKey(desc).buildMixedIndex("search")
mgmt.commit()

// Rollback or commit transactions on the graph which predate the index definition
g.rollback()

// Block until the SchemaStatus transitions from INSTALLED to REGISTERED
registered = false
before = System.currentTimeMillis()
while (!registered) {
    Thread.sleep(500L)
    mgmt = g.getManagementSystem()
    idx  = mgmt.getGraphIndex("mixedExample")
    registered = true
    for (k in idx.getFieldKeys()) {
        s = idx.getIndexStatus(k)
        registered &= s.equals(SchemaStatus.REGISTERED)
    }
    mgmt.rollback()
}
println("Index REGISTERED in " + (System.currentTimeMillis() - before) + " ms")

// Run a Titan-Hadoop job to reindex (replace Murmur3 with your actual partitioner)
pt = "org.apache.cassandra.dht.Murmur3Partitioner" // The default
TitanIndexRepair.cassandraRepair("conf/titan-cassandra-es.properties", "mixedExample", "", pt)

// Enable the index
mgmt = g.getManagementSystem()
mgmt.updateIndex(mgmt.getGraphIndex("mixedExample"), SchemaAction.ENABLE_INDEX);
mgmt.commit()

// Check the status -- should be ENABLED
mgmt = g.getManagementSystem()
desc = mgmt.getPropertyKey("desc")
mgmt.getGraphIndex("mixedExample").getIndexStatus(desc)
mgmt.rollback()

// Run a query -- Titan will use the new index, no planner warning
g.V.has("desc", com.thinkaurelius.titan.core.attribute.Text.CONTAINS, "baz").map

// Concerned that Titan could have read cache in that last query, instead of relying on the index?
// Start a new instance to rule out cache hits.  Now we're definitely using the index.
g.shutdown()
g = TitanFactory.open("conf/titan-cassandra-es.properties")
g.V.has("desc", com.thinkaurelius.titan.core.attribute.Text.CONTAINS, "baz").map

26.7. Common Problems

26.7.1. IllegalArgumentException when starting job

When a reindexing job is started shortly after a the index has been built, the job might fail with an exception like one of the following:

The index mixedExample is in an invalid state and cannot be indexed.
The following index keys have invalid status: desc has status INSTALLED
(status must be one of [REGISTERED, ENABLED])
The index mixedExample is in an invalid state and cannot be indexed.
The index has status INSTALLED, but one of [REGISTERED, ENABLED] is required

When an index is built, its existence is broadcast to all other Titan instances in the cluster. Those must acknowledge the existence of the index before the reindexing process can be started. The acknowledgements can take a while to come in depending on the size of the cluster and the connection speed. Hence, one should wait a few minutes after building the index and before starting the reindex process.

Note, that the acknowledgement might fail due to Titan instance failure. In other words, the cluster might wait indefinitely on the acknowledgement of a failed instance. In this case, the user must manually remove the failed instance from the cluster registry as described in Chapter 25, Failure & Recovery. After the cluster state has been restored, the acknowledgement process must be reinitiated by manually registering the index again in the management system.

mgmt = g.getManagementSystem()
rindex = mgmt.getRelationIndex(mgmt.getRelationType("battled"),"battlesByTime")
mgmt.updateIndex(rindex, SchemaAction.REGISTER_INDEX);
gindex = mgmt.getGraphIndex("byName")
mgmt.updateIndex(gindex, SchemaAction.REGISTER_INDEX);
mgmt.commit()

After waiting a few minutes for the acknowledgement to arrive the reindex job should start successfully.

26.7.2. Could not find index

This exception in the reindexing job indicates that an index with the given name does not exist or that the name has not been specified correctly. When reindexing a global graph index, only the name of the index as defined when building the index should be specified. When reindexing a global graph index, the name of the index must be given in addition to the name of the edge label or property key on which the vertex-centric index is defined.

26.7.3. Cassandra Reindex Mappers Fail with "Too many open files"

The end of the exception stacktrace may look like this:

java.net.SocketException: Too many open files
        at java.net.Socket.createImpl(Socket.java:447)
        at java.net.Socket.getImpl(Socket.java:510)
        at java.net.Socket.setSoLinger(Socket.java:988)
        at org.apache.thrift.transport.TSocket.initSocket(TSocket.java:118)
        at org.apache.thrift.transport.TSocket.<init>(TSocket.java:109)

When running Cassandra with virtual nodes enabled, the number of virtual nodes seems to set a floor under the number of mappers. Cassandra may generate more mappers than virtual nodes for clusters with lots of data, but it seems to generate at least as many mappers as there are virtual nodes even though the cluster might be empty or close to empty. The default is 256 as of this writing.

Each mapper opens and quickly closes several sockets to Cassandra. The kernel on the client side of those closed sockets goes into asynchronous TIME_WAIT, since Thrift uses SO_LINGER. Only a small number of sockets are open at any one time — usually low single digits — but potentially many lingering sockets can accumulate in TIME_WAIT. This accumulation is most pronounced when running a reindex job locally (not on a distributed MapReduce cluster), since all of those client-side TIME_WAIT sockets are lingering on a single client machine instead of being spread out across many machines in a cluster. Combined with the floor of 256 mappers, a reindex job can open thousands of sockets of the course of its execution. When these sockets all linger in TIME_WAIT on the same client, they have the potential to reach the open-files ulimit, which also controls the number of open sockets. The open-files ulimit is often set to 1024.

Here are a few suggestions for dealing with the "Too many open files" problem during reindexing on a single machine:

  • Reduce the maximum size of the Cassandra connection pool. For example, consider setting the cassandrathrift storage backend’s max-active and max-idle options to 1 each, and setting max-total to -1. See Chapter 12, Configuration Reference for full listings of connection pool settings on the Cassandra storage backends.
  • Increase the nofile ulimit. The ideal value depends on the size of the Cassandra dataset and the throughput of the reindex mappers; if starting at 1024, try an order of magnitude larger: 10000. This is just necessary to sustain lingering TIME_WAIT sockets. The reindex job won’t try to open nearly that many sockets at once.
  • Run the reindex task on a multi-node MapReduce cluster to spread out the socket load.