
Migrating Data from RDBMS to Graph Database

  • Post published:May 23, 2017

vteam #679 has been working on a client’s data-analysis web application. This application is able to sift through the vast amounts of information available to its users through social media, news and blogs, and identify trends and patterns. In the previous article, a graph database was implemented using Neo4j and Python.

Now the client wanted vteams engineer Usman Maqbool to migrate the social media data from the relational database to a graph database, because relationship queries over the social media data were taking too long: the database held more than 145 million records.


The data was growing larger day by day, and since the RDBMS took a long time to run relationship queries over such a huge dataset, a graph database (Neo4j), which handles relationships much faster than an RDBMS, was proposed as the solution.

To apply the solution, Usman had to migrate the data from the RDBMS to Neo4j and implement the graph database in the live, running project.


Migrating the live data from the RDBMS to Neo4j in one go wasn’t possible, so it was decided to migrate the data in chunks using Celery (v3.1.23). To install django-celery via the Python Package Index (PyPI), run the following command:

pip install django-celery

Once the installation is done, add the following settings to your Django settings file (settings.py):

CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
BROKER_POOL_LIMIT = 1 # Will decrease connection usage
CELERY_RESULT_BACKEND = 'djcelery.backends.database.DatabaseBackend'
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
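Celery also needs a message broker to queue the migration tasks. A minimal sketch, assuming a RabbitMQ instance running locally with default credentials (adjust the URL for your environment):

```python
# Broker setting for Celery 3.x; the credentials and host below are
# placeholders for a local RabbitMQ instance
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
```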

Now, add the following migration script to your Celery tasks file (e.g. tasks.py):

import os

import psycopg2
from celery.decorators import periodic_task
from celery.schedules import crontab
from py2neo import ServiceRoot

# GrapheneDB add-on settings
graphenedb_url = os.environ.get("GRAPHENEDB_URL")
graph = ServiceRoot(graphenedb_url).graph
conn = psycopg2.connect("dbname='db_name' user='user' password='password' host='host' port='5432'")

@periodic_task(run_every=crontab())  # runs every minute; adjust the schedule as required
def rdbms_data_migration():
	# get the max id of the previously migrated data
	start_id_query = """MATCH (n:LabelName1) RETURN MAX(n.id) AS max_id"""
	result = graph.cypher.execute(start_id_query)
	# on the very first run no nodes exist yet, so start from id = 1
	# (depending upon the ids in the RDBMS)
	if not result[0].max_id:
		start_id = 1
	else:
		start_id = int(result[0].max_id)
	end_id = start_id + 49
	query = """SELECT * FROM table_name WHERE id BETWEEN %(start_id)s AND %(end_id)s ORDER BY id"""
	cur = conn.cursor()
	cur.execute(query, {"start_id": start_id, "end_id": end_id})
	rows = cur.fetchall()
	if rows:
		n = 100
		for rows_chunk in chunks(rows, n):
			statements = []
			for i, row in enumerate(rows_chunk):
				row_id = row[0]
				date = row[1]
				name = row[2]
				# Cypher query to migrate Facebook posts from the RDBMS to GrapheneDB
				statements.append('''MERGE (lbname%(i)s:LabelName1 {id: %(id)s, name: "%(name)s", date: "%(date)s"})''' % {"id": int(row_id), "name": name, "date": date, "i": i})
			graph.cypher.execute(' '.join(statements))
	# create relations between LabelName1 and LabelName2 nodes
	# (the property joining LabelName2 to LabelName1 is project-specific;
	#  b.post_id is used here as a placeholder)
	relation_query = '''MATCH (a:LabelName1) WHERE a.id > %(start_id)s AND a.id < %(end_id)s WITH a MATCH (b:LabelName2) WHERE b.post_id = a.id MERGE (a)<-[rel:RELATION]-(b)''' % {"start_id": start_id, "end_id": end_id}
	graph.cypher.execute(relation_query)

# method to divide data into chunks
def chunks(l, n):
	"""Yield successive n-sized chunks from l."""
	for i in range(0, len(l), n):
		yield l[i:i+n]
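For instance, with five fetched rows and n = 2, chunks() yields lists of at most two rows each (a small self-contained demonstration; the tuples are sample data):

```python
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i+n]

rows = [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
batches = list(chunks(rows, 2))
# batches == [[(1, 'a'), (2, 'b')], [(3, 'c'), (4, 'd')], [(5, 'e')]]
```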

To connect Cypher and Python, the py2neo library was used. In the above-mentioned script, the data is migrated in batches of 50 records per run to avoid Java heap errors on the Neo4j side.
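As a side note, instead of concatenating one MERGE clause per row into a long string, the same batch could be sent as a single parameterized UNWIND query, which avoids quoting problems when names contain apostrophes. A sketch, assuming the same three columns (id, date, name) as in the script above:

```python
def build_batch_merge(rows):
    """Build an UNWIND-based MERGE query plus its parameter map.

    `rows` are (id, date, name) tuples as fetched from the RDBMS.
    Passing values as parameters instead of interpolating them into
    the Cypher string sidesteps escaping issues and lets Neo4j reuse
    the query plan across batches.
    """
    query = ("UNWIND {batch} AS row "
             "MERGE (p:LabelName1 {id: row.id}) "
             "SET p.name = row.name, p.date = row.date")
    params = {"batch": [{"id": int(r[0]), "date": str(r[1]), "name": r[2]}
                        for r in rows]}
    return query, params

# with py2neo 2.x: graph.cypher.execute(*build_batch_merge(rows_chunk))
```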