diff --git a/README.md b/README.md index e0755f1..fbdb421 100644 --- a/README.md +++ b/README.md @@ -47,25 +47,22 @@ cargo build ```rust use slingshot_microservice::Microservice; -use slingshot_microservice::{ProcessFuture, ReadFileFn, WriteFileFn}; -use std::io::Write; -use tokio::io::AsyncReadExt; +use slingshot_microservice::{AnyError, ReadFileFn, WriteFileFn}; +use std::io::{Read, Write}; -fn process<'a>( +fn process( request: u64, - read_file: &'a ReadFileFn, - write_file: &'a WriteFileFn, -) -> ProcessFuture<'a, String> { - Box::pin(async move { - let mut input = String::new(); - let mut reader = read_file("in", request)?; - reader.read_to_string(&mut input).await?; + read_file: &ReadFileFn, + write_file: &WriteFileFn, +) -> Result, AnyError> { + let mut input = String::new(); + let mut reader = read_file("in", request)?; + reader.read_to_string(&mut input)?; - let mut writer = write_file("out", request)?; - writer.write_all(input.as_bytes())?; + let mut writer = write_file("out", request)?; + writer.write_all(input.as_bytes())?; - Ok(vec![(request, "case_a".to_string())]) - }) + Ok(vec![(request, "case_a".to_string())]) } fn main() { @@ -142,8 +139,8 @@ Within each `process` pass: 1. `read_file(key, id)` treats `key` as a bucket reference such as `in`, not as the canonical bucket name. On first use, the runtime fetches `https://{HOSTNAME}/{MICROSERVICE_NAME}/{key}` to resolve the real bucket - name, caches that mapping, and then returns an async stream for object - `id` in that bucket using the AWS SDK (`get_object(...).body.into_async_read()`). + name, caches that mapping, and then returns a synchronous reader for object + `id` in that bucket using the AWS SDK. 2. `write_file(key, id)` resolves `key` through the same cached lookup and returns an opened local file handle for writing, staging the output for `s3://{resolved_bucket}/{id}`. diff --git a/examples/simple.rs b/examples/simple.rs index a0a5a72..6f96d96 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -1,22 +1,19 @@ -use slingshot_microservice::{Microservice, ProcessFuture, ReadFileFn, WriteFileFn}; -use std::io::Write; -use tokio::io::AsyncReadExt; +use slingshot_microservice::{AnyError, Microservice, ReadFileFn, WriteFileFn}; +use std::io::{Read, Write}; -fn process<'a>( +fn process( request: u64, - read_file: &'a ReadFileFn, - write_file: &'a WriteFileFn, -) -> ProcessFuture<'a, String> { - Box::pin(async move { - let mut input = String::new(); - let mut reader = read_file("in", request)?; - reader.read_to_string(&mut input).await?; + read_file: &ReadFileFn, + write_file: &WriteFileFn, +) -> Result, AnyError> { + let mut input = String::new(); + let mut reader = read_file("in", request)?; + reader.read_to_string(&mut input)?; - let mut writer = write_file("out", request)?; - writer.write_all(input.as_bytes())?; + let mut writer = write_file("out", request)?; + writer.write_all(input.as_bytes())?; - Ok(vec![(request, "case_a".to_string())]) - }) + Ok(vec![(request, "case_a".to_string())]) } fn main() -> Result<(), Box> { diff --git a/src/lib.rs b/src/lib.rs index cb29d36..a91ff9d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,7 @@ use std::collections::HashMap; use std::error::Error; -use std::future::Future; use std::fs::{self, File}; -use std::io::ErrorKind; -use std::pin::Pin; +use std::io::{Cursor, ErrorKind, Read}; use std::path::{Path, PathBuf}; use std::process::Command; use std::sync::Arc; @@ -25,16 +23,15 @@ use lapin::{BasicProperties, Channel, Connection, ConnectionProperties}; use serde::Deserialize; use serde::Serialize; use serde_json::Value; -use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; use tracing_subscriber::EnvFilter; pub type AnyError = Box; -pub type ReadStream = Pin>; -pub type ReadFileFn = dyn Fn(&str, u64) -> Result + Send + Sync + 'static; +pub type ReadFile = Box; +pub type ReadFileFn = dyn Fn(&str, u64) -> Result + Send + Sync + 'static; pub type WriteFileFn = dyn Fn(&str, u64) -> Result + Send + Sync + 'static; -pub type ProcessFuture<'a, C> = Pin, AnyError>> + Send + 'a>>; -type ProcessFn = dyn for<'a> Fn(u64, &'a ReadFileFn, &'a WriteFileFn) -> ProcessFuture<'a, Value> +type ProcessFn = dyn Fn(u64, &ReadFileFn, &WriteFileFn) -> Result, AnyError> + Send + Sync + 'static; @@ -83,10 +80,7 @@ impl Microservice { /// serializable primitive, such as `String`, `bool`, or integers. pub fn new(name: impl Into, config_host: impl Into, process: F) -> Self where - F: for<'a> Fn(u64, &'a ReadFileFn, &'a WriteFileFn) -> ProcessFuture<'a, C> - + Send - + Sync - + 'static, + F: Fn(u64, &ReadFileFn, &WriteFileFn) -> Result, AnyError> + Send + Sync + 'static, C: Serialize + 'static, { init_tracing(); @@ -94,19 +88,16 @@ impl Microservice { request: u64, read_file: &ReadFileFn, write_file: &WriteFileFn, - | -> ProcessFuture<'_, Value> { - let fut = process(request, read_file, write_file); - Box::pin(async move { - let outputs = fut.await?; - Ok(outputs - .into_iter() - .map(|(id, case)| { - let value = serde_json::to_value(case) - .expect("case variable must be serializable to JSON"); - (id, value) - }) - .collect()) - }) + | -> Result, AnyError> { + let outputs = process(request, read_file, write_file)?; + Ok(outputs + .into_iter() + .map(|(id, case)| { + let value = serde_json::to_value(case) + .expect("case variable must be serializable to JSON"); + (id, value) + }) + .collect()) }; Self { @@ -190,7 +181,7 @@ impl Microservice { let read_config_host = config_host.clone(); let read_microservice_name = microservice_name.clone(); - let read_file = move |key: &str, id: u64| -> Result { + let read_file = move |key: &str, id: u64| -> Result { let bucket = resolve_bucket_name( &read_config_host, &read_microservice_name, @@ -216,7 +207,7 @@ impl Microservice { guard.write_file(&bucket, id) }; - let outputs = (self.process)(request_id, &read_file, &write_file).await?; + let outputs = (self.process)(request_id, &read_file, &write_file)?; { let mut guard = file_context .lock() @@ -263,7 +254,7 @@ impl RequestFileContext { }) } - fn read_file(&mut self, s3_client: &Client, bucket: &str, id: u64) -> Result { + fn read_file(&mut self, s3_client: &Client, bucket: &str, id: u64) -> Result { let bucket_name = bucket.to_string(); let object_key = id.to_string(); let client = s3_client.clone(); @@ -276,8 +267,11 @@ impl RequestFileContext { .key(&object_key) .send() .await?; + let mut stream = response.body.into_async_read(); + let mut bytes = Vec::new(); + stream.read_to_end(&mut bytes).await?; - Ok::(Box::pin(response.body.into_async_read())) + Ok::(Box::new(Cursor::new(bytes))) }) }) } @@ -320,10 +314,6 @@ impl Drop for RequestFileContext { } } -fn normalize_key_component(value: &str) -> String { - value.trim_matches('/').to_string() -} - fn upload_to_s3( s3_client: &Client, bucket: &str,