diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 546d818..aed73aa 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -333,7 +333,11 @@ impl Future for SpircTask { progress = true; self.handle_frame(frame); } - Async::Ready(None) => panic!("subscription terminated"), + Async::Ready(None) => { + error!("subscription terminated"); + self.shutdown = true; + self.commands.close(); + } Async::NotReady => (), } diff --git a/core/src/channel.rs b/core/src/channel.rs index 06488b9..b614fac 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -14,6 +14,7 @@ component! { download_rate_estimate: usize = 0, download_measurement_start: Option = None, download_measurement_bytes: usize = 0, + invalid: bool = false, } } @@ -46,7 +47,9 @@ impl ChannelManager { let seq = self.lock(|inner| { let seq = inner.sequence.get(); - inner.channels.insert(seq, tx); + if !inner.invalid { + inner.channels.insert(seq, tx); + } seq }); @@ -87,12 +90,21 @@ impl ChannelManager { pub fn get_download_rate_estimate(&self) -> usize { return self.lock(|inner| inner.download_rate_estimate); } + + pub(crate) fn shutdown(&self) { + self.lock(|inner| { + inner.invalid = true; + // destroy the sending halves of the channels to signal everyone who is waiting for something. + inner.channels.clear(); + }); + } } impl Channel { fn recv_packet(&mut self) -> Poll { let (cmd, packet) = match self.receiver.poll() { - Ok(Async::Ready(t)) => t.expect("channel closed"), + Ok(Async::Ready(Some(t))) => t, + Ok(Async::Ready(None)) => return Err(ChannelError), // The channel has been closed. Ok(Async::NotReady) => return Ok(Async::NotReady), Err(()) => unreachable!(), }; diff --git a/core/src/connection/mod.rs b/core/src/connection/mod.rs index 5f9b3dc..7249779 100644 --- a/core/src/connection/mod.rs +++ b/core/src/connection/mod.rs @@ -28,9 +28,27 @@ pub fn connect( let (addr, connect_url) = match *proxy { Some(ref url) => { info!("Using proxy \"{}\"", url); - (url.to_socket_addrs().unwrap().next().unwrap(), Some(addr)) + match url.to_socket_addrs().and_then(|mut iter| { + iter.next().ok_or(io::Error::new( + io::ErrorKind::NotFound, + "Can't resolve proxy server address", + )) + }) { + Ok(socket_addr) => (socket_addr, Some(addr)), + Err(error) => return Box::new(futures::future::err(error)), + } + } + None => { + match addr.to_socket_addrs().and_then(|mut iter| { + iter.next().ok_or(io::Error::new( + io::ErrorKind::NotFound, + "Can't resolve server address", + )) + }) { + Ok(socket_addr) => (socket_addr, None), + Err(error) => return Box::new(futures::future::err(error)), + } } - None => (addr.to_socket_addrs().unwrap().next().unwrap(), None), }; let socket = TcpStream::connect(&addr, handle); diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 89a1ff1..e167f9c 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -20,6 +20,7 @@ component! { sequence: SeqGenerator = SeqGenerator::new(0), pending: HashMap, MercuryPending> = HashMap::new(), subscriptions: Vec<(String, mpsc::UnboundedSender)> = Vec::new(), + invalid: bool = false, } } @@ -61,7 +62,11 @@ impl MercuryManager { }; let seq = self.next_seq(); - self.lock(|inner| inner.pending.insert(seq.clone(), pending)); + self.lock(|inner| { + if !inner.invalid { + inner.pending.insert(seq.clone(), pending); + } + }); let cmd = req.method.command(); let data = req.encode(&seq); @@ -110,21 +115,23 @@ impl MercuryManager { let (tx, rx) = mpsc::unbounded(); manager.lock(move |inner| { - debug!("subscribed uri={} count={}", uri, response.payload.len()); - if response.payload.len() > 0 { - // Old subscription protocol, watch the provided list of URIs - for sub in response.payload { - let mut sub: protocol::pubsub::Subscription = - protobuf::parse_from_bytes(&sub).unwrap(); - let sub_uri = sub.take_uri(); + if !inner.invalid { + debug!("subscribed uri={} count={}", uri, response.payload.len()); + if response.payload.len() > 0 { + // Old subscription protocol, watch the provided list of URIs + for sub in response.payload { + let mut sub: protocol::pubsub::Subscription = + protobuf::parse_from_bytes(&sub).unwrap(); + let sub_uri = sub.take_uri(); - debug!("subscribed sub_uri={}", sub_uri); + debug!("subscribed sub_uri={}", sub_uri); - inner.subscriptions.push((sub_uri, tx.clone())); + inner.subscriptions.push((sub_uri, tx.clone())); + } + } else { + // New subscription protocol, watch the requested URI + inner.subscriptions.push((uri, tx)); } - } else { - // New subscription protocol, watch the requested URI - inner.subscriptions.push((uri, tx)); } }); @@ -223,4 +230,13 @@ impl MercuryManager { } } } + + pub(crate) fn shutdown(&self) { + self.lock(|inner| { + inner.invalid = true; + // destroy the sending halves of the channels to signal everyone who is waiting for something. + inner.pending.clear(); + inner.subscriptions.clear(); + }); + } } diff --git a/core/src/session.rs b/core/src/session.rs index 6b3f1f1..4d86a02 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -243,6 +243,8 @@ impl Session { pub fn shutdown(&self) { debug!("Invalidating session[{}]", self.0.session_id); self.0.data.write().unwrap().invalid = true; + self.mercury().shutdown(); + self.channel().shutdown(); } pub fn is_invalid(&self) -> bool { @@ -289,14 +291,18 @@ where loop { let (cmd, data) = match self.0.poll() { - Ok(Async::Ready(t)) => t, + Ok(Async::Ready(Some(t))) => t, + Ok(Async::Ready(None)) => { + warn!("Connection to server closed."); + session.shutdown(); + return Ok(Async::Ready(())); + } Ok(Async::NotReady) => return Ok(Async::NotReady), Err(e) => { session.shutdown(); return Err(From::from(e)); } - } - .expect("connection closed"); + }; session.dispatch(cmd, data); } diff --git a/playback/src/player.rs b/playback/src/player.rs index 4d1ae4f..38ee00c 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -636,9 +636,14 @@ impl PlayerInternal { spotify_id: SpotifyId, position: i64, ) -> Option<(Decoder, f32, StreamLoaderController, usize)> { - let audio = AudioItem::get_audio_item(&self.session, spotify_id) - .wait() - .unwrap(); + let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() { + Ok(audio) => audio, + Err(_) => { + error!("Unable to load audio item."); + return None; + } + }; + info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri); let audio = match self.find_available_alternative(&audio) { @@ -690,7 +695,13 @@ impl PlayerInternal { play_from_beginning, ); - let encrypted_file = encrypted_file.wait().unwrap(); + let encrypted_file = match encrypted_file.wait() { + Ok(encrypted_file) => encrypted_file, + Err(_) => { + error!("Unable to load encrypted file."); + return None; + } + }; let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(); @@ -702,7 +713,14 @@ impl PlayerInternal { stream_loader_controller.set_random_access_mode(); } - let key = key.wait().unwrap(); + let key = match key.wait() { + Ok(key) => key, + Err(_) => { + error!("Unable to load decryption key"); + return None; + } + }; + let mut decrypted_file = AudioDecrypt::new(key, encrypted_file); let normalisation_factor = match NormalisationData::parse_from_file(&mut decrypted_file) { diff --git a/src/main.rs b/src/main.rs index f94a2ef..8ee3b0c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ use std::mem; use std::path::PathBuf; use std::process::exit; use std::str::FromStr; +use std::time::Instant; use tokio_core::reactor::{Core, Handle}; use tokio_io::IoStream; use url::Url; @@ -375,6 +376,8 @@ struct Main { connect: Box>, shutdown: bool, + last_credentials: Option, + auto_connect_times: Vec, player_event_channel: Option>, player_event_program: Option, @@ -398,6 +401,8 @@ impl Main { spirc: None, spirc_task: None, shutdown: false, + last_credentials: None, + auto_connect_times: Vec::new(), signal: Box::new(tokio_signal::ctrl_c().flatten_stream()), player_event_channel: None, @@ -420,6 +425,7 @@ impl Main { } fn credentials(&mut self, credentials: Credentials) { + self.last_credentials = Some(credentials.clone()); let config = self.session_config.clone(); let handle = self.handle.clone(); @@ -448,32 +454,40 @@ impl Future for Main { if let Some(ref spirc) = self.spirc { spirc.shutdown(); } + self.auto_connect_times.clear(); self.credentials(creds); progress = true; } - if let Async::Ready(session) = self.connect.poll().unwrap() { - self.connect = Box::new(futures::future::empty()); - let mixer_config = self.mixer_config.clone(); - let mixer = (self.mixer)(Some(mixer_config)); - let player_config = self.player_config.clone(); - let connect_config = self.connect_config.clone(); + match self.connect.poll() { + Ok(Async::Ready(session)) => { + self.connect = Box::new(futures::future::empty()); + let mixer_config = self.mixer_config.clone(); + let mixer = (self.mixer)(Some(mixer_config)); + let player_config = self.player_config.clone(); + let connect_config = self.connect_config.clone(); - let audio_filter = mixer.get_audio_filter(); - let backend = self.backend; - let device = self.device.clone(); - let (player, event_channel) = - Player::new(player_config, session.clone(), audio_filter, move || { - (backend)(device) - }); + let audio_filter = mixer.get_audio_filter(); + let backend = self.backend; + let device = self.device.clone(); + let (player, event_channel) = + Player::new(player_config, session.clone(), audio_filter, move || { + (backend)(device) + }); - let (spirc, spirc_task) = Spirc::new(connect_config, session, player, mixer); - self.spirc = Some(spirc); - self.spirc_task = Some(spirc_task); - self.player_event_channel = Some(event_channel); + let (spirc, spirc_task) = Spirc::new(connect_config, session, player, mixer); + self.spirc = Some(spirc); + self.spirc_task = Some(spirc_task); + self.player_event_channel = Some(event_channel); - progress = true; + progress = true; + } + Ok(Async::NotReady) => (), + Err(error) => { + error!("Could not connect to server: {}", error); + self.connect = Box::new(futures::future::empty()); + } } if let Async::Ready(Some(())) = self.signal.poll().unwrap() { @@ -492,12 +506,32 @@ impl Future for Main { progress = true; } + let mut drop_spirc_and_try_to_reconnect = false; if let Some(ref mut spirc_task) = self.spirc_task { if let Async::Ready(()) = spirc_task.poll().unwrap() { if self.shutdown { return Ok(Async::Ready(())); } else { - panic!("Spirc shut down unexpectedly"); + warn!("Spirc shut down unexpectedly"); + drop_spirc_and_try_to_reconnect = true; + } + progress = true; + } + } + if drop_spirc_and_try_to_reconnect { + self.spirc_task = None; + while (!self.auto_connect_times.is_empty()) + && ((Instant::now() - self.auto_connect_times[0]).as_secs() > 600) + { + let _ = self.auto_connect_times.remove(0); + } + + if let Some(credentials) = self.last_credentials.clone() { + if self.auto_connect_times.len() >= 5 { + warn!("Spirc shut down too often. Not reconnecting automatically."); + } else { + self.auto_connect_times.push(Instant::now()); + self.credentials(credentials); } } }