Add Python support.

This commit is contained in:
2026-04-25 17:10:41 +01:00
parent 5ff4b3ee5c
commit 48adf25837
11 changed files with 467 additions and 14 deletions

View File

@@ -2,6 +2,8 @@ use std::collections::HashMap;
use std::error::Error;
use std::fs::{self, File};
use std::io::{Cursor, ErrorKind, Read};
#[cfg(feature = "python")]
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::Arc;
@@ -26,12 +28,20 @@ use serde_json::Value;
use tokio::io::AsyncReadExt;
use tracing_subscriber::EnvFilter;
#[cfg(feature = "python")]
use pyo3::exceptions::{PyRuntimeError, PyTypeError};
#[cfg(feature = "python")]
use pyo3::prelude::*;
#[cfg(feature = "python")]
use pyo3::types::{PyAny, PyBool, PyBytes, PyInt, PyModule, PyString};
pub type AnyError = Box<dyn Error + Send + Sync + 'static>;
pub type ReadFile = Box<dyn Read + Send + 'static>;
pub type ReadFileFn = dyn Fn(&str, u64) -> Result<ReadFile, AnyError> + Send + Sync + 'static;
pub type WriteFileFn = dyn Fn(&str, u64) -> Result<File, AnyError> + Send + Sync + 'static;
type ProcessFn = dyn Fn(u64, &ReadFileFn, &WriteFileFn) -> Result<Vec<(u64, CaseKey)>, AnyError>
type ProcessFn = dyn Fn(u64, Arc<ReadFileFn>, Arc<WriteFileFn>) -> Result<Vec<(u64, CaseKey)>, AnyError>
+ Send
+ Sync
+ 'static;
@@ -79,6 +89,19 @@ pub struct Microservice {
}
impl Microservice {
fn new_case_key(
name: impl Into<String>,
config_host: impl Into<String>,
process: Arc<ProcessFn>,
) -> Self {
init_tracing();
Self {
name: name.into(),
config_host: config_host.into(),
process,
}
}
/// Create a new microservice runtime.
///
/// `process` accepts an inbound request ID, a `read_file` function, and a
@@ -93,10 +116,10 @@ impl Microservice {
init_tracing();
let process_wrapper = move |
request: u64,
read_file: &ReadFileFn,
write_file: &WriteFileFn,
read_file: Arc<ReadFileFn>,
write_file: Arc<WriteFileFn>,
| -> Result<Vec<(u64, CaseKey)>, AnyError> {
let outputs = process(request, read_file, write_file)?;
let outputs = process(request, read_file.as_ref(), write_file.as_ref())?;
let mut mapped = Vec::with_capacity(outputs.len());
for (id, case) in outputs {
let value = serde_json::to_value(case)
@@ -190,7 +213,7 @@ impl Microservice {
let read_config_host = config_host.clone();
let read_microservice_name = microservice_name.clone();
let read_file = move |key: &str, id: u64| -> Result<ReadFile, AnyError> {
let read_file: Arc<ReadFileFn> = Arc::new(move |key: &str, id: u64| -> Result<ReadFile, AnyError> {
let bucket = resolve_bucket_name(
&read_config_host,
&read_microservice_name,
@@ -202,9 +225,9 @@ impl Microservice {
.lock()
.map_err(|e| format!("file context lock poisoned for read_file: {}", e))?;
guard.read_file(s3_read_client.as_ref(), &bucket, id)
};
});
let write_file = move |key: &str, id: u64| -> Result<File, AnyError> {
let write_file: Arc<WriteFileFn> = Arc::new(move |key: &str, id: u64| -> Result<File, AnyError> {
let bucket = resolve_bucket_name(
&config_host,
&microservice_name,
@@ -215,9 +238,9 @@ impl Microservice {
.lock()
.map_err(|e| format!("file context lock poisoned for write_file: {}", e))?;
guard.write_file(&bucket, id)
};
});
let outputs = (self.process)(request_id, &read_file, &write_file)?;
let outputs = (self.process)(request_id, Arc::clone(&read_file), Arc::clone(&write_file))?;
{
let mut guard = file_context
.lock()
@@ -408,9 +431,9 @@ fn fetch_rabbitmq_url_from_sys_map() -> Result<String, AnyError> {
}
async fn fetch_s3_client_from_sys_map() -> Result<Arc<Client>, AnyError> {
let response = reqwest::blocking::get("https://sys-map.slingshot.cv/object-storage")?;
let response = reqwest::get("https://sys-map.slingshot.cv/object-storage").await?;
let response = response.error_for_status()?;
let config = response.json::<ObjectStorageConfig>()?;
let config = response.json::<ObjectStorageConfig>().await?;
let host = single_value(&config.host, "host")?;
let access_key_ref = single_value(&config.pass_access_key, "pass:access-key")?;
@@ -456,9 +479,13 @@ fn resolve_bucket_name(
}
let url = bucket_mapping_url(config_host, microservice_name, key);
let response = reqwest::blocking::get(&url)?;
let response = response.error_for_status()?;
let bucket_name = response.text()?.trim().to_string();
let bucket_name = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
let response = reqwest::get(&url).await?;
let response = response.error_for_status()?;
Ok::<String, AnyError>(response.text().await?.trim().to_string())
})
})?;
if bucket_name.is_empty() {
return Err(format!("bucket mapping '{}' returned an empty bucket name", url).into());
@@ -614,3 +641,182 @@ async fn publish_outputs(
Ok(())
}
#[cfg(feature = "python")]
fn any_error_to_py(err: AnyError) -> PyErr {
PyRuntimeError::new_err(err.to_string())
}
#[cfg(feature = "python")]
fn case_key_from_py_value(value: &Bound<'_, PyAny>) -> PyResult<CaseKey> {
if value.is_instance_of::<PyBool>() {
return Ok(CaseKey::Bool(value.extract::<bool>()?));
}
if value.is_instance_of::<PyInt>() {
return Ok(CaseKey::Int(value.extract::<i128>()?));
}
if value.is_instance_of::<PyString>() {
return Ok(CaseKey::String(value.extract::<String>()?));
}
Err(PyTypeError::new_err(
"case variable must be one of: bool, int, string",
))
}
#[cfg(feature = "python")]
#[pyclass(name = "ReadFileFn")]
struct PyReadFileFn {
inner: Arc<ReadFileFn>,
}
#[cfg(feature = "python")]
#[pymethods]
impl PyReadFileFn {
fn __call__(&self, py: Python<'_>, key: &str, id: u64) -> PyResult<Py<PyAny>> {
let mut reader = (self.inner)(key, id).map_err(any_error_to_py)?;
let mut data = Vec::new();
reader.read_to_end(&mut data).map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
let io = py.import("io")?;
let bytes_io = io
.getattr("BytesIO")?
.call1((PyBytes::new(py, &data),))?;
Ok(bytes_io.unbind())
}
}
#[cfg(feature = "python")]
#[pyclass]
struct PyWriteHandle {
file: Arc<Mutex<File>>,
}
#[cfg(feature = "python")]
#[pymethods]
impl PyWriteHandle {
fn write(&self, data: &[u8]) -> PyResult<usize> {
let mut file = self
.file
.lock()
.map_err(|e| PyRuntimeError::new_err(format!("write lock poisoned: {}", e)))?;
file.write_all(data)
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
Ok(data.len())
}
fn flush(&self) -> PyResult<()> {
let mut file = self
.file
.lock()
.map_err(|e| PyRuntimeError::new_err(format!("flush lock poisoned: {}", e)))?;
file.flush()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
}
#[cfg(feature = "python")]
#[pyclass(name = "WriteFileFn")]
struct PyWriteFileFn {
inner: Arc<WriteFileFn>,
}
#[cfg(feature = "python")]
#[pymethods]
impl PyWriteFileFn {
fn __call__(&self, py: Python<'_>, key: &str, id: u64) -> PyResult<Py<PyWriteHandle>> {
let file = (self.inner)(key, id).map_err(any_error_to_py)?;
Py::new(
py,
PyWriteHandle {
file: Arc::new(Mutex::new(file)),
},
)
}
}
#[cfg(feature = "python")]
fn run_python_process(
process: &Py<PyAny>,
request: u64,
read_file: Arc<ReadFileFn>,
write_file: Arc<WriteFileFn>,
) -> Result<Vec<(u64, CaseKey)>, AnyError> {
Python::with_gil(|py| -> Result<Vec<(u64, CaseKey)>, AnyError> {
let py_read = Py::new(py, PyReadFileFn { inner: read_file })
.map_err(|e| format!("failed to build Python ReadFileFn wrapper: {}", e))?;
let py_write = Py::new(py, PyWriteFileFn { inner: write_file })
.map_err(|e| format!("failed to build Python WriteFileFn wrapper: {}", e))?;
let returned = process
.call1(py, (request, py_read, py_write))
.map_err(|e| format!("Python process callback failed: {}", e))?;
let iter = returned
.bind(py)
.try_iter()
.map_err(|e| format!("process return value must be iterable: {}", e))?;
let mut outputs = Vec::new();
for item in iter {
let item = item.map_err(|e| format!("failed to iterate process outputs: {}", e))?;
let (id, case_obj): (u64, Py<PyAny>) = item
.extract()
.map_err(|e| format!("each output must be a tuple (int, case): {}", e))?;
let case = case_key_from_py_value(case_obj.bind(py))
.map_err(|e| format!("invalid case variable: {}", e))?;
outputs.push((id, case));
}
Ok(outputs)
})
}
#[cfg(feature = "python")]
#[pyclass(name = "Microservice")]
struct PyMicroservice {
name: String,
config_host: String,
process: Py<PyAny>,
}
#[cfg(feature = "python")]
#[pymethods]
impl PyMicroservice {
#[new]
fn new(name: String, config_host: String, process: Py<PyAny>) -> PyResult<Self> {
Python::with_gil(|py| {
if !process.bind(py).is_callable() {
return Err(PyTypeError::new_err("process must be callable"));
}
Ok(Self {
name,
config_host,
process,
})
})
}
fn start(&self) -> PyResult<()> {
let process = Python::with_gil(|py| self.process.clone_ref(py));
let microservice = Microservice::new_case_key(
self.name.clone(),
self.config_host.clone(),
Arc::new(move |request, read_file, write_file| run_python_process(&process, request, read_file, write_file)),
);
microservice.start().map_err(any_error_to_py)
}
}
#[cfg(feature = "python")]
#[pymodule]
fn _native(_py: Python<'_>, module: &Bound<'_, PyModule>) -> PyResult<()> {
module.add_class::<PyMicroservice>()?;
Ok(())
}