From 362106df622126f8ee2af3038a7eeb45b424229f Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Wed, 22 Jan 2020 16:58:21 +1100 Subject: [PATCH 01/14] Fix error handling for closed channel. fixes #417 --- core/src/channel.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/channel.rs b/core/src/channel.rs index a4785eb..d86b252 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -92,7 +92,8 @@ impl ChannelManager { impl Channel { fn recv_packet(&mut self) -> Poll { let (cmd, packet) = match self.receiver.poll() { - Ok(Async::Ready(t)) => t.expect("channel closed"), + Ok(Async::Ready(Some(t))) => t, + Ok(Async::Ready(None)) => return Err(ChannelError), // The channel has been closed. Ok(Async::NotReady) => return Ok(Async::NotReady), Err(()) => unreachable!(), }; From b6c676ad600b461d96ac999a87fc805f10985177 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Wed, 22 Jan 2020 21:55:45 +1100 Subject: [PATCH 02/14] Prevent librespot from panicking if server connection is lost. --- core/src/session.rs | 9 +++++++-- src/main.rs | 3 ++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/session.rs b/core/src/session.rs index e15c6b8..7695eb5 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -283,13 +283,18 @@ where loop { let (cmd, data) = match self.0.poll() { - Ok(Async::Ready(t)) => t, + Ok(Async::Ready(Some(t))) => t, + Ok(Async::Ready(None)) => { + warn!("Connection to server closed."); + session.shutdown(); + return Ok(Async::Ready(())); + }, Ok(Async::NotReady) => return Ok(Async::NotReady), Err(e) => { session.shutdown(); return Err(From::from(e)); } - }.expect("connection closed"); + }; session.dispatch(cmd, data); } diff --git a/src/main.rs b/src/main.rs index e193257..fcced12 100644 --- a/src/main.rs +++ b/src/main.rs @@ -500,7 +500,8 @@ impl Future for Main { if self.shutdown { return Ok(Async::Ready(())); } else { - panic!("Spirc shut down unexpectedly"); + warn!("Spirc shut down unexpectedly"); + self.spirc_task = None; } } } From 3fe384958833e983ac58da25094ffdc4e0b8d27c Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Thu, 23 Jan 2020 01:14:43 +1100 Subject: [PATCH 03/14] Enable Mercury to be shut down and all pending requests being cancelled. --- core/src/mercury/mod.rs | 42 ++++++++++++++++++++++++++++------------- core/src/session.rs | 1 + 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 0b69d8e..2fb6b87 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -20,6 +20,7 @@ component! { sequence: SeqGenerator = SeqGenerator::new(0), pending: HashMap, MercuryPending> = HashMap::new(), subscriptions: Vec<(String, mpsc::UnboundedSender)> = Vec::new(), + is_shutdown: bool = false, } } @@ -61,7 +62,11 @@ impl MercuryManager { }; let seq = self.next_seq(); - self.lock(|inner| inner.pending.insert(seq.clone(), pending)); + self.lock(|inner| { + if !inner.is_shutdown { + inner.pending.insert(seq.clone(), pending); + } + }); let cmd = req.method.command(); let data = req.encode(&seq); @@ -109,21 +114,23 @@ impl MercuryManager { let (tx, rx) = mpsc::unbounded(); manager.lock(move |inner| { - debug!("subscribed uri={} count={}", uri, response.payload.len()); - if response.payload.len() > 0 { - // Old subscription protocol, watch the provided list of URIs - for sub in response.payload { - let mut sub: protocol::pubsub::Subscription = - protobuf::parse_from_bytes(&sub).unwrap(); - let sub_uri = sub.take_uri(); + if !inner.is_shutdown { + debug!("subscribed uri={} count={}", uri, response.payload.len()); + if response.payload.len() > 0 { + // Old subscription protocol, watch the provided list of URIs + for sub in response.payload { + let mut sub: protocol::pubsub::Subscription = + protobuf::parse_from_bytes(&sub).unwrap(); + let sub_uri = sub.take_uri(); - debug!("subscribed sub_uri={}", sub_uri); + debug!("subscribed sub_uri={}", sub_uri); - inner.subscriptions.push((sub_uri, tx.clone())); + inner.subscriptions.push((sub_uri, tx.clone())); + } + } else { + // New subscription protocol, watch the requested URI + inner.subscriptions.push((uri, tx)); } - } else { - // New subscription protocol, watch the requested URI - inner.subscriptions.push((uri, tx)); } }); @@ -222,4 +229,13 @@ impl MercuryManager { } } } + + pub(crate) fn shutdown(&self) { + self.lock(|inner| { + inner.is_shutdown = true; + // destroy the sending halves of the channels to signal everyone who is waiting for something. + inner.pending.clear(); + inner.subscriptions.clear(); + }); + } } diff --git a/core/src/session.rs b/core/src/session.rs index 7695eb5..e661e4f 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -237,6 +237,7 @@ impl Session { pub fn shutdown(&self) { debug!("Invalidating session[{}]", self.0.session_id); self.0.data.write().unwrap().invalid = true; + self.mercury().shutdown(); } pub fn is_invalid(&self) -> bool { From 04b52d7878d3ad52233007aafd19ce5dc86ff056 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Thu, 23 Jan 2020 01:15:30 +1100 Subject: [PATCH 04/14] Have player handle Mercury errors while loading tracks. --- playback/src/player.rs | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/playback/src/player.rs b/playback/src/player.rs index a54a577..3bcdef4 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -623,9 +623,15 @@ impl PlayerInternal { spotify_id: SpotifyId, position: i64, ) -> Option<(Decoder, f32, StreamLoaderController, usize)> { - let audio = AudioItem::get_audio_item(&self.session, spotify_id) - .wait() - .unwrap(); + + let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() { + Ok(audio) => audio, + Err(_) => { + error!("Unable to load audio item."); + return None + }, + }; + info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri); let audio = match self.find_available_alternative(&audio) { @@ -673,7 +679,13 @@ impl PlayerInternal { let encrypted_file = AudioFile::open(&self.session, file_id, bytes_per_second, play_from_beginning); - let encrypted_file = encrypted_file.wait().unwrap(); + let encrypted_file = match encrypted_file.wait() { + Ok(encrypted_file) => encrypted_file, + Err(_) => { + error!("Unable to load encrypted file."); + return None; + } + }; let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(); @@ -685,7 +697,14 @@ impl PlayerInternal { stream_loader_controller.set_random_access_mode(); } - let key = key.wait().unwrap(); + let key = match key.wait() { + Ok(key) => key, + Err(_) => { + error!("Unable to load decryption key"); + return None; + } + }; + let mut decrypted_file = AudioDecrypt::new(key, encrypted_file); let normalisation_factor = match NormalisationData::parse_from_file(&mut decrypted_file) { From ea1e0925dcf0717641794f3f4b5d999389723329 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Thu, 23 Jan 2020 01:23:34 +1100 Subject: [PATCH 05/14] Enable proper shutdown of the channels. --- core/src/channel.rs | 13 ++++++++++++- core/src/session.rs | 1 + 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/core/src/channel.rs b/core/src/channel.rs index d86b252..98ab62e 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -14,6 +14,7 @@ component! { download_rate_estimate: usize = 0, download_measurement_start: Option = None, download_measurement_bytes: usize = 0, + is_shutdown: bool = false, } } @@ -46,7 +47,9 @@ impl ChannelManager { let seq = self.lock(|inner| { let seq = inner.sequence.get(); - inner.channels.insert(seq, tx); + if !inner.is_shutdown { + inner.channels.insert(seq, tx); + } seq }); @@ -87,6 +90,14 @@ impl ChannelManager { pub fn get_download_rate_estimate(&self) -> usize { return self.lock(|inner| inner.download_rate_estimate); } + + pub(crate) fn shutdown(&self) { + self.lock(|inner| { + inner.is_shutdown = true; + // destroy the sending halves of the channels to signal everyone who is waiting for something. + inner.channels.clear(); + }); + } } impl Channel { diff --git a/core/src/session.rs b/core/src/session.rs index e661e4f..3f30e28 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -238,6 +238,7 @@ impl Session { debug!("Invalidating session[{}]", self.0.session_id); self.0.data.write().unwrap().invalid = true; self.mercury().shutdown(); + self.channel().shutdown(); } pub fn is_invalid(&self) -> bool { From 719943aec9f5e7f66563e288c970207cb154d919 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Thu, 23 Jan 2020 01:24:59 +1100 Subject: [PATCH 06/14] Don't panic if spirc terminates prematurely. Instead attempt to reconnect and wait for new client connections. --- src/main.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main.rs b/src/main.rs index fcced12..b965904 100644 --- a/src/main.rs +++ b/src/main.rs @@ -381,6 +381,7 @@ struct Main { connect: Box>, shutdown: bool, + last_credentials: Option, player_event_channel: Option>, player_event_program: Option, @@ -404,6 +405,7 @@ impl Main { spirc: None, spirc_task: None, shutdown: false, + last_credentials: None, signal: Box::new(tokio_signal::ctrl_c().flatten_stream()), player_event_channel: None, @@ -425,6 +427,7 @@ impl Main { } fn credentials(&mut self, credentials: Credentials) { + self.last_credentials = Some(credentials.clone()); let config = self.session_config.clone(); let handle = self.handle.clone(); @@ -502,6 +505,10 @@ impl Future for Main { } else { warn!("Spirc shut down unexpectedly"); self.spirc_task = None; + if let Some(credentials) = self.last_credentials.clone() { + self.credentials(credentials); + progress = true; + } } } } From 17821b26aaa1860fb0e714f7e792c7088e0f1a20 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Thu, 23 Jan 2020 19:05:54 +1100 Subject: [PATCH 07/14] Rename variable to be in line with existing code. --- core/src/channel.rs | 6 +++--- core/src/mercury/mod.rs | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/channel.rs b/core/src/channel.rs index 98ab62e..daf02c7 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -14,7 +14,7 @@ component! { download_rate_estimate: usize = 0, download_measurement_start: Option = None, download_measurement_bytes: usize = 0, - is_shutdown: bool = false, + invalid: bool = false, } } @@ -47,7 +47,7 @@ impl ChannelManager { let seq = self.lock(|inner| { let seq = inner.sequence.get(); - if !inner.is_shutdown { + if !inner.invalid { inner.channels.insert(seq, tx); } seq @@ -93,7 +93,7 @@ impl ChannelManager { pub(crate) fn shutdown(&self) { self.lock(|inner| { - inner.is_shutdown = true; + inner.invalid = true; // destroy the sending halves of the channels to signal everyone who is waiting for something. inner.channels.clear(); }); diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 2fb6b87..1eb0e7e 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -20,7 +20,7 @@ component! { sequence: SeqGenerator = SeqGenerator::new(0), pending: HashMap, MercuryPending> = HashMap::new(), subscriptions: Vec<(String, mpsc::UnboundedSender)> = Vec::new(), - is_shutdown: bool = false, + invalid: bool = false, } } @@ -63,7 +63,7 @@ impl MercuryManager { let seq = self.next_seq(); self.lock(|inner| { - if !inner.is_shutdown { + if !inner.invalid { inner.pending.insert(seq.clone(), pending); } }); @@ -114,7 +114,7 @@ impl MercuryManager { let (tx, rx) = mpsc::unbounded(); manager.lock(move |inner| { - if !inner.is_shutdown { + if !inner.invalid { debug!("subscribed uri={} count={}", uri, response.payload.len()); if response.payload.len() > 0 { // Old subscription protocol, watch the provided list of URIs @@ -232,7 +232,7 @@ impl MercuryManager { pub(crate) fn shutdown(&self) { self.lock(|inner| { - inner.is_shutdown = true; + inner.invalid = true; // destroy the sending halves of the channels to signal everyone who is waiting for something. inner.pending.clear(); inner.subscriptions.clear(); From f26db0111063619ca2e3652273a0ab6da785c289 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Thu, 23 Jan 2020 19:09:26 +1100 Subject: [PATCH 08/14] Rate-limit automatic re-connection attempts when spirc shuts down. --- src/main.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/main.rs b/src/main.rs index b965904..988f62b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,6 +25,7 @@ use std::str::FromStr; use tokio_core::reactor::{Core, Handle}; use tokio_io::IoStream; use url::Url; +use std::time::Instant; use librespot::core::authentication::{get_credentials, Credentials}; use librespot::core::cache::Cache; @@ -382,6 +383,7 @@ struct Main { shutdown: bool, last_credentials: Option, + auto_connect_times: Vec, player_event_channel: Option>, player_event_program: Option, @@ -406,6 +408,7 @@ impl Main { spirc_task: None, shutdown: false, last_credentials: None, + auto_connect_times: Vec::new(), signal: Box::new(tokio_signal::ctrl_c().flatten_stream()), player_event_channel: None, @@ -454,6 +457,7 @@ impl Future for Main { if let Some(ref spirc) = self.spirc { spirc.shutdown(); } + self.auto_connect_times.clear(); self.credentials(creds); progress = true; @@ -505,7 +509,16 @@ impl Future for Main { } else { warn!("Spirc shut down unexpectedly"); self.spirc_task = None; + + while (!self.auto_connect_times.is_empty()) && ((Instant::now() - self.auto_connect_times[0]).as_secs() > 600) { + let _ = self.auto_connect_times.remove(0); + } + if self.auto_connect_times.len() >= 5 { + error!("Spirc shut down too often. Exiting to avoid too many login attempts."); + return Ok(Async::Ready(())); + } if let Some(credentials) = self.last_credentials.clone() { + self.auto_connect_times.push(Instant::now()); self.credentials(credentials); progress = true; } From dadab486d2ec8a86b30f618d17599c1a563f2163 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Thu, 23 Jan 2020 19:51:09 +1100 Subject: [PATCH 09/14] Don't exit if too many spirc failures. --- src/main.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index 988f62b..a461857 100644 --- a/src/main.rs +++ b/src/main.rs @@ -513,16 +513,17 @@ impl Future for Main { while (!self.auto_connect_times.is_empty()) && ((Instant::now() - self.auto_connect_times[0]).as_secs() > 600) { let _ = self.auto_connect_times.remove(0); } - if self.auto_connect_times.len() >= 5 { - error!("Spirc shut down too often. Exiting to avoid too many login attempts."); - return Ok(Async::Ready(())); - } + if let Some(credentials) = self.last_credentials.clone() { - self.auto_connect_times.push(Instant::now()); - self.credentials(credentials); - progress = true; + if self.auto_connect_times.len() >= 5 { + warn!("Spirc shut down too often. Not reconnecting automatically."); + } else { + self.auto_connect_times.push(Instant::now()); + self.credentials(credentials); + } } } + progress = true; } } From 0e22678a281fca2b746c4f2210e905398c9e9415 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Thu, 23 Jan 2020 21:10:55 +1100 Subject: [PATCH 10/14] Workaround for Rust 1.33 borrow checker. --- src/main.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/main.rs b/src/main.rs index a461857..e2c5f51 100644 --- a/src/main.rs +++ b/src/main.rs @@ -502,6 +502,7 @@ impl Future for Main { progress = true; } + let mut try_to_reconnect = false; if let Some(ref mut spirc_task) = self.spirc_task { if let Async::Ready(()) = spirc_task.poll().unwrap() { if self.shutdown { @@ -510,22 +511,26 @@ impl Future for Main { warn!("Spirc shut down unexpectedly"); self.spirc_task = None; - while (!self.auto_connect_times.is_empty()) && ((Instant::now() - self.auto_connect_times[0]).as_secs() > 600) { - let _ = self.auto_connect_times.remove(0); - } - - if let Some(credentials) = self.last_credentials.clone() { - if self.auto_connect_times.len() >= 5 { - warn!("Spirc shut down too often. Not reconnecting automatically."); - } else { - self.auto_connect_times.push(Instant::now()); - self.credentials(credentials); - } - } + try_to_reconnect = true; } progress = true; } } + if try_to_reconnect { + while (!self.auto_connect_times.is_empty()) && ((Instant::now() - self.auto_connect_times[0]).as_secs() > 600) { + let _ = self.auto_connect_times.remove(0); + } + + if let Some(credentials) = self.last_credentials.clone() { + if self.auto_connect_times.len() >= 5 { + warn!("Spirc shut down too often. Not reconnecting automatically."); + } else { + self.auto_connect_times.push(Instant::now()); + self.credentials(credentials); + } + } + } + if let Some(ref mut player_event_channel) = self.player_event_channel { if let Async::Ready(Some(event)) = player_event_channel.poll().unwrap() { From a52092e094e6e721c043d44260e2498d4848ffc9 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Fri, 24 Jan 2020 10:12:16 +1100 Subject: [PATCH 11/14] Convert another panic to error handling. Compatibility fix for Rust 1.33. --- connect/src/spirc.rs | 6 +++++- src/main.rs | 9 ++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 9594082..132c9fc 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -333,7 +333,11 @@ impl Future for SpircTask { progress = true; self.handle_frame(frame); } - Async::Ready(None) => panic!("subscription terminated"), + Async::Ready(None) => { + error!("subscription terminated"); + self.shutdown = true; + self.commands.close(); + }, Async::NotReady => (), } diff --git a/src/main.rs b/src/main.rs index e2c5f51..84b2fdd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -502,21 +502,20 @@ impl Future for Main { progress = true; } - let mut try_to_reconnect = false; + let mut drop_spirc_and_try_to_reconnect = false; if let Some(ref mut spirc_task) = self.spirc_task { if let Async::Ready(()) = spirc_task.poll().unwrap() { if self.shutdown { return Ok(Async::Ready(())); } else { warn!("Spirc shut down unexpectedly"); - self.spirc_task = None; - - try_to_reconnect = true; + drop_spirc_and_try_to_reconnect = true; } progress = true; } } - if try_to_reconnect { + if drop_spirc_and_try_to_reconnect { + self.spirc_task = None; while (!self.auto_connect_times.is_empty()) && ((Instant::now() - self.auto_connect_times[0]).as_secs() > 600) { let _ = self.auto_connect_times.remove(0); } From e9c3357e4134d6fc873d54389bcd32de8ec469f1 Mon Sep 17 00:00:00 2001 From: Sasha Hilton Date: Fri, 24 Jan 2020 02:26:16 +0100 Subject: [PATCH 12/14] Run cargo fmt --- connect/src/spirc.rs | 2 +- core/src/session.rs | 2 +- playback/src/player.rs | 5 ++--- src/main.rs | 7 ++++--- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 1cbcc2b..f1b4180 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -337,7 +337,7 @@ impl Future for SpircTask { error!("subscription terminated"); self.shutdown = true; self.commands.close(); - }, + } Async::NotReady => (), } diff --git a/core/src/session.rs b/core/src/session.rs index 53e7a2d..4d86a02 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -296,7 +296,7 @@ where warn!("Connection to server closed."); session.shutdown(); return Ok(Async::Ready(())); - }, + } Ok(Async::NotReady) => return Ok(Async::NotReady), Err(e) => { session.shutdown(); diff --git a/playback/src/player.rs b/playback/src/player.rs index 3f7152b..38ee00c 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -636,13 +636,12 @@ impl PlayerInternal { spotify_id: SpotifyId, position: i64, ) -> Option<(Decoder, f32, StreamLoaderController, usize)> { - let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() { Ok(audio) => audio, Err(_) => { error!("Unable to load audio item."); - return None - }, + return None; + } }; info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri); diff --git a/src/main.rs b/src/main.rs index 36a3ff4..9f41f2c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,10 +8,10 @@ use std::mem; use std::path::PathBuf; use std::process::exit; use std::str::FromStr; +use std::time::Instant; use tokio_core::reactor::{Core, Handle}; use tokio_io::IoStream; use url::Url; -use std::time::Instant; use librespot::core::authentication::{get_credentials, Credentials}; use librespot::core::cache::Cache; @@ -513,7 +513,9 @@ impl Future for Main { } if drop_spirc_and_try_to_reconnect { self.spirc_task = None; - while (!self.auto_connect_times.is_empty()) && ((Instant::now() - self.auto_connect_times[0]).as_secs() > 600) { + while (!self.auto_connect_times.is_empty()) + && ((Instant::now() - self.auto_connect_times[0]).as_secs() > 600) + { let _ = self.auto_connect_times.remove(0); } @@ -527,7 +529,6 @@ impl Future for Main { } } - if let Some(ref mut player_event_channel) = self.player_event_channel { if let Async::Ready(Some(event)) = player_event_channel.poll().unwrap() { if let Some(ref program) = self.player_event_program { From 65d1c1bf8ed9073b7d409d222c0648912ee80f97 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Wed, 29 Jan 2020 09:45:06 +1100 Subject: [PATCH 13/14] Proper error handling when connecting to the server. --- core/src/connection/mod.rs | 22 ++++++++++++++++++-- src/main.rs | 42 ++++++++++++++++++++++---------------- 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/core/src/connection/mod.rs b/core/src/connection/mod.rs index 5f9b3dc..7249779 100644 --- a/core/src/connection/mod.rs +++ b/core/src/connection/mod.rs @@ -28,9 +28,27 @@ pub fn connect( let (addr, connect_url) = match *proxy { Some(ref url) => { info!("Using proxy \"{}\"", url); - (url.to_socket_addrs().unwrap().next().unwrap(), Some(addr)) + match url.to_socket_addrs().and_then(|mut iter| { + iter.next().ok_or(io::Error::new( + io::ErrorKind::NotFound, + "Can't resolve proxy server address", + )) + }) { + Ok(socket_addr) => (socket_addr, Some(addr)), + Err(error) => return Box::new(futures::future::err(error)), + } + } + None => { + match addr.to_socket_addrs().and_then(|mut iter| { + iter.next().ok_or(io::Error::new( + io::ErrorKind::NotFound, + "Can't resolve server address", + )) + }) { + Ok(socket_addr) => (socket_addr, None), + Err(error) => return Box::new(futures::future::err(error)), + } } - None => (addr.to_socket_addrs().unwrap().next().unwrap(), None), }; let socket = TcpStream::connect(&addr, handle); diff --git a/src/main.rs b/src/main.rs index 9f41f2c..4a74d22 100644 --- a/src/main.rs +++ b/src/main.rs @@ -460,27 +460,33 @@ impl Future for Main { progress = true; } - if let Async::Ready(session) = self.connect.poll().unwrap() { - self.connect = Box::new(futures::future::empty()); - let mixer_config = self.mixer_config.clone(); - let mixer = (self.mixer)(Some(mixer_config)); - let player_config = self.player_config.clone(); - let connect_config = self.connect_config.clone(); + match self.connect.poll() { + Ok(Async::Ready(session)) => { + self.connect = Box::new(futures::future::empty()); + let mixer_config = self.mixer_config.clone(); + let mixer = (self.mixer)(Some(mixer_config)); + let player_config = self.player_config.clone(); + let connect_config = self.connect_config.clone(); - let audio_filter = mixer.get_audio_filter(); - let backend = self.backend; - let device = self.device.clone(); - let (player, event_channel) = - Player::new(player_config, session.clone(), audio_filter, move || { - (backend)(device) - }); + let audio_filter = mixer.get_audio_filter(); + let backend = self.backend; + let device = self.device.clone(); + let (player, event_channel) = + Player::new(player_config, session.clone(), audio_filter, move || { + (backend)(device) + }); - let (spirc, spirc_task) = Spirc::new(connect_config, session, player, mixer); - self.spirc = Some(spirc); - self.spirc_task = Some(spirc_task); - self.player_event_channel = Some(event_channel); + let (spirc, spirc_task) = Spirc::new(connect_config, session, player, mixer); + self.spirc = Some(spirc); + self.spirc_task = Some(spirc_task); + self.player_event_channel = Some(event_channel); - progress = true; + progress = true; + } + Ok(Async::NotReady) => (), + Err(_) => { + self.connect = Box::new(futures::future::empty()); + } } if let Async::Ready(Some(())) = self.signal.poll().unwrap() { From 37f6e3eb9ca52cc695db2d8285bc8e87a115d66a Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Wed, 29 Jan 2020 09:51:26 +1100 Subject: [PATCH 14/14] Print error message on connection failure. --- src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 4a74d22..8ee3b0c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -484,7 +484,8 @@ impl Future for Main { progress = true; } Ok(Async::NotReady) => (), - Err(_) => { + Err(error) => { + error!("Could not connect to server: {}", error); self.connect = Box::new(futures::future::empty()); } }