diff --git a/src/mercury/mod.rs b/src/mercury/mod.rs index 58df0cd..3651740 100644 --- a/src/mercury/mod.rs +++ b/src/mercury/mod.rs @@ -101,9 +101,10 @@ impl MercuryManager { pub fn subscribe>(&self, uri: T) -> BoxFuture, MercuryError> { + let uri = uri.into(); let request = self.request(MercuryRequest { method: MercuryMethod::SUB, - uri: uri.into(), + uri: uri.clone(), content_type: None, payload: Vec::new(), }); @@ -116,8 +117,11 @@ impl MercuryManager { for sub in response.payload { let mut sub : protocol::pubsub::Subscription = protobuf::parse_from_bytes(&sub).unwrap(); - let uri = sub.take_uri(); - inner.subscriptions.insert(uri, tx.clone()); + let sub_uri = sub.take_uri(); + + debug!("subscribed {} ({})", uri, sub_uri); + + inner.subscriptions.insert(sub_uri, tx.clone()); } }); @@ -181,21 +185,29 @@ impl MercuryManager { let response = MercuryResponse { uri: header.get_uri().to_owned(), + status_code: header.get_status_code(), payload: pending.parts, }; - if cmd == 0xb5 { - self.lock(|inner| { - use std::collections::hash_map::Entry; - if let Entry::Occupied(entry) = inner.subscriptions.entry(response.uri.clone()) { - // TODO: send unsub message - if entry.get().send(response).is_err() { - entry.remove(); + if response.status_code >= 400 { + warn!("error {} for uri {}", response.status_code, &response.uri); + if let Some(cb) = pending.callback { + cb.complete(Err(MercuryError)); + } + } else { + if cmd == 0xb5 { + self.lock(|inner| { + use std::collections::hash_map::Entry; + if let Entry::Occupied(entry) = inner.subscriptions.entry(response.uri.clone()) { + // TODO: send unsub message + if entry.get().send(response).is_err() { + entry.remove(); + } } - } - }) - } else if let Some(cb) = pending.callback { - cb.complete(Ok(response)); + }) + } else if let Some(cb) = pending.callback { + cb.complete(Ok(response)); + } } } } diff --git a/src/mercury/types.rs b/src/mercury/types.rs index aa33af2..2268330 100644 --- a/src/mercury/types.rs +++ b/src/mercury/types.rs @@ -23,6 +23,7 @@ pub struct MercuryRequest { #[derive(Debug)] pub struct MercuryResponse { pub uri: String, + pub status_code: i32, pub payload: Vec>, }