diff --git a/audio/Cargo.toml b/audio/Cargo.toml index cde907c..5f6a942 100644 --- a/audio/Cargo.toml +++ b/audio/Cargo.toml @@ -14,7 +14,8 @@ version = "0.1.3" bit-set = "0.5" byteorder = "1.3" bytes = "0.4" -futures = "0.1" +futures = "0.3" +tokio = { version = "0.2", features = ["full"] } # Temp "rt-core", "sync" lewton = "0.9" log = "0.4" num-bigint = "0.3" diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 2214cde..5ed4ccd 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -1,9 +1,6 @@ use crate::range_set::{Range, RangeSet}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use bytes::Bytes; -use futures::sync::{mpsc, oneshot}; -use futures::Stream; -use futures::{Async, Future, Poll}; use std::cmp::{max, min}; use std::fs; use std::io::{self, Read, Seek, SeekFrom, Write}; @@ -11,13 +8,23 @@ use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, Instant}; use tempfile::NamedTempFile; -use futures::sync::mpsc::unbounded; use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; use librespot_core::session::Session; use librespot_core::spotify_id::FileId; use std::sync::atomic; use std::sync::atomic::AtomicUsize; +use futures::{ + channel::{mpsc, mpsc::unbounded, oneshot}, + ready, Future, Stream, +}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use tokio::task; + const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; // The minimum size of a block that is requested from the Spotify servers in one request. // This is the block size that is typically requested while doing a seek() on a file. @@ -329,6 +336,7 @@ impl AudioFileOpenStreaming { complete_tx, ); self.session.spawn(fetcher); + // tokio::spawn(move |_| fetcher); AudioFileStreaming { read_file: read_file, @@ -343,36 +351,37 @@ impl AudioFileOpenStreaming { } impl Future for AudioFileOpen { - type Item = AudioFile; - type Error = ChannelError; + type Output = Result; - fn poll(&mut self) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { match *self { AudioFileOpen::Streaming(ref mut open) => { - let file = try_ready!(open.poll()); - Ok(Async::Ready(AudioFile::Streaming(file))) + let file = ready!(open.poll()); + Poll::Ready(Ok(AudioFile::Streaming(file))) } AudioFileOpen::Cached(ref mut file) => { let file = file.take().unwrap(); - Ok(Async::Ready(AudioFile::Cached(file))) + Poll::Ready(Ok(AudioFile::Cached(file))) } } } } impl Future for AudioFileOpenStreaming { - type Item = AudioFileStreaming; - type Error = ChannelError; + type Output = Result; - fn poll(&mut self) -> Poll { + fn poll( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { loop { - let (id, data) = try_ready!(self.headers.poll()).unwrap(); + let (id, data) = ready!(self.headers.poll()).unwrap(); if id == 0x3 { let size = BigEndian::read_u32(&data) as usize * 4; let file = self.finish(size); - return Ok(Async::Ready(file)); + return Poll::Ready(Ok(file)); } } } @@ -563,13 +572,12 @@ impl AudioFileFetchDataReceiver { } impl Future for AudioFileFetchDataReceiver { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll<(), ()> { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { loop { match self.data_rx.poll() { - Ok(Async::Ready(Some(data))) => { + Poll::Ready(Some(data)) => { if self.measure_ping_time { if let Some(request_sent_time) = self.request_sent_time { let duration = Instant::now() - request_sent_time; @@ -603,26 +611,24 @@ impl Future for AudioFileFetchDataReceiver { } if self.request_length == 0 { self.finish(); - return Ok(Async::Ready(())); + return Poll::Ready(()); } } - Ok(Async::Ready(None)) => { + Poll::Ready(None) => { if self.request_length > 0 { warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length); } self.finish(); - return Ok(Async::Ready(())); - } - Ok(Async::NotReady) => { - return Ok(Async::NotReady); + return Poll::Ready(()); } + Poll::Pending => return Poll::Pending, Err(ChannelError) => { warn!( "Error from channel for data receiver for range {} (+{}).", self.initial_data_offset, self.initial_request_length ); self.finish(); - return Ok(Async::Ready(())); + return Poll::Ready(()); } } } @@ -672,6 +678,7 @@ impl AudioFileFetch { ); session.spawn(initial_data_receiver); + // tokio::spawn(move |_| initial_data_receiver); AudioFileFetch { session: session, @@ -747,6 +754,7 @@ impl AudioFileFetch { ); self.session.spawn(receiver); + // tokio::spawn(move |_| receiver); } } @@ -794,13 +802,11 @@ impl AudioFileFetch { } } - fn poll_file_data_rx(&mut self) -> Poll<(), ()> { + fn poll_file_data_rx(&mut self) -> Poll<()> { loop { match self.file_data_rx.poll() { - Ok(Async::Ready(None)) => { - return Ok(Async::Ready(())); - } - Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => { + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms))) => { trace!("Ping time estimated as: {} ms.", response_time_ms); // record the response time @@ -832,7 +838,7 @@ impl AudioFileFetch { .ping_time_ms .store(ping_time_ms, atomic::Ordering::Relaxed); } - Ok(Async::Ready(Some(ReceivedData::Data(data)))) => { + Poll::Ready(Some(ReceivedData::Data(data))) => { self.output .as_mut() .unwrap() @@ -864,39 +870,34 @@ impl AudioFileFetch { if full { self.finish(); - return Ok(Async::Ready(())); + return Poll::Ready(()); } } - Ok(Async::NotReady) => { - return Ok(Async::NotReady); - } - Err(()) => unreachable!(), + Poll::Pending => return Poll::Pending, + // Err(()) => unreachable!(), } } } - fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> { + fn poll_stream_loader_command_rx(&mut self) -> Poll<()> { loop { match self.stream_loader_command_rx.poll() { - Ok(Async::Ready(None)) => { - return Ok(Async::Ready(())); - } - Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => { + Poll::Ready(None) => return Poll::Ready(()), + + Poll::Ready(Some(StreamLoaderCommand::Fetch(request))) => { self.download_range(request.start, request.length); } - Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => { + Poll::Ready(Some(StreamLoaderCommand::RandomAccessMode())) => { *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); } - Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => { + Poll::Ready(Some(StreamLoaderCommand::StreamMode())) => { *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); } - Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => { - return Ok(Async::Ready(())); - } - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(()) => unreachable!(), + Poll::Ready(Some(StreamLoaderCommand::Close())) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + // Err(()) => unreachable!(), } } } @@ -911,24 +912,19 @@ impl AudioFileFetch { } impl Future for AudioFileFetch { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll<(), ()> { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { match self.poll_stream_loader_command_rx() { - Ok(Async::NotReady) => (), - Ok(Async::Ready(_)) => { - return Ok(Async::Ready(())); - } - Err(()) => unreachable!(), + Poll::Pending => (), + Poll::Ready(_) => return Poll::Ready(()), + // Err(()) => unreachable!(), } match self.poll_file_data_rx() { - Ok(Async::NotReady) => (), - Ok(Async::Ready(_)) => { - return Ok(Async::Ready(())); - } - Err(()) => unreachable!(), + Poll::Pending => (), + Poll::Ready(_) => return Poll::Ready(()), + // Err(()) => unreachable!(), } if let DownloadStrategy::Streaming() = self.get_download_strategy() { @@ -969,7 +965,7 @@ impl Future for AudioFileFetch { } } - return Ok(Async::NotReady); + return Poll::Pending; } } diff --git a/core/Cargo.toml b/core/Cargo.toml index 65ba047..8c9475a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -17,10 +17,10 @@ base64 = "0.13" byteorder = "1.3" bytes = "0.4" error-chain = { version = "0.12", default_features = false } -futures = "0.1" +futures = {version = "0.3",features =["unstable","bilock"]} httparse = "1.3" -hyper = "0.12" -hyper-proxy = { version = "0.5", default_features = false } +hyper = "0.13" +hyper-proxy = { version = "0.6", default_features = false } lazy_static = "1.3" log = "0.4" num-bigint = "0.3" @@ -32,9 +32,10 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" shannon = "0.2.0" -tokio-codec = "0.1" -tokio = "0.1" -tokio-io = "0.1" +tokio = {version = "0.2", features = ["full","io-util","tcp"]} # io-util +tokio-util = {version = "0.3", features = ["compat","codec"]} +# tokio-codec = "0.1" +# tokio-io = "0.1" url = "1.7" uuid = { version = "0.8", features = ["v4"] } sha-1 = "0.8" diff --git a/core/src/apresolve.rs b/core/src/apresolve.rs index bea2331..cf30178 100644 --- a/core/src/apresolve.rs +++ b/core/src/apresolve.rs @@ -1,41 +1,33 @@ const AP_FALLBACK: &'static str = "ap.spotify.com:443"; const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com/"; -use futures::{Future, Stream}; use hyper::client::HttpConnector; -use hyper::{self, Client, Request, Uri}; +use hyper::{self, Body, Client, Request, Uri}; use hyper_proxy::{Intercept, Proxy, ProxyConnector}; use serde_json; +use std::error; use std::str::FromStr; use url::Url; -error_chain! {} - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct APResolveData { ap_list: Vec, } +type Result = std::result::Result>; -fn apresolve( - proxy: &Option, - ap_port: &Option, -) -> Box> { - let url = Uri::from_str(APRESOLVE_ENDPOINT).expect("invalid AP resolve URL"); +async fn apresolve(proxy: &Option, ap_port: &Option) -> Result { + let url = Uri::from_str(APRESOLVE_ENDPOINT)?; //.expect("invalid AP resolve URL"); let use_proxy = proxy.is_some(); - // let mut req = Request::new(url.clone()); - let mut req = Request::get(url.clone()) - .body(hyper::Body::from(vec![])) - .unwrap(); + let mut req = Request::get(&url).body(Body::empty())?; let response = match *proxy { Some(ref val) => { let proxy_url = Uri::from_str(val.as_str()).expect("invalid http proxy"); let proxy = Proxy::new(Intercept::All, proxy_url); - let connector = HttpConnector::new(4); + let connector = HttpConnector::new(); let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy); if let Some(headers) = proxy_connector.http_headers(&url) { req.headers_mut().extend(headers.clone().into_iter()); - // req.set_proxy(true); } let client = Client::builder().build(proxy_connector); client.request(req) @@ -44,29 +36,19 @@ fn apresolve( let client = Client::new(); client.request(req) } - }; + } + .await?; - let body = response.and_then(|response| { - response.into_body().fold(Vec::new(), |mut acc, chunk| { - acc.extend_from_slice(chunk.as_ref()); - Ok::<_, hyper::Error>(acc) - }) - }); - let body = body.then(|result| result.chain_err(|| "HTTP error")); - let body = - body.and_then(|body| String::from_utf8(body).chain_err(|| "invalid UTF8 in response")); + let body = hyper::body::to_bytes(response.into_body()).await?; + let body = String::from_utf8(body.to_vec())?; + let data = serde_json::from_str::(&body)?; - let data = body - .and_then(|body| serde_json::from_str::(&body).chain_err(|| "invalid JSON")); - - let p = ap_port.clone(); - - let ap = data.and_then(move |data| { + let ap = { let mut aps = data.ap_list.iter().filter(|ap| { - if p.is_some() { - Uri::from_str(ap).ok().map_or(false, |uri| { - uri.port_u16().map_or(false, |port| port == p.unwrap()) - }) + if let Some(p) = ap_port { + Uri::from_str(ap) + .ok() + .map_or(false, |uri| uri.port_u16().map_or(false, |port| &port == p)) } else if use_proxy { // It is unlikely that the proxy will accept CONNECT on anything other than 443. Uri::from_str(ap).ok().map_or(false, |uri| { @@ -79,23 +61,23 @@ fn apresolve( let ap = aps.next().ok_or("empty AP List")?; Ok(ap.clone()) - }); + }; - Box::new(ap) + ap } -pub(crate) fn apresolve_or_fallback( +pub(crate) async fn apresolve_or_fallback( proxy: &Option, ap_port: &Option, -) -> Box> -where - E: 'static, -{ - let ap = apresolve(proxy, ap_port).or_else(|e| { - warn!("Failed to resolve Access Point: {}", e.description()); +) -> Result { + // match apresolve.await { + // Ok(ap) + // } + let ap = apresolve(proxy, ap_port).await.or_else(|e| { + warn!("Failed to resolve Access Point: {:?}", e); warn!("Using fallback \"{}\"", AP_FALLBACK); Ok(AP_FALLBACK.into()) }); - Box::new(ap) + ap } diff --git a/core/src/audio_key.rs b/core/src/audio_key.rs index 1e5310c..39eef72 100644 --- a/core/src/audio_key.rs +++ b/core/src/audio_key.rs @@ -1,10 +1,14 @@ use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use bytes::Bytes; -use futures::sync::oneshot; -use futures::{Async, Future, Poll}; use std::collections::HashMap; use std::io::Write; +use futures::{channel::oneshot, Future}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + use crate::spotify_id::{FileId, SpotifyId}; use crate::util::SeqGenerator; @@ -73,14 +77,13 @@ impl AudioKeyManager { pub struct AudioKeyFuture(oneshot::Receiver>); impl Future for AudioKeyFuture { - type Item = T; - type Error = AudioKeyError; + type Output = Result; - fn poll(&mut self) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.0.poll() { - Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)), - Ok(Async::Ready(Err(err))) => Err(err), - Ok(Async::NotReady) => Ok(Async::NotReady), + Poll::Ready(Ok(Ok(value))) => Poll::Ready(Ok(value)), + Poll::Ready(Ok(Err(err))) => Err(err), + Poll::Pending => Poll::Pending, Err(oneshot::Canceled) => Err(AudioKeyError), } } diff --git a/core/src/channel.rs b/core/src/channel.rs index b614fac..f789bfe 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -1,12 +1,16 @@ use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; -use futures::sync::{mpsc, BiLock}; -use futures::{Async, Poll, Stream}; use std::collections::HashMap; use std::time::Instant; use crate::util::SeqGenerator; +use futures::{channel::mpsc, lock::BiLock, Stream}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + component! { ChannelManager : ChannelManagerInner { sequence: SeqGenerator = SeqGenerator::new(0), @@ -101,11 +105,11 @@ impl ChannelManager { } impl Channel { - fn recv_packet(&mut self) -> Poll { + fn recv_packet(&mut self) -> Poll> { let (cmd, packet) = match self.receiver.poll() { - Ok(Async::Ready(Some(t))) => t, - Ok(Async::Ready(None)) => return Err(ChannelError), // The channel has been closed. - Ok(Async::NotReady) => return Ok(Async::NotReady), + Poll::Ready(Ok(Some(t))) => t, + Poll::Ready(Ok(t)) => return Err(ChannelError), // The channel has been closed. + Poll::Pending => return Poll::Pending, Err(()) => unreachable!(), }; @@ -117,7 +121,7 @@ impl Channel { Err(ChannelError) } else { - Ok(Async::Ready(packet)) + Poll::Ready(Ok(packet)) } } @@ -129,16 +133,15 @@ impl Channel { } impl Stream for Channel { - type Item = ChannelEvent; - type Error = ChannelError; + type Item = Result, ChannelError>; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { match self.state.clone() { ChannelState::Closed => panic!("Polling already terminated channel"), ChannelState::Header(mut data) => { if data.len() == 0 { - data = try_ready!(self.recv_packet()); + data = ready!(self.recv_packet()); } let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize; @@ -152,19 +155,19 @@ impl Stream for Channel { self.state = ChannelState::Header(data); let event = ChannelEvent::Header(header_id, header_data); - return Ok(Async::Ready(Some(event))); + return Poll::Ready(Ok(Some(event))); } } ChannelState::Data => { - let data = try_ready!(self.recv_packet()); + let data = ready!(self.recv_packet()); if data.len() == 0 { self.receiver.close(); self.state = ChannelState::Closed; - return Ok(Async::Ready(None)); + return Poll::Ready(Ok(None)); } else { let event = ChannelEvent::Data(data); - return Ok(Async::Ready(Some(event))); + return Poll::Ready(Ok(Some(event))); } } } @@ -173,38 +176,36 @@ impl Stream for Channel { } impl Stream for ChannelData { - type Item = Bytes; - type Error = ChannelError; + type Item = Result, ChannelError>; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut channel = match self.0.poll_lock() { - Async::Ready(c) => c, - Async::NotReady => return Ok(Async::NotReady), + Poll::Ready(c) => c, + Poll::Pending => return Poll::Pending, }; loop { - match try_ready!(channel.poll()) { + match ready!(channel.poll()) { Some(ChannelEvent::Header(..)) => (), - Some(ChannelEvent::Data(data)) => return Ok(Async::Ready(Some(data))), - None => return Ok(Async::Ready(None)), + Some(ChannelEvent::Data(data)) => return Poll::Ready(Ok(Some(data))), + None => return Poll::Ready(Ok(None)), } } } } impl Stream for ChannelHeaders { - type Item = (u8, Vec); - type Error = ChannelError; + type Item = Result)>, ChannelError>; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut channel = match self.0.poll_lock() { - Async::Ready(c) => c, - Async::NotReady => return Ok(Async::NotReady), + Poll::Ready(c) => c, + Poll::Pending => return Poll::Pending, }; - match try_ready!(channel.poll()) { - Some(ChannelEvent::Header(id, data)) => Ok(Async::Ready(Some((id, data)))), - Some(ChannelEvent::Data(..)) | None => Ok(Async::Ready(None)), + match ready!(channel.poll()) { + Some(ChannelEvent::Header(id, data)) => Poll::Ready(Ok(Some((id, data)))), + Some(ChannelEvent::Data(..)) | None => Poll::Ready(Ok(None)), } } } diff --git a/core/src/connection/codec.rs b/core/src/connection/codec.rs index fa4cd9d..47e1163 100644 --- a/core/src/connection/codec.rs +++ b/core/src/connection/codec.rs @@ -2,7 +2,7 @@ use byteorder::{BigEndian, ByteOrder}; use bytes::{BufMut, Bytes, BytesMut}; use shannon::Shannon; use std::io; -use tokio_io::codec::{Decoder, Encoder}; +use tokio_util::codec::{Decoder, Encoder}; const HEADER_SIZE: usize = 3; const MAC_SIZE: usize = 4; @@ -35,11 +35,11 @@ impl APCodec { } } -impl Encoder for APCodec { - type Item = (u8, Vec); +type APCodecItem = (u8, Vec); +impl Encoder for APCodec { type Error = io::Error; - fn encode(&mut self, item: (u8, Vec), buf: &mut BytesMut) -> io::Result<()> { + fn encode(&mut self, item: APCodecItem, buf: &mut BytesMut) -> io::Result<()> { let (cmd, payload) = item; let offset = buf.len(); diff --git a/core/src/connection/handshake.rs b/core/src/connection/handshake.rs index 220ab6e..512f61c 100644 --- a/core/src/connection/handshake.rs +++ b/core/src/connection/handshake.rs @@ -1,14 +1,13 @@ use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; -use futures::{Async, Future, Poll}; use hmac::{Hmac, Mac}; use protobuf::{self, Message}; use rand::thread_rng; use sha1::Sha1; use std::io::{self, Read}; use std::marker::PhantomData; -use tokio_codec::{Decoder, Framed}; -use tokio_io::io::{read_exact, write_all, ReadExact, Window, WriteAll}; -use tokio_io::{AsyncRead, AsyncWrite}; +// use tokio_codec::{Decoder, Framed}; +// use tokio_io::io::{read_exact, write_all, ReadExact, Window, WriteAll}; +// use tokio_io::{AsyncRead, AsyncWrite}; use super::codec::APCodec; use crate::diffie_hellman::DHLocalKeys; @@ -16,18 +15,30 @@ use crate::protocol; use crate::protocol::keyexchange::{APResponseMessage, ClientHello, ClientResponsePlaintext}; use crate::util; -pub struct Handshake { +use futures::{ + io::{ReadExact, Window, WriteAll}, + Future, +}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; + +use tokio_util::codec::{Decoder, Framed}; + +pub struct Handshake<'a, T> { keys: DHLocalKeys, - state: HandshakeState, + state: HandshakeState<'a, T>, } -enum HandshakeState { - ClientHello(WriteAll>), - APResponse(RecvPacket), - ClientResponse(Option, WriteAll>), +enum HandshakeState<'a, T> { + ClientHello(WriteAll<'a, T>), + APResponse(RecvPacket<'a, T, APResponseMessage>), + ClientResponse(Option, WriteAll<'a, T>), } -pub fn handshake(connection: T) -> Handshake { +pub fn handshake<'a, T: AsyncRead + AsyncWrite>(connection: T) -> Handshake<'a, T> { let local_keys = DHLocalKeys::random(&mut thread_rng()); let client_hello = client_hello(connection, local_keys.public_key()); @@ -37,23 +48,22 @@ pub fn handshake(connection: T) -> Handshake { } } -impl Future for Handshake { - type Item = Framed; - type Error = io::Error; +impl<'a, T: AsyncRead + AsyncWrite> Future for Handshake<'a, T> { + type Output = Result, io::Error>; - fn poll(&mut self) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { use self::HandshakeState::*; loop { self.state = match self.state { ClientHello(ref mut write) => { - let (connection, accumulator) = try_ready!(write.poll()); + let (connection, accumulator) = ready!(write.poll()); let read = recv_packet(connection, accumulator); APResponse(read) } APResponse(ref mut read) => { - let (connection, message, accumulator) = try_ready!(read.poll()); + let (connection, message, accumulator) = ready!(read.poll()); let remote_key = message .get_challenge() .get_login_crypto_challenge() @@ -71,17 +81,17 @@ impl Future for Handshake { } ClientResponse(ref mut codec, ref mut write) => { - let (connection, _) = try_ready!(write.poll()); + let (connection, _) = ready!(write.poll()); let codec = codec.take().unwrap(); let framed = codec.framed(connection); - return Ok(Async::Ready(framed)); + return Poll::Ready(Ok(framed)); } } } } } -fn client_hello(connection: T, gc: Vec) -> WriteAll> { +fn client_hello<'a, T: AsyncWrite>(connection: T, gc: Vec) -> WriteAll<'a, T> { let mut packet = ClientHello::new(); packet .mut_build_info() @@ -109,10 +119,11 @@ fn client_hello(connection: T, gc: Vec) -> WriteAll(size).unwrap(); packet.write_to_vec(&mut buffer).unwrap(); - write_all(connection, buffer) + // write_all(connection, buffer) + connection.write_all(&buffer) } -fn client_response(connection: T, challenge: Vec) -> WriteAll> { +fn client_response<'a, T: AsyncWrite>(connection: T, challenge: Vec) -> WriteAll<'a, T> { let mut packet = ClientResponsePlaintext::new(); packet .mut_login_crypto_response() @@ -126,15 +137,16 @@ fn client_response(connection: T, challenge: Vec) -> WriteAll buffer.write_u32::(size).unwrap(); packet.write_to_vec(&mut buffer).unwrap(); - write_all(connection, buffer) + // write_all(connection, buffer) + connection.write_all(&buffer) } -enum RecvPacket { - Header(ReadExact>>, PhantomData), - Body(ReadExact>>, PhantomData), +enum RecvPacket<'a, T, M: Message> { + Header(ReadExact<'a, T>, PhantomData), + Body(ReadExact<'a, T>, PhantomData), } -fn recv_packet(connection: T, acc: Vec) -> RecvPacket +fn recv_packet<'a, T: AsyncRead, M>(connection: T, acc: Vec) -> RecvPacket<'a, T, M> where T: Read, M: Message, @@ -142,20 +154,19 @@ where RecvPacket::Header(read_into_accumulator(connection, 4, acc), PhantomData) } -impl Future for RecvPacket +impl<'a, T: AsyncRead, M> Future for RecvPacket<'a, T, M> where T: Read, M: Message, { - type Item = (T, M, Vec); - type Error = io::Error; + type Output = Result<(T, M, Vec), io::Error>; - fn poll(&mut self) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { use self::RecvPacket::*; loop { *self = match *self { Header(ref mut read, _) => { - let (connection, header) = try_ready!(read.poll()); + let (connection, header) = ready!(read.poll()); let size = BigEndian::read_u32(header.as_ref()) as usize; let acc = header.into_inner(); @@ -164,29 +175,30 @@ where } Body(ref mut read, _) => { - let (connection, data) = try_ready!(read.poll()); + let (connection, data) = ready!(read.poll()); let message = protobuf::parse_from_bytes(data.as_ref()).unwrap(); let acc = data.into_inner(); - return Ok(Async::Ready((connection, message, acc))); + return Poll::Ready(Ok((connection, message, acc))); } } } } } -fn read_into_accumulator( +fn read_into_accumulator<'a, T: AsyncRead>( connection: T, size: usize, mut acc: Vec, -) -> ReadExact>> { +) -> ReadExact<'a, T> { let offset = acc.len(); acc.resize(offset + size, 0); let mut window = Window::new(acc); window.set_start(offset); - read_exact(connection, window) + // read_exact(connection, window) + connection.read_exact(window) } fn compute_keys(shared_secret: &[u8], packets: &[u8]) -> (Vec, Vec, Vec) { diff --git a/core/src/connection/mod.rs b/core/src/connection/mod.rs index c0e95f5..21550de 100644 --- a/core/src/connection/mod.rs +++ b/core/src/connection/mod.rs @@ -3,13 +3,18 @@ mod handshake; pub use self::codec::APCodec; pub use self::handshake::handshake; +use tokio::net::TcpStream; -use futures::{Future, Sink, Stream}; +use futures::{AsyncRead, AsyncWrite, Future, Sink, SinkExt, Stream, StreamExt}; use protobuf::{self, Message}; use std::io; use std::net::ToSocketAddrs; -use tokio::net::TcpStream; -use tokio_codec::Framed; +use tokio_util::codec::Framed; +// use futures::compat::{AsyncWrite01CompatExt, AsyncRead01CompatExt}; +// use tokio_util::compat::{self, Tokio02AsyncReadCompatExt, Tokio02AsyncWriteCompatExt}; +// use tokio_codec::Framed; +// use tokio_core::net::TcpStream; +// use tokio_core::reactor::Handle; use url::Url; use crate::authentication::Credentials; @@ -19,52 +24,46 @@ use crate::proxytunnel; pub type Transport = Framed; -pub fn connect( - addr: String, - proxy: &Option, -) -> Box> { - let (addr, connect_url) = match *proxy { +pub async fn connect(addr: String, proxy: &Option) -> Result { + let (addr, connect_url): (_, Option) = match *proxy { Some(ref url) => { - info!("Using proxy \"{}\"", url); - match url.to_socket_addrs().and_then(|mut iter| { - iter.next().ok_or(io::Error::new( - io::ErrorKind::NotFound, - "Can't resolve proxy server address", - )) - }) { - Ok(socket_addr) => (socket_addr, Some(addr)), - Err(error) => return Box::new(futures::future::err(error)), - } + unimplemented!() + // info!("Using proxy \"{}\"", url); + // + // let mut iter = url.to_socket_addrs()?; + // let socket_addr = iter.next().ok_or(io::Error::new( + // io::ErrorKind::NotFound, + // "Can't resolve proxy server address", + // ))?; + // (socket_addr, Some(addr)) } None => { - match addr.to_socket_addrs().and_then(|mut iter| { - iter.next().ok_or(io::Error::new( - io::ErrorKind::NotFound, - "Can't resolve server address", - )) - }) { - Ok(socket_addr) => (socket_addr, None), - Err(error) => return Box::new(futures::future::err(error)), - } + let mut iter = addr.to_socket_addrs()?; + let socket_addr = iter.next().ok_or(io::Error::new( + io::ErrorKind::NotFound, + "Can't resolve server address", + ))?; + (socket_addr, None) } }; - let socket = TcpStream::connect(&addr); + let connection = TcpStream::connect(&addr).await?; if let Some(connect_url) = connect_url { - let connection = socket - .and_then(move |socket| proxytunnel::connect(socket, &connect_url).and_then(handshake)); - Box::new(connection) + unimplemented!() + // let connection = proxytunnel::connect(connection, &connect_url).await?; + // let connection = handshake(connection).await?; + // Ok(connection) } else { - let connection = socket.and_then(handshake); - Box::new(connection) + let connection = handshake(connection).await?; + Ok(connection) } } -pub fn authenticate( - transport: Transport, +pub async fn authenticate( + mut transport: Transport, credentials: Credentials, device_id: String, -) -> Box> { +) -> Result<(Transport, Credentials), io::Error> { use crate::protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os}; use crate::protocol::keyexchange::APLoginFailed; @@ -92,38 +91,39 @@ pub fn authenticate( packet.mut_system_info().set_device_id(device_id); packet.set_version_string(version::version_string()); - let cmd = 0xab; + let cmd: u8 = 0xab; let data = packet.write_to_bytes().unwrap(); - Box::new( - transport - .send((cmd, data)) - .and_then(|transport| transport.into_future().map_err(|(err, _stream)| err)) - .and_then(|(packet, transport)| match packet { - Some((0xac, data)) => { - let welcome_data: APWelcome = - protobuf::parse_from_bytes(data.as_ref()).unwrap(); + transport.send((cmd, data)).await; - let reusable_credentials = Credentials { - username: welcome_data.get_canonical_username().to_owned(), - auth_type: welcome_data.get_reusable_auth_credentials_type(), - auth_data: welcome_data.get_reusable_auth_credentials().to_owned(), - }; + let packet = transport.next().await; + // let (packet, transport) = transport + // .into_future() + // .map_err(|(err, _stream)| err) + // .await?; + match packet { + Some(Ok((0xac, data))) => { + let welcome_data: APWelcome = protobuf::parse_from_bytes(data.as_ref()).unwrap(); - Ok((transport, reusable_credentials)) - } + let reusable_credentials = Credentials { + username: welcome_data.get_canonical_username().to_owned(), + auth_type: welcome_data.get_reusable_auth_credentials_type(), + auth_data: welcome_data.get_reusable_auth_credentials().to_owned(), + }; - Some((0xad, data)) => { - let error_data: APLoginFailed = - protobuf::parse_from_bytes(data.as_ref()).unwrap(); - panic!( - "Authentication failed with reason: {:?}", - error_data.get_error_code() - ) - } + Ok((transport, reusable_credentials)) + } - Some((cmd, _)) => panic!("Unexpected packet {:?}", cmd), - None => panic!("EOF"), - }), - ) + Some(Ok((0xad, data))) => { + let error_data: APLoginFailed = protobuf::parse_from_bytes(data.as_ref()).unwrap(); + panic!( + "Authentication failed with reason: {:?}", + error_data.get_error_code() + ) + } + + Some(Ok((cmd, _))) => panic!("Unexpected packet {:?}", cmd), + Some(err @ Err(_)) => panic!("Packet error: {:?}", err), + None => panic!("EOF"), + } } diff --git a/core/src/keymaster.rs b/core/src/keymaster.rs index f2d7b77..c7be11b 100644 --- a/core/src/keymaster.rs +++ b/core/src/keymaster.rs @@ -1,4 +1,4 @@ -use futures::Future; +// use futures::Future; use serde_json; use crate::mercury::MercuryError; @@ -13,20 +13,22 @@ pub struct Token { pub scope: Vec, } -pub fn get_token( +pub async fn get_token( session: &Session, client_id: &str, scopes: &str, -) -> Box> { +) -> Result { let url = format!( "hm://keymaster/token/authenticated?client_id={}&scope={}", client_id, scopes ); - Box::new(session.mercury().get(url).map(move |response| { + + // Box::new(session.mercury().get(url).map(move |response| { + session.mercury().get(url).await.map(move |response| { let data = response.payload.first().expect("Empty payload"); let data = String::from_utf8(data.clone()).unwrap(); let token: Token = serde_json::from_str(&data).unwrap(); token - })) + }) } diff --git a/core/src/lib.rs b/core/src/lib.rs index 278478c..2d50ec7 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,7 +1,7 @@ #![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))] -#[macro_use] -extern crate error_chain; +// #[macro_use] +// extern crate error_chain; #[macro_use] extern crate futures; #[macro_use] @@ -30,8 +30,8 @@ extern crate serde_json; extern crate sha1; extern crate shannon; extern crate tokio; -extern crate tokio_codec; -extern crate tokio_io; +// extern crate tokio_codec; +// extern crate tokio_io; extern crate url; extern crate uuid; @@ -45,11 +45,11 @@ pub mod authentication; pub mod cache; pub mod channel; pub mod config; -mod connection; +pub mod connection; pub mod diffie_hellman; pub mod keymaster; pub mod mercury; -mod proxytunnel; +pub mod proxytunnel; pub mod session; pub mod spotify_id; pub mod util; diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 20e3f0d..7a80f44 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -2,12 +2,16 @@ use crate::protocol; use crate::util::url_encode; use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; -use futures::sync::{mpsc, oneshot}; -use futures::{Async, Future, Poll}; use protobuf; use std::collections::HashMap; use std::mem; +use futures::{ + channel::{mpsc, oneshot}, + Future, FutureExt, +}; +use std::task::Poll; + use crate::util::SeqGenerator; mod types; @@ -33,14 +37,13 @@ pub struct MercuryPending { pub struct MercuryFuture(oneshot::Receiver>); impl Future for MercuryFuture { - type Item = T; - type Error = MercuryError; + type Output = Result; - fn poll(&mut self) -> Poll { + fn poll(&mut self) -> Poll { match self.0.poll() { - Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)), - Ok(Async::Ready(Err(err))) => Err(err), - Ok(Async::NotReady) => Ok(Async::NotReady), + Poll::Ready(Ok(Ok(value))) => Poll::Ready(Ok(value)), + Poll::Ready(Ok(Err(err))) => Err(err), + Poll::Pending => Poll::Pending, Err(oneshot::Canceled) => Err(MercuryError), } } @@ -98,11 +101,10 @@ impl MercuryManager { MercurySender::new(self.clone(), uri.into()) } - pub fn subscribe>( + pub async fn subscribe>( &self, uri: T, - ) -> Box, Error = MercuryError>> - { + ) -> Result, MercuryError> { let uri = uri.into(); let request = self.request(MercuryRequest { method: MercuryMethod::SUB, @@ -112,7 +114,7 @@ impl MercuryManager { }); let manager = self.clone(); - Box::new(request.map(move |response| { + request.await.map(move |response| { let (tx, rx) = mpsc::unbounded(); manager.lock(move |inner| { @@ -137,7 +139,7 @@ impl MercuryManager { }); rx - })) + }) } pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) { diff --git a/core/src/mercury/sender.rs b/core/src/mercury/sender.rs index f00235e..f406a52 100644 --- a/core/src/mercury/sender.rs +++ b/core/src/mercury/sender.rs @@ -1,7 +1,11 @@ -use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend}; +use futures::{Future, Sink}; use std::collections::VecDeque; use super::*; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; pub struct MercurySender { mercury: MercuryManager, @@ -30,25 +34,23 @@ impl Clone for MercurySender { } } -impl Sink for MercurySender { - type SinkItem = Vec; - type SinkError = MercuryError; +type SinkItem = Vec; +impl Sink for MercurySender { + type Error = MercuryError; - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> { let task = self.mercury.send(self.uri.clone(), item); self.pending.push_back(task); - Ok(AsyncSink::Ready) + Poll::Ready(Ok(())) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { match self.pending.front_mut() { Some(task) => { - try_ready!(task.poll()); - } - None => { - return Ok(Async::Ready(())); + ready!(task.poll()); } + None => return Poll::Ready(Ok(())), } self.pending.pop_front(); } diff --git a/core/src/proxytunnel.rs b/core/src/proxytunnel.rs index e8fb137..fbc17f6 100644 --- a/core/src/proxytunnel.rs +++ b/core/src/proxytunnel.rs @@ -1,49 +1,61 @@ use std::io; use std::str::FromStr; -use futures::{Async, Future, Poll}; use httparse; use hyper::Uri; -use tokio_io::io::{read, write_all, Read, Window, WriteAll}; -use tokio_io::{AsyncRead, AsyncWrite}; +// use tokio_io::io::{read, write_all, Read, Window, WriteAll}; +// use tokio_io::{AsyncRead, AsyncWrite}; -pub struct ProxyTunnel { - state: ProxyState, +use futures::{ + io::{Read, Window, WriteAll}, + AsyncRead, AsyncWrite, Future, +}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +// use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +pub struct ProxyTunnel<'a, T> { + state: ProxyState<'a, T>, } -enum ProxyState { - ProxyConnect(WriteAll>), - ProxyResponse(Read>>), +enum ProxyState<'a, T> { + ProxyConnect(WriteAll<'a, T>), + ProxyResponse(Read<'a, T>), } -pub fn connect(connection: T, connect_url: &str) -> ProxyTunnel { +pub fn connect<'a, T: AsyncRead + AsyncWrite>( + connection: T, + connect_url: &str, +) -> ProxyTunnel<'a, T> { let proxy = proxy_connect(connection, connect_url); ProxyTunnel { state: ProxyState::ProxyConnect(proxy), } } -impl Future for ProxyTunnel { - type Item = T; - type Error = io::Error; +impl<'a, T: AsyncRead + AsyncWrite> Future for ProxyTunnel<'a, T> { + type Output = Result; - fn poll(&mut self) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { use self::ProxyState::*; loop { self.state = match self.state { ProxyConnect(ref mut write) => { - let (connection, mut accumulator) = try_ready!(write.poll()); + let (connection, mut accumulator) = ready!(write.poll()); let capacity = accumulator.capacity(); accumulator.resize(capacity, 0); let window = Window::new(accumulator); - let read = read(connection, window); - ProxyResponse(read) + // let read = read(connection, window); + // ProxyResponse(read) + ProxyResponse(connection.read(window)) } ProxyResponse(ref mut read_f) => { - let (connection, mut window, bytes_read) = try_ready!(read_f.poll()); + let (connection, mut window, bytes_read) = ready!(read_f.poll()); if bytes_read == 0 { return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy")); @@ -65,7 +77,7 @@ impl Future for ProxyTunnel { if let Some(code) = response.code { if code == 200 { // Proxy says all is well - return Ok(Async::Ready(connection)); + return Poll::Ready(connection); } else { let reason = response.reason.unwrap_or("no reason"); let msg = format!("Proxy responded with {}: {}", code, reason); @@ -87,8 +99,9 @@ impl Future for ProxyTunnel { } // We did not get a full header window.set_start(data_end); - let read = read(connection, window); - ProxyResponse(read) + // let read = read(connection, window); + // ProxyResponse(read) + ProxyResponse(connection.read(window)) } } } @@ -96,7 +109,7 @@ impl Future for ProxyTunnel { } } -fn proxy_connect(connection: T, connect_url: &str) -> WriteAll> { +fn proxy_connect(connection: T, connect_url: &str) -> WriteAll { let uri = Uri::from_str(connect_url).unwrap(); let buffer = format!( "CONNECT {0}:{1} HTTP/1.1\r\n\ @@ -106,5 +119,6 @@ fn proxy_connect(connection: T, connect_url: &str) -> WriteAll); +// TODO: Define better errors! +type Result = std::result::Result>; + impl Session { - pub fn connect( + pub async fn connect( config: SessionConfig, credentials: Credentials, cache: Option, handle: Handle, - ) -> Box> { - let access_point = apresolve_or_fallback::(&config.proxy, &config.ap_port); - - let proxy = config.proxy.clone(); - let connection = access_point.and_then(move |addr| { - info!("Connecting to AP \"{}\"", addr); - connection::connect(addr, &proxy) - }); - - let device_id = config.device_id.clone(); - let authentication = connection.and_then(move |connection| { - connection::authenticate(connection, credentials, device_id) - }); - - let result = authentication.map(move |(transport, reusable_credentials)| { - info!("Authenticated as \"{}\" !", reusable_credentials.username); - if let Some(ref cache) = cache { - cache.save_credentials(&reusable_credentials); - } - - let (session, task) = Session::create( - &handle, - transport, - config, - cache, - reusable_credentials.username.clone(), - ); - - current_thread::spawn(task.map_err(|e| { - error!("SessionError: {}", e.to_string()); - std::process::exit(0); - })); - - session - }); - - Box::new(result) + ) -> Result { + unimplemented!() + // let access_point_addr = + // apresolve_or_fallback::(&config.proxy, &config.ap_port).await?; + // + // let proxy = config.proxy.clone(); + // info!("Connecting to AP \"{}\"", access_point_addr); + // let connection = connection::connect(access_point_addr, &proxy); + // + // let device_id = config.device_id.clone(); + // let authentication = connection.and_then(move |connection| { + // connection::authenticate(connection, credentials, device_id) + // }); + // + // let result = authentication.map(move |(transport, reusable_credentials)| { + // info!("Authenticated as \"{}\" !", reusable_credentials.username); + // if let Some(ref cache) = cache { + // cache.save_credentials(&reusable_credentials); + // } + // + // let (session, task) = Session::create( + // &handle, + // transport, + // config, + // cache, + // reusable_credentials.username.clone(), + // ); + // + // tokio::spawn(task.map_err(|e| { + // error!("SessionError: {}", e.to_string()); + // std::process::exit(0); + // })); + // + // session + // }); + // + // result } fn create( @@ -97,7 +115,7 @@ impl Session { config: SessionConfig, cache: Option, username: String, - ) -> (Session, Box>) { + ) -> (Session, Box>>) { let (sink, stream) = transport.split(); let (sender_tx, sender_rx) = mpsc::unbounded(); @@ -160,7 +178,7 @@ impl Session { // Spawn a future directly pub fn spawn(&self, f: F) where - F: Future + Send + 'static, + F: Future + Send + 'static, { let handle = self.0.handle.lock().unwrap(); let spawn_res = handle.spawn(f); @@ -293,34 +311,35 @@ impl Drop for SessionInternal { } } +// type SErr = ::std::fmt::Debug; + struct DispatchTask(S, SessionWeak) where - S: Stream; + S: Stream>; impl Future for DispatchTask where - S: Stream, - ::Error: ::std::fmt::Debug, + // SErr: ::std::fmt::Debug, + S: Stream>, { - type Item = (); - type Error = S::Error; + type Output = Result<((), ())>; - fn poll(&mut self) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let session = match self.1.try_upgrade() { Some(session) => session, - None => return Ok(Async::Ready(())), + None => return Poll::Ready(()), }; loop { - let (cmd, data) = match self.0.poll() { - Ok(Async::Ready(Some(t))) => t, - Ok(Async::Ready(None)) => { + let (cmd, data) = match self.unwrap().0.poll() { + Poll::Ready(Ok(Some(t))) => t, + Poll::Ready(Ok(None)) => { warn!("Connection to server closed."); session.shutdown(); - return Ok(Async::Ready(())); + return Ok(Poll::Ready(())); } - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(e) => { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => { session.shutdown(); return Err(From::from(e)); } @@ -333,7 +352,7 @@ where impl Drop for DispatchTask where - S: Stream, + S: Stream>, { fn drop(&mut self) { debug!("drop Dispatch"); diff --git a/core/tests/connect.rs b/core/tests/connect.rs new file mode 100644 index 0000000..388db25 --- /dev/null +++ b/core/tests/connect.rs @@ -0,0 +1,23 @@ +use env_logger; +use std::env; +use tokio::runtime::Runtime; + +use librespot_core::{apresolve::apresolve_or_fallback, connection}; + +// TODO: Rewrite this into an actual test instead of this wonder +fn main() { + env_logger::init(); + let mut rt = Runtime::new().unwrap(); + + let args: Vec<_> = env::args().collect(); + if args.len() != 4 { + println!("Usage: {} USERNAME PASSWORD PLAYLIST", args[0]); + } + // let username = args[1].to_owned(); + // let password = args[2].to_owned(); + + let ap = rt.block_on(apresolve_or_fallback(&None, &Some(80))); + + println!("AP: {:?}", ap); + let connection = rt.block_on(connection::connect(&None)); +}