Chapter 28. Index Management

28.1. Reindexing

Section 8.1, “Graph Index” and Section 8.2, “Vertex-centric Index” describe how to build graph-global and vertex-centric indexes to improve query performance. These indexes are immediately available if the indexed keys or labels have been newly defined in the same management transaction. In this case, there is no need to reindex the graph and this section can be skipped. If the indexed keys or labels already existed prior to index construction, it is necessary to reindex the entire graph to ensure that the index contains previously added elements. This section describes the reindexing process.

Warning

Reindexing is a manual process consisting of multiple steps. These steps must be carefully followed in the right order to avoid index inconsistencies.

28.1.1. Overview

Titan can begin writing incremental index updates right after an index is defined. However, before the index is complete and usable, Titan must also take a one-time read pass over all existing graph elements associated with the newly indexed schema type(s). Once this reindexing job has completed, the index is fully populated and ready to be used. The index must then be enabled to be used during query processing.

28.1.2. Prior to Reindex

The starting point of the reindexing process is the construction of an index. Refer to Chapter 8, Indexing for better Performance for a complete discussion of global graph and vertex-centric indexes. Note that a global graph index is uniquely identified by its name. A vertex-centric index is uniquely identified by the combination of its name and the edge label or property key on which the index is defined. The name of that edge label or property key is referred to as the index type in this section and only applies to vertex-centric indexes.

After building a new index against existing schema elements it is recommended to wait a few minutes for the index to be announced to the cluster. Note the index name (and the index type in case of a vertex-centric index) since this information is needed when reindexing.
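Before starting a reindex job, it may help to look at the status the index currently reports. The following is a minimal sketch for the Gremlin console, not a definitive procedure. It assumes an open TitanGraph bound to graph, and it borrows the index names byName and battlesByTime (on the battled edge label) from other examples in this chapter purely for illustration.

```groovy
// Assumes `graph` is an open TitanGraph and that both example indexes exist.
mgmt = graph.openManagement()

// A global graph index reports one status per indexed key
gindex = mgmt.getGraphIndex("byName")
gindex.getFieldKeys().each { key ->
    println "byName / ${key}: ${gindex.getIndexStatus(key)}"
}

// A vertex-centric index is looked up by its index type and name
// and reports a single status
rindex = mgmt.getRelationIndex(mgmt.getRelationType("battled"), "battlesByTime")
println "battlesByTime: ${rindex.getIndexStatus()}"

// Nothing was changed, so discard the management transaction
mgmt.rollback()
```

As the error messages quoted in Section 28.3.1 indicate, a reindex job expects each reported status to be REGISTERED or ENABLED.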

28.1.3. Preparing to Reindex

There is a choice between two execution frameworks for reindex jobs:

  • MapReduce
  • TitanManagement

Reindex on MapReduce supports large, horizontally-distributed databases. Reindex on TitanManagement spawns a single-machine OLAP job. This is intended for convenience and speed on those databases small enough to be handled by one machine.

Reindexing requires:

  • The index name (a string — the user provides this to Titan when building a new index)
  • The index type (a string — the name of the edge label or property key on which the vertex-centric index is built). This applies only to vertex-centric indexes - leave blank for global graph indexes.

28.1.4. Executing a Reindex Job on MapReduce

The recommended way to generate and run a reindex job on MapReduce is through the MapReduceIndexManagement class. Here is a rough outline of the steps to run a reindex job using this class:

  • Open a TitanGraph instance
  • Pass the graph instance into MapReduceIndexManagement's constructor
  • Call updateIndex(<index>, SchemaAction.REINDEX) on the MapReduceIndexManagement instance
  • If the index has not yet been enabled, enable it through TitanManagement

This class implements an updateIndex method that supports only the REINDEX and REMOVE_INDEX actions for its SchemaAction parameter. The class starts a Hadoop MapReduce job using the Hadoop configuration and jars on the classpath. Both Hadoop 1 and 2 are supported. This class gets metadata about the index and storage backend (e.g. the Cassandra partitioner) from the TitanGraph instance given to its constructor.

graph = TitanFactory.open(...)
mgmt = graph.openManagement()
mr = new MapReduceIndexManagement(graph)
mr.updateIndex(mgmt.getRelationIndex(mgmt.getRelationType("battled"), "battlesByTime"), SchemaAction.REINDEX).get()
mgmt.commit()

28.1.4.1. Reindex Example on MapReduce

The following Gremlin snippet outlines all steps of the MapReduce reindex process in one self-contained example using minimal dummy data against the Cassandra storage backend.

// Open a graph
graph = TitanFactory.open("conf/titan-cassandra-es.properties")
g = graph.traversal()

// Define a property
mgmt = graph.openManagement()
desc = mgmt.makePropertyKey("desc").dataType(String.class).make()
mgmt.commit()

// Insert some data
graph.addVertex("desc", "foo bar")
graph.addVertex("desc", "foo baz")
graph.tx().commit()

// Run a query -- note the planner warning recommending the use of an index
g.V().has("desc", containsText("baz"))

// Create an index
mgmt = graph.openManagement()

desc = mgmt.getPropertyKey("desc")
mixedIndex = mgmt.buildIndex("mixedExample", Vertex.class).addKey(desc).buildMixedIndex("search")
mgmt.commit()

// Rollback or commit transactions on the graph which predate the index definition
graph.tx().rollback()

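// Block until the SchemaStatus transitions from INSTALLED to REGISTERED.
// awaitGraphIndexStatus is a static ManagementSystem method, so it can still be called through the committed mgmt handle.
report = mgmt.awaitGraphIndexStatus(graph, "mixedExample").status(SchemaStatus.REGISTERED).call()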

// Run a Titan-Hadoop job to reindex
mgmt = graph.openManagement()
mr = new MapReduceIndexManagement(graph)
mr.updateIndex(mgmt.getGraphIndex("mixedExample"), SchemaAction.REINDEX).get()
mgmt.commit()

// Enable the index
mgmt = graph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("mixedExample"), SchemaAction.ENABLE_INDEX).get()
mgmt.commit()

// Block until the SchemaStatus is ENABLED
mgmt = graph.openManagement()
report = mgmt.awaitGraphIndexStatus(graph, "mixedExample").status(SchemaStatus.ENABLED).call()
mgmt.rollback()

// Run a query -- Titan will use the new index, no planner warning
g.V().has("desc", containsText("baz"))

// Concerned that Titan could have answered that last query from its cache instead of relying on the index?
// Start a new instance to rule out cache hits.  Now we're definitely using the index.
graph.close()
graph = TitanFactory.open("conf/titan-cassandra-es.properties")
// The old traversal source belongs to the closed graph, so create a new one
g = graph.traversal()
g.V().has("desc", containsText("baz"))

28.1.5. Executing a Reindex Job on TitanManagement

To run a reindex job on TitanManagement, invoke TitanManagement.updateIndex with the SchemaAction.REINDEX argument. For example:

m = graph.openManagement()
i = m.getGraphIndex('indexName')
m.updateIndex(i, SchemaAction.REINDEX).get()
m.commit()

28.1.5.1. Example for TitanManagement

The following loads some sample data into a BerkeleyDB-backed Titan database, defines an index after the fact, reindexes using TitanManagement, and finally enables and uses the index:

import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem

// Define property keys and load some data from a file before any index exists
graph = TitanFactory.open('conf/titan-berkeleyje.properties')
g = graph.traversal()
m = graph.openManagement()
m.makePropertyKey('name').dataType(String.class).cardinality(Cardinality.LIST).make()
m.makePropertyKey('lang').dataType(String.class).cardinality(Cardinality.LIST).make()
m.makePropertyKey('age').dataType(Integer.class).cardinality(Cardinality.LIST).make()
m.commit()
graph.io(IoCore.gryo()).readGraph('data/tinkerpop-modern.gio')
graph.tx().commit()

// Run a query -- note the planner warning recommending the use of an index
g.V().has('name', 'lop')
graph.tx().rollback()

// Create an index
m = graph.openManagement()
m.buildIndex('names', Vertex.class).addKey(m.getPropertyKey('name')).buildCompositeIndex()
m.commit()
graph.tx().commit()

// Block until the SchemaStatus transitions from INSTALLED to REGISTERED
ManagementSystem.awaitGraphIndexStatus(graph, 'names').status(SchemaStatus.REGISTERED).call()

// Reindex using TitanManagement
m = graph.openManagement()
i = m.getGraphIndex('names')
m.updateIndex(i, SchemaAction.REINDEX).get()
m.commit()

// Block until the SchemaStatus is ENABLED
ManagementSystem.awaitGraphIndexStatus(graph, 'names').status(SchemaStatus.ENABLED).call()

// Run a query -- Titan will use the new index, no planner warning
g.V().has('name', 'lop')
graph.tx().rollback()

// Concerned that Titan could have answered that last query from its cache instead of relying on the index?
// Start a new instance to rule out cache hits.  Now we're definitely using the index.
graph.close()
graph = TitanFactory.open("conf/titan-berkeleyje.properties")
g = graph.traversal()
g.V().has('name', 'lop')

28.2. Index Removal

Warning

Index removal is a manual process consisting of multiple steps. These steps must be carefully followed in the right order to avoid index inconsistencies.

28.2.1. Overview

Index removal is a two-stage process. In the first stage, one Titan instance signals to all others via the storage backend that the index is slated for deletion. This changes the index’s state to DISABLED. At that point, Titan stops using the index to answer queries and stops incrementally updating the index. Index-related data in the storage backend remains present but ignored.

The second stage depends on whether the index is mixed or composite. A composite index can be deleted via Titan. As with reindexing, removal can be done through either MapReduce or TitanManagement. However, a mixed index must be manually dropped in the index backend; Titan does not provide an automated mechanism to delete an index from its index backend.

Index removal deletes everything associated with the index except its schema definition and its DISABLED state. This schema stub for the index remains even after deletion, though its storage footprint is negligible and fixed.

28.2.2. Preparing for Index Removal

If the index is currently enabled, it should first be disabled. This is done through the ManagementSystem.

mgmt = graph.openManagement()
rindex = mgmt.getRelationIndex(mgmt.getRelationType("battled"), "battlesByTime")
mgmt.updateIndex(rindex, SchemaAction.DISABLE_INDEX).get()
gindex = mgmt.getGraphIndex("byName")
mgmt.updateIndex(gindex, SchemaAction.DISABLE_INDEX).get()
mgmt.commit()

Once the status of all keys on the index changes to DISABLED, the index is ready to be removed. A utility in ManagementSystem can automate the wait-for-DISABLED step:

ManagementSystem.awaitGraphIndexStatus(graph, 'byName').status(SchemaStatus.DISABLED).call()

After a composite index is DISABLED, there is a choice between two execution frameworks for its removal:

  • MapReduce
  • TitanManagement

Index removal on MapReduce supports large, horizontally-distributed databases. Index removal on TitanManagement spawns a single-machine OLAP job. This is intended for convenience and speed on those databases small enough to be handled by one machine.

Index removal requires:

  • The index name (a string — the user provides this to Titan when building a new index)
  • The index type (a string — the name of the edge label or property key on which the vertex-centric index is built). This applies only to vertex-centric indexes - leave blank for global graph indexes.

As noted in the overview, a mixed index must be manually dropped from the indexing backend. Neither the MapReduce framework nor the TitanManagement framework will delete a mixed index from the indexing backend.
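Because the second removal stage differs for mixed and composite indexes, it can be useful to check which kind a global graph index is before choosing a removal path. The following Gremlin console sketch is one way to do that. It assumes an open TitanGraph bound to graph and an existing global graph index named byName; both are illustrative assumptions.

```groovy
// Assumes `graph` is an open TitanGraph and that "byName" exists.
mgmt = graph.openManagement()
gindex = mgmt.getGraphIndex("byName")

if (gindex.isMixedIndex()) {
    // Mixed index: its data lives in the index backend and must be dropped there manually
    println "byName is a mixed index backed by '${gindex.getBackingIndex()}'"
} else if (gindex.isCompositeIndex()) {
    // Composite index: can be removed through MapReduce or TitanManagement once DISABLED
    println "byName is a composite index"
}

// Read-only inspection, so discard the management transaction
mgmt.rollback()
```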

28.2.3. Executing an Index Removal Job on MapReduce

As with reindexing, the recommended way to generate and run an index removal job on MapReduce is through the MapReduceIndexManagement class. Here is a rough outline of the steps to run an index removal job using this class:

  • Open a TitanGraph instance
  • If the index has not yet been disabled, disable it through TitanManagement
  • Pass the graph instance into MapReduceIndexManagement's constructor
  • Call updateIndex(<index>, SchemaAction.REMOVE_INDEX)

A commented code example follows in the next subsection.

28.2.3.1. Example for MapReduce

import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem

// Load the "Graph of the Gods" sample data
graph = TitanFactory.open('conf/titan-cassandra-es.properties')
g = graph.traversal()
GraphOfTheGodsFactory.load(graph)

g.V().has('name', 'jupiter')

// Disable the "name" composite index
m = graph.openManagement()
nameIndex = m.getGraphIndex('name')
m.updateIndex(nameIndex, SchemaAction.DISABLE_INDEX).get()
m.commit()
graph.tx().commit()

// Block until the SchemaStatus transitions to DISABLED
ManagementSystem.awaitGraphIndexStatus(graph, 'name').status(SchemaStatus.DISABLED).call()

// Delete the index using MapReduceIndexManagement
m = graph.openManagement()
mr = new MapReduceIndexManagement(graph)
future = mr.updateIndex(m.getGraphIndex('name'), SchemaAction.REMOVE_INDEX)
m.commit()
graph.tx().commit()
future.get()

// Index still shows up in management interface as DISABLED -- this is normal
m = graph.openManagement()
idx = m.getGraphIndex('name')
idx.getIndexStatus(m.getPropertyKey('name'))
m.rollback()

// Titan should issue a warning about this query requiring a full scan
g.V().has('name', 'jupiter')

28.2.4. Executing an Index Removal Job on TitanManagement

To run an index removal job on TitanManagement, invoke TitanManagement.updateIndex with the SchemaAction.REMOVE_INDEX argument. For example:

m = graph.openManagement()
i = m.getGraphIndex('indexName')
m.updateIndex(i, SchemaAction.REMOVE_INDEX).get()
m.commit()

28.2.4.1. Example for TitanManagement

The following loads some indexed sample data into a Cassandra-backed Titan database, then disables and removes the index through TitanManagement:

import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem

// Load the "Graph of the Gods" sample data
graph = TitanFactory.open('conf/titan-cassandra-es.properties')
g = graph.traversal()
GraphOfTheGodsFactory.load(graph)

g.V().has('name', 'jupiter')

// Disable the "name" composite index
m = graph.openManagement()
nameIndex = m.getGraphIndex('name')
m.updateIndex(nameIndex, SchemaAction.DISABLE_INDEX).get()
m.commit()
graph.tx().commit()

// Block until the SchemaStatus transitions to DISABLED
ManagementSystem.awaitGraphIndexStatus(graph, 'name').status(SchemaStatus.DISABLED).call()

// Delete the index using TitanManagement
m = graph.openManagement()
nameIndex = m.getGraphIndex('name')
future = m.updateIndex(nameIndex, SchemaAction.REMOVE_INDEX)
m.commit()
graph.tx().commit()

future.get()

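// Index still shows up in management interface as DISABLED -- this is normal
m = graph.openManagement()
nameIndex = m.getGraphIndex('name')
nameIndex.getIndexStatus(m.getPropertyKey('name'))
m.rollback()

// Titan should issue a warning about this query requiring a full scan
g.V().has('name', 'jupiter')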

28.3. Common Problems with Index Management

28.3.1. IllegalArgumentException when starting job

When a reindexing job is started shortly after the index has been built, the job might fail with an exception like one of the following:

The index mixedExample is in an invalid state and cannot be indexed.
The following index keys have invalid status: desc has status INSTALLED
(status must be one of [REGISTERED, ENABLED])

The index mixedExample is in an invalid state and cannot be indexed.
The index has status INSTALLED, but one of [REGISTERED, ENABLED] is required

When an index is built, its existence is broadcast to all other Titan instances in the cluster. Those must acknowledge the existence of the index before the reindexing process can be started. The acknowledgements can take a while to come in depending on the size of the cluster and the connection speed. Hence, one should wait a few minutes after building the index and before starting the reindex process.

Note that the acknowledgement might fail due to Titan instance failure. In other words, the cluster might wait indefinitely on the acknowledgement of a failed instance. In this case, the user must manually remove the failed instance from the cluster registry as described in Chapter 27, Failure & Recovery. After the cluster state has been restored, the acknowledgement process must be reinitiated by manually registering the index again in the management system.

mgmt = graph.openManagement()
rindex = mgmt.getRelationIndex(mgmt.getRelationType("battled"),"battlesByTime")
mgmt.updateIndex(rindex, SchemaAction.REGISTER_INDEX).get()
gindex = mgmt.getGraphIndex("byName")
mgmt.updateIndex(gindex, SchemaAction.REGISTER_INDEX).get()
mgmt.commit()

After waiting a few minutes for the acknowledgement to arrive the reindex job should start successfully.
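The recovery steps described in this subsection can be sketched for the Gremlin console as follows. This is a hedged outline rather than a definitive procedure: it assumes an open TitanGraph bound to graph and reuses the byName index from the listing above. Identifying which listed instance has actually failed is left to the operator, and removing it is covered in Chapter 27.

```groovy
import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem

// Assumes `graph` is an open TitanGraph.
// Print the instances the cluster currently considers open, to help spot
// one that is known to have failed.
mgmt = graph.openManagement()
mgmt.getOpenInstances().each { println it }
mgmt.rollback()

// After the index has been registered again, block until REGISTERED is
// reported instead of waiting a fixed amount of time.
ManagementSystem.awaitGraphIndexStatus(graph, "byName").status(SchemaStatus.REGISTERED).call()
```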

28.3.2. Could not find index

This exception in the reindexing job indicates that an index with the given name does not exist or that the name has not been specified correctly. When reindexing a global graph index, only the name of the index as defined when building the index should be specified. When reindexing a vertex-centric index, the name of the index must be given in addition to the name of the edge label or property key on which the vertex-centric index is defined.
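One way to check the names before launching a job is to look the index up through the management API first. The sketch below assumes an open TitanGraph bound to graph. The names byName, battled, and battlesByTime are illustrative, and the sketch relies on the lookup methods returning null for unknown names.

```groovy
// Assumes `graph` is an open TitanGraph; all names below are examples.
mgmt = graph.openManagement()

// Global graph index: identified by the index name alone
gindex = mgmt.getGraphIndex("byName")
if (gindex == null) {
    println "No global graph index named 'byName'"
}

// Vertex-centric index: identified by the index type (edge label or
// property key) together with the index name
type = mgmt.getRelationType("battled")
rindex = (type == null) ? null : mgmt.getRelationIndex(type, "battlesByTime")
if (rindex == null) {
    println "No vertex-centric index 'battlesByTime' on 'battled'"
}

// Read-only inspection, so discard the management transaction
mgmt.rollback()
```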

28.3.3. Cassandra Mappers Fail with "Too many open files"

The end of the exception stacktrace may look like this:

java.net.SocketException: Too many open files
        at java.net.Socket.createImpl(Socket.java:447)
        at java.net.Socket.getImpl(Socket.java:510)
        at java.net.Socket.setSoLinger(Socket.java:988)
        at org.apache.thrift.transport.TSocket.initSocket(TSocket.java:118)
        at org.apache.thrift.transport.TSocket.<init>(TSocket.java:109)

When running Cassandra with virtual nodes enabled, the number of virtual nodes seems to set a floor under the number of mappers. Cassandra may generate more mappers than virtual nodes for clusters with lots of data, but it seems to generate at least as many mappers as there are virtual nodes even though the cluster might be empty or close to empty. The default number of virtual nodes is 256 as of this writing.

Each mapper opens and quickly closes several sockets to Cassandra. The kernel on the client side of those closed sockets goes into asynchronous TIME_WAIT, since Thrift uses SO_LINGER. Only a small number of sockets are open at any one time (usually low single digits), but potentially many lingering sockets can accumulate in TIME_WAIT. This accumulation is most pronounced when running a reindex job locally (not on a distributed MapReduce cluster), since all of those client-side TIME_WAIT sockets are lingering on a single client machine instead of being spread out across many machines in a cluster. Combined with the floor of 256 mappers, a reindex job can open thousands of sockets over the course of its execution. When these sockets all linger in TIME_WAIT on the same client, they have the potential to reach the open-files ulimit, which also controls the number of open sockets. The open-files ulimit is often set to 1024.
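A rough back-of-the-envelope calculation shows how quickly the limit can come into reach. The figure of five sockets per mapper is an assumption chosen only to stand in for "several"; the other two numbers come from the text above.

```groovy
// Illustrative arithmetic only; socketsPerMapper is an assumed figure.
int mappers = 256            // floor set by the default number of virtual nodes
int socketsPerMapper = 5     // assumption standing in for "several"
int openFilesLimit = 1024    // commonly configured open-files ulimit

// Total sockets opened (and later left lingering) over the whole job
int socketsOverJob = mappers * socketsPerMapper

println "Sockets opened over the job: ${socketsOverJob}"
println "Exceeds the open-files limit if all linger at once: ${socketsOverJob > openFilesLimit}"
```

This is a worst case: in practice some sockets leave TIME_WAIT while the job is still running, so the number lingering at any moment depends on mapper throughput.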

Here are a few suggestions for dealing with the "Too many open files" problem during reindexing on a single machine:

  • Reduce the maximum size of the Cassandra connection pool. For example, consider setting the cassandrathrift storage backend’s max-active and max-idle options to 1 each, and setting max-total to -1. See Chapter 12, Configuration Reference for full listings of connection pool settings on the Cassandra storage backends.
  • Increase the nofile ulimit. The ideal value depends on the size of the Cassandra dataset and the throughput of the reindex mappers; if starting at 1024, try an order of magnitude larger: 10000. This is only necessary to sustain lingering TIME_WAIT sockets. The reindex job won’t try to open nearly that many sockets at once.
  • Run the reindex task on a multi-node MapReduce cluster to spread out the socket load.
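The first suggestion could be tried by opening the graph with explicit connection pool settings, as in the sketch below. The hostname is a placeholder, and the option paths under storage.cassandra.thrift.cpool are an assumption that should be verified against Chapter 12, Configuration Reference for the Titan version in use.

```groovy
// Assumptions: a Cassandra node reachable at 127.0.0.1, and the connection
// pool options living under storage.cassandra.thrift.cpool (see Chapter 12).
graph = TitanFactory.build().
    set("storage.backend", "cassandrathrift").
    set("storage.hostname", "127.0.0.1").
    set("storage.cassandra.thrift.cpool.max-active", 1).
    set("storage.cassandra.thrift.cpool.max-idle", 1).
    set("storage.cassandra.thrift.cpool.max-total", -1).
    open()
```

The same keys can be placed in a properties file instead. Any index backend settings required by the graph would need to be added alongside them.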