From 2dcac051f15c1fff55246c7e19c8e52eef8f82d0 Mon Sep 17 00:00:00 2001 From: luanshaotong Date: Mon, 17 Jul 2023 17:54:12 +0800 Subject: [PATCH] add docker file; restart recover (#156) --- Makefile | 11 + docker/client/Dockerfile | 10 + docker/manager/Dockerfile | 10 + docker/server/Dockerfile | 10 + examples/manager.yaml | 4 - scripts/test_run_all.sh | 3 +- src/bin/manager.rs | 10 - src/client/fuse_client.rs | 18 +- src/client/mod.rs | 197 ++++++++++++++--- src/common/hash_ring.rs | 4 + src/common/sender.rs | 127 ++++++++++- src/common/serialization.rs | 25 +++ src/server/distributed_engine.rs | 229 +++++++++++++------- src/server/mod.rs | 88 ++++++-- src/server/storage_engine/file_engine.rs | 8 +- src/server/storage_engine/meta_engine.rs | 256 ++++++++++++++++++----- src/server/volume.rs | 9 - test_io500.sh | 5 +- 18 files changed, 809 insertions(+), 215 deletions(-) create mode 100644 docker/client/Dockerfile create mode 100644 docker/manager/Dockerfile create mode 100644 docker/server/Dockerfile mode change 100644 => 100755 scripts/test_run_all.sh delete mode 100644 src/server/volume.rs diff --git a/Makefile b/Makefile index 1e9b5d3..889a378 100644 --- a/Makefile +++ b/Makefile @@ -22,3 +22,14 @@ build: test: cargo test --features=$(features) + +images: manager-image server-image client-image + +manager-image: + docker build -t manager -f docker/manager/Dockerfile . --no-cache + +server-image: + docker build -t server -f docker/server/Dockerfile . + +client-image: + docker build -t client -f docker/client/Dockerfile . 
\ No newline at end of file diff --git a/docker/client/Dockerfile b/docker/client/Dockerfile new file mode 100644 index 0000000..f4b1b6a --- /dev/null +++ b/docker/client/Dockerfile @@ -0,0 +1,10 @@ +FROM debian:bullseye-20221205 + +RUN apt update && apt upgrade -y && apt-mark unhold libcap2 && \ + apt install -y libfuse3-3 libfuse2 libibverbs1 && \ + apt clean && \ + rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* + +COPY target/debug/client /usr/local/bin/client + +ENTRYPOINT ["/usr/local/bin/client"] diff --git a/docker/manager/Dockerfile b/docker/manager/Dockerfile new file mode 100644 index 0000000..c694c0e --- /dev/null +++ b/docker/manager/Dockerfile @@ -0,0 +1,10 @@ +FROM debian:bullseye-20221205 + +RUN apt update && apt upgrade -y && apt-mark unhold libcap2 && \ + apt install -y libfuse3-3 libfuse2 libibverbs1 && \ + apt clean && \ + rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* + +COPY target/debug/manager /usr/local/bin/manager + +ENTRYPOINT ["/usr/local/bin/manager"] diff --git a/docker/server/Dockerfile b/docker/server/Dockerfile new file mode 100644 index 0000000..62f020e --- /dev/null +++ b/docker/server/Dockerfile @@ -0,0 +1,10 @@ +FROM debian:bullseye-20221205 + +RUN apt update && apt upgrade -y && apt-mark unhold libcap2 && \ + apt install -y libfuse3-3 libfuse2 libibverbs1 && \ + apt clean && \ + rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* + +COPY target/debug/server /usr/local/bin/server + +ENTRYPOINT ["/usr/local/bin/server"] diff --git a/examples/manager.yaml b/examples/manager.yaml index 40a7622..be05e7e 100644 --- a/examples/manager.yaml +++ b/examples/manager.yaml @@ -8,9 +8,5 @@ all_servers_address: - 127.0.0.1:8089 virtual_nodes: 100 -heartbeat: - false log_level: warn -protect_threshold: - 0.5 diff --git a/scripts/test_run_all.sh b/scripts/test_run_all.sh old mode 100644 new mode 100755 index 98dbb9b..47f1e62 --- a/scripts/test_run_all.sh +++ b/scripts/test_run_all.sh @@ -1,7 +1,6 @@ #!/bin/bash function finish() { - sudo rm -rf ~/fs 
trap 'kill $(jobs -p)' EXIT exit $1 } @@ -51,7 +50,7 @@ done sleep 3 -./target/debug/client --log-level $log_level create test1 100000 +./target/debug/client --log-level $log_level create-volume test1 100000 ./target/debug/client --log-level $log_level daemon& sleep 3 diff --git a/src/bin/manager.rs b/src/bin/manager.rs index 901849a..0f42483 100644 --- a/src/bin/manager.rs +++ b/src/bin/manager.rs @@ -17,15 +17,11 @@ struct Args { #[arg(long)] address: Option, #[arg(long)] - protect_threshold: Option, - #[arg(long)] config_file: Option, /// To use customized configuration or not. If this flag is used, please provide a config file through --config_file #[arg(long)] use_config_file: bool, #[arg(long)] - heartbeat: Option, - #[arg(long)] log_level: Option, #[arg(long)] all_servers_address: Option>, @@ -36,10 +32,8 @@ struct Args { #[derive(Debug, Serialize, Deserialize)] struct Properties { address: String, - protect_threshold: String, all_servers_address: Vec, virtual_nodes: usize, - heartbeat: bool, log_level: String, } @@ -89,16 +83,12 @@ async fn main() -> anyhow::Result<()> { } false => Properties { address: args.address.unwrap_or(default_properties.address), - protect_threshold: args - .protect_threshold - .unwrap_or(default_properties.protect_threshold), all_servers_address: args .all_servers_address .unwrap_or(default_properties.all_servers_address), virtual_nodes: args .virtual_nodes .unwrap_or(default_properties.virtual_nodes), - heartbeat: args.heartbeat.unwrap_or(default_properties.heartbeat), log_level: args.log_level.unwrap_or(default_properties.log_level), }, }; diff --git a/src/client/fuse_client.rs b/src/client/fuse_client.rs index fe0b5e3..a1d0429 100644 --- a/src/client/fuse_client.rs +++ b/src/client/fuse_client.rs @@ -7,7 +7,7 @@ use crate::common::hash_ring::HashRing; use crate::common::serialization::{ ClusterStatus, CreateDirSendMetaData, CreateFileSendMetaData, DeleteDirSendMetaData, DeleteFileSendMetaData, FileAttrSimple, 
OpenFileSendMetaData, OperationType, - ReadDirSendMetaData, ReadFileSendMetaData, WriteFileSendMetaData, + ReadDirSendMetaData, ReadFileSendMetaData, Volume, WriteFileSendMetaData, }; use crate::common::{errors, sender}; use crate::rpc; @@ -213,6 +213,16 @@ impl Client { .await } + pub async fn list_volumes(&self) -> Result, i32> { + let mut volumes: Vec = Vec::new(); + + for server_address in self.hash_ring.read().as_ref().unwrap().get_server_lists() { + let mut new_volumes = self.sender.list_volumes(&server_address).await?; + volumes.append(&mut new_volumes); + } + Ok(volumes) + } + pub async fn delete_servers(&self, servers_info: Vec) -> Result<(), i32> { self.sender .delete_servers(&self.manager_address.lock().await, servers_info) @@ -248,6 +258,12 @@ impl Client { .await } + pub async fn delete_volume(&self, name: &str) -> Result<(), i32> { + self.sender + .delete_volume(&self.get_connection_address(name), name) + .await + } + pub async fn lookup_remote(&self, parent: u64, name: OsString, reply: ReplyEntry) { info!( "lookup_remote, parent: {}, name: {}", diff --git a/src/client/mod.rs b/src/client/mod.rs index fa740a8..6ddf406 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -36,46 +36,71 @@ struct Cli { #[derive(Subcommand)] enum Commands { - Create { + CreateVolume { /// Create a volume with a given mount point and size - #[arg(required = true, name = "MOUNT_POINT")] + #[arg(required = true, name = "mount-point")] mount_point: Option, /// Size of the volume - #[arg(required = true, name = "volume_SIZE")] + #[arg(required = true, name = "volume-size")] volume_size: Option, /// Address of the manager - #[arg(short = 'm', long = "manager_address", name = "manager_address")] + #[arg(short = 'm', long = "manager-address", name = "manager-address")] + manager_address: Option, + }, + DeleteVolume { + /// Delete a volume with a given mount point + #[arg(required = true, name = "mount-point")] + mount_point: Option, + + /// Address of the manager + 
#[arg(short = 'm', long = "manager-address", name = "manager-address")] manager_address: Option, }, Daemon { /// Start a daemon that hosts volumes - /// index file - #[arg(name = "INDEX_FILE")] - index_file: Option, /// Address of the manager - #[arg(short = 'm', long = "manager_address", name = "manager_address")] + #[arg(short = 'm', long = "manager-address", name = "manager-address")] manager_address: Option, + + /// index file + #[arg(long = "index-file", name = "index-file")] + index_file: Option, + + /// socket path + #[arg(long = "socket-path", name = "socket-path")] + socket_path: Option, + + /// clean socket file + #[arg(long = "clean-socket", name = "clean-socket")] + clean_socket: bool, }, Mount { /// Act as a client, and mount FUSE at given path - #[arg(required = true, name = "MOUNT_POINT")] + #[arg(required = true, name = "mount-point")] mount_point: Option, /// Remote volume name - #[arg(required = true, name = "volume_NAME")] + #[arg(required = true, name = "volume-name")] volume_name: Option, + + #[arg(name = "socket-path")] + socket_path: Option, }, Umount { /// Unmount FUSE at given path - #[arg(required = true, name = "MOUNT_POINT")] + + #[arg(required = true, name = "mount-point")] mount_point: Option, + + #[arg(name = "socket-path")] + socket_path: Option, }, Add { /// Add a server to the cluster - #[arg(required = true, name = "SERVER_ADDRESS")] + #[arg(required = true, name = "server-address")] server_address: Option, /// Weight of the server @@ -83,37 +108,42 @@ enum Commands { weight: Option, /// Address of the manager - #[arg(short = 'm', long = "manager_address", name = "manager_address")] + #[arg(short = 'm', long = "manager-address", name = "manager-address")] manager_address: Option, }, Delete { /// Delete a server from the cluster - #[arg(required = true, name = "SERVER_ADDRESS")] + #[arg(required = true, name = "server-address")] server_address: Option, /// Address of the manager - #[arg(short = 'm', long = "manager_address", name = 
"manager_address")] + #[arg(short = 'm', long = "manager-address", name = "manager-address")] manager_address: Option, }, ListServers { /// List all servers in the cluster /// Address of the manager - #[arg(short = 'm', long = "manager_address", name = "manager_address")] + #[arg(short = 'm', long = "manager-address", name = "manager-address")] _manager_address: Option, }, ListVolumes { /// List all servers in the cluster /// Address of the manager - #[arg(short = 'm', long = "manager_address", name = "manager_address")] - _manager_address: Option, + #[arg(short = 'm', long = "manager-address", name = "manager-address")] + manager_address: Option, + }, + ListMountpoints { + #[arg(name = "socket-path")] + socket_path: Option, }, - ListMountpoints {}, Status { /// Address of the manager - #[arg(short = 'm', long = "manager_address", name = "manager_address")] + #[arg(short = 'm', long = "manager-address", name = "manager-ddress")] manager_address: Option, }, Probe { + #[arg(name = "socket-path")] + socket_path: Option, // Probe the local client }, } @@ -350,7 +380,7 @@ pub async fn run_command() -> Result<(), Box> { let client = Arc::new(Client::new()); match cli.command { - Commands::Create { + Commands::CreateVolume { mount_point, volume_size, manager_address, @@ -388,9 +418,45 @@ pub async fn run_command() -> Result<(), Box> { Ok(()) } + Commands::DeleteVolume { + mount_point, + manager_address, + } => { + let mountpoint = mount_point.unwrap(); + + let manager_address = match manager_address { + Some(address) => address, + None => "127.0.0.1:8081".to_owned(), + }; + + info!("init client"); + connect_to_manager(manager_address, client.clone()).await; + + info!("connect_servers"); + if let Err(status) = client.connect_servers().await { + error!( + "connect_servers failed, status = {:?}", + status_to_string(status) + ); + return Ok(()); + } + + info!("delete_volume"); + if let Err(status) = client.delete_volume(&mountpoint).await { + error!( + "delete_volume failed, 
status = {:?}", + status_to_string(status) + ); + return Ok(()); + } + + Ok(()) + } Commands::Daemon { index_file, manager_address, + socket_path, + clean_socket, } => { let index_file = match index_file { Some(file) => file, @@ -419,7 +485,20 @@ pub async fn run_command() -> Result<(), Box> { Err(e) => panic!("sealfsd init failed, error = {}", e), } - let server = RpcServer::new(Arc::new(sealfsd), LOCAL_PATH); + let socket_path = match socket_path { + Some(path) => path, + None => LOCAL_PATH.to_owned(), + }; + + if clean_socket { + if let Err(e) = std::fs::remove_file(&socket_path) { + if e.kind() != std::io::ErrorKind::NotFound { + panic!("remove socket file failed, error = {}", e); + } + } + } + + let server = RpcServer::new(Arc::new(sealfsd), &socket_path); let result = server.run_unix_stream().await; match result { Ok(_) => info!("server run success"), @@ -432,10 +511,15 @@ pub async fn run_command() -> Result<(), Box> { Commands::Mount { mount_point, volume_name, + socket_path, } => { - let local_client = LocalCli::new(LOCAL_PATH.to_owned()); + let socket_path = match socket_path { + Some(path) => path, + None => LOCAL_PATH.to_owned(), + }; + let local_client = LocalCli::new(socket_path.clone()); - if let Err(e) = local_client.add_connection(LOCAL_PATH).await { + if let Err(e) = local_client.add_connection(&socket_path).await { panic!("add connection failed, error = {}", status_to_string(e)) } @@ -449,10 +533,17 @@ pub async fn run_command() -> Result<(), Box> { Ok(()) } - Commands::Umount { mount_point } => { - let local_client = LocalCli::new(LOCAL_PATH.to_owned()); + Commands::Umount { + mount_point, + socket_path, + } => { + let socket_path = match socket_path { + Some(path) => path, + None => LOCAL_PATH.to_owned(), + }; + let local_client = LocalCli::new(socket_path.clone()); - if let Err(e) = local_client.add_connection(LOCAL_PATH).await { + if let Err(e) = local_client.add_connection(&socket_path).await { panic!("add connection failed, error = {}", 
status_to_string(e)) } @@ -516,11 +607,45 @@ pub async fn run_command() -> Result<(), Box> { Ok(()) } Commands::ListServers { _manager_address } => todo!(), - Commands::ListVolumes { _manager_address } => todo!(), - Commands::ListMountpoints {} => { - let local_client = LocalCli::new(LOCAL_PATH.to_owned()); + Commands::ListVolumes { manager_address } => { + let manager_address = match manager_address { + Some(address) => address, + None => "127.0.0.1:8081".to_owned(), + }; + info!("init client"); + connect_to_manager(manager_address, client.clone()).await; - if let Err(e) = local_client.add_connection(LOCAL_PATH).await { + info!("connect_servers"); + if let Err(status) = client.connect_servers().await { + error!( + "connect_servers failed, status = {:?}", + status_to_string(status) + ); + return Ok(()); + } + + let result = client.list_volumes().await; + match result { + Ok(volumes) => { + info!("list volumes success"); + for volume in volumes { + println!("{}", volume); + } + } + Err(e) => { + info!("list volumes failed, error = {}", status_to_string(e)) + } + }; + Ok(()) + } + Commands::ListMountpoints { socket_path } => { + let socket_path = match socket_path { + Some(path) => path, + None => LOCAL_PATH.to_owned(), + }; + let local_client = LocalCli::new(socket_path.clone()); + + if let Err(e) = local_client.add_connection(&socket_path).await { panic!("add connection failed, error = {}", status_to_string(e)) } @@ -558,10 +683,14 @@ pub async fn run_command() -> Result<(), Box> { }; Ok(()) } - Commands::Probe {} => { - let local_client = LocalCli::new(LOCAL_PATH.to_owned()); + Commands::Probe { socket_path } => { + let socket_path = match socket_path { + Some(path) => path, + None => LOCAL_PATH.to_owned(), + }; + let local_client = LocalCli::new(socket_path.clone()); - if let Err(e) = local_client.add_connection(LOCAL_PATH).await { + if let Err(e) = local_client.add_connection(&socket_path).await { panic!("add connection failed, error = {}", status_to_string(e)) 
} diff --git a/src/common/hash_ring.rs b/src/common/hash_ring.rs index dd1ec93..d15fbc5 100644 --- a/src/common/hash_ring.rs +++ b/src/common/hash_ring.rs @@ -74,4 +74,8 @@ impl HashRing { pub fn contains(&self, server: &str) -> bool { self.servers.contains_key(server) } + + pub fn get_server_lists(&self) -> Vec { + self.servers.keys().cloned().collect() + } } diff --git a/src/common/sender.rs b/src/common/sender.rs index 0ac595d..dc4715b 100644 --- a/src/common/sender.rs +++ b/src/common/sender.rs @@ -16,6 +16,7 @@ use crate::{ use super::serialization::{ AddNodesSendMetaData, ClusterStatus, CreateVolumeSendMetaData, DeleteNodesSendMetaData, GetClusterStatusRecvMetaData, GetHashRingInfoRecvMetaData, ManagerOperationType, OperationType, + Volume, }; pub struct Sender { @@ -268,15 +269,53 @@ impl Sender { } } + pub async fn list_volumes(&self, address: &str) -> Result, i32> { + let mut status = 0i32; + let mut rsp_flags = 0u32; + + let mut recv_meta_data_length = 0usize; + let mut recv_data_length = 0usize; + + let mut recv_meta_data = vec![0u8; 65535]; + + let result = self + .client + .call_remote( + address, + OperationType::ListVolumes.into(), + 0, + "", + &[], + &[], + &mut status, + &mut rsp_flags, + &mut recv_meta_data_length, + &mut recv_data_length, + &mut recv_meta_data, + &mut [], + ) + .await; + match result { + Ok(_) => { + if status != 0 { + return Err(status); + } + let volumes: Vec = + bincode::deserialize(&recv_meta_data[..recv_meta_data_length]).unwrap(); + Ok(volumes) + } + Err(e) => { + error!("list volumes failed: {}", e); + Err(CONNECTION_ERROR) + } + } + } + pub async fn create_volume(&self, address: &str, name: &str, size: u64) -> Result<(), i32> { let mut status = 0i32; let mut rsp_flags = 0u32; - let send_meta_data = bincode::serialize(&CreateVolumeSendMetaData { - name: name.to_string(), - size, - }) - .unwrap(); + let send_meta_data = bincode::serialize(&CreateVolumeSendMetaData { size }).unwrap(); let mut recv_meta_data_length = 0usize; 
let mut recv_data_length = 0usize; @@ -287,7 +326,7 @@ impl Sender { address, OperationType::CreateVolume.into(), 0, - "", + name, &send_meta_data, &[], &mut status, @@ -312,6 +351,82 @@ impl Sender { } } + pub async fn delete_volume(&self, address: &str, name: &str) -> Result<(), i32> { + let mut status = 0i32; + let mut rsp_flags = 0u32; + + let mut recv_meta_data_length = 0usize; + let mut recv_data_length = 0usize; + + let result = self + .client + .call_remote( + address, + OperationType::DeleteVolume.into(), + 0, + name, + &[], + &[], + &mut status, + &mut rsp_flags, + &mut recv_meta_data_length, + &mut recv_data_length, + &mut [], + &mut [], + ) + .await; + match result { + Ok(_) => { + if status != 0 { + return Err(status); + } + Ok(()) + } + Err(e) => { + error!("delete volume failed: {:?}", e); + Err(CONNECTION_ERROR) + } + } + } + + pub async fn clean_volume(&self, address: &str, name: &str) -> Result<(), i32> { + let mut status = 0i32; + let mut rsp_flags = 0u32; + + let mut recv_meta_data_length = 0usize; + let mut recv_data_length = 0usize; + + let result = self + .client + .call_remote( + address, + OperationType::CleanVolume.into(), + 0, + name, + &[], + &[], + &mut status, + &mut rsp_flags, + &mut recv_meta_data_length, + &mut recv_data_length, + &mut [], + &mut [], + ) + .await; + match result { + Ok(_) => { + if status != 0 { + return Err(status); + } + Ok(()) + } + Err(e) => { + error!("clean volume failed: {:?}", e); + Err(CONNECTION_ERROR) + } + } + } + pub async fn init_volume(&self, address: &str, name: &str) -> Result<(), i32> { let mut status = 0i32; let mut rsp_flags = 0u32; diff --git a/src/common/serialization.rs b/src/common/serialization.rs index 098fbcc..7db0e06 100644 --- a/src/common/serialization.rs +++ b/src/common/serialization.rs @@ -46,6 +46,9 @@ pub enum OperationType { DeleteFileNoParent = 19, CreateVolume = 20, InitVolume = 21, + ListVolumes = 22, + DeleteVolume = 23, + CleanVolume = 24, } impl TryFrom for OperationType { 
@@ -75,6 +78,9 @@ impl TryFrom for OperationType { 19 => Ok(OperationType::DeleteFileNoParent), 20 => Ok(OperationType::CreateVolume), 21 => Ok(OperationType::InitVolume), + 22 => Ok(OperationType::ListVolumes), + 23 => Ok(OperationType::DeleteVolume), + 24 => Ok(OperationType::CleanVolume), _ => panic!("Unkown value: {}", value), } } @@ -105,6 +111,9 @@ impl From for u32 { OperationType::DeleteFileNoParent => 19, OperationType::CreateVolume => 20, OperationType::InitVolume => 21, + OperationType::ListVolumes => 22, + OperationType::DeleteVolume => 23, + OperationType::CleanVolume => 24, } } } @@ -775,6 +784,22 @@ pub struct CheckDirSendMetaData { #[derive(Serialize, Deserialize, PartialEq)] pub struct CreateVolumeSendMetaData { + pub size: u64, +} + +#[derive(Serialize, Deserialize, PartialEq, Clone)] +pub struct Volume { pub name: String, pub size: u64, + pub used_size: u64, +} + +impl Display for Volume { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Volume {{ name: {}, size: {}, used_size: {} }}", + self.name, self.size, self.used_size + ) + } } diff --git a/src/server/distributed_engine.rs b/src/server/distributed_engine.rs index 461ceb8..7320a4e 100644 --- a/src/server/distributed_engine.rs +++ b/src/server/distributed_engine.rs @@ -1,7 +1,6 @@ use super::storage_engine::meta_engine::MetaEngine; use super::storage_engine::StorageEngine; use super::transfer_manager::TransferManager; -use super::volume::Volume; use crate::common::byte::CHUNK_SIZE; use crate::common::errors::CONNECTION_ERROR; use crate::common::hash_ring::HashRing; @@ -48,10 +47,7 @@ pub struct DistributedEngine { pub manager_address: Arc>, pub file_locks: DashMap>, - pub volumes: DashMap, - pub volume_lock: spin::Mutex<()>, pub transfer_manager: TransferManager, - pub volume_indexes: DashMap, } impl DistributedEngine @@ -64,6 +60,9 @@ where meta_engine: Arc, ) -> Self { let file_locks = DashMap::new(); + for kv in &meta_engine.file_indexs { + 
file_locks.insert(kv.key().to_owned(), HashMap::new()); + } let client = Arc::new(RpcClient::new()); Self { address, @@ -76,10 +75,7 @@ where new_hash_ring: Arc::new(RwLock::new(None)), manager_address: Arc::new(Mutex::new("".to_string())), file_locks, - volumes: DashMap::new(), - volume_lock: spin::Mutex::new(()), transfer_manager: TransferManager::new(), - volume_indexes: DashMap::new(), } } @@ -88,7 +84,6 @@ where error!("add connection failed: {:?}", e); CONNECTION_ERROR }) - //self.sender.init_volume(&address, "").await.unwrap(); } pub fn lock_file( @@ -643,6 +638,9 @@ where OperationType::DeleteFileNoParent => (0, 0, 0, 0, vec![], vec![]), OperationType::CreateVolume => (0, 0, 0, 0, vec![], vec![]), OperationType::InitVolume => (0, 0, 0, 0, vec![], vec![]), + OperationType::ListVolumes => (0, 0, 0, 0, vec![], vec![]), + OperationType::DeleteVolume => (0, 0, 0, 0, vec![], vec![]), + OperationType::CleanVolume => (0, 0, 0, 0, vec![], vec![]), }; let result = self .client @@ -705,29 +703,33 @@ where file_lock.insert(name.to_owned(), 0); } - let path = get_full_path(parent, name); - let (address, _lock) = self.get_server_address(&path); - let result = if self.address == address { - info!( - "local create dir, parent_dir: {}, file_name: {}", - parent, name - ); - self.create_dir_no_parent(&path, mode) - } else { - self.sender - .create_no_parent( - &address, - OperationType::CreateDirNoParent, - &path, - &send_meta_data, - ) - .await - }; - - if result.is_ok() { + let result = self.meta_engine - .directory_add_entry(parent, name, FileTypeSimple::Directory.into())?; - } + .directory_add_entry(parent, name, FileTypeSimple::Directory.into()); + + let result = match result { + Ok(_) => { + let path = get_full_path(parent, name); + let (address, _lock) = self.get_server_address(&path); + if self.address == address { + info!( + "local create dir, parent_dir: {}, file_name: {}", + parent, name + ); + self.create_dir_no_parent(&path, mode) + } else { + self.sender + 
.create_no_parent( + &address, + OperationType::CreateDirNoParent, + &path, + &send_meta_data, + ) + .await + } + } + Err(e) => Err(e), + }; let mut file_lock = self.lock_file_mut(parent)?; file_lock.remove(name); @@ -914,45 +916,47 @@ where file_lock.insert(name.to_owned(), 0); } - let (address, _lock) = self.get_server_address(&path); - let result = if self.address == address { - info!( - "local create file, parent_file: {}, file_name: {}", - parent, name - ); - match self.create_file_no_parent(&path, oflag, umask, mode) { - Ok(attr) => Ok(attr), - Err(libc::EEXIST) => { - if (oflag & O_EXCL) != 0 { - return Err(libc::EEXIST); // this may indicate that the file is being created or deleted - } else { - return self.call_get_attr_remote_or_local(&path).await; + let result = + self.meta_engine + .directory_add_entry(parent, name, FileTypeSimple::RegularFile.into()); + + let result = match result { + Ok(_) => { + let (address, _lock) = self.get_server_address(&path); + if self.address == address { + info!( + "local create file, parent_file: {}, file_name: {}", + parent, name + ); + match self.create_file_no_parent(&path, oflag, umask, mode) { + Ok(attr) => Ok(attr), + Err(libc::EEXIST) => { + if (oflag & O_EXCL) != 0 { + Err(libc::EEXIST) // this may indicate that the file is being created or deleted + } else { + self.call_get_attr_remote_or_local(&path).await + } + } + Err(e) => { + error!("Create file: DirectoryAddEntry failed: {} ,{:?}", path, e); + Err(e) + } } - } - Err(e) => { - error!("Create file: DirectoryAddEntry failed: {} ,{:?}", path, e); - Err(e) + } else { + self.sender + .create_no_parent( + &address, + OperationType::CreateFileNoParent, + &path, + &send_meta_data, + ) + .await } } - } else { - self.sender - .create_no_parent( - &address, - OperationType::CreateFileNoParent, - &path, - &send_meta_data, - ) - .await + Err(e) => Err(e), }; let mut file_lock = self.lock_file_mut(parent)?; - if result.is_ok() { - self.meta_engine.directory_add_entry( - 
parent, - name, - FileTypeSimple::RegularFile.into(), - )?; - } file_lock.remove(name); drop(file_lock); @@ -1115,25 +1119,90 @@ where } } - pub fn create_volume(&self, name: &str) -> Result<(), i32> { - let _vlock = { - let _lock = self.volume_lock.lock(); - if self.volumes.contains_key(name) { - return Err(libc::EEXIST); + pub fn create_volume(&self, name: &str, _size: u64) -> Result<(), i32> { + match self.file_locks.insert(name.to_owned(), HashMap::new()) { + Some(_) => Err(libc::EEXIST), + None => self.meta_engine.create_volume(name), + } + } + + // delete and clean volume only work for unmounted volume + pub fn clean_volume(&self, name: &str) -> Result<(), i32> { + let files: Vec<(String, FileTypeSimple)> = self + .meta_engine + .file_indexs + .iter() + .map(|x| (x.key().to_owned(), x.value().file_type)) + .collect(); + for kv in files { + if kv.0.starts_with(&(name.to_owned() + "/")) { + if kv.1 == FileTypeSimple::RegularFile { + self.delete_file_no_parent(&kv.0)?; + } else { + self.delete_dir_no_parent_force(&kv.0)?; + } } - self.volumes.insert( - name.to_owned(), - Volume { - name: name.to_owned(), - size: 100000000, - used_size: 0, - }, - ); - self.volumes.get(name).unwrap() + } + Ok(()) + } + + // delete and clean volume only work for unmounted volume + pub async fn delete_volume(&self, name: &str) -> Result<(), i32> { + // TODO: check if the volume is not mounted + let server_addresses: Vec = self + .hash_ring + .read() + .as_ref() + .unwrap() + .servers + .keys() + .cloned() + .collect(); + for address in &server_addresses { + if address == &self.address { + self.clean_volume(name).unwrap_or_else(|e| { + error!("clean volume failed: {:?}", e); + }); + } else { + match self.sender.clean_volume(address, name).await { + Ok(_) => {} + Err(e) => { + error!("delete volume failed: {:?}", e); + return Err(e); + } + } + } + } + let new_server_addresses = match self.new_hash_ring.read().as_ref() { + Some(new_hash_ring) => 
new_hash_ring.servers.keys().cloned().collect(), + None => vec![], }; - match self.create_dir_no_parent(name, 0o755) { - Ok(_) => Ok(()), - Err(e) => Err(e), + for address in &new_server_addresses { + if server_addresses.contains(address) { + continue; + } + if address == &self.address { + self.clean_volume(name).unwrap_or_else(|e| { + error!("clean volume failed: {:?}", e); + }); + } else { + match self.sender.clean_volume(address, name).await { + Ok(_) => {} + Err(e) => { + error!("delete volume failed: {:?}", e); + return Err(e); + } + } + } + } + match self.file_locks.get_mut(name) { + Some(value) => { + self.meta_engine.delete_volume(name)?; + drop(value); + self.file_locks.remove(name); + Ok(()) + } + None => Err(libc::ENOENT), } } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 8c75d21..6ff30f3 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -5,8 +5,6 @@ pub mod distributed_engine; pub mod storage_engine; mod transfer_manager; -mod volume; - use std::{ sync::{atomic::Ordering, Arc}, time::Duration, @@ -176,7 +174,7 @@ pub async fn watch_status(engine: Arc>) { let _old_hash_ring = engine .hash_ring .write() - .replace(engine.new_hash_ring.read().clone().unwrap()); + .replace(engine.new_hash_ring.read().clone().unwrap()); // TODO: _old_hash_ring should be used to rollback the transfer process info!("Transfer: start to finishing"); match engine.update_server_status(ServerStatus::Finishing).await { @@ -815,15 +813,18 @@ where info!("{} Create Volume", self.engine.address); let meta_data_unwraped: CreateVolumeSendMetaData = bincode::deserialize(&metadata).unwrap(); - info!("Create Volume: {:?}, id: {}", &meta_data_unwraped.name, id); - if meta_data_unwraped.name.is_empty() - || meta_data_unwraped.name.len() > 255 - || meta_data_unwraped.name.contains('\0') - || meta_data_unwraped.name.contains('/') + info!("Create Volume: {:?}, id: {}", file_path, id); + if file_path.is_empty() + || file_path.len() > 255 + || file_path.contains('\0') + || 
file_path.contains('/') { return Ok((libc::EINVAL, 0, 0, 0, vec![], vec![])); } - let status = match self.engine.create_volume(&meta_data_unwraped.name) { + let status = match self + .engine + .create_volume(file_path, meta_data_unwraped.size) + { Ok(()) => 0, Err(e) => { info!( @@ -839,20 +840,19 @@ where return Ok((status, 0, 0, 0, Vec::new(), Vec::new())); } OperationType::InitVolume => { - let file_path = String::from_utf8(path).unwrap(); info!( "{} Init Volume: {}, id: {}", self.engine.address, file_path, id ); if !file_path.is_empty() - && self.engine.get_address(&file_path) == self.engine.address - && !self.engine.volumes.contains_key(&file_path) + && self.engine.get_address(file_path) == self.engine.address + && self.engine.meta_engine.init_volume(file_path).is_err() { error!( "Volume not Exists: id: {}, file_path: {}, address {}, self_address {}", id, file_path, - self.engine.get_address(&file_path), + self.engine.get_address(file_path), self.engine.address ); return Ok((libc::ENOENT, 0, 0, 0, vec![], vec![])); @@ -860,6 +860,68 @@ where //self.engine.volume_indexes.insert(id, file_path); return Ok((0, 0, 0, 0, Vec::new(), Vec::new())); } + OperationType::ListVolumes => { + info!("{} List Volume", self.engine.address); + let return_meta_data = self.engine.meta_engine.list_volumes().unwrap(); + return Ok(( + 0, + 0, + return_meta_data.len(), + 0, + return_meta_data, + Vec::new(), + )); + } + OperationType::DeleteVolume => { + info!("{} Delete Volume", self.engine.address); + info!("Delete Volume: {:?}, id: {}", file_path, id); + if file_path.is_empty() + || file_path.len() > 255 + || file_path.contains('\0') + || file_path.contains('/') + { + return Ok((libc::EINVAL, 0, 0, 0, vec![], vec![])); + } + let status = match self.engine.delete_volume(file_path).await { + Ok(()) => 0, + Err(e) => { + info!( + "Delete Volume Failed: {:?}, path: {}, operation_type: {}, flags: {}", + status_to_string(e), + std::str::from_utf8(path.as_slice()).unwrap(), + 
operation_type, + flags + ); + e + } + }; + return Ok((status, 0, 0, 0, Vec::new(), Vec::new())); + } + OperationType::CleanVolume => { + info!("{} Clean Volume", self.engine.address); + info!("Clean Volume: {:?}, id: {}", file_path, id); + if file_path.is_empty() + || file_path.len() > 255 + || file_path.contains('\0') + || file_path.contains('/') + { + return Ok((libc::EINVAL, 0, 0, 0, vec![], vec![])); + } + let status = match self.engine.clean_volume(file_path) { + Ok(()) => 0, + Err(e) => { + info!( + "Clean Volume Failed: {:?}, path: {}, operation_type: {}, flags: {}", + status_to_string(e), + std::str::from_utf8(path.as_slice()).unwrap(), + operation_type, + flags + ); + e + } + }; + return Ok((status, 0, 0, 0, Vec::new(), Vec::new())); + } } } } diff --git a/src/server/storage_engine/file_engine.rs b/src/server/storage_engine/file_engine.rs index 8530fc6..90bd8b1 100644 --- a/src/server/storage_engine/file_engine.rs +++ b/src/server/storage_engine/file_engine.rs @@ -62,8 +62,8 @@ impl StorageEngine for FileEngine { } fn init(&self) { - self.meta_engine.init(); self.fsck().unwrap(); + self.meta_engine.init(); } fn read_file(&self, path: &str, size: u32, offset: i64) -> Result, i32> { @@ -231,7 +231,7 @@ impl StorageEngine for FileEngine { return Err(f_errno); }; self.meta_engine.delete_file_attr(path)?; - self.meta_engine.delete_file(&local_file_name)?; + self.meta_engine.delete_file(&local_file_name, path)?; Ok(()) } @@ -382,6 +382,7 @@ mod tests { assert_eq!(Path::new(&local_file_name).is_file(), true); engine.delete_file("test1/a.txt").unwrap(); assert_eq!(Path::new(&local_file_name).is_file(), false); + meta_engine.delete_directory("test1").unwrap(); } { @@ -402,6 +403,9 @@ mod tests { assert_eq!(Path::new(&local_file_name).is_file(), true); engine.delete_file("test1/test_a/a/a.txt").unwrap(); assert_eq!(Path::new(&local_file_name).is_file(), false); + meta_engine.delete_directory("test1/test_a/a").unwrap(); + 
meta_engine.delete_directory("test1/test_a").unwrap(); + meta_engine.delete_directory("test1").unwrap(); } rocksdb::DB::destroy(&rocksdb::Options::default(), format!("{}_dir", db_path)).unwrap(); rocksdb::DB::destroy(&rocksdb::Options::default(), format!("{}_file", db_path)).unwrap(); diff --git a/src/server/storage_engine/meta_engine.rs b/src/server/storage_engine/meta_engine.rs index f22a411..3bdc912 100644 --- a/src/server/storage_engine/meta_engine.rs +++ b/src/server/storage_engine/meta_engine.rs @@ -1,17 +1,17 @@ use bytes::BufMut; use dashmap::DashMap; use libc::{DT_DIR, DT_LNK, DT_REG}; -use log::{debug, error}; +use log::{debug, error, info}; #[cfg(feature = "mem-db")] use pegasusdb::DB; -use rocksdb::BlockBasedOptions; +use rocksdb::{BlockBasedOptions, WriteBatch}; #[cfg(feature = "disk-db")] use rocksdb::{Cache, IteratorMode, Options, DB}; use crate::{ common::{ errors::{DATABASE_ERROR, SERIALIZATION_ERROR}, - serialization::{FileAttrSimple, FileTypeSimple}, + serialization::{FileAttrSimple, FileTypeSimple, Volume}, }, server::path_split, }; @@ -28,11 +28,18 @@ pub struct Database { pub db: DB, } +pub struct FileIndex { + pub file_type: FileTypeSimple, + pub status: u32, + pub sub_files_num: u32, +} + pub struct MetaEngine { pub file_db: Database, pub dir_db: Database, pub file_attr_db: Database, - pub file_indexs: DashMap, + pub file_indexs: DashMap, + pub volumes: DashMap, } impl MetaEngine { @@ -110,10 +117,65 @@ impl MetaEngine { dir_db, file_attr_db, file_indexs: DashMap::new(), + volumes: DashMap::new(), } } - pub fn init(&self) {} + pub fn init(&self) { + for file_name in self.file_attr_db.db.iterator(IteratorMode::Start) { + let (k, v) = file_name.unwrap(); + let k = String::from_utf8(k.to_vec()).unwrap(); + let attr: FileAttrSimple = bincode::deserialize(&v).unwrap(); + let file_type = attr.kind; + match file_type { + 4 => { + // RegularFile + self.file_indexs.insert( + k, + FileIndex { + file_type: FileTypeSimple::RegularFile, + status: 0, + 
sub_files_num: 0, + }, + ); + } + 3 => { + // Directory + self.file_indexs.insert( + k.clone(), + FileIndex { + file_type: FileTypeSimple::Directory, + status: 0, + sub_files_num: 2, + }, + ); + if !k.contains('/') { + self.volumes.insert( + k.clone(), + Volume { + name: k, + size: 10000000, + used_size: 0, + }, + ); + } + } + _ => {} + } + } + + for dir_name in self.dir_db.db.iterator(IteratorMode::Start) { + let sub_dir_info = String::from_utf8(dir_name.unwrap().0.to_vec()).unwrap(); + let list = sub_dir_info.split('-').collect::>(); + info!("list: {:?}", list); + let mut file_index = self + .file_indexs + .get_mut(list.first().unwrap().to_owned()) + .unwrap(); + file_index.sub_files_num += 1; + info!("file_index.sub_files_num: {:}", file_index.sub_files_num); + } + } pub fn get_file_map(&self) -> Result, i32> { let mut file_map = Vec::new(); @@ -129,22 +191,42 @@ impl MetaEngine { } pub fn put_file(&self, loacl_file_name: &str, path: &str) -> Result<(), i32> { - match self.file_db.db.put(loacl_file_name, path) { - Ok(_) => Ok(()), - Err(e) => { - error!("put file error: {}", e); - Err(DATABASE_ERROR) + match self.file_indexs.get_mut(path) { + Some(_) => Err(libc::EEXIST), + None => { + self.file_indexs.insert( + path.to_string(), + FileIndex { + file_type: FileTypeSimple::RegularFile, + status: 0, + sub_files_num: 2, + }, + ); + match self.file_db.db.put(loacl_file_name, path) { + Ok(_) => Ok(()), + Err(e) => { + error!("put file error: {}", e); + Err(DATABASE_ERROR) + } + } } } } - pub fn delete_file(&self, local_file_name: &str) -> Result<(), i32> { - match self.file_db.db.delete(local_file_name) { - Ok(_) => Ok(()), - Err(e) => { - error!("delete file error: {}", e); - Err(DATABASE_ERROR) + pub fn delete_file(&self, local_file_name: &str, path: &str) -> Result<(), i32> { + match self.file_indexs.get_mut(path) { + Some(value) => { + drop(value); + self.file_indexs.remove(path); + match self.file_db.db.delete(local_file_name) { + Ok(_) => Ok(()), + Err(e) => { 
+ error!("delete file error: {}", e); + Err(DATABASE_ERROR) + } + } } + None => Err(libc::ENOENT), } } @@ -164,7 +246,14 @@ impl MetaEngine { match self.file_indexs.get_mut(path) { Some(_) => Err(libc::EEXIST), None => { - self.file_indexs.insert(path.to_owned(), 2); + self.file_indexs.insert( + path.to_owned(), + FileIndex { + file_type: (FileTypeSimple::Directory), + status: (0), + sub_files_num: (2), + }, + ); let attr = FileAttrSimple::new(FileTypeSimple::Directory); self.put_file_attr(path, attr) } @@ -175,7 +264,7 @@ impl MetaEngine { pub fn delete_directory(&self, path: &str) -> Result<(), i32> { match self.file_indexs.get_mut(path) { Some(value) => { - if *value > 2 { + if value.sub_files_num > 2 { Err(libc::ENOTEMPTY) } else { drop(value); @@ -191,6 +280,19 @@ impl MetaEngine { if self.file_indexs.remove(path).is_none() { return Err(libc::ENOENT); } + + // delete sub file index in dir_db with prefix "path_" + let (start_key, end_key) = (path.to_owned() + "-", path.to_owned() + "-~"); + let mut batch = WriteBatch::default(); + batch.delete_range(start_key, end_key); + match self.dir_db.db.write(batch) { + Ok(_) => {} + Err(e) => { + error!("delete directory force error: {}", e); + return Err(DATABASE_ERROR); + } + } + self.delete_file_attr(path) } @@ -222,18 +324,24 @@ impl MetaEngine { // TODO: optimize the situation while offset is not 0 let mut index_num = match self.file_indexs.get(path) { - Some(value) => *value, + Some(value) => value.sub_files_num, //maybe better hold a lock None => { return Err(libc::ENOENT); } }; + info!( + "read directory: {}, size: {}, offset: {}, index_num: {}", + path, size, offset, index_num + ); + let mut result = Vec::with_capacity(size as usize); let mut total = 0; for item in self.dir_db.db.iterator(IteratorMode::From( format!("{}-", path).as_bytes(), rocksdb::Direction::Forward, )) { + info!("item: {:?}", item); if index_num == 2 { break; } @@ -281,6 +389,9 @@ impl MetaEngine { ) -> Result<(), i32> { match 
self.file_indexs.get_mut(parent_dir) { Some(mut value) => { + if value.file_type != FileTypeSimple::Directory { + return Err(libc::ENOTDIR); + } match self.dir_db.db.put( format!("{}-{}-{}", parent_dir, file_name, file_type as char), file_name, @@ -291,7 +402,7 @@ impl MetaEngine { return Err(DATABASE_ERROR); } } - *value += 1; + value.sub_files_num += 1; Ok(()) } None => { @@ -309,6 +420,9 @@ impl MetaEngine { ) -> Result<(), i32> { match self.file_indexs.get_mut(parent_dir) { Some(mut value) => { + if value.file_type != FileTypeSimple::Directory { + return Err(libc::ENOTDIR); + } match self.dir_db.db.delete(format!( "{}-{}-{}", parent_dir, file_name, file_type as char @@ -319,7 +433,8 @@ impl MetaEngine { return Err(DATABASE_ERROR); } } - *value -= 1; + assert!(value.sub_files_num > 2); + value.sub_files_num -= 1; Ok(()) } None => { @@ -341,7 +456,7 @@ impl MetaEngine { error!("delete from parent error: {}", e); return Err(DATABASE_ERROR); } - *value -= 1; + value.sub_files_num -= 1; Ok(()) } None => Err(libc::ENOENT), @@ -419,32 +534,64 @@ impl MetaEngine { } } + pub fn create_volume(&self, name: &str) -> Result<(), i32> { + if self.volumes.contains_key(name) { + return Err(libc::EEXIST); + } + self.volumes.insert( + name.to_owned(), + Volume { + name: name.to_owned(), + size: 100000000, + used_size: 0, + }, + ); + match self.create_directory(name, 0o755) { + Ok(_) => Ok(()), + Err(e) => Err(e), + } + } + + pub fn list_volumes(&self) -> Result, i32> { + let mut volumes = Vec::new(); + for kv in self.volumes.iter() { + volumes.push((*kv).clone()); + } + Ok(bincode::serialize(&volumes).unwrap()) + } + + pub fn init_volume(&self, name: &str) -> Result<(), i32> { + if !self.volumes.contains_key(name) { + return Err(libc::ENOENT); + } + Ok(()) + } + + // make sure the volume is empty + pub fn delete_volume(&self, name: &str) -> Result<(), i32> { + if !self.volumes.contains_key(name) { + return Err(libc::ENOENT); + } + self.volumes.remove(name); + match 
self.delete_directory_force(name) { + Ok(_) => Ok(()), + Err(e) => Err(e), + } + } + pub fn is_dir(&self, path: &str) -> Result { - match self.file_attr_db.db.get(path.as_bytes()) { - Ok(Some(value)) => { - match bincode::deserialize::(&value) { - Ok(file_attr) => { - // fuser::FileType::RegularFile - if file_attr.kind != 4 { - Ok(true) // not a file - } else { - Ok(false) - } - } - Err(e) => { - error!("deserialize error: {:?}", e); - Err(SERIALIZATION_ERROR) - } + match self.file_indexs.get(path) { + Some(value) => { + if value.file_type == FileTypeSimple::Directory { + Ok(true) + } else { + Ok(false) } } - Ok(None) => { - debug!("read_file path: {}, no entry", path); + None => { + debug!("is_dir path: {}, no entry", path); Err(libc::ENOENT) } - Err(e) => { - error!("read_file path: {}, io error: {:?}", path, e); - Err(DATABASE_ERROR) - } } } @@ -452,7 +599,12 @@ impl MetaEngine { #[cfg(feature = "disk-db")] for item in self.dir_db.db.iterator(IteratorMode::End) { let (key, _value) = item.unwrap(); - if !self.file_attr_db.db.key_may_exist(&key) { + let key = String::from_utf8(key.to_vec()).unwrap(); + if !self + .file_attr_db + .db + .key_may_exist(key.split('-').next().unwrap()) + { let _ = self.dir_db.db.delete(&key); } } @@ -492,15 +644,16 @@ mod tests { engine.directory_add_entry("test1", "a", 3).unwrap(); let mode: mode_t = 0o777; engine.create_directory("test1/a", mode).unwrap(); - let l = engine.file_indexs.get("test1/a").unwrap().clone(); + let l = engine.file_indexs.get("test1/a").unwrap().sub_files_num; assert_eq!(2, l); - let l = engine.file_indexs.get("test1").unwrap().clone(); + let l = engine.file_indexs.get("test1").unwrap().sub_files_num; assert_eq!(3, l); engine.directory_delete_entry("test1", "a", 3).unwrap(); engine.delete_directory("test1/a").unwrap(); assert_eq!(engine.file_indexs.get("test1/a").is_none(), true); - let l = engine.file_indexs.get("test1").unwrap().clone(); + let l = engine.file_indexs.get("test1").unwrap().sub_files_num; 
assert_eq!(2, l); + engine.delete_directory("test1").unwrap(); } { @@ -510,12 +663,12 @@ mod tests { engine.directory_add_entry("test1", "a1", 3).unwrap(); let mode: mode_t = 0o777; engine.create_directory("test1/a1", mode).unwrap(); - let l = engine.file_indexs.get("test1/a1").unwrap().clone(); + let l = engine.file_indexs.get("test1/a1").unwrap().sub_files_num; assert_eq!(2, l); engine.directory_add_entry("test1/a1", "a2", 3).unwrap(); engine.create_directory("test1/a1/a2", mode).unwrap(); - let l = engine.file_indexs.get("test1/a1").unwrap().clone(); + let l = engine.file_indexs.get("test1/a1").unwrap().sub_files_num; assert_eq!(3, l); engine.delete_directory("test1/a1/a2").unwrap(); engine.delete_from_parent("test1/a1/a2", 3).unwrap(); @@ -524,14 +677,15 @@ mod tests { engine.directory_add_entry("test1", "a3", 3).unwrap(); engine.create_directory("test1/a3", mode).unwrap(); - let l = engine.file_indexs.get("test1/a3").unwrap().clone(); + let l = engine.file_indexs.get("test1/a3").unwrap().sub_files_num; assert_eq!(2, l); engine.directory_delete_entry("test1", "a3", 3).unwrap(); engine.delete_directory("test1/a3").unwrap(); assert_eq!(engine.file_indexs.get("test1/a3").is_none(), true); - let l = engine.file_indexs.get("test1").unwrap().clone(); + let l = engine.file_indexs.get("test1").unwrap().sub_files_num; assert_eq!(2, l); + engine.delete_directory("test1").unwrap(); } rocksdb::DB::destroy(&rocksdb::Options::default(), format!("{}_dir", db_path)).unwrap(); rocksdb::DB::destroy(&rocksdb::Options::default(), format!("{}_file", db_path)).unwrap(); diff --git a/src/server/volume.rs b/src/server/volume.rs deleted file mode 100644 index d6dbe94..0000000 --- a/src/server/volume.rs +++ /dev/null @@ -1,9 +0,0 @@ -// Copyright 2022 labring. All rights reserved. 
-// -// SPDX-License-Identifier: Apache-2.0 - -pub struct Volume { - pub name: String, - pub size: u64, - pub used_size: u64, -} diff --git a/test_io500.sh b/test_io500.sh index bc833b4..02d6ee6 100755 --- a/test_io500.sh +++ b/test_io500.sh @@ -10,7 +10,7 @@ function green_font() { } function fuse_test() { - ./target/debug/client --log-level warn create test1 100000 + ./target/debug/client --log-level warn create-volume test1 100000 ./target/debug/client --log-level warn daemon& sleep 3 ./target/debug/client --log-level warn mount ~/fs test1 @@ -23,7 +23,6 @@ function fuse_test() { end_time=$[$(date +%s%N)/1000000] result_time=$[ $end_time - $start_time ] echo -e "fuse tests finish, cost: $(green_font ${result_time}ms)" - sudo rm -rf ~/fs return $result } @@ -90,7 +89,7 @@ echo "[global]" > config-minimal.ini echo "datadir = /home/sealos/fs" >> config-minimal.ini echo "" >> config-minimal.ini echo "[debug]" >> config-minimal.ini -echo "stonewall-time = 10" >> config-minimal.ini +echo "stonewall-time = 2" >> config-minimal.ini cd ..