From 76c63fc3ef63487928d79c373825f3a77900e17d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Se=C3=A1n=20Healy?= Date: Sat, 25 Apr 2026 21:42:07 +0100 Subject: [PATCH] Add standardised SQL connection setup for microservices. --- .gitignore | 2 + Cargo.lock | 133 +++++++++++++++++++++++ Cargo.toml | 1 + README.md | 30 +++-- examples/py_simple.py | 2 + examples/simple.rs | 2 + pyproject.toml | 5 +- slingshot_microservice/typing.py | 8 +- src/lib.rs | 181 ++++++++++++++++++++++++++----- 9 files changed, 325 insertions(+), 39 deletions(-) diff --git a/.gitignore b/.gitignore index b83d222..4ab2c5c 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ /target/ +__pycache__/ +*.so diff --git a/Cargo.lock b/Cargo.lock index 90bf202..36f2cbc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -811,6 +811,12 @@ version = "3.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.1" @@ -1060,6 +1066,41 @@ dependencies = [ "cmov", ] +[[package]] +name = "darling" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "data-encoding" version = "2.11.0" @@ -1132,6 +1173,42 @@ dependencies = [ "cipher", ] +[[package]] +name = "diesel" +version = "2.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78df0e4e8c596662edb07fbfbb7f23769cca35049827df5f909084d956b6aeaf" +dependencies = [ + "bitflags 2.11.1", + "byteorder", + "diesel_derives", + "downcast-rs", + "itoa", + "pq-sys", +] + +[[package]] +name = "diesel_derives" +version = "2.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b79402bd1cfb25b65650f0f4901d0e79c095729e2139c8ab779d025968c7099" +dependencies = [ + "diesel_table_macro_syntax", + "dsl_auto_type", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "diesel_table_macro_syntax" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe2444076b48641147115697648dc743c2c00b61adade0f01ce67133c7babe8c" +dependencies = [ + "syn", +] + [[package]] name = "digest" version = "0.10.7" @@ -1172,6 +1249,26 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "780955b8b195a21ab8e4ac6b60dd1dbdcec1dc6c51c0617964b08c81785e12c9" +[[package]] +name = "downcast-rs" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "117240f60069e65410b3ae1bb213295bd828f707b5bec6596a1afc8793ce0cbc" + +[[package]] +name = "dsl_auto_type" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd122633e4bef06db27737f21d3738fb89c8f6d5360d6d9d7635dda142a7757e" +dependencies = [ + "darling", + "either", + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dunce" version = "1.0.5" @@ -1835,6 +1932,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.1.0" @@ -2330,6 +2433,12 @@ dependencies = [ "spki 0.6.0", ] +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "polling" version = "2.8.0" @@ -2390,6 +2499,17 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "pq-sys" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "574ddd6a267294433f140b02a726b0640c43cf7c6f717084684aaa3b285aba61" +dependencies = [ + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -3104,6 +3224,7 @@ dependencies = [ "anyhow", "aws-config", "aws-sdk-s3", + "diesel", "futures-util", "lapin", "pyo3", @@ -3182,6 +3303,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "subtle" version = "2.6.1" @@ -3558,6 +3685,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index f84b2d1..7021875 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ aws-config = "1" aws-sdk-s3 = "1" futures-util = "0.3" lapin = "2" +diesel = { version = "2", features = ["postgres"] } reqwest = { version = "0.12", features = ["blocking", "json", "rustls-tls"], default-features = false } serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/README.md b/README.md index 2687ed6..13de5a2 100644 --- a/README.md +++ b/README.md @@ -9,9 +9,13 @@ assumptions about a microservice: 1. A microservice listens to incoming requests on its own dedicated and singular queue (RabbitMQ). 2. Incoming requests are in the form of a 64-bit unsigned integer (`u64`). -2. Microservices process requests via a `process` function, which takes three - arguments: the incoming request (`u64`), a `read_file` function, and a - `write_file` function. +2. Microservices process requests via a `process` function, which takes four + arguments: the incoming request (`u64`), a `read_file` function, a + `write_file` function, and a database ORM `connection`. +3. All microservices must communicate with the shared PostgreSQL database via + an ORM connection passed into `process`. + - Rust microservices use `diesel::pg::PgConnection`. + - Python microservices use `sqlalchemy.engine.base.Connection`. 3. The `process` function returns a set of IDs (also `u64`) that are the result of processing the incoming request. Each of these IDs is also associated with a "case variable" that is used for routing the result to the @@ -75,6 +79,7 @@ pip install -e . ```python from typing import Generator +from sqlalchemy.engine.base import Connection from slingshot_microservice.typing import ReadFileFn, WriteFileFn from slingshot_microservice import Microservice @@ -84,6 +89,7 @@ def process( request: int, read_file: ReadFileFn, write_file: WriteFileFn, + connection: Connection, ) -> Generator[tuple[int, bool | int | str], None, None]: reader = read_file("in", request) input_data = reader.read().decode() @@ -107,7 +113,7 @@ editors and type-checkers: |---|---| | `ReadFileFn` | Callable returned by `read_file(key, id)` – behaves like `BinaryIO` | | `WriteFileFn` | Callable returned by `write_file(key, id)` – behaves like `BinaryIO` | -| `ProcessFn` | The generator function signature expected by `Microservice` | +| `ProcessFn` | The generator signature expected by `Microservice` with `(request, read_file, write_file, connection)` | | `CaseVariable` | `bool \| int \| str` – valid case variable types | ### Publishing Wheels @@ -128,6 +134,7 @@ Linux wheel covers all CPython versions ≥ 3.8. ```rust use slingshot_microservice::Microservice; +use diesel::pg::PgConnection; use slingshot_microservice::{AnyError, ReadFileFn, WriteFileFn}; use std::io::{Read, Write}; @@ -135,6 +142,7 @@ fn process( request: u64, read_file: &ReadFileFn, write_file: &WriteFileFn, + connection: &mut PgConnection, ) -> Result, AnyError> { let mut input = String::new(); let mut reader = read_file("in", request)?; @@ -213,7 +221,8 @@ actual secrets with `pass show ` before constructing the S3 client. When the microservice first starts up, it makes a request to the configuration service to get the queue metadata. Then it starts to listen to the inbound queue. Inbound requests are processed by the user-programmed `process` -function, which returns a set of tuples of the form `(result_id, case_variable)`. +function, which is called with `(request, read_file, write_file, connection)` +and returns a set of tuples of the form `(result_id, case_variable)`. Within each `process` pass: @@ -225,17 +234,20 @@ Within each `process` pass: 2. `write_file(key, id)` resolves `key` through the same cached lookup and returns an opened local file handle for writing, staging the output for `s3://{resolved_bucket}/{id}`. -3. After `process` returns, opened files are closed. -4. Then staged write files are uploaded to S3 with the AWS SDK, local staged +3. `connection` is an ORM-backed PostgreSQL connection passed into `process` + (`diesel::pg::PgConnection` in Rust, `sqlalchemy.engine.base.Connection` + in Python). +4. After `process` returns, opened files are closed. +5. Then staged write files are uploaded to S3 with the AWS SDK, local staged files are deleted, and local temporary directories are removed. -5. Only after file finalization is complete are output IDs published to +6. Only after file finalization is complete are output IDs published to outbound queues. The output queue routing step looks like this: Peudocode: ``` -for each (result_id, case_variable) in process(request): +for each (result_id, case_variable) in process(request, read_file, write_file, connection): for each outbound_queue in config.out[case_variable]: send result_id to outbound_queue ``` diff --git a/examples/py_simple.py b/examples/py_simple.py index dd59308..eef1bc6 100644 --- a/examples/py_simple.py +++ b/examples/py_simple.py @@ -1,11 +1,13 @@ from slingshot_microservice.typing import ReadFileFn, WriteFileFn from slingshot_microservice import Microservice from typing import Generator +from sqlalchemy.engine.base import Connection def process( request: int, read_file: ReadFileFn, write_file: WriteFileFn, + _connection: Connection, ) -> Generator[tuple[int, bool | int | str], None, None]: reader = read_file("in", request) input_data = reader.read().decode() diff --git a/examples/simple.rs b/examples/simple.rs index 6f96d96..ac1ccbb 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -1,10 +1,12 @@ use slingshot_microservice::{AnyError, Microservice, ReadFileFn, WriteFileFn}; +use diesel::pg::PgConnection; use std::io::{Read, Write}; fn process( request: u64, read_file: &ReadFileFn, write_file: &WriteFileFn, + _connection: &mut PgConnection, ) -> Result, AnyError> { let mut input = String::new(); let mut reader = read_file("in", request)?; diff --git a/pyproject.toml b/pyproject.toml index 82c2f50..0b2601f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,11 +4,14 @@ build-backend = "maturin" [project] name = "slingshot-microservice" -version = "0.1.0" +version = "0.1.2" description = "Opinionated Rust framework for queue-driven microservices" license = { text = "MIT" } requires-python = ">=3.8" readme = "README.md" +dependencies = [ + "sqlalchemy>=2", +] authors = [ { name = "Seán Healy", email = "sean@seanhealy.ie" } ] diff --git a/slingshot_microservice/typing.py b/slingshot_microservice/typing.py index 650feaa..26cd2c9 100644 --- a/slingshot_microservice/typing.py +++ b/slingshot_microservice/typing.py @@ -1,6 +1,11 @@ from __future__ import annotations -from typing import BinaryIO, Generator, Protocol, TypeAlias +from typing import TYPE_CHECKING, BinaryIO, Generator, Protocol, TypeAlias + +if TYPE_CHECKING: + from sqlalchemy.engine.base import Connection as SqlAlchemyConnection +else: + SqlAlchemyConnection = object CaseVariable: TypeAlias = bool | int | str @@ -19,6 +24,7 @@ class ProcessFn(Protocol): request: int, read_file: ReadFileFn, write_file: WriteFileFn, + connection: SqlAlchemyConnection, ) -> Generator[tuple[int, CaseVariable], None, None]: ... diff --git a/src/lib.rs b/src/lib.rs index cd64bf5..637cb4a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,8 @@ use aws_config::BehaviorVersion; use aws_sdk_s3::Client; use aws_sdk_s3::config::{Credentials, Region}; use aws_sdk_s3::primitives::ByteStream; +use diesel::Connection as DieselConnection; +use diesel::pg::PgConnection; use futures_util::StreamExt; use lapin::options::{ BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, @@ -89,6 +91,7 @@ pub struct Microservice { } impl Microservice { + #[cfg(feature = "python")] fn new_case_key( name: impl Into, config_host: impl Into, @@ -105,12 +108,16 @@ impl Microservice { /// Create a new microservice runtime. /// /// `process` accepts an inbound request ID, a `read_file` function, and a - /// `write_file` function, and then returns a list of + /// `write_file` function, and a mutable PostgreSQL connection, and then + /// returns a list of /// `(result_id, case_variable)` tuples. Case variables can be any /// serializable primitive, such as `String`, `bool`, or integers. pub fn new(name: impl Into, config_host: impl Into, process: F) -> Self where - F: Fn(u64, &ReadFileFn, &WriteFileFn) -> Result, AnyError> + Send + Sync + 'static, + F: Fn(u64, &ReadFileFn, &WriteFileFn, &mut PgConnection) -> Result, AnyError> + + Send + + Sync + + 'static, C: Serialize + 'static, { init_tracing(); @@ -119,7 +126,13 @@ impl Microservice { read_file: Arc, write_file: Arc, | -> Result, AnyError> { - let outputs = process(request, read_file.as_ref(), write_file.as_ref())?; + let mut connection = establish_pg_connection().map_err(|e| { + format!( + "failed to establish PostgreSQL connection for request {}: {}", + request, e + ) + })?; + let outputs = process(request, read_file.as_ref(), write_file.as_ref(), &mut connection)?; let mut mapped = Vec::with_capacity(outputs.len()); for (id, case) in outputs { let value = serde_json::to_value(case) @@ -148,7 +161,9 @@ impl Microservice { .build()?; let s3_client = runtime.block_on(fetch_s3_client_from_sys_map())?; - runtime.block_on(self.run_consumer(config.inbound, route_map, amqp_url, s3_client)) + runtime + .block_on(self.run_consumer(config.inbound, route_map, amqp_url, s3_client)) + .map_err(|e| format!("microservice '{}' failed: {}", self.name, e).into()) } fn fetch_config(&self) -> Result { @@ -240,15 +255,23 @@ impl Microservice { guard.write_file(&bucket, id) }); - let outputs = (self.process)(request_id, Arc::clone(&read_file), Arc::clone(&write_file))?; + let outputs = (self.process)(request_id, Arc::clone(&read_file), Arc::clone(&write_file)) + .map_err(|e| format!("request {}: process callback failed: {}", request_id, e))?; { let mut guard = file_context .lock() .map_err(|e| format!("file context lock poisoned for finalize: {}", e))?; - guard.finalize(s3_client.as_ref())? + guard + .finalize(s3_client.as_ref()) + .map_err(|e| format!("request {}: finalize/upload failed: {}", request_id, e))? } - publish_outputs(&channel, outputs, &route_map).await?; - delivery.ack(BasicAckOptions::default()).await?; + publish_outputs(&channel, outputs, &route_map) + .await + .map_err(|e| format!("request {}: publish failed: {}", request_id, e))?; + delivery + .ack(BasicAckOptions::default()) + .await + .map_err(|e| format!("request {}: ack failed: {}", request_id, e))?; } Ok(()) @@ -373,7 +396,16 @@ fn upload_to_s3( .key(&key) .body(body) .send() - .await?; + .await + .map_err(|e| { + format!( + "failed to upload S3 object bucket='{}' key='{}' path='{}': {:?}", + bucket_name, + key, + path.display(), + e + ) + })?; Ok::<(), AnyError>(()) }) @@ -404,6 +436,15 @@ struct RabbitMqConfig { pass: Vec, } +#[derive(Debug, Deserialize)] +struct DatabaseConfig { + port: Vec, + host: Vec, + username: Vec, + database: Vec, + pass: Vec, +} + #[derive(Debug, Deserialize)] struct ObjectStorageConfig { host: Vec, @@ -430,6 +471,42 @@ fn fetch_rabbitmq_url_from_sys_map() -> Result { Ok(format!("amqp://{}:{}@{}:{}/%2f", username, pass, host, port)) } +fn fetch_database_url_from_sys_map() -> Result { + let config = if tokio::runtime::Handle::try_current().is_ok() { + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + let response = reqwest::get("https://sys-map.slingshot.cv/db").await?; + let response = response.error_for_status()?; + response + .json::() + .await + .map_err(|e| -> AnyError { Box::new(e) }) + }) + })? + } else { + let response = reqwest::blocking::get("https://sys-map.slingshot.cv/db")?; + let response = response.error_for_status()?; + response.json::()? + }; + + let port = single_value(&config.port, "port")?; + let host = single_value(&config.host, "host")?; + let username = single_value(&config.username, "username")?; + let database = single_value(&config.database, "database")?; + let pass_key = single_value(&config.pass, "pass")?; + let password = resolve_password_from_pass(&pass_key)?; + + info!( + "Fetched DB config from sys-map: host={}, port={}, username={}, database={}", + host, port, username, database + ); + + Ok(format!( + "postgres://{}:{}@{}:{}/{}", + username, password, host, port, database + )) +} + async fn fetch_s3_client_from_sys_map() -> Result, AnyError> { let response = reqwest::get("https://sys-map.slingshot.cv/object-storage").await?; let response = response.error_for_status()?; @@ -524,6 +601,12 @@ fn resolve_password_from_pass(pass_key: &str) -> Result { Ok(password) } +fn establish_pg_connection() -> Result { + let database_url = fetch_database_url_from_sys_map()?; + PgConnection::establish(&database_url) + .map_err(|e| format!("failed to connect to PostgreSQL using sys-map DB config: {}", e).into()) +} + fn single_value(values: &[T], field_name: &str) -> Result { if values.len() != 1 { return Err(format!( @@ -633,9 +716,30 @@ async fn publish_outputs( payload.as_bytes(), BasicProperties::default(), ) - .await?; - confirm.await?; + .await + .map_err(|e| { + format!( + "failed to publish result_id={} to queue='{}': {}", + result_id, + queue, + e + ) + })?; + confirm.await.map_err(|e| { + format!( + "broker rejected publish result_id={} queue='{}': {}", + result_id, + queue, + e + ) + })?; } + } else { + info!( + "No outbound queues configured for case variable {:?}; skipping publish for result_id={}", + case_var, + result_id + ); } } @@ -745,33 +849,54 @@ fn run_python_process( read_file: Arc, write_file: Arc, ) -> Result, AnyError> { + let database_url = fetch_database_url_from_sys_map()?; + Python::with_gil(|py| -> Result, AnyError> { let py_read = Py::new(py, PyReadFileFn { inner: read_file }) .map_err(|e| format!("failed to build Python ReadFileFn wrapper: {}", e))?; let py_write = Py::new(py, PyWriteFileFn { inner: write_file }) .map_err(|e| format!("failed to build Python WriteFileFn wrapper: {}", e))?; + let sqlalchemy = py + .import("sqlalchemy") + .map_err(|e| format!("failed to import sqlalchemy: {}", e))?; + let engine = sqlalchemy + .getattr("create_engine") + .and_then(|f| f.call1((database_url.as_str(),))) + .map_err(|e| format!("failed to create SQLAlchemy engine: {}", e))?; + let connection = engine + .call_method0("connect") + .map_err(|e| format!("failed to open SQLAlchemy connection: {}", e))?; - let returned = process - .call1(py, (request, py_read, py_write)) - .map_err(|e| format!("Python process callback failed: {}", e))?; + let outputs_result = (|| -> Result, AnyError> { + let returned = process + .call1(py, (request, py_read, py_write, connection.clone())) + .map_err(|e| format!("Python process callback failed: {}", e))?; - let iter = returned - .bind(py) - .try_iter() - .map_err(|e| format!("process return value must be iterable: {}", e))?; + let iter = returned + .bind(py) + .try_iter() + .map_err(|e| format!("process return value must be iterable: {}", e))?; - let mut outputs = Vec::new(); - for item in iter { - let item = item.map_err(|e| format!("failed to iterate process outputs: {}", e))?; - let (id, case_obj): (u64, Py) = item - .extract() - .map_err(|e| format!("each output must be a tuple (int, case): {}", e))?; - let case = case_key_from_py_value(case_obj.bind(py)) - .map_err(|e| format!("invalid case variable: {}", e))?; - outputs.push((id, case)); + let mut outputs = Vec::new(); + for item in iter { + let item = item.map_err(|e| format!("failed to iterate process outputs: {}", e))?; + let (id, case_obj): (u64, Py) = item + .extract() + .map_err(|e| format!("each output must be a tuple (int, case): {}", e))?; + let case = case_key_from_py_value(case_obj.bind(py)) + .map_err(|e| format!("invalid case variable: {}", e))?; + outputs.push((id, case)); + } + + Ok(outputs) + })(); + + let close_result = connection.call_method0("close"); + if let Err(err) = close_result { + return Err(format!("failed to close SQLAlchemy connection: {}", err).into()); } - Ok(outputs) + outputs_result }) }