From 333fc5010c05b843fea62e7d85def603414d4e6c Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Sat, 2 Nov 2019 06:46:28 +1100 Subject: [PATCH 01/19] New file downloading mechanism --- Cargo.lock | 2 + audio/Cargo.toml | 2 + audio/src/fetch.rs | 744 ++++++++++++++++++++++++++++++++++------- audio/src/lib.rs | 7 +- audio/src/range_set.rs | 241 +++++++++++++ core/src/channel.rs | 2 + playback/src/player.rs | 65 +++- 7 files changed, 944 insertions(+), 119 deletions(-) create mode 100644 audio/src/range_set.rs diff --git a/Cargo.lock b/Cargo.lock index 1f4824f..67573c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -467,6 +467,7 @@ version = "0.1.0" dependencies = [ "bit-set 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "lewton 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", "librespot-core 0.1.0", @@ -475,6 +476,7 @@ dependencies = [ "num-traits 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)", "rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "tremor 0.1.0 (git+https://github.com/plietar/rust-tremor)", "vorbis 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/audio/Cargo.toml b/audio/Cargo.toml index 5c61b69..59ebfe8 100644 --- a/audio/Cargo.toml +++ b/audio/Cargo.toml @@ -9,6 +9,7 @@ path = "../core" [dependencies] bit-set = "0.4.0" byteorder = "1.0" +bytes = "0.4" futures = "0.1.8" lewton = "0.9.3" log = "0.3.5" @@ -16,6 +17,7 @@ num-bigint = "0.1.35" num-traits = "0.1.36" rust-crypto = "0.2.36" tempfile = "2.1" +tokio = "0.1.2" tremor = { git = "https://github.com/plietar/rust-tremor", optional = true } vorbis = { version ="0.1.0", optional = true } diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 1aa0c0c..ffdbe4b 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -1,5 +1,5 @@ -use bit_set::BitSet; use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; +use bytes::Bytes; use futures::sync::{mpsc, oneshot}; use futures::Stream; use futures::{Async, Future, Poll}; @@ -7,13 +7,20 @@ use std::cmp::min; use std::fs; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::sync::{Arc, Condvar, Mutex}; +use std::time::{Duration, Instant}; use tempfile::NamedTempFile; +use range_set::{Range, RangeSet}; use core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; use core::session::Session; use core::spotify_id::FileId; +use futures::sync::mpsc::unbounded; +use std::sync::atomic; +use std::sync::atomic::AtomicUsize; -const CHUNK_SIZE: usize = 0x20000; +const MINIMUM_CHUNK_SIZE: usize = 1024 * 16; +const MAXIMUM_CHUNK_SIZE: usize = 1024 * 128; +const MAXIMUM_ASSUMED_PING_TIME_SECONDS: u64 = 5; pub enum AudioFile { Cached(fs::File), @@ -27,37 +34,187 @@ pub enum AudioFileOpen { pub struct AudioFileOpenStreaming { session: Session, - data_rx: Option, + initial_data_rx: Option, + initial_data_length: Option, + initial_request_sent_time: Instant, headers: ChannelHeaders, file_id: FileId, complete_tx: Option>, } + +enum StreamLoaderCommand{ + Fetch(Range), // signal the stream loader to fetch a range of the file + RandomAccessMode(), // optimise download strategy for random access + StreamMode(), // optimise download strategy for streaming + StreamDataRate(usize), // when optimising for streaming, assume a streaming rate of this many bytes per second. + Close(), // terminate and don't load any more data +} + + +#[derive(Clone)] +pub struct StreamLoaderController { + channel_tx: Option>, + stream_shared: Option>, + file_size: usize, +} + + +impl StreamLoaderController { + pub fn len(&self) -> usize { + return self.file_size; + } + + pub fn range_available(&self, range: Range) -> bool { + if let Some(ref shared) = self.stream_shared { + let mut download_status = shared.download_status.lock().unwrap(); + if range.length <= download_status.downloaded.contained_length_from_value(range.start) { + return true; + } else { + return false; + } + } else { + if range.length <= self.len() - range.start { + return true; + } else { + return false; + } + } + } + + pub fn ping_time_ms(&self) -> usize { + if let Some(ref shared) = self.stream_shared { + return shared.ping_time_ms.load(atomic::Ordering::Relaxed); + } else { + return 0; + } + } + + fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) { + if let Some(ref mut channel) = self.channel_tx { + // ignore the error in case the channel has been closed already. + let _ = channel.unbounded_send(command); + } + } + + pub fn fetch(&mut self, range: Range) { + // signal the stream loader to fetch a range of the file + self.send_stream_loader_command(StreamLoaderCommand::Fetch(range)); + } + + pub fn fetch_blocking(&mut self, mut range: Range) { + // signal the stream loader to tech a range of the file and block until it is loaded. + + // ensure the range is within the file's bounds. + if range.start >= self.len() { + range.length = 0; + } else if range.end() > self.len() { + range.length = self.len() - range.start; + } + + self.fetch(range); + + if let Some(ref shared) = self.stream_shared { + let mut download_status = shared.download_status.lock().unwrap(); + while range.length > download_status.downloaded.contained_length_from_value(range.start) { + download_status = shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0; + if range.length > (download_status.downloaded.union(&download_status.requested).contained_length_from_value(range.start)) { + // For some reason, the requested range is neither downloaded nor requested. + // This could be due to a network error. Request it again. + // We can't use self.fetch here because self can't borrowed mutably, so we access the channel directly. + if let Some(ref mut channel) = self.channel_tx { + // ignore the error in case the channel has been closed already. + let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range)); + } + } + } + } + + } + + pub fn fetch_next(&mut self, length: usize) { + let range:Range = if let Some(ref shared) = self.stream_shared { + Range { + start: shared.read_position.load(atomic::Ordering::Relaxed), + length: length, + } + } else { + return; + }; + self.fetch(range); + } + + pub fn fetch_next_blocking(&mut self, length: usize) { + let range:Range = if let Some(ref shared) = self.stream_shared { + Range { + start: shared.read_position.load(atomic::Ordering::Relaxed), + length: length, + } + } else { + return; + }; + self.fetch_blocking(range); + } + + pub fn set_random_access_mode(&mut self) { + // optimise download strategy for random access + self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode()); + } + + pub fn set_stream_mode(&mut self) { + // optimise download strategy for streaming + self.send_stream_loader_command(StreamLoaderCommand::StreamMode()); + } + + pub fn set_stream_data_rate(&mut self, bytes_per_second: usize) { + // when optimising for streaming, assume a streaming rate of this many bytes per second. + self.send_stream_loader_command(StreamLoaderCommand::StreamDataRate(bytes_per_second)); + } + + pub fn close(&mut self) { + // terminate stream loading and don't load any more data for this file. + self.send_stream_loader_command(StreamLoaderCommand::Close()); + } + + +} + + pub struct AudioFileStreaming { read_file: fs::File, position: u64, - seek: mpsc::UnboundedSender, + + stream_loader_command_tx: mpsc::UnboundedSender, shared: Arc, } + +struct AudioFileDownloadStatus { + requested: RangeSet, + downloaded: RangeSet, +} + struct AudioFileShared { file_id: FileId, - chunk_count: usize, + file_size: usize, cond: Condvar, - bitmap: Mutex, + download_status: Mutex, + ping_time_ms: AtomicUsize, + read_position: AtomicUsize, } impl AudioFileOpenStreaming { fn finish(&mut self, size: usize) -> AudioFileStreaming { - let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE; let shared = Arc::new(AudioFileShared { file_id: self.file_id, - chunk_count: chunk_count, + file_size: size, cond: Condvar::new(), - bitmap: Mutex::new(BitSet::with_capacity(chunk_count)), + download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}), + ping_time_ms: AtomicUsize::new(0), + read_position: AtomicUsize::new(0), }); let mut write_file = NamedTempFile::new().unwrap(); @@ -66,16 +223,20 @@ impl AudioFileOpenStreaming { let read_file = write_file.reopen().unwrap(); - let data_rx = self.data_rx.take().unwrap(); + let initial_data_rx = self.initial_data_rx.take().unwrap(); + let initial_data_length = self.initial_data_length.take().unwrap(); let complete_tx = self.complete_tx.take().unwrap(); - let (seek_tx, seek_rx) = mpsc::unbounded(); + //let (seek_tx, seek_rx) = mpsc::unbounded(); + let (stream_loader_command_tx, stream_loader_command_rx) = mpsc::unbounded::(); let fetcher = AudioFileFetch::new( self.session.clone(), shared.clone(), - data_rx, + initial_data_rx, + self.initial_request_sent_time, + initial_data_length, write_file, - seek_rx, + stream_loader_command_rx, complete_tx, ); self.session.spawn(move |_| fetcher); @@ -84,7 +245,8 @@ impl AudioFileOpenStreaming { read_file: read_file, position: 0, - seek: seek_tx, + //seek: seek_tx, + stream_loader_command_tx: stream_loader_command_tx, shared: shared, } @@ -139,14 +301,17 @@ impl AudioFile { debug!("Downloading file {}", file_id); let (complete_tx, complete_rx) = oneshot::channel(); - let (headers, data) = request_chunk(session, file_id, 0).split(); + let initial_data_length = MINIMUM_CHUNK_SIZE; + let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); let open = AudioFileOpenStreaming { session: session.clone(), file_id: file_id, headers: headers, - data_rx: Some(data), + initial_data_rx: Some(data), + initial_data_length: Some(initial_data_length), + initial_request_sent_time: Instant::now(), complete_tx: Some(complete_tx), }; @@ -167,13 +332,36 @@ impl AudioFile { AudioFileOpen::Streaming(open) } + + pub fn get_stream_loader_controller(&self) -> StreamLoaderController { + match self { + AudioFile::Streaming(stream) => { + return StreamLoaderController { + channel_tx: Some(stream.stream_loader_command_tx.clone()), + stream_shared: Some(stream.shared.clone()), + file_size: stream.shared.file_size, + } + } + AudioFile::Cached(ref file) => { + return StreamLoaderController { + channel_tx: None, + stream_shared: None, + file_size: file.metadata().unwrap().len() as usize, + } + } + } + } } -fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel { - trace!("requesting chunk {}", index); - let start = (index * CHUNK_SIZE / 4) as u32; - let end = ((index + 1) * CHUNK_SIZE / 4) as u32; +fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { + trace!("requesting range starting at {} of length {}", offset, length); + + let start = offset / 4; + let mut end = (offset+length) / 4; + if (offset+length) % 4 != 0 { + end += 1; + } let (id, channel) = session.channel().allocate(); @@ -186,81 +374,405 @@ fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel { data.write_u32::(0x00009C40).unwrap(); data.write_u32::(0x00020000).unwrap(); data.write(&file.0).unwrap(); - data.write_u32::(start).unwrap(); - data.write_u32::(end).unwrap(); + data.write_u32::(start as u32).unwrap(); + data.write_u32::(end as u32).unwrap(); session.send_packet(0x8, data); channel } + + +struct PartialFileData { + offset: usize, + data: Bytes, +} + +enum ReceivedData { + ResponseTimeMs(usize), + Data(PartialFileData), +} + +struct AudioFileFetchDataReceiver { + shared: Arc, + file_data_tx: mpsc::UnboundedSender, + data_rx: ChannelData, + data_offset: usize, + request_length: usize, + request_sent_time: Option, +} + +impl AudioFileFetchDataReceiver { + fn new( + shared: Arc, + file_data_tx: mpsc::UnboundedSender, + data_rx: ChannelData, + data_offset: usize, + request_length: usize, + request_sent_time: Instant, + ) -> AudioFileFetchDataReceiver { + + AudioFileFetchDataReceiver { + shared: shared, + data_rx: data_rx, + file_data_tx: file_data_tx, + data_offset: data_offset, + request_length: request_length, + request_sent_time: Some(request_sent_time), + } + } +} + + + +impl AudioFileFetchDataReceiver { + fn finish(&mut self) { + if self.request_length > 0 { + + let missing_range = Range::new(self.data_offset, self.request_length); + + let mut download_status = self.shared.download_status.lock().unwrap(); + download_status.requested.subtract_range(&missing_range); + self.shared.cond.notify_all(); + } + } +} + +impl Future for AudioFileFetchDataReceiver { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + loop { + trace!("Looping data_receiver for offset {} and length {}", self.data_offset, self.request_length); + match self.data_rx.poll() { + Ok(Async::Ready(Some(data))) => { + if let Some(request_sent_time) = self.request_sent_time { + let duration = Instant::now() - request_sent_time; + let mut duration_ms: u64; + if duration.as_secs() > MAXIMUM_ASSUMED_PING_TIME_SECONDS { + duration_ms = MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000; + }else { + duration_ms = duration.as_secs() *1000 + duration.subsec_millis() as u64; + } + let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); + } + let data_size = data.len(); + trace!("data_receiver got {} bytes of data", data_size); + let _ = self.file_data_tx.unbounded_send(ReceivedData::Data(PartialFileData { offset: self.data_offset, data: data, })); + self.data_offset += data_size; + if self.request_length < data_size { + warn!("Received more data from server than requested."); + self.request_length = 0; + } else { + self.request_length -= data_size; + } + if self.request_length == 0 { + trace!("Data receiver completed at position {}", self.data_offset); + return Ok(Async::Ready(())); + } + } + Ok(Async::Ready(None)) => { + if self.request_length > 0 { + warn!("Received less data from server than requested."); + self.finish(); + } + return Ok(Async::Ready(())); + } + Ok(Async::NotReady) => { + //trace!("No more data for data_receiver at the moment."); + return Ok(Async::NotReady); + } + Err(ChannelError) => { + warn!("error from channel"); + self.finish(); + return Ok(Async::Ready(())); + } + } + } + } +} + + +enum DownloadStrategy { + RandomAccess(), + Streaming(), +} + struct AudioFileFetch { session: Session, shared: Arc, output: Option, - index: usize, - data_rx: ChannelData, + file_data_tx: mpsc::UnboundedSender, + file_data_rx: mpsc::UnboundedReceiver, - seek_rx: mpsc::UnboundedReceiver, + stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: Option>, + download_strategy: DownloadStrategy, + streaming_data_rate: usize, + network_response_times_ms: Vec, } impl AudioFileFetch { fn new( session: Session, shared: Arc, - data_rx: ChannelData, + initial_data_rx: ChannelData, + initial_request_sent_time: Instant, + initial_data_length: usize, + output: NamedTempFile, - seek_rx: mpsc::UnboundedReceiver, + stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: oneshot::Sender, ) -> AudioFileFetch { + + let (file_data_tx, file_data_rx) = unbounded::(); + + { + let requested_range = Range::new(0, initial_data_length); + let mut download_status = shared.download_status.lock().unwrap(); + download_status.requested.add_range(&requested_range); + } + + + let initial_data_receiver = AudioFileFetchDataReceiver::new( + shared.clone(), + file_data_tx.clone(), + initial_data_rx, + 0, + initial_data_length, + initial_request_sent_time, + ); + + session.spawn(move |_| initial_data_receiver); + AudioFileFetch { session: session, shared: shared, output: Some(output), - index: 0, - data_rx: data_rx, + file_data_tx: file_data_tx, + file_data_rx: file_data_rx, - seek_rx: seek_rx, + stream_loader_command_rx: stream_loader_command_rx, complete_tx: Some(complete_tx), + download_strategy: DownloadStrategy::RandomAccess(), // start with random access mode until someone tells us otherwise + streaming_data_rate: 40, // assume 360 kbit per second unless someone tells us otherwise. + network_response_times_ms: Vec::new(), } } - fn download(&mut self, mut new_index: usize) { - assert!(new_index < self.shared.chunk_count); + fn download_range(&mut self, mut offset: usize, mut length: usize) { + if length < MINIMUM_CHUNK_SIZE { + length = MINIMUM_CHUNK_SIZE; + } + + // ensure the values are within the bounds and align them by 4 for the spotify protocol. + if offset >= self.shared.file_size { + return; + } + + if length <= 0 { + return; + } + + if offset + length > self.shared.file_size { + length = self.shared.file_size - offset; + } + + if offset % 4 != 0 { + length += offset % 4; + offset -= offset % 4; + } + + if length % 4 != 0 { + length += 4 - (length % 4); + } + + let mut ranges_to_request = RangeSet::new(); + ranges_to_request.add_range(&Range::new(offset, length)); + + let mut download_status = self.shared.download_status.lock().unwrap(); + + ranges_to_request.subtract_range_set(&download_status.downloaded); + ranges_to_request.subtract_range_set(&download_status.requested); + + + for range in ranges_to_request.iter() { + let (_headers, data) = request_range(&self.session, self.shared.file_id, range.start, range.length).split(); + + download_status.requested.add_range(range); + + + let receiver = AudioFileFetchDataReceiver::new( + self.shared.clone(), + self.file_data_tx.clone(), + data, + range.start, + range.length, + Instant::now(), + ); + + self.session.spawn(move |_| receiver); + } + + } + + fn pre_fetch_more_data(&mut self) { + + // determine what is still missing + let mut missing_data = RangeSet::new(); + missing_data.add_range(&Range::new(0,self.shared.file_size)); { - let bitmap = self.shared.bitmap.lock().unwrap(); - while bitmap.contains(new_index) { - new_index = (new_index + 1) % self.shared.chunk_count; + let download_status = self.shared.download_status.lock().unwrap(); + missing_data.subtract_range_set(&download_status.downloaded); + missing_data.subtract_range_set(&download_status.requested); + } + + // download data from after the current read position first + let mut tail_end = RangeSet::new(); + let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed); + tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position)); + let tail_end = tail_end.intersection(&missing_data); + + if ! tail_end.is_empty() { + let range = tail_end.get_range(0); + let offset = range.start; + let length = min(range.length, MAXIMUM_CHUNK_SIZE); + self.download_range(offset, length); + + } else if ! missing_data.is_empty() { + // ok, the tail is downloaded, download something fom the beginning. + let range = missing_data.get_range(0); + let offset = range.start; + let length = min(range.length, MAXIMUM_CHUNK_SIZE); + self.download_range(offset, length); + } + + } + + fn poll_file_data_rx(&mut self) -> Poll<(), ()> { + + loop { + match self.file_data_rx.poll() { + Ok(Async::Ready(None)) => { + trace!("File data channel closed."); + return Ok(Async::Ready(())); + } + Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => { + trace!("Received ping time information: {} ms.", response_time_ms); + + // record the response time + self.network_response_times_ms.push(response_time_ms); + + // prone old response times. Keep at most three. + while self.network_response_times_ms.len() > 3 { + self.network_response_times_ms.remove(0); + } + + // stats::median is experimental. So we calculate the median of up to three ourselves. + let ping_time_ms: usize = match self.network_response_times_ms.len() { + 1 => self.network_response_times_ms[0] as usize, + 2 => ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2) as usize, + 3 => { + let mut times = self.network_response_times_ms.clone(); + times.sort(); + times[1] + } + _ => unreachable!(), + }; + + // store our new estimate for everyone to see + self.shared.ping_time_ms.store(ping_time_ms, atomic::Ordering::Relaxed); + + }, + Ok(Async::Ready(Some(ReceivedData::Data(data)))) => { + + trace!("Writing data to file: offset {}, length {}", data.offset, data.data.len()); + + self.output + .as_mut() + .unwrap() + .seek(SeekFrom::Start(data.offset as u64)) + .unwrap(); + self.output.as_mut().unwrap().write_all(data.data.as_ref()).unwrap(); + + + + let mut full = false; + + { + let mut download_status = self.shared.download_status.lock().unwrap(); + + let received_range = Range::new(data.offset, data.data.len()); + download_status.downloaded.add_range(&received_range); + self.shared.cond.notify_all(); + + if download_status.downloaded.contained_length_from_value(0) >= self.shared.file_size { + full = true; + } + drop(download_status); + } + + if full { + self.finish(); + return Ok(Async::Ready(())); + } + + + } + Ok(Async::NotReady) => { + return Ok(Async::NotReady); + }, + Err(()) => unreachable!(), + } + + } + + } + + + fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> { + + loop { + match self.stream_loader_command_rx.poll() { + Ok(Async::Ready(None)) => {} + Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => { + self.download_range(request.start, request.length); + } + Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => { + self.download_strategy = DownloadStrategy::RandomAccess(); + } + Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => { + self.download_strategy = DownloadStrategy::Streaming(); + } + Ok(Async::Ready(Some(StreamLoaderCommand::StreamDataRate(rate)))) => { + self.streaming_data_rate = rate; + } + Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => { + return Ok(Async::Ready(())); + } + Ok(Async::NotReady) => { + return Ok(Async::NotReady) + }, + Err(()) => unreachable!(), } } - if self.index != new_index { - self.index = new_index; - - let offset = self.index * CHUNK_SIZE; - - self.output - .as_mut() - .unwrap() - .seek(SeekFrom::Start(offset as u64)) - .unwrap(); - - let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split(); - self.data_rx = data; - } } fn finish(&mut self) { + trace!("====== FINISHED DOWNLOADING FILE! ======"); let mut output = self.output.take().unwrap(); let complete_tx = self.complete_tx.take().unwrap(); output.seek(SeekFrom::Start(0)).unwrap(); let _ = complete_tx.send(output); } + } impl Future for AudioFileFetch { @@ -268,80 +780,92 @@ impl Future for AudioFileFetch { type Error = (); fn poll(&mut self) -> Poll<(), ()> { - loop { - let mut progress = false; - match self.seek_rx.poll() { - Ok(Async::Ready(None)) => { - return Ok(Async::Ready(())); - } - Ok(Async::Ready(Some(offset))) => { - progress = true; - let index = offset as usize / CHUNK_SIZE; - self.download(index); - } - Ok(Async::NotReady) => (), - Err(()) => unreachable!(), + trace!("Polling AudioFileFetch"); + + match self.poll_stream_loader_command_rx() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(_)) => { + return Ok(Async::Ready(())); } + Err(()) => unreachable!(), + } - match self.data_rx.poll() { - Ok(Async::Ready(Some(data))) => { - progress = true; - - self.output.as_mut().unwrap().write_all(data.as_ref()).unwrap(); - } - Ok(Async::Ready(None)) => { - progress = true; - - trace!("chunk {} / {} complete", self.index, self.shared.chunk_count); - - let full = { - let mut bitmap = self.shared.bitmap.lock().unwrap(); - bitmap.insert(self.index as usize); - self.shared.cond.notify_all(); - - bitmap.len() >= self.shared.chunk_count - }; - - if full { - self.finish(); - return Ok(Async::Ready(())); - } - - let new_index = (self.index + 1) % self.shared.chunk_count; - self.download(new_index); - } - Ok(Async::NotReady) => (), - Err(ChannelError) => { - warn!("error from channel"); - return Ok(Async::Ready(())); - } + match self.poll_file_data_rx() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(_)) => { + return Ok(Async::Ready(())); } + Err(()) => unreachable!(), + } - if !progress { - return Ok(Async::NotReady); + + if let DownloadStrategy::Streaming() = self.download_strategy { + let bytes_pending: usize = { + let download_status = self.shared.download_status.lock().unwrap(); + download_status.requested.minus(&download_status.downloaded).len() + }; + + let ping_time = self.shared.ping_time_ms.load(atomic::Ordering::Relaxed); + + if bytes_pending < 2 * ping_time * self.streaming_data_rate { + self.pre_fetch_more_data(); } } + + + return Ok(Async::NotReady) } } impl Read for AudioFileStreaming { fn read(&mut self, output: &mut [u8]) -> io::Result { - let index = self.position as usize / CHUNK_SIZE; - let offset = self.position as usize % CHUNK_SIZE; - let len = min(output.len(), CHUNK_SIZE - offset); + let offset = self.position as usize; - let mut bitmap = self.shared.bitmap.lock().unwrap(); - while !bitmap.contains(index) { - bitmap = self.shared.cond.wait(bitmap).unwrap(); + if offset >= self.shared.file_size { + return Ok(0); } - drop(bitmap); - let read_len = try!(self.read_file.read(&mut output[..len])); + let length = min(output.len(), self.shared.file_size - offset); + + if length == 0 { + return Ok(0); + } + + + + let mut ranges_to_request = RangeSet::new(); + ranges_to_request.add_range(&Range::new(offset, length)); + + + let mut download_status = self.shared.download_status.lock().unwrap(); + ranges_to_request.subtract_range_set(&download_status.downloaded); + ranges_to_request.subtract_range_set(&download_status.requested); + + + for range in ranges_to_request.iter() { + debug!("requesting data at position {} (length : {})", range.start, range.length); + self.stream_loader_command_tx.unbounded_send(StreamLoaderCommand::Fetch(range.clone())).unwrap(); + } + + while !download_status.downloaded.contains(offset) { + download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0; + } + let available_length = download_status.downloaded.contained_length_from_value(offset); + assert!(available_length > 0); + drop(download_status); + + + self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap(); + let read_len = min(length, available_length); + let read_len = try!(self.read_file.read(&mut output[..read_len])); + self.position += read_len as u64; + self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed); - Ok(read_len) + + return Ok(read_len); } } @@ -349,15 +873,7 @@ impl Seek for AudioFileStreaming { fn seek(&mut self, pos: SeekFrom) -> io::Result { self.position = try!(self.read_file.seek(pos)); // Do not seek past EOF - if (self.position as usize % CHUNK_SIZE) != 0 { - // Notify the fetch thread to get the correct block - // This can fail if fetch thread has completed, in which case the - // block is ready. Just ignore the error. - let _ = self.seek.unbounded_send(self.position); - } else { - warn!("Trying to seek past EOF"); - } - + self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed); Ok(self.position) } } diff --git a/audio/src/lib.rs b/audio/src/lib.rs index 5b582dc..f316e14 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -5,10 +5,12 @@ extern crate log; extern crate bit_set; extern crate byteorder; +extern crate bytes; extern crate crypto; extern crate num_bigint; extern crate num_traits; extern crate tempfile; +extern crate tokio; extern crate librespot_core as core; @@ -20,10 +22,13 @@ mod lewton_decoder; #[cfg(any(feature = "with-tremor", feature = "with-vorbis"))] mod libvorbis_decoder; +mod range_set; + pub use decrypt::AudioDecrypt; -pub use fetch::{AudioFile, AudioFileOpen}; +pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController}; #[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))] pub use lewton_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; #[cfg(any(feature = "with-tremor", feature = "with-vorbis"))] pub use libvorbis_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; + diff --git a/audio/src/range_set.rs b/audio/src/range_set.rs new file mode 100644 index 0000000..378725f --- /dev/null +++ b/audio/src/range_set.rs @@ -0,0 +1,241 @@ + +use std::cmp::{max,min}; +use std::slice::Iter; + + + +#[derive(Copy, Clone)] +pub struct Range { + pub start: usize, + pub length: usize, +} + +impl Range { + + pub fn new(start: usize, length: usize) -> Range { + return Range { + start: start, + length: length, + } + } + + pub fn end(&self) -> usize { + return self.start + self.length; + } + +} + +#[derive(Clone)] +pub struct RangeSet { + ranges: Vec, +} + + +impl RangeSet { + pub fn new() -> RangeSet { + RangeSet{ + ranges: Vec::::new(), + } + } + + pub fn is_empty(&self) -> bool { + return self.ranges.is_empty(); + } + + pub fn len(&self) -> usize { + let mut result = 0; + for range in self.ranges.iter() { + result += range.length; + } + return result; + } + + pub fn get_range(&self, index: usize) -> Range { + return self.ranges[index].clone(); + } + + pub fn iter(&self) -> Iter { + return self.ranges.iter(); + } + + pub fn contains(&self, value: usize) -> bool { + for range in self.ranges.iter() { + if value < range.start { + return false; + } else if range.start <= value && value < range.end() { + return true; + } + } + return false; + } + + pub fn contained_length_from_value(&self, value: usize) -> usize { + for range in self.ranges.iter() { + if value < range.start { + return 0; + } else if range.start <= value && value < range.end() { + return range.end() - value; + } + } + return 0; + + } + + #[allow(dead_code)] + pub fn contains_range_set(&self, other: &RangeSet) -> bool { + for range in other.ranges.iter() { + if self.contained_length_from_value(range.start) < range.length { + return false; + } + } + return true; + } + + + pub fn add_range(&mut self, range:&Range) { + + if range.length <= 0 { + // the interval is empty or invalid -> nothing to do. + return; + } + + + for index in 0..self.ranges.len() { + // the new range is clear of any ranges we already iterated over. + if range.end() < self.ranges[index].start{ + // the new range starts after anything we already passed and ends before the next range starts (they don't touch) -> insert it. + self.ranges.insert(index, range.clone()); + return; + + } else if range.start <= self.ranges[index].end() && self.ranges[index].start <= range.end() { + // the new range overlaps (or touches) the first range. They are to be merged. + // In addition we might have to merge further ranges in as well. + + let mut new_range = range.clone(); + + while index < self.ranges.len() && self.ranges[index].start <= new_range.end() { + let new_end = max(new_range.end(), self.ranges[index].end()); + new_range.start = min(new_range.start, self.ranges[index].start); + new_range.length = new_end - new_range.start; + self.ranges.remove(index); + } + + self.ranges.insert(index, new_range); + return; + + } + } + + // the new range is after everything else -> just add it + self.ranges.push(range.clone()); + } + + #[allow(dead_code)] + pub fn add_range_set(&mut self, other: &RangeSet) { + for range in other.ranges.iter() { + self.add_range(range); + } + } + + #[allow(dead_code)] + pub fn union(&self, other: &RangeSet) -> RangeSet { + let mut result = self.clone(); + result.add_range_set(other); + return result; + } + + pub fn subtract_range(&mut self, range: &Range) { + + if range.length <= 0 { + return; + } + + for index in 0..self.ranges.len() { + // the ranges we already passed don't overlap with the range to remove + + if range.end() <= self.ranges[index].start { + // the remaining ranges are past the one to subtract. -> we're done. + return + + } else if range.start <= self.ranges[index].start && self.ranges[index].start < range.end() { + // the range to subtract started before the current range and reaches into the current range + // -> we have to remove the beginning of the range or the entire range and do the same for following ranges. + + while index < self.ranges.len() && self.ranges[index].end() <= range.end() { + self.ranges.remove(index); + } + + if index < self.ranges.len() && self.ranges[index].start < range.end() { + self.ranges[index].start = range.end(); + } + + return; + + } else if range.end() < self.ranges[index].end() { + // the range to subtract punches a hole into the current range -> we need to create two smaller ranges. + + let first_range = Range { + start: self.ranges[index].start, + length: range.start - self.ranges[index].start, + }; + + self.ranges[index].start = range.end(); + + self.ranges.insert(index, first_range); + + return; + + } else if range.start < self.ranges[index].end() { + // the range truncates the existing range -> truncate the range. Let the for loop take care of overlaps with other ranges. + self.ranges[index].length = range.start - self.ranges[index].start; + + } + } + } + + pub fn subtract_range_set(&mut self, other: &RangeSet) { + for range in other.ranges.iter() { + self.subtract_range(range); + } + } + + pub fn minus(&self, other: &RangeSet) -> RangeSet { + let mut result = self.clone(); + result.subtract_range_set(other); + return result; + } + + pub fn intersection(&self, other: &RangeSet) -> RangeSet { + let mut result = RangeSet::new(); + + let mut self_index: usize = 0; + let mut other_index: usize = 0; + + while self_index < self.ranges.len() && other_index < other.ranges.len() { + if self.ranges[self_index].end() <= other.ranges[other_index].start { + // skip the interval + self_index += 1; + } else if other.ranges[other_index].end() <= self.ranges[self_index].start { + // skip the interval + other_index += 1; + } else { + // the two intervals overlap. Add the union and advance the index of the one that ends first. + let new_start = max(self.ranges[self_index].start, other.ranges[other_index].start); + let new_end = min(self.ranges[self_index].end(), other.ranges[other_index].end()); + assert!(new_start <= new_end); + result.add_range(&Range::new(new_start, new_end-new_start)); + if self.ranges[self_index].end() <= other.ranges[other_index].end() { + self_index += 1; + } else { + other_index += 1; + } + + } + + } + + return result; + } + +} + diff --git a/core/src/channel.rs b/core/src/channel.rs index 57655fe..3238a0a 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -59,6 +59,8 @@ impl ChannelManager { let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref()); + trace!("Received data for channel {}: {} bytes.", id, data.len()); + self.lock(|inner| { if let Entry::Occupied(entry) = inner.channels.entry(id) { let _ = entry.get().unbounded_send((cmd, data)); diff --git a/playback/src/player.rs b/playback/src/player.rs index a421c9a..5d0e58a 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -14,12 +14,14 @@ use config::{Bitrate, PlayerConfig}; use core::session::Session; use core::spotify_id::SpotifyId; -use audio::{AudioDecrypt, AudioFile}; +use audio::{AudioDecrypt, AudioFile, StreamLoaderController}; use audio::{VorbisDecoder, VorbisPacket}; use audio_backend::Sink; use metadata::{FileFormat, Metadata, Track}; use mixer::AudioFilter; + + pub struct Player { commands: Option>, thread_handle: Option>, @@ -202,12 +204,14 @@ enum PlayerState { decoder: Decoder, end_of_track: oneshot::Sender<()>, normalisation_factor: f32, + stream_loader_controller: StreamLoaderController, }, Playing { track_id: SpotifyId, decoder: Decoder, end_of_track: oneshot::Sender<()>, normalisation_factor: f32, + stream_loader_controller: StreamLoaderController, }, EndOfTrack { track_id: SpotifyId, @@ -234,6 +238,15 @@ impl PlayerState { } } + fn stream_loader_controller(&mut self) -> Option<&mut StreamLoaderController> { + use self::PlayerState::*; + match *self { + Stopped | EndOfTrack { .. } => None, + Paused { ref mut stream_loader_controller, .. } | Playing { ref mut stream_loader_controller, .. } => Some(stream_loader_controller), + Invalid => panic!("invalid state"), + } + } + fn playing_to_end_of_track(&mut self) { use self::PlayerState::*; match mem::replace(self, Invalid) { @@ -257,12 +270,14 @@ impl PlayerState { decoder, end_of_track, normalisation_factor, + stream_loader_controller, } => { *self = Playing { track_id: track_id, decoder: decoder, end_of_track: end_of_track, normalisation_factor: normalisation_factor, + stream_loader_controller: stream_loader_controller, }; } _ => panic!("invalid state"), @@ -277,12 +292,14 @@ impl PlayerState { decoder, end_of_track, normalisation_factor, + stream_loader_controller, } => { *self = Paused { track_id: track_id, decoder: decoder, end_of_track: end_of_track, normalisation_factor: normalisation_factor, + stream_loader_controller: stream_loader_controller, }; } _ => panic!("invalid state"), @@ -403,7 +420,7 @@ impl PlayerInternal { } match self.load_track(track_id, position as i64) { - Some((decoder, normalisation_factor)) => { + Some((decoder, normalisation_factor, stream_loader_controller)) => { if play { match self.state { PlayerState::Playing { @@ -427,6 +444,7 @@ impl PlayerInternal { decoder: decoder, end_of_track: end_of_track, normalisation_factor: normalisation_factor, + stream_loader_controller: stream_loader_controller, }; } else { self.state = PlayerState::Paused { @@ -434,6 +452,7 @@ impl PlayerInternal { decoder: decoder, end_of_track: end_of_track, normalisation_factor: normalisation_factor, + stream_loader_controller: stream_loader_controller, }; match self.state { PlayerState::Playing { @@ -460,6 +479,9 @@ impl PlayerInternal { } PlayerCommand::Seek(position) => { + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + stream_loader_controller.set_random_access_mode(); + } if let Some(decoder) = self.state.decoder() { match decoder.seek(position as i64) { Ok(_) => (), @@ -468,6 +490,17 @@ impl PlayerInternal { } else { warn!("Player::seek called from invalid state"); } + + // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun. + let stream_data_rate = self.stream_data_rate(); + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + stream_loader_controller.set_stream_mode(); + if let PlayerState::Playing{..} = self.state { + let wait_for_data_length = (2 * stream_loader_controller.ping_time_ms() * stream_data_rate) / 1000; + stream_loader_controller.fetch_next_blocking(wait_for_data_length); + } + } + } PlayerCommand::Play => { @@ -526,7 +559,15 @@ impl PlayerInternal { } } - fn load_track(&self, track_id: SpotifyId, position: i64) -> Option<(Decoder, f32)> { + fn stream_data_rate(&self) -> usize { + match self.config.bitrate { + Bitrate::Bitrate96 => 12 * 1024, + Bitrate::Bitrate160 => 20 * 1024, + Bitrate::Bitrate320 => 40 * 1024, + } + } + + fn load_track(&self, track_id: SpotifyId, position: i64) -> Option<(Decoder, f32, StreamLoaderController)> { let track = Track::get(&self.session, track_id).wait().unwrap(); info!( @@ -565,6 +606,21 @@ impl PlayerInternal { let encrypted_file = encrypted_file.wait().unwrap(); + + let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(); + + // tell the stream loader how to optimise its strategy. + stream_loader_controller.set_stream_data_rate(self.stream_data_rate()); + + if position == 0 { + // No need to seek -> we stream from the beginning + stream_loader_controller.set_stream_mode(); + } else { + // we need to seek -> we set stream mode after the initial seek. + stream_loader_controller.set_random_access_mode(); + } + + let key = key.wait().unwrap(); let mut decrypted_file = AudioDecrypt::new(key, encrypted_file); @@ -585,11 +641,12 @@ impl PlayerInternal { Ok(_) => (), Err(err) => error!("Vorbis error: {:?}", err), } + stream_loader_controller.set_stream_mode(); } info!("Track \"{}\" loaded", track.name); - Some((decoder, normalisation_factor)) + Some((decoder, normalisation_factor, stream_loader_controller)) } } From bf47ca70332fb2853f949566afdacb6f6476fcde Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Sat, 2 Nov 2019 06:48:18 +1100 Subject: [PATCH 02/19] some debug messages --- audio/src/fetch.rs | 67 ++++++++++++++++++++++++++++++++++++++++++ playback/src/player.rs | 27 +++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index ffdbe4b..d36df09 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -1,3 +1,4 @@ +//use bit_set::BitSet; use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use bytes::Bytes; use futures::sync::{mpsc, oneshot}; @@ -301,8 +302,10 @@ impl AudioFile { debug!("Downloading file {}", file_id); let (complete_tx, complete_rx) = oneshot::channel(); + debug!("calling request_chunk"); let initial_data_length = MINIMUM_CHUNK_SIZE; let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); + debug!("returned from request_chunk"); let open = AudioFileOpenStreaming { session: session.clone(), @@ -316,6 +319,7 @@ impl AudioFile { complete_tx: Some(complete_tx), }; + debug!("cloning into cache session"); let session_ = session.clone(); session.spawn(move |_| { complete_rx @@ -330,6 +334,7 @@ impl AudioFile { .or_else(|oneshot::Canceled| Ok(())) }); + debug!("returning open stream"); AudioFileOpen::Streaming(open) } @@ -365,6 +370,8 @@ fn request_range(session: &Session, file: FileId, offset: usize, length: usize) let (id, channel) = session.channel().allocate(); + trace!("allocated channel {}", id); + let mut data: Vec = Vec::new(); data.write_u16::(id).unwrap(); data.write_u8(0).unwrap(); @@ -382,6 +389,32 @@ fn request_range(session: &Session, file: FileId, offset: usize, length: usize) channel } +//fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel { +// trace!("requesting chunk {}", index); +// +// let start = (index * CHUNK_SIZE / 4) as u32; +// let end = ((index + 1) * CHUNK_SIZE / 4) as u32; +// +// let (id, channel) = session.channel().allocate(); +// +// trace!("allocated channel {}", id); +// +// let mut data: Vec = Vec::new(); +// data.write_u16::(id).unwrap(); +// data.write_u8(0).unwrap(); +// data.write_u8(1).unwrap(); +// data.write_u16::(0x0000).unwrap(); +// data.write_u32::(0x00000000).unwrap(); +// data.write_u32::(0x00009C40).unwrap(); +// data.write_u32::(0x00020000).unwrap(); +// data.write(&file.0).unwrap(); +// data.write_u32::(start).unwrap(); +// data.write_u32::(end).unwrap(); +// +// session.send_packet(0x8, data); +// +// channel +//} struct PartialFileData { @@ -508,6 +541,7 @@ struct AudioFileFetch { file_data_tx: mpsc::UnboundedSender, file_data_rx: mpsc::UnboundedReceiver, + //seek_rx: mpsc::UnboundedReceiver, stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: Option>, download_strategy: DownloadStrategy, @@ -654,6 +688,35 @@ impl AudioFileFetch { } +// fn download(&mut self, mut new_index: usize) { +// assert!(new_index < self.shared.chunk_count); +// +// { +// let download_status = self.shared.download_status.lock().unwrap(); +// while download_status.downloaded.contains(new_index) { +// new_index = (new_index + 1) % self.shared.chunk_count; +// debug!("Download iterated to new_index {}", new_index); +// } +// } +// +// trace!("== download called for chunk {} of {}", new_index, self.shared.chunk_count); +// +// if self.index != new_index { +// self.index = new_index; +// +// let offset = self.index * CHUNK_SIZE; +// +// self.output +// .as_mut() +// .unwrap() +// .seek(SeekFrom::Start(offset as u64)) +// .unwrap(); +// +// let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split(); +// self.data_rx = data; +// } +// } + fn poll_file_data_rx(&mut self) -> Poll<(), ()> { loop { @@ -837,6 +900,7 @@ impl Read for AudioFileStreaming { let mut ranges_to_request = RangeSet::new(); ranges_to_request.add_range(&Range::new(offset, length)); + debug!("reading at postion {} (length : {})", offset, length); let mut download_status = self.shared.download_status.lock().unwrap(); ranges_to_request.subtract_range_set(&download_status.downloaded); @@ -849,7 +913,9 @@ impl Read for AudioFileStreaming { } while !download_status.downloaded.contains(offset) { + debug!("waiting for download"); download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0; + debug!("re-checking data availability at offset {}.", offset); } let available_length = download_status.downloaded.contained_length_from_value(offset); assert!(available_length > 0); @@ -860,6 +926,7 @@ impl Read for AudioFileStreaming { let read_len = min(length, available_length); let read_len = try!(self.read_file.read(&mut output[..read_len])); + debug!("read at postion {} (length : {})", offset, read_len); self.position += read_len as u64; self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed); diff --git a/playback/src/player.rs b/playback/src/player.rs index 5d0e58a..1e6db12 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -576,6 +576,8 @@ impl PlayerInternal { track_id.to_base62() ); + info!("find_available_alternative"); + let track = match self.find_available_alternative(&track) { Some(track) => track, None => { @@ -584,12 +586,17 @@ impl PlayerInternal { } }; + info!("config.bitrate"); + + let format = match self.config.bitrate { Bitrate::Bitrate96 => FileFormat::OGG_VORBIS_96, Bitrate::Bitrate160 => FileFormat::OGG_VORBIS_160, Bitrate::Bitrate320 => FileFormat::OGG_VORBIS_320, }; + info!("file_id"); + let file_id = match track.files.get(&format) { Some(&file_id) => file_id, None => { @@ -598,13 +605,22 @@ impl PlayerInternal { } }; + info!("key"); + let key = self .session .audio_key() .request(track.id, file_id); + //.wait() + //.unwrap() + + info!("encrypted_file"); + let encrypted_file = AudioFile::open(&self.session, file_id); + info!("waiting for encrypted_file"); + let encrypted_file = encrypted_file.wait().unwrap(); let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(); @@ -621,9 +637,16 @@ impl PlayerInternal { } + + info!("wait for key"); let key = key.wait().unwrap(); + + info!("decrypted_file"); + let mut decrypted_file = AudioDecrypt::new(key, encrypted_file); + info!("normalisation_factor"); + let normalisation_factor = match NormalisationData::parse_from_file(&mut decrypted_file) { Ok(normalisation_data) => NormalisationData::get_factor(&self.config, normalisation_data), Err(_) => { @@ -632,8 +655,12 @@ impl PlayerInternal { } }; + info!("new Subfile"); + let audio_file = Subfile::new(decrypted_file, 0xa7); + info!("new VorbisDecoder"); + let mut decoder = VorbisDecoder::new(audio_file).unwrap(); if position != 0 { From 971b2a9b9fdd91d3e9e68c2d51ae1190e7b67f2b Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Sat, 2 Nov 2019 08:38:46 +1100 Subject: [PATCH 03/19] Fix compile issues after merge --- Cargo.lock | 1 + audio/Cargo.toml | 1 + audio/src/fetch.rs | 6 +++--- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 602b6ab..0f84b3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -843,6 +843,7 @@ dependencies = [ "aes-ctr 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "bit-set 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "lewton 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", "librespot-core 0.1.0", diff --git a/audio/Cargo.toml b/audio/Cargo.toml index 49902c5..a7237da 100644 --- a/audio/Cargo.toml +++ b/audio/Cargo.toml @@ -9,6 +9,7 @@ path = "../core" [dependencies] bit-set = "0.5" byteorder = "1.3" +bytes = "0.4" futures = "0.1" lewton = "0.9" log = "0.4" diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index c464a9a..9bde9c3 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -67,7 +67,7 @@ impl StreamLoaderController { pub fn range_available(&self, range: Range) -> bool { if let Some(ref shared) = self.stream_shared { - let mut download_status = shared.download_status.lock().unwrap(); + let download_status = shared.download_status.lock().unwrap(); if range.length <= download_status.downloaded.contained_length_from_value(range.start) { return true; } else { @@ -218,7 +218,7 @@ impl AudioFileOpenStreaming { }); let mut write_file = NamedTempFile::new().unwrap(); - write_file.set_len(size as u64).unwrap(); + write_file.as_file().set_len(size as u64).unwrap(); write_file.seek(SeekFrom::Start(0)).unwrap(); let read_file = write_file.reopen().unwrap(); @@ -450,7 +450,7 @@ impl Future for AudioFileFetchDataReceiver { Ok(Async::Ready(Some(data))) => { if let Some(request_sent_time) = self.request_sent_time { let duration = Instant::now() - request_sent_time; - let mut duration_ms: u64; + let duration_ms: u64; if duration.as_secs() > MAXIMUM_ASSUMED_PING_TIME_SECONDS { duration_ms = MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000; }else { From 5ad6446616d203993ec3d4a0715418f170826f84 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Sat, 2 Nov 2019 09:22:07 +1100 Subject: [PATCH 04/19] remove compiler warning --- playback/src/player.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/playback/src/player.rs b/playback/src/player.rs index 2e47c84..7d26ecf 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -495,7 +495,9 @@ impl PlayerInternal { let stream_data_rate = self.stream_data_rate(); if let Some(stream_loader_controller) = self.state.stream_loader_controller() { stream_loader_controller.set_stream_mode(); - if let PlayerState::Playing{..} = self.state { + } + if let PlayerState::Playing{..} = self.state { + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { let wait_for_data_length = (2 * stream_loader_controller.ping_time_ms() * stream_data_rate) / 1000; stream_loader_controller.fetch_next_blocking(wait_for_data_length); } From 216bdc0f6fe8c38fc328a5715837cc4c6bd41e30 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Sat, 2 Nov 2019 10:00:08 +1100 Subject: [PATCH 05/19] Fix typo --- audio/src/fetch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index e24b2ff..0ff8a5a 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -731,7 +731,7 @@ impl AudioFileFetch { // record the response time self.network_response_times_ms.push(response_time_ms); - // prone old response times. Keep at most three. + // prune old response times. Keep at most three. while self.network_response_times_ms.len() > 3 { self.network_response_times_ms.remove(0); } From 9f3e3d09d982667cf8e6439ffe3973706534a59e Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Sat, 2 Nov 2019 11:04:46 +1100 Subject: [PATCH 06/19] Fix infinite loop bug --- audio/src/fetch.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 0ff8a5a..67ccdbb 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -302,10 +302,10 @@ impl AudioFile { debug!("Downloading file {}", file_id); let (complete_tx, complete_rx) = oneshot::channel(); - debug!("calling request_chunk"); + trace!("calling request_chunk"); let initial_data_length = MINIMUM_CHUNK_SIZE; let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); - debug!("returned from request_chunk"); + trace!("returned from request_chunk"); let open = AudioFileOpenStreaming { session: session.clone(), @@ -319,7 +319,7 @@ impl AudioFile { complete_tx: Some(complete_tx), }; - debug!("cloning into cache session"); + trace!("cloning into cache session"); let session_ = session.clone(); session.spawn(move |_| { complete_rx @@ -334,7 +334,7 @@ impl AudioFile { .or_else(|oneshot::Canceled| Ok(())) }); - debug!("returning open stream"); + trace!("returning open stream"); AudioFileOpen::Streaming(open) } @@ -802,7 +802,9 @@ impl AudioFileFetch { loop { match self.stream_loader_command_rx.poll() { - Ok(Async::Ready(None)) => {} + Ok(Async::Ready(None)) => { + return Ok(Async::Ready(())); + } Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => { self.download_range(request.start, request.length); } @@ -900,7 +902,7 @@ impl Read for AudioFileStreaming { let mut ranges_to_request = RangeSet::new(); ranges_to_request.add_range(&Range::new(offset, length)); - debug!("reading at postion {} (length : {})", offset, length); + trace!("reading at postion {} (length : {})", offset, length); let mut download_status = self.shared.download_status.lock().unwrap(); ranges_to_request.subtract_range_set(&download_status.downloaded); @@ -908,14 +910,14 @@ impl Read for AudioFileStreaming { for range in ranges_to_request.iter() { - debug!("requesting data at position {} (length : {})", range.start, range.length); + trace!("requesting data at position {} (length : {})", range.start, range.length); self.stream_loader_command_tx.unbounded_send(StreamLoaderCommand::Fetch(range.clone())).unwrap(); } while !download_status.downloaded.contains(offset) { - debug!("waiting for download"); + trace!("waiting for download"); download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0; - debug!("re-checking data availability at offset {}.", offset); + trace!("re-checking data availability at offset {}.", offset); } let available_length = download_status.downloaded.contained_length_from_value(offset); assert!(available_length > 0); @@ -926,7 +928,7 @@ impl Read for AudioFileStreaming { let read_len = min(length, available_length); let read_len = try!(self.read_file.read(&mut output[..read_len])); - debug!("read at postion {} (length : {})", offset, read_len); + trace!("read at postion {} (length : {})", offset, read_len); self.position += read_len as u64; self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed); From af6e33bfa012835077b066cb5bcab91658deb44c Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Sat, 2 Nov 2019 11:11:24 +1100 Subject: [PATCH 07/19] Remove commented line --- audio/src/fetch.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 67ccdbb..dd7ae51 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -1,4 +1,3 @@ -//use bit_set::BitSet; use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use bytes::Bytes; use futures::sync::{mpsc, oneshot}; From c991974f821bbe08754485cb4308ffbc13342079 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Sat, 2 Nov 2019 17:19:31 +1100 Subject: [PATCH 08/19] Improve ping time measurements. Don't measure response times if other requests are pending. --- audio/src/fetch.rs | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index dd7ae51..8ca37c6 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -201,6 +201,7 @@ struct AudioFileShared { file_size: usize, cond: Condvar, download_status: Mutex, + number_of_open_requests: AtomicUsize, ping_time_ms: AtomicUsize, read_position: AtomicUsize, } @@ -213,6 +214,7 @@ impl AudioFileOpenStreaming { file_size: size, cond: Condvar::new(), download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}), + number_of_open_requests: AtomicUsize::new(0), ping_time_ms: AtomicUsize::new(0), read_position: AtomicUsize::new(0), }); @@ -433,6 +435,7 @@ struct AudioFileFetchDataReceiver { data_offset: usize, request_length: usize, request_sent_time: Option, + measure_ping_time: bool, } impl AudioFileFetchDataReceiver { @@ -445,6 +448,10 @@ impl AudioFileFetchDataReceiver { request_sent_time: Instant, ) -> AudioFileFetchDataReceiver { + let measure_ping_time = shared.number_of_open_requests.load(atomic::Ordering::SeqCst) == 0; + + shared.number_of_open_requests.fetch_add(1, atomic::Ordering::SeqCst); + AudioFileFetchDataReceiver { shared: shared, data_rx: data_rx, @@ -452,6 +459,7 @@ impl AudioFileFetchDataReceiver { data_offset: data_offset, request_length: request_length, request_sent_time: Some(request_sent_time), + measure_ping_time: measure_ping_time, } } } @@ -468,6 +476,9 @@ impl AudioFileFetchDataReceiver { download_status.requested.subtract_range(&missing_range); self.shared.cond.notify_all(); } + + self.shared.number_of_open_requests.fetch_sub(1, atomic::Ordering::SeqCst); + } } @@ -480,15 +491,18 @@ impl Future for AudioFileFetchDataReceiver { trace!("Looping data_receiver for offset {} and length {}", self.data_offset, self.request_length); match self.data_rx.poll() { Ok(Async::Ready(Some(data))) => { - if let Some(request_sent_time) = self.request_sent_time { - let duration = Instant::now() - request_sent_time; - let duration_ms: u64; - if duration.as_secs() > MAXIMUM_ASSUMED_PING_TIME_SECONDS { - duration_ms = MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000; - }else { - duration_ms = duration.as_secs() *1000 + duration.subsec_millis() as u64; + if self.measure_ping_time { + if let Some(request_sent_time) = self.request_sent_time { + let duration = Instant::now() - request_sent_time; + let duration_ms: u64; + if duration.as_secs() > MAXIMUM_ASSUMED_PING_TIME_SECONDS { + duration_ms = MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000; + } else { + duration_ms = duration.as_secs() * 1000 + duration.subsec_millis() as u64; + } + let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); + self.measure_ping_time = false; } - let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); } let data_size = data.len(); trace!("data_receiver got {} bytes of data", data_size); @@ -502,14 +516,15 @@ impl Future for AudioFileFetchDataReceiver { } if self.request_length == 0 { trace!("Data receiver completed at position {}", self.data_offset); + self.finish(); return Ok(Async::Ready(())); } } Ok(Async::Ready(None)) => { if self.request_length > 0 { warn!("Received less data from server than requested."); - self.finish(); } + self.finish(); return Ok(Async::Ready(())); } Ok(Async::NotReady) => { From 393df6475e7b2731f777cd9df7cca06b0f8859e0 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Tue, 5 Nov 2019 22:58:00 +1100 Subject: [PATCH 09/19] Set better log messages. --- audio/src/fetch.rs | 94 ++++++++---------------------------------- audio/src/range_set.rs | 20 +++++++++ 2 files changed, 38 insertions(+), 76 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 8ca37c6..b15033b 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -303,10 +303,8 @@ impl AudioFile { debug!("Downloading file {}", file_id); let (complete_tx, complete_rx) = oneshot::channel(); - trace!("calling request_chunk"); let initial_data_length = MINIMUM_CHUNK_SIZE; let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); - trace!("returned from request_chunk"); let open = AudioFileOpenStreaming { session: session.clone(), @@ -320,7 +318,6 @@ impl AudioFile { complete_tx: Some(complete_tx), }; - trace!("cloning into cache session"); let session_ = session.clone(); session.spawn(move |_| { complete_rx @@ -335,7 +332,6 @@ impl AudioFile { .or_else(|oneshot::Canceled| Ok(())) }); - trace!("returning open stream"); AudioFileOpen::Streaming(open) } @@ -361,17 +357,15 @@ impl AudioFile { fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { - trace!("requesting range starting at {} of length {}", offset, length); + assert!(offset % 4 == 0, "Range request start positions must be aligned by 4 bytes."); + assert!(length % 4 == 0, "Range request range lengths must be aligned by 4 bytes."); let start = offset / 4; - let mut end = (offset+length) / 4; - if (offset+length) % 4 != 0 { - end += 1; - } + let end = (offset+length) / 4; let (id, channel) = session.channel().allocate(); - trace!("allocated channel {}", id); + trace!("requesting range starting at {} of length {} on channel {}.", offset, length, id); let mut data: Vec = Vec::new(); data.write_u16::(id).unwrap(); @@ -390,32 +384,6 @@ fn request_range(session: &Session, file: FileId, offset: usize, length: usize) channel } -//fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel { -// trace!("requesting chunk {}", index); -// -// let start = (index * CHUNK_SIZE / 4) as u32; -// let end = ((index + 1) * CHUNK_SIZE / 4) as u32; -// -// let (id, channel) = session.channel().allocate(); -// -// trace!("allocated channel {}", id); -// -// let mut data: Vec = Vec::new(); -// data.write_u16::(id).unwrap(); -// data.write_u8(0).unwrap(); -// data.write_u8(1).unwrap(); -// data.write_u16::(0x0000).unwrap(); -// data.write_u32::(0x00000000).unwrap(); -// data.write_u32::(0x00009C40).unwrap(); -// data.write_u32::(0x00020000).unwrap(); -// data.write(&file.0).unwrap(); -// data.write_u32::(start).unwrap(); -// data.write_u32::(end).unwrap(); -// -// session.send_packet(0x8, data); -// -// channel -//} struct PartialFileData { @@ -432,6 +400,8 @@ struct AudioFileFetchDataReceiver { shared: Arc, file_data_tx: mpsc::UnboundedSender, data_rx: ChannelData, + initial_data_offset: usize, + initial_request_length: usize, data_offset: usize, request_length: usize, request_sent_time: Option, @@ -456,6 +426,8 @@ impl AudioFileFetchDataReceiver { shared: shared, data_rx: data_rx, file_data_tx: file_data_tx, + initial_data_offset: data_offset, + initial_request_length: request_length, data_offset: data_offset, request_length: request_length, request_sent_time: Some(request_sent_time), @@ -505,34 +477,33 @@ impl Future for AudioFileFetchDataReceiver { } } let data_size = data.len(); - trace!("data_receiver got {} bytes of data", data_size); + trace!("data_receiver for range {} (+{}) got {} bytes of data starting at {}. ({} bytes pending).", self.initial_data_offset, self.initial_request_length, data_size, self.data_offset, self.request_length - data_size); let _ = self.file_data_tx.unbounded_send(ReceivedData::Data(PartialFileData { offset: self.data_offset, data: data, })); self.data_offset += data_size; if self.request_length < data_size { - warn!("Received more data from server than requested."); + warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length); self.request_length = 0; } else { self.request_length -= data_size; } if self.request_length == 0 { - trace!("Data receiver completed at position {}", self.data_offset); + trace!("Data receiver for range {} (+{}) completed.", self.initial_data_offset, self.initial_request_length); self.finish(); return Ok(Async::Ready(())); } } Ok(Async::Ready(None)) => { if self.request_length > 0 { - warn!("Received less data from server than requested."); + warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length); } self.finish(); return Ok(Async::Ready(())); } Ok(Async::NotReady) => { - //trace!("No more data for data_receiver at the moment."); return Ok(Async::NotReady); } Err(ChannelError) => { - warn!("error from channel"); + warn!("Error from channel for data receiver for range {} (+{}).", self.initial_data_offset, self.initial_request_length); self.finish(); return Ok(Async::Ready(())); } @@ -702,45 +673,16 @@ impl AudioFileFetch { } -// fn download(&mut self, mut new_index: usize) { -// assert!(new_index < self.shared.chunk_count); -// -// { -// let download_status = self.shared.download_status.lock().unwrap(); -// while download_status.downloaded.contains(new_index) { -// new_index = (new_index + 1) % self.shared.chunk_count; -// debug!("Download iterated to new_index {}", new_index); -// } -// } -// -// trace!("== download called for chunk {} of {}", new_index, self.shared.chunk_count); -// -// if self.index != new_index { -// self.index = new_index; -// -// let offset = self.index * CHUNK_SIZE; -// -// self.output -// .as_mut() -// .unwrap() -// .seek(SeekFrom::Start(offset as u64)) -// .unwrap(); -// -// let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split(); -// self.data_rx = data; -// } -// } fn poll_file_data_rx(&mut self) -> Poll<(), ()> { loop { match self.file_data_rx.poll() { Ok(Async::Ready(None)) => { - trace!("File data channel closed."); return Ok(Async::Ready(())); } Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => { - trace!("Received ping time information: {} ms.", response_time_ms); + trace!("Received ping time estimate: {} ms.", response_time_ms); // record the response time self.network_response_times_ms.push(response_time_ms); @@ -768,7 +710,6 @@ impl AudioFileFetch { }, Ok(Async::Ready(Some(ReceivedData::Data(data)))) => { - trace!("Writing data to file: offset {}, length {}", data.offset, data.data.len()); self.output .as_mut() @@ -791,6 +732,9 @@ impl AudioFileFetch { if download_status.downloaded.contained_length_from_value(0) >= self.shared.file_size { full = true; } + + trace!("Downloaded: {} Requested: {}", download_status.downloaded, download_status.requested); + drop(download_status); } @@ -860,8 +804,6 @@ impl Future for AudioFileFetch { fn poll(&mut self) -> Poll<(), ()> { - trace!("Polling AudioFileFetch"); - match self.poll_stream_loader_command_rx() { Ok(Async::NotReady) => (), Ok(Async::Ready(_)) => { @@ -942,7 +884,7 @@ impl Read for AudioFileStreaming { let read_len = min(length, available_length); let read_len = try!(self.read_file.read(&mut output[..read_len])); - trace!("read at postion {} (length : {})", offset, read_len); + trace!("read successfully at postion {} (length : {})", offset, read_len); self.position += read_len as u64; self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed); diff --git a/audio/src/range_set.rs b/audio/src/range_set.rs index 378725f..12b8299 100644 --- a/audio/src/range_set.rs +++ b/audio/src/range_set.rs @@ -1,6 +1,7 @@ use std::cmp::{max,min}; use std::slice::Iter; +use std::fmt; @@ -10,6 +11,13 @@ pub struct Range { pub length: usize, } +impl fmt::Display for Range { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + return write!(f, "[{}, {}]", self.start, self.start+self.length-1); + } +} + + impl Range { pub fn new(start: usize, length: usize) -> Range { @@ -25,11 +33,23 @@ impl Range { } + #[derive(Clone)] pub struct RangeSet { ranges: Vec, } +impl fmt::Display for RangeSet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "(").unwrap(); + for range in self.ranges.iter() { + write!(f, "{}", range).unwrap(); + } + write!(f, ")") + } +} + + impl RangeSet { pub fn new() -> RangeSet { From 4a611d9af33b1858b47aa372b55a90dd4b3625f6 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Tue, 5 Nov 2019 23:58:35 +1100 Subject: [PATCH 10/19] Fix pre-fetch heuristic. --- audio/src/fetch.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index b15033b..e17473e 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -460,7 +460,6 @@ impl Future for AudioFileFetchDataReceiver { fn poll(&mut self) -> Poll<(), ()> { loop { - trace!("Looping data_receiver for offset {} and length {}", self.data_offset, self.request_length); match self.data_rx.poll() { Ok(Async::Ready(Some(data))) => { if self.measure_ping_time { @@ -733,7 +732,7 @@ impl AudioFileFetch { full = true; } - trace!("Downloaded: {} Requested: {}", download_status.downloaded, download_status.requested); + trace!("Downloaded: {} Requested: {}", download_status.downloaded, download_status.requested.minus(&download_status.downloaded)); drop(download_status); } @@ -829,7 +828,8 @@ impl Future for AudioFileFetch { let ping_time = self.shared.ping_time_ms.load(atomic::Ordering::Relaxed); - if bytes_pending < 2 * ping_time * self.streaming_data_rate { + if bytes_pending < 2 * ping_time * self.streaming_data_rate / 1000 { + trace!("Prefetching more data. pending bytes({}) < 2 * ping time ({}) * data rate({}) / 1000.",bytes_pending, ping_time, self.streaming_data_rate); self.pre_fetch_more_data(); } } From c50fc9885ac296799416895f851f3c715863aaba Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Wed, 6 Nov 2019 08:16:01 +1100 Subject: [PATCH 11/19] Adapt code for the new bitrate selection via alternatives. --- audio/src/fetch.rs | 14 +++++++++++--- playback/src/player.rs | 28 ++++++++++++++++++---------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index e17473e..e3d63df 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -57,6 +57,7 @@ pub struct StreamLoaderController { channel_tx: Option>, stream_shared: Option>, file_size: usize, + bytes_per_second: usize, } @@ -65,6 +66,8 @@ impl StreamLoaderController { return self.file_size; } + pub fn data_rate(&self) -> usize { return self.bytes_per_second; } + pub fn range_available(&self, range: Range) -> bool { if let Some(ref shared) = self.stream_shared { let download_status = shared.download_status.lock().unwrap(); @@ -168,6 +171,7 @@ impl StreamLoaderController { pub fn set_stream_data_rate(&mut self, bytes_per_second: usize) { // when optimising for streaming, assume a streaming rate of this many bytes per second. + self.bytes_per_second = bytes_per_second; self.send_stream_loader_command(StreamLoaderCommand::StreamDataRate(bytes_per_second)); } @@ -335,20 +339,24 @@ impl AudioFile { AudioFileOpen::Streaming(open) } - pub fn get_stream_loader_controller(&self) -> StreamLoaderController { + pub fn get_stream_loader_controller(&self, bytes_per_second: usize) -> StreamLoaderController { match self { AudioFile::Streaming(stream) => { - return StreamLoaderController { + let mut result = StreamLoaderController { channel_tx: Some(stream.stream_loader_command_tx.clone()), stream_shared: Some(stream.shared.clone()), file_size: stream.shared.file_size, - } + bytes_per_second: bytes_per_second, + }; + result.set_stream_data_rate(bytes_per_second); + return result; } AudioFile::Cached(ref file) => { return StreamLoaderController { channel_tx: None, stream_shared: None, file_size: file.metadata().unwrap().len() as usize, + bytes_per_second: bytes_per_second, } } } diff --git a/playback/src/player.rs b/playback/src/player.rs index 2312178..32500e3 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -490,12 +490,12 @@ impl PlayerInternal { } // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun. - let stream_data_rate = self.stream_data_rate(); if let Some(stream_loader_controller) = self.state.stream_loader_controller() { stream_loader_controller.set_stream_mode(); } if let PlayerState::Playing{..} = self.state { if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + let stream_data_rate = stream_loader_controller.data_rate(); let wait_for_data_length = (2 * stream_loader_controller.ping_time_ms() * stream_data_rate) / 1000; stream_loader_controller.fetch_next_blocking(wait_for_data_length); } @@ -561,11 +561,22 @@ impl PlayerInternal { } } - fn stream_data_rate(&self) -> usize { - match self.config.bitrate { - Bitrate::Bitrate96 => 12 * 1024, - Bitrate::Bitrate160 => 20 * 1024, - Bitrate::Bitrate320 => 40 * 1024, + fn stream_data_rate(&self, format: FileFormat) -> usize { + match format { + FileFormat::OGG_VORBIS_96 => 12 * 1024, + FileFormat::OGG_VORBIS_160 => 20 * 1024, + FileFormat::OGG_VORBIS_320=> 40 * 1024, + FileFormat::MP3_256 => 32 * 1024, + FileFormat::MP3_320 => 40 * 1024, + FileFormat::MP3_160 => 20 * 1024, + FileFormat::MP3_96 => 12 * 1024, + FileFormat::MP3_160_ENC => 20 * 1024, + FileFormat::MP4_128_DUAL => 16 * 1024, + FileFormat::OTHER3 => 40 * 1024, // better some high guess than nothing + FileFormat::AAC_160 => 20 * 1024, + FileFormat::AAC_320 => 40 * 1024, + FileFormat::MP4_128 => 16 * 1024, + FileFormat::OTHER5 => 40 * 1024, // better some high guess than nothing } } @@ -618,10 +629,7 @@ impl PlayerInternal { let encrypted_file = encrypted_file.wait().unwrap(); - let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(); - - // tell the stream loader how to optimise its strategy. - stream_loader_controller.set_stream_data_rate(self.stream_data_rate()); + let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(self.stream_data_rate(*format)); if position == 0 { // No need to seek -> we stream from the beginning From 224ec0a04e0ff1f4a58f81d34c2f8d5607e2b76b Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Wed, 6 Nov 2019 14:38:28 +1100 Subject: [PATCH 12/19] Remove log message --- core/src/channel.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/channel.rs b/core/src/channel.rs index 3238a0a..57655fe 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -59,8 +59,6 @@ impl ChannelManager { let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref()); - trace!("Received data for channel {}: {} bytes.", id, data.len()); - self.lock(|inner| { if let Entry::Occupied(entry) = inner.channels.entry(id) { let _ = entry.get().unbounded_send((cmd, data)); From 395d29ad473d6bc15554163e26f6e93a896dc572 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Wed, 6 Nov 2019 17:15:32 +1100 Subject: [PATCH 13/19] Add warning in source code. --- audio/src/fetch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index e3d63df..8e39e5a 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -18,7 +18,7 @@ use futures::sync::mpsc::unbounded; use std::sync::atomic; use std::sync::atomic::AtomicUsize; -const MINIMUM_CHUNK_SIZE: usize = 1024 * 16; +const MINIMUM_CHUNK_SIZE: usize = 1024 * 16; // This number MUST be divisible by 4. const MAXIMUM_CHUNK_SIZE: usize = 1024 * 128; const MAXIMUM_ASSUMED_PING_TIME_SECONDS: u64 = 5; From 6422dcef78471583d3a282daf626f51766ad125d Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Fri, 8 Nov 2019 00:02:53 +1100 Subject: [PATCH 14/19] Refine file downloading heuristics to use data rates and ping times everywhere. --- audio/src/fetch.rs | 241 +++++++++++++++++++++++++++-------------- audio/src/lib.rs | 1 + core/src/channel.rs | 37 +++++++ playback/src/player.rs | 42 +++++-- 4 files changed, 233 insertions(+), 88 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index e3d63df..46d8a88 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -3,7 +3,7 @@ use bytes::Bytes; use futures::sync::{mpsc, oneshot}; use futures::Stream; use futures::{Async, Future, Poll}; -use std::cmp::min; +use std::cmp::{min, max}; use std::fs; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::sync::{Arc, Condvar, Mutex}; @@ -18,9 +18,67 @@ use futures::sync::mpsc::unbounded; use std::sync::atomic; use std::sync::atomic::AtomicUsize; -const MINIMUM_CHUNK_SIZE: usize = 1024 * 16; -const MAXIMUM_CHUNK_SIZE: usize = 1024 * 128; -const MAXIMUM_ASSUMED_PING_TIME_SECONDS: u64 = 5; + +const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; +// The minimum size of a block that is requested from the Spotify servers in one request. +// This is the block size that is typically requested while doing a seek() on a file. +// Note: smaller requests can happen if part of the block is downloaded already. + +const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; // MUST be divisible by four!!! +// The amount of data that is requested when initially opening a file. +// Note: if the file is opened to play from the beginning, the amount of data to +// read ahead is requested in addition to this amount. If the file is opened to seek to +// another position, then only this amount is requested on the first request. + +const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5; +// The pig time that is used for calculations before a ping time was actually measured. + +const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5; +// If the measured ping time to the Spotify server is larger than this value, it is capped +// to avoid run-away block sizes and pre-fetching. + +pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0; +// Before playback starts, this many seconds of data must be present. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0; +// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping +// time to the Spotify server. +// Both, READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are +// obeyed. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 1.0; +// While playing back, this many seconds of data ahead of the current read position are +// requested. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 2.0; +// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping +// time to the Spotify server. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0; +// If the amount of data that is pending (requested but not received) is less than a certain amount, +// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more +// data is calculated as +// < PREFETCH_THRESHOLD_FACTOR * * + +const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5; +// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account. +// The formula used is +// < FAST_PREFETCH_THRESHOLD_FACTOR * * +// This mechanism allows for fast downloading of the remainder of the file. The number should be larger +// than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster +// the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is +// performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively +// only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted. + + pub enum AudioFile { Cached(fs::File), @@ -40,6 +98,7 @@ pub struct AudioFileOpenStreaming { headers: ChannelHeaders, file_id: FileId, complete_tx: Option>, + streaming_data_rate: usize, } @@ -47,7 +106,6 @@ enum StreamLoaderCommand{ Fetch(Range), // signal the stream loader to fetch a range of the file RandomAccessMode(), // optimise download strategy for random access StreamMode(), // optimise download strategy for streaming - StreamDataRate(usize), // when optimising for streaming, assume a streaming rate of this many bytes per second. Close(), // terminate and don't load any more data } @@ -57,7 +115,6 @@ pub struct StreamLoaderController { channel_tx: Option>, stream_shared: Option>, file_size: usize, - bytes_per_second: usize, } @@ -66,8 +123,6 @@ impl StreamLoaderController { return self.file_size; } - pub fn data_rate(&self) -> usize { return self.bytes_per_second; } - pub fn range_available(&self, range: Range) -> bool { if let Some(ref shared) = self.stream_shared { let download_status = shared.download_status.lock().unwrap(); @@ -124,7 +179,7 @@ impl StreamLoaderController { if range.length > (download_status.downloaded.union(&download_status.requested).contained_length_from_value(range.start)) { // For some reason, the requested range is neither downloaded nor requested. // This could be due to a network error. Request it again. - // We can't use self.fetch here because self can't borrowed mutably, so we access the channel directly. + // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly. if let Some(ref mut channel) = self.channel_tx { // ignore the error in case the channel has been closed already. let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range)); @@ -169,12 +224,6 @@ impl StreamLoaderController { self.send_stream_loader_command(StreamLoaderCommand::StreamMode()); } - pub fn set_stream_data_rate(&mut self, bytes_per_second: usize) { - // when optimising for streaming, assume a streaming rate of this many bytes per second. - self.bytes_per_second = bytes_per_second; - self.send_stream_loader_command(StreamLoaderCommand::StreamDataRate(bytes_per_second)); - } - pub fn close(&mut self) { // terminate stream loading and don't load any more data for this file. self.send_stream_loader_command(StreamLoaderCommand::Close()); @@ -200,11 +249,19 @@ struct AudioFileDownloadStatus { downloaded: RangeSet, } +#[derive(Copy, Clone)] +enum DownloadStrategy { + RandomAccess(), + Streaming(), +} + struct AudioFileShared { file_id: FileId, file_size: usize, + stream_data_rate: usize, cond: Condvar, download_status: Mutex, + download_strategy: Mutex, number_of_open_requests: AtomicUsize, ping_time_ms: AtomicUsize, read_position: AtomicUsize, @@ -216,8 +273,10 @@ impl AudioFileOpenStreaming { let shared = Arc::new(AudioFileShared { file_id: self.file_id, file_size: size, + stream_data_rate: self.streaming_data_rate, cond: Condvar::new(), download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}), + download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise number_of_open_requests: AtomicUsize::new(0), ping_time_ms: AtomicUsize::new(0), read_position: AtomicUsize::new(0), @@ -296,7 +355,7 @@ impl Future for AudioFileOpenStreaming { } impl AudioFile { - pub fn open(session: &Session, file_id: FileId) -> AudioFileOpen { + pub fn open(session: &Session, file_id: FileId, bytes_per_second: usize, play_from_beginning: bool) -> AudioFileOpen { let cache = session.cache().cloned(); if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) { @@ -307,7 +366,14 @@ impl AudioFile { debug!("Downloading file {}", file_id); let (complete_tx, complete_rx) = oneshot::channel(); - let initial_data_length = MINIMUM_CHUNK_SIZE; + let mut initial_data_length = if play_from_beginning { + INITIAL_DOWNLOAD_SIZE + max((READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, (INITIAL_PING_TIME_ESTIMATE_SECONDS * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * bytes_per_second as f64) as usize) + } else { + INITIAL_DOWNLOAD_SIZE + }; + if initial_data_length % 4 != 0 { + initial_data_length += 4 - (initial_data_length % 4); + } let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); let open = AudioFileOpenStreaming { @@ -320,6 +386,8 @@ impl AudioFile { initial_request_sent_time: Instant::now(), complete_tx: Some(complete_tx), + streaming_data_rate: bytes_per_second, + }; let session_ = session.clone(); @@ -336,27 +404,23 @@ impl AudioFile { .or_else(|oneshot::Canceled| Ok(())) }); - AudioFileOpen::Streaming(open) + return AudioFileOpen::Streaming(open); } - pub fn get_stream_loader_controller(&self, bytes_per_second: usize) -> StreamLoaderController { + pub fn get_stream_loader_controller(&self) -> StreamLoaderController { match self { - AudioFile::Streaming(stream) => { - let mut result = StreamLoaderController { + AudioFile::Streaming(ref stream) => { + return StreamLoaderController { channel_tx: Some(stream.stream_loader_command_tx.clone()), stream_shared: Some(stream.shared.clone()), file_size: stream.shared.file_size, - bytes_per_second: bytes_per_second, }; - result.set_stream_data_rate(bytes_per_second); - return result; } AudioFile::Cached(ref file) => { return StreamLoaderController { channel_tx: None, stream_shared: None, file_size: file.metadata().unwrap().len() as usize, - bytes_per_second: bytes_per_second, } } } @@ -474,8 +538,8 @@ impl Future for AudioFileFetchDataReceiver { if let Some(request_sent_time) = self.request_sent_time { let duration = Instant::now() - request_sent_time; let duration_ms: u64; - if duration.as_secs() > MAXIMUM_ASSUMED_PING_TIME_SECONDS { - duration_ms = MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000; + if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS { + duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; } else { duration_ms = duration.as_secs() * 1000 + duration.subsec_millis() as u64; } @@ -520,11 +584,6 @@ impl Future for AudioFileFetchDataReceiver { } -enum DownloadStrategy { - RandomAccess(), - Streaming(), -} - struct AudioFileFetch { session: Session, shared: Arc, @@ -533,11 +592,8 @@ struct AudioFileFetch { file_data_tx: mpsc::UnboundedSender, file_data_rx: mpsc::UnboundedReceiver, - //seek_rx: mpsc::UnboundedReceiver, stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: Option>, - download_strategy: DownloadStrategy, - streaming_data_rate: usize, network_response_times_ms: Vec, } @@ -584,16 +640,18 @@ impl AudioFileFetch { stream_loader_command_rx: stream_loader_command_rx, complete_tx: Some(complete_tx), - download_strategy: DownloadStrategy::RandomAccess(), // start with random access mode until someone tells us otherwise - streaming_data_rate: 40, // assume 360 kbit per second unless someone tells us otherwise. network_response_times_ms: Vec::new(), } } + fn get_download_strategy(&mut self) -> DownloadStrategy { + *(self.shared.download_strategy.lock().unwrap()) + } + fn download_range(&mut self, mut offset: usize, mut length: usize) { - if length < MINIMUM_CHUNK_SIZE { - length = MINIMUM_CHUNK_SIZE; + if length < MINIMUM_DOWNLOAD_SIZE { + length = MINIMUM_DOWNLOAD_SIZE; } // ensure the values are within the bounds and align them by 4 for the spotify protocol. @@ -647,35 +705,43 @@ impl AudioFileFetch { } - fn pre_fetch_more_data(&mut self) { + fn pre_fetch_more_data(&mut self, bytes: usize) { - // determine what is still missing - let mut missing_data = RangeSet::new(); - missing_data.add_range(&Range::new(0,self.shared.file_size)); - { - let download_status = self.shared.download_status.lock().unwrap(); - missing_data.subtract_range_set(&download_status.downloaded); - missing_data.subtract_range_set(&download_status.requested); - } + let mut bytes_to_go = bytes; - // download data from after the current read position first - let mut tail_end = RangeSet::new(); - let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed); - tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position)); - let tail_end = tail_end.intersection(&missing_data); + while bytes_to_go > 0 { - if ! tail_end.is_empty() { - let range = tail_end.get_range(0); - let offset = range.start; - let length = min(range.length, MAXIMUM_CHUNK_SIZE); - self.download_range(offset, length); + // determine what is still missing + let mut missing_data = RangeSet::new(); + missing_data.add_range(&Range::new(0, self.shared.file_size)); + { + let download_status = self.shared.download_status.lock().unwrap(); + missing_data.subtract_range_set(&download_status.downloaded); + missing_data.subtract_range_set(&download_status.requested); + } - } else if ! missing_data.is_empty() { - // ok, the tail is downloaded, download something fom the beginning. - let range = missing_data.get_range(0); - let offset = range.start; - let length = min(range.length, MAXIMUM_CHUNK_SIZE); - self.download_range(offset, length); + // download data from after the current read position first + let mut tail_end = RangeSet::new(); + let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed); + tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position)); + let tail_end = tail_end.intersection(&missing_data); + + if !tail_end.is_empty() { + let range = tail_end.get_range(0); + let offset = range.start; + let length = min(range.length, bytes_to_go); + self.download_range(offset, length); + bytes_to_go -= length; + } else if !missing_data.is_empty() { + // ok, the tail is downloaded, download something fom the beginning. + let range = missing_data.get_range(0); + let offset = range.start; + let length = min(range.length, bytes_to_go); + self.download_range(offset, length); + bytes_to_go -= length; + } else { + return; + } } } @@ -774,13 +840,10 @@ impl AudioFileFetch { self.download_range(request.start, request.length); } Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => { - self.download_strategy = DownloadStrategy::RandomAccess(); + *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); } Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => { - self.download_strategy = DownloadStrategy::Streaming(); - } - Ok(Async::Ready(Some(StreamLoaderCommand::StreamDataRate(rate)))) => { - self.streaming_data_rate = rate; + *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); } Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => { return Ok(Async::Ready(())); @@ -827,18 +890,22 @@ impl Future for AudioFileFetch { Err(()) => unreachable!(), } - - if let DownloadStrategy::Streaming() = self.download_strategy { + if let DownloadStrategy::Streaming() = self.get_download_strategy() { let bytes_pending: usize = { let download_status = self.shared.download_status.lock().unwrap(); download_status.requested.minus(&download_status.downloaded).len() }; - let ping_time = self.shared.ping_time_ms.load(atomic::Ordering::Relaxed); + let ping_time_seconds = 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; - if bytes_pending < 2 * ping_time * self.streaming_data_rate / 1000 { - trace!("Prefetching more data. pending bytes({}) < 2 * ping time ({}) * data rate({}) / 1000.",bytes_pending, ping_time, self.streaming_data_rate); - self.pre_fetch_more_data(); + let desired_pending_bytes = max( + (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) as usize, + (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.session.channel().get_download_rate_estimate() as f64) as usize + ); + + if bytes_pending < desired_pending_bytes { + trace!("Prefetching more data. pending bytes({}) < {}",bytes_pending, desired_pending_bytes); + self.pre_fetch_more_data(desired_pending_bytes - bytes_pending); } } @@ -857,14 +924,25 @@ impl Read for AudioFileStreaming { let length = min(output.len(), self.shared.file_size - offset); - if length == 0 { - return Ok(0); - } + + let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) { + DownloadStrategy::RandomAccess() => { length } + DownloadStrategy::Streaming() => { + // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded. + let ping_time_seconds = 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + + let length_to_request = length + max( + (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64) as usize, + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * ping_time_seconds * self.shared.stream_data_rate as f64) as usize + ); + min(length_to_request, self.shared.file_size - offset) + } + }; let mut ranges_to_request = RangeSet::new(); - ranges_to_request.add_range(&Range::new(offset, length)); + ranges_to_request.add_range(&Range::new(offset, length_to_request)); trace!("reading at postion {} (length : {})", offset, length); @@ -878,6 +956,11 @@ impl Read for AudioFileStreaming { self.stream_loader_command_tx.unbounded_send(StreamLoaderCommand::Fetch(range.clone())).unwrap(); } + + if length == 0 { + return Ok(0); + } + while !download_status.downloaded.contains(offset) { trace!("waiting for download"); download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0; diff --git a/audio/src/lib.rs b/audio/src/lib.rs index b178c39..845ba5f 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -25,6 +25,7 @@ mod range_set; pub use decrypt::AudioDecrypt; pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController}; +pub use fetch::{READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS}; #[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))] pub use lewton_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; diff --git a/core/src/channel.rs b/core/src/channel.rs index 3238a0a..62620e3 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -3,6 +3,7 @@ use bytes::Bytes; use futures::sync::{mpsc, BiLock}; use futures::{Async, Poll, Stream}; use std::collections::HashMap; +use std::time::{Instant}; use util::SeqGenerator; @@ -10,6 +11,10 @@ component! { ChannelManager : ChannelManagerInner { sequence: SeqGenerator = SeqGenerator::new(0), channels: HashMap> = HashMap::new(), + download_rate_estimate: usize = 0, + download_measurement_start: Option = None, + download_measurement_bytes: usize = 0, + download_measurement_last_received: Option = None, } } @@ -59,14 +64,46 @@ impl ChannelManager { let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref()); + + trace!("Received data for channel {}: {} bytes.", id, data.len()); self.lock(|inner| { + + let current_time = Instant::now(); + if let Some(download_measurement_start) = inner.download_measurement_start { + if let Some(download_measurement_last_received) = inner.download_measurement_last_received { + if (current_time - download_measurement_start).as_millis() > 1000 { + if (current_time - download_measurement_last_received).as_millis() <= 500 { + inner.download_rate_estimate = 1000 * inner.download_measurement_bytes / (current_time - download_measurement_start).as_millis() as usize; + } else { + inner.download_rate_estimate = 1000 * inner.download_measurement_bytes / (500+(download_measurement_last_received - download_measurement_start).as_millis() as usize); + } + + inner.download_measurement_start = Some(current_time); + inner.download_measurement_bytes = 0; + } + } + } else { + inner.download_measurement_start = Some(current_time); + } + + inner.download_measurement_last_received = Some(current_time); + inner.download_measurement_bytes += data.len(); + if let Entry::Occupied(entry) = inner.channels.entry(id) { let _ = entry.get().unbounded_send((cmd, data)); } }); } + + pub fn get_download_rate_estimate(&self) -> usize { + return self.lock(|inner| { + inner.download_rate_estimate + }); + + } + } impl Channel { diff --git a/playback/src/player.rs b/playback/src/player.rs index 32500e3..bdccea3 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -9,12 +9,14 @@ use std::mem; use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; use std::thread; use std::time::Duration; +use std::cmp::max; use config::{Bitrate, PlayerConfig}; use librespot_core::session::Session; use librespot_core::spotify_id::SpotifyId; use audio::{AudioDecrypt, AudioFile, StreamLoaderController}; +use audio::{READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS}; use audio::{VorbisDecoder, VorbisPacket}; use audio_backend::Sink; use metadata::{AudioItem, FileFormat}; @@ -203,6 +205,7 @@ enum PlayerState { end_of_track: oneshot::Sender<()>, normalisation_factor: f32, stream_loader_controller: StreamLoaderController, + bytes_per_second: usize, }, Playing { track_id: SpotifyId, @@ -210,6 +213,7 @@ enum PlayerState { end_of_track: oneshot::Sender<()>, normalisation_factor: f32, stream_loader_controller: StreamLoaderController, + bytes_per_second: usize, }, EndOfTrack { track_id: SpotifyId, @@ -269,6 +273,7 @@ impl PlayerState { end_of_track, normalisation_factor, stream_loader_controller, + bytes_per_second } => { *self = Playing { track_id: track_id, @@ -276,6 +281,7 @@ impl PlayerState { end_of_track: end_of_track, normalisation_factor: normalisation_factor, stream_loader_controller: stream_loader_controller, + bytes_per_second: bytes_per_second, }; } _ => panic!("invalid state"), @@ -291,6 +297,7 @@ impl PlayerState { end_of_track, normalisation_factor, stream_loader_controller, + bytes_per_second, } => { *self = Paused { track_id: track_id, @@ -298,6 +305,7 @@ impl PlayerState { end_of_track: end_of_track, normalisation_factor: normalisation_factor, stream_loader_controller: stream_loader_controller, + bytes_per_second: bytes_per_second, }; } _ => panic!("invalid state"), @@ -418,7 +426,7 @@ impl PlayerInternal { } match self.load_track(track_id, position as i64) { - Some((decoder, normalisation_factor, stream_loader_controller)) => { + Some((decoder, normalisation_factor, stream_loader_controller, bytes_per_second)) => { if play { match self.state { PlayerState::Playing { @@ -443,6 +451,7 @@ impl PlayerInternal { end_of_track: end_of_track, normalisation_factor: normalisation_factor, stream_loader_controller: stream_loader_controller, + bytes_per_second: bytes_per_second, }; } else { self.state = PlayerState::Paused { @@ -451,6 +460,7 @@ impl PlayerInternal { end_of_track: end_of_track, normalisation_factor: normalisation_factor, stream_loader_controller: stream_loader_controller, + bytes_per_second: bytes_per_second, }; match self.state { PlayerState::Playing { @@ -493,10 +503,21 @@ impl PlayerInternal { if let Some(stream_loader_controller) = self.state.stream_loader_controller() { stream_loader_controller.set_stream_mode(); } - if let PlayerState::Playing{..} = self.state { + if let PlayerState::Playing{bytes_per_second, ..} = self.state { if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - let stream_data_rate = stream_loader_controller.data_rate(); - let wait_for_data_length = (2 * stream_loader_controller.ping_time_ms() * stream_data_rate) / 1000; + + // Request our read ahead range + let request_data_length = max( + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * (0.001 * stream_loader_controller.ping_time_ms() as f64) * bytes_per_second as f64) as usize, + (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize + ); + stream_loader_controller.fetch_next(request_data_length); + + // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete. + let wait_for_data_length = max( + (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS * (0.001 * stream_loader_controller.ping_time_ms() as f64) * bytes_per_second as f64) as usize, + (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize + ); stream_loader_controller.fetch_next_blocking(wait_for_data_length); } } @@ -580,7 +601,7 @@ impl PlayerInternal { } } - fn load_track(&self, spotify_id: SpotifyId, position: i64) -> Option<(Decoder, f32, StreamLoaderController)> { + fn load_track(&self, spotify_id: SpotifyId, position: i64) -> Option<(Decoder, f32, StreamLoaderController, usize)> { let audio = AudioItem::get_audio_item(&self.session, spotify_id) .wait() .unwrap(); @@ -624,14 +645,17 @@ impl PlayerInternal { } }; + let bytes_per_second = self.stream_data_rate(*format); + let play_from_beginning = position==0; + let key = self.session.audio_key().request(spotify_id, file_id); - let encrypted_file = AudioFile::open(&self.session, file_id); + let encrypted_file = AudioFile::open(&self.session, file_id, bytes_per_second, play_from_beginning); let encrypted_file = encrypted_file.wait().unwrap(); - let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(self.stream_data_rate(*format)); + let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(); - if position == 0 { + if play_from_beginning { // No need to seek -> we stream from the beginning stream_loader_controller.set_stream_mode(); } else { @@ -663,7 +687,7 @@ impl PlayerInternal { stream_loader_controller.set_stream_mode(); } info!("<{}> loaded", audio.name); - Some((decoder, normalisation_factor, stream_loader_controller)) + Some((decoder, normalisation_factor, stream_loader_controller, bytes_per_second)) } } From c4e0f15eb32af3284aa9e9bfc94c23bb12ad7802 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Fri, 8 Nov 2019 08:58:17 +1100 Subject: [PATCH 15/19] Tune prefetch, squish bugs. --- audio/src/fetch.rs | 9 +++++---- audio/src/range_set.rs | 2 ++ core/src/channel.rs | 17 ++++------------- 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 46d8a88..a791841 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -541,7 +541,7 @@ impl Future for AudioFileFetchDataReceiver { if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS { duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; } else { - duration_ms = duration.as_secs() * 1000 + duration.subsec_millis() as u64; + duration_ms = duration.as_millis() as u64; } let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); self.measure_ping_time = false; @@ -896,15 +896,16 @@ impl Future for AudioFileFetch { download_status.requested.minus(&download_status.downloaded).len() }; - let ping_time_seconds = 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + let ping_time_seconds = 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + let download_rate = self.session.channel().get_download_rate_estimate(); let desired_pending_bytes = max( (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) as usize, - (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.session.channel().get_download_rate_estimate() as f64) as usize + (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize ); if bytes_pending < desired_pending_bytes { - trace!("Prefetching more data. pending bytes({}) < {}",bytes_pending, desired_pending_bytes); + trace!("Prefetching more data. pending: {}, desired: {}, ping: {}, rate: {}", bytes_pending, desired_pending_bytes, ping_time_seconds, download_rate); self.pre_fetch_more_data(desired_pending_bytes - bytes_pending); } } diff --git a/audio/src/range_set.rs b/audio/src/range_set.rs index 12b8299..835477b 100644 --- a/audio/src/range_set.rs +++ b/audio/src/range_set.rs @@ -186,6 +186,7 @@ impl RangeSet { } if index < self.ranges.len() && self.ranges[index].start < range.end() { + self.ranges[index].length -= range.end() - self.ranges[index].start; self.ranges[index].start = range.end(); } @@ -199,6 +200,7 @@ impl RangeSet { length: range.start - self.ranges[index].start, }; + self.ranges[index].length -= range.end() - self.ranges[index].start; self.ranges[index].start = range.end(); self.ranges.insert(index, first_range); diff --git a/core/src/channel.rs b/core/src/channel.rs index 62620e3..9471c07 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -14,7 +14,6 @@ component! { download_rate_estimate: usize = 0, download_measurement_start: Option = None, download_measurement_bytes: usize = 0, - download_measurement_last_received: Option = None, } } @@ -72,23 +71,15 @@ impl ChannelManager { let current_time = Instant::now(); if let Some(download_measurement_start) = inner.download_measurement_start { - if let Some(download_measurement_last_received) = inner.download_measurement_last_received { - if (current_time - download_measurement_start).as_millis() > 1000 { - if (current_time - download_measurement_last_received).as_millis() <= 500 { - inner.download_rate_estimate = 1000 * inner.download_measurement_bytes / (current_time - download_measurement_start).as_millis() as usize; - } else { - inner.download_rate_estimate = 1000 * inner.download_measurement_bytes / (500+(download_measurement_last_received - download_measurement_start).as_millis() as usize); - } - - inner.download_measurement_start = Some(current_time); - inner.download_measurement_bytes = 0; - } + if (current_time - download_measurement_start).as_millis() > 1000 { + inner.download_rate_estimate = 1000 * inner.download_measurement_bytes / (current_time - download_measurement_start).as_millis() as usize; + inner.download_measurement_start = Some(current_time); + inner.download_measurement_bytes = 0; } } else { inner.download_measurement_start = Some(current_time); } - inner.download_measurement_last_received = Some(current_time); inner.download_measurement_bytes += data.len(); if let Entry::Occupied(entry) = inner.channels.entry(id) { From a2b2150dbec640d742d3cc72ece7680084fd8150 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Fri, 8 Nov 2019 10:17:56 +1100 Subject: [PATCH 16/19] Update minimum Rust version to 1.33.0. --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 6a28f10..4ba0b54 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: rust rust: - - 1.32.0 + - 1.33.0 - stable - beta - nightly From d2d6df0e2441e4d84de3037977d36ebea59f458a Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Mon, 11 Nov 2019 18:22:41 +1100 Subject: [PATCH 17/19] Run cargo fmt for my code. --- audio/src/fetch.rs | 278 ++++++++++++++++++++++++----------------- audio/src/lib.rs | 7 +- audio/src/range_set.rs | 45 ++----- core/src/channel.rs | 12 +- playback/src/player.rs | 64 +++++++--- 5 files changed, 228 insertions(+), 178 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index a791841..80df21a 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -3,28 +3,27 @@ use bytes::Bytes; use futures::sync::{mpsc, oneshot}; use futures::Stream; use futures::{Async, Future, Poll}; -use std::cmp::{min, max}; +use range_set::{Range, RangeSet}; +use std::cmp::{max, min}; use std::fs; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, Instant}; use tempfile::NamedTempFile; -use range_set::{Range, RangeSet}; +use futures::sync::mpsc::unbounded; use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; use librespot_core::session::Session; use librespot_core::spotify_id::FileId; -use futures::sync::mpsc::unbounded; use std::sync::atomic; use std::sync::atomic::AtomicUsize; - const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; // The minimum size of a block that is requested from the Spotify servers in one request. // This is the block size that is typically requested while doing a seek() on a file. // Note: smaller requests can happen if part of the block is downloaded already. -const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; // MUST be divisible by four!!! +const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; // The amount of data that is requested when initially opening a file. // Note: if the file is opened to play from the beginning, the amount of data to // read ahead is requested in addition to this amount. If the file is opened to seek to @@ -78,8 +77,6 @@ const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5; // performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively // only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted. - - pub enum AudioFile { Cached(fs::File), Streaming(AudioFileStreaming), @@ -101,15 +98,13 @@ pub struct AudioFileOpenStreaming { streaming_data_rate: usize, } - -enum StreamLoaderCommand{ - Fetch(Range), // signal the stream loader to fetch a range of the file +enum StreamLoaderCommand { + Fetch(Range), // signal the stream loader to fetch a range of the file RandomAccessMode(), // optimise download strategy for random access - StreamMode(), // optimise download strategy for streaming - Close(), // terminate and don't load any more data + StreamMode(), // optimise download strategy for streaming + Close(), // terminate and don't load any more data } - #[derive(Clone)] pub struct StreamLoaderController { channel_tx: Option>, @@ -117,7 +112,6 @@ pub struct StreamLoaderController { file_size: usize, } - impl StreamLoaderController { pub fn len(&self) -> usize { return self.file_size; @@ -126,7 +120,11 @@ impl StreamLoaderController { pub fn range_available(&self, range: Range) -> bool { if let Some(ref shared) = self.stream_shared { let download_status = shared.download_status.lock().unwrap(); - if range.length <= download_status.downloaded.contained_length_from_value(range.start) { + if range.length + <= download_status + .downloaded + .contained_length_from_value(range.start) + { return true; } else { return false; @@ -174,9 +172,22 @@ impl StreamLoaderController { if let Some(ref shared) = self.stream_shared { let mut download_status = shared.download_status.lock().unwrap(); - while range.length > download_status.downloaded.contained_length_from_value(range.start) { - download_status = shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0; - if range.length > (download_status.downloaded.union(&download_status.requested).contained_length_from_value(range.start)) { + while range.length + > download_status + .downloaded + .contained_length_from_value(range.start) + { + download_status = shared + .cond + .wait_timeout(download_status, Duration::from_millis(1000)) + .unwrap() + .0; + if range.length + > (download_status + .downloaded + .union(&download_status.requested) + .contained_length_from_value(range.start)) + { // For some reason, the requested range is neither downloaded nor requested. // This could be due to a network error. Request it again. // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly. @@ -187,11 +198,10 @@ impl StreamLoaderController { } } } - } pub fn fetch_next(&mut self, length: usize) { - let range:Range = if let Some(ref shared) = self.stream_shared { + let range: Range = if let Some(ref shared) = self.stream_shared { Range { start: shared.read_position.load(atomic::Ordering::Relaxed), length: length, @@ -203,7 +213,7 @@ impl StreamLoaderController { } pub fn fetch_next_blocking(&mut self, length: usize) { - let range:Range = if let Some(ref shared) = self.stream_shared { + let range: Range = if let Some(ref shared) = self.stream_shared { Range { start: shared.read_position.load(atomic::Ordering::Relaxed), length: length, @@ -228,11 +238,8 @@ impl StreamLoaderController { // terminate stream loading and don't load any more data for this file. self.send_stream_loader_command(StreamLoaderCommand::Close()); } - - } - pub struct AudioFileStreaming { read_file: fs::File, @@ -243,7 +250,6 @@ pub struct AudioFileStreaming { shared: Arc, } - struct AudioFileDownloadStatus { requested: RangeSet, downloaded: RangeSet, @@ -269,13 +275,15 @@ struct AudioFileShared { impl AudioFileOpenStreaming { fn finish(&mut self, size: usize) -> AudioFileStreaming { - let shared = Arc::new(AudioFileShared { file_id: self.file_id, file_size: size, stream_data_rate: self.streaming_data_rate, cond: Condvar::new(), - download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}), + download_status: Mutex::new(AudioFileDownloadStatus { + requested: RangeSet::new(), + downloaded: RangeSet::new(), + }), download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise number_of_open_requests: AtomicUsize::new(0), ping_time_ms: AtomicUsize::new(0), @@ -292,7 +300,8 @@ impl AudioFileOpenStreaming { let initial_data_length = self.initial_data_length.take().unwrap(); let complete_tx = self.complete_tx.take().unwrap(); //let (seek_tx, seek_rx) = mpsc::unbounded(); - let (stream_loader_command_tx, stream_loader_command_rx) = mpsc::unbounded::(); + let (stream_loader_command_tx, stream_loader_command_rx) = + mpsc::unbounded::(); let fetcher = AudioFileFetch::new( self.session.clone(), @@ -355,7 +364,12 @@ impl Future for AudioFileOpenStreaming { } impl AudioFile { - pub fn open(session: &Session, file_id: FileId, bytes_per_second: usize, play_from_beginning: bool) -> AudioFileOpen { + pub fn open( + session: &Session, + file_id: FileId, + bytes_per_second: usize, + play_from_beginning: bool, + ) -> AudioFileOpen { let cache = session.cache().cloned(); if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) { @@ -367,10 +381,16 @@ impl AudioFile { let (complete_tx, complete_rx) = oneshot::channel(); let mut initial_data_length = if play_from_beginning { - INITIAL_DOWNLOAD_SIZE + max((READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, (INITIAL_PING_TIME_ESTIMATE_SECONDS * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * bytes_per_second as f64) as usize) - } else { - INITIAL_DOWNLOAD_SIZE - }; + INITIAL_DOWNLOAD_SIZE + + max( + (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, + (INITIAL_PING_TIME_ESTIMATE_SECONDS + * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * bytes_per_second as f64) as usize, + ) + } else { + INITIAL_DOWNLOAD_SIZE + }; if initial_data_length % 4 != 0 { initial_data_length += 4 - (initial_data_length % 4); } @@ -387,7 +407,6 @@ impl AudioFile { complete_tx: Some(complete_tx), streaming_data_rate: bytes_per_second, - }; let session_ = session.clone(); @@ -427,17 +446,26 @@ impl AudioFile { } } - fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { - - assert!(offset % 4 == 0, "Range request start positions must be aligned by 4 bytes."); - assert!(length % 4 == 0, "Range request range lengths must be aligned by 4 bytes."); + assert!( + offset % 4 == 0, + "Range request start positions must be aligned by 4 bytes." + ); + assert!( + length % 4 == 0, + "Range request range lengths must be aligned by 4 bytes." + ); let start = offset / 4; - let end = (offset+length) / 4; + let end = (offset + length) / 4; let (id, channel) = session.channel().allocate(); - trace!("requesting range starting at {} of length {} on channel {}.", offset, length, id); + trace!( + "requesting range starting at {} of length {} on channel {}.", + offset, + length, + id + ); let mut data: Vec = Vec::new(); data.write_u16::(id).unwrap(); @@ -456,8 +484,6 @@ fn request_range(session: &Session, file: FileId, offset: usize, length: usize) channel } - - struct PartialFileData { offset: usize, data: Bytes, @@ -489,10 +515,11 @@ impl AudioFileFetchDataReceiver { request_length: usize, request_sent_time: Instant, ) -> AudioFileFetchDataReceiver { - let measure_ping_time = shared.number_of_open_requests.load(atomic::Ordering::SeqCst) == 0; - shared.number_of_open_requests.fetch_add(1, atomic::Ordering::SeqCst); + shared + .number_of_open_requests + .fetch_add(1, atomic::Ordering::SeqCst); AudioFileFetchDataReceiver { shared: shared, @@ -508,12 +535,9 @@ impl AudioFileFetchDataReceiver { } } - - impl AudioFileFetchDataReceiver { fn finish(&mut self) { if self.request_length > 0 { - let missing_range = Range::new(self.data_offset, self.request_length); let mut download_status = self.shared.download_status.lock().unwrap(); @@ -521,8 +545,9 @@ impl AudioFileFetchDataReceiver { self.shared.cond.notify_all(); } - self.shared.number_of_open_requests.fetch_sub(1, atomic::Ordering::SeqCst); - + self.shared + .number_of_open_requests + .fetch_sub(1, atomic::Ordering::SeqCst); } } @@ -538,18 +563,26 @@ impl Future for AudioFileFetchDataReceiver { if let Some(request_sent_time) = self.request_sent_time { let duration = Instant::now() - request_sent_time; let duration_ms: u64; - if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS { + if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS + { duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; } else { duration_ms = duration.as_millis() as u64; } - let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); + let _ = self + .file_data_tx + .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); self.measure_ping_time = false; } } let data_size = data.len(); trace!("data_receiver for range {} (+{}) got {} bytes of data starting at {}. ({} bytes pending).", self.initial_data_offset, self.initial_request_length, data_size, self.data_offset, self.request_length - data_size); - let _ = self.file_data_tx.unbounded_send(ReceivedData::Data(PartialFileData { offset: self.data_offset, data: data, })); + let _ = self + .file_data_tx + .unbounded_send(ReceivedData::Data(PartialFileData { + offset: self.data_offset, + data: data, + })); self.data_offset += data_size; if self.request_length < data_size { warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length); @@ -558,7 +591,11 @@ impl Future for AudioFileFetchDataReceiver { self.request_length -= data_size; } if self.request_length == 0 { - trace!("Data receiver for range {} (+{}) completed.", self.initial_data_offset, self.initial_request_length); + trace!( + "Data receiver for range {} (+{}) completed.", + self.initial_data_offset, + self.initial_request_length + ); self.finish(); return Ok(Async::Ready(())); } @@ -574,7 +611,10 @@ impl Future for AudioFileFetchDataReceiver { return Ok(Async::NotReady); } Err(ChannelError) => { - warn!("Error from channel for data receiver for range {} (+{}).", self.initial_data_offset, self.initial_request_length); + warn!( + "Error from channel for data receiver for range {} (+{}).", + self.initial_data_offset, self.initial_request_length + ); self.finish(); return Ok(Async::Ready(())); } @@ -583,7 +623,6 @@ impl Future for AudioFileFetchDataReceiver { } } - struct AudioFileFetch { session: Session, shared: Arc, @@ -609,7 +648,6 @@ impl AudioFileFetch { stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: oneshot::Sender, ) -> AudioFileFetch { - let (file_data_tx, file_data_rx) = unbounded::(); { @@ -618,7 +656,6 @@ impl AudioFileFetch { download_status.requested.add_range(&requested_range); } - let initial_data_receiver = AudioFileFetchDataReceiver::new( shared.clone(), file_data_tx.clone(), @@ -649,7 +686,6 @@ impl AudioFileFetch { } fn download_range(&mut self, mut offset: usize, mut length: usize) { - if length < MINIMUM_DOWNLOAD_SIZE { length = MINIMUM_DOWNLOAD_SIZE; } @@ -684,13 +720,12 @@ impl AudioFileFetch { ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); - for range in ranges_to_request.iter() { - let (_headers, data) = request_range(&self.session, self.shared.file_id, range.start, range.length).split(); + let (_headers, data) = + request_range(&self.session, self.shared.file_id, range.start, range.length).split(); download_status.requested.add_range(range); - let receiver = AudioFileFetchDataReceiver::new( self.shared.clone(), self.file_data_tx.clone(), @@ -702,15 +737,12 @@ impl AudioFileFetch { self.session.spawn(move |_| receiver); } - } fn pre_fetch_more_data(&mut self, bytes: usize) { - let mut bytes_to_go = bytes; while bytes_to_go > 0 { - // determine what is still missing let mut missing_data = RangeSet::new(); missing_data.add_range(&Range::new(0, self.shared.file_size)); @@ -743,12 +775,9 @@ impl AudioFileFetch { return; } } - } - fn poll_file_data_rx(&mut self) -> Poll<(), ()> { - loop { match self.file_data_rx.poll() { Ok(Async::Ready(None)) => { @@ -768,7 +797,10 @@ impl AudioFileFetch { // stats::median is experimental. So we calculate the median of up to three ourselves. let ping_time_ms: usize = match self.network_response_times_ms.len() { 1 => self.network_response_times_ms[0] as usize, - 2 => ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2) as usize, + 2 => { + ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2) + as usize + } 3 => { let mut times = self.network_response_times_ms.clone(); times.sort(); @@ -778,20 +810,21 @@ impl AudioFileFetch { }; // store our new estimate for everyone to see - self.shared.ping_time_ms.store(ping_time_ms, atomic::Ordering::Relaxed); - - }, + self.shared + .ping_time_ms + .store(ping_time_ms, atomic::Ordering::Relaxed); + } Ok(Async::Ready(Some(ReceivedData::Data(data)))) => { - - self.output .as_mut() .unwrap() .seek(SeekFrom::Start(data.offset as u64)) .unwrap(); - self.output.as_mut().unwrap().write_all(data.data.as_ref()).unwrap(); - - + self.output + .as_mut() + .unwrap() + .write_all(data.data.as_ref()) + .unwrap(); let mut full = false; @@ -802,11 +835,17 @@ impl AudioFileFetch { download_status.downloaded.add_range(&received_range); self.shared.cond.notify_all(); - if download_status.downloaded.contained_length_from_value(0) >= self.shared.file_size { + if download_status.downloaded.contained_length_from_value(0) + >= self.shared.file_size + { full = true; } - trace!("Downloaded: {} Requested: {}", download_status.downloaded, download_status.requested.minus(&download_status.downloaded)); + trace!( + "Downloaded: {} Requested: {}", + download_status.downloaded, + download_status.requested.minus(&download_status.downloaded) + ); drop(download_status); } @@ -815,22 +854,16 @@ impl AudioFileFetch { self.finish(); return Ok(Async::Ready(())); } - - } Ok(Async::NotReady) => { return Ok(Async::NotReady); - }, + } Err(()) => unreachable!(), } - } - } - fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> { - loop { match self.stream_loader_command_rx.poll() { Ok(Async::Ready(None)) => { @@ -848,13 +881,10 @@ impl AudioFileFetch { Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => { return Ok(Async::Ready(())); } - Ok(Async::NotReady) => { - return Ok(Async::NotReady) - }, + Ok(Async::NotReady) => return Ok(Async::NotReady), Err(()) => unreachable!(), } } - } fn finish(&mut self) { @@ -865,7 +895,6 @@ impl AudioFileFetch { output.seek(SeekFrom::Start(0)).unwrap(); let _ = complete_tx.send(output); } - } impl Future for AudioFileFetch { @@ -873,7 +902,6 @@ impl Future for AudioFileFetch { type Error = (); fn poll(&mut self) -> Poll<(), ()> { - match self.poll_stream_loader_command_rx() { Ok(Async::NotReady) => (), Ok(Async::Ready(_)) => { @@ -896,22 +924,29 @@ impl Future for AudioFileFetch { download_status.requested.minus(&download_status.downloaded).len() }; - let ping_time_seconds = 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + let ping_time_seconds = + 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; let download_rate = self.session.channel().get_download_rate_estimate(); let desired_pending_bytes = max( - (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) as usize, - (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize + (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) + as usize, + (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize, ); if bytes_pending < desired_pending_bytes { - trace!("Prefetching more data. pending: {}, desired: {}, ping: {}, rate: {}", bytes_pending, desired_pending_bytes, ping_time_seconds, download_rate); + trace!( + "Prefetching more data. pending: {}, desired: {}, ping: {}, rate: {}", + bytes_pending, + desired_pending_bytes, + ping_time_seconds, + download_rate + ); self.pre_fetch_more_data(desired_pending_bytes - bytes_pending); } } - - return Ok(Async::NotReady) + return Ok(Async::NotReady); } } @@ -925,23 +960,25 @@ impl Read for AudioFileStreaming { let length = min(output.len(), self.shared.file_size - offset); - let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) { - DownloadStrategy::RandomAccess() => { length } + DownloadStrategy::RandomAccess() => length, DownloadStrategy::Streaming() => { // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded. - let ping_time_seconds = 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + let ping_time_seconds = + 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; - let length_to_request = length + max( - (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64) as usize, - (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * ping_time_seconds * self.shared.stream_data_rate as f64) as usize - ); + let length_to_request = length + + max( + (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64) + as usize, + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * ping_time_seconds + * self.shared.stream_data_rate as f64) as usize, + ); min(length_to_request, self.shared.file_size - offset) } }; - - let mut ranges_to_request = RangeSet::new(); ranges_to_request.add_range(&Range::new(offset, length_to_request)); @@ -951,27 +988,35 @@ impl Read for AudioFileStreaming { ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); - for range in ranges_to_request.iter() { - trace!("requesting data at position {} (length : {})", range.start, range.length); - self.stream_loader_command_tx.unbounded_send(StreamLoaderCommand::Fetch(range.clone())).unwrap(); + trace!( + "requesting data at position {} (length : {})", + range.start, + range.length + ); + self.stream_loader_command_tx + .unbounded_send(StreamLoaderCommand::Fetch(range.clone())) + .unwrap(); } - if length == 0 { return Ok(0); } while !download_status.downloaded.contains(offset) { trace!("waiting for download"); - download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0; + download_status = self + .shared + .cond + .wait_timeout(download_status, Duration::from_millis(1000)) + .unwrap() + .0; trace!("re-checking data availability at offset {}.", offset); } let available_length = download_status.downloaded.contained_length_from_value(offset); assert!(available_length > 0); drop(download_status); - self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap(); let read_len = min(length, available_length); let read_len = try!(self.read_file.read(&mut output[..read_len])); @@ -979,8 +1024,9 @@ impl Read for AudioFileStreaming { trace!("read successfully at postion {} (length : {})", offset, read_len); self.position += read_len as u64; - self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed); - + self.shared + .read_position + .store(self.position as usize, atomic::Ordering::Relaxed); return Ok(read_len); } @@ -990,7 +1036,9 @@ impl Seek for AudioFileStreaming { fn seek(&mut self, pos: SeekFrom) -> io::Result { self.position = try!(self.read_file.seek(pos)); // Do not seek past EOF - self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed); + self.shared + .read_position + .store(self.position as usize, atomic::Ordering::Relaxed); Ok(self.position) } } diff --git a/audio/src/lib.rs b/audio/src/lib.rs index 845ba5f..9a82f90 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -3,13 +3,13 @@ extern crate futures; #[macro_use] extern crate log; +extern crate aes_ctr; extern crate bit_set; extern crate byteorder; extern crate bytes; extern crate num_bigint; extern crate num_traits; extern crate tempfile; -extern crate aes_ctr; extern crate librespot_core; @@ -25,7 +25,10 @@ mod range_set; pub use decrypt::AudioDecrypt; pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController}; -pub use fetch::{READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS}; +pub use fetch::{ + READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS, + READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, +}; #[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))] pub use lewton_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; diff --git a/audio/src/range_set.rs b/audio/src/range_set.rs index 835477b..448c097 100644 --- a/audio/src/range_set.rs +++ b/audio/src/range_set.rs @@ -1,9 +1,6 @@ - -use std::cmp::{max,min}; -use std::slice::Iter; +use std::cmp::{max, min}; use std::fmt; - - +use std::slice::Iter; #[derive(Copy, Clone)] pub struct Range { @@ -13,27 +10,23 @@ pub struct Range { impl fmt::Display for Range { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - return write!(f, "[{}, {}]", self.start, self.start+self.length-1); + return write!(f, "[{}, {}]", self.start, self.start + self.length - 1); } } - impl Range { - pub fn new(start: usize, length: usize) -> Range { return Range { start: start, length: length, - } + }; } pub fn end(&self) -> usize { return self.start + self.length; } - } - #[derive(Clone)] pub struct RangeSet { ranges: Vec, @@ -49,11 +42,9 @@ impl fmt::Display for RangeSet { } } - - impl RangeSet { pub fn new() -> RangeSet { - RangeSet{ + RangeSet { ranges: Vec::::new(), } } @@ -98,7 +89,6 @@ impl RangeSet { } } return 0; - } #[allow(dead_code)] @@ -111,23 +101,20 @@ impl RangeSet { return true; } - - pub fn add_range(&mut self, range:&Range) { - + pub fn add_range(&mut self, range: &Range) { if range.length <= 0 { // the interval is empty or invalid -> nothing to do. return; } - for index in 0..self.ranges.len() { // the new range is clear of any ranges we already iterated over. - if range.end() < self.ranges[index].start{ + if range.end() < self.ranges[index].start { // the new range starts after anything we already passed and ends before the next range starts (they don't touch) -> insert it. self.ranges.insert(index, range.clone()); return; - - } else if range.start <= self.ranges[index].end() && self.ranges[index].start <= range.end() { + } else if range.start <= self.ranges[index].end() && self.ranges[index].start <= range.end() + { // the new range overlaps (or touches) the first range. They are to be merged. // In addition we might have to merge further ranges in as well. @@ -142,7 +129,6 @@ impl RangeSet { self.ranges.insert(index, new_range); return; - } } @@ -165,7 +151,6 @@ impl RangeSet { } pub fn subtract_range(&mut self, range: &Range) { - if range.length <= 0 { return; } @@ -175,8 +160,7 @@ impl RangeSet { if range.end() <= self.ranges[index].start { // the remaining ranges are past the one to subtract. -> we're done. - return - + return; } else if range.start <= self.ranges[index].start && self.ranges[index].start < range.end() { // the range to subtract started before the current range and reaches into the current range // -> we have to remove the beginning of the range or the entire range and do the same for following ranges. @@ -191,7 +175,6 @@ impl RangeSet { } return; - } else if range.end() < self.ranges[index].end() { // the range to subtract punches a hole into the current range -> we need to create two smaller ranges. @@ -206,11 +189,9 @@ impl RangeSet { self.ranges.insert(index, first_range); return; - } else if range.start < self.ranges[index].end() { // the range truncates the existing range -> truncate the range. Let the for loop take care of overlaps with other ranges. self.ranges[index].length = range.start - self.ranges[index].start; - } } } @@ -245,19 +226,15 @@ impl RangeSet { let new_start = max(self.ranges[self_index].start, other.ranges[other_index].start); let new_end = min(self.ranges[self_index].end(), other.ranges[other_index].end()); assert!(new_start <= new_end); - result.add_range(&Range::new(new_start, new_end-new_start)); + result.add_range(&Range::new(new_start, new_end - new_start)); if self.ranges[self_index].end() <= other.ranges[other_index].end() { self_index += 1; } else { other_index += 1; } - } - } return result; } - } - diff --git a/core/src/channel.rs b/core/src/channel.rs index 276f2bf..a4785eb 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -3,7 +3,7 @@ use bytes::Bytes; use futures::sync::{mpsc, BiLock}; use futures::{Async, Poll, Stream}; use std::collections::HashMap; -use std::time::{Instant}; +use std::time::Instant; use util::SeqGenerator; @@ -64,11 +64,11 @@ impl ChannelManager { let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref()); self.lock(|inner| { - let current_time = Instant::now(); if let Some(download_measurement_start) = inner.download_measurement_start { if (current_time - download_measurement_start).as_millis() > 1000 { - inner.download_rate_estimate = 1000 * inner.download_measurement_bytes / (current_time - download_measurement_start).as_millis() as usize; + inner.download_rate_estimate = 1000 * inner.download_measurement_bytes + / (current_time - download_measurement_start).as_millis() as usize; inner.download_measurement_start = Some(current_time); inner.download_measurement_bytes = 0; } @@ -85,12 +85,8 @@ impl ChannelManager { } pub fn get_download_rate_estimate(&self) -> usize { - return self.lock(|inner| { - inner.download_rate_estimate - }); - + return self.lock(|inner| inner.download_rate_estimate); } - } impl Channel { diff --git a/playback/src/player.rs b/playback/src/player.rs index bdccea3..a54a577 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -4,20 +4,23 @@ use futures::sync::oneshot; use futures::{future, Future}; use std; use std::borrow::Cow; +use std::cmp::max; use std::io::{Read, Result, Seek, SeekFrom}; use std::mem; use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; use std::thread; use std::time::Duration; -use std::cmp::max; use config::{Bitrate, PlayerConfig}; use librespot_core::session::Session; use librespot_core::spotify_id::SpotifyId; use audio::{AudioDecrypt, AudioFile, StreamLoaderController}; -use audio::{READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS}; use audio::{VorbisDecoder, VorbisPacket}; +use audio::{ + READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS, + READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, +}; use audio_backend::Sink; use metadata::{AudioItem, FileFormat}; use mixer::AudioFilter; @@ -244,7 +247,14 @@ impl PlayerState { use self::PlayerState::*; match *self { Stopped | EndOfTrack { .. } => None, - Paused { ref mut stream_loader_controller, .. } | Playing { ref mut stream_loader_controller, .. } => Some(stream_loader_controller), + Paused { + ref mut stream_loader_controller, + .. + } + | Playing { + ref mut stream_loader_controller, + .. + } => Some(stream_loader_controller), Invalid => panic!("invalid state"), } } @@ -273,7 +283,7 @@ impl PlayerState { end_of_track, normalisation_factor, stream_loader_controller, - bytes_per_second + bytes_per_second, } => { *self = Playing { track_id: track_id, @@ -426,7 +436,12 @@ impl PlayerInternal { } match self.load_track(track_id, position as i64) { - Some((decoder, normalisation_factor, stream_loader_controller, bytes_per_second)) => { + Some(( + decoder, + normalisation_factor, + stream_loader_controller, + bytes_per_second, + )) => { if play { match self.state { PlayerState::Playing { @@ -503,25 +518,27 @@ impl PlayerInternal { if let Some(stream_loader_controller) = self.state.stream_loader_controller() { stream_loader_controller.set_stream_mode(); } - if let PlayerState::Playing{bytes_per_second, ..} = self.state { + if let PlayerState::Playing { bytes_per_second, .. } = self.state { if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - // Request our read ahead range let request_data_length = max( - (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * (0.001 * stream_loader_controller.ping_time_ms() as f64) * bytes_per_second as f64) as usize, - (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * (0.001 * stream_loader_controller.ping_time_ms() as f64) + * bytes_per_second as f64) as usize, + (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, ); stream_loader_controller.fetch_next(request_data_length); // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete. let wait_for_data_length = max( - (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS * (0.001 * stream_loader_controller.ping_time_ms() as f64) * bytes_per_second as f64) as usize, - (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize + (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS + * (0.001 * stream_loader_controller.ping_time_ms() as f64) + * bytes_per_second as f64) as usize, + (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, ); stream_loader_controller.fetch_next_blocking(wait_for_data_length); } } - } PlayerCommand::Play => { @@ -584,9 +601,9 @@ impl PlayerInternal { fn stream_data_rate(&self, format: FileFormat) -> usize { match format { - FileFormat::OGG_VORBIS_96 => 12 * 1024, + FileFormat::OGG_VORBIS_96 => 12 * 1024, FileFormat::OGG_VORBIS_160 => 20 * 1024, - FileFormat::OGG_VORBIS_320=> 40 * 1024, + FileFormat::OGG_VORBIS_320 => 40 * 1024, FileFormat::MP3_256 => 32 * 1024, FileFormat::MP3_320 => 40 * 1024, FileFormat::MP3_160 => 20 * 1024, @@ -601,7 +618,11 @@ impl PlayerInternal { } } - fn load_track(&self, spotify_id: SpotifyId, position: i64) -> Option<(Decoder, f32, StreamLoaderController, usize)> { + fn load_track( + &self, + spotify_id: SpotifyId, + position: i64, + ) -> Option<(Decoder, f32, StreamLoaderController, usize)> { let audio = AudioItem::get_audio_item(&self.session, spotify_id) .wait() .unwrap(); @@ -646,10 +667,11 @@ impl PlayerInternal { }; let bytes_per_second = self.stream_data_rate(*format); - let play_from_beginning = position==0; + let play_from_beginning = position == 0; let key = self.session.audio_key().request(spotify_id, file_id); - let encrypted_file = AudioFile::open(&self.session, file_id, bytes_per_second, play_from_beginning); + let encrypted_file = + AudioFile::open(&self.session, file_id, bytes_per_second, play_from_beginning); let encrypted_file = encrypted_file.wait().unwrap(); @@ -663,7 +685,6 @@ impl PlayerInternal { stream_loader_controller.set_random_access_mode(); } - let key = key.wait().unwrap(); let mut decrypted_file = AudioDecrypt::new(key, encrypted_file); @@ -687,7 +708,12 @@ impl PlayerInternal { stream_loader_controller.set_stream_mode(); } info!("<{}> loaded", audio.name); - Some((decoder, normalisation_factor, stream_loader_controller, bytes_per_second)) + Some(( + decoder, + normalisation_factor, + stream_loader_controller, + bytes_per_second, + )) } } From e4134806dfa4287608f6f674c6fc9ab4dc7f35d7 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Mon, 11 Nov 2019 18:43:41 +1100 Subject: [PATCH 18/19] Remove debug messages. --- audio/src/fetch.rs | 47 +++++++++------------------------------------- 1 file changed, 9 insertions(+), 38 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 80df21a..2331b5c 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -460,13 +460,6 @@ fn request_range(session: &Session, file: FileId, offset: usize, length: usize) let (id, channel) = session.channel().allocate(); - trace!( - "requesting range starting at {} of length {} on channel {}.", - offset, - length, - id - ); - let mut data: Vec = Vec::new(); data.write_u16::(id).unwrap(); data.write_u8(0).unwrap(); @@ -576,7 +569,6 @@ impl Future for AudioFileFetchDataReceiver { } } let data_size = data.len(); - trace!("data_receiver for range {} (+{}) got {} bytes of data starting at {}. ({} bytes pending).", self.initial_data_offset, self.initial_request_length, data_size, self.data_offset, self.request_length - data_size); let _ = self .file_data_tx .unbounded_send(ReceivedData::Data(PartialFileData { @@ -591,11 +583,6 @@ impl Future for AudioFileFetchDataReceiver { self.request_length -= data_size; } if self.request_length == 0 { - trace!( - "Data receiver for range {} (+{}) completed.", - self.initial_data_offset, - self.initial_request_length - ); self.finish(); return Ok(Async::Ready(())); } @@ -784,7 +771,7 @@ impl AudioFileFetch { return Ok(Async::Ready(())); } Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => { - trace!("Received ping time estimate: {} ms.", response_time_ms); + trace!("Ping time estimated as: {} ms.", response_time_ms); // record the response time self.network_response_times_ms.push(response_time_ms); @@ -841,12 +828,6 @@ impl AudioFileFetch { full = true; } - trace!( - "Downloaded: {} Requested: {}", - download_status.downloaded, - download_status.requested.minus(&download_status.downloaded) - ); - drop(download_status); } @@ -888,7 +869,6 @@ impl AudioFileFetch { } fn finish(&mut self) { - trace!("====== FINISHED DOWNLOADING FILE! ======"); let mut output = self.output.take().unwrap(); let complete_tx = self.complete_tx.take().unwrap(); @@ -935,13 +915,6 @@ impl Future for AudioFileFetch { ); if bytes_pending < desired_pending_bytes { - trace!( - "Prefetching more data. pending: {}, desired: {}, ping: {}, rate: {}", - bytes_pending, - desired_pending_bytes, - ping_time_seconds, - download_rate - ); self.pre_fetch_more_data(desired_pending_bytes - bytes_pending); } } @@ -982,18 +955,11 @@ impl Read for AudioFileStreaming { let mut ranges_to_request = RangeSet::new(); ranges_to_request.add_range(&Range::new(offset, length_to_request)); - trace!("reading at postion {} (length : {})", offset, length); - let mut download_status = self.shared.download_status.lock().unwrap(); ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); for range in ranges_to_request.iter() { - trace!( - "requesting data at position {} (length : {})", - range.start, - range.length - ); self.stream_loader_command_tx .unbounded_send(StreamLoaderCommand::Fetch(range.clone())) .unwrap(); @@ -1003,15 +969,18 @@ impl Read for AudioFileStreaming { return Ok(0); } + let mut download_message_printed = false; while !download_status.downloaded.contains(offset) { - trace!("waiting for download"); + if !download_message_printed { + debug!("Waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded)); + download_message_printed = true; + } download_status = self .shared .cond .wait_timeout(download_status, Duration::from_millis(1000)) .unwrap() .0; - trace!("re-checking data availability at offset {}.", offset); } let available_length = download_status.downloaded.contained_length_from_value(offset); assert!(available_length > 0); @@ -1021,7 +990,9 @@ impl Read for AudioFileStreaming { let read_len = min(length, available_length); let read_len = try!(self.read_file.read(&mut output[..read_len])); - trace!("read successfully at postion {} (length : {})", offset, read_len); + if download_message_printed { + debug!("Read at postion {} completed. {} bytes returned, {} bytes were requested.", offset, read_len, output.len()); + } self.position += read_len as u64; self.shared From 5d8c9f8860e09ad98b23243659b592ee1be47e5c Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Mon, 11 Nov 2019 19:00:19 +1100 Subject: [PATCH 19/19] Hide waiting for download message during seek. --- audio/src/fetch.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 2331b5c..69e34d2 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -971,9 +971,11 @@ impl Read for AudioFileStreaming { let mut download_message_printed = false; while !download_status.downloaded.contains(offset) { - if !download_message_printed { - debug!("Waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded)); - download_message_printed = true; + if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() { + if !download_message_printed { + debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded)); + download_message_printed = true; + } } download_status = self .shared