Skip to content

Commit

Permalink
Facets in REST (qdrant#4848)
Browse files Browse the repository at this point in the history
* rename to FacetRequestInternal

* add rest endpoint

* fix correctness by fetching the whole list of values

* fix mmap map index variant

Also removes test for sorted output, for now

* add ytt spec

* fix clippy

* use hashmap inside of local shard

* rename operation to `facet`, add access test

* whitelist endpoint

* change api

* make limit optional
  • Loading branch information
coszio authored and generall committed Aug 26, 2024
1 parent 869b0dc commit ace8a90
Show file tree
Hide file tree
Showing 32 changed files with 537 additions and 87 deletions.
163 changes: 163 additions & 0 deletions docs/redoc/master/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -5260,6 +5260,94 @@
}
}
},
"/collections/{collection_name}/facet": {
"post": {
"tags": [
"points"
],
"summary": "Facet a payload key with a given filter.",
"description": "Count points that satisfy the given filter for each unique value of a payload key.",
"operationId": "facet",
"requestBody": {
"description": "Request counts of points for each unique value of a payload key",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/FacetRequest"
}
}
}
},
"parameters": [
{
"name": "collection_name",
"in": "path",
"description": "Name of the collection to facet in",
"required": true,
"schema": {
"type": "string"
}
},
{
"name": "timeout",
"in": "query",
"description": "If set, overrides global timeout for this request. Unit is seconds.",
"required": false,
"schema": {
"type": "integer",
"minimum": 1
}
}
],
"responses": {
"default": {
"description": "error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
}
}
}
},
"4XX": {
"description": "error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
}
}
}
},
"200": {
"description": "successful operation",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"time": {
"type": "number",
"format": "float",
"description": "Time spent to process this request",
"example": 0.002
},
"status": {
"type": "string",
"example": "ok"
},
"result": {
"$ref": "#/components/schemas/FacetResponse"
}
}
}
}
}
}
}
}
},
"/collections/{collection_name}/points/query": {
"post": {
"tags": [
Expand Down Expand Up @@ -12964,6 +13052,81 @@
"format": "float"
}
}
},
"FacetRequest": {
"type": "object",
"required": [
"key"
],
"properties": {
"shard_key": {
"anyOf": [
{
"$ref": "#/components/schemas/ShardKeySelector"
},
{
"nullable": true
}
]
},
"key": {
"type": "string"
},
"limit": {
"type": "integer",
"format": "uint",
"minimum": 1,
"nullable": true
},
"filter": {
"anyOf": [
{
"$ref": "#/components/schemas/Filter"
},
{
"nullable": true
}
]
}
}
},
"FacetResponse": {
"type": "object",
"required": [
"hits"
],
"properties": {
"hits": {
"type": "array",
"items": {
"$ref": "#/components/schemas/FacetValueHit"
}
}
}
},
"FacetValueHit": {
"type": "object",
"required": [
"count",
"value"
],
"properties": {
"value": {
"$ref": "#/components/schemas/FacetValue"
},
"count": {
"type": "integer",
"format": "uint",
"minimum": 0
}
}
},
"FacetValue": {
"anyOf": [
{
"type": "string"
}
]
}
}
}
Expand Down
40 changes: 39 additions & 1 deletion lib/api/src/rest/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use segment::data_types::order_by::OrderBy;
use segment::data_types::vectors::DEFAULT_VECTOR_NAME;

use super::schema::{BatchVectorStruct, ScoredPoint, Vector, VectorStruct};
use super::{NearestQuery, OrderByInterface, Query, QueryInterface};
use super::{
FacetRequestInternal, FacetResponse, FacetValue, FacetValueHit, NearestQuery, OrderByInterface,
Query, QueryInterface,
};
use crate::rest::{DenseVector, NamedVectorStruct};

impl From<segment::data_types::vectors::Vector> for Vector {
Expand Down Expand Up @@ -231,3 +234,38 @@ impl From<QueryInterface> for Query {
}
}
}

impl From<segment::data_types::facets::FacetValue> for FacetValue {
fn from(value: segment::data_types::facets::FacetValue) -> Self {
match value {
segment::data_types::facets::FacetValue::Keyword(keyword) => Self::Keyword(keyword),
}
}
}

impl From<segment::data_types::facets::FacetValueHit> for FacetValueHit {
fn from(value: segment::data_types::facets::FacetValueHit) -> Self {
Self {
value: From::from(value.value),
count: value.count,
}
}
}

impl From<segment::data_types::facets::FacetResponse> for FacetResponse {
fn from(value: segment::data_types::facets::FacetResponse) -> Self {
Self {
hits: value.hits.into_iter().map(From::from).collect(),
}
}
}

impl From<FacetRequestInternal> for segment::data_types::facets::FacetParams {
fn from(value: FacetRequestInternal) -> Self {
Self {
key: value.key,
limit: value.limit.unwrap_or(Self::DEFAULT_LIMIT),
filter: value.filter,
}
}
}
36 changes: 36 additions & 0 deletions lib/api/src/rest/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,3 +719,39 @@ pub struct SearchMatrixPairsResponse {
/// List of pairs of points with scores
pub pairs: Vec<SearchMatrixPair>,
}

#[derive(Debug, JsonSchema, Serialize, Deserialize, Validate)]
pub struct FacetRequestInternal {
pub key: JsonPath,

#[validate(range(min = 1))]
pub limit: Option<usize>,

pub filter: Option<Filter>,
}

