What pgmonkey Does Behind the Scenes
Before diving into recipes, here is what pgmonkey handles automatically so you do not have to:
- Connection caching: Connections and pools are cached by config content (SHA-256 hash). Repeated calls with the same config return the existing instance instead of creating a new one. This prevents "pool storms" where each call opens a brand-new pool.
- Async pool lifecycle: When you use async with pool_conn:, pgmonkey borrows a connection from the pool and returns it when the block exits. The pool itself stays open for reuse. Commits on clean exit, rolls back on exception.
- atexit cleanup: All cached connections and pools are automatically closed when the process exits, via an atexit handler.
- Thread-safe caching: The connection cache is protected by a threading lock, safe for multi-threaded sync applications.
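The caching behavior in the list above can be illustrated with a minimal sketch. This is not pgmonkey's implementation: `ConfigKeyedCache`, `get_or_create`, and the `factory` argument are hypothetical names, and including the connection type in the key is an assumption. The sketch relies only on what the text states, namely that entries are keyed by a SHA-256 hash of the config content and guarded by a threading lock.

```python
import hashlib
import threading


class ConfigKeyedCache:
    """Toy cache keyed by config content (hypothetical, not pgmonkey's code)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}

    @staticmethod
    def _key(config_text, connection_type):
        # Hash the config *content*, so identical content always maps to one key.
        # Appending the connection type is an assumption made for this sketch.
        digest = hashlib.sha256(config_text.encode("utf-8")).hexdigest()
        return f"{digest}:{connection_type}"

    def get_or_create(self, config_text, connection_type, factory):
        key = self._key(config_text, connection_type)
        # Holding the lock makes check-then-create atomic across threads,
        # so two concurrent callers cannot both build a new pool.
        with self._lock:
            if key not in self._entries:
                self._entries[key] = factory()
            return self._entries[key]


cache = ConfigKeyedCache()
config = "host: localhost\nport: 5432\n"

first = cache.get_or_create(config, "pool", factory=object)
second = cache.get_or_create(config, "pool", factory=object)
other = cache.get_or_create(config + "dbname: test\n", "pool", factory=object)
```

Here `first` and `second` are the same object because the config text is identical, while `other` is a separate instance because its content hashes differently.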
Connection Recipes
Normal (Synchronous) Connection
Best for: scripts, CLI tools, simple sync apps. One connection, auto-managed.
from pgmonkey import PGConnectionManager

manager = PGConnectionManager()
connection = manager.get_database_connection('config.yaml', 'normal')

# Context manager commits on success, rolls back on exception
with connection as conn:
    with conn.cursor() as cur:
        cur.execute('SELECT version();')
        print(cur.fetchone())

# Connection is cached: calling again returns the same instance
same_conn = manager.get_database_connection('config.yaml', 'normal')
assert same_conn is connection
pgmonkey handles: Caching (same config = same connection), atexit cleanup.
Pooled (Synchronous) Connection
Best for: Flask, Django, multi-threaded web apps. Borrows/returns connections from a shared pool.
from pgmonkey import PGConnectionManager

manager = PGConnectionManager()
pool = manager.get_database_connection('config.yaml', 'pool')

# Each 'with' block borrows a connection, returns it when done
with pool as conn:
    with conn.cursor() as cur:
        cur.execute('SELECT count(*) FROM users;')
        print(cur.fetchone())

# Pool stays open: the next 'with' block borrows another connection
with pool as conn:
    with conn.cursor() as cur:
        cur.execute('SELECT now();')
        print(cur.fetchone())

# Calling get_database_connection again returns the SAME pool (cached)
same_pool = manager.get_database_connection('config.yaml', 'pool')
assert same_pool is pool
pgmonkey handles: Pool creation, caching (prevents pool storms), atexit cleanup.
Async Connection
Best for: single async tasks, lightweight async scripts. One async connection.
import asyncio
from pgmonkey import PGConnectionManager

async def main():
    manager = PGConnectionManager()
    connection = await manager.get_database_connection('config.yaml', 'async')

    async with connection as conn:
        async with conn.cursor() as cur:
            await cur.execute('SELECT version();')
            print(await cur.fetchone())

    # Connection is cached: awaiting again returns the same instance
    same_conn = await manager.get_database_connection('config.yaml', 'async')
    assert same_conn is connection

if __name__ == "__main__":
    asyncio.run(main())
pgmonkey handles: Caching, atexit cleanup (async connections closed via temporary event loop).
Async Pooled Connection
Best for: FastAPI, aiohttp, high-concurrency async web apps. Borrows/returns connections from an async pool.
import asyncio
from pgmonkey import PGConnectionManager

async def main():
    manager = PGConnectionManager()
    pool = await manager.get_database_connection('config.yaml', 'async_pool')

    # Each 'async with' borrows a connection, returns it on exit
    async with pool as conn:
        async with conn.cursor() as cur:
            await cur.execute('SELECT count(*) FROM orders;')
            print(await cur.fetchone())

    # Pool stays open: reuse it
    async with pool as conn:
        async with conn.cursor() as cur:
            await cur.execute('SELECT now();')
            print(await cur.fetchone())

    # Pool is cached: awaiting again returns the SAME pool
    same_pool = await manager.get_database_connection('config.yaml', 'async_pool')
    assert same_pool is pool

    # Clean up when done (or let atexit handle it)
    await manager.clear_cache_async()

if __name__ == "__main__":
    asyncio.run(main())
pgmonkey handles: Pool creation, connection borrow/return, auto commit/rollback, caching (prevents pool storms), atexit cleanup.
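To make the borrow/return and commit/rollback behavior concrete, here is a self-contained toy model that needs no database. It is a sketch of the pattern only, not pgmonkey's implementation: `ToyAsyncPool`, `borrow()`, and `FakeAsyncConnection` are hypothetical names, and pgmonkey itself exposes the behavior through `async with pool as conn:` rather than a `borrow()` method.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeAsyncConnection:
    """Stand-in connection that records commit/rollback calls (hypothetical)."""

    def __init__(self, name):
        self.name = name
        self.events = []

    async def commit(self):
        self.events.append("commit")

    async def rollback(self):
        self.events.append("rollback")


class ToyAsyncPool:
    """Hypothetical fixed-size pool illustrating borrow/return only."""

    def __init__(self, size):
        self._idle = asyncio.Queue()
        for i in range(size):
            self._idle.put_nowait(FakeAsyncConnection(f"conn-{i}"))

    @asynccontextmanager
    async def borrow(self):
        conn = await self._idle.get()  # waits if every connection is in use
        try:
            yield conn
            await conn.commit()  # the block exited cleanly
        except Exception:
            await conn.rollback()  # the block raised
            raise
        finally:
            self._idle.put_nowait(conn)  # always hand the connection back

    def idle_count(self):
        return self._idle.qsize()


async def demo():
    pool = ToyAsyncPool(size=2)

    async with pool.borrow() as conn:
        ok_conn = conn
        idle_while_borrowed = pool.idle_count()

    try:
        async with pool.borrow() as conn:
            failed_conn = conn
            raise ValueError("simulated query failure")
    except ValueError:
        pass

    return ok_conn.events, failed_conn.events, idle_while_borrowed, pool.idle_count()


result = asyncio.run(demo())
```

The pool object outlives every `async with` block; only the borrowed connection is returned, which is why both connections are idle again at the end even though one block failed.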
App-Level Design Patterns
These minimal examples show how to integrate pgmonkey into real applications.
Database Class for Flask
A reusable database class for synchronous web frameworks. The pool is created once and shared across all requests.
from pgmonkey import PGConnectionManager

class Database:
    def __init__(self, config_path):
        self.manager = PGConnectionManager()
        self.config_path = config_path
        # Pool is created on first call, cached thereafter
        self.pool = self.manager.get_database_connection(config_path, 'pool')

    def fetch_one(self, query, params=None):
        with self.pool as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                return cur.fetchone()

    def fetch_all(self, query, params=None):
        with self.pool as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                return cur.fetchall()

    def execute(self, query, params=None):
        with self.pool as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)

# Usage in Flask
from flask import Flask

app = Flask(__name__)
db = Database('/path/to/config.yaml')

@app.route('/users')
def list_users():
    rows = db.fetch_all('SELECT id, name FROM users ORDER BY id;')
    return {'users': [{'id': r[0], 'name': r[1]} for r in rows]}
AsyncDatabase Class for FastAPI
A reusable async database class for async web frameworks. The pool is created once at startup and shared across all requests.
import asyncio
from pgmonkey import PGConnectionManager

class AsyncDatabase:
    def __init__(self, config_path):
        self.manager = PGConnectionManager()
        self.config_path = config_path
        self.pool = None

    async def connect(self):
        # Pool is created and cached on first call
        self.pool = await self.manager.get_database_connection(
            self.config_path, 'async_pool'
        )

    async def disconnect(self):
        await self.manager.clear_cache_async()

    async def fetch_one(self, query, params=None):
        async with self.pool as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, params)
                return await cur.fetchone()

    async def fetch_all(self, query, params=None):
        async with self.pool as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, params)
                return await cur.fetchall()

    async def execute(self, query, params=None):
        async with self.pool as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, params)

# Usage in FastAPI
from fastapi import FastAPI

app = FastAPI()
db = AsyncDatabase('/path/to/config.yaml')

@app.on_event("startup")
async def startup():
    await db.connect()

@app.on_event("shutdown")
async def shutdown():
    await db.disconnect()

@app.get("/orders")
async def list_orders():
    rows = await db.fetch_all('SELECT id, total FROM orders ORDER BY id;')
    return {"orders": [{"id": r[0], "total": r[1]} for r in rows]}
Quick Reference
| Connection Type | Best For | Cached? | Context Manager | Pool Reusable? |
|---|---|---|---|---|
| normal | Scripts, CLI tools | Yes | with conn: | N/A |
| pool | Flask, Django, threaded apps | Yes | with pool: borrows/returns | Yes |
| async | Async scripts, lightweight tasks | Yes | async with conn: | N/A |
| async_pool | FastAPI, aiohttp, high concurrency | Yes | async with pool: borrows/returns | Yes |
Cache Management API
pgmonkey exposes cache management methods on PGConnectionManager:
| Method | Description |
|---|---|
| manager.cache_info | Returns dict with size and connection_types of cached connections. |
| manager.clear_cache() | Disconnects all cached connections and clears the cache. Use from sync code. |
| await manager.clear_cache_async() | Same as above, but safe to call from async code (inside an event loop). |
| force_reload=True | Pass to get_database_connection() to replace a cached connection with a fresh one. |