vteam #679 has been working on a client's data analysis web application. The application sifts through the vast amounts of information available to its users across social media, news and blogs, and identifies trends and patterns. In the previous article, a graph database was implemented using Neo4j and Python.
The client then asked vteams engineer Usman Maqbool to migrate the social media data from the relational database to the graph database, because relationship queries over the social media data had become too slow: the database held more than 145 million records.
Solution
The data was growing day by day, and since the RDBMS was taking a long time to run relationship queries over such a huge dataset, it was proposed to use a graph database (Neo4j), which handles relationships much faster than an RDBMS.
To apply the solution, Usman had to migrate the data from the RDBMS to Neo4j and implement the graph database in the live running project.
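To see why this helps, consider fetching everything related to a given record. In an RDBMS the relation is resolved with a join over very large tables on every query, while in Neo4j the relationship is stored directly between the nodes and is simply traversed. The sketch below is illustrative only; the table names (posts, users) are hypothetical, and the labels match the placeholders used later in the migration script.
# RDBMS: the relation is computed with a join over big tables on every query.
sql = """
SELECT p.*
FROM posts p
JOIN users u ON u.id = p.user_id
WHERE u.id = 123
"""
# Neo4j: the relation already exists as an edge, so it is just traversed.
cypher = """
MATCH (b:LabelName2 {id: 123})-[:RELATION]->(a:LabelName1)
RETURN a
"""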
Migration
Migrating the live data from the RDBMS to Neo4j in a single pass wasn't possible, so it was decided to migrate the data in chunks using Celery (v3.1.23). To install django-celery via the Python Package Index (PyPI), run the following command:
pip install django-celery
Once the installation is done, add the following code to the settings.py file:
INSTALLED_APPS = (
    …
    'djcelery',
    …
)

CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
CELERY_RESULT_SERIALIZER = 'json'
BROKER_CONNECTION_TIMEOUT = 30
BROKER_POOL_LIMIT = 1  # Will decrease connection usage
CELERY_ENABLE_UTC = True
# Store task results using the django-celery database backend
CELERY_RESULT_BACKEND = 'djcelery.backends.database.DatabaseBackend'
# Use the database-backed scheduler for periodic tasks
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
CELERY_SEND_EVENTS = False
CELERY_EVENT_QUEUE_EXPIRES = 60
CELERY_EVENT_QUEUE_TTL = 10
CELERY_IGNORE_RESULT = True
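With the configuration in place, a worker running the beat scheduler must be started so that the periodic migration task below actually fires. One way to start it during development (assuming the standard django-celery management command) is:
python manage.py celery worker --beat --loglevel=info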
Now, add the following migration script to the script.py file:
import os

import psycopg2
from celery.task import periodic_task
from celery.schedules import crontab
from py2neo import ServiceRoot

# GrapheneDB add-on settings
graphenedb_url = os.environ.get("GRAPHENEDB_URL")
graph = ServiceRoot(graphenedb_url).graph

# Connection to the existing relational database
conn = psycopg2.connect("dbname='db_name' user='user' password='password' host='host' port='5432'")


@periodic_task(run_every=crontab(minute='*/6'))
def rdbms_data_migration():
    # Get the max id of the data that has already been migrated.
    start_id_query = """MATCH (n:LabelName1) RETURN MAX(n.id) AS max_id"""
    result = graph.cypher.execute(start_id_query)

    # The very first time the script runs there is nothing in Neo4j yet,
    # so start_id = 1 is hard coded (depending upon the ids in the RDBMS).
    if not result[0].max_id:
        start_id = 1
    else:
        start_id = int(result[0].max_id)
    end_id = start_id + 49

    # Fetch the next batch of rows from the RDBMS.
    query = """SELECT * FROM table WHERE id BETWEEN %(start_id)s AND %(end_id)s ORDER BY id"""
    cur = conn.cursor()
    cur.execute(query, {"start_id": start_id, "end_id": end_id})
    rows = cur.fetchall()

    if not len(rows) == 0:
        n = 100
        for rows_chunk in chunks(rows, n):
            s = ''
            for i, row in enumerate(rows_chunk):
                row_id = row[0]
                date = row[1]
                name = row[2]
                # …
                # Cypher query to migrate Facebook posts from the RDBMS to GrapheneDB
                s += ''' MERGE (lbname%(i)s:LabelName1 {id: %(id)s, name: "%(name)s", date: "%(date)s"})''' % {"id": int(row_id), "name": name, "date": date, "i": i}
            graph.cypher.execute(s)

        # Create the relations between LabelName1 and LabelName2.
        RELATION_QUERY = '''MATCH (a:LabelName1) WHERE a.id > %(start_id)s AND a.id < %(end_id)s WITH a MATCH (b:LabelName2) WHERE a.id = b.id WITH a, b MERGE (a)<-[rel:RELATION]-(b)''' % {"start_id": start_id, "end_id": end_id}
        graph.cypher.execute(RELATION_QUERY)


# Helper to divide the fetched rows into chunks.
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]
To connect Cypher and Python, the py2neo library was used. In the script above, each run migrates a batch of 50 records so that the Cypher statements stay small enough to avoid a Java heap error on the Neo4j side.
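As a quick sanity check while the periodic task keeps running, the number of nodes already created in Neo4j can be compared against the source table. This is a minimal sketch that reuses the graph and conn objects from the script above; table and LabelName1 are the same placeholder names used there.
# Compare how many records have been migrated so far (placeholder names as above).
migrated = graph.cypher.execute("MATCH (n:LabelName1) RETURN COUNT(n) AS total")[0].total

cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM table")
source_total = cur.fetchone()[0]

print("Migrated %s of %s records so far" % (migrated, source_total))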