#[derive(Debug, Serialize, Deserialize, JsonSchema, Validate)]
pub struct FacetRequest {
#[validate(nested)]
#[serde(flatten)]
pub facet_request: FacetRequestInternal,

pub shard_key: Option<ShardKeySelector>,
}

#[derive(Debug, Serialize, JsonSchema)]
#[serde(untagged)]
pub enum FacetValue {
Keyword(String),
}

#[derive(Debug, Serialize, JsonSchema)]
pub struct FacetValueHit {
pub value: FacetValue,
pub count: usize,
}

#[derive(Debug, Serialize, JsonSchema)]
pub struct FacetResponse {
pub hits: Vec<FacetValueHit>,
}
4 changes: 2 additions & 2 deletions lib/collection/src/collection/facet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;
use futures::future;
use itertools::Itertools;
use segment::data_types::facets::{
aggregate_facet_hits, FacetRequest, FacetResponse, FacetValueHit,
aggregate_facet_hits, FacetParams, FacetResponse, FacetValueHit,
};

use super::Collection;
Expand All @@ -15,7 +15,7 @@ use crate::operations::types::CollectionResult;
impl Collection {
pub async fn facet(
&self,
request: FacetRequest,
request: FacetParams,
shard_selection: ShardSelectorInternal,
read_consistency: Option<ReadConsistency>,
timeout: Option<Duration>,
Expand Down
21 changes: 9 additions & 12 deletions lib/collection/src/collection_manager/holders/proxy_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ use bitvec::prelude::BitVec;
use common::types::{PointOffsetType, TelemetryDetail};
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use segment::common::operation_error::{OperationResult, SegmentFailedState};
use segment::data_types::facets::{aggregate_facet_hits, FacetHit, FacetRequest, FacetValueHit};
use segment::data_types::facets::{FacetParams, FacetValue};
use segment::data_types::named_vectors::NamedVectors;
use segment::data_types::order_by::OrderValue;
use segment::data_types::query_context::{QueryContext, SegmentQueryContext};
use segment::data_types::vectors::{QueryVector, Vector};
use segment::entry::entry_point::SegmentEntry;
use segment::index::field_index::CardinalityEstimation;
use segment::json_path::JsonPath;
use segment::spaces::tools::peek_top_largest_iterable;
use segment::telemetry::SegmentTelemetry;
use segment::types::{
Condition, Filter, Payload, PayloadFieldSchema, PayloadKeyType, PayloadKeyTypeRef, PointIdType,
Expand Down Expand Up @@ -715,11 +714,11 @@ impl SegmentEntry for ProxySegment {

fn facet(
&self,
request: &FacetRequest,
request: &FacetParams,
is_stopped: &AtomicBool,
) -> OperationResult<Vec<FacetValueHit>> {
) -> OperationResult<HashMap<FacetValue, usize>> {
let deleted_points = self.deleted_points.read();
let read_segment_hits = if deleted_points.is_empty() {
let mut segment_hits = if deleted_points.is_empty() {
self.wrapped_segment
.get()
.read()
Expand All @@ -729,7 +728,7 @@ impl SegmentEntry for ProxySegment {
request.filter.as_ref(),
&deleted_points,
);
let new_request = FacetRequest {
let new_request = FacetParams {
key: request.key.clone(),
limit: request.limit,
filter: Some(wrapped_filter),
Expand All @@ -739,14 +738,12 @@ impl SegmentEntry for ProxySegment {
.read()
.facet(&new_request, is_stopped)?
};

let write_segment_hits = self.write_segment.get().read().facet(request, is_stopped)?;

let hits_iter =
aggregate_facet_hits(read_segment_hits.into_iter().chain(write_segment_hits))
.into_iter()
.map(|(value, count)| FacetHit { value, count });
let hits = peek_top_largest_iterable(hits_iter, request.limit);
Ok(hits)
segment_hits.extend(write_segment_hits);

Ok(segment_hits)
}

fn has_point(&self, point_id: PointIdType) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions lib/collection/src/shards/dummy_shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use segment::data_types::facets::{FacetRequest, FacetResponse};
use segment::data_types::facets::{FacetParams, FacetResponse};
use segment::data_types::order_by::OrderBy;
use segment::types::{
ExtendedPointId, Filter, ScoredPoint, WithPayload, WithPayloadInterface, WithVector,
Expand Down Expand Up @@ -122,7 +122,7 @@ impl ShardOperation for DummyShard {

async fn facet(
&self,
_: Arc<FacetRequest>,
_: Arc<FacetParams>,
_search_runtime_handle: &Handle,
_: Option<Duration>,
) -> CollectionResult<FacetResponse> {
Expand Down
4 changes: 2 additions & 2 deletions lib/collection/src/shards/forward_proxy_shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;

use async_trait::async_trait;
use common::types::TelemetryDetail;
use segment::data_types::facets::{FacetRequest, FacetResponse};
use segment::data_types::facets::{FacetParams, FacetResponse};
use segment::data_types::order_by::OrderBy;
use segment::types::{
ExtendedPointId, Filter, PointIdType, ScoredPoint, WithPayload, WithPayloadInterface,
Expand Down Expand Up @@ -400,7 +400,7 @@ impl ShardOperation for ForwardProxyShard {

async fn facet(
&self,
request: Arc<FacetRequest>,
request: Arc<FacetParams>,
search_runtime_handle: &Handle,
timeout: Option<Duration>,
) -> CollectionResult<FacetResponse> {
Expand Down
Loading

0 comments on commit ace8a90

Please sign in to comment.