Do some connection pooling.

This prevents exhaustion of connections and also
has some performance benefits.
This commit is contained in:
2026-05-01 21:29:04 +01:00
parent aae997fc41
commit 13b82d66c6
4 changed files with 83 additions and 18 deletions

21
Cargo.lock generated
View File

@@ -1185,6 +1185,7 @@ dependencies = [
"downcast-rs", "downcast-rs",
"itoa", "itoa",
"pq-sys", "pq-sys",
"r2d2",
] ]
[[package]] [[package]]
@@ -2652,6 +2653,17 @@ version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot",
"scheduled-thread-pool",
]
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.9.4" version = "0.9.4"
@@ -2994,6 +3006,15 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot",
]
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.2.0" version = "1.2.0"

View File

@@ -17,7 +17,7 @@ aws-config = "1"
aws-sdk-s3 = "1" aws-sdk-s3 = "1"
futures-util = "0.3" futures-util = "0.3"
lapin = "2" lapin = "2"
diesel = { version = "2", features = ["postgres"] } diesel = { version = "2", features = ["postgres", "r2d2"] }
reqwest = { version = "0.12", features = ["blocking", "json", "rustls-tls"], default-features = false } reqwest = { version = "0.12", features = ["blocking", "json", "rustls-tls"], default-features = false }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"

View File

@@ -4,7 +4,7 @@ build-backend = "maturin"
[project] [project]
name = "slingshot-microservice" name = "slingshot-microservice"
version = "0.1.4" version = "0.1.5"
description = "Opinionated Rust framework for queue-driven microservices" description = "Opinionated Rust framework for queue-driven microservices"
license = { text = "MIT" } license = { text = "MIT" }
requires-python = ">=3.8" requires-python = ">=3.8"

View File

@@ -8,13 +8,14 @@ use std::path::{Path, PathBuf};
use std::process::Command; use std::process::Command;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex; use std::sync::Mutex;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use aws_config::BehaviorVersion; use aws_config::BehaviorVersion;
use aws_sdk_s3::Client; use aws_sdk_s3::Client;
use aws_sdk_s3::config::{Credentials, Region}; use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::primitives::ByteStream;
use diesel::Connection as DieselConnection; use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
use diesel::PgConnection; use diesel::PgConnection;
use futures_util::StreamExt; use futures_util::StreamExt;
use lapin::options::{ use lapin::options::{
@@ -49,6 +50,8 @@ type ProcessFn = dyn Fn(u64, Arc<ReadFileFn>, Arc<WriteFileFn>) -> Result<Vec<(u
+ 'static; + 'static;
static REQUEST_FILE_CONTEXT_COUNTER: AtomicU64 = AtomicU64::new(1); static REQUEST_FILE_CONTEXT_COUNTER: AtomicU64 = AtomicU64::new(1);
static DATABASE_URL_CACHE: OnceLock<String> = OnceLock::new();
static DATABASE_POOL_CACHE: OnceLock<Pool<ConnectionManager<PgConnection>>> = OnceLock::new();
fn init_tracing() { fn init_tracing() {
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
@@ -128,7 +131,7 @@ impl Microservice {
| -> Result<Vec<(u64, CaseKey)>, AnyError> { | -> Result<Vec<(u64, CaseKey)>, AnyError> {
let mut connection = establish_pg_connection().map_err(|e| { let mut connection = establish_pg_connection().map_err(|e| {
format!( format!(
"failed to establish PostgreSQL connection for request {}: {}", "failed to checkout PostgreSQL connection from pool for request {}: {}",
request, e request, e
) )
})?; })?;
@@ -601,10 +604,38 @@ fn resolve_password_from_pass(pass_key: &str) -> Result<String, AnyError> {
Ok(password) Ok(password)
} }
fn establish_pg_connection() -> Result<PgConnection, AnyError> { fn establish_pg_connection() -> Result<PooledConnection<ConnectionManager<PgConnection>>, AnyError> {
let database_url = fetch_database_url_from_sys_map()?; let pool = cached_pg_pool()?;
PgConnection::establish(&database_url) pool
.map_err(|e| format!("failed to connect to PostgreSQL using sys-map DB config: {}", e).into()) .get()
.map_err(|e| format!("failed to get PostgreSQL connection from pool: {}", e).into())
}
fn cached_database_url() -> Result<String, AnyError> {
if let Some(url) = DATABASE_URL_CACHE.get() {
return Ok(url.clone());
}
let url = fetch_database_url_from_sys_map()?;
let _ = DATABASE_URL_CACHE.set(url.clone());
Ok(url)
}
fn cached_pg_pool() -> Result<&'static Pool<ConnectionManager<PgConnection>>, AnyError> {
if let Some(pool) = DATABASE_POOL_CACHE.get() {
return Ok(pool);
}
let database_url = cached_database_url()?;
let manager = ConnectionManager::<PgConnection>::new(database_url);
let pool = Pool::builder()
.build(manager)
.map_err(|e| format!("failed to build PostgreSQL connection pool: {}", e))?;
let _ = DATABASE_POOL_CACHE.set(pool);
DATABASE_POOL_CACHE
.get()
.ok_or_else(|| "failed to initialize PostgreSQL connection pool".into())
} }
fn single_value<T: Clone>(values: &[T], field_name: &str) -> Result<T, AnyError> { fn single_value<T: Clone>(values: &[T], field_name: &str) -> Result<T, AnyError> {
@@ -854,25 +885,18 @@ impl PyWriteFileFn {
#[cfg(feature = "python")] #[cfg(feature = "python")]
fn run_python_process( fn run_python_process(
process: &Py<PyAny>, process: &Py<PyAny>,
engine: &Py<PyAny>,
request: u64, request: u64,
read_file: Arc<ReadFileFn>, read_file: Arc<ReadFileFn>,
write_file: Arc<WriteFileFn>, write_file: Arc<WriteFileFn>,
) -> Result<Vec<(u64, CaseKey)>, AnyError> { ) -> Result<Vec<(u64, CaseKey)>, AnyError> {
let database_url = fetch_database_url_from_sys_map()?;
Python::with_gil(|py| -> Result<Vec<(u64, CaseKey)>, AnyError> { Python::with_gil(|py| -> Result<Vec<(u64, CaseKey)>, AnyError> {
let py_read = Py::new(py, PyReadFileFn { inner: read_file }) let py_read = Py::new(py, PyReadFileFn { inner: read_file })
.map_err(|e| format!("failed to build Python ReadFileFn wrapper: {}", e))?; .map_err(|e| format!("failed to build Python ReadFileFn wrapper: {}", e))?;
let py_write = Py::new(py, PyWriteFileFn { inner: write_file }) let py_write = Py::new(py, PyWriteFileFn { inner: write_file })
.map_err(|e| format!("failed to build Python WriteFileFn wrapper: {}", e))?; .map_err(|e| format!("failed to build Python WriteFileFn wrapper: {}", e))?;
let sqlalchemy = py
.import("sqlalchemy")
.map_err(|e| format!("failed to import sqlalchemy: {}", e))?;
let engine = sqlalchemy
.getattr("create_engine")
.and_then(|f| f.call1((database_url.as_str(),)))
.map_err(|e| format!("failed to create SQLAlchemy engine: {}", e))?;
let connection = engine let connection = engine
.bind(py)
.call_method0("connect") .call_method0("connect")
.map_err(|e| format!("failed to open SQLAlchemy connection: {}", e))?; .map_err(|e| format!("failed to open SQLAlchemy connection: {}", e))?;
@@ -909,6 +933,23 @@ fn run_python_process(
}) })
} }
#[cfg(feature = "python")]
fn create_python_sqlalchemy_engine() -> Result<Py<PyAny>, AnyError> {
let database_url = cached_database_url()?;
Python::with_gil(|py| {
let sqlalchemy = py
.import("sqlalchemy")
.map_err(|e| format!("failed to import sqlalchemy: {}", e))?;
let engine = sqlalchemy
.getattr("create_engine")
.and_then(|f| f.call1((database_url.as_str(),)))
.map_err(|e| format!("failed to create SQLAlchemy engine: {}", e))?;
Ok(engine.unbind())
})
}
#[cfg(feature = "python")] #[cfg(feature = "python")]
#[pyclass(name = "Microservice")] #[pyclass(name = "Microservice")]
struct PyMicroservice { struct PyMicroservice {
@@ -937,10 +978,13 @@ impl PyMicroservice {
fn start(&self) -> PyResult<()> { fn start(&self) -> PyResult<()> {
let process = Python::with_gil(|py| self.process.clone_ref(py)); let process = Python::with_gil(|py| self.process.clone_ref(py));
let engine = create_python_sqlalchemy_engine().map_err(any_error_to_py)?;
let microservice = Microservice::new_case_key( let microservice = Microservice::new_case_key(
self.name.clone(), self.name.clone(),
self.config_host.clone(), self.config_host.clone(),
Arc::new(move |request, read_file, write_file| run_python_process(&process, request, read_file, write_file)), Arc::new(move |request, read_file, write_file| {
run_python_process(&process, &engine, request, read_file, write_file)
}),
); );
microservice.start().map_err(any_error_to_py) microservice.start().map_err(any_error_to_py)