Push new feature: auto config for S3.

This commit is contained in:
2026-04-25 00:37:07 +01:00
parent f75046e74d
commit 61fa4cc645
5 changed files with 1615 additions and 98 deletions

View File

@@ -1,8 +1,19 @@
use std::collections::HashMap;
use std::error::Error;
use std::future::Future;
use std::fs::{self, File};
use std::io::ErrorKind;
use std::pin::Pin;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use aws_config::BehaviorVersion;
use aws_sdk_s3::Client;
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::primitives::ByteStream;
use futures_util::StreamExt;
use lapin::options::{
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
@@ -14,11 +25,21 @@ use lapin::{BasicProperties, Channel, Connection, ConnectionProperties};
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use tokio::io::AsyncRead;
use tracing_subscriber::EnvFilter;
type AnyError = Box<dyn Error + Send + Sync + 'static>;
pub type AnyError = Box<dyn Error + Send + Sync + 'static>;
type ProcessFn = dyn Fn(u64) -> Vec<(u64, Value)> + Send + Sync + 'static;
/// Owned, pinned asynchronous byte reader handed to the processing function.
pub type ReadStream = Pin<Box<dyn AsyncRead + Send + Unpin + 'static>>;
/// Signature of the `read_file` helper passed to the processing function.
///
/// Takes a string key and a numeric ID and returns a [`ReadStream`] on
/// success. In the consumer loop the key is resolved to a bucket name and the
/// ID (formatted as a decimal string) is used as the object key.
pub type ReadFileFn = dyn Fn(&str, u64) -> Result<ReadStream, AnyError> + Send + Sync + 'static;
/// Signature of the `write_file` helper passed to the processing function.
///
/// Takes a string key and a numeric ID and returns a local [`File`] to write
/// into. In the consumer loop the file is staged on disk and uploaded when the
/// request's file context is finalized.
pub type WriteFileFn = dyn Fn(&str, u64) -> Result<File, AnyError> + Send + Sync + 'static;
/// Boxed future returned by a processing function: resolves to a list of
/// `(result_id, case_variable)` tuples or an [`AnyError`].
pub type ProcessFuture<'a, C> = Pin<Box<dyn Future<Output = Result<Vec<(u64, C)>, AnyError>> + Send + 'a>>;
/// Type-erased processing callback whose case variables are JSON [`Value`]s.
///
/// The returned future may borrow the two file helpers for the lifetime `'a`.
type ProcessFn = dyn for<'a> Fn(u64, &'a ReadFileFn, &'a WriteFileFn) -> ProcessFuture<'a, Value>
+ Send
+ Sync
+ 'static;
/// Process-wide counter used to give each request file context a unique
/// staging directory name; starts at 1.
static REQUEST_FILE_CONTEXT_COUNTER: AtomicU64 = AtomicU64::new(1);
fn init_tracing() {
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
@@ -44,8 +65,9 @@ struct OutboundCase {
/// The microservice:
/// 1) Retrieves queue metadata from a configuration service,
/// 2) Consumes u64 IDs from an inbound queue,
/// 3) Runs the user-provided processing function,
/// 4) Routes each output ID to outbound queue(s) based on case variables.
/// 3) Runs the user-provided processing function with S3-backed file helpers,
/// 4) Closes/finalizes staged files and uploads writes,
/// 5) Routes each output ID to outbound queue(s) based on case variables.
pub struct Microservice {
name: String,
config_host: String,
@@ -55,24 +77,36 @@ pub struct Microservice {
impl Microservice {
/// Create a new microservice runtime.
///
/// `process` accepts an inbound request ID and returns a list of
/// `process` accepts an inbound request ID, a `read_file` function, and a
/// `write_file` function, and then returns a list of
/// `(result_id, case_variable)` tuples. Case variables can be any
/// serializable primitive, such as `String`, `bool`, or integers.
pub fn new<F, C>(name: impl Into<String>, config_host: impl Into<String>, process: F) -> Self
where
F: Fn(u64) -> Vec<(u64, C)> + Send + Sync + 'static,
C: Serialize,
F: for<'a> Fn(u64, &'a ReadFileFn, &'a WriteFileFn) -> ProcessFuture<'a, C>
+ Send
+ Sync
+ 'static,
C: Serialize + 'static,
{
init_tracing();
let process_wrapper = move |request: u64| -> Vec<(u64, Value)> {
process(request)
.into_iter()
.map(|(id, case)| {
let value = serde_json::to_value(case)
.expect("case variable must be serializable to JSON");
(id, value)
})
.collect()
let process_wrapper = move |
request: u64,
read_file: &ReadFileFn,
write_file: &WriteFileFn,
| -> ProcessFuture<'_, Value> {
let fut = process(request, read_file, write_file);
Box::pin(async move {
let outputs = fut.await?;
Ok(outputs
.into_iter()
.map(|(id, case)| {
let value = serde_json::to_value(case)
.expect("case variable must be serializable to JSON");
(id, value)
})
.collect())
})
};
Self {
@@ -91,8 +125,9 @@ impl Microservice {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
let s3_client = runtime.block_on(fetch_s3_client_from_sys_map())?;
runtime.block_on(self.run_consumer(config.inbound, route_map, amqp_url))
runtime.block_on(self.run_consumer(config.inbound, route_map, amqp_url, s3_client))
}
fn fetch_config(&self) -> Result<QueueConfig, AnyError> {
@@ -108,7 +143,9 @@ impl Microservice {
inbound_queue: String,
route_map: HashMap<String, Vec<String>>,
amqp_url: String,
s3_client: Arc<Client>,
) -> Result<(), AnyError> {
let bucket_name_cache = Arc::new(Mutex::new(HashMap::<String, String>::new()));
let connection = Connection::connect(&amqp_url, ConnectionProperties::default()).await?;
let channel = connection.create_channel().await?;
@@ -142,7 +179,51 @@ impl Microservice {
}
};
let outputs = (self.process)(request_id);
let file_context = Arc::new(Mutex::new(RequestFileContext::new(request_id)?));
let read_context = Arc::clone(&file_context);
let write_context = Arc::clone(&file_context);
let s3_read_client = Arc::clone(&s3_client);
let read_bucket_cache = Arc::clone(&bucket_name_cache);
let write_bucket_cache = Arc::clone(&bucket_name_cache);
let config_host = self.config_host.clone();
let microservice_name = self.name.clone();
let read_config_host = config_host.clone();
let read_microservice_name = microservice_name.clone();
let read_file = move |key: &str, id: u64| -> Result<ReadStream, AnyError> {
let bucket = resolve_bucket_name(
&read_config_host,
&read_microservice_name,
&read_bucket_cache,
key,
)?;
let mut guard = read_context
.lock()
.map_err(|e| format!("file context lock poisoned for read_file: {}", e))?;
guard.read_file(s3_read_client.as_ref(), &bucket, id)
};
let write_file = move |key: &str, id: u64| -> Result<File, AnyError> {
let bucket = resolve_bucket_name(
&config_host,
&microservice_name,
&write_bucket_cache,
key,
)?;
let mut guard = write_context
.lock()
.map_err(|e| format!("file context lock poisoned for write_file: {}", e))?;
guard.write_file(&bucket, id)
};
let outputs = (self.process)(request_id, &read_file, &write_file).await?;
{
let mut guard = file_context
.lock()
.map_err(|e| format!("file context lock poisoned for finalize: {}", e))?;
guard.finalize(s3_client.as_ref())?
}
publish_outputs(&channel, outputs, &route_map).await?;
delivery.ack(BasicAckOptions::default()).await?;
}
@@ -151,6 +232,141 @@ impl Microservice {
}
}
/// A locally staged file that still has to be uploaded to object storage.
#[derive(Debug)]
struct PendingUpload {
// Name of the destination bucket.
bucket: String,
// Object key to upload under (the numeric ID rendered as a decimal string).
object_key: String,
// Path of the staged file on the local filesystem.
local_path: PathBuf,
}
/// Per-request scratch state: a temporary staging directory plus the list of
/// files written into it that are waiting to be uploaded.
#[derive(Debug)]
struct RequestFileContext {
// Root of this request's temporary directory; removed on finalize and on drop.
root_dir: PathBuf,
// Files handed out by `write_file` that `finalize` still has to upload.
pending_uploads: Vec<PendingUpload>,
}
impl RequestFileContext {
    /// Creates the staging directory for one inbound request.
    ///
    /// The directory lives under the system temp dir and its name combines the
    /// process ID, the request ID and a process-wide counter so that repeated
    /// or concurrent requests never share a directory.
    ///
    /// # Errors
    /// Returns an error if the `write` subdirectory cannot be created.
    fn new(request_id: u64) -> Result<Self, AnyError> {
        let unique = REQUEST_FILE_CONTEXT_COUNTER.fetch_add(1, Ordering::Relaxed);
        let root_dir = std::env::temp_dir().join(format!(
            "slingshot-microservice-{}-{}-{}",
            std::process::id(),
            request_id,
            unique
        ));
        fs::create_dir_all(root_dir.join("write"))?;

        Ok(Self {
            root_dir,
            pending_uploads: Vec::new(),
        })
    }

    /// Opens object `id` (rendered as a decimal string) in `bucket` and returns
    /// its body as an async reader.
    ///
    /// The S3 request is driven to completion synchronously via
    /// `block_in_place` + `Handle::block_on`, so this must be called from a
    /// multi-threaded tokio runtime.
    ///
    /// # Errors
    /// Returns an error if the `GetObject` request fails.
    fn read_file(&mut self, s3_client: &Client, bucket: &str, id: u64) -> Result<ReadStream, AnyError> {
        let bucket_name = bucket.to_string();
        let object_key = id.to_string();
        let client = s3_client.clone();

        tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async move {
                let response = client
                    .get_object()
                    .bucket(&bucket_name)
                    .key(&object_key)
                    .send()
                    .await?;
                Ok::<ReadStream, AnyError>(Box::pin(response.body.into_async_read()))
            })
        })
    }

    /// Creates a local staging file and schedules it for upload on `finalize`.
    ///
    /// `key` is stored as the destination bucket of the pending upload (the
    /// caller passes the already-resolved bucket name here), and `id` rendered
    /// as a decimal string becomes the object key.
    ///
    /// # Errors
    /// Returns an error if the staging file cannot be created. In that case no
    /// upload is scheduled.
    fn write_file(&mut self, key: &str, id: u64) -> Result<File, AnyError> {
        let local_path = self
            .root_dir
            .join("write")
            .join(format!("{}.bin", self.pending_uploads.len()));

        // Create the file before recording the upload: registering first would
        // leave a phantom entry behind on failure, and `finalize` would later
        // fail trying to upload a file that never existed.
        let file = File::create(&local_path)?;

        self.pending_uploads.push(PendingUpload {
            bucket: key.to_string(),
            object_key: id.to_string(),
            local_path,
        });

        Ok(file)
    }

    /// Uploads every staged file, deletes each one after its upload succeeds,
    /// and finally removes the staging directory.
    ///
    /// # Errors
    /// Stops at the first failing upload or removal and returns that error;
    /// files not yet processed stay in `pending_uploads`.
    fn finalize(&mut self, s3_client: &Client) -> Result<(), AnyError> {
        for upload in &self.pending_uploads {
            upload_to_s3(s3_client, &upload.bucket, &upload.object_key, &upload.local_path)?;
            remove_if_exists(&upload.local_path)?;
        }

        self.pending_uploads.clear();
        remove_dir_if_exists(&self.root_dir)?;
        Ok(())
    }
}
impl Drop for RequestFileContext {
    /// Best-effort cleanup of anything still staged on disk; failures are
    /// deliberately ignored because `drop` cannot report them.
    fn drop(&mut self) {
        self.pending_uploads.iter().for_each(|pending| {
            let _ = remove_if_exists(&pending.local_path);
        });

        let _ = remove_dir_if_exists(&self.root_dir);
    }
}
/// Returns `value` with all leading and trailing `/` characters removed.
/// Slashes in the interior of the string are left untouched.
fn normalize_key_component(value: &str) -> String {
    let without_edge_slashes: &str = value.trim_matches('/');
    String::from(without_edge_slashes)
}
/// Uploads the file at `local_path` to `bucket` under `object_key`.
///
/// The async `PutObject` call is driven to completion synchronously through
/// `block_in_place` + `Handle::block_on`, so the function returns only once
/// the upload has finished or failed.
///
/// # Errors
/// Returns an error if the file cannot be opened as a byte stream or if the
/// `PutObject` request fails.
fn upload_to_s3(
    s3_client: &Client,
    bucket: &str,
    object_key: &str,
    local_path: &Path,
) -> Result<(), AnyError> {
    // Owned copies so the future does not borrow from the caller.
    let owned_client = s3_client.clone();
    let target_bucket = bucket.to_string();
    let target_key = object_key.to_string();
    let source_path = local_path.to_path_buf();

    let upload = async move {
        let body = ByteStream::from_path(&source_path).await?;
        owned_client
            .put_object()
            .bucket(&target_bucket)
            .key(&target_key)
            .body(body)
            .send()
            .await?;
        Ok::<(), AnyError>(())
    };

    tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(upload))
}
/// Deletes the file at `path`, treating "file not found" as success.
///
/// # Errors
/// Returns any other I/O error reported by the filesystem.
fn remove_if_exists(path: &Path) -> Result<(), AnyError> {
    if let Err(io_err) = fs::remove_file(path) {
        // A missing file is the desired end state, so only other errors count.
        if io_err.kind() != ErrorKind::NotFound {
            return Err(Box::new(io_err));
        }
    }
    Ok(())
}
/// Recursively deletes the directory at `path`, treating "not found" as
/// success.
///
/// # Errors
/// Returns any other I/O error reported by the filesystem.
fn remove_dir_if_exists(path: &Path) -> Result<(), AnyError> {
    if let Err(io_err) = fs::remove_dir_all(path) {
        // An already-absent directory is the desired end state.
        if io_err.kind() != ErrorKind::NotFound {
            return Err(Box::new(io_err));
        }
    }
    Ok(())
}
#[derive(Debug, Deserialize)]
struct RabbitMqConfig {
port: Vec<u16>,
@@ -159,6 +375,15 @@ struct RabbitMqConfig {
pass: Vec<String>,
}
/// JSON payload describing the object-storage service, as deserialized from
/// the sys-map `object-storage` endpoint.
///
/// Every field is a list; the loader requires each to contain exactly one
/// value.
#[derive(Debug, Deserialize)]
struct ObjectStorageConfig {
// Host of the object-storage endpoint (used to build `https://{host}`).
host: Vec<String>,
// Reference passed to `pass show` to obtain the access key.
#[serde(rename = "pass:access-key")]
pass_access_key: Vec<String>,
// Reference passed to `pass show` to obtain the secret key.
#[serde(rename = "pass:secret-key")]
pass_secret_key: Vec<String>,
}
fn fetch_rabbitmq_url_from_sys_map() -> Result<String, AnyError> {
let response = reqwest::blocking::get("https://sys-map.slingshot.cv/rabbitmq")?;
let response = response.error_for_status()?;
@@ -176,6 +401,70 @@ fn fetch_rabbitmq_url_from_sys_map() -> Result<String, AnyError> {
Ok(format!("amqp://{}:{}@{}:{}/%2f", username, pass, host, port))
}
async fn fetch_s3_client_from_sys_map() -> Result<Arc<Client>, AnyError> {
let response = reqwest::blocking::get("https://sys-map.slingshot.cv/object-storage")?;
let response = response.error_for_status()?;
let config = response.json::<ObjectStorageConfig>()?;
let host = single_value(&config.host, "host")?;
let access_key_ref = single_value(&config.pass_access_key, "pass:access-key")?;
let secret_key_ref = single_value(&config.pass_secret_key, "pass:secret-key")?;
let access_key = resolve_password_from_pass(&access_key_ref)?;
let secret_key = resolve_password_from_pass(&secret_key_ref)?;
info!("Fetched object storage config from sys-map: host={}", host);
let shared_config = aws_config::defaults(BehaviorVersion::latest())
.region(Region::new("us-east-1"))
.credentials_provider(Credentials::new(
access_key,
secret_key,
None,
None,
"sys-map",
))
.load()
.await;
let s3_config = aws_sdk_s3::config::Builder::from(&shared_config)
.endpoint_url(format!("https://{}", host))
.build();
Ok(Arc::new(Client::from_conf(s3_config)))
}
fn resolve_bucket_name(
config_host: &str,
microservice_name: &str,
cache: &Mutex<HashMap<String, String>>,
key: &str,
) -> Result<String, AnyError> {
{
let guard = cache
.lock()
.map_err(|e| format!("bucket-name cache lock poisoned: {}", e))?;
if let Some(bucket_name) = guard.get(key) {
return Ok(bucket_name.clone());
}
}
let url = bucket_mapping_url(config_host, microservice_name, key);
let response = reqwest::blocking::get(&url)?;
let response = response.error_for_status()?;
let bucket_name = response.text()?.trim().to_string();
if bucket_name.is_empty() {
return Err(format!("bucket mapping '{}' returned an empty bucket name", url).into());
}
let mut guard = cache
.lock()
.map_err(|e| format!("bucket-name cache lock poisoned: {}", e))?;
guard.insert(key.to_string(), bucket_name.clone());
Ok(bucket_name)
}
fn resolve_password_from_pass(pass_key: &str) -> Result<String, AnyError> {
let output = Command::new("pass").arg("show").arg(pass_key).output()?;
@@ -202,7 +491,7 @@ fn resolve_password_from_pass(pass_key: &str) -> Result<String, AnyError> {
fn single_value<T: Clone>(values: &[T], field_name: &str) -> Result<T, AnyError> {
if values.len() != 1 {
return Err(format!(
"sys-map.rabbitmq field '{}' must contain exactly one value, got {}",
"sys-map field '{}' must contain exactly one value, got {}",
field_name,
values.len()
)
@@ -220,6 +509,10 @@ fn config_url(host: &str, microservice_name: &str) -> String {
}
}
/// Builds the bucket-mapping URL for `key`: the microservice's config URL
/// followed by `/` and `key` with leading and trailing slashes removed.
fn bucket_mapping_url(host: &str, microservice_name: &str, key: &str) -> String {
    let mut url = config_url(host, microservice_name);
    url.push('/');
    url.push_str(key.trim_matches('/'));
    url
}
fn build_route_map(outbound: &[OutboundCase]) -> HashMap<String, Vec<String>> {
let mut map = HashMap::new();
for entry in outbound {