diff --git a/README.md b/README.md index fbdb421..cead0c7 100644 --- a/README.md +++ b/README.md @@ -10,12 +10,13 @@ assumptions about a microservice: singular queue (RabbitMQ). 2. Incoming requests are in the form of a 64-bit unsigned integer (`u64`). 2. Microservices process requests via a `process` function, which takes three - arguments: the incoming request (`u64`), a `read_file` function, and a - `write_file` function. + arguments: the incoming request (`u64`), a `read_file` function, and a + `write_file` function. 3. The `process` function returns a set of IDs (also `u64`) that are the result of processing the incoming request. Each of these IDs is also associated with a "case variable" that is used for routing the result to the - appropriate outbound queues. + appropriate outbound queues. Case variables for routing must be one of: + boolean, integer, or string. 4. Rather than hard-coding the inbound and outbound queues, the microservice communicates with a self-contained configuration service shared across all microservices. @@ -101,7 +102,7 @@ and a mapping of case variables to outbound queue names. For example: } ``` -The case variables can be any primitive type (e.g. string, integer, boolean). +The case variables used for routing can be one of: string, integer, or boolean. E.g. a binary classification microservice might decide on which outbound queue to send results to based on a case variable that is either `false` or `true`: diff --git a/src/lib.rs b/src/lib.rs index af3f789..8d7129b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,7 +31,7 @@ pub type AnyError = Box; pub type ReadFile = Box; pub type ReadFileFn = dyn Fn(&str, u64) -> Result + Send + Sync + 'static; pub type WriteFileFn = dyn Fn(&str, u64) -> Result + Send + Sync + 'static; -type ProcessFn = dyn Fn(u64, &ReadFileFn, &WriteFileFn) -> Result, AnyError> +type ProcessFn = dyn Fn(u64, &ReadFileFn, &WriteFileFn) -> Result, AnyError> + Send + Sync + 'static; @@ -57,6 +57,13 @@ struct OutboundCase { queues: Vec, } +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +enum CaseKey { + Bool(bool), + Int(i128), + String(String), +} + /// A simple queue-driven microservice runtime. /// /// The microservice: @@ -88,16 +95,16 @@ impl Microservice { request: u64, read_file: &ReadFileFn, write_file: &WriteFileFn, - | -> Result, AnyError> { + | -> Result, AnyError> { let outputs = process(request, read_file, write_file)?; - Ok(outputs - .into_iter() - .map(|(id, case)| { - let value = serde_json::to_value(case) - .expect("case variable must be serializable to JSON"); - (id, value) - }) - .collect()) + let mut mapped = Vec::with_capacity(outputs.len()); + for (id, case) in outputs { + let value = serde_json::to_value(case) + .map_err(|e| format!("case variable must be serializable to JSON: {}", e))?; + mapped.push((id, case_key_from_value(&value)?)); + } + + Ok(mapped) }; Self { @@ -110,7 +117,7 @@ impl Microservice { /// Start the microservice. This call blocks while the consumer loop runs. pub fn start(&self) -> Result<(), AnyError> { let config = self.fetch_config()?; - let route_map = build_route_map(&config.out); + let route_map = build_route_map(&config.out)?; let amqp_url = fetch_rabbitmq_url_from_sys_map()?; let runtime = tokio::runtime::Builder::new_multi_thread() @@ -132,7 +139,7 @@ impl Microservice { async fn run_consumer( &self, inbound_queue: String, - route_map: HashMap>, + route_map: HashMap>, amqp_url: String, s3_client: Arc, ) -> Result<(), AnyError> { @@ -515,22 +522,35 @@ fn bucket_mapping_url(host: &str, microservice_name: &str, key: &str) -> String format!("{}/{}", config_url(host, microservice_name), key.trim_matches('/')) } -fn build_route_map(outbound: &[OutboundCase]) -> HashMap> { +fn build_route_map(outbound: &[OutboundCase]) -> Result>, AnyError> { let mut map = HashMap::new(); for entry in outbound { - map.insert(case_key(&entry.case_value), entry.queues.clone()); + map.insert(case_key_from_value(&entry.case_value)?, entry.queues.clone()); } - map + Ok(map) } -fn case_key(case_value: &Value) -> String { - case_value.to_string() +fn case_key_from_value(case_value: &Value) -> Result { + match case_value { + Value::Bool(value) => Ok(CaseKey::Bool(*value)), + Value::String(value) => Ok(CaseKey::String(value.clone())), + Value::Number(value) => { + if let Some(v) = value.as_i64() { + Ok(CaseKey::Int(v as i128)) + } else if let Some(v) = value.as_u64() { + Ok(CaseKey::Int(v as i128)) + } else { + Err(format!("case variable '{}' must be an integer number", value).into()) + } + } + _ => Err("case variable must be one of: bool, int, string".into()), + } } async fn declare_queues( channel: &Channel, inbound_queue: &str, - route_map: &HashMap>, + route_map: &HashMap>, ) -> Result<(), AnyError> { channel .queue_declare( @@ -557,12 +577,12 @@ async fn declare_queues( async fn publish_outputs( channel: &Channel, - outputs: Vec<(u64, Value)>, - route_map: &HashMap>, + outputs: Vec<(u64, CaseKey)>, + route_map: &HashMap>, ) -> Result<(), AnyError> { for (result_id, case_var) in outputs { - info!("Shuttle output result_id={}, case_var={}", result_id, case_var); - if let Some(outbound_queues) = route_map.get(&case_key(&case_var)) { + info!("Shuttle output result_id={}, case_var={:?}", result_id, case_var); + if let Some(outbound_queues) = route_map.get(&case_var) { for queue in outbound_queues { let payload = result_id.to_string(); let confirm = channel