Add windowsize option for send

This commit is contained in:
altugbakan 2023-03-30 21:58:30 +03:00 committed by Altuğ Bakan
parent ef28c2585d
commit 3ed9834b60
4 changed files with 102 additions and 50 deletions

3
.gitignore vendored
View file

@ -1,2 +1,3 @@
.vscode
/target
/tmp
/tmp

View file

@ -188,6 +188,8 @@ pub enum OptionType {
TransferSize,
/// Timeout option type
Timeout,
/// Windowsize option type
Windowsize,
}
impl OptionType {
@ -197,6 +199,7 @@ impl OptionType {
OptionType::BlockSize => "blksize",
OptionType::TransferSize => "tsize",
OptionType::Timeout => "timeout",
OptionType::Windowsize => "windowsize",
}
}
}

View file

@ -20,14 +20,14 @@ use std::{collections::VecDeque, error::Error, fs::File, io::Read};
/// ```
pub struct Window {
elements: VecDeque<Vec<u8>>,
size: usize,
size: u16,
chunk_size: usize,
file: File,
}
impl Window {
/// Creates a new `Window` with the supplied size and chunk size.
pub fn new(size: usize, chunk_size: usize, file: File) -> Window {
pub fn new(size: u16, chunk_size: usize, file: File) -> Window {
Window {
elements: VecDeque::new(),
size,
@ -37,33 +37,49 @@ impl Window {
}
/// Fills the `Window` with chunks of data from the file.
pub fn fill(&mut self) -> Result<(), Box<dyn Error>> {
for _ in self.elements.len()..self.size {
/// Returns `true` if the `Window` is full.
pub fn fill(&mut self) -> Result<bool, Box<dyn Error>> {
for _ in self.len()..self.size {
let mut chunk = vec![0; self.chunk_size];
let size = self.file.read(&mut chunk)?;
if size != self.chunk_size {
chunk.truncate(size);
self.elements.push_back(chunk);
break;
return Ok(false);
}
self.elements.push_back(chunk);
}
Ok(())
Ok(true)
}
/// Removes the first `amount` of elements from the `Window`.
pub fn remove(&mut self, amount: usize) -> Result<(), Box<dyn Error>> {
if amount > self.elements.len() {
pub fn remove(&mut self, amount: u16) -> Result<(), Box<dyn Error>> {
if amount > self.len() {
return Err("amount cannot be larger than size".into());
}
drop(self.elements.drain(0..amount));
drop(self.elements.drain(0..amount as usize));
Ok(())
}
/// Returns a reference to the `VecDeque` containing the elements.
pub fn get_elements(&self) -> &VecDeque<Vec<u8>> {
&self.elements
}
/// Returns the length of the `Window`.
pub fn len(&self) -> u16 {
self.elements.len() as u16
}
/// Returns `true` if the `Window` is empty.
pub fn is_empty(&self) -> bool {
self.elements.is_empty()
}
}
#[cfg(test)]

View file

@ -1,14 +1,14 @@
use std::{
error::Error,
fs::{self, File},
io::{Read, Write},
io::Write,
net::{SocketAddr, UdpSocket},
path::PathBuf,
thread,
time::{Duration, SystemTime},
};
use crate::{ErrorCode, Message, OptionType, Packet, TransferOption};
use crate::{ErrorCode, Message, OptionType, Packet, TransferOption, Window};
/// Worker `struct` is used for multithreaded file sending and receiving.
/// It creates a new socket using the Server's IP and a random port
@ -37,6 +37,7 @@ struct WorkerOptions {
blk_size: usize,
t_size: usize,
timeout: u64,
windowsize: u16,
}
#[derive(PartialEq, Eq)]
@ -62,15 +63,25 @@ impl Worker {
let mut handle_send = || -> Result<(), Box<dyn Error>> {
let socket = setup_socket(&addr, &remote)?;
let work_type = WorkType::Send(file_path.metadata()?.len());
let worker_options = parse_options(&mut options, &work_type)?;
accept_request(&socket, &options, &work_type)?;
check_response(&socket)?;
send_file(&socket, &file_path, &mut options)?;
send_file(&socket, File::open(&file_path)?, &worker_options)?;
Ok(())
};
if let Err(err) = handle_send() {
eprintln!("{err}");
match handle_send() {
Ok(_) => {
println!(
"Sent {} to {}",
file_path.file_name().unwrap().to_str().unwrap(),
remote
);
}
Err(err) => {
eprintln!("{err}");
}
}
});
}
@ -87,20 +98,31 @@ impl Worker {
let mut handle_receive = || -> Result<(), Box<dyn Error>> {
let socket = setup_socket(&addr, &remote)?;
let work_type = WorkType::Receive;
let worker_options = parse_options(&mut options, &work_type)?;
accept_request(&socket, &options, &work_type)?;
receive_file(&socket, &file_path, &mut options)?;
receive_file(&socket, &mut File::create(&file_path)?, &worker_options)?;
Ok(())
};
if let Err(err) = handle_receive() {
eprintln!("{err}");
if fs::remove_file(&file_path).is_err() {
eprintln!(
"Error while cleaning {}",
file_path.file_name().unwrap().to_str().unwrap()
match handle_receive() {
Ok(_) => {
println!(
"Received {} from {}",
file_path.file_name().unwrap().to_str().unwrap(),
remote
);
}
Err(err) => {
eprintln!("{err}");
if fs::remove_file(&file_path).is_err() {
eprintln!(
"Error while cleaning {}",
file_path.file_name().unwrap().to_str().unwrap()
);
}
}
}
});
}
@ -108,29 +130,28 @@ impl Worker {
fn send_file(
socket: &UdpSocket,
file_path: &PathBuf,
options: &mut Vec<TransferOption>,
file: File,
worker_options: &WorkerOptions,
) -> Result<(), Box<dyn Error>> {
let mut file = File::open(file_path)?;
let worker_options = parse_options(options, &WorkType::Send(file.metadata()?.len()))?;
let mut block_number = 1;
let mut window = Window::new(worker_options.windowsize, worker_options.blk_size, file);
loop {
let mut chunk = vec![0; worker_options.blk_size];
let size = file.read(&mut chunk)?;
let filled = window.fill()?;
let mut retry_cnt = 0;
let mut time = SystemTime::now() - Duration::from_secs(DEFAULT_TIMEOUT_SECS);
loop {
if time.elapsed()? >= Duration::from_secs(DEFAULT_TIMEOUT_SECS) {
Message::send_data(socket, block_number, chunk[..size].to_vec())?;
send_window(socket, &window, block_number)?;
time = SystemTime::now();
}
match Message::recv(socket) {
Ok(Packet::Ack(received_block_number)) => {
if received_block_number == block_number {
block_number = block_number.wrapping_add(1);
let diff = received_block_number.wrapping_sub(block_number);
if diff <= worker_options.windowsize {
block_number = received_block_number.wrapping_add(1);
window.remove(diff + 1)?;
break;
}
}
@ -146,27 +167,19 @@ fn send_file(
}
}
if size < worker_options.blk_size {
if !filled && window.is_empty() {
break;
};
}
}
println!(
"Sent {} to {}",
file_path.file_name().unwrap().to_str().unwrap(),
socket.peer_addr()?
);
Ok(())
}
fn receive_file(
socket: &UdpSocket,
file_path: &PathBuf,
options: &mut Vec<TransferOption>,
file: &mut File,
worker_options: &WorkerOptions,
) -> Result<(), Box<dyn Error>> {
let mut file = File::create(file_path)?;
let worker_options = parse_options(options, &WorkType::Receive)?;
let mut block_number: u16 = 0;
loop {
let size;
@ -203,11 +216,19 @@ fn receive_file(
};
}
println!(
"Received {} from {}",
file_path.file_name().unwrap().to_str().unwrap(),
socket.peer_addr()?
);
Ok(())
}
fn send_window(
socket: &UdpSocket,
window: &Window,
mut block_num: u16,
) -> Result<(), Box<dyn Error>> {
for frame in window.get_elements() {
Message::send_data(socket, block_num, frame.to_vec())?;
block_num = block_num.wrapping_add(1);
}
Ok(())
}
@ -218,6 +239,9 @@ fn accept_request(
) -> Result<(), Box<dyn Error>> {
if !options.is_empty() {
Message::send_oack(socket, options.to_vec())?;
if let WorkType::Send(_) = work_type {
check_response(socket)?;
}
} else if *work_type == WorkType::Receive {
Message::send_ack(socket, 0)?
}
@ -251,6 +275,7 @@ fn parse_options(
blk_size: DEFAULT_BLOCK_SIZE,
t_size: 0,
timeout: DEFAULT_TIMEOUT_SECS,
windowsize: 1,
};
for option in &mut *options {
@ -273,6 +298,12 @@ fn parse_options(
}
worker_options.timeout = *value as u64;
}
OptionType::Windowsize => {
if *value == 0 || *value > u16::MAX as usize {
return Err("Invalid windowsize value".into());
}
worker_options.windowsize = *value as u16;
}
}
}
@ -343,6 +374,7 @@ mod tests {
blk_size: DEFAULT_BLOCK_SIZE,
t_size: 0,
timeout: DEFAULT_TIMEOUT_SECS,
windowsize: 1,
}
);
}