Chapter 32. Titan with TinkerPop’s Hadoop-Gremlin

Titan-Hadoop works with TinkerPop 3’s new hadoop-gremlin package for general-purpose OLAP.

Here’s a three step example showing some basic integrated Titan-TinkerPop functionality.

  1. Manually define schema and then load the Grateful Dead graph from a TP3 Kryo-serialized binary file
  2. Run a VertexProgram to compute PageRanks, writing the derived graph to output/~g
  3. Read the derived graph vertices and their computed rank values
[Warning]Warning

Titan 1.0.0’s integration with TinkerPop 3.0.0 is still under active development. The APIs and configuration snippets shown below may change as Titan 1.0.0 and TinkerPop 3.0.0 move through milestone releases and eventually their respective final releases. The content of this chapter should be considered a tech preview rather than stable reference.

32.1. Defining defining schema and loading data

bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(3)-oOOo-----
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
plugin activated: tinkerpop.tinkergraph
INFO  com.tinkerpop.gremlin.hadoop.structure.HadoopGraph  - HADOOP_GREMLIN_LIBS is set to: /usr/local/titan/lib
plugin activated: tinkerpop.hadoop
plugin activated: aurelius.titan
gremlin> :load data/grateful-dead-titan-schema.groovy
==>true
==>true
gremlin> graph = TitanFactory.open('conf/titan-cassandra.properties')
==>standardtitangraph[cassandrathrift:[127.0.0.1]]
gremlin> defineGratefulDeadSchema(graph)
==>null
gremlin> graph.close()
==>null
gremlin> hdfs.copyFromLocal('data/grateful-dead.kryo','data/grateful-dead.kryo')
==>null
gremlin> graph = GraphFactory.open('conf/hadoop-graph/hadoop-load.properties')
==>hadoopgraph[gryoinputformat->nulloutputformat]
gremlin> blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra.properties').create(graph)
==>BulkLoaderVertexProgram[bulkLoader=IncrementalBulkLoader,vertexIdProperty=bulkLoader.vertex.id,userSuppliedIds=false,keepOriginalIds=true,batchSize=0]
gremlin> graph.compute(SparkGraphComputer).program(blvp).submit().get()
...
==>result[hadoopgraph[gryoinputformat->nulloutputformat],memory[size:0]]
gremlin>
# hadoop-load.properties

#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.inputLocation=./data/grateful-dead.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true

#
# GiraphGraphComputer Configuration
#
giraph.minWorkers=2
giraph.maxWorkers=2
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapred.map.child.java.opts=-Xmx1024m
mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000

#
# SparkGraphComputer Configuration
#
spark.master=local[*]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
// titan-schema-grateful-dead.groovy

def defineGratefulDeadSchema(titanGraph) {
    m = titanGraph.openManagement()
    // vertex labels
    artist = m.makeVertexLabel("artist").make()
    song   = m.makeVertexLabel("song").make()
    // edge labels
    sungBy     = m.makeEdgeLabel("sungBy").make()
    writtenBy  = m.makeEdgeLabel("writtenBy").make()
    followedBy = m.makeEdgeLabel("followedBy").make()
    // vertex and edge properties
    blid         = m.makePropertyKey("bulkLoader.vertex.id").dataType(Long.class).make()
    name         = m.makePropertyKey("name").dataType(String.class).make()
    songType     = m.makePropertyKey("songType").dataType(String.class).make()
    performances = m.makePropertyKey("performances").dataType(Integer.class).make()
    weight       = m.makePropertyKey("weight").dataType(Integer.class).make()
    // global indices
    m.buildIndex("byBulkLoaderVertexId", Vertex.class).addKey(blid).buildCompositeIndex()
    m.buildIndex("artistsByName", Vertex.class).addKey(name).indexOnly(artist).buildCompositeIndex()
    m.buildIndex("songsByName", Vertex.class).addKey(name).indexOnly(song).buildCompositeIndex()
    // vertex centric indices
    m.buildEdgeIndex(followedBy, "followedByWeight", Direction.BOTH, Order.decr, weight)
    m.commit()
}

32.2. Running PageRank

A fully functional example of the PageRankVertexProgram can be found in the VertexProgram section of the TinkerPop3 docs.