Skip to content

Commit

Permalink
Fix handling of zero byte files
Browse files Browse the repository at this point in the history
  • Loading branch information
cbears committed Nov 7, 2024
1 parent e50a46a commit 27828fb
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 29 deletions.
3 changes: 2 additions & 1 deletion libdtn/src/dtn.c
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,8 @@ int64_t network_recv( struct network_obj* knob, uint16_t* subheader ) {

}

if ( !knob->fob && (*subheader == FIHDR_SHORT) ) {
if ( !knob->fob && knob->dtn ) {
// if ( !knob->fob && (*subheader == FIHDR_SHORT) ) {
knob->block = knob->dtn->block;

knob->fob = file_memoryinit( knob->dtn, knob->id );
Expand Down
27 changes: 14 additions & 13 deletions src/escp/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub fn escp_receiver(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {
debug!("Got header of type {}", t);

let mut port_start = 1232;
let mut port_end = 65536; // XXX: port_end not implemented
let mut port_end = 1242; // XXX: port_end not implemented
let bind_interface;


Expand Down Expand Up @@ -232,6 +232,7 @@ pub fn escp_receiver(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {
}
Err(_) => { error!("receive_main: error receiving file completion notifications"); return; }
}

fct = 1;

if !did_init && !finish_fc {
Expand Down Expand Up @@ -281,14 +282,15 @@ pub fn escp_receiver(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {
let mut dst = unsafe { std::mem::transmute::<_, [u8; 49152]>(dst) };

let res = zstd_safe::compress( &mut dst, buf, 3 );
let compressed_sz = res.expect("Compression failed");
let csz = res.expect("Compression failed");

let hdr = to_header( compressed_sz as u32, msg_file_stat );
let hdr = to_header( csz as u32, msg_file_stat );

debug!("fc: Sending fc_state data for {} files, size is {}/{compressed_sz}",
debug!("fc: Sending fc_state data for {} files, size is {}/{csz}",
v.len(), buf.len());
unsafe {
meta_send( dst.as_ptr() as *mut i8, hdr.as_ptr() as *mut i8, compressed_sz as i32 );
meta_send( dst.as_ptr() as *mut i8, hdr.as_ptr() as *mut i8,
csz as i32 );
}
last_send = std::time::Instant::now();
did_init = false;
Expand Down Expand Up @@ -375,14 +377,6 @@ pub fn escp_receiver(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {
fd = open( fp.as_ptr(), (*args).flags, 0o644 );
}


if entry.sz() < 1 {
debug!("Empty file created (&closed) for {fino} because sz<=0",
fino=entry.fino());
close(fd);
break;
}

if fd < 1 {
let err = io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::NotFound {
Expand Down Expand Up @@ -420,6 +414,13 @@ pub fn escp_receiver(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {
}
}

if entry.sz() < 1 {
_ = fc_in.send( (0,0,0,4) );
debug!("Empty file created (&closed) for {fino} because sz<=0",
fino=entry.fino());
close(fd);
break;
}
debug!("Add file {full_path}:{fino} with {:#X} sz={sz} fd={fd}",
(*args).flags, fino=entry.fino(), sz=entry.sz() );

Expand Down
45 changes: 30 additions & 15 deletions src/escp/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,16 @@ pub fn escp_sender(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {
}

let bytes_now = unsafe { get_bytes_io( args ) };
if (last_update.elapsed().as_secs_f32() > 0.2) || (bytes_now>=bytes_total) {

if bytes_now == 0 {
continue;
}
if (last_update.elapsed().as_secs_f32() > 0.2) ||
(bytes_now>=bytes_total) ||
(files_ok >= files_total) {

let duration = start.elapsed();

let width= ((bytes_now as f32 / bytes_total as f32) * 40.0) as usize ;
let progress = format!("{1:=>0$}", width, ">");
let rate = bytes_now as f32/duration.as_secs_f32();
let rate = if bytes_now>0 {
bytes_now as f32/duration.as_secs_f32() } else {0.0 };

let eta= ((bytes_total - bytes_now) as f32 / rate) as i64;

Expand All @@ -329,8 +328,7 @@ pub fn escp_sender(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {
let tmp = human_write( bytes_now as u64, true );
tot_str= CStr::from_ptr(tmp).to_str().unwrap();

let files_now = tx_getclosed();
debug!("transfer progress: {}/{} {}/{}", bytes_now, bytes_total, files_now, files_ok);
debug!("transfer progress: {}/{} {}/{}", bytes_now, bytes_total, files_ok, files_total);
}

let units = if flags.bits { "bits" } else { "B" };
Expand All @@ -340,8 +338,8 @@ pub fn escp_sender(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {
_ = fi.write(bar.as_bytes());
_ = fi.flush();

if bytes_now >= bytes_total {
let s = format!("\rSent : {tot_str}B in {files_total} files at {rate_str}{units}/s in {:0.1}s {:15}\r",
if (bytes_now >= bytes_total) || (files_ok >= files_total) {
let s = format!("\rSent : {tot_str}B in {files_total} files at {rate_str}{units}/s in {:0.1}s {:18}\r",
duration.as_secs_f32(), "");
_ = fi.write(s.as_bytes());
_ = fi.flush();
Expand All @@ -351,15 +349,15 @@ pub fn escp_sender(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {

last_update = std::time::Instant::now();
}

}

unsafe { fc_push( 0, 0, 0 ); }

loop {
let files_now = unsafe { tx_getclosed() };

if files_ok as i64 >= files_now {
debug!("Exiting because {files_ok} >= {files_now}");
if files_ok as i64 >= (files_total as i64) {
debug!("Exiting because {files_ok} >= {files_total}");
break;
}

Expand All @@ -375,7 +373,7 @@ pub fn escp_sender(safe_args: logging::dtn_args_wrapper, flags: &EScp_Args) {
break;
}

debug!("Loopping {files_ok} / {files_now}");
debug!("Loopping {files_ok} / {files_total}");
}

// Finished sending data
Expand Down Expand Up @@ -450,6 +448,12 @@ fn file_check(
let (rx_fino, rx_sz, rx_crc, rx_complete) = (e.fino(), e.sz(), e.crc(), e.complete());
debug!("file_check on {} {} {:#X} {}", rx_fino, rx_sz, rx_crc, rx_complete);

if (rx_fino == 0) && (rx_sz == 0) && (rx_complete == 4) {
debug!("Receiver confirmed an empty file");
*files_ok += 1;
continue;
}

if !(*hm).contains_key(&rx_fino) {
loop {
// loop until the hm contains key or fc_out returns error
Expand Down Expand Up @@ -801,7 +805,18 @@ fn iterate_files ( flags: &EScp_Args,

for fi in &flags.source {
if fi.is_empty() { continue; };
let fi_path = fs::canonicalize(&PathBuf::from(fi)).unwrap();

let fi_path;
match fs::canonicalize(&PathBuf::from(fi)) {
Ok(a) => { fi_path = a; }
Err(_) => {
let errmsg = format!("\rCouldn't find/access/open file='{}'", fi);
eprintln!("{errmsg}");
info!("{errmsg}");
process::exit(2);
}
}

_ = files_in.send(
(fi_path.parent().unwrap().to_path_buf(),
std::path::Path::new(fi_path.file_name().unwrap().to_str().unwrap()).to_path_buf()
Expand Down

0 comments on commit 27828fb

Please sign in to comment.