Skip to content

Commit

Permalink
[meta] feature: protobuf message has to persist MIN_COMPATIBLE_VER
Browse files Browse the repository at this point in the history
…in it

This way to let old query be able to decide if it is safe to load data
written by a newer query executable.

- Fix: databendlabs#5784
  • Loading branch information
drmingdrmer committed Jun 5, 2022
1 parent 25531a7 commit 28598be
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 49 deletions.
7 changes: 5 additions & 2 deletions common/proto-conv/src/config_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ use common_protos::pb;
use crate::check_ver;
use crate::FromToProto;
use crate::Incompatible;
use crate::MIN_COMPATIBLE_VER;
use crate::VER;

impl FromToProto<pb::S3StorageConfig> for StorageParams {
fn from_pb(p: pb::S3StorageConfig) -> Result<Self, Incompatible>
where Self: Sized {
// TODO: config will have it's own version flags in the future.
check_ver(p.version)?;
check_ver(p.version, p.min_compatible)?;

Ok(Self::S3(StorageS3Config {
region: p.region,
Expand All @@ -43,6 +44,7 @@ impl FromToProto<pb::S3StorageConfig> for StorageParams {
if let StorageParams::S3(v) = self {
Ok(pb::S3StorageConfig {
version: VER,
min_compatible: MIN_COMPATIBLE_VER,
region: v.region.clone(),
endpoint_url: v.endpoint_url.clone(),
access_key_id: v.access_key_id.clone(),
Expand All @@ -63,7 +65,7 @@ impl FromToProto<pb::FsStorageConfig> for StorageParams {
fn from_pb(p: pb::FsStorageConfig) -> Result<Self, Incompatible>
where Self: Sized {
// TODO: config will have it's own version flags in the future.
check_ver(p.version)?;
check_ver(p.version, p.min_compatible)?;

Ok(Self::Fs(StorageFsConfig { root: p.root }))
}
Expand All @@ -72,6 +74,7 @@ impl FromToProto<pb::FsStorageConfig> for StorageParams {
if let StorageParams::Fs(v) = self {
Ok(pb::FsStorageConfig {
version: VER,
min_compatible: MIN_COMPATIBLE_VER,
root: v.root.clone(),
})
} else {
Expand Down
66 changes: 52 additions & 14 deletions common/proto-conv/src/data_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ use num::FromPrimitive;
use crate::check_ver;
use crate::FromToProto;
use crate::Incompatible;
use crate::MIN_COMPATIBLE_VER;
use crate::VER;

impl FromToProto<pb::DataSchema> for dv::DataSchema {
fn from_pb(p: pb::DataSchema) -> Result<Self, Incompatible> {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

let mut fs = Vec::with_capacity(p.fields.len());
for f in p.fields.into_iter() {
Expand All @@ -50,6 +51,7 @@ impl FromToProto<pb::DataSchema> for dv::DataSchema {

let p = pb::DataSchema {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
fields: fs,
metadata: self.meta().clone(),
};
Expand All @@ -59,7 +61,7 @@ impl FromToProto<pb::DataSchema> for dv::DataSchema {

impl FromToProto<pb::DataField> for dv::DataField {
fn from_pb(p: pb::DataField) -> Result<Self, Incompatible> {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

let v = dv::DataField::new(
&p.name,
Expand All @@ -74,6 +76,7 @@ impl FromToProto<pb::DataField> for dv::DataField {
fn to_pb(&self) -> Result<pb::DataField, Incompatible> {
let p = pb::DataField {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
name: self.name().clone(),
default_expr: self.default_expr().cloned(),
data_type: Some(self.data_type().to_pb()?),
Expand All @@ -84,7 +87,7 @@ impl FromToProto<pb::DataField> for dv::DataField {

impl FromToProto<pb::DataType> for dv::DataTypeImpl {
fn from_pb(p: pb::DataType) -> Result<Self, Incompatible> {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

let dt = match p.dt {
None => {
Expand Down Expand Up @@ -136,90 +139,103 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::NullableType(Box::new(inn))),
};
Ok(v)
}
dv::DataTypeImpl::Boolean(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::BoolType(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Int8(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Int8Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Int16(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Int16Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Int32(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Int32Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Int64(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Int64Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::UInt8(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Uint8Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::UInt16(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Uint16Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::UInt32(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Uint32Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::UInt64(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Uint64Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Float32(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Float32Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Float64(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Float64Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Date(_x) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::DateType(pb::Empty {})),
};
Ok(v)
Expand All @@ -229,13 +245,15 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::TimestampType(inn)),
};
Ok(v)
}
dv::DataTypeImpl::String(_x) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::StringType(pb::Empty {})),
};
Ok(v)
Expand All @@ -245,6 +263,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::StructType(inn)),
};
Ok(v)
Expand All @@ -254,6 +273,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::ArrayType(Box::new(inn))),
};
Ok(v)
Expand All @@ -263,6 +283,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let p = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::VariantType(inn)),
};
Ok(p)
Expand All @@ -272,6 +293,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let p = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::VariantArrayType(inn)),
};
Ok(p)
Expand All @@ -281,6 +303,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let p = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::VariantObjectType(inn)),
};
Ok(p)
Expand All @@ -290,6 +313,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let p = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::IntervalType(inn)),
};
Ok(p)
Expand All @@ -301,7 +325,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {
impl FromToProto<pb::NullableType> for dv::NullableType {
fn from_pb(p: pb::NullableType) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

let inner = p.inner.ok_or_else(|| Incompatible {
reason: "NullableType.inner can not be None".to_string(),
Expand All @@ -318,6 +342,7 @@ impl FromToProto<pb::NullableType> for dv::NullableType {

let p = pb::NullableType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
inner: Some(Box::new(inner_pb_type)),
};

Expand All @@ -328,14 +353,15 @@ impl FromToProto<pb::NullableType> for dv::NullableType {
impl FromToProto<pb::Timestamp> for dv::TimestampType {
fn from_pb(p: pb::Timestamp) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;
let v = dv::TimestampType::create(p.precision as usize);
Ok(v)
}

fn to_pb(&self) -> Result<pb::Timestamp, Incompatible> {
let p = pb::Timestamp {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
precision: self.precision() as u64,
// tz: self.tz().cloned(),
};
Expand All @@ -347,7 +373,7 @@ impl FromToProto<pb::Timestamp> for dv::TimestampType {
impl FromToProto<pb::Struct> for dv::StructType {
fn from_pb(p: pb::Struct) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;
let names = p.names.clone();

let mut types = Vec::with_capacity(p.types.len());
Expand All @@ -369,6 +395,7 @@ impl FromToProto<pb::Struct> for dv::StructType {

let p = pb::Struct {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,

names,
types,
Expand All @@ -381,7 +408,7 @@ impl FromToProto<pb::Struct> for dv::StructType {
impl FromToProto<pb::Array> for dv::ArrayType {
fn from_pb(p: pb::Array) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

let inner = p.inner.ok_or_else(|| Incompatible {
reason: "Array.inner can not be None".to_string(),
Expand All @@ -398,6 +425,7 @@ impl FromToProto<pb::Array> for dv::ArrayType {

let p = pb::Array {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
inner: Some(Box::new(inner_pb_type)),
};

Expand All @@ -408,27 +436,33 @@ impl FromToProto<pb::Array> for dv::ArrayType {
impl FromToProto<pb::VariantArray> for dv::VariantArrayType {
fn from_pb(p: pb::VariantArray) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

Ok(Self {})
}

fn to_pb(&self) -> Result<pb::VariantArray, Incompatible> {
let p = pb::VariantArray { ver: VER };
let p = pb::VariantArray {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
};
Ok(p)
}
}

impl FromToProto<pb::VariantObject> for dv::VariantObjectType {
fn from_pb(p: pb::VariantObject) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

Ok(Self {})
}

fn to_pb(&self) -> Result<pb::VariantObject, Incompatible> {
let p = pb::VariantObject { ver: VER };
let p = pb::VariantObject {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
};
Ok(p)
}
}
Expand Down Expand Up @@ -467,7 +501,7 @@ impl FromToProto<pb::IntervalKind> for dv::IntervalKind {
impl FromToProto<pb::IntervalType> for dv::IntervalType {
fn from_pb(p: pb::IntervalType) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

let pb_kind: pb::IntervalKind =
FromPrimitive::from_i32(p.kind).ok_or_else(|| Incompatible {
Expand All @@ -482,6 +516,7 @@ impl FromToProto<pb::IntervalType> for dv::IntervalType {
let pb_kind = self.kind().to_pb()?;
let p = pb::IntervalType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
kind: pb_kind as i32,
};
Ok(p)
Expand All @@ -491,13 +526,16 @@ impl FromToProto<pb::IntervalType> for dv::IntervalType {
impl FromToProto<pb::Variant> for dv::VariantType {
fn from_pb(p: pb::Variant) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

Ok(Self {})
}

fn to_pb(&self) -> Result<pb::Variant, Incompatible> {
let p = pb::Variant { ver: VER };
let p = pb::Variant {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
};
Ok(p)
}
}
Expand Down
Loading

0 comments on commit 28598be

Please sign in to comment.