From 849dd2e041fc5fc8367ef1f5680271acb29a78c6 Mon Sep 17 00:00:00 2001 From: altugbakan Date: Sat, 1 Apr 2023 14:13:18 +0300 Subject: [PATCH] Add windowsize option to receive --- src/window.rs | 122 +++++++++++++++++++++++++++++++++++++++++--------- src/worker.rs | 24 +++++++--- 2 files changed, 117 insertions(+), 29 deletions(-) diff --git a/src/window.rs b/src/window.rs index 329dca5..933b059 100644 --- a/src/window.rs +++ b/src/window.rs @@ -1,4 +1,9 @@ -use std::{collections::VecDeque, error::Error, fs::File, io::Read}; +use std::{ + collections::VecDeque, + error::Error, + fs::File, + io::{Read, Write}, +}; /// Window `struct` is used to store chunks of data from a file. /// It is used to store the data that is being sent or received for @@ -55,10 +60,21 @@ impl Window { Ok(true) } + /// Empties the `Window` by writing the data to the file. + pub fn empty(&mut self) -> Result<(), Box> { + for data in &self.elements { + self.file.write_all(data)?; + } + + self.elements.clear(); + + Ok(()) + } + /// Removes the first `amount` of elements from the `Window`. - pub fn remove(&mut self, amount: u16) -> Result<(), Box> { + pub fn remove(&mut self, amount: u16) -> Result<(), &'static str> { if amount > self.len() { - return Err("amount cannot be larger than size".into()); + return Err("amount cannot be larger than length of window"); } drop(self.elements.drain(0..amount as usize)); @@ -66,6 +82,17 @@ impl Window { Ok(()) } + /// Adds a data `Vec` to the `Window`. + pub fn add(&mut self, data: Vec) -> Result<(), &'static str> { + if self.len() == self.size { + return Err("cannot add to a full window"); + } + + self.elements.push_back(data); + + Ok(()) + } + /// Returns a reference to the `VecDeque` containing the elements. pub fn get_elements(&self) -> &VecDeque> { &self.elements @@ -80,16 +107,32 @@ impl Window { pub fn is_empty(&self) -> bool { self.elements.is_empty() } + + /// Returns `true` if the `Window` is full. + pub fn is_full(&self) -> bool { + self.elements.len() as u16 == self.size + } } #[cfg(test)] mod tests { use super::*; - use std::{fs, io::Write}; + use std::{ + fs::{self, OpenOptions}, + io::{Seek, Write}, + path::Path, + }; + + const DIR_NAME: &str = "tmp"; #[test] fn fills_and_removes_from_window() { - let file = initialize(); + const FILE_NAME: &str = "fills_and_removes_from_window.txt"; + + let mut file = initialize(FILE_NAME); + file.write_all(b"Hello, world!").unwrap(); + file.flush().unwrap(); + file.rewind().unwrap(); let mut window = Window::new(2, 5, file); window.fill().unwrap(); @@ -106,30 +149,65 @@ mod tests { assert_eq!(window.elements[0], b", wor"[..]); assert_eq!(window.elements[1], b"ld!"[..]); - clean(); + clean(FILE_NAME); } - fn initialize() -> File { - let dir_name = "tmp"; - let file_name = "tmp/test.txt"; + #[test] + fn adds_to_and_empties_window() { + const FILE_NAME: &str = "adds_to_and_empties_window.txt"; - if fs::metadata(dir_name).is_err() { - fs::create_dir(dir_name).unwrap(); - } + let file = initialize(FILE_NAME); - if File::open(file_name).is_ok() { - fs::remove_file(file_name).unwrap(); - } + let mut window = Window::new(3, 5, file); + window.add(b"Hello".to_vec()).unwrap(); + assert_eq!(window.elements.len(), 1); + assert_eq!(window.elements[0], b"Hello"[..]); - let mut file = File::create(file_name).unwrap(); - file.write_all(b"Hello, world!").unwrap(); - file.flush().unwrap(); + window.add(b", wor".to_vec()).unwrap(); + assert_eq!(window.elements.len(), 2); + assert_eq!(window.elements[0], b"Hello"[..]); + assert_eq!(window.elements[1], b", wor"[..]); - File::open(file_name).unwrap() + window.add(b"ld!".to_vec()).unwrap(); + assert_eq!(window.elements.len(), 3); + assert_eq!(window.elements[0], b"Hello"[..]); + assert_eq!(window.elements[1], b", wor"[..]); + assert_eq!(window.elements[2], b"ld!"[..]); + + window.empty().unwrap(); + assert_eq!(window.elements.len(), 0); + + let mut contents = Default::default(); + File::read_to_string( + &mut File::open(DIR_NAME.to_string() + "/" + FILE_NAME).unwrap(), + &mut contents, + ) + .unwrap(); + assert_eq!(contents, "Hello, world!"); + + clean(FILE_NAME); } - fn clean() { - fs::remove_file("tmp/test.txt").unwrap(); - fs::remove_dir("tmp").unwrap(); + fn initialize(file_name: &str) -> File { + let file_name = DIR_NAME.to_string() + "/" + file_name; + if !Path::new(DIR_NAME).is_dir() { + fs::create_dir(DIR_NAME).unwrap(); + } + + if File::open(&file_name).is_ok() { + fs::remove_file(&file_name).unwrap(); + } + + OpenOptions::new() + .read(true) + .append(true) + .create(true) + .open(&file_name) + .unwrap() + } + + fn clean(file_name: &str) { + let file_name = DIR_NAME.to_string() + "/" + file_name; + fs::remove_file(file_name).unwrap(); } } diff --git a/src/worker.rs b/src/worker.rs index 65b675c..2a63293 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,7 +1,6 @@ use std::{ error::Error, fs::{self, File}, - io::Write, net::{SocketAddr, UdpSocket}, path::PathBuf, thread, @@ -101,7 +100,7 @@ impl Worker { let worker_options = parse_options(&mut options, &work_type)?; accept_request(&socket, &options, &work_type)?; - receive_file(&socket, &mut File::create(&file_path)?, &worker_options)?; + receive_file(&socket, File::create(&file_path)?, &worker_options)?; Ok(()) }; @@ -135,6 +134,7 @@ fn send_file( ) -> Result<(), Box> { let mut block_number = 1; let mut window = Window::new(worker_options.windowsize, worker_options.blk_size, file); + loop { let filled = window.fill()?; @@ -177,14 +177,16 @@ fn send_file( fn receive_file( socket: &UdpSocket, - file: &mut File, + file: File, worker_options: &WorkerOptions, ) -> Result<(), Box> { let mut block_number: u16 = 0; - loop { - let size; + let mut window = Window::new(worker_options.windowsize, worker_options.blk_size, file); + loop { + let mut size; let mut retry_cnt = 0; + loop { match Message::recv_packet_with_size(socket, worker_options.blk_size) { Ok(Packet::Data { @@ -193,9 +195,16 @@ fn receive_file( }) => { if received_block_number == block_number.wrapping_add(1) { block_number = received_block_number; - file.write_all(&data)?; size = data.len(); - break; + window.add(data)?; + + if size < worker_options.blk_size { + break; + } + + if window.is_full() { + break; + } } } Ok(Packet::Error { code, msg }) => { @@ -210,6 +219,7 @@ fn receive_file( } } + window.empty()?; Message::send_ack(socket, block_number)?; if size < worker_options.blk_size { break;