vteam #679 has been working on a client's data analysis web application. The application sifts through the vast amounts of information available to its users through social media, news, and blogs, and identifies trends and patterns. In the previous article, a graph database was implemented using Neo4j and Python.
The client then asked vteams engineer Usman Maqbool to migrate the social media data from the relational database to the graph database, because relationship queries had become too slow: the database held more than 145 million records.
Solution
The data was growing day by day, and since the RDBMS took a long time to run relationship queries over such a large dataset, it was proposed to use a graph database (Neo4j), which handles relationship-heavy queries much faster than an RDBMS.
To apply the solution, Usman had to migrate the data from the RDBMS to Neo4j and implement the graph database in the live, running project. A hypothetical comparison of the two query styles is sketched below.
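To get a feel for why the graph model is faster for this workload, compare a relationship lookup in both systems. The sketch below is illustrative only; the table, label, and relationship names are hypothetical and not from the project. The SQL version joins across whole tables, while the Cypher version anchors on one node and only walks the relationships attached to it:

# Hypothetical schema: users/friendships/posts tables vs. User/Post nodes.
sql_query = """
SELECT p.*
FROM users u
JOIN friendships f ON f.user_id = u.id
JOIN posts p ON p.user_id = f.friend_id
WHERE u.id = 42
"""

# Equivalent traversal in Cypher: start at one node, follow its relationships.
cypher_query = """
MATCH (u:User {id: 42})-[:FRIEND]->(friend)-[:POSTED]->(p:Post)
RETURN p
"""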
Migration
Migrating the live data from the RDBMS to Neo4j in a single pass wasn't possible, so it was decided to migrate the data in chunks using Celery (v3.1.23). To install django-celery via the Python Package Index (PyPI), run the following command:
pip install django-celery
Once the installation is done, add the following settings to the settings.py file:
INSTALLED_APPS = (
    # ... existing apps ...
    'djcelery',
)

CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
CELERY_RESULT_SERIALIZER = 'json'
# Store task results in the Django database (instead of the default 'amqp' backend)
CELERY_RESULT_BACKEND = 'djcelery.backends.database.DatabaseBackend'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
BROKER_CONNECTION_TIMEOUT = 30
BROKER_POOL_LIMIT = 1  # will decrease connection usage
CELERY_ENABLE_UTC = True
CELERY_SEND_EVENTS = False
CELERY_EVENT_QUEUE_EXPIRES = 60
CELERY_EVENT_QUEUE_TTL = 10
CELERY_IGNORE_RESULT = True
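With the database scheduler configured above, the periodic task is picked up by a Celery worker running with the embedded beat scheduler. Assuming a standard Django project layout (django-celery adds a celery management command), the worker can be started with:

python manage.py celery worker --beat --loglevel=info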
Now, add the following migration script to the script.py file:
import os

import psycopg2
from celery.task import periodic_task
from celery.schedules import crontab
from py2neo import ServiceRoot

# GrapheneDB add-on settings
graphenedb_url = os.environ.get("GRAPHENEDB_URL")
graph = ServiceRoot(graphenedb_url).graph

conn = psycopg2.connect("dbname='db_name' user='user' password='password' host='host' port='5432'")


def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]


@periodic_task(run_every=crontab(minute='*/6'))
def rdbms_data_migration():
    # Get the max id of the data migrated by previous runs.
    start_id_query = """MATCH (n:LabelName1) RETURN MAX(n.id) AS max_id"""
    result = graph.cypher.execute(start_id_query)

    # If the script runs for the very first time, start_id = 1 is hard-coded,
    # depending upon the ids in the RDBMS.
    if not result[0].max_id:
        start_id = 1
    else:
        start_id = int(result[0].max_id)
    end_id = start_id + 49

    query = """SELECT * FROM table WHERE id BETWEEN %(start_id)s AND %(end_id)s ORDER BY id""" \
        % {"start_id": start_id, "end_id": end_id}
    cur = conn.cursor()
    cur.execute(query)
    rows = cur.fetchall()

    if len(rows) != 0:
        n = 100
        for rows_chunk in chunks(rows, n):
            s = ''
            for i, row in enumerate(rows_chunk):
                node_id = row[0]
                date = row[1]
                name = row[2]
                # ... further columns elided

                # Cypher query to migrate Facebook posts from the RDBMS to GrapheneDB
                s += '''
                MERGE (lbname%(i)s:LabelName1 {id: %(id)s, name: "%(name)s", date: "%(date)s"})''' \
                    % {"id": int(node_id), "name": name, "date": date, "i": i}
            graph.cypher.execute(s)

        # Create relations between LabelName1 and LabelName2 nodes.
        relation_query = '''MATCH (a:LabelName1) WHERE a.id > %(start_id)s AND a.id < %(end_id)s
            WITH a MATCH (b:LabelName2) WHERE a.id = b.id
            WITH a, b MERGE (a)<-[rel:RELATION]-(b)''' \
            % {"start_id": start_id, "end_id": end_id}
        graph.cypher.execute(relation_query)
To run Cypher from Python, the py2neo library was used. In the script above, each periodic run migrates a batch of 50 records to avoid a Java heap error in Neo4j.
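As a sanity check during or after the migration, the row count in the RDBMS can be compared with the node count in Neo4j. The following is a minimal sketch reusing the conn and graph objects from the script above; the table and label names are the same placeholders:

cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM table")
rdbms_count = cur.fetchone()[0]

result = graph.cypher.execute("MATCH (n:LabelName1) RETURN COUNT(n) AS total")
neo4j_count = result[0].total

print("RDBMS rows: %s, Neo4j nodes: %s" % (rdbms_count, neo4j_count))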