-
Notifications
You must be signed in to change notification settings - Fork 28
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* remove langchain dependency * convert embeddings to list * remove instruction templates * adapt readme and remove langchain from requirements * rename and move stuff * move some more stuff, fix imports * add __init__ to BM25Retriever * create FaissRetriever class and add BM25Retriever import * remove invoke method from MyQdrantSparseVectorRetriever * fix metadata type annotation * add TextSplitter base class * revert accidental edit in request header * fix details wrt. qdrant-client import * load embedding model in float32 if device is CPU
- Loading branch information
Showing
17 changed files
with
672 additions
and
267 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
import copy
import warnings
from abc import ABC, abstractmethod
from typing import Any, Callable, Iterable, List, Literal, Optional, Sequence, Union
|
||
try: | ||
from ..utils import Document | ||
except: | ||
from utils import Document | ||
|
||
|
||
class TextSplitter(ABC):
    """Interface for splitting text into chunks.

    Concrete subclasses implement ``split_text``; every other method here is
    built on top of it.  Inherits ``ABC`` so the ``@abstractmethod`` contract
    is actually enforced (without it, instantiating the base class and calling
    ``split_text`` would silently return ``None``).
    Source: https://github.com/langchain-ai/langchain/blob/master/libs/text-splitters/langchain_text_splitters/base.py#L30
    """

    def __init__(
        self,
        chunk_size: int = 4000,
        chunk_overlap: int = 200,
        length_function: Callable[[str], int] = len,
        keep_separator: Union[bool, Literal["start", "end"]] = False,
        add_start_index: bool = False,
        strip_whitespace: bool = True,
    ) -> None:
        """Create a new TextSplitter.

        Args:
            chunk_size: Maximum size of chunks to return
            chunk_overlap: Overlap in characters between chunks
            length_function: Function that measures the length of given chunks
            keep_separator: Whether to keep the separator and where to place it
                in each corresponding chunk (True='start')
            add_start_index: If `True`, includes chunk's start index in metadata
            strip_whitespace: If `True`, strips whitespace from the start and end of
                every document

        Raises:
            ValueError: If ``chunk_overlap`` is larger than ``chunk_size``.
        """
        if chunk_overlap > chunk_size:
            raise ValueError(
                f"Got a larger chunk overlap ({chunk_overlap}) than chunk size "
                f"({chunk_size}), should be smaller."
            )
        self._chunk_size = chunk_size
        self._chunk_overlap = chunk_overlap
        self._length_function = length_function
        self._keep_separator = keep_separator
        self._add_start_index = add_start_index
        self._strip_whitespace = strip_whitespace

    @abstractmethod
    def split_text(self, text: str) -> List[str]:
        """Split text into multiple components."""

    def create_documents(
        self, texts: List[str], metadatas: Optional[List[dict]] = None
    ) -> List[Document]:
        """Create documents from a list of texts.

        Each chunk produced by ``split_text`` becomes its own ``Document``
        carrying a deep copy of the matching metadata dict, so chunks never
        share mutable metadata.  ``metadatas``, when given, is indexed in
        lockstep with ``texts``.
        """
        _metadatas = metadatas or [{}] * len(texts)
        documents = []
        for i, text in enumerate(texts):
            index = 0
            previous_chunk_len = 0
            for chunk in self.split_text(text):
                metadata = copy.deepcopy(_metadatas[i])
                if self._add_start_index:
                    # Start searching just before the previous chunk's end
                    # (backed off by the overlap) so repeated chunk text maps
                    # to the correct occurrence in the source string.
                    offset = index + previous_chunk_len - self._chunk_overlap
                    index = text.find(chunk, max(0, offset))
                    metadata["start_index"] = index
                    previous_chunk_len = len(chunk)
                new_doc = Document(page_content=chunk, metadata=metadata)
                documents.append(new_doc)
        return documents

    def split_documents(self, documents: Iterable[Document]) -> List[Document]:
        """Split documents, preserving each document's metadata on its chunks."""
        texts, metadatas = [], []
        for doc in documents:
            texts.append(doc.page_content)
            metadatas.append(doc.metadata)
        return self.create_documents(texts, metadatas=metadatas)

    def _join_docs(self, docs: List[str], separator: str) -> Optional[str]:
        """Join pieces with ``separator``; return ``None`` if the result is empty."""
        text = separator.join(docs)
        if self._strip_whitespace:
            text = text.strip()
        if text == "":
            return None
        else:
            return text

    def _merge_splits(self, splits: Iterable[str], separator: str) -> List[str]:
        """Greedily pack small splits into chunks of at most ``_chunk_size``.

        Consecutive chunks retain up to ``_chunk_overlap`` characters of
        trailing context from the previous chunk.
        """
        # We now want to combine these smaller pieces into medium size
        # chunks to send to the LLM.
        separator_len = self._length_function(separator)

        docs = []
        current_doc: List[str] = []
        total = 0
        for d in splits:
            _len = self._length_function(d)
            if (
                total + _len + (separator_len if len(current_doc) > 0 else 0)
                > self._chunk_size
            ):
                if total > self._chunk_size:
                    warnings.warn(
                        f"Created a chunk of size {total}, "
                        f"which is longer than the specified {self._chunk_size}"
                    )
                if len(current_doc) > 0:
                    doc = self._join_docs(current_doc, separator)
                    if doc is not None:
                        docs.append(doc)
                    # Keep on popping if:
                    # - we have a larger chunk than in the chunk overlap
                    # - or if we still have any chunks and the length is long
                    while total > self._chunk_overlap or (
                        total + _len + (separator_len if len(current_doc) > 0 else 0)
                        > self._chunk_size
                        and total > 0
                    ):
                        total -= self._length_function(current_doc[0]) + (
                            separator_len if len(current_doc) > 1 else 0
                        )
                        current_doc = current_doc[1:]
            current_doc.append(d)
            total += _len + (separator_len if len(current_doc) > 1 else 0)
        doc = self._join_docs(current_doc, separator)
        if doc is not None:
            docs.append(doc)
        return docs

    def transform_documents(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> Sequence[Document]:
        """Transform sequence of documents by splitting them."""
        return self.split_documents(list(documents))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
from typing import Any, List, Literal, Optional, Union, Callable | ||
import re | ||
|
||
try: | ||
from ..chunkers.base_chunker import TextSplitter | ||
except: | ||
from chunkers.base_chunker import TextSplitter | ||
|
||
|
||
class RecursiveCharacterTextSplitter(TextSplitter):
    """Splitting text by recursively looking at characters.

    Recursively tries to split by different characters to find one
    that works.
    Adapted from Langchain:
    https://github.com/langchain-ai/langchain/blob/0606aabfa39acb2ec575ea8bbfa4c8e662a6134f/libs/text-splitters/langchain_text_splitters/character.py#L58
    """
    def __init__(self, chunk_size: int = 4000, chunk_overlap: int = 200, length_function: Callable[[str], int] = len,
                 add_start_index: bool = False, strip_whitespace: bool = True, separators: Optional[List[str]] = None,
                 keep_separator: Union[bool, Literal["start", "end"]] = True, is_separator_regex: bool = False,
                 **kwargs: Any) -> None:
        """Create a new TextSplitter.

        Args:
            separators: Ordered list of separators to try, most-preferred
                first; defaults to paragraph, newline, space, then character.
            is_separator_regex: If True, separators are used verbatim as regex
                patterns; otherwise they are ``re.escape``d first.

        The remaining arguments are forwarded to ``TextSplitter.__init__``,
        which also performs the ``chunk_overlap``/``chunk_size`` validation
        (the duplicate check that used to follow the super() call here was
        unreachable — the base class raises first — and has been removed).
        """
        super().__init__(chunk_size, chunk_overlap, length_function, keep_separator, add_start_index, strip_whitespace)

        self._separators = separators or ["\n\n", "\n", " ", ""]
        self._is_separator_regex = is_separator_regex

    def _split_text(self, text: str, separators: List[str]) -> List[str]:
        """Split incoming text and return chunks."""
        final_chunks = []
        # Get appropriate separator to use: the first one that occurs in the
        # text; the separators after it are kept for recursive re-splitting.
        separator = separators[-1]
        new_separators = []
        for i, _s in enumerate(separators):
            _separator = _s if self._is_separator_regex else re.escape(_s)
            if _s == "":
                separator = _s
                break
            if re.search(_separator, text):
                separator = _s
                new_separators = separators[i + 1 :]
                break

        _separator = separator if self._is_separator_regex else re.escape(separator)
        splits = _split_text_with_regex(text, _separator, self._keep_separator)

        # Now go merging things, recursively splitting longer texts.
        _good_splits = []
        _separator = "" if self._keep_separator else separator
        for s in splits:
            if self._length_function(s) < self._chunk_size:
                _good_splits.append(s)
            else:
                if _good_splits:
                    merged_text = self._merge_splits(_good_splits, _separator)
                    final_chunks.extend(merged_text)
                    _good_splits = []
                if not new_separators:
                    # No finer separator left: emit the oversized piece as-is.
                    final_chunks.append(s)
                else:
                    other_info = self._split_text(s, new_separators)
                    final_chunks.extend(other_info)
        if _good_splits:
            merged_text = self._merge_splits(_good_splits, _separator)
            final_chunks.extend(merged_text)
        return final_chunks

    def split_text(self, text: str) -> List[str]:
        """Split ``text`` using the configured separator hierarchy."""
        return self._split_text(text, self._separators)
|
||
|
||
def _split_text_with_regex( | ||
text: str, separator: str, keep_separator: Union[bool, Literal["start", "end"]] | ||
) -> List[str]: | ||
# Now that we have the separator, split the text | ||
if separator: | ||
if keep_separator: | ||
# The parentheses in the pattern keep the delimiters in the result. | ||
_splits = re.split(f"({separator})", text) | ||
splits = ( | ||
([_splits[i] + _splits[i + 1] for i in range(0, len(_splits) - 1, 2)]) | ||
if keep_separator == "end" | ||
else ([_splits[i] + _splits[i + 1] for i in range(1, len(_splits), 2)]) | ||
) | ||
if len(_splits) % 2 == 0: | ||
splits += _splits[-1:] | ||
splits = ( | ||
(splits + [_splits[-1]]) | ||
if keep_separator == "end" | ||
else ([_splits[0]] + splits) | ||
) | ||
else: | ||
splits = re.split(separator, text) | ||
else: | ||
splits = list(text) | ||
return [s for s in splits if s != ""] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.