Simplify microservice with synchronous process fn

This commit is contained in:
2026-04-25 00:54:11 +01:00
parent 61fa4cc645
commit b895790ca4
3 changed files with 49 additions and 65 deletions

View File

@@ -47,25 +47,22 @@ cargo build
```rust ```rust
use slingshot_microservice::Microservice; use slingshot_microservice::Microservice;
use slingshot_microservice::{ProcessFuture, ReadFileFn, WriteFileFn}; use slingshot_microservice::{AnyError, ReadFileFn, WriteFileFn};
use std::io::Write; use std::io::{Read, Write};
use tokio::io::AsyncReadExt;
fn process<'a>( fn process(
request: u64, request: u64,
read_file: &'a ReadFileFn, read_file: &ReadFileFn,
write_file: &'a WriteFileFn, write_file: &WriteFileFn,
) -> ProcessFuture<'a, String> { ) -> Result<Vec<(u64, String)>, AnyError> {
Box::pin(async move {
let mut input = String::new(); let mut input = String::new();
let mut reader = read_file("in", request)?; let mut reader = read_file("in", request)?;
reader.read_to_string(&mut input).await?; reader.read_to_string(&mut input)?;
let mut writer = write_file("out", request)?; let mut writer = write_file("out", request)?;
writer.write_all(input.as_bytes())?; writer.write_all(input.as_bytes())?;
Ok(vec![(request, "case_a".to_string())]) Ok(vec![(request, "case_a".to_string())])
})
} }
fn main() { fn main() {
@@ -142,8 +139,8 @@ Within each `process` pass:
1. `read_file(key, id)` treats `key` as a bucket reference such as `in`, not 1. `read_file(key, id)` treats `key` as a bucket reference such as `in`, not
as the canonical bucket name. On first use, the runtime fetches as the canonical bucket name. On first use, the runtime fetches
`https://{HOSTNAME}/{MICROSERVICE_NAME}/{key}` to resolve the real bucket `https://{HOSTNAME}/{MICROSERVICE_NAME}/{key}` to resolve the real bucket
name, caches that mapping, and then returns an async stream for object name, caches that mapping, and then returns a synchronous reader for object
`id` in that bucket using the AWS SDK (`get_object(...).body.into_async_read()`). `id` in that bucket using the AWS SDK.
2. `write_file(key, id)` resolves `key` through the same cached lookup and 2. `write_file(key, id)` resolves `key` through the same cached lookup and
returns an opened local file handle for writing, staging the output for returns an opened local file handle for writing, staging the output for
`s3://{resolved_bucket}/{id}`. `s3://{resolved_bucket}/{id}`.

View File

@@ -1,22 +1,19 @@
use slingshot_microservice::{Microservice, ProcessFuture, ReadFileFn, WriteFileFn}; use slingshot_microservice::{AnyError, Microservice, ReadFileFn, WriteFileFn};
use std::io::Write; use std::io::{Read, Write};
use tokio::io::AsyncReadExt;
fn process<'a>( fn process(
request: u64, request: u64,
read_file: &'a ReadFileFn, read_file: &ReadFileFn,
write_file: &'a WriteFileFn, write_file: &WriteFileFn,
) -> ProcessFuture<'a, String> { ) -> Result<Vec<(u64, String)>, AnyError> {
Box::pin(async move {
let mut input = String::new(); let mut input = String::new();
let mut reader = read_file("in", request)?; let mut reader = read_file("in", request)?;
reader.read_to_string(&mut input).await?; reader.read_to_string(&mut input)?;
let mut writer = write_file("out", request)?; let mut writer = write_file("out", request)?;
writer.write_all(input.as_bytes())?; writer.write_all(input.as_bytes())?;
Ok(vec![(request, "case_a".to_string())]) Ok(vec![(request, "case_a".to_string())])
})
} }
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

View File

@@ -1,9 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::error::Error; use std::error::Error;
use std::future::Future;
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::ErrorKind; use std::io::{Cursor, ErrorKind, Read};
use std::pin::Pin;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::Command; use std::process::Command;
use std::sync::Arc; use std::sync::Arc;
@@ -25,16 +23,15 @@ use lapin::{BasicProperties, Channel, Connection, ConnectionProperties};
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use serde_json::Value; use serde_json::Value;
use tokio::io::AsyncRead; use tokio::io::AsyncReadExt;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
pub type AnyError = Box<dyn Error + Send + Sync + 'static>; pub type AnyError = Box<dyn Error + Send + Sync + 'static>;
pub type ReadStream = Pin<Box<dyn AsyncRead + Send + Unpin + 'static>>; pub type ReadFile = Box<dyn Read + Send + 'static>;
pub type ReadFileFn = dyn Fn(&str, u64) -> Result<ReadStream, AnyError> + Send + Sync + 'static; pub type ReadFileFn = dyn Fn(&str, u64) -> Result<ReadFile, AnyError> + Send + Sync + 'static;
pub type WriteFileFn = dyn Fn(&str, u64) -> Result<File, AnyError> + Send + Sync + 'static; pub type WriteFileFn = dyn Fn(&str, u64) -> Result<File, AnyError> + Send + Sync + 'static;
pub type ProcessFuture<'a, C> = Pin<Box<dyn Future<Output = Result<Vec<(u64, C)>, AnyError>> + Send + 'a>>; type ProcessFn = dyn Fn(u64, &ReadFileFn, &WriteFileFn) -> Result<Vec<(u64, Value)>, AnyError>
type ProcessFn = dyn for<'a> Fn(u64, &'a ReadFileFn, &'a WriteFileFn) -> ProcessFuture<'a, Value>
+ Send + Send
+ Sync + Sync
+ 'static; + 'static;
@@ -83,10 +80,7 @@ impl Microservice {
/// serializable primitive, such as `String`, `bool`, or integers. /// serializable primitive, such as `String`, `bool`, or integers.
pub fn new<F, C>(name: impl Into<String>, config_host: impl Into<String>, process: F) -> Self pub fn new<F, C>(name: impl Into<String>, config_host: impl Into<String>, process: F) -> Self
where where
F: for<'a> Fn(u64, &'a ReadFileFn, &'a WriteFileFn) -> ProcessFuture<'a, C> F: Fn(u64, &ReadFileFn, &WriteFileFn) -> Result<Vec<(u64, C)>, AnyError> + Send + Sync + 'static,
+ Send
+ Sync
+ 'static,
C: Serialize + 'static, C: Serialize + 'static,
{ {
init_tracing(); init_tracing();
@@ -94,10 +88,8 @@ impl Microservice {
request: u64, request: u64,
read_file: &ReadFileFn, read_file: &ReadFileFn,
write_file: &WriteFileFn, write_file: &WriteFileFn,
| -> ProcessFuture<'_, Value> { | -> Result<Vec<(u64, Value)>, AnyError> {
let fut = process(request, read_file, write_file); let outputs = process(request, read_file, write_file)?;
Box::pin(async move {
let outputs = fut.await?;
Ok(outputs Ok(outputs
.into_iter() .into_iter()
.map(|(id, case)| { .map(|(id, case)| {
@@ -106,7 +98,6 @@ impl Microservice {
(id, value) (id, value)
}) })
.collect()) .collect())
})
}; };
Self { Self {
@@ -190,7 +181,7 @@ impl Microservice {
let read_config_host = config_host.clone(); let read_config_host = config_host.clone();
let read_microservice_name = microservice_name.clone(); let read_microservice_name = microservice_name.clone();
let read_file = move |key: &str, id: u64| -> Result<ReadStream, AnyError> { let read_file = move |key: &str, id: u64| -> Result<ReadFile, AnyError> {
let bucket = resolve_bucket_name( let bucket = resolve_bucket_name(
&read_config_host, &read_config_host,
&read_microservice_name, &read_microservice_name,
@@ -216,7 +207,7 @@ impl Microservice {
guard.write_file(&bucket, id) guard.write_file(&bucket, id)
}; };
let outputs = (self.process)(request_id, &read_file, &write_file).await?; let outputs = (self.process)(request_id, &read_file, &write_file)?;
{ {
let mut guard = file_context let mut guard = file_context
.lock() .lock()
@@ -263,7 +254,7 @@ impl RequestFileContext {
}) })
} }
fn read_file(&mut self, s3_client: &Client, bucket: &str, id: u64) -> Result<ReadStream, AnyError> { fn read_file(&mut self, s3_client: &Client, bucket: &str, id: u64) -> Result<ReadFile, AnyError> {
let bucket_name = bucket.to_string(); let bucket_name = bucket.to_string();
let object_key = id.to_string(); let object_key = id.to_string();
let client = s3_client.clone(); let client = s3_client.clone();
@@ -276,8 +267,11 @@ impl RequestFileContext {
.key(&object_key) .key(&object_key)
.send() .send()
.await?; .await?;
let mut stream = response.body.into_async_read();
let mut bytes = Vec::new();
stream.read_to_end(&mut bytes).await?;
Ok::<ReadStream, AnyError>(Box::pin(response.body.into_async_read())) Ok::<ReadFile, AnyError>(Box::new(Cursor::new(bytes)))
}) })
}) })
} }
@@ -320,10 +314,6 @@ impl Drop for RequestFileContext {
} }
} }
fn normalize_key_component(value: &str) -> String {
value.trim_matches('/').to_string()
}
fn upload_to_s3( fn upload_to_s3(
s3_client: &Client, s3_client: &Client,
bucket: &str, bucket: &str,