Remove documents from index which are not returned by connector
scriptator authored and Weves committed Feb 22, 2024
1 parent cc69ba0 commit 918bc38
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 5 deletions.
@@ -0,0 +1,27 @@
"""add removed documents to index_attempt
Revision ID: 5f4b8568a221
Revises: 8987770549c0
Create Date: 2024-02-16 15:02:03.319907
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "5f4b8568a221"
down_revision = "8987770549c0"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column(
"index_attempt",
sa.Column("docs_removed_from_index", sa.Integer()),
)
op.execute("UPDATE index_attempt SET docs_removed_from_index = 0")


def downgrade() -> None:
op.drop_column("index_attempt", "docs_removed_from_index")
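As an aside, the add-then-backfill pair above can be collapsed into a single step with a server-side default; a minimal sketch of that alternative (not what this commit does):

def upgrade() -> None:
    # hypothetical variant: server_default backfills existing rows with 0
    # at column-creation time, replacing the separate UPDATE statement
    op.add_column(
        "index_attempt",
        sa.Column("docs_removed_from_index", sa.Integer(), server_default="0"),
    )

The trade-off is that server_default also applies to future inserts at the database level, whereas this commit leaves the default to the ORM (models.py below sets default=0).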
4 changes: 4 additions & 0 deletions backend/danswer/background/connector_deletion.py
@@ -47,6 +47,10 @@ def _delete_connector_credential_pair_batch(
credential_id: int,
document_index: DocumentIndex,
) -> None:
"""
Removes a batch of document IDs from a cc-pair. If no other cc-pair references
a document anymore, it is permanently deleted.
"""
with Session(get_sqlalchemy_engine()) as db_session:
# acquire lock for all documents in this batch so that indexing can't
# override the deletion
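Spelled out, the rule in the docstring is a reference count over cc-pairs; count_cc_pairs_referencing and delete_document_from_index below are hypothetical stand-ins for the real database and document-index calls:

for document_id in document_ids:
    # hypothetical helper: number of cc-pairs still referencing this document
    if count_cc_pairs_referencing(db_session, document_id) <= 1:
        # this cc-pair held the last reference, so drop the document entirely
        delete_document_from_index(document_index, document_id)  # hypothetical
    # otherwise only the link between this cc-pair and the document is removed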
57 changes: 52 additions & 5 deletions backend/danswer/background/indexing/run_indexing.py
@@ -6,6 +6,9 @@

from sqlalchemy.orm import Session

from danswer.background.connector_deletion import (
_delete_connector_credential_pair_batch,
)
from danswer.background.indexing.checkpointing import get_time_windows_for_index_attempt
from danswer.configs.app_configs import POLL_CONNECTOR_OFFSET
from danswer.connectors.factory import instantiate_connector
@@ -18,6 +21,7 @@
from danswer.db.connector_credential_pair import get_last_successful_attempt_time
from danswer.db.connector_credential_pair import update_connector_credential_pair
from danswer.db.credentials import backend_update_credential_json
from danswer.db.document import get_documents_for_connector_credential_pair
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import get_index_attempt
from danswer.db.index_attempt import mark_attempt_failed
@@ -41,8 +45,14 @@ def _get_document_generator(
attempt: IndexAttempt,
start_time: datetime,
end_time: datetime,
) -> GenerateDocumentsOutput:
"""NOTE: `start_time` and `end_time` are only used for poll connectors"""
) -> tuple[GenerateDocumentsOutput, bool]:
"""
NOTE: `start_time` and `end_time` are only used for poll connectors
Returns an iterator of document batches and whether the returned documents
are the complete list of the connector's existing documents. If the task is
of type LOAD_STATE, the list is considered complete; otherwise it is incomplete.
"""
task = attempt.connector.input_type

try:
@@ -64,7 +74,7 @@
if task == InputType.LOAD_STATE:
assert isinstance(runnable_connector, LoadConnector)
doc_batch_generator = runnable_connector.load_from_state()

is_listing_complete = True
elif task == InputType.POLL:
assert isinstance(runnable_connector, PollConnector)
if attempt.connector_id is None or attempt.credential_id is None:
@@ -77,12 +87,13 @@
doc_batch_generator = runnable_connector.poll_source(
start=start_time.timestamp(), end=end_time.timestamp()
)
is_listing_complete = False

else:
# Event types cannot be handled by a background type
raise RuntimeError(f"Invalid task type: {task}")

return doc_batch_generator
return doc_batch_generator, is_listing_complete
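For orientation, these are the two connector protocols the function dispatches on, with signatures inferred from the call sites above (the real interfaces live elsewhere in the codebase):

class LoadConnector:
    def load_from_state(self) -> GenerateDocumentsOutput:
        # yields batches covering every document at the source,
        # hence is_listing_complete = True
        ...

class PollConnector:
    def poll_source(self, start: float, end: float) -> GenerateDocumentsOutput:
        # yields only documents changed between the two epoch timestamps,
        # hence is_listing_complete = False
        ...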


def _run_indexing(
@@ -162,14 +173,15 @@ def _run_indexing(
datetime(1970, 1, 1, tzinfo=timezone.utc),
)

doc_batch_generator = _get_document_generator(
doc_batch_generator, is_listing_complete = _get_document_generator(
db_session=db_session,
attempt=index_attempt,
start_time=window_start,
end_time=window_end,
)

try:
all_connector_doc_ids = set()
for doc_batch in doc_batch_generator:
# Check if connector is disabled mid run and stop if so unless it's the secondary
# index being built. We want to populate it even for paused connectors
Expand Down Expand Up @@ -202,6 +214,7 @@ def _run_indexing(
net_doc_change += new_docs
chunk_count += total_batch_chunks
document_count += len(doc_batch)
all_connector_doc_ids.update(doc.id for doc in doc_batch)

# commit transaction so that the `update` below begins
# with a brand new transaction. Postgres uses the start
@@ -216,6 +229,40 @@
index_attempt=index_attempt,
total_docs_indexed=document_count,
new_docs_indexed=net_doc_change,
docs_removed_from_index=0,
)

if is_listing_complete:
# clean up all documents in the index that were not returned by the connector
all_indexed_document_ids = {
d.id
for d in get_documents_for_connector_credential_pair(
db_session=db_session,
connector_id=index_attempt.connector_id,
credential_id=index_attempt.credential_id,
)
}
doc_ids_to_remove = list(
all_indexed_document_ids - all_connector_doc_ids
)
logger.debug(
f"Cleaning up {len(doc_ids_to_remove)} documents that are not contained in the newest connector state"
)

# remove these docs from the cc-pair; docs no longer referenced by any cc-pair are permanently deleted from the index
_delete_connector_credential_pair_batch(
document_ids=doc_ids_to_remove,
connector_id=index_attempt.connector_id,
credential_id=index_attempt.credential_id,
document_index=document_index,
)

update_docs_indexed(
db_session=db_session,
index_attempt=index_attempt,
total_docs_indexed=document_count,
new_docs_indexed=net_doc_change,
docs_removed_from_index=len(doc_ids_to_remove),
)

run_end_dt = window_end
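The cleanup itself boils down to a set difference; a tiny self-contained illustration of the logic above, with invented document IDs:

all_indexed_document_ids = {"doc-a", "doc-b", "doc-c"}  # known to the index
all_connector_doc_ids = {"doc-a", "doc-c"}  # returned by this complete run
doc_ids_to_remove = list(all_indexed_document_ids - all_connector_doc_ids)
assert doc_ids_to_remove == ["doc-b"]  # gone at the source, so de-indexed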
2 changes: 2 additions & 0 deletions backend/danswer/db/index_attempt.py
@@ -115,9 +115,11 @@ def update_docs_indexed(
index_attempt: IndexAttempt,
total_docs_indexed: int,
new_docs_indexed: int,
docs_removed_from_index: int,
) -> None:
index_attempt.total_docs_indexed = total_docs_indexed
index_attempt.new_docs_indexed = new_docs_indexed
index_attempt.docs_removed_from_index = docs_removed_from_index

db_session.add(index_attempt)
db_session.commit()
1 change: 1 addition & 0 deletions backend/danswer/db/models.py
@@ -427,6 +427,7 @@ class IndexAttempt(Base):
# The two below may be slightly out of sync if user switches Embedding Model
new_docs_indexed: Mapped[int | None] = mapped_column(Integer, default=0)
total_docs_indexed: Mapped[int | None] = mapped_column(Integer, default=0)
docs_removed_from_index: Mapped[int | None] = mapped_column(Integer, default=0)
# only filled if status = "failed"
error_msg: Mapped[str | None] = mapped_column(Text, default=None)
# only filled if status = "failed" AND an unhandled exception caused the failure
2 changes: 2 additions & 0 deletions backend/danswer/server/documents/models.py
@@ -31,6 +31,7 @@ class IndexAttemptSnapshot(BaseModel):
status: IndexingStatus | None
new_docs_indexed: int # only includes completely new docs
total_docs_indexed: int # includes docs that are updated
docs_removed_from_index: int
error_msg: str | None
full_exception_trace: str | None
time_started: str | None
@@ -45,6 +46,7 @@ def from_index_attempt_db_model(
status=index_attempt.status,
new_docs_indexed=index_attempt.new_docs_indexed or 0,
total_docs_indexed=index_attempt.total_docs_indexed or 0,
docs_removed_from_index=index_attempt.docs_removed_from_index or 0,
error_msg=index_attempt.error_msg,
full_exception_trace=index_attempt.full_exception_trace,
time_started=index_attempt.time_started.isoformat()
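Worth noting: the `or 0` fallbacks mean a row whose counters are still NULL serializes as zero instead of failing Pydantic validation. An illustrative call, assuming the classmethod takes the ORM object directly:

snapshot = IndexAttemptSnapshot.from_index_attempt_db_model(index_attempt)
# for a row where docs_removed_from_index is NULL:
assert snapshot.docs_removed_from_index == 0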
@@ -80,6 +80,7 @@ export function IndexingAttemptsTable({ ccPair }: { ccPair: CCPairFullInfo }) {
<TableHeaderCell>Time Started</TableHeaderCell>
<TableHeaderCell>Status</TableHeaderCell>
<TableHeaderCell>New Doc Cnt</TableHeaderCell>
<TableHeaderCell>Removed Doc Cnt</TableHeaderCell>
<TableHeaderCell>Total Doc Cnt</TableHeaderCell>
<TableHeaderCell>Error Msg</TableHeaderCell>
</TableRow>
@@ -109,6 +110,7 @@ export function IndexingAttemptsTable({ ccPair }: { ccPair: CCPairFullInfo }) {
)}
</TableCell>
<TableCell>{indexAttempt.new_docs_indexed}</TableCell>
<TableCell>{indexAttempt.docs_removed_from_index}</TableCell>
<TableCell>{indexAttempt.total_docs_indexed}</TableCell>
<TableCell>
<div>
1 change: 1 addition & 0 deletions web/src/lib/types.ts
@@ -165,6 +165,7 @@ export interface IndexAttemptSnapshot {
id: number;
status: ValidStatuses | null;
new_docs_indexed: number;
docs_removed_from_index: number;
total_docs_indexed: number;
error_msg: string | null;
full_exception_trace: string | null;
