From 3ed9834b606969f840fc7bf52ff62334cce3a54c Mon Sep 17 00:00:00 2001 From: altugbakan Date: Thu, 30 Mar 2023 21:58:30 +0300 Subject: [PATCH] Add windowsize option for send --- .gitignore | 3 +- src/packet.rs | 3 ++ src/window.rs | 34 +++++++++++---- src/worker.rs | 112 ++++++++++++++++++++++++++++++++------------------ 4 files changed, 102 insertions(+), 50 deletions(-) diff --git a/.gitignore b/.gitignore index 787aa48..ef9bcb4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ +.vscode /target -/tmp +/tmp \ No newline at end of file diff --git a/src/packet.rs b/src/packet.rs index 25d7ed9..579ef5f 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -188,6 +188,8 @@ pub enum OptionType { TransferSize, /// Timeout option type Timeout, + /// Windowsize option type + Windowsize, } impl OptionType { @@ -197,6 +199,7 @@ impl OptionType { OptionType::BlockSize => "blksize", OptionType::TransferSize => "tsize", OptionType::Timeout => "timeout", + OptionType::Windowsize => "windowsize", } } } diff --git a/src/window.rs b/src/window.rs index 81c3fa0..329dca5 100644 --- a/src/window.rs +++ b/src/window.rs @@ -20,14 +20,14 @@ use std::{collections::VecDeque, error::Error, fs::File, io::Read}; /// ``` pub struct Window { elements: VecDeque>, - size: usize, + size: u16, chunk_size: usize, file: File, } impl Window { /// Creates a new `Window` with the supplied size and chunk size. - pub fn new(size: usize, chunk_size: usize, file: File) -> Window { + pub fn new(size: u16, chunk_size: usize, file: File) -> Window { Window { elements: VecDeque::new(), size, @@ -37,33 +37,49 @@ impl Window { } /// Fills the `Window` with chunks of data from the file. - pub fn fill(&mut self) -> Result<(), Box> { - for _ in self.elements.len()..self.size { + /// Returns `true` if the `Window` is full. + pub fn fill(&mut self) -> Result> { + for _ in self.len()..self.size { let mut chunk = vec![0; self.chunk_size]; let size = self.file.read(&mut chunk)?; if size != self.chunk_size { chunk.truncate(size); self.elements.push_back(chunk); - break; + return Ok(false); } self.elements.push_back(chunk); } - Ok(()) + Ok(true) } /// Removes the first `amount` of elements from the `Window`. - pub fn remove(&mut self, amount: usize) -> Result<(), Box> { - if amount > self.elements.len() { + pub fn remove(&mut self, amount: u16) -> Result<(), Box> { + if amount > self.len() { return Err("amount cannot be larger than size".into()); } - drop(self.elements.drain(0..amount)); + drop(self.elements.drain(0..amount as usize)); Ok(()) } + + /// Returns a reference to the `VecDeque` containing the elements. + pub fn get_elements(&self) -> &VecDeque> { + &self.elements + } + + /// Returns the length of the `Window`. + pub fn len(&self) -> u16 { + self.elements.len() as u16 + } + + /// Returns `true` if the `Window` is empty. + pub fn is_empty(&self) -> bool { + self.elements.is_empty() + } } #[cfg(test)] diff --git a/src/worker.rs b/src/worker.rs index 62ce3d8..65b675c 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,14 +1,14 @@ use std::{ error::Error, fs::{self, File}, - io::{Read, Write}, + io::Write, net::{SocketAddr, UdpSocket}, path::PathBuf, thread, time::{Duration, SystemTime}, }; -use crate::{ErrorCode, Message, OptionType, Packet, TransferOption}; +use crate::{ErrorCode, Message, OptionType, Packet, TransferOption, Window}; /// Worker `struct` is used for multithreaded file sending and receiving. /// It creates a new socket using the Server's IP and a random port @@ -37,6 +37,7 @@ struct WorkerOptions { blk_size: usize, t_size: usize, timeout: u64, + windowsize: u16, } #[derive(PartialEq, Eq)] @@ -62,15 +63,25 @@ impl Worker { let mut handle_send = || -> Result<(), Box> { let socket = setup_socket(&addr, &remote)?; let work_type = WorkType::Send(file_path.metadata()?.len()); + let worker_options = parse_options(&mut options, &work_type)?; + accept_request(&socket, &options, &work_type)?; - check_response(&socket)?; - send_file(&socket, &file_path, &mut options)?; + send_file(&socket, File::open(&file_path)?, &worker_options)?; Ok(()) }; - if let Err(err) = handle_send() { - eprintln!("{err}"); + match handle_send() { + Ok(_) => { + println!( + "Sent {} to {}", + file_path.file_name().unwrap().to_str().unwrap(), + remote + ); + } + Err(err) => { + eprintln!("{err}"); + } } }); } @@ -87,20 +98,31 @@ impl Worker { let mut handle_receive = || -> Result<(), Box> { let socket = setup_socket(&addr, &remote)?; let work_type = WorkType::Receive; + let worker_options = parse_options(&mut options, &work_type)?; + accept_request(&socket, &options, &work_type)?; - receive_file(&socket, &file_path, &mut options)?; + receive_file(&socket, &mut File::create(&file_path)?, &worker_options)?; Ok(()) }; - if let Err(err) = handle_receive() { - eprintln!("{err}"); - if fs::remove_file(&file_path).is_err() { - eprintln!( - "Error while cleaning {}", - file_path.file_name().unwrap().to_str().unwrap() + match handle_receive() { + Ok(_) => { + println!( + "Received {} from {}", + file_path.file_name().unwrap().to_str().unwrap(), + remote ); } + Err(err) => { + eprintln!("{err}"); + if fs::remove_file(&file_path).is_err() { + eprintln!( + "Error while cleaning {}", + file_path.file_name().unwrap().to_str().unwrap() + ); + } + } } }); } @@ -108,29 +130,28 @@ impl Worker { fn send_file( socket: &UdpSocket, - file_path: &PathBuf, - options: &mut Vec, + file: File, + worker_options: &WorkerOptions, ) -> Result<(), Box> { - let mut file = File::open(file_path)?; - let worker_options = parse_options(options, &WorkType::Send(file.metadata()?.len()))?; - let mut block_number = 1; + let mut window = Window::new(worker_options.windowsize, worker_options.blk_size, file); loop { - let mut chunk = vec![0; worker_options.blk_size]; - let size = file.read(&mut chunk)?; + let filled = window.fill()?; let mut retry_cnt = 0; let mut time = SystemTime::now() - Duration::from_secs(DEFAULT_TIMEOUT_SECS); loop { if time.elapsed()? >= Duration::from_secs(DEFAULT_TIMEOUT_SECS) { - Message::send_data(socket, block_number, chunk[..size].to_vec())?; + send_window(socket, &window, block_number)?; time = SystemTime::now(); } match Message::recv(socket) { Ok(Packet::Ack(received_block_number)) => { - if received_block_number == block_number { - block_number = block_number.wrapping_add(1); + let diff = received_block_number.wrapping_sub(block_number); + if diff <= worker_options.windowsize { + block_number = received_block_number.wrapping_add(1); + window.remove(diff + 1)?; break; } } @@ -146,27 +167,19 @@ fn send_file( } } - if size < worker_options.blk_size { + if !filled && window.is_empty() { break; - }; + } } - println!( - "Sent {} to {}", - file_path.file_name().unwrap().to_str().unwrap(), - socket.peer_addr()? - ); Ok(()) } fn receive_file( socket: &UdpSocket, - file_path: &PathBuf, - options: &mut Vec, + file: &mut File, + worker_options: &WorkerOptions, ) -> Result<(), Box> { - let mut file = File::create(file_path)?; - let worker_options = parse_options(options, &WorkType::Receive)?; - let mut block_number: u16 = 0; loop { let size; @@ -203,11 +216,19 @@ fn receive_file( }; } - println!( - "Received {} from {}", - file_path.file_name().unwrap().to_str().unwrap(), - socket.peer_addr()? - ); + Ok(()) +} + +fn send_window( + socket: &UdpSocket, + window: &Window, + mut block_num: u16, +) -> Result<(), Box> { + for frame in window.get_elements() { + Message::send_data(socket, block_num, frame.to_vec())?; + block_num = block_num.wrapping_add(1); + } + Ok(()) } @@ -218,6 +239,9 @@ fn accept_request( ) -> Result<(), Box> { if !options.is_empty() { Message::send_oack(socket, options.to_vec())?; + if let WorkType::Send(_) = work_type { + check_response(socket)?; + } } else if *work_type == WorkType::Receive { Message::send_ack(socket, 0)? } @@ -251,6 +275,7 @@ fn parse_options( blk_size: DEFAULT_BLOCK_SIZE, t_size: 0, timeout: DEFAULT_TIMEOUT_SECS, + windowsize: 1, }; for option in &mut *options { @@ -273,6 +298,12 @@ fn parse_options( } worker_options.timeout = *value as u64; } + OptionType::Windowsize => { + if *value == 0 || *value > u16::MAX as usize { + return Err("Invalid windowsize value".into()); + } + worker_options.windowsize = *value as u16; + } } } @@ -343,6 +374,7 @@ mod tests { blk_size: DEFAULT_BLOCK_SIZE, t_size: 0, timeout: DEFAULT_TIMEOUT_SECS, + windowsize: 1, } ); }