我已经尝试了使用py2neo上传中等大小的数据集的方法。以我为例,每天大约需要加载80 K个节点和400 K边缘。我想分享我的经验,并询问社区是否还有我没有遇到过的更好的方法。
使用创建节点,graph.merge_one()
并使用设置属性push()
。我已经非常快地消除了这个问题,因为它非常缓慢,甚至在几分钟内都不会超过1万条记录。毫不奇怪,py2neo的文档和此处的一些帖子推荐使用Cypher。
使用py2neo.cypher.CypherTransaction
append()
中环和commit()
底。
# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query)
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
# begin new Cypher transaction
tx = neoGraph.cypher.begin()
for row in result:
tx.append(statement, {"ID": row.id_field})
tx.commit()
这超时并导致Neo4j服务器崩溃。我知道问题在于所有80 K Cypher语句都试图一次执行。
我使用计数器和process()
命令来一次运行1000条语句。
# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query)
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
counter = 0
tx = neoGraph.cypher.begin()
for row in result:
counter += 1
tx.append(statement, {"ID": row.id_field})
if (counter == 1000):
tx.process() # process 1000 statements
counter = 0
tx.commit()
它在开始时运行很快,但是在处理1000个事务时变慢了。最终,它在堆栈溢出中超时。这是令人惊讶的,因为我希望process()
每次都重置堆栈。
这是唯一运行良好的版本。难道commit()
1000个交易的每一个分区,并重新开始一个新的事务begin()
。
# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query)
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
counter = 0
tx = neoGraph.cypher.begin()
for row in result:
counter += 1
tx.append(statement, {"ID": row.id_field})
if (counter == 1000):
tx.commit() # commit 1000 statements
tx = neoGraph.cypher.begin() # reopen transaction
counter = 0
tx.commit()
这运行得很好。
任何意见?
正如您通过反复试验发现的那样,单个事务的操作不超过约10K-50K时,其性能最佳。在D中描述的方法最有效,因为您每1000条语句提交一次事务。您可能可以安全地增加该批次的大小。
您可能想尝试的另一种方法是将值数组作为参数传递,并使用Cypher的UNWIND
命令对其进行迭代。例如:
WITH {id_array} AS ids // something like [1,2,3,4,5,6]
UNWIND ids AS ident
MERGE (e:Entity {myid: ident})
SET e.p = 1
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句