diff --git a/src/spirc.rs b/src/spirc.rs index 887f277..596d4c3 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -1,5 +1,5 @@ use eventual::Async; -use protobuf::{self, Message}; +use protobuf::{self, Message, RepeatedField}; use util; use session::Session; @@ -9,10 +9,12 @@ use mercury::{MercuryRequest, MercuryMethod}; use player::{Player, PlayerState}; use std::sync::{Mutex, Arc}; +use std::collections::HashMap; use protocol; -pub use protocol::spirc::PlayStatus; +pub use protocol::spirc::{PlayStatus, MessageType}; +#[derive(Clone)] pub struct SpircManager(Arc>); struct SpircInternal { @@ -37,6 +39,8 @@ struct SpircInternal { tracks: Vec, index: u32, + + devices: HashMap, } impl SpircManager { @@ -66,6 +70,8 @@ impl SpircManager { tracks: Vec::new(), index: 0, + + devices: HashMap::new(), }))) } @@ -104,6 +110,64 @@ impl SpircManager { self.0.lock().unwrap().handle(frame); } } + + pub fn devices(&self) -> HashMap { + self.0.lock().unwrap().devices.clone() + } + + pub fn send_play(&mut self, recipient: &str) { + let mut internal = self.0.lock().unwrap(); + CommandSender::new(&mut *internal, MessageType::kMessageTypePlay) + .recipient(recipient) + .send(); + } + + pub fn send_pause(&mut self, recipient: &str) { + let mut internal = self.0.lock().unwrap(); + CommandSender::new(&mut *internal, MessageType::kMessageTypePause) + .recipient(recipient) + .send(); + } + + pub fn send_prev(&mut self, recipient: &str) { + let mut internal = self.0.lock().unwrap(); + CommandSender::new(&mut *internal, MessageType::kMessageTypePrev) + .recipient(recipient) + .send(); + } + + pub fn send_next(&mut self, recipient: &str) { + let mut internal = self.0.lock().unwrap(); + CommandSender::new(&mut *internal, MessageType::kMessageTypeNext) + .recipient(recipient) + .send(); + } + + pub fn send_replace_tracks>(&mut self, + recipient: &str, + track_ids: I) { + let state = track_ids_to_state(track_ids); + let mut internal = self.0.lock().unwrap(); + CommandSender::new(&mut *internal, MessageType::kMessageTypeReplace) + .recipient(recipient) + .state(state) + .send(); + } + + pub fn send_load_tracks>(&mut self, + recipient: &str, + track_ids: I) { + let state = track_ids_to_state(track_ids); + let mut internal = self.0.lock().unwrap(); + CommandSender::new(&mut *internal, MessageType::kMessageTypeLoad) + .recipient(recipient) + .state(state) + .send(); + } + + pub fn get_queue(&self) -> Vec { + self.0.lock().unwrap().tracks.clone() + } } impl SpircInternal { @@ -128,11 +192,17 @@ impl SpircInternal { self.last_command_ident = frame.get_ident().to_owned(); self.last_command_msgid = frame.get_seq_nr(); } + + if frame.has_ident() && !frame.has_goodbye() && frame.has_device_state() { + self.devices.insert(frame.get_ident().into(), + frame.get_device_state().get_name().into()); + } + match frame.get_typ() { - protocol::spirc::MessageType::kMessageTypeHello => { + MessageType::kMessageTypeHello => { self.notify(false, Some(frame.get_ident())); } - protocol::spirc::MessageType::kMessageTypeLoad => { + MessageType::kMessageTypeLoad => { if !self.is_active { self.is_active = true; self.became_active_at = util::now_ms(); @@ -145,37 +215,42 @@ impl SpircInternal { let position = frame.get_state().get_position_ms(); self.player.load(track, play, position); } - protocol::spirc::MessageType::kMessageTypePlay => { + MessageType::kMessageTypePlay => { self.player.play(); } - protocol::spirc::MessageType::kMessageTypePause => { + MessageType::kMessageTypePause => { self.player.pause(); } - protocol::spirc::MessageType::kMessageTypeNext => { + MessageType::kMessageTypeNext => { self.index = (self.index + 1) % self.tracks.len() as u32; let track = self.tracks[self.index as usize]; self.player.load(track, true, 0); } - protocol::spirc::MessageType::kMessageTypePrev => { + MessageType::kMessageTypePrev => { self.index = (self.index - 1) % self.tracks.len() as u32; let track = self.tracks[self.index as usize]; self.player.load(track, true, 0); } - protocol::spirc::MessageType::kMessageTypeSeek => { + MessageType::kMessageTypeSeek => { self.player.seek(frame.get_position()); } - protocol::spirc::MessageType::kMessageTypeReplace => { + MessageType::kMessageTypeReplace => { self.reload_tracks(&frame); } - protocol::spirc::MessageType::kMessageTypeNotify => { + MessageType::kMessageTypeNotify => { if self.is_active && frame.get_device_state().get_is_active() { self.is_active = false; self.player.stop(); } } - protocol::spirc::MessageType::kMessageTypeVolume => { + MessageType::kMessageTypeVolume => { self.player.volume(frame.get_volume() as u16); } + MessageType::kMessageTypeGoodbye => { + if frame.has_ident() { + self.devices.remove(frame.get_ident()); + } + } _ => (), } } @@ -190,78 +265,34 @@ impl SpircInternal { .collect(); } - // FIXME: this entire function is duplicated in notify_with_player_state, but the borrow - // checker makes it hard to refactor fn notify(&mut self, hello: bool, recipient: Option<&str>) { - let player_state = self.player.state(); - - let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), { - version: 1, - ident: self.ident.clone(), - protocol_version: "2.0.0".to_owned(), - seq_nr: { self.seq_nr += 1; self.seq_nr }, - typ: if hello { - protocol::spirc::MessageType::kMessageTypeHello - } else { - protocol::spirc::MessageType::kMessageTypeNotify - }, - - device_state: self.device_state(&player_state), - recipient: protobuf::RepeatedField::from_vec( - recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![]) - ), - state_update_id: player_state.update_time() as i64 - }); - - if self.is_active { - pkt.set_state(self.spirc_state(&player_state)); + let mut cs = CommandSender::new(self, + if hello { + MessageType::kMessageTypeHello + } else { + MessageType::kMessageTypeNotify + }); + if let Some(s) = recipient { + cs = cs.recipient(&s); } - - self.session - .mercury(MercuryRequest { - method: MercuryMethod::SEND, - uri: self.uri(), - content_type: None, - payload: vec![pkt.write_to_bytes().unwrap()], - }) - .await() - .unwrap(); + cs.send(); } fn notify_with_player_state(&mut self, hello: bool, recipient: Option<&str>, player_state: &PlayerState) { - let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), { - version: 1, - ident: self.ident.clone(), - protocol_version: "2.0.0".to_owned(), - seq_nr: { self.seq_nr += 1; self.seq_nr }, - typ: if hello { - protocol::spirc::MessageType::kMessageTypeHello - } else { - protocol::spirc::MessageType::kMessageTypeNotify - }, - - device_state: self.device_state(&player_state), - recipient: protobuf::RepeatedField::from_vec( - recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![]) - ), - state_update_id: player_state.update_time() as i64 - }); - - if self.is_active { - pkt.set_state(self.spirc_state(&player_state)); + let mut cs = CommandSender::new(self, + if hello { + MessageType::kMessageTypeHello + } else { + MessageType::kMessageTypeNotify + }) + .player_state(player_state); + if let Some(s) = recipient { + cs = cs.recipient(&s); } - - self.session - .mercury(MercuryRequest { - method: MercuryMethod::SEND, - uri: self.uri(), - content_type: None, - payload: vec![pkt.write_to_bytes().unwrap()], - }) - .fire(); + cs.send(); } fn spirc_state(&self, player_state: &PlayerState) -> protocol::spirc::State { @@ -353,3 +384,81 @@ impl SpircInternal { format!("hm://remote/user/{}", self.session.username()) } } + +struct CommandSender<'a> { + spirc_internal: &'a mut SpircInternal, + cmd: MessageType, + recipient: Option<&'a str>, + player_state: Option<&'a PlayerState>, + state: Option, +} + +impl<'a> CommandSender<'a> { + fn new(spirc_internal: &'a mut SpircInternal, cmd: MessageType) -> CommandSender { + CommandSender { + spirc_internal: spirc_internal, + cmd: cmd, + recipient: None, + player_state: None, + state: None, + } + } + + fn recipient(mut self, r: &'a str) -> CommandSender { + self.recipient = Some(r); + self + } + + fn player_state(mut self, s: &'a PlayerState) -> CommandSender { + self.player_state = Some(s); + self + } + + fn state(mut self, s: protocol::spirc::State) -> CommandSender<'a> { + self.state = Some(s); + self + } + + fn send(self) { + let internal_player_state = self.spirc_internal.player.state(); + let s = self.player_state.unwrap_or(&*internal_player_state); + + let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), { + version: 1, + ident: self.spirc_internal.ident.clone(), + protocol_version: "2.0.0".to_owned(), + seq_nr: { self.spirc_internal.seq_nr += 1; self.spirc_internal.seq_nr }, + typ: self.cmd, + recipient: RepeatedField::from_vec( + self.recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![]) + ), + device_state: self.spirc_internal.device_state(s), + state_update_id: s.update_time() as i64, + }); + + if self.spirc_internal.is_active { + pkt.set_state(self.spirc_internal.spirc_state(s)); + } + + self.spirc_internal + .session + .mercury(MercuryRequest { + method: MercuryMethod::SEND, + uri: self.spirc_internal.uri(), + content_type: None, + payload: vec![pkt.write_to_bytes().unwrap()], + }) + .fire(); + } +} + +fn track_ids_to_state>(track_ids: I) -> protocol::spirc::State { + let tracks: Vec = + track_ids.map(|i| { + protobuf_init!(protocol::spirc::TrackRef::new(), { gid: i.to_raw().to_vec()}) + }) + .collect(); + protobuf_init!(protocol::spirc::State::new(), { + track: RepeatedField::from_vec(tracks) + }) +}