Skip to content

Commit

Permalink
refactor: Revert "refactor: add ref secret in WithOption and use hash…
Browse files Browse the repository at this point in the history
…map for Wi… (risingwavelabs#17135)
  • Loading branch information
yuhao-su authored Jun 5, 2024
1 parent c2a70bd commit a4f5455
Show file tree
Hide file tree
Showing 25 changed files with 79 additions and 118 deletions.
2 changes: 1 addition & 1 deletion src/connector/codec/src/decoder/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl MapHandling {
pub const OPTION_KEY: &'static str = "map.handling.mode";

pub fn from_options(
options: &std::collections::HashMap<String, String>,
options: &std::collections::BTreeMap<String, String>,
) -> anyhow::Result<Option<Self>> {
let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
Some("jsonb") => Self::Jsonb,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::BTreeMap;

use risingwave_common::bail;

Expand Down Expand Up @@ -49,7 +49,7 @@ pub struct DebeziumProps {
}

impl DebeziumProps {
pub fn from(props: &HashMap<String, String>) -> Self {
pub fn from(props: &BTreeMap<String, String>) -> Self {
let ignore_key = props
.get(DEBEZIUM_IGNORE_KEY)
.map(|v| v.eq_ignore_ascii_case("true"))
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl TimestamptzHandling {
pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";

pub fn from_options(
options: &std::collections::HashMap<String, String>,
options: &std::collections::BTreeMap<String, String>,
) -> Result<Option<Self>, InvalidOptionError> {
let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
Some("utc_string") => Self::UtcString,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/schema/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::BTreeMap;

use risingwave_pb::catalog::PbSchemaRegistryNameStrategy;

Expand All @@ -35,7 +35,7 @@ pub struct SchemaLoader {
impl SchemaLoader {
pub fn from_format_options(
topic: &str,
format_options: &HashMap<String, String>,
format_options: &BTreeMap<String, String>,
) -> Result<Self, SchemaFetchError> {
let schema_location = format_options
.get(SCHEMA_REGISTRY_KEY)
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/schema/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::BTreeMap;

use itertools::Itertools as _;
use prost_reflect::{DescriptorPool, FileDescriptor, MessageDescriptor};
Expand All @@ -28,7 +28,7 @@ use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties

/// `aws_auth_props` is only required when reading `s3://` URL.
pub async fn fetch_descriptor(
format_options: &HashMap<String, String>,
format_options: &BTreeMap<String, String>,
topic: &str,
aws_auth_props: Option<&AwsAuthProps>,
) -> Result<(MessageDescriptor, Option<i32>), SchemaFetchError> {
Expand Down Expand Up @@ -82,7 +82,7 @@ pub async fn fetch_descriptor(

pub async fn fetch_from_registry(
message_name: &str,
format_options: &HashMap<String, String>,
format_options: &BTreeMap<String, String>,
topic: &str,
) -> Result<(MessageDescriptor, i32), SchemaFetchError> {
let loader = SchemaLoader::from_format_options(topic, format_options)?;
Expand Down
12 changes: 3 additions & 9 deletions src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};

use itertools::Itertools;
use risingwave_common::catalog::{
Expand All @@ -23,7 +23,7 @@ use risingwave_pb::stream_plan::PbSinkDesc;

use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkDesc {
/// Id of the sink. For debug now.
pub id: SinkId,
Expand All @@ -48,7 +48,7 @@ pub struct SinkDesc {
pub distribution_key: Vec<usize>,

/// The properties of the sink.
pub properties: HashMap<String, String>,
pub properties: BTreeMap<String, String>,

// The append-only behavior of the physical sink connector. Frontend will determine `sink_type`
// based on both its own derivation on the append-only attribute and other user-specified
Expand Down Expand Up @@ -136,9 +136,3 @@ impl SinkDesc {
}
}
}

impl std::hash::Hash for SinkDesc {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
6 changes: 3 additions & 3 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

pub mod desc;

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};

use anyhow::anyhow;
use itertools::Itertools;
Expand Down Expand Up @@ -114,11 +114,11 @@ impl SinkType {
/// May replace [`SinkType`].
///
/// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
pub format: SinkFormat,
pub encode: SinkEncode,
pub options: HashMap<String, String>,
pub options: BTreeMap<String, String>,

pub key_encode: Option<SinkEncode>,
}
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use risingwave_common::catalog::Schema;
Expand Down Expand Up @@ -117,7 +117,7 @@ impl TimestamptzHandlingMode {
pub const FRONTEND_DEFAULT: &'static str = "utc_string";
pub const OPTION_KEY: &'static str = "timestamptz.handling.mode";

pub fn from_options(options: &HashMap<String, String>) -> Result<Self> {
pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
Some(Self::FRONTEND_DEFAULT) => Ok(Self::UtcString),
Some("utc_without_suffix") => Ok(Self::UtcWithoutSuffix),
Expand Down
11 changes: 6 additions & 5 deletions src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ impl AsyncTruncateSinkWriter for RedisSinkWriter {
#[cfg(test)]
mod test {
use core::panic;
use std::collections::BTreeMap;

use rdkafka::message::FromBytes;
use risingwave_common::array::{Array, I32Array, Op, Utf8Array};
Expand Down Expand Up @@ -409,7 +410,7 @@ mod test {
let format_desc = SinkFormatDesc {
format: SinkFormat::AppendOnly,
encode: SinkEncode::Json,
options: HashMap::default(),
options: BTreeMap::default(),
key_encode: None,
};

Expand Down Expand Up @@ -477,16 +478,16 @@ mod test {
},
]);

let mut hash_map = HashMap::default();
hash_map.insert(KEY_FORMAT.to_string(), "key-{id}".to_string());
hash_map.insert(
let mut btree_map = BTreeMap::default();
btree_map.insert(KEY_FORMAT.to_string(), "key-{id}".to_string());
btree_map.insert(
VALUE_FORMAT.to_string(),
"values:{id:{id},name:{name}}".to_string(),
);
let format_desc = SinkFormatDesc {
format: SinkFormat::AppendOnly,
encode: SinkEncode::Template,
options: hash_map,
options: btree_map,
key_encode: None,
};

Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/source/kafka/private_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl BrokerAddrRewriter {
}

#[inline(always)]
fn kafka_props_broker_key(with_properties: &HashMap<String, String>) -> &str {
fn kafka_props_broker_key(with_properties: &BTreeMap<String, String>) -> &str {
if with_properties.contains_key(KAFKA_PROPS_BROKER_KEY) {
KAFKA_PROPS_BROKER_KEY
} else {
Expand All @@ -103,7 +103,7 @@ fn kafka_props_broker_key(with_properties: &HashMap<String, String>) -> &str {

#[inline(always)]
fn get_property_required(
with_properties: &HashMap<String, String>,
with_properties: &BTreeMap<String, String>,
property: &str,
) -> ConnectorResult<String> {
with_properties
Expand All @@ -114,7 +114,7 @@ fn get_property_required(
}

pub fn insert_privatelink_broker_rewrite_map(
with_options: &mut HashMap<String, String>,
with_options: &mut BTreeMap<String, String>,
svc: Option<&PrivateLinkService>,
privatelink_endpoint: Option<String>,
) -> ConnectorResult<()> {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/catalog/connection_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::anyhow;
Expand Down Expand Up @@ -67,7 +67,7 @@ impl OwnedByUserCatalog for ConnectionCatalog {

pub(crate) fn resolve_private_link_connection(
connection: &Arc<ConnectionCatalog>,
properties: &mut HashMap<String, String>,
properties: &mut BTreeMap<String, String>,
) -> Result<()> {
#[allow(irrefutable_let_patterns)]
if let connection::Info::PrivateLinkService(svc) = &connection.info {
Expand Down
12 changes: 3 additions & 9 deletions src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::BTreeMap;

use risingwave_common::catalog::{ColumnCatalog, SourceVersionId};
use risingwave_common::util::epoch::Epoch;
Expand All @@ -26,7 +26,7 @@ use crate::user::UserId;

/// This struct `SourceCatalog` is used in frontend.
/// Compared with `PbSource`, it only maintains information used during optimization.
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SourceCatalog {
pub id: SourceId,
pub name: String,
Expand All @@ -36,7 +36,7 @@ pub struct SourceCatalog {
pub owner: UserId,
pub info: StreamSourceInfo,
pub row_id_index: Option<usize>,
pub with_properties: HashMap<String, String>,
pub with_properties: BTreeMap<String, String>,
pub watermark_descs: Vec<WatermarkDesc>,
pub associated_table_id: Option<TableId>,
pub definition: String,
Expand Down Expand Up @@ -149,9 +149,3 @@ impl OwnedByUserCatalog for SourceCatalog {
self.owner
}
}

impl std::hash::Hash for SourceCatalog {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
4 changes: 2 additions & 2 deletions src/frontend/src/catalog/subscription_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use core::str::FromStr;
use std::collections::BTreeMap;

use risingwave_common::catalog::{TableId, UserId, OBJECT_ID_PLACEHOLDER};
use risingwave_common::types::Interval;
Expand All @@ -23,7 +24,6 @@ use thiserror_ext::AsReport;

use super::OwnedByUserCatalog;
use crate::error::{ErrorCode, Result};
use crate::WithOptions;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
Expand Down Expand Up @@ -82,7 +82,7 @@ impl SubscriptionId {
}

impl SubscriptionCatalog {
pub fn set_retention_seconds(&mut self, properties: &WithOptions) -> Result<()> {
pub fn set_retention_seconds(&mut self, properties: BTreeMap<String, String>) -> Result<()> {
let retention_seconds_str = properties.get("retention").ok_or_else(|| {
ErrorCode::InternalError("Subscription retention time not set.".to_string())
})?;
Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,11 @@ pub async fn refresh_sr_and_get_columns_diff(
connector_schema: &ConnectorSchema,
session: &Arc<SessionImpl>,
) -> Result<(StreamSourceInfo, Vec<ColumnCatalog>, Vec<ColumnCatalog>)> {
let mut with_properties = original_source.with_properties.clone();

let mut with_properties = original_source
.with_properties
.clone()
.into_iter()
.collect();
validate_compatibility(connector_schema, &mut with_properties)?;

if with_properties.is_cdc_connector() {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub async fn handle_create_connection(
let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
let with_properties = handler_args.with_options.clone().into_connector_props();

let create_connection_payload = resolve_create_connection_payload(with_properties.inner())?;
let create_connection_payload = resolve_create_connection_payload(&with_properties)?;

let catalog_writer = session.catalog_writer()?;
catalog_writer
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,6 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result<SinkFormatDesc> {
options
.entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
.or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());
let options = options.into_iter().collect();

Ok(SinkFormatDesc {
format,
Expand Down
Loading

0 comments on commit a4f5455

Please sign in to comment.