Skip to content

Commit

Permalink
optimize ingestion pipeline deduping (run-llama#14858)
Browse files: browse the repository at this point in the history
  • Loading branch information
logan-markewich authored Jul 20, 2024
1 parent 97ac7e8 commit e0433cf
Showing 1 changed file with 8 additions and 10 deletions.
18 changes: 8 additions & 10 deletions llama-index-core/llama_index/core/ingestion/pipeline.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -401,7 +401,6 @@ def _handle_upserts(
"""Handle docstore upserts by checking hashes and ids."""
assert self.docstore is not None

existing_doc_ids_before = set(self.docstore.get_all_document_hashes().values())
doc_ids_from_nodes = set()
deduped_nodes_to_run = {}
for node in nodes:
Expand All @@ -410,22 +409,22 @@ def _handle_upserts(
existing_hash = self.docstore.get_document_hash(ref_doc_id)
if not existing_hash:
# document doesn't exist, so add it
self.docstore.set_document_hash(ref_doc_id, node.hash)
deduped_nodes_to_run[ref_doc_id] = node
elif existing_hash and existing_hash != node.hash:
self.docstore.delete_ref_doc(ref_doc_id, raise_error=False)

if self.vector_store is not None:
self.vector_store.delete(ref_doc_id)

self.docstore.set_document_hash(ref_doc_id, node.hash)

deduped_nodes_to_run[ref_doc_id] = node
else:
continue # document exists and is unchanged, so skip it

if self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
# Identify missing docs and delete them from docstore and vector store
existing_doc_ids_before = set(
self.docstore.get_all_document_hashes().values()
)
doc_ids_to_delete = existing_doc_ids_before - doc_ids_from_nodes
for ref_doc_id in doc_ids_to_delete:
self.docstore.delete_document(ref_doc_id)
Expand All @@ -434,6 +433,7 @@ def _handle_upserts(
self.vector_store.delete(ref_doc_id)

nodes_to_run = list(deduped_nodes_to_run.values())
self.docstore.set_document_hashes({n.id_: n.hash for n in nodes_to_run})
self.docstore.add_documents(nodes_to_run, store_text=store_doc_text)

return nodes_to_run
Expand Down Expand Up @@ -585,9 +585,6 @@ async def _ahandle_upserts(
"""Handle docstore upserts by checking hashes and ids."""
assert self.docstore is not None

existing_doc_ids_before = set(
(await self.docstore.aget_all_document_hashes()).values()
)
doc_ids_from_nodes = set()
deduped_nodes_to_run = {}
for node in nodes:
Expand All @@ -596,22 +593,22 @@ async def _ahandle_upserts(
existing_hash = await self.docstore.aget_document_hash(ref_doc_id)
if not existing_hash:
# document doesn't exist, so add it
await self.docstore.aset_document_hash(ref_doc_id, node.hash)
deduped_nodes_to_run[ref_doc_id] = node
elif existing_hash and existing_hash != node.hash:
await self.docstore.adelete_ref_doc(ref_doc_id, raise_error=False)

if self.vector_store is not None:
await self.vector_store.adelete(ref_doc_id)

await self.docstore.aset_document_hash(ref_doc_id, node.hash)

deduped_nodes_to_run[ref_doc_id] = node
else:
continue # document exists and is unchanged, so skip it

if self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
# Identify missing docs and delete them from docstore and vector store
existing_doc_ids_before = set(
(await self.docstore.aget_all_document_hashes()).values()
)
doc_ids_to_delete = existing_doc_ids_before - doc_ids_from_nodes
for ref_doc_id in doc_ids_to_delete:
await self.docstore.adelete_document(ref_doc_id)
Expand All @@ -621,6 +618,7 @@ async def _ahandle_upserts(

nodes_to_run = list(deduped_nodes_to_run.values())
await self.docstore.async_add_documents(nodes_to_run, store_text=store_doc_text)
await self.docstore.aset_document_hashes({n.id_: n.hash for n in nodes_to_run})

return nodes_to_run

Expand Down

0 comments on commit e0433cf

Please sign in to comment.