Add windowsize option to receive

This commit is contained in:
altugbakan 2023-04-01 14:13:18 +03:00 committed by Altuğ Bakan
parent 3ed9834b60
commit 849dd2e041
2 changed files with 117 additions and 29 deletions

View file

@ -1,4 +1,9 @@
use std::{collections::VecDeque, error::Error, fs::File, io::Read}; use std::{
collections::VecDeque,
error::Error,
fs::File,
io::{Read, Write},
};
/// Window `struct` is used to store chunks of data from a file. /// Window `struct` is used to store chunks of data from a file.
/// It is used to store the data that is being sent or received for /// It is used to store the data that is being sent or received for
@ -55,10 +60,21 @@ impl Window {
Ok(true) Ok(true)
} }
/// Empties the `Window` by writing the data to the file.
pub fn empty(&mut self) -> Result<(), Box<dyn Error>> {
for data in &self.elements {
self.file.write_all(data)?;
}
self.elements.clear();
Ok(())
}
/// Removes the first `amount` of elements from the `Window`. /// Removes the first `amount` of elements from the `Window`.
pub fn remove(&mut self, amount: u16) -> Result<(), Box<dyn Error>> { pub fn remove(&mut self, amount: u16) -> Result<(), &'static str> {
if amount > self.len() { if amount > self.len() {
return Err("amount cannot be larger than size".into()); return Err("amount cannot be larger than length of window");
} }
drop(self.elements.drain(0..amount as usize)); drop(self.elements.drain(0..amount as usize));
@ -66,6 +82,17 @@ impl Window {
Ok(()) Ok(())
} }
/// Adds a data `Vec<u8>` to the `Window`.
pub fn add(&mut self, data: Vec<u8>) -> Result<(), &'static str> {
if self.len() == self.size {
return Err("cannot add to a full window");
}
self.elements.push_back(data);
Ok(())
}
/// Returns a reference to the `VecDeque` containing the elements. /// Returns a reference to the `VecDeque` containing the elements.
pub fn get_elements(&self) -> &VecDeque<Vec<u8>> { pub fn get_elements(&self) -> &VecDeque<Vec<u8>> {
&self.elements &self.elements
@ -80,16 +107,32 @@ impl Window {
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.elements.is_empty() self.elements.is_empty()
} }
/// Returns `true` if the `Window` is full.
pub fn is_full(&self) -> bool {
self.elements.len() as u16 == self.size
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::{fs, io::Write}; use std::{
fs::{self, OpenOptions},
io::{Seek, Write},
path::Path,
};
const DIR_NAME: &str = "tmp";
#[test] #[test]
fn fills_and_removes_from_window() { fn fills_and_removes_from_window() {
let file = initialize(); const FILE_NAME: &str = "fills_and_removes_from_window.txt";
let mut file = initialize(FILE_NAME);
file.write_all(b"Hello, world!").unwrap();
file.flush().unwrap();
file.rewind().unwrap();
let mut window = Window::new(2, 5, file); let mut window = Window::new(2, 5, file);
window.fill().unwrap(); window.fill().unwrap();
@ -106,30 +149,65 @@ mod tests {
assert_eq!(window.elements[0], b", wor"[..]); assert_eq!(window.elements[0], b", wor"[..]);
assert_eq!(window.elements[1], b"ld!"[..]); assert_eq!(window.elements[1], b"ld!"[..]);
clean(); clean(FILE_NAME);
} }
fn initialize() -> File { #[test]
let dir_name = "tmp"; fn adds_to_and_empties_window() {
let file_name = "tmp/test.txt"; const FILE_NAME: &str = "adds_to_and_empties_window.txt";
if fs::metadata(dir_name).is_err() { let file = initialize(FILE_NAME);
fs::create_dir(dir_name).unwrap();
}
if File::open(file_name).is_ok() { let mut window = Window::new(3, 5, file);
fs::remove_file(file_name).unwrap(); window.add(b"Hello".to_vec()).unwrap();
} assert_eq!(window.elements.len(), 1);
assert_eq!(window.elements[0], b"Hello"[..]);
let mut file = File::create(file_name).unwrap(); window.add(b", wor".to_vec()).unwrap();
file.write_all(b"Hello, world!").unwrap(); assert_eq!(window.elements.len(), 2);
file.flush().unwrap(); assert_eq!(window.elements[0], b"Hello"[..]);
assert_eq!(window.elements[1], b", wor"[..]);
File::open(file_name).unwrap() window.add(b"ld!".to_vec()).unwrap();
assert_eq!(window.elements.len(), 3);
assert_eq!(window.elements[0], b"Hello"[..]);
assert_eq!(window.elements[1], b", wor"[..]);
assert_eq!(window.elements[2], b"ld!"[..]);
window.empty().unwrap();
assert_eq!(window.elements.len(), 0);
let mut contents = Default::default();
File::read_to_string(
&mut File::open(DIR_NAME.to_string() + "/" + FILE_NAME).unwrap(),
&mut contents,
)
.unwrap();
assert_eq!(contents, "Hello, world!");
clean(FILE_NAME);
} }
fn clean() { fn initialize(file_name: &str) -> File {
fs::remove_file("tmp/test.txt").unwrap(); let file_name = DIR_NAME.to_string() + "/" + file_name;
fs::remove_dir("tmp").unwrap(); if !Path::new(DIR_NAME).is_dir() {
fs::create_dir(DIR_NAME).unwrap();
}
if File::open(&file_name).is_ok() {
fs::remove_file(&file_name).unwrap();
}
OpenOptions::new()
.read(true)
.append(true)
.create(true)
.open(&file_name)
.unwrap()
}
fn clean(file_name: &str) {
let file_name = DIR_NAME.to_string() + "/" + file_name;
fs::remove_file(file_name).unwrap();
} }
} }

View file

@ -1,7 +1,6 @@
use std::{ use std::{
error::Error, error::Error,
fs::{self, File}, fs::{self, File},
io::Write,
net::{SocketAddr, UdpSocket}, net::{SocketAddr, UdpSocket},
path::PathBuf, path::PathBuf,
thread, thread,
@ -101,7 +100,7 @@ impl Worker {
let worker_options = parse_options(&mut options, &work_type)?; let worker_options = parse_options(&mut options, &work_type)?;
accept_request(&socket, &options, &work_type)?; accept_request(&socket, &options, &work_type)?;
receive_file(&socket, &mut File::create(&file_path)?, &worker_options)?; receive_file(&socket, File::create(&file_path)?, &worker_options)?;
Ok(()) Ok(())
}; };
@ -135,6 +134,7 @@ fn send_file(
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
let mut block_number = 1; let mut block_number = 1;
let mut window = Window::new(worker_options.windowsize, worker_options.blk_size, file); let mut window = Window::new(worker_options.windowsize, worker_options.blk_size, file);
loop { loop {
let filled = window.fill()?; let filled = window.fill()?;
@ -177,14 +177,16 @@ fn send_file(
fn receive_file( fn receive_file(
socket: &UdpSocket, socket: &UdpSocket,
file: &mut File, file: File,
worker_options: &WorkerOptions, worker_options: &WorkerOptions,
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
let mut block_number: u16 = 0; let mut block_number: u16 = 0;
loop { let mut window = Window::new(worker_options.windowsize, worker_options.blk_size, file);
let size;
loop {
let mut size;
let mut retry_cnt = 0; let mut retry_cnt = 0;
loop { loop {
match Message::recv_packet_with_size(socket, worker_options.blk_size) { match Message::recv_packet_with_size(socket, worker_options.blk_size) {
Ok(Packet::Data { Ok(Packet::Data {
@ -193,9 +195,16 @@ fn receive_file(
}) => { }) => {
if received_block_number == block_number.wrapping_add(1) { if received_block_number == block_number.wrapping_add(1) {
block_number = received_block_number; block_number = received_block_number;
file.write_all(&data)?;
size = data.len(); size = data.len();
break; window.add(data)?;
if size < worker_options.blk_size {
break;
}
if window.is_full() {
break;
}
} }
} }
Ok(Packet::Error { code, msg }) => { Ok(Packet::Error { code, msg }) => {
@ -210,6 +219,7 @@ fn receive_file(
} }
} }
window.empty()?;
Message::send_ack(socket, block_number)?; Message::send_ack(socket, block_number)?;
if size < worker_options.blk_size { if size < worker_options.blk_size {
break; break;