Allow multi-type case vars.

This commit is contained in:
2026-04-25 02:32:29 +01:00
parent 55a375bd36
commit 5ae72f64b9
2 changed files with 47 additions and 26 deletions

View File

@@ -10,12 +10,13 @@ assumptions about a microservice:
singular queue (RabbitMQ).
2. Incoming requests are in the form of a 64-bit unsigned integer (`u64`).
2. Microservices process requests via a `process` function, which takes three
arguments: the incoming request (`u64`), a `read_file` function, and a
`write_file` function.
arguments: the incoming request (`u64`), a `read_file` function, and a
`write_file` function.
3. The `process` function returns a set of IDs (also `u64`) that are the result
of processing the incoming request. Each of these IDs is also associated
with a "case variable" that is used for routing the result to the
appropriate outbound queues.
appropriate outbound queues. Case variables for routing must be one of:
boolean, integer, or string.
4. Rather than hard-coding the inbound and outbound queues, the
microservice communicates with a self-contained configuration service shared
across all microservices.
@@ -101,7 +102,7 @@ and a mapping of case variables to outbound queue names. For example:
}
```
The case variables can be any primitive type (e.g. string, integer, boolean).
The case variables used for routing can be one of: string, integer, or boolean.
E.g. a binary classification microservice might decide on which outbound queue
to send results to based on a case variable that is either `false` or `true`:

View File

@@ -31,7 +31,7 @@ pub type AnyError = Box<dyn Error + Send + Sync + 'static>;
pub type ReadFile = Box<dyn Read + Send + 'static>;
pub type ReadFileFn = dyn Fn(&str, u64) -> Result<ReadFile, AnyError> + Send + Sync + 'static;
pub type WriteFileFn = dyn Fn(&str, u64) -> Result<File, AnyError> + Send + Sync + 'static;
type ProcessFn = dyn Fn(u64, &ReadFileFn, &WriteFileFn) -> Result<Vec<(u64, Value)>, AnyError>
type ProcessFn = dyn Fn(u64, &ReadFileFn, &WriteFileFn) -> Result<Vec<(u64, CaseKey)>, AnyError>
+ Send
+ Sync
+ 'static;
@@ -57,6 +57,13 @@ struct OutboundCase {
queues: Vec<String>,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum CaseKey {
Bool(bool),
Int(i128),
String(String),
}
/// A simple queue-driven microservice runtime.
///
/// The microservice:
@@ -88,16 +95,16 @@ impl Microservice {
request: u64,
read_file: &ReadFileFn,
write_file: &WriteFileFn,
| -> Result<Vec<(u64, Value)>, AnyError> {
| -> Result<Vec<(u64, CaseKey)>, AnyError> {
let outputs = process(request, read_file, write_file)?;
Ok(outputs
.into_iter()
.map(|(id, case)| {
let value = serde_json::to_value(case)
.expect("case variable must be serializable to JSON");
(id, value)
})
.collect())
let mut mapped = Vec::with_capacity(outputs.len());
for (id, case) in outputs {
let value = serde_json::to_value(case)
.map_err(|e| format!("case variable must be serializable to JSON: {}", e))?;
mapped.push((id, case_key_from_value(&value)?));
}
Ok(mapped)
};
Self {
@@ -110,7 +117,7 @@ impl Microservice {
/// Start the microservice. This call blocks while the consumer loop runs.
pub fn start(&self) -> Result<(), AnyError> {
let config = self.fetch_config()?;
let route_map = build_route_map(&config.out);
let route_map = build_route_map(&config.out)?;
let amqp_url = fetch_rabbitmq_url_from_sys_map()?;
let runtime = tokio::runtime::Builder::new_multi_thread()
@@ -132,7 +139,7 @@ impl Microservice {
async fn run_consumer(
&self,
inbound_queue: String,
route_map: HashMap<String, Vec<String>>,
route_map: HashMap<CaseKey, Vec<String>>,
amqp_url: String,
s3_client: Arc<Client>,
) -> Result<(), AnyError> {
@@ -515,22 +522,35 @@ fn bucket_mapping_url(host: &str, microservice_name: &str, key: &str) -> String
format!("{}/{}", config_url(host, microservice_name), key.trim_matches('/'))
}
fn build_route_map(outbound: &[OutboundCase]) -> HashMap<String, Vec<String>> {
fn build_route_map(outbound: &[OutboundCase]) -> Result<HashMap<CaseKey, Vec<String>>, AnyError> {
let mut map = HashMap::new();
for entry in outbound {
map.insert(case_key(&entry.case_value), entry.queues.clone());
map.insert(case_key_from_value(&entry.case_value)?, entry.queues.clone());
}
map
Ok(map)
}
fn case_key(case_value: &Value) -> String {
case_value.to_string()
fn case_key_from_value(case_value: &Value) -> Result<CaseKey, AnyError> {
match case_value {
Value::Bool(value) => Ok(CaseKey::Bool(*value)),
Value::String(value) => Ok(CaseKey::String(value.clone())),
Value::Number(value) => {
if let Some(v) = value.as_i64() {
Ok(CaseKey::Int(v as i128))
} else if let Some(v) = value.as_u64() {
Ok(CaseKey::Int(v as i128))
} else {
Err(format!("case variable '{}' must be an integer number", value).into())
}
}
_ => Err("case variable must be one of: bool, int, string".into()),
}
}
async fn declare_queues(
channel: &Channel,
inbound_queue: &str,
route_map: &HashMap<String, Vec<String>>,
route_map: &HashMap<CaseKey, Vec<String>>,
) -> Result<(), AnyError> {
channel
.queue_declare(
@@ -557,12 +577,12 @@ async fn declare_queues(
async fn publish_outputs(
channel: &Channel,
outputs: Vec<(u64, Value)>,
route_map: &HashMap<String, Vec<String>>,
outputs: Vec<(u64, CaseKey)>,
route_map: &HashMap<CaseKey, Vec<String>>,
) -> Result<(), AnyError> {
for (result_id, case_var) in outputs {
info!("Shuttle output result_id={}, case_var={}", result_id, case_var);
if let Some(outbound_queues) = route_map.get(&case_key(&case_var)) {
info!("Shuttle output result_id={}, case_var={:?}", result_id, case_var);
if let Some(outbound_queues) = route_map.get(&case_var) {
for queue in outbound_queues {
let payload = result_id.to_string();
let confirm = channel