Skip to content

Commit

Permalink
shmem updates
Browse files Browse the repository at this point in the history
  • Loading branch information
cbears committed Dec 9, 2024
1 parent 426f10a commit 77b9604
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 36 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "escp"
version = "0.8.0"
edition = "2018"
version = "0.9.0"
edition = "2021"
author = "Charles Shiflett <cshiflett@ESnet>"
authors = [ "Charles Shiflett" ]
about = "Energy Sciences Network transfer tool (EScp)"
Expand Down
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ sudo dpkg -i target/debian/escp_0.7.0_amd64.deb # Debian
# or
dnf install target/release/rpmbuild/RPMS/x86_64/escp-0.8.0*.rpm # Redhat Family
# For development
cargo install bindgen-cli --version 0.68.1
bindgen libdtn/include/dtn.h -o src/escp/bindings.rs --use-core --generate-cstr
Expand All @@ -217,7 +216,6 @@ complete -F _scp -o nospace escp
_completion_loader scp
```

KNOWNBUGS
Expand Down
1 change: 1 addition & 0 deletions flats/file_spec.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ table File {
uid: uint32;
gid: uint32;
sz: int64;
blocks: int64;

atim_sec: int64;
atim_nano: int64;
Expand Down
3 changes: 2 additions & 1 deletion libdtn/include/args.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ struct fc_info_struct {
uint64_t bytes;
uint32_t crc;
uint32_t completion;
uint64_t pad2[4];
uint64_t blocks;
uint64_t pad2[3];
};

struct dtn_args {
Expand Down
4 changes: 3 additions & 1 deletion libdtn/include/dtn.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,10 @@ struct fc_info_struct {
uint64_t bytes;
uint32_t crc;
uint32_t completion;
uint64_t pad2[4];
uint64_t blocks;
// uint64_t pad2[3];
};

struct fc_info_struct* fc_pop();
void fc_push( uint64_t file_no, uint64_t bytes, uint32_t crc );

Expand Down
14 changes: 9 additions & 5 deletions libdtn/src/dtn.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ uint32_t fc_info_cnt = 16384;
uint64_t fc_info_head __attribute__ ((aligned(64))) = 0;
uint64_t fc_info_tail __attribute__ ((aligned(64))) = 0;

void fc_push( uint64_t file_no, uint64_t bytes, uint32_t crc ) {
void fc_push( uint64_t file_no, uint64_t bytes, uint64_t blocks, uint32_t crc ) {
uint64_t head = atomic_fetch_add( &fc_info_head, 1 );
uint64_t tail = atomic_load( &fc_info_tail );
uint64_t h = head % fc_info_cnt;
Expand All @@ -96,6 +96,7 @@ void fc_push( uint64_t file_no, uint64_t bytes, uint32_t crc ) {
fc.state= 1;
fc.file_no = file_no;
fc.bytes = bytes;
fc.blocks = blocks;
fc.crc = crc;
fc.completion = 4;

Expand Down Expand Up @@ -899,12 +900,14 @@ void* rx_worker( void* arg ) {
DBG("[%2d] FIHDR_SHORT written=%08ld/%08ld fn=%ld os=%zX sz=%d",
id, written, fs.bytes, file_no, offset, sz );

// XXX: add block_total
if ( fs.bytes && fs.bytes <= written ) {

/* add blocks */
if (dtn->do_hash)
fc_push( file_no, fs.bytes, atomic_load(&fs_ptr->crc) );
fc_push( file_no, fs.bytes, fs.block_total, atomic_load(&fs_ptr->crc) );
else
fc_push( file_no, fs.bytes, 0 );
fc_push( file_no, fs.bytes, fs.block_total, 0 );

// Always truncate, even though it may not be needed in some cases
fob->truncate(fob, fs.bytes);
Expand Down Expand Up @@ -1070,7 +1073,7 @@ void* tx_worker( void* args ) {
int64_t res = atomic_fetch_add( &tx_filesclosed, 1 );
DBG("[%2d] Worker finished with fn=%ld files_closed=%ld; closing fd=%d",
id, fs_lcl.file_no, res, fs_lcl.fd);
fc_push(fs_lcl.file_no, atomic_load(&fs->bytes_total), atomic_load(&fs->crc));
fc_push(fs_lcl.file_no, atomic_load(&fs->bytes_total), atomic_load(&fs->block_total), atomic_load(&fs->crc));
wipe ++;
} else {
DBG("[%2d] Worker finished with fn=%ld", id, fs_lcl.file_no);
Expand Down Expand Up @@ -1122,7 +1125,8 @@ void* tx_worker( void* args ) {
VRFY( network_send(knob, buf, bytes_sent, 20+bytes_sent, false, FIHDR_SHORT) > 0, );

atomic_fetch_add( &fs->bytes_total, bytes_read );
atomic_fetch_add( &dtn->bytes_io, bytes_read );
atomic_fetch_add( &fs->block_total, 1 );
atomic_fetch_add( &dtn->bytes_io, bytes_read );

fob->complete(fob, token);

Expand Down
8 changes: 4 additions & 4 deletions src/escp/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1956,15 +1956,15 @@ pub struct fc_info_struct {
pub bytes: u64,
pub crc: u32,
pub completion: u32,
pub pad2: [u64; 4usize],
pub blocks: u64,
}
#[test]
fn bindgen_test_layout_fc_info_struct() {
const UNINIT: ::core::mem::MaybeUninit<fc_info_struct> = ::core::mem::MaybeUninit::uninit();
let ptr = UNINIT.as_ptr();
assert_eq!(
::core::mem::size_of::<fc_info_struct>(),
64usize,
40usize,
concat!("Size of: ", stringify!(fc_info_struct))
);
assert_eq!(
Expand Down Expand Up @@ -2023,13 +2023,13 @@ fn bindgen_test_layout_fc_info_struct() {
)
);
assert_eq!(
unsafe { ::core::ptr::addr_of!((*ptr).pad2) as usize - ptr as usize },
unsafe { ::core::ptr::addr_of!((*ptr).blocks) as usize - ptr as usize },
32usize,
concat!(
"Offset of field: ",
stringify!(fc_info_struct),
"::",
stringify!(pad2)
stringify!(blocks)
)
);
}
Expand Down
29 changes: 23 additions & 6 deletions src/escp/file_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,13 @@ impl<'a> File<'a> {
pub const VT_UID: flatbuffers::VOffsetT = 10;
pub const VT_GID: flatbuffers::VOffsetT = 12;
pub const VT_SZ: flatbuffers::VOffsetT = 14;
pub const VT_ATIM_SEC: flatbuffers::VOffsetT = 16;
pub const VT_ATIM_NANO: flatbuffers::VOffsetT = 18;
pub const VT_MTIM_SEC: flatbuffers::VOffsetT = 20;
pub const VT_MTIM_NANO: flatbuffers::VOffsetT = 22;
pub const VT_CRC: flatbuffers::VOffsetT = 24;
pub const VT_COMPLETE: flatbuffers::VOffsetT = 26;
pub const VT_BLOCKS: flatbuffers::VOffsetT = 16;
pub const VT_ATIM_SEC: flatbuffers::VOffsetT = 18;
pub const VT_ATIM_NANO: flatbuffers::VOffsetT = 20;
pub const VT_MTIM_SEC: flatbuffers::VOffsetT = 22;
pub const VT_MTIM_NANO: flatbuffers::VOffsetT = 24;
pub const VT_CRC: flatbuffers::VOffsetT = 26;
pub const VT_COMPLETE: flatbuffers::VOffsetT = 28;

#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
Expand All @@ -200,6 +201,7 @@ impl<'a> File<'a> {
builder.add_mtim_sec(args.mtim_sec);
builder.add_atim_nano(args.atim_nano);
builder.add_atim_sec(args.atim_sec);
builder.add_blocks(args.blocks);
builder.add_sz(args.sz);
builder.add_fino(args.fino);
builder.add_complete(args.complete);
Expand Down Expand Up @@ -255,6 +257,13 @@ impl<'a> File<'a> {
unsafe { self._tab.get::<i64>(File::VT_SZ, Some(0)).unwrap()}
}
#[inline]
pub fn blocks(&self) -> i64 {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<i64>(File::VT_BLOCKS, Some(0)).unwrap()}
}
#[inline]
pub fn atim_sec(&self) -> i64 {
// Safety:
// Created from valid Table for this object
Expand Down Expand Up @@ -311,6 +320,7 @@ impl flatbuffers::Verifiable for File<'_> {
.visit_field::<u32>("uid", Self::VT_UID, false)?
.visit_field::<u32>("gid", Self::VT_GID, false)?
.visit_field::<i64>("sz", Self::VT_SZ, false)?
.visit_field::<i64>("blocks", Self::VT_BLOCKS, false)?
.visit_field::<i64>("atim_sec", Self::VT_ATIM_SEC, false)?
.visit_field::<i64>("atim_nano", Self::VT_ATIM_NANO, false)?
.visit_field::<i64>("mtim_sec", Self::VT_MTIM_SEC, false)?
Expand All @@ -328,6 +338,7 @@ pub struct FileArgs<'a> {
pub uid: u32,
pub gid: u32,
pub sz: i64,
pub blocks: i64,
pub atim_sec: i64,
pub atim_nano: i64,
pub mtim_sec: i64,
Expand All @@ -345,6 +356,7 @@ impl<'a> Default for FileArgs<'a> {
uid: 0,
gid: 0,
sz: 0,
blocks: 0,
atim_sec: 0,
atim_nano: 0,
mtim_sec: 0,
Expand Down Expand Up @@ -385,6 +397,10 @@ impl<'a: 'b, 'b> FileBuilder<'a, 'b> {
self.fbb_.push_slot::<i64>(File::VT_SZ, sz, 0);
}
#[inline]
pub fn add_blocks(&mut self, blocks: i64) {
self.fbb_.push_slot::<i64>(File::VT_BLOCKS, blocks, 0);
}
#[inline]
pub fn add_atim_sec(&mut self, atim_sec: i64) {
self.fbb_.push_slot::<i64>(File::VT_ATIM_SEC, atim_sec, 0);
}
Expand Down Expand Up @@ -432,6 +448,7 @@ impl core::fmt::Debug for File<'_> {
ds.field("uid", &self.uid());
ds.field("gid", &self.gid());
ds.field("sz", &self.sz());
ds.field("blocks", &self.blocks());
ds.field("atim_sec", &self.atim_sec());
ds.field("atim_nano", &self.atim_nano());
ds.field("mtim_sec", &self.mtim_sec());
Expand Down
6 changes: 4 additions & 2 deletions src/escp/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ pub fn escp_receiver(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {
let buf = builder.finished_data();

let dst:[MaybeUninit<u8>; 49152] = [{ std::mem::MaybeUninit::uninit() }; 49152];
let mut dst = unsafe { std::mem::transmute::<_, [u8; 49152]>(dst) };
let mut dst = unsafe { std::mem::transmute::<
[std::mem::MaybeUninit<u8>; 49152], [u8; 49152]>(dst) };

let res = zstd_safe::compress( &mut dst, buf, 3 );
let csz = res.expect("Compression failed");
Expand Down Expand Up @@ -315,7 +316,8 @@ pub fn escp_receiver(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {

if (t & msg_compressed) == msg_compressed {
let dst:[MaybeUninit<u8>; 131072] = [{ std::mem::MaybeUninit::uninit() }; 131072];
let mut dst = unsafe { std::mem::transmute::<_, [u8; 131072]>(dst) };
let mut dst = unsafe { std::mem::transmute::
<[std::mem::MaybeUninit<u8>; 131072], [u8; 131072]>(dst) };

let res = zstd_safe::decompress(dst.as_mut_slice(), c.as_mut_slice());
let decompressed_sz = res.expect("decompress failed");
Expand Down
25 changes: 12 additions & 13 deletions src/escp/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ pub fn escp_sender(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {
&fc_out );
debug!("Finished iterating files, total bytes={bytes_total}");

if (bytes_total <= 0) && (files_total <= 0) && (flags.io_engine != "shmem") {
if (bytes_total <= 0) && (files_total == 0) && (flags.io_engine != "shmem") {
eprintln!("Nothing to transfer, exiting.");
process::exit(1);
}
Expand Down Expand Up @@ -408,23 +408,24 @@ fn file_check(

let ptr = unsafe { meta_recv() };

while !ptr.is_null() {
if !ptr.is_null() {

let b = unsafe { slice::from_raw_parts(ptr, 6).to_vec() };
let (sz, t) = from_header( b );


if t != msg_file_stat {
if t == msg_keepalive {
debug!("file_check: Got keepalive, ignoring");
} else {
info!("file_check: Got unexpected type={t}, ignoring");
}
break;
unsafe{ meta_complete(); }
return 1;
}

let dst:[MaybeUninit<u8>; 131072] = [{ std::mem::MaybeUninit::uninit() }; 131072];
let mut dst = unsafe { std::mem::transmute::<_, [u8; 131072]>(dst) };
let mut dst = unsafe { std::mem::transmute::
<[std::mem::MaybeUninit<u8>; 131072], [u8; 131072]>(dst) };

let res = zstd_safe::decompress(dst.as_mut_slice(),
unsafe{slice::from_raw_parts(ptr.add(16), sz as usize)} );
Expand Down Expand Up @@ -495,8 +496,6 @@ fn file_check(
_ = (*hm).remove(&rx_fino);
}

break;

}

if !ptr.is_null() {
Expand Down Expand Up @@ -800,16 +799,15 @@ fn iterate_files ( flags: &EScp_Args,
for fi in &flags.source {
if fi.is_empty() { continue; };

let fi_path;
match fs::canonicalize(&PathBuf::from(fi)) {
Ok(a) => { fi_path = a; }
let fi_path = match fs::canonicalize(PathBuf::from(fi)) {
Ok(a) => { a }
Err(e) => {
let errmsg = format!("\rCould not open file='{}': {}", fi, e);
info!("{errmsg}");
eprintln!("\n{errmsg}");
fi_path = fi.into();
fi.into()
}
}
};

_ = files_in.send(
(fi_path.parent().unwrap().to_path_buf(),
Expand Down Expand Up @@ -995,7 +993,8 @@ fn iterate_files ( flags: &EScp_Args,
if buf.len() > 320 {

let dst:[MaybeUninit<u8>; 49152] = [{ std::mem::MaybeUninit::uninit() }; 49152 ];
let mut dst = unsafe { std::mem::transmute::<_, [u8; 49152]>(dst) };
let mut dst = unsafe { std::mem::transmute::
<[std::mem::MaybeUninit<u8>; 49152], [u8; 49152]>(dst) };

// let dst = Vec::<u8>::with_capacity(49152);

Expand Down

0 comments on commit 77b9604

Please sign in to comment.