From 48adf258372e4032afb434178b454c52c1d13bbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Se=C3=A1n=20Healy?= Date: Sat, 25 Apr 2026 17:10:41 +0100 Subject: [PATCH] Add Python support. --- Cargo.lock | 106 ++++++++ Cargo.toml | 8 + README.md | 42 ++++ examples/py_simple.py | 18 ++ slingshot_microservice/__init__.py | 43 ++++ .../__pycache__/__init__.cpython-312.pyc | Bin 0 -> 1943 bytes .../__pycache__/__init__.cpython-314.pyc | Bin 0 -> 2160 bytes .../__pycache__/typing.cpython-312.pyc | Bin 0 -> 1521 bytes .../__pycache__/typing.cpython-314.pyc | Bin 0 -> 2124 bytes slingshot_microservice/typing.py | 30 +++ src/lib.rs | 234 ++++++++++++++++-- 11 files changed, 467 insertions(+), 14 deletions(-) create mode 100644 examples/py_simple.py create mode 100644 slingshot_microservice/__init__.py create mode 100644 slingshot_microservice/__pycache__/__init__.cpython-312.pyc create mode 100644 slingshot_microservice/__pycache__/__init__.cpython-314.pyc create mode 100644 slingshot_microservice/__pycache__/typing.cpython-312.pyc create mode 100644 slingshot_microservice/__pycache__/typing.cpython-314.pyc create mode 100644 slingshot_microservice/typing.py diff --git a/Cargo.lock b/Cargo.lock index 9ef820f..90bf202 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1535,6 +1535,12 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -1860,6 +1866,15 @@ dependencies = [ "hashbrown 0.17.0", ] +[[package]] +name = "indoc" +version = "2.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79cf5c93f93228cf8efb3ba362535fb11199ac548a09ce117c9b1adc3030d706" +dependencies = [ + "rustversion", +] + [[package]] name = "inout" version = "0.1.4" @@ -2041,6 +2056,15 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -2336,6 +2360,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "potential_utf" version = "0.1.5" @@ -2369,6 +2399,69 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "pyo3" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7778bffd85cf38175ac1f545509665d0b9b92a198ca7941f131f85f7a4f9a872" +dependencies = [ + "cfg-if", + "indoc", + "libc", + "memoffset", + "once_cell", + "portable-atomic", + "pyo3-build-config", + "pyo3-ffi", + "pyo3-macros", + "unindent", +] + +[[package]] +name = "pyo3-build-config" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94f6cbe86ef3bf18998d9df6e0f3fc1050a8c5efa409bf712e661a4366e010fb" +dependencies = [ + "once_cell", + "target-lexicon", +] + +[[package]] +name = "pyo3-ffi" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9f1b4c431c0bb1c8fb0a338709859eed0d030ff6daa34368d3b152a63dfdd8d" +dependencies = [ + "libc", + "pyo3-build-config", +] + +[[package]] +name = "pyo3-macros" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc2201328f63c4710f68abdf653c89d8dbc2858b88c5d88b0ff38a75288a9da" +dependencies = [ + "proc-macro2", + "pyo3-macros-backend", + "quote", + "syn", +] + +[[package]] +name = "pyo3-macros-backend" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fca6726ad0f3da9c9de093d6f116a93c1a38e417ed73bf138472cf4064f72028" +dependencies = [ + "heck", + "proc-macro2", + "pyo3-build-config", + "quote", + "syn", +] + [[package]] name = "quinn" version = "0.11.9" @@ -3013,6 +3106,7 @@ dependencies = [ "aws-sdk-s3", "futures-util", "lapin", + "pyo3", "reqwest", "serde", "serde_json", @@ -3125,6 +3219,12 @@ dependencies = [ "syn", ] +[[package]] +name = "target-lexicon" +version = "0.12.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" + [[package]] name = "tcp-stream" version = "0.28.0" @@ -3406,6 +3506,12 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "unindent" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7264e107f553ccae879d21fbea1d6724ac785e8c3bfc762137959b5802826ef3" + [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index cfd0a7d..f84b2d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,13 @@ edition = "2021" description = "Opinionated Rust framework for queue-driven microservices" license = "MIT" +[lib] +crate-type = ["rlib", "cdylib"] + +[features] +default = [] +python = ["dep:pyo3"] + [dependencies] aws-config = "1" aws-sdk-s3 = "1" @@ -16,6 +23,7 @@ serde_json = "1" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } +pyo3 = { version = "0.23", optional = true, features = ["extension-module", "abi3-py38"] } [dev-dependencies] anyhow = "1" diff --git a/README.md b/README.md index 2342f06..36d281d 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,48 @@ Then fetch and build dependencies: cargo build ``` +## Python Usage + +This crate also exposes Python bindings behind the Cargo feature `python`. +The Python callback receives a request ID plus callable `read_file` and +`write_file` helpers and should yield tuples of `(result_id, case_variable)`. + +```python +from typing import Generator + +from slingshot_microservice.typing import ReadFileFn, WriteFileFn +from slingshot_microservice import Microservice + + +def process( + request: int, + read_file: ReadFileFn, + write_file: WriteFileFn, +) -> Generator[tuple[int, bool | int | str], None, None]: + reader = read_file("in", request) + input_data = reader.read().decode() + + writer = write_file("out", request) + writer.write(f"Hello {input_data}".encode()) + + yield (request, True) + + +microservice = Microservice("simple-py-microservice", "sys-map.slingshot.cv", process) +microservice.start() +``` + +### Building The Python Extension + +Build with: + +```bash +cargo build --release --features python +``` + +The generated shared library can then be imported by Python as +`slingshot_microservice`. + ## Example Usage ```rust diff --git a/examples/py_simple.py b/examples/py_simple.py new file mode 100644 index 0000000..dd59308 --- /dev/null +++ b/examples/py_simple.py @@ -0,0 +1,18 @@ +from slingshot_microservice.typing import ReadFileFn, WriteFileFn +from slingshot_microservice import Microservice +from typing import Generator + +def process( + request: int, + read_file: ReadFileFn, + write_file: WriteFileFn, +) -> Generator[tuple[int, bool | int | str], None, None]: + reader = read_file("in", request) + input_data = reader.read().decode() + writer = write_file("out", request) + writer.write(f"Hello {input_data}".encode()) + yield (request, True) + + +microservice = Microservice("simple-py-microservice", "sys-map.slingshot.cv", process) +microservice.start() diff --git a/slingshot_microservice/__init__.py b/slingshot_microservice/__init__.py new file mode 100644 index 0000000..6051781 --- /dev/null +++ b/slingshot_microservice/__init__.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import importlib.util +import pathlib +import sys +from types import ModuleType + + +def _load_native() -> ModuleType: + package_dir = pathlib.Path(__file__).resolve().parent + project_root = package_dir.parent + + # Common library locations for local cargo builds and wheel installs. + candidates = [ + project_root / "target" / "debug" / "libslingshot_microservice.so", + project_root / "target" / "release" / "libslingshot_microservice.so", + ] + candidates.extend(package_dir.glob("_native*.so")) + candidates.extend(package_dir.glob("libslingshot_microservice*.so")) + + module_name = "slingshot_microservice._native" + for candidate in candidates: + if not candidate.exists(): + continue + + spec = importlib.util.spec_from_file_location(module_name, candidate) + if spec is None or spec.loader is None: + continue + + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + return module + + raise ModuleNotFoundError( + "Native extension is not built. Build it with: cargo build --features python" + ) + + +_native = _load_native() +Microservice = _native.Microservice + +__all__ = ["Microservice"] diff --git a/slingshot_microservice/__pycache__/__init__.cpython-312.pyc b/slingshot_microservice/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..a6feb95fc66eb2be16db280c531229920c48d566 GIT binary patch literal 1943 zcmb7F&2QX96d!y2wd+mYY_gjsgv4!V!$Pu&DpDyl>Vc1f@+BnTvQlL^>&eC&uf4Ko zLlSwbE#irpyEIBoydkI}CHOMwmTd0E4kWl_@ez$g>t(Ohl9`auMZ=JfR6o zsES3A*a!yq6<3s40Hp_ApT#6hR{Cjxm=;q8UuKH(7MmA;58*t6W!<(Nq$AU@y=5Y& zmK>vQ!Oym8uoUkEfE*hn0ANmP)`kkmx)MEE|%xEL$54c2EXMEl|*o_k)zh(Dg5l~M-$ul)kuM=2?e*hu*7 z2jIrH05mwCt0@1DOS9lHyZz`a)mJi#y7ssYhPnuj^uaVL5W8!c7Eij|_}zdMX(nb){}vsGz<_h@qND{lY|R@2Vwo zUMJES>g41q)KT4qo?6>NYmS}o#bQlIYve?j`-Cu9(zI36f|?c{-gB%Ch=rQ&LK|Tr zifCZ2Y&k2TWO~TMNwZpWToiezBh$hI9+_Ef)pe>JPnJ`P<{Ar@qZ`o0XRFa{cT7TK zoba|hEOuZI%W#u8=t#J)a}g{%=(V`uoPznp1^S9@Oorl4B5#R<}^vm@?po zQ}beM^4W^V@N*ZB26pQws2Ytlh7@tdw~Q_lE)h%_fr%6mKofexHs{`=0WDo zR%SZLOdn(}w=%OqX12M|?i*?KT@3mzHW!a2kj&oQ45ZQf&-d%6yE(N(u_ZtVfnM3YMn>+KoVt@SlLGGhN?&HUq z%gu$O?C4Iit(b7q%D1Zl3-=w7_hu>5)LFzFFiA1c@_&UB<*aw;YFWM;}mJ$df4BZ z=90ezz-LaIvk>;C+e<(<%(xp~PMAmJ|Kdfvqgr5*R_2XREoz!>S(^4UaL2+u;iZ73 zxlY$yb0>%x{#U#i@;Xj14D$$FcmlFV;LH(7Jp#ig!Z0KJc@4+|-xQi+TgkPQu|OGX PrXS0L`>CN@V&wK8p@`b^ literal 0 HcmV?d00001 diff --git a/slingshot_microservice/__pycache__/__init__.cpython-314.pyc b/slingshot_microservice/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d7d5938a31a9d674dd074a856980e542058251fc GIT binary patch literal 2160 zcmb7F&2JM|5P!SgFMCZkabkyr)ZP$5Y6wn*5>yJ}P^pw85V=gZQDUQG9BBW zBYNzbV^l5p*_sPW3~`|!B|6y%QTK+3PyNh%9U2e4Y=aCeg`{K zF-xA~LvO__9dOkS!J1~;CepN=yN2VM)?OIYG*2ceqvZ2@$xs7-gCRzEO^1z}0ox-m zDA6G#a#pS7J|>H)f%y;z`Sz*p14I}G^K_K~O|~~IFNR2mhbNSmI#?*VWJ}+QGexEV z#^?&#RA3Hi>_P2czLA60!Xe)r>#{{Qe;{(0fz0GcD0g^)3)dF%4ESIF^WlFXCwEwI zBSp3wTz&w6%N4o2(#aP};jWFgfu-Qe%^m|_zB}~ECqk*i^U01JN?%M)w$IPyi##OR zhCq?er$T#&FSfGG{b!be&Of14++V3H6ovW05YN-M^9B_oCkgWQIpi~MI8#rdvwKCM z%4Wa_x!P$kKOFiN#L%P6WF;&h-CKeP^9Ed~F0Bt)=ED9{JM?~XzJo;%T2S}lx~$ox zCal0y1YLjco&N;p^gH6UL#)rv1))^9ilFV2LZh0#N{WuUP&F--Q$HfiP)($MX`=Fn zYKhF(32cUX=FB40Ng08@>aL-(V`qD?=<29UW`wys5wN6bi>5_hIi0-kSSt_MCQwr)K7A!|MpohCFK}~GA zP-h(R*L*CtU?0nHl>}&MxVKe6=Nxp|soKWJp67U35zDS#TGp4KW|$t1yPk6smXPK- z4#KgLZX2edBS=EEQ`ve2;^@8sb3t06osrfSwM8E7FQnYjskS%V%MvIkcNtQQYT)<8 z!2Piw#(rMiO1$4lOwI%b+Vp1`S7c))YLY2uE}*B zHf)Yw-b#J4&0Tq(I9r>2n#|meHkG3dB~w>24Q2R=GW=V-@1=6|-mN>g8p>2%nR>Lm zrChtsHD%@Q#UCesx$x74C&zNssb}zh`ulXFXQJLS@u+XB=iF_fIgkPLvJzoi8f_}cyGLI4gOvK`Adu5d>GQn^D+{}T`7A^ci#@4Y( z7h4B<$(tYs+By6wGVDeehIs~#zW~XnpzA4!Jp)633AATj0&@SixtiEiQVnIKu8h>; O&*g#5*x=V<5cVHpq2nt6 literal 0 HcmV?d00001 diff --git a/slingshot_microservice/__pycache__/typing.cpython-312.pyc b/slingshot_microservice/__pycache__/typing.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..c1e66152828f8e6e90a734fde81c085d676f7129 GIT binary patch literal 1521 zcmb7E&1)M+6o0c{+FeO;Vmoc>gIU@ntxRH_TL(iMNGKGwlzzaPuxxh6$tL@u%&c7G zlM6odRB{Wwl#rX~rT<4yCAXoJKntbNgQ#@ssqf9kwj6RuM(8)cc{B6oy*IzvU%Fi% zu>JVwKiNG4;13!cHM=2f_C1nE@Fk>>vK$H)GDh6UjlvAg!V0az4(-AT9R|F`-Tf8- z_XtBjHF-O=uAv75ZbE7wLh5k)C$sS#cC@vHR(HYL)m9I!{(|+)9Z1_xShzIq^tykO zgC0}uxGbwWuCuC?JxjSCWo0bJAAhR6o4n*At}CJ3&qP&MNtG+_i}8?um}jvR7#*n2 z^;q(+Vv)tSbDlIYHHgKa#%8OCkAOqSQV5OIOxd9onqvbG^FQZtdLzsEjWT(jklsXq zV>Yw&-Z4zrU;-0US(4|w%Dm6V%E(eldx9YNZ>N8^DtKS=xa=oYiY(8{JMwN-Zzg;F zlsEyOv^va(i=;Ppt9f^wQ*p`)mb3m!$vkD{VVM>+9M6g?Qlc{3wcaH;4L zoF`Z&Ah~I9k<3V9hwz*AL9>D;U;7r?UyH2fuW}8dLx<~t&LBIge-ju5yT7{N>$paG z1Ia)KIS_9UT_boQw}5N}`kZwd>BPzO^0Y4E%!#->IQ=^%HV;Lda4Au1bOnp;l-5p? z>{((7m#LQE4@O+p$`h#9Xcq;m{BNjenoDwrG7J{#uzY;jaTQ1jk{--! z#U-?f3j~)5^aa05EFgFbVZVR;7I*5=Fy}ics4KzZ+q-oo7*r*1@YYMK`wqDyAa%;C z2!}8=Yq6pdA`OU(47Nz|1R5Y(O|&jR(dvPEW6q zIjs9UgL6W2*5K)?B66X4jI?pslnYhl3wFY_u*T(}Y{FYQFh3Kpkc0;ij=+c!p{#k|iN($q9x=PMBg?*j6l>=D za`eR8*-k|P4QY)cX~M7e+|Tyhw^#4kbGsiR`SBB+wonD(Q2`LI;)L@;ivraI4=@78 zG9SX(o+a&!#p}EmC%hj?MmwjbruYc{rR)1)5XWfFP?tGcs7#?qHQ}d>KZlGuba~IA zjY+UceiVIwGp#pb;?cv~jUYbK^!fgJ#4~{*WJ&7#e4MTv@%@a-E%XB+c(m3KEJGbx ztFIu#I5nFhE;215ci@2vhh9Yb5Wn^~t=!6|mRsp#9trk;)Eh6O8Ono-!H1F8)uckq z&){5+b==jJ?H<}WMufuqthDlTv{HrYJaS#R)X^R8zDVa$T{1yiPjbjFwvSRm74-_q zh$x3%Xzw7=e~NbhsnISVgoG^1{?^-+S*%E-bp@Gc^|qJy+$v#3fs;!z&)Mw;!~Pzi zS%D*bV8z8u6^<-^LHX4J<=7yu`m)+awH49WirEUH!puY`iq&Q&meYi3TKm_uo@qPP zl~gh(dlg9&o(!|N9OMkMHM+b75p+zd&3Oif?!|NhscggCfl9yOqDo|5zPWI-iKb z5h{^8@We!;m4CXnNWF|!QC@WutI(6UG9b?(+@wi8qQ@qPkI|4`9t|Yx4FqQd5{?do zbTo7-tmgE_asoL0?$~Q-8mqkXJ`&P>5&@^s6P&djy1U zYY BinaryIO: ... + + +class WriteFileFn(Protocol): + def __call__(self, key: str, id: int) -> BinaryIO: ... + + +class ProcessFn(Protocol): + def __call__( + self, + request: int, + read_file: ReadFileFn, + write_file: WriteFileFn, + ) -> Generator[tuple[int, CaseVariable], None, None]: ... + + +__all__ = [ + "CaseVariable", + "ReadFileFn", + "WriteFileFn", + "ProcessFn", +] diff --git a/src/lib.rs b/src/lib.rs index d69c28c..cd64bf5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,8 @@ use std::collections::HashMap; use std::error::Error; use std::fs::{self, File}; use std::io::{Cursor, ErrorKind, Read}; +#[cfg(feature = "python")] +use std::io::Write; use std::path::{Path, PathBuf}; use std::process::Command; use std::sync::Arc; @@ -26,12 +28,20 @@ use serde_json::Value; use tokio::io::AsyncReadExt; use tracing_subscriber::EnvFilter; +#[cfg(feature = "python")] +use pyo3::exceptions::{PyRuntimeError, PyTypeError}; +#[cfg(feature = "python")] +use pyo3::prelude::*; +#[cfg(feature = "python")] +use pyo3::types::{PyAny, PyBool, PyBytes, PyInt, PyModule, PyString}; + pub type AnyError = Box; pub type ReadFile = Box; pub type ReadFileFn = dyn Fn(&str, u64) -> Result + Send + Sync + 'static; pub type WriteFileFn = dyn Fn(&str, u64) -> Result + Send + Sync + 'static; -type ProcessFn = dyn Fn(u64, &ReadFileFn, &WriteFileFn) -> Result, AnyError> + +type ProcessFn = dyn Fn(u64, Arc, Arc) -> Result, AnyError> + Send + Sync + 'static; @@ -79,6 +89,19 @@ pub struct Microservice { } impl Microservice { + fn new_case_key( + name: impl Into, + config_host: impl Into, + process: Arc, + ) -> Self { + init_tracing(); + Self { + name: name.into(), + config_host: config_host.into(), + process, + } + } + /// Create a new microservice runtime. /// /// `process` accepts an inbound request ID, a `read_file` function, and a @@ -93,10 +116,10 @@ impl Microservice { init_tracing(); let process_wrapper = move | request: u64, - read_file: &ReadFileFn, - write_file: &WriteFileFn, + read_file: Arc, + write_file: Arc, | -> Result, AnyError> { - let outputs = process(request, read_file, write_file)?; + let outputs = process(request, read_file.as_ref(), write_file.as_ref())?; let mut mapped = Vec::with_capacity(outputs.len()); for (id, case) in outputs { let value = serde_json::to_value(case) @@ -190,7 +213,7 @@ impl Microservice { let read_config_host = config_host.clone(); let read_microservice_name = microservice_name.clone(); - let read_file = move |key: &str, id: u64| -> Result { + let read_file: Arc = Arc::new(move |key: &str, id: u64| -> Result { let bucket = resolve_bucket_name( &read_config_host, &read_microservice_name, @@ -202,9 +225,9 @@ impl Microservice { .lock() .map_err(|e| format!("file context lock poisoned for read_file: {}", e))?; guard.read_file(s3_read_client.as_ref(), &bucket, id) - }; + }); - let write_file = move |key: &str, id: u64| -> Result { + let write_file: Arc = Arc::new(move |key: &str, id: u64| -> Result { let bucket = resolve_bucket_name( &config_host, µservice_name, @@ -215,9 +238,9 @@ impl Microservice { .lock() .map_err(|e| format!("file context lock poisoned for write_file: {}", e))?; guard.write_file(&bucket, id) - }; + }); - let outputs = (self.process)(request_id, &read_file, &write_file)?; + let outputs = (self.process)(request_id, Arc::clone(&read_file), Arc::clone(&write_file))?; { let mut guard = file_context .lock() @@ -408,9 +431,9 @@ fn fetch_rabbitmq_url_from_sys_map() -> Result { } async fn fetch_s3_client_from_sys_map() -> Result, AnyError> { - let response = reqwest::blocking::get("https://sys-map.slingshot.cv/object-storage")?; + let response = reqwest::get("https://sys-map.slingshot.cv/object-storage").await?; let response = response.error_for_status()?; - let config = response.json::()?; + let config = response.json::().await?; let host = single_value(&config.host, "host")?; let access_key_ref = single_value(&config.pass_access_key, "pass:access-key")?; @@ -456,9 +479,13 @@ fn resolve_bucket_name( } let url = bucket_mapping_url(config_host, microservice_name, key); - let response = reqwest::blocking::get(&url)?; - let response = response.error_for_status()?; - let bucket_name = response.text()?.trim().to_string(); + let bucket_name = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + let response = reqwest::get(&url).await?; + let response = response.error_for_status()?; + Ok::(response.text().await?.trim().to_string()) + }) + })?; if bucket_name.is_empty() { return Err(format!("bucket mapping '{}' returned an empty bucket name", url).into()); @@ -614,3 +641,182 @@ async fn publish_outputs( Ok(()) } + +#[cfg(feature = "python")] +fn any_error_to_py(err: AnyError) -> PyErr { + PyRuntimeError::new_err(err.to_string()) +} + +#[cfg(feature = "python")] +fn case_key_from_py_value(value: &Bound<'_, PyAny>) -> PyResult { + if value.is_instance_of::() { + return Ok(CaseKey::Bool(value.extract::()?)); + } + + if value.is_instance_of::() { + return Ok(CaseKey::Int(value.extract::()?)); + } + + if value.is_instance_of::() { + return Ok(CaseKey::String(value.extract::()?)); + } + + Err(PyTypeError::new_err( + "case variable must be one of: bool, int, string", + )) +} + +#[cfg(feature = "python")] +#[pyclass(name = "ReadFileFn")] +struct PyReadFileFn { + inner: Arc, +} + +#[cfg(feature = "python")] +#[pymethods] +impl PyReadFileFn { + fn __call__(&self, py: Python<'_>, key: &str, id: u64) -> PyResult> { + let mut reader = (self.inner)(key, id).map_err(any_error_to_py)?; + let mut data = Vec::new(); + reader.read_to_end(&mut data).map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + + let io = py.import("io")?; + let bytes_io = io + .getattr("BytesIO")? + .call1((PyBytes::new(py, &data),))?; + + Ok(bytes_io.unbind()) + } +} + +#[cfg(feature = "python")] +#[pyclass] +struct PyWriteHandle { + file: Arc>, +} + +#[cfg(feature = "python")] +#[pymethods] +impl PyWriteHandle { + fn write(&self, data: &[u8]) -> PyResult { + let mut file = self + .file + .lock() + .map_err(|e| PyRuntimeError::new_err(format!("write lock poisoned: {}", e)))?; + file.write_all(data) + .map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + Ok(data.len()) + } + + fn flush(&self) -> PyResult<()> { + let mut file = self + .file + .lock() + .map_err(|e| PyRuntimeError::new_err(format!("flush lock poisoned: {}", e)))?; + file.flush() + .map_err(|e| PyRuntimeError::new_err(e.to_string())) + } +} + +#[cfg(feature = "python")] +#[pyclass(name = "WriteFileFn")] +struct PyWriteFileFn { + inner: Arc, +} + +#[cfg(feature = "python")] +#[pymethods] +impl PyWriteFileFn { + fn __call__(&self, py: Python<'_>, key: &str, id: u64) -> PyResult> { + let file = (self.inner)(key, id).map_err(any_error_to_py)?; + Py::new( + py, + PyWriteHandle { + file: Arc::new(Mutex::new(file)), + }, + ) + } +} + +#[cfg(feature = "python")] +fn run_python_process( + process: &Py, + request: u64, + read_file: Arc, + write_file: Arc, +) -> Result, AnyError> { + Python::with_gil(|py| -> Result, AnyError> { + let py_read = Py::new(py, PyReadFileFn { inner: read_file }) + .map_err(|e| format!("failed to build Python ReadFileFn wrapper: {}", e))?; + let py_write = Py::new(py, PyWriteFileFn { inner: write_file }) + .map_err(|e| format!("failed to build Python WriteFileFn wrapper: {}", e))?; + + let returned = process + .call1(py, (request, py_read, py_write)) + .map_err(|e| format!("Python process callback failed: {}", e))?; + + let iter = returned + .bind(py) + .try_iter() + .map_err(|e| format!("process return value must be iterable: {}", e))?; + + let mut outputs = Vec::new(); + for item in iter { + let item = item.map_err(|e| format!("failed to iterate process outputs: {}", e))?; + let (id, case_obj): (u64, Py) = item + .extract() + .map_err(|e| format!("each output must be a tuple (int, case): {}", e))?; + let case = case_key_from_py_value(case_obj.bind(py)) + .map_err(|e| format!("invalid case variable: {}", e))?; + outputs.push((id, case)); + } + + Ok(outputs) + }) +} + +#[cfg(feature = "python")] +#[pyclass(name = "Microservice")] +struct PyMicroservice { + name: String, + config_host: String, + process: Py, +} + +#[cfg(feature = "python")] +#[pymethods] +impl PyMicroservice { + #[new] + fn new(name: String, config_host: String, process: Py) -> PyResult { + Python::with_gil(|py| { + if !process.bind(py).is_callable() { + return Err(PyTypeError::new_err("process must be callable")); + } + + Ok(Self { + name, + config_host, + process, + }) + }) + } + + fn start(&self) -> PyResult<()> { + let process = Python::with_gil(|py| self.process.clone_ref(py)); + let microservice = Microservice::new_case_key( + self.name.clone(), + self.config_host.clone(), + Arc::new(move |request, read_file, write_file| run_python_process(&process, request, read_file, write_file)), + ); + + microservice.start().map_err(any_error_to_py) + } +} + +#[cfg(feature = "python")] +#[pymodule] +fn _native(_py: Python<'_>, module: &Bound<'_, PyModule>) -> PyResult<()> { + module.add_class::()?; + + Ok(()) +}