diff --git a/Cargo.lock b/Cargo.lock index 5813a70..7606ed8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -64,6 +64,15 @@ dependencies = [ "opaque-debug 0.3.0", ] +[[package]] +name = "aho-corasick" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7404febffaa47dac81aa44dba71523c9d069b1bdc50a77db41195149e17f68e5" +dependencies = [ + "memchr", +] + [[package]] name = "alsa" version = "0.2.2" @@ -152,6 +161,17 @@ dependencies = [ "syn", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.0.1" @@ -567,6 +587,19 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "env_logger" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26ecb66b4bdca6c1409b40fb255eefc2bd4f6d135dab3c3124f80ffa2a9661e" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "error-chain" version = "0.12.4" @@ -1023,6 +1056,12 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.2" @@ -1319,6 +1358,7 @@ dependencies = [ "base64 0.13.0", "byteorder", "bytes", + "env_logger", "futures", "hmac", "httparse", @@ -1340,6 +1380,7 @@ dependencies = [ "shannon", "tokio", "tokio-util", + "tower-service", "url 1.7.2", "uuid", "vergen", @@ -2126,7 +2167,10 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9251239e129e16308e70d853559389de218ac275b515068abc96829d05b948a" dependencies = [ + "aho-corasick", + "memchr", "regex-syntax", + "thread_local", ] [[package]] @@ -2552,6 +2596,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "1.0.23" @@ -2572,6 +2625,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8208a331e1cb318dd5bd76951d2b8fc48ca38a69f5f4e4af1b6a9f8c6236915" +dependencies = [ + "once_cell", +] + [[package]] name = "time" version = "0.1.43" diff --git a/audio/src/range_set.rs b/audio/src/range_set.rs index 8712dfd..d01d888 100644 --- a/audio/src/range_set.rs +++ b/audio/src/range_set.rs @@ -54,7 +54,7 @@ impl RangeSet { } pub fn len(&self) -> usize { - self.ranges.iter().map(|r| r.length).fold(0, std::ops::Add::add) + self.ranges.iter().map(|r| r.length).sum() } pub fn get_range(&self, index: usize) -> Range { diff --git a/core/Cargo.toml b/core/Cargo.toml index c092c04..e0d7952 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -37,6 +37,7 @@ sha-1 = "~0.8" shannon = "0.2.0" tokio = { version = "1.0", features = ["io-util", "rt-multi-thread"] } tokio-util = { version = "0.6", features = ["codec"] } +tower-service = "0.3" url = "1.7" uuid = { version = "0.8", features = ["v4"] } @@ -45,4 +46,5 @@ rand = "0.7" vergen = "3.0.4" [dev-dependencies] +env_logger = "*" tokio = {version = "1.0", features = ["macros"] } \ No newline at end of file diff --git a/core/src/apresolve.rs b/core/src/apresolve.rs index 07c2958..81340c9 100644 --- a/core/src/apresolve.rs +++ b/core/src/apresolve.rs @@ -1,10 +1,12 @@ const AP_FALLBACK: &'static str = "ap.spotify.com:443"; -const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com/"; +const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com:80"; use hyper::{Body, Client, Method, Request, Uri}; use std::error::Error; use url::Url; +use crate::proxytunnel::ProxyTunnel; + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct APResolveData { ap_list: Vec, @@ -22,26 +24,15 @@ async fn apresolve(proxy: &Option, ap_port: &Option) -> Result, ap_port: &Option) -> Result; pub async fn connect(addr: String, proxy: &Option) -> io::Result { let socket = if let Some(proxy) = proxy { info!("Using proxy \"{}\"", proxy); + + let mut split = addr.rsplit(':'); + + let port = split + .next() + .unwrap() // will never panic, split iterator contains at least one element + .parse() + .map_err(|e| { + io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid port: {}", e)) + })?; + + let host = split + .next() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Missing port"))?; + let socket_addr = proxy.to_socket_addrs().and_then(|mut iter| { iter.next().ok_or_else(|| { io::Error::new( @@ -31,7 +46,8 @@ pub async fn connect(addr: String, proxy: &Option) -> io::Result }) })?; let socket = TcpStream::connect(&socket_addr).await?; - proxytunnel::connect(socket, &addr).await? + + proxytunnel::connect(socket, host, port).await? } else { let socket_addr = addr.to_socket_addrs().and_then(|mut iter| { iter.next().ok_or_else(|| { diff --git a/core/src/lib.rs b/core/src/lib.rs index a15aa7a..3e332c2 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -27,6 +27,7 @@ extern crate sha1; extern crate shannon; pub extern crate tokio; extern crate tokio_util; +extern crate tower_service; extern crate url; extern crate uuid; diff --git a/core/src/proxytunnel.rs b/core/src/proxytunnel.rs index 508de7f..c2033c8 100644 --- a/core/src/proxytunnel.rs +++ b/core/src/proxytunnel.rs @@ -1,45 +1,106 @@ -use std::io; - +use futures::Future; use hyper::Uri; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use std::{ + io, + net::{SocketAddr, ToSocketAddrs}, + pin::Pin, + task::Poll, +}; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + net::TcpStream, +}; +use tower_service::Service; pub async fn connect( - mut connection: T, - connect_url: &str, + mut proxy_connection: T, + connect_host: &str, + connect_port: u16, ) -> io::Result { - let uri = connect_url.parse::().unwrap(); - let mut buffer = format!( - "CONNECT {0}:{1} HTTP/1.1\r\n\ - \r\n", - uri.host().unwrap_or_else(|| panic!("No host in {}", uri)), - uri.port().unwrap_or_else(|| panic!("No port in {}", uri)) - ) - .into_bytes(); - connection.write_all(buffer.as_ref()).await?; + let mut buffer = Vec::new(); + buffer.extend_from_slice(b"CONNECT "); + buffer.extend_from_slice(connect_host.as_bytes()); + buffer.push(b':'); + buffer.extend_from_slice(connect_port.to_string().as_bytes()); + buffer.extend_from_slice(b" HTTP/1.1\r\n\r\n"); - buffer.clear(); - connection.read_to_end(&mut buffer).await?; - if buffer.is_empty() { - return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy")); - } + proxy_connection.write_all(buffer.as_ref()).await?; - let mut headers = [httparse::EMPTY_HEADER; 16]; - let mut response = httparse::Response::new(&mut headers); + buffer.resize(buffer.capacity(), 0); - response - .parse(&buffer[..]) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?; - - match response.code { - Some(200) => Ok(connection), // Proxy says all is well - Some(code) => { - let reason = response.reason.unwrap_or("no reason"); - let msg = format!("Proxy responded with {}: {}", code, reason); - Err(io::Error::new(io::ErrorKind::Other, msg)) + let mut offset = 0; + loop { + let bytes_read = proxy_connection.read(&mut buffer[offset..]).await?; + if bytes_read == 0 { + return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy")); + } + offset += bytes_read; + + let mut headers = [httparse::EMPTY_HEADER; 16]; + let mut response = httparse::Response::new(&mut headers); + + let status = response + .parse(&buffer[..offset]) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + + if status.is_complete() { + return match response.code { + Some(200) => Ok(proxy_connection), // Proxy says all is well + Some(code) => { + let reason = response.reason.unwrap_or("no reason"); + let msg = format!("Proxy responded with {}: {}", code, reason); + Err(io::Error::new(io::ErrorKind::Other, msg)) + } + None => Err(io::Error::new( + io::ErrorKind::Other, + "Malformed response from proxy", + )), + }; + } + + if offset >= buffer.len() { + buffer.resize(buffer.len() * 2, 0); } - None => Err(io::Error::new( - io::ErrorKind::Other, - "Malformed response from proxy", - )), + } +} + +#[derive(Clone)] +pub struct ProxyTunnel { + proxy_addr: SocketAddr, +} + +impl ProxyTunnel { + pub fn new(addr: T) -> io::Result { + let addr = addr.to_socket_addrs()?.next().ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "No socket address given") + })?; + Ok(Self { proxy_addr: addr }) + } +} + +impl Service for ProxyTunnel { + type Response = TcpStream; + type Error = io::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, url: Uri) -> Self::Future { + let proxy_addr = self.proxy_addr; + let fut = async move { + let host = url + .host() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Host is missing"))?; + let port = url + .port() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Port is missing"))?; + + let conn = TcpStream::connect(proxy_addr).await?; + connect(conn, host, port.as_u16()).await + }; + + Box::pin(fut) } } diff --git a/core/tests/connect.rs b/core/tests/connect.rs index 44d418a..4ea2a1f 100644 --- a/core/tests/connect.rs +++ b/core/tests/connect.rs @@ -7,6 +7,7 @@ mod tests { use apresolve::apresolve_or_fallback; #[tokio::test] async fn test_ap_resolve() { + env_logger::init(); let ap = apresolve_or_fallback(&None, &None).await; println!("AP: {:?}", ap); }