Skip to content

Commit cb6929f

Browse files
committed
Migrate raft_state from cbor to JSON (qdrant#2337)
* migrate raft_state to json * review rename * review rename
1 parent 029c21e commit cb6929f

File tree

2 files changed

+36
-8
lines changed

2 files changed

+36
-8
lines changed

lib/storage/src/content_manager/consensus/persistent.rs

+27-8
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ use crate::content_manager::consensus::entry_queue::{EntryApplyProgressQueue, En
1717
use crate::types::PeerAddressById;
1818
use crate::StorageError;
1919

20-
const STATE_FILE_NAME: &str = "raft_state";
20+
// Deprecated, use `STATE_FILE_NAME` instead
21+
const STATE_FILE_NAME_CBOR: &str = "raft_state";
22+
23+
const STATE_FILE_NAME: &str = "raft_state.json";
2124

2225
/// State of the Raft consensus, which should be saved between restarts.
2326
/// State of the collections, aliases and transfers are stored as regular storage.
@@ -73,14 +76,23 @@ impl Persistent {
7376
first_peer: bool,
7477
) -> Result<Self, StorageError> {
7578
create_dir_all(storage_path.as_ref())?;
76-
let path = storage_path.as_ref().join(STATE_FILE_NAME);
77-
let state = if path.exists() {
78-
log::info!("Loading raft state from {}", path.display());
79-
Self::load(path)?
79+
let path_legacy = storage_path.as_ref().join(STATE_FILE_NAME_CBOR);
80+
let path_json = storage_path.as_ref().join(STATE_FILE_NAME);
81+
let state = if path_json.exists() {
82+
log::info!("Loading raft state from {}", path_json.display());
83+
Self::load_json(path_json)?
84+
} else if path_legacy.exists() {
85+
log::info!("Loading raft state from {}", path_legacy.display());
86+
let mut state = Self::load(path_legacy)?;
87+
// migrate to json
88+
state.path = path_json;
89+
state.save()?;
90+
state
8091
} else {
81-
log::info!("Initializing new raft state at {}", path.display());
82-
Self::init(path, first_peer)?
92+
log::info!("Initializing new raft state at {}", path_json.display());
93+
Self::init(path_json, first_peer)?
8394
};
95+
8496
log::debug!("State: {:?}", state);
8597
Ok(state)
8698
}
@@ -192,10 +204,17 @@ impl Persistent {
192204
Ok(state)
193205
}
194206

207+
fn load_json(path: PathBuf) -> Result<Self, StorageError> {
208+
let file = File::open(&path)?;
209+
let mut state: Self = serde_json::from_reader(&file)?;
210+
state.path = path;
211+
Ok(state)
212+
}
213+
195214
pub fn save(&self) -> Result<(), StorageError> {
196215
let result = AtomicFile::new(&self.path, AllowOverwrite).write(|file| {
197216
let writer = BufWriter::new(file);
198-
serde_cbor::to_writer(writer, self)
217+
serde_json::to_writer(writer, self)
199218
});
200219
log::trace!("Saved state: {:?}", self);
201220
self.dirty.store(result.is_err(), Ordering::Relaxed);

lib/storage/src/content_manager/errors.rs

+9
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,15 @@ impl From<serde_cbor::Error> for StorageError {
193193
}
194194
}
195195

196+
impl From<serde_json::Error> for StorageError {
197+
fn from(err: serde_json::Error) -> Self {
198+
StorageError::ServiceError {
199+
description: format!("json (de)serialization error: {err}"),
200+
backtrace: Some(Backtrace::force_capture().to_string()),
201+
}
202+
}
203+
}
204+
196205
impl From<prost::EncodeError> for StorageError {
197206
fn from(err: prost::EncodeError) -> Self {
198207
StorageError::ServiceError {

0 commit comments

Comments
 (0)