From e71a004e93d9376bb30834416cdd9248ad521246 Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Sun, 28 Feb 2021 11:36:14 +0100 Subject: [PATCH 1/2] Refactor AudioFileFetch using async/await Previously, polling `AudioFileFetch` consisted of three parts: Handling stream loader commands, handling received data, and triggering preloading in stream mode when the number of open requests is sufficiently small. The first steps use channels which are polled, and if something's available, it's handled. The third step is executed on every call of `poll`. The first two could easily be refactored using a `tokio::select!`-loop. Therefore, counting the number of open requests was also refactored to fit into this scheme. They were previously counted using a shared `AtomicUsize`. Now, the number of open requests is stored exclusively in `AudioFileFetch`, increased on starting a request, and decreased by an oneshot channel that is fired when a request is finished. This allows us to `select` that channel in the loop too, and since loading ahead makes only sense if the number of open requests decreases, the third step is only executed in this case. `AudioFileFetch` does not implement `Future` anymore, but is rather used as helper struct in an async fn `audio_file_fetch`. --- audio/src/fetch.rs | 515 +++++++++++++++------------------------------ 1 file changed, 172 insertions(+), 343 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 5fdf9e7..da34309 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -1,11 +1,8 @@ use std::cmp::{max, min}; use std::fs; -use std::future::Future; use std::io::{self, Read, Seek, SeekFrom, Write}; -use std::pin::Pin; use std::sync::atomic::{self, AtomicUsize}; use std::sync::{Arc, Condvar, Mutex}; -use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; @@ -236,7 +233,7 @@ struct AudioFileDownloadStatus { downloaded: RangeSet, } -#[derive(Copy, Clone)] +#[derive(Copy, Clone, PartialEq, Eq)] enum DownloadStrategy { RandomAccess(), Streaming(), @@ -249,7 +246,6 @@ struct AudioFileShared { cond: Condvar, download_status: Mutex, download_strategy: Mutex, - number_of_open_requests: AtomicUsize, ping_time_ms: AtomicUsize, read_position: AtomicUsize, } @@ -358,7 +354,6 @@ impl AudioFileStreaming { downloaded: RangeSet::new(), }), download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise - number_of_open_requests: AtomicUsize::new(0), ping_time_ms: AtomicUsize::new(0), read_position: AtomicUsize::new(0), }); @@ -373,7 +368,7 @@ impl AudioFileStreaming { let (stream_loader_command_tx, stream_loader_command_rx) = mpsc::unbounded_channel::(); - let fetcher = AudioFileFetch::new( + session.spawn(audio_file_fetch( session.clone(), shared.clone(), initial_data_rx, @@ -382,9 +377,8 @@ impl AudioFileStreaming { write_file, stream_loader_command_rx, complete_tx, - ); + )); - session.spawn(fetcher); Ok(AudioFileStreaming { read_file, position: 0, @@ -442,17 +436,11 @@ async fn audio_file_fetch_receive_data( initial_data_offset: usize, initial_request_length: usize, request_sent_time: Instant, + mut measure_ping_time: bool, + finish_tx: mpsc::UnboundedSender<()>, ) { let mut data_offset = initial_data_offset; let mut request_length = initial_request_length; - let mut measure_ping_time = shared - .number_of_open_requests - .load(atomic::Ordering::SeqCst) - == 0; - - shared - .number_of_open_requests - .fetch_add(1, atomic::Ordering::SeqCst); let result = loop { let data = match data_rx.next().await { @@ -501,9 +489,7 @@ async fn audio_file_fetch_receive_data( shared.cond.notify_all(); } - shared - .number_of_open_requests - .fetch_sub(1, atomic::Ordering::SeqCst); + let _ = finish_tx.send(()); if result.is_err() { warn!( @@ -517,162 +503,6 @@ async fn audio_file_fetch_receive_data( ); } } -/* -async fn audio_file_fetch( - session: Session, - shared: Arc, - initial_data_rx: ChannelData, - initial_request_sent_time: Instant, - initial_data_length: usize, - - output: NamedTempFile, - stream_loader_command_rx: mpsc::UnboundedReceiver, - complete_tx: oneshot::Sender, -) { - let (file_data_tx, file_data_rx) = unbounded::(); - - let requested_range = Range::new(0, initial_data_length); - let mut download_status = shared.download_status.lock().unwrap(); - download_status.requested.add_range(&requested_range); - - session.spawn(audio_file_fetch_receive_data( - shared.clone(), - file_data_tx.clone(), - initial_data_rx, - 0, - initial_data_length, - initial_request_sent_time, - )); - - let mut network_response_times_ms: Vec::new(); - - let f1 = file_data_rx.map(|x| Ok::<_, ()>(x)).try_for_each(|x| { - match x { - ReceivedData::ResponseTimeMs(response_time_ms) => { - trace!("Ping time estimated as: {} ms.", response_time_ms); - - // record the response time - network_response_times_ms.push(response_time_ms); - - // prune old response times. Keep at most three. - while network_response_times_ms.len() > 3 { - network_response_times_ms.remove(0); - } - - // stats::median is experimental. So we calculate the median of up to three ourselves. - let ping_time_ms: usize = match network_response_times_ms.len() { - 1 => network_response_times_ms[0] as usize, - 2 => { - ((network_response_times_ms[0] + network_response_times_ms[1]) / 2) as usize - } - 3 => { - let mut times = network_response_times_ms.clone(); - times.sort(); - times[1] - } - _ => unreachable!(), - }; - - // store our new estimate for everyone to see - shared - .ping_time_ms - .store(ping_time_ms, atomic::Ordering::Relaxed); - } - ReceivedData::Data(data) => { - output - .as_mut() - .unwrap() - .seek(SeekFrom::Start(data.offset as u64)) - .unwrap(); - output - .as_mut() - .unwrap() - .write_all(data.data.as_ref()) - .unwrap(); - - let mut full = false; - - { - let mut download_status = shared.download_status.lock().unwrap(); - - let received_range = Range::new(data.offset, data.data.len()); - download_status.downloaded.add_range(&received_range); - shared.cond.notify_all(); - - if download_status.downloaded.contained_length_from_value(0) - >= shared.file_size - { - full = true; - } - - drop(download_status); - } - - if full { - self.finish(); - return future::ready(Err(())); - } - } - } - future::ready(Ok(())) - }); - - let f2 = stream_loader_command_rx.map(Ok::<_, ()>).try_for_each(|x| { - match cmd { - StreamLoaderCommand::Fetch(request) => { - self.download_range(request.start, request.length); - } - StreamLoaderCommand::RandomAccessMode() => { - *(shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); - } - StreamLoaderCommand::StreamMode() => { - *(shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); - } - StreamLoaderCommand::Close() => return future::ready(Err(())), - } - Ok(()) - }); - - let f3 = future::poll_fn(|_| { - if let DownloadStrategy::Streaming() = self.get_download_strategy() { - let number_of_open_requests = shared - .number_of_open_requests - .load(atomic::Ordering::SeqCst); - let max_requests_to_send = - MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests); - - if max_requests_to_send > 0 { - let bytes_pending: usize = { - let download_status = shared.download_status.lock().unwrap(); - download_status - .requested - .minus(&download_status.downloaded) - .len() - }; - - let ping_time_seconds = - 0.001 * shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; - let download_rate = session.channel().get_download_rate_estimate(); - - let desired_pending_bytes = max( - (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * shared.stream_data_rate as f64) - as usize, - (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) - as usize, - ); - - if bytes_pending < desired_pending_bytes { - self.pre_fetch_more_data( - desired_pending_bytes - bytes_pending, - max_requests_to_send, - ); - } - } - } - Poll::Pending - }); - future::select_all(vec![f1, f2, f3]).await -}*/ struct AudioFileFetch { session: Session, @@ -680,54 +510,21 @@ struct AudioFileFetch { output: Option, file_data_tx: mpsc::UnboundedSender, - file_data_rx: mpsc::UnboundedReceiver, - - stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: Option>, network_response_times_ms: Vec, + number_of_open_requests: usize, + + download_finish_tx: mpsc::UnboundedSender<()>, +} + +// Might be replaced by enum from std once stable +#[derive(PartialEq, Eq)] +enum ControlFlow { + Break, + Continue, } impl AudioFileFetch { - fn new( - session: Session, - shared: Arc, - initial_data_rx: ChannelData, - initial_request_sent_time: Instant, - initial_data_length: usize, - - output: NamedTempFile, - stream_loader_command_rx: mpsc::UnboundedReceiver, - complete_tx: oneshot::Sender, - ) -> AudioFileFetch { - let (file_data_tx, file_data_rx) = mpsc::unbounded_channel::(); - - { - let requested_range = Range::new(0, initial_data_length); - let mut download_status = shared.download_status.lock().unwrap(); - download_status.requested.add_range(&requested_range); - } - - session.spawn(audio_file_fetch_receive_data( - shared.clone(), - file_data_tx.clone(), - initial_data_rx, - 0, - initial_data_length, - initial_request_sent_time, - )); - - AudioFileFetch { - session, - shared, - output: Some(output), - file_data_tx, - file_data_rx, - stream_loader_command_rx, - complete_tx: Some(complete_tx), - network_response_times_ms: Vec::new(), - } - } - fn get_download_strategy(&mut self) -> DownloadStrategy { *(self.shared.download_strategy.lock().unwrap()) } @@ -785,7 +582,11 @@ impl AudioFileFetch { range.start, range.length, Instant::now(), + self.number_of_open_requests == 0, + self.download_finish_tx.clone(), )); + + self.number_of_open_requests += 1; } } @@ -833,103 +634,86 @@ impl AudioFileFetch { } } - fn poll_file_data_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> { - loop { - match self.file_data_rx.poll_recv(cx) { - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms))) => { - trace!("Ping time estimated as: {} ms.", response_time_ms); + fn handle_file_data(&mut self, data: ReceivedData) -> ControlFlow { + match data { + ReceivedData::ResponseTimeMs(response_time_ms) => { + trace!("Ping time estimated as: {} ms.", response_time_ms); - // record the response time - self.network_response_times_ms.push(response_time_ms); + // record the response time + self.network_response_times_ms.push(response_time_ms); - // prune old response times. Keep at most three. - while self.network_response_times_ms.len() > 3 { - self.network_response_times_ms.remove(0); - } - - // stats::median is experimental. So we calculate the median of up to three ourselves. - let ping_time_ms: usize = match self.network_response_times_ms.len() { - 1 => self.network_response_times_ms[0] as usize, - 2 => { - ((self.network_response_times_ms[0] - + self.network_response_times_ms[1]) - / 2) as usize - } - 3 => { - let mut times = self.network_response_times_ms.clone(); - times.sort_unstable(); - times[1] - } - _ => unreachable!(), - }; - - // store our new estimate for everyone to see - self.shared - .ping_time_ms - .store(ping_time_ms, atomic::Ordering::Relaxed); + // prune old response times. Keep at most three. + while self.network_response_times_ms.len() > 3 { + self.network_response_times_ms.remove(0); } - Poll::Ready(Some(ReceivedData::Data(data))) => { - self.output - .as_mut() - .unwrap() - .seek(SeekFrom::Start(data.offset as u64)) - .unwrap(); - self.output - .as_mut() - .unwrap() - .write_all(data.data.as_ref()) - .unwrap(); - let mut full = false; - - { - let mut download_status = self.shared.download_status.lock().unwrap(); - - let received_range = Range::new(data.offset, data.data.len()); - download_status.downloaded.add_range(&received_range); - self.shared.cond.notify_all(); - - if download_status.downloaded.contained_length_from_value(0) - >= self.shared.file_size - { - full = true; - } - - drop(download_status); + // stats::median is experimental. So we calculate the median of up to three ourselves. + let ping_time_ms: usize = match self.network_response_times_ms.len() { + 1 => self.network_response_times_ms[0] as usize, + 2 => { + ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) + / 2) as usize } - - if full { - self.finish(); - return Poll::Ready(()); + 3 => { + let mut times = self.network_response_times_ms.clone(); + times.sort_unstable(); + times[1] } + _ => unreachable!(), + }; + + // store our new estimate for everyone to see + self.shared + .ping_time_ms + .store(ping_time_ms, atomic::Ordering::Relaxed); + } + ReceivedData::Data(data) => { + self.output + .as_mut() + .unwrap() + .seek(SeekFrom::Start(data.offset as u64)) + .unwrap(); + self.output + .as_mut() + .unwrap() + .write_all(data.data.as_ref()) + .unwrap(); + + let mut download_status = self.shared.download_status.lock().unwrap(); + + let received_range = Range::new(data.offset, data.data.len()); + download_status.downloaded.add_range(&received_range); + self.shared.cond.notify_all(); + + let full = download_status.downloaded.contained_length_from_value(0) + >= self.shared.file_size; + + drop(download_status); + + if full { + self.finish(); + return ControlFlow::Break; } - Poll::Pending => return Poll::Pending, } } + ControlFlow::Continue } - fn poll_stream_loader_command_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> { - loop { - match self.stream_loader_command_rx.poll_recv(cx) { - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(cmd)) => match cmd { - StreamLoaderCommand::Fetch(request) => { - self.download_range(request.start, request.length); - } - StreamLoaderCommand::RandomAccessMode() => { - *(self.shared.download_strategy.lock().unwrap()) = - DownloadStrategy::RandomAccess(); - } - StreamLoaderCommand::StreamMode() => { - *(self.shared.download_strategy.lock().unwrap()) = - DownloadStrategy::Streaming(); - } - StreamLoaderCommand::Close() => return Poll::Ready(()), - }, - Poll::Pending => return Poll::Pending, + fn handle_stream_loader_command(&mut self, cmd: StreamLoaderCommand) -> ControlFlow { + match cmd { + StreamLoaderCommand::Fetch(request) => { + self.download_range(request.start, request.length); } + StreamLoaderCommand::RandomAccessMode() => { + *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); + } + StreamLoaderCommand::StreamMode() => { + *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); + self.trigger_preload(); + } + StreamLoaderCommand::Close() => return ControlFlow::Break, } + ControlFlow::Continue } fn finish(&mut self) { @@ -939,57 +723,102 @@ impl AudioFileFetch { output.seek(SeekFrom::Start(0)).unwrap(); let _ = complete_tx.send(output); } + + fn trigger_preload(&mut self) { + if self.number_of_open_requests >= MAX_PREFETCH_REQUESTS { + return; + } + + let max_requests_to_send = MAX_PREFETCH_REQUESTS - self.number_of_open_requests; + + let bytes_pending: usize = { + let download_status = self.shared.download_status.lock().unwrap(); + download_status + .requested + .minus(&download_status.downloaded) + .len() + }; + + let ping_time_seconds = + 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + let download_rate = self.session.channel().get_download_rate_estimate(); + + let desired_pending_bytes = max( + (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) + as usize, + (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize, + ); + + if bytes_pending < desired_pending_bytes { + self.pre_fetch_more_data(desired_pending_bytes - bytes_pending, max_requests_to_send); + } + } } -impl Future for AudioFileFetch { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - if let Poll::Ready(()) = self.poll_stream_loader_command_rx(cx) { - return Poll::Ready(()); - } +async fn audio_file_fetch( + session: Session, + shared: Arc, + initial_data_rx: ChannelData, + initial_request_sent_time: Instant, + initial_data_length: usize, - if let Poll::Ready(()) = self.poll_file_data_rx(cx) { - return Poll::Ready(()); - } + output: NamedTempFile, + mut stream_loader_command_rx: mpsc::UnboundedReceiver, + complete_tx: oneshot::Sender, +) { + let (file_data_tx, mut file_data_rx) = mpsc::unbounded_channel(); + let (download_finish_tx, mut download_finish_rx) = mpsc::unbounded_channel(); - if let DownloadStrategy::Streaming() = self.get_download_strategy() { - let number_of_open_requests = self - .shared - .number_of_open_requests - .load(atomic::Ordering::SeqCst); - let max_requests_to_send = - MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests); + { + let requested_range = Range::new(0, initial_data_length); + let mut download_status = shared.download_status.lock().unwrap(); + download_status.requested.add_range(&requested_range); + } - if max_requests_to_send > 0 { - let bytes_pending: usize = { - let download_status = self.shared.download_status.lock().unwrap(); - download_status - .requested - .minus(&download_status.downloaded) - .len() - }; + session.spawn(audio_file_fetch_receive_data( + shared.clone(), + file_data_tx.clone(), + initial_data_rx, + 0, + initial_data_length, + initial_request_sent_time, + true, + download_finish_tx.clone(), + )); - let ping_time_seconds = - 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; - let download_rate = self.session.channel().get_download_rate_estimate(); + let mut fetch = AudioFileFetch { + session, + shared, + output: Some(output), - let desired_pending_bytes = max( - (PREFETCH_THRESHOLD_FACTOR - * ping_time_seconds - * self.shared.stream_data_rate as f64) as usize, - (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) - as usize, - ); + file_data_tx, + complete_tx: Some(complete_tx), + network_response_times_ms: Vec::new(), + number_of_open_requests: 1, - if bytes_pending < desired_pending_bytes { - self.pre_fetch_more_data( - desired_pending_bytes - bytes_pending, - max_requests_to_send, - ); + download_finish_tx, + }; + + loop { + tokio::select! { + cmd = stream_loader_command_rx.recv() => { + if cmd.map_or(true, |cmd| fetch.handle_stream_loader_command(cmd) == ControlFlow::Break) { + break; + } + }, + data = file_data_rx.recv() => { + if data.map_or(true, |data| fetch.handle_file_data(data) == ControlFlow::Break) { + break; + } + }, + _ = download_finish_rx.recv() => { + fetch.number_of_open_requests -= 1; + + if fetch.get_download_strategy() == DownloadStrategy::Streaming() { + fetch.trigger_preload(); } } } - Poll::Pending } } From ca255c17f06801ad9b79259233bd7a6b952b3942 Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Sun, 28 Feb 2021 11:36:15 +0100 Subject: [PATCH 2/2] Split file fetch.rs --- audio/src/{fetch.rs => fetch/mod.rs} | 444 +------------------------- audio/src/fetch/receive.rs | 455 +++++++++++++++++++++++++++ 2 files changed, 461 insertions(+), 438 deletions(-) rename audio/src/{fetch.rs => fetch/mod.rs} (55%) create mode 100644 audio/src/fetch/receive.rs diff --git a/audio/src/fetch.rs b/audio/src/fetch/mod.rs similarity index 55% rename from audio/src/fetch.rs rename to audio/src/fetch/mod.rs index da34309..c19fac2 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch/mod.rs @@ -1,19 +1,21 @@ +mod receive; + use std::cmp::{max, min}; use std::fs; -use std::io::{self, Read, Seek, SeekFrom, Write}; +use std::io::{self, Read, Seek, SeekFrom}; use std::sync::atomic::{self, AtomicUsize}; use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, Instant}; -use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; -use bytes::Bytes; +use byteorder::{BigEndian, ByteOrder}; use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt}; -use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; +use librespot_core::channel::{ChannelData, ChannelError, ChannelHeaders}; use librespot_core::session::Session; use librespot_core::spotify_id::FileId; use tempfile::NamedTempFile; use tokio::sync::{mpsc, oneshot}; +use self::receive::{audio_file_fetch, request_range}; use crate::range_set::{Range, RangeSet}; const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; @@ -388,440 +390,6 @@ impl AudioFileStreaming { } } -fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { - assert!( - offset % 4 == 0, - "Range request start positions must be aligned by 4 bytes." - ); - assert!( - length % 4 == 0, - "Range request range lengths must be aligned by 4 bytes." - ); - let start = offset / 4; - let end = (offset + length) / 4; - - let (id, channel) = session.channel().allocate(); - - let mut data: Vec = Vec::new(); - data.write_u16::(id).unwrap(); - data.write_u8(0).unwrap(); - data.write_u8(1).unwrap(); - data.write_u16::(0x0000).unwrap(); - data.write_u32::(0x00000000).unwrap(); - data.write_u32::(0x00009C40).unwrap(); - data.write_u32::(0x00020000).unwrap(); - data.write(&file.0).unwrap(); - data.write_u32::(start as u32).unwrap(); - data.write_u32::(end as u32).unwrap(); - - session.send_packet(0x8, data); - - channel -} - -struct PartialFileData { - offset: usize, - data: Bytes, -} - -enum ReceivedData { - ResponseTimeMs(usize), - Data(PartialFileData), -} - -async fn audio_file_fetch_receive_data( - shared: Arc, - file_data_tx: mpsc::UnboundedSender, - mut data_rx: ChannelData, - initial_data_offset: usize, - initial_request_length: usize, - request_sent_time: Instant, - mut measure_ping_time: bool, - finish_tx: mpsc::UnboundedSender<()>, -) { - let mut data_offset = initial_data_offset; - let mut request_length = initial_request_length; - - let result = loop { - let data = match data_rx.next().await { - Some(Ok(data)) => data, - Some(Err(e)) => break Err(e), - None => break Ok(()), - }; - - if measure_ping_time { - let duration = Instant::now() - request_sent_time; - let duration_ms: u64; - if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS { - duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; - } else { - duration_ms = duration.as_millis() as u64; - } - let _ = file_data_tx.send(ReceivedData::ResponseTimeMs(duration_ms as usize)); - measure_ping_time = false; - } - let data_size = data.len(); - let _ = file_data_tx.send(ReceivedData::Data(PartialFileData { - offset: data_offset, - data, - })); - data_offset += data_size; - if request_length < data_size { - warn!( - "Data receiver for range {} (+{}) received more data from server than requested.", - initial_data_offset, initial_request_length - ); - request_length = 0; - } else { - request_length -= data_size; - } - - if request_length == 0 { - break Ok(()); - } - }; - - if request_length > 0 { - let missing_range = Range::new(data_offset, request_length); - - let mut download_status = shared.download_status.lock().unwrap(); - download_status.requested.subtract_range(&missing_range); - shared.cond.notify_all(); - } - - let _ = finish_tx.send(()); - - if result.is_err() { - warn!( - "Error from channel for data receiver for range {} (+{}).", - initial_data_offset, initial_request_length - ); - } else if request_length > 0 { - warn!( - "Data receiver for range {} (+{}) received less data from server than requested.", - initial_data_offset, initial_request_length - ); - } -} - -struct AudioFileFetch { - session: Session, - shared: Arc, - output: Option, - - file_data_tx: mpsc::UnboundedSender, - complete_tx: Option>, - network_response_times_ms: Vec, - number_of_open_requests: usize, - - download_finish_tx: mpsc::UnboundedSender<()>, -} - -// Might be replaced by enum from std once stable -#[derive(PartialEq, Eq)] -enum ControlFlow { - Break, - Continue, -} - -impl AudioFileFetch { - fn get_download_strategy(&mut self) -> DownloadStrategy { - *(self.shared.download_strategy.lock().unwrap()) - } - - fn download_range(&mut self, mut offset: usize, mut length: usize) { - if length < MINIMUM_DOWNLOAD_SIZE { - length = MINIMUM_DOWNLOAD_SIZE; - } - - // ensure the values are within the bounds and align them by 4 for the spotify protocol. - if offset >= self.shared.file_size { - return; - } - - if length == 0 { - return; - } - - if offset + length > self.shared.file_size { - length = self.shared.file_size - offset; - } - - if offset % 4 != 0 { - length += offset % 4; - offset -= offset % 4; - } - - if length % 4 != 0 { - length += 4 - (length % 4); - } - - let mut ranges_to_request = RangeSet::new(); - ranges_to_request.add_range(&Range::new(offset, length)); - - let mut download_status = self.shared.download_status.lock().unwrap(); - - ranges_to_request.subtract_range_set(&download_status.downloaded); - ranges_to_request.subtract_range_set(&download_status.requested); - - for range in ranges_to_request.iter() { - let (_headers, data) = request_range( - &self.session, - self.shared.file_id, - range.start, - range.length, - ) - .split(); - - download_status.requested.add_range(range); - - self.session.spawn(audio_file_fetch_receive_data( - self.shared.clone(), - self.file_data_tx.clone(), - data, - range.start, - range.length, - Instant::now(), - self.number_of_open_requests == 0, - self.download_finish_tx.clone(), - )); - - self.number_of_open_requests += 1; - } - } - - fn pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize) { - let mut bytes_to_go = bytes; - let mut requests_to_go = max_requests_to_send; - - while bytes_to_go > 0 && requests_to_go > 0 { - // determine what is still missing - let mut missing_data = RangeSet::new(); - missing_data.add_range(&Range::new(0, self.shared.file_size)); - { - let download_status = self.shared.download_status.lock().unwrap(); - missing_data.subtract_range_set(&download_status.downloaded); - missing_data.subtract_range_set(&download_status.requested); - } - - // download data from after the current read position first - let mut tail_end = RangeSet::new(); - let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed); - tail_end.add_range(&Range::new( - read_position, - self.shared.file_size - read_position, - )); - let tail_end = tail_end.intersection(&missing_data); - - if !tail_end.is_empty() { - let range = tail_end.get_range(0); - let offset = range.start; - let length = min(range.length, bytes_to_go); - self.download_range(offset, length); - requests_to_go -= 1; - bytes_to_go -= length; - } else if !missing_data.is_empty() { - // ok, the tail is downloaded, download something fom the beginning. - let range = missing_data.get_range(0); - let offset = range.start; - let length = min(range.length, bytes_to_go); - self.download_range(offset, length); - requests_to_go -= 1; - bytes_to_go -= length; - } else { - return; - } - } - } - - fn handle_file_data(&mut self, data: ReceivedData) -> ControlFlow { - match data { - ReceivedData::ResponseTimeMs(response_time_ms) => { - trace!("Ping time estimated as: {} ms.", response_time_ms); - - // record the response time - self.network_response_times_ms.push(response_time_ms); - - // prune old response times. Keep at most three. - while self.network_response_times_ms.len() > 3 { - self.network_response_times_ms.remove(0); - } - - // stats::median is experimental. So we calculate the median of up to three ourselves. - let ping_time_ms: usize = match self.network_response_times_ms.len() { - 1 => self.network_response_times_ms[0] as usize, - 2 => { - ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) - / 2) as usize - } - 3 => { - let mut times = self.network_response_times_ms.clone(); - times.sort_unstable(); - times[1] - } - _ => unreachable!(), - }; - - // store our new estimate for everyone to see - self.shared - .ping_time_ms - .store(ping_time_ms, atomic::Ordering::Relaxed); - } - ReceivedData::Data(data) => { - self.output - .as_mut() - .unwrap() - .seek(SeekFrom::Start(data.offset as u64)) - .unwrap(); - self.output - .as_mut() - .unwrap() - .write_all(data.data.as_ref()) - .unwrap(); - - let mut download_status = self.shared.download_status.lock().unwrap(); - - let received_range = Range::new(data.offset, data.data.len()); - download_status.downloaded.add_range(&received_range); - self.shared.cond.notify_all(); - - let full = download_status.downloaded.contained_length_from_value(0) - >= self.shared.file_size; - - drop(download_status); - - if full { - self.finish(); - return ControlFlow::Break; - } - } - } - ControlFlow::Continue - } - - fn handle_stream_loader_command(&mut self, cmd: StreamLoaderCommand) -> ControlFlow { - match cmd { - StreamLoaderCommand::Fetch(request) => { - self.download_range(request.start, request.length); - } - StreamLoaderCommand::RandomAccessMode() => { - *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); - } - StreamLoaderCommand::StreamMode() => { - *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); - self.trigger_preload(); - } - StreamLoaderCommand::Close() => return ControlFlow::Break, - } - ControlFlow::Continue - } - - fn finish(&mut self) { - let mut output = self.output.take().unwrap(); - let complete_tx = self.complete_tx.take().unwrap(); - - output.seek(SeekFrom::Start(0)).unwrap(); - let _ = complete_tx.send(output); - } - - fn trigger_preload(&mut self) { - if self.number_of_open_requests >= MAX_PREFETCH_REQUESTS { - return; - } - - let max_requests_to_send = MAX_PREFETCH_REQUESTS - self.number_of_open_requests; - - let bytes_pending: usize = { - let download_status = self.shared.download_status.lock().unwrap(); - download_status - .requested - .minus(&download_status.downloaded) - .len() - }; - - let ping_time_seconds = - 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; - let download_rate = self.session.channel().get_download_rate_estimate(); - - let desired_pending_bytes = max( - (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) - as usize, - (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize, - ); - - if bytes_pending < desired_pending_bytes { - self.pre_fetch_more_data(desired_pending_bytes - bytes_pending, max_requests_to_send); - } - } -} - -async fn audio_file_fetch( - session: Session, - shared: Arc, - initial_data_rx: ChannelData, - initial_request_sent_time: Instant, - initial_data_length: usize, - - output: NamedTempFile, - mut stream_loader_command_rx: mpsc::UnboundedReceiver, - complete_tx: oneshot::Sender, -) { - let (file_data_tx, mut file_data_rx) = mpsc::unbounded_channel(); - let (download_finish_tx, mut download_finish_rx) = mpsc::unbounded_channel(); - - { - let requested_range = Range::new(0, initial_data_length); - let mut download_status = shared.download_status.lock().unwrap(); - download_status.requested.add_range(&requested_range); - } - - session.spawn(audio_file_fetch_receive_data( - shared.clone(), - file_data_tx.clone(), - initial_data_rx, - 0, - initial_data_length, - initial_request_sent_time, - true, - download_finish_tx.clone(), - )); - - let mut fetch = AudioFileFetch { - session, - shared, - output: Some(output), - - file_data_tx, - complete_tx: Some(complete_tx), - network_response_times_ms: Vec::new(), - number_of_open_requests: 1, - - download_finish_tx, - }; - - loop { - tokio::select! { - cmd = stream_loader_command_rx.recv() => { - if cmd.map_or(true, |cmd| fetch.handle_stream_loader_command(cmd) == ControlFlow::Break) { - break; - } - }, - data = file_data_rx.recv() => { - if data.map_or(true, |data| fetch.handle_file_data(data) == ControlFlow::Break) { - break; - } - }, - _ = download_finish_rx.recv() => { - fetch.number_of_open_requests -= 1; - - if fetch.get_download_strategy() == DownloadStrategy::Streaming() { - fetch.trigger_preload(); - } - } - } - } -} - impl Read for AudioFileStreaming { fn read(&mut self, output: &mut [u8]) -> io::Result { let offset = self.position as usize; diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs new file mode 100644 index 0000000..17f884f --- /dev/null +++ b/audio/src/fetch/receive.rs @@ -0,0 +1,455 @@ +use std::cmp::{max, min}; +use std::io::{Seek, SeekFrom, Write}; +use std::sync::{atomic, Arc}; +use std::time::Instant; + +use byteorder::{BigEndian, WriteBytesExt}; +use bytes::Bytes; +use futures_util::StreamExt; +use librespot_core::channel::{Channel, ChannelData}; +use librespot_core::session::Session; +use librespot_core::spotify_id::FileId; +use tempfile::NamedTempFile; +use tokio::sync::{mpsc, oneshot}; + +use crate::range_set::{Range, RangeSet}; + +use super::{AudioFileShared, DownloadStrategy, StreamLoaderCommand}; +use super::{ + FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME_SECONDS, MAX_PREFETCH_REQUESTS, + MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR, +}; + +pub fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { + assert!( + offset % 4 == 0, + "Range request start positions must be aligned by 4 bytes." + ); + assert!( + length % 4 == 0, + "Range request range lengths must be aligned by 4 bytes." + ); + let start = offset / 4; + let end = (offset + length) / 4; + + let (id, channel) = session.channel().allocate(); + + let mut data: Vec = Vec::new(); + data.write_u16::(id).unwrap(); + data.write_u8(0).unwrap(); + data.write_u8(1).unwrap(); + data.write_u16::(0x0000).unwrap(); + data.write_u32::(0x00000000).unwrap(); + data.write_u32::(0x00009C40).unwrap(); + data.write_u32::(0x00020000).unwrap(); + data.write(&file.0).unwrap(); + data.write_u32::(start as u32).unwrap(); + data.write_u32::(end as u32).unwrap(); + + session.send_packet(0x8, data); + + channel +} + +struct PartialFileData { + offset: usize, + data: Bytes, +} + +enum ReceivedData { + ResponseTimeMs(usize), + Data(PartialFileData), +} + +async fn receive_data( + shared: Arc, + file_data_tx: mpsc::UnboundedSender, + mut data_rx: ChannelData, + initial_data_offset: usize, + initial_request_length: usize, + request_sent_time: Instant, + mut measure_ping_time: bool, + finish_tx: mpsc::UnboundedSender<()>, +) { + let mut data_offset = initial_data_offset; + let mut request_length = initial_request_length; + + let result = loop { + let data = match data_rx.next().await { + Some(Ok(data)) => data, + Some(Err(e)) => break Err(e), + None => break Ok(()), + }; + + if measure_ping_time { + let duration = Instant::now() - request_sent_time; + let duration_ms: u64; + if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS { + duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; + } else { + duration_ms = duration.as_millis() as u64; + } + let _ = file_data_tx.send(ReceivedData::ResponseTimeMs(duration_ms as usize)); + measure_ping_time = false; + } + let data_size = data.len(); + let _ = file_data_tx.send(ReceivedData::Data(PartialFileData { + offset: data_offset, + data, + })); + data_offset += data_size; + if request_length < data_size { + warn!( + "Data receiver for range {} (+{}) received more data from server than requested.", + initial_data_offset, initial_request_length + ); + request_length = 0; + } else { + request_length -= data_size; + } + + if request_length == 0 { + break Ok(()); + } + }; + + if request_length > 0 { + let missing_range = Range::new(data_offset, request_length); + + let mut download_status = shared.download_status.lock().unwrap(); + download_status.requested.subtract_range(&missing_range); + shared.cond.notify_all(); + } + + let _ = finish_tx.send(()); + + if result.is_err() { + warn!( + "Error from channel for data receiver for range {} (+{}).", + initial_data_offset, initial_request_length + ); + } else if request_length > 0 { + warn!( + "Data receiver for range {} (+{}) received less data from server than requested.", + initial_data_offset, initial_request_length + ); + } +} + +struct AudioFileFetch { + session: Session, + shared: Arc, + output: Option, + + file_data_tx: mpsc::UnboundedSender, + complete_tx: Option>, + network_response_times_ms: Vec, + number_of_open_requests: usize, + + download_finish_tx: mpsc::UnboundedSender<()>, +} + +// Might be replaced by enum from std once stable +#[derive(PartialEq, Eq)] +enum ControlFlow { + Break, + Continue, +} + +impl AudioFileFetch { + fn get_download_strategy(&mut self) -> DownloadStrategy { + *(self.shared.download_strategy.lock().unwrap()) + } + + fn download_range(&mut self, mut offset: usize, mut length: usize) { + if length < MINIMUM_DOWNLOAD_SIZE { + length = MINIMUM_DOWNLOAD_SIZE; + } + + // ensure the values are within the bounds and align them by 4 for the spotify protocol. + if offset >= self.shared.file_size { + return; + } + + if length == 0 { + return; + } + + if offset + length > self.shared.file_size { + length = self.shared.file_size - offset; + } + + if offset % 4 != 0 { + length += offset % 4; + offset -= offset % 4; + } + + if length % 4 != 0 { + length += 4 - (length % 4); + } + + let mut ranges_to_request = RangeSet::new(); + ranges_to_request.add_range(&Range::new(offset, length)); + + let mut download_status = self.shared.download_status.lock().unwrap(); + + ranges_to_request.subtract_range_set(&download_status.downloaded); + ranges_to_request.subtract_range_set(&download_status.requested); + + for range in ranges_to_request.iter() { + let (_headers, data) = request_range( + &self.session, + self.shared.file_id, + range.start, + range.length, + ) + .split(); + + download_status.requested.add_range(range); + + self.session.spawn(receive_data( + self.shared.clone(), + self.file_data_tx.clone(), + data, + range.start, + range.length, + Instant::now(), + self.number_of_open_requests == 0, + self.download_finish_tx.clone(), + )); + + self.number_of_open_requests += 1; + } + } + + fn pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize) { + let mut bytes_to_go = bytes; + let mut requests_to_go = max_requests_to_send; + + while bytes_to_go > 0 && requests_to_go > 0 { + // determine what is still missing + let mut missing_data = RangeSet::new(); + missing_data.add_range(&Range::new(0, self.shared.file_size)); + { + let download_status = self.shared.download_status.lock().unwrap(); + missing_data.subtract_range_set(&download_status.downloaded); + missing_data.subtract_range_set(&download_status.requested); + } + + // download data from after the current read position first + let mut tail_end = RangeSet::new(); + let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed); + tail_end.add_range(&Range::new( + read_position, + self.shared.file_size - read_position, + )); + let tail_end = tail_end.intersection(&missing_data); + + if !tail_end.is_empty() { + let range = tail_end.get_range(0); + let offset = range.start; + let length = min(range.length, bytes_to_go); + self.download_range(offset, length); + requests_to_go -= 1; + bytes_to_go -= length; + } else if !missing_data.is_empty() { + // ok, the tail is downloaded, download something fom the beginning. + let range = missing_data.get_range(0); + let offset = range.start; + let length = min(range.length, bytes_to_go); + self.download_range(offset, length); + requests_to_go -= 1; + bytes_to_go -= length; + } else { + return; + } + } + } + + fn handle_file_data(&mut self, data: ReceivedData) -> ControlFlow { + match data { + ReceivedData::ResponseTimeMs(response_time_ms) => { + trace!("Ping time estimated as: {} ms.", response_time_ms); + + // record the response time + self.network_response_times_ms.push(response_time_ms); + + // prune old response times. Keep at most three. + while self.network_response_times_ms.len() > 3 { + self.network_response_times_ms.remove(0); + } + + // stats::median is experimental. So we calculate the median of up to three ourselves. + let ping_time_ms: usize = match self.network_response_times_ms.len() { + 1 => self.network_response_times_ms[0] as usize, + 2 => { + ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) + / 2) as usize + } + 3 => { + let mut times = self.network_response_times_ms.clone(); + times.sort_unstable(); + times[1] + } + _ => unreachable!(), + }; + + // store our new estimate for everyone to see + self.shared + .ping_time_ms + .store(ping_time_ms, atomic::Ordering::Relaxed); + } + ReceivedData::Data(data) => { + self.output + .as_mut() + .unwrap() + .seek(SeekFrom::Start(data.offset as u64)) + .unwrap(); + self.output + .as_mut() + .unwrap() + .write_all(data.data.as_ref()) + .unwrap(); + + let mut download_status = self.shared.download_status.lock().unwrap(); + + let received_range = Range::new(data.offset, data.data.len()); + download_status.downloaded.add_range(&received_range); + self.shared.cond.notify_all(); + + let full = download_status.downloaded.contained_length_from_value(0) + >= self.shared.file_size; + + drop(download_status); + + if full { + self.finish(); + return ControlFlow::Break; + } + } + } + ControlFlow::Continue + } + + fn handle_stream_loader_command(&mut self, cmd: StreamLoaderCommand) -> ControlFlow { + match cmd { + StreamLoaderCommand::Fetch(request) => { + self.download_range(request.start, request.length); + } + StreamLoaderCommand::RandomAccessMode() => { + *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); + } + StreamLoaderCommand::StreamMode() => { + *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); + self.trigger_preload(); + } + StreamLoaderCommand::Close() => return ControlFlow::Break, + } + ControlFlow::Continue + } + + fn finish(&mut self) { + let mut output = self.output.take().unwrap(); + let complete_tx = self.complete_tx.take().unwrap(); + + output.seek(SeekFrom::Start(0)).unwrap(); + let _ = complete_tx.send(output); + } + + fn trigger_preload(&mut self) { + if self.number_of_open_requests >= MAX_PREFETCH_REQUESTS { + return; + } + + let max_requests_to_send = MAX_PREFETCH_REQUESTS - self.number_of_open_requests; + + let bytes_pending: usize = { + let download_status = self.shared.download_status.lock().unwrap(); + download_status + .requested + .minus(&download_status.downloaded) + .len() + }; + + let ping_time_seconds = + 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + let download_rate = self.session.channel().get_download_rate_estimate(); + + let desired_pending_bytes = max( + (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) + as usize, + (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize, + ); + + if bytes_pending < desired_pending_bytes { + self.pre_fetch_more_data(desired_pending_bytes - bytes_pending, max_requests_to_send); + } + } +} + +pub(super) async fn audio_file_fetch( + session: Session, + shared: Arc, + initial_data_rx: ChannelData, + initial_request_sent_time: Instant, + initial_data_length: usize, + + output: NamedTempFile, + mut stream_loader_command_rx: mpsc::UnboundedReceiver, + complete_tx: oneshot::Sender, +) { + let (file_data_tx, mut file_data_rx) = mpsc::unbounded_channel(); + let (download_finish_tx, mut download_finish_rx) = mpsc::unbounded_channel(); + + { + let requested_range = Range::new(0, initial_data_length); + let mut download_status = shared.download_status.lock().unwrap(); + download_status.requested.add_range(&requested_range); + } + + session.spawn(receive_data( + shared.clone(), + file_data_tx.clone(), + initial_data_rx, + 0, + initial_data_length, + initial_request_sent_time, + true, + download_finish_tx.clone(), + )); + + let mut fetch = AudioFileFetch { + session, + shared, + output: Some(output), + + file_data_tx, + complete_tx: Some(complete_tx), + network_response_times_ms: Vec::new(), + number_of_open_requests: 1, + + download_finish_tx, + }; + + loop { + tokio::select! { + cmd = stream_loader_command_rx.recv() => { + if cmd.map_or(true, |cmd| fetch.handle_stream_loader_command(cmd) == ControlFlow::Break) { + break; + } + }, + data = file_data_rx.recv() => { + if data.map_or(true, |data| fetch.handle_file_data(data) == ControlFlow::Break) { + break; + } + }, + _ = download_finish_rx.recv() => { + fetch.number_of_open_requests -= 1; + + if fetch.get_download_strategy() == DownloadStrategy::Streaming() { + fetch.trigger_preload(); + } + } + } + } +}