CLI Commands Overview
| Command | Description | Example |
|---|---|---|
| pgmonkey settings | Manage application settings. | pgmonkey settings --help |
| pgmonkey pgconfig create | Create a new PostgreSQL configuration template. | pgmonkey pgconfig create --connconfig config.yaml |
| pgmonkey pgconfig test | Test a database connection. Add --resolve-env to resolve ${VAR} and from_env/from_file references. | pgmonkey pgconfig test --connconfig config.yaml --connection-type pool --resolve-env |
| pgmonkey pgconfig generate-code | Generate example Python code. Use --library psycopg for native psycopg code. Accepts --resolve-env. | pgmonkey pgconfig generate-code --connconfig config.yaml --connection-type async --library psycopg |
| pgmonkey pgserverconfig | Generate server configuration recommendations. | pgmonkey pgserverconfig --filepath config.yaml |
| pgmonkey pgserverconfig --audit | Audit live server settings against recommendations (read-only). | pgmonkey pgserverconfig --filepath config.yaml --audit |
| pgmonkey pgimport | Import CSV data into a PostgreSQL table. | pgmonkey pgimport --table public.my_table --connconfig config.yaml --import_file data.csv |
| pgmonkey pgexport | Export a PostgreSQL table to CSV. | pgmonkey pgexport --table public.my_table --connconfig config.yaml --export_file output.csv |
One Config File, All Connection Types
In pgmonkey, a single YAML configuration file serves all connection types. Specify the connection type when calling the API, or let it default to the value in the config file.
from pgmonkey import PGConnectionManager

manager = PGConnectionManager()

# Same config file, different connection types.
# (The 'async' and 'async_pool' calls must run inside an async function.)
conn = manager.get_database_connection('config.yaml', 'normal')
conn = manager.get_database_connection('config.yaml', 'pool')
conn = await manager.get_database_connection('config.yaml', 'async')
conn = await manager.get_database_connection('config.yaml', 'async_pool')
YAML Configuration Options
Full YAML Configuration
# Default connection type when none is specified in the API call.
# Options: 'normal', 'pool', 'async', 'async_pool'
# You can override this per-call:
#   manager.get_database_connection('config.yaml', 'pool')
connection_type: 'normal'

connection_settings:
  user: 'postgres'
  password: 'password'
  host: 'localhost'
  port: '5432'
  dbname: 'mydatabase'
  sslmode: 'prefer'         # Options: disable, allow, prefer, require, verify-ca, verify-full
  sslcert: ''               # Path to the client SSL certificate
  sslkey: ''                # Path to the client SSL key
  sslrootcert: ''           # Path to the root SSL certificate
  connect_timeout: '10'     # Maximum wait for connection, in seconds
  application_name: 'myapp'
  keepalives: '1'           # Enable TCP keepalives (1=on, 0=off)
  keepalives_idle: '60'     # Seconds before sending a keepalive probe
  keepalives_interval: '15' # Seconds between keepalive probes
  keepalives_count: '5'     # Max probes before closing the connection

# Settings for 'pool' connection type
pool_settings:
  min_size: 5
  max_size: 20
  timeout: 30               # Seconds to wait for a connection from the pool
  max_idle: 300             # Seconds idle before a connection is closed
  max_lifetime: 3600        # Seconds before a connection is recycled
  check_on_checkout: false  # Validate connections with SELECT 1 before use

# Settings for 'async' and 'async_pool' connection types (applied via SET commands)
# For async_pool, these are applied to each connection via a configure callback.
async_settings:
  idle_in_transaction_session_timeout: '5000' # ms
  statement_timeout: '30000'                  # ms
  lock_timeout: '10000'                       # ms
  # work_mem: '256MB'

# Settings for 'async_pool' connection type
async_pool_settings:
  min_size: 5
  max_size: 20
  timeout: 30               # Seconds to wait for a connection from the pool
  max_idle: 300             # Seconds idle before a connection is closed
  max_lifetime: 3600        # Seconds before a connection is recycled
  check_on_checkout: false  # Validate connections with SELECT 1 before use
Configuration Parameter Reference
| Parameter | Description | Example |
|---|---|---|
| connection_type | Default connection type. Can be overridden per API call. | 'normal' |
| connection_settings.user | Username for the PostgreSQL database. | 'postgres' |
| connection_settings.password | Password for the database user. | 'your password' |
| connection_settings.host | Database server host address. | 'localhost' |
| connection_settings.port | Database server port. | '5432' |
| connection_settings.dbname | Name of the database to connect to. | 'mydatabase' |
| connection_settings.sslmode | SSL mode (disable, allow, prefer, require, verify-ca, verify-full). | 'prefer' |
| connection_settings.sslcert | Path to the client SSL certificate. | '/path/to/client.crt' |
| connection_settings.sslkey | Path to the client SSL key. | '/path/to/client.key' |
| connection_settings.sslrootcert | Path to the root SSL certificate. | '/path/to/ca.crt' |
| connection_settings.connect_timeout | Max seconds to wait for a connection. | '10' |
| connection_settings.application_name | App name reported to PostgreSQL. | 'myapp' |
| connection_settings.keepalives | Enable TCP keepalives (1=on, 0=off). | '1' |
| connection_settings.keepalives_idle | Seconds before sending a keepalive probe. | '60' |
| connection_settings.keepalives_interval | Seconds between keepalive probes. | '15' |
| connection_settings.keepalives_count | Max probes before closing. | '5' |
| pool_settings.min_size | Min connections in the pool. | 5 |
| pool_settings.max_size | Max connections in the pool. | 20 |
| pool_settings.timeout | Seconds to wait for a connection from the pool. | 30 |
| pool_settings.max_idle | Seconds idle before connection is closed. | 300 |
| pool_settings.max_lifetime | Seconds before a connection is recycled. | 3600 |
| pool_settings.check_on_checkout | Validate connections with SELECT 1 before use. | false |
| async_settings.idle_in_transaction_session_timeout | Idle in transaction timeout (ms). | '5000' |
| async_settings.statement_timeout | Statement execution timeout (ms). | '30000' |
| async_settings.lock_timeout | Lock acquisition timeout (ms). | '10000' |
| async_settings.work_mem | Memory for sort operations. | '256MB' |
| async_pool_settings.min_size | Min connections in the async pool. | 5 |
| async_pool_settings.max_size | Max connections in the async pool. | 20 |
| async_pool_settings.timeout | Seconds to wait for a connection from the async pool. | 30 |
| async_pool_settings.max_idle | Seconds idle before async connection is closed. | 300 |
| async_pool_settings.max_lifetime | Seconds before an async connection is recycled. | 3600 |
| async_pool_settings.check_on_checkout | Validate connections with SELECT 1 before use. | false |
Environment Variable Interpolation
pgmonkey v3.4.0 adds opt-in support for resolving environment variables and file-based secrets inside YAML configs. Interpolation is disabled by default - existing configs with literal values work exactly as before.
Enabling Interpolation
| Method | How to enable |
|---|---|
| Python (load_config) | load_config('config.yaml', resolve_env=True) |
| Python (PGConnectionManager) | manager.get_database_connection('config.yaml', resolve_env=True) |
| CLI | pgmonkey pgconfig test --connconfig config.yaml --resolve-env |
Inline Syntax
Reference environment variables with ${VAR}. Provide a fallback with ${VAR:-default}:
connection_settings:
  user: '${PGUSER:-postgres}'
  password: '${PGPASSWORD}'      # required - error if not set
  host: '${PGHOST:-localhost}'
  port: '${PGPORT:-5432}'
  dbname: '${PGDATABASE:-mydb}'
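The ${VAR} / ${VAR:-default} resolution rules can be sketched with a small stand-alone resolver (a hypothetical helper for illustration, not pgmonkey's actual implementation):

```python
import os
import re

# Matches ${VAR} and ${VAR:-default} (hypothetical mirror of the inline syntax).
_PATTERN = re.compile(r"\$\{(?P<name>[A-Za-z_][A-Za-z0-9_]*)(?::-(?P<default>[^}]*))?\}")

def resolve_inline(value: str) -> str:
    """Replace each ${VAR} / ${VAR:-default} reference with its resolved value."""
    def _sub(match: re.Match) -> str:
        name = match.group("name")
        default = match.group("default")
        if name in os.environ:
            return os.environ[name]        # env var wins over any default
        if default is not None:
            return default                 # fall back to the inline default
        raise KeyError(f"Environment variable '{name}' is not set and no default was provided")
    return _PATTERN.sub(_sub, value)

os.environ["PGUSER_DEMO"] = "alice"
os.environ.pop("PGHOST_DEMO", None)
print(resolve_inline("${PGUSER_DEMO:-postgres}"))  # alice (env wins over default)
print(resolve_inline("${PGHOST_DEMO:-localhost}"))  # localhost (default used)
```

A reference with no default and no matching environment variable raises, which matches the "required - error if not set" behavior shown above.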
Structured Syntax (from_env / from_file)
For secrets, use a structured form that makes the intent unambiguous:
# Read from an environment variable
connection_settings:
  password:
    from_env: PGMONKEY_DB_PASSWORD

# Read from a file (Kubernetes Secret-style, trailing newline trimmed)
connection_settings:
  password:
    from_file: /var/run/secrets/db/password
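The from_file behavior described above (read the file, trim the trailing newline that Kubernetes appends to Secret values) can be sketched as follows; this is a minimal stand-in, not pgmonkey's code:

```python
from pathlib import Path
import tempfile

def read_secret_file(path: str) -> str:
    """Read a secret from a file, trimming a single trailing newline."""
    text = Path(path).read_text(encoding="utf-8")
    return text[:-1] if text.endswith("\n") else text

# Demo with a temporary file standing in for /var/run/secrets/db/password
with tempfile.NamedTemporaryFile("w", suffix=".secret", delete=False) as f:
    f.write("s3cr3t\n")
    secret_path = f.name

print(read_secret_file(secret_path))  # s3cr3t
```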
Sensitive Key Rules
Defaults (${VAR:-fallback}) are disallowed for sensitive keys by default. This prevents shipping a config with a hardcoded fallback password.
| Sensitive Keys | Description |
|---|---|
password | Database password |
sslkey | Client SSL key path |
sslcert | Client SSL certificate path |
sslrootcert | Root SSL certificate path |
Keys containing token, secret, credential | Detected by substring match |
Override with allow_sensitive_defaults=True:
cfg = load_config('config.yaml', resolve_env=True, allow_sensitive_defaults=True)
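The sensitive-key rules above can be sketched as a small predicate, assuming exact matches for the four named keys plus substring matching for token/secret/credential (illustrative only; pgmonkey's detection may differ in detail):

```python
# Keys that are sensitive by exact name (per the table above).
EXACT_SENSITIVE = {"password", "sslkey", "sslcert", "sslrootcert"}
# Substrings that mark any containing key as sensitive.
SENSITIVE_SUBSTRINGS = ("token", "secret", "credential")

def is_sensitive_key(key: str) -> bool:
    """True if a config key should refuse ${VAR:-default} fallbacks."""
    lowered = key.lower()
    if lowered in EXACT_SENSITIVE:
        return True
    return any(s in lowered for s in SENSITIVE_SUBSTRINGS)

print(is_sensitive_key("password"))   # True  (exact match)
print(is_sensitive_key("api_token"))  # True  (substring match)
print(is_sensitive_key("host"))       # False
```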
load_config() Function
from pgmonkey import load_config
# Load without interpolation (default)
cfg = load_config('config.yaml')
# Load with env interpolation enabled
cfg = load_config('config.yaml', resolve_env=True)
# Strict mode + allow sensitive defaults
cfg = load_config('config.yaml', resolve_env=True, strict=True, allow_sensitive_defaults=True)
| Parameter | Type | Default | Description |
|---|---|---|---|
file_path | str | (required) | Path to the YAML configuration file. |
resolve_env | bool | False | Resolve ${VAR} and from_env/from_file references. |
strict | bool | False | Fail on missing env vars with no default. |
allow_sensitive_defaults | bool | False | Allow ${VAR:-default} for sensitive keys. |
Redacting Secrets
Use redact_config() to mask sensitive values before printing or logging:
from pgmonkey.common.utils.redaction import redact_config
safe = redact_config(cfg)
# {'connection_settings': {'password': '***REDACTED***', 'host': 'db.prod.com', ...}}
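A recursive redaction helper along these lines could look like the following sketch (a hypothetical re-implementation for illustration; the real redact_config may match a different key set):

```python
REDACTED = "***REDACTED***"
SENSITIVE = ("password", "sslkey", "sslcert", "sslrootcert", "token", "secret", "credential")

def redact(cfg):
    """Return a copy of a nested config with sensitive values masked."""
    if isinstance(cfg, dict):
        return {
            k: (REDACTED if any(s in k.lower() for s in SENSITIVE) else redact(v))
            for k, v in cfg.items()
        }
    if isinstance(cfg, list):
        return [redact(item) for item in cfg]
    return cfg  # scalars pass through unchanged

cfg = {"connection_settings": {"password": "hunter2", "host": "db.prod.com"}}
print(redact(cfg))
# {'connection_settings': {'password': '***REDACTED***', 'host': 'db.prod.com'}}
```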
Error Handling
All interpolation errors raise EnvInterpolationError (importable from pgmonkey). Error messages name the missing variable or file and the config key - never the resolved secret value.
from pgmonkey import load_config, EnvInterpolationError

try:
    cfg = load_config('config.yaml', resolve_env=True)
except EnvInterpolationError as e:
    print(f"Config error: {e}")
    # "Environment variable 'PGPASSWORD' is not set and no default was provided
    #  (referenced by config key 'connection_settings.password')."
Working with PGConnectionManager
The PGConnectionManager class is the main entry point. Use a single YAML config file and specify the connection type as a parameter.
Basic Usage
from pgmonkey import PGConnectionManager

# Normal (synchronous) connection
def main():
    manager = PGConnectionManager()
    connection = manager.get_database_connection('config.yaml', 'normal')
    with connection as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT version();')
            print(cur.fetchone())

if __name__ == "__main__":
    main()

import asyncio
from pgmonkey import PGConnectionManager

# Async connection
async def main():
    manager = PGConnectionManager()
    connection = await manager.get_database_connection('config.yaml', 'async')
    async with connection as conn:
        async with conn.cursor() as cur:
            await cur.execute('SELECT version();')
            print(await cur.fetchone())

if __name__ == "__main__":
    asyncio.run(main())
Transactions, Commit, Rollback, and Cursors
pgmonkey simplifies connection management for both synchronous and asynchronous code with context managers.
Connection
Use with or async with to manage connection lifecycle automatically.
# Asynchronous
async with connection as conn:
    async with conn.cursor() as cur:
        await cur.execute('INSERT INTO my_table (name) VALUES (%s)', ('Alice',))
        await cur.execute('SELECT * FROM my_table WHERE name = %s', ('Alice',))
        print(await cur.fetchall())

# Synchronous
with connection as conn:
    with conn.cursor() as cur:
        cur.execute('INSERT INTO my_table (name) VALUES (%s)', ('Bob',))
        cur.execute('SELECT * FROM my_table WHERE name = %s', ('Bob',))
        print(cur.fetchall())
Transactions
Transactions execute a series of SQL statements as a single atomic unit of work.
# Asynchronous Transaction
async with connection as conn:
    async with conn.transaction():
        async with conn.cursor() as cur:
            await cur.execute('INSERT INTO my_table (name) VALUES (%s)', ('John',))
            await cur.execute('SELECT * FROM my_table WHERE name = %s', ('John',))
            print(await cur.fetchall())

# Synchronous Transaction
with connection as conn:
    with conn.transaction():
        with conn.cursor() as cur:
            cur.execute('INSERT INTO my_table (name) VALUES (%s)', ('Jane',))
            cur.execute('SELECT * FROM my_table WHERE name = %s', ('Jane',))
            print(cur.fetchall())
Commit
Within a transaction() context, commits are automatic. Use commit() manually when needed outside of a transaction block.
# Manual commit (outside transaction context)
async with connection as conn:
    async with conn.cursor() as cur:
        await cur.execute('UPDATE my_table SET name = %s WHERE id = %s', ('Doe', 1))
    await conn.commit()
Rollback
The rollback() method discards all changes made during a transaction.
# Manual rollback on error (rollback must happen while the connection is open)
async with connection as conn:
    try:
        async with conn.cursor() as cur:
            await cur.execute('DELETE FROM my_table WHERE id = %s', (1,))
            raise Exception('Simulated error')
        await conn.commit()
    except Exception as e:
        print(f"Error: {e}. Rolling back...")
        await conn.rollback()
Cursors
Cursors execute SQL queries and retrieve results. The cursor context closes automatically.
# Asynchronous Cursor
async with connection as conn:
    async with conn.cursor() as cur:
        await cur.execute('SELECT * FROM my_table WHERE name = %s', ('John',))
        rows = await cur.fetchall()
        for row in rows:
            print(row)

# Synchronous Cursor
with connection as conn:
    with conn.cursor() as cur:
        cur.execute('SELECT * FROM my_table WHERE name = %s', ('Jane',))
        rows = cur.fetchall()
        for row in rows:
            print(row)
Key Points
- Transactions ensure atomicity - a series of operations either all succeed or all fail together.
- Automatic commit/rollback within a transaction() context eliminates manual commit management.
- cursor() works seamlessly with both synchronous and asynchronous operations.
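The commit-on-success / rollback-on-error behavior of a transaction() block can be illustrated with a minimal context manager and a fake connection (a teaching stand-in, not pgmonkey's implementation):

```python
from contextlib import contextmanager

class FakeConnection:
    """Minimal stand-in that records commit/rollback calls."""
    def __init__(self):
        self.calls = []
    def commit(self):
        self.calls.append("commit")
    def rollback(self):
        self.calls.append("rollback")

@contextmanager
def transaction(conn):
    """Commit if the block succeeds, roll back if it raises."""
    try:
        yield conn
    except Exception:
        conn.rollback()
        raise
    else:
        conn.commit()

conn = FakeConnection()
with transaction(conn):
    pass  # work succeeds
print(conn.calls)  # ['commit']

try:
    with transaction(conn):
        raise RuntimeError("boom")
except RuntimeError:
    pass
print(conn.calls)  # ['commit', 'rollback']
```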
Testing Pooling Capability
import asyncio
from pgmonkey import PGConnectionManager

async def test_async_pool(config_file, num_connections):
    manager = PGConnectionManager()
    connections = []
    for _ in range(num_connections):
        connection = await manager.get_database_connection(config_file, 'async_pool')
        connections.append(connection)
    for idx, connection in enumerate(connections):
        async with connection as conn:
            async with conn.cursor() as cur:
                await cur.execute('SELECT version();')
                version = await cur.fetchone()
                print(f"Async Connection {idx + 1}: {version}")
    for connection in connections:
        await connection.disconnect()

def test_sync_pool(config_file, num_connections):
    manager = PGConnectionManager()
    connections = []
    for _ in range(num_connections):
        connection = manager.get_database_connection(config_file, 'pool')
        connections.append(connection)
    for idx, connection in enumerate(connections):
        with connection as conn:
            with conn.cursor() as cur:
                cur.execute('SELECT version();')
                version = cur.fetchone()
                print(f"Sync Connection {idx + 1}: {version}")
    for connection in connections:
        connection.disconnect()

async def main():
    config_file = '/path/to/config.yaml'
    print("Testing async pool connections:")
    await test_async_pool(config_file, 5)
    print("\nTesting sync pool connections:")
    test_sync_pool(config_file, 5)

if __name__ == "__main__":
    asyncio.run(main())
Testing Connections with the CLI
Test your PostgreSQL connection configurations directly from the command line.
# Test using the default connection_type from the config file
pgmonkey pgconfig test --connconfig /path/to/config.yaml
# Test a specific connection type (overrides config file)
pgmonkey pgconfig test --connconfig /path/to/config.yaml --connection-type pool
pgmonkey pgconfig test --connconfig /path/to/config.yaml --connection-type async
The --connection-type flag accepts: normal, pool, async, async_pool.
Example Test Results
# Normal connection
$ pgmonkey pgconfig test --connconfig ~/config.yaml --connection-type normal
Connection successful: (1,)
Connection closed.
Connection test completed successfully.
# Async connection
$ pgmonkey pgconfig test --connconfig ~/config.yaml --connection-type async
Async connection successful: (1,)
Connection closed.
Connection test completed successfully.
# Pooled connection
$ pgmonkey pgconfig test --connconfig ~/config.yaml --connection-type pool
Pool connection successful: (1,)
Pooling test successful: Acquired 6 connections out of a possible 20
Pooling tested successfully with 6 concurrent connections.
Connection test completed successfully.
# Async pooled connection
$ pgmonkey pgconfig test --connconfig ~/config.yaml --connection-type async_pool
Async pool connection successful: (1,)
Pooling test successful: Acquired 6 connections out of a possible 20
Async pooling tested successfully with 6 concurrent connections.
Connection test completed successfully.
Server Configuration Recommendations & Audit
Generate recommended PostgreSQL server settings based on your config file:
pgmonkey pgserverconfig --filepath /path/to/config.yaml
This outputs recommended entries for postgresql.conf and pg_hba.conf based on your pool sizes and SSL settings.
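As a rough illustration of how a recommendation can follow from the config: the audit example later in this section shows max_connections recommended as 22 against a pool max_size of 20, suggesting pool size plus a small reserve for administrative sessions. A sketch of that heuristic (the exact formula pgmonkey uses is an assumption here):

```python
def recommended_max_connections(config: dict, reserve: int = 2) -> int:
    """Largest configured pool max_size plus a small reserve for admin sessions.
    (Illustrative heuristic; pgmonkey's actual formula may differ.)"""
    sizes = [
        config.get("pool_settings", {}).get("max_size", 0),
        config.get("async_pool_settings", {}).get("max_size", 0),
    ]
    return max(sizes) + reserve

cfg = {"pool_settings": {"max_size": 20}, "async_pool_settings": {"max_size": 20}}
print(recommended_max_connections(cfg))  # 22
```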
Auditing Live Server Settings
Add --audit to connect to the live server and compare current settings against recommendations:
pgmonkey pgserverconfig --filepath /path/to/config.yaml --audit
The audit queries the server's pg_settings view (read-only) and displays a comparison table:
1) Database type detected: PostgreSQL
2) Server settings audit:
postgresql.conf:
Setting           Recommended   Current      Source               Status
────────────────────────────────────────────────────────────────────────
max_connections   22            100          configuration file   OK
ssl               on            on           configuration file   OK
ssl_cert_file     server.crt    server.crt   configuration file   OK
ssl_key_file      server.key    server.key   configuration file   OK
ssl_ca_file       ca.crt        ca.crt       configuration file   OK
Status Values
| Status | Meaning |
|---|---|
| OK | Current setting meets or exceeds the recommendation |
| MISMATCH | Current setting does not meet the recommendation (e.g., max_connections too low, ssl is off) |
| REVIEW | Values differ but may be intentional (e.g., different SSL file paths) |
| UNKNOWN | Setting not found on server or could not be compared |
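The status classification could be approximated like this (an illustrative sketch; pgmonkey's actual comparison logic may differ):

```python
def audit_status(name: str, recommended: str, current: str) -> str:
    """Classify one pg_settings row against its recommendation."""
    if current is None:
        return "UNKNOWN"    # setting not found or not comparable
    if recommended == current:
        return "OK"
    # Numeric settings: current may legitimately exceed the recommendation.
    try:
        return "OK" if float(current) >= float(recommended) else "MISMATCH"
    except ValueError:
        pass
    if name == "ssl":
        return "MISMATCH"   # on/off disagreement is a hard failure
    return "REVIEW"         # e.g. different SSL file paths may be intentional

print(audit_status("max_connections", "22", "100"))              # OK
print(audit_status("ssl", "on", "off"))                          # MISMATCH
print(audit_status("ssl_cert_file", "server.crt", "other.crt"))  # REVIEW
```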
pg_hba.conf Inspection
On PostgreSQL 15+, the audit also queries pg_hba_file_rules to display current HBA rules alongside recommendations.
Permission Handling
If the connected role lacks permission to query pg_settings, the audit fails gracefully with a clear message and falls back to showing recommendations only. No server settings are ever modified - the audit is entirely read-only.
Importing and Exporting Data
Importing Data
Import CSV or text files into a PostgreSQL table.
pgmonkey pgimport --table <TABLE> --connconfig <CONFIG_FILE> --import_file <IMPORT_FILE>
- --table (required): Target table, e.g. schema.table or table.
- --connconfig (required): Path to the connection configuration file.
- --import_file: Path to the CSV or text file to import.
If an import configuration file doesn't exist, pgmonkey generates a template you can edit for column mapping, delimiter, and encoding.
Exporting Data
Export a PostgreSQL table to a CSV file.
pgmonkey pgexport --table <TABLE> --connconfig <CONFIG_FILE> [--export_file <CSV_FILE>]
- --table (required): Source table to export.
- --connconfig (required): Path to the connection configuration file.
- --export_file (optional): Output CSV path. Defaults to a file based on the table name.
Code Generation
Generate example Python code for any connection type directly from your config file. Use the --connection-type flag to choose.
1. Normal Synchronous Connection
$ pgmonkey pgconfig generate-code --connconfig config.yaml --connection-type normal
Generated Code:

# Example: Normal synchronous connection using pgmonkey
# One config file serves all connection types - just pass the type you need.
from pgmonkey import PGConnectionManager

def main():
    connection_manager = PGConnectionManager()
    config_file_path = 'config.yaml'

    # Get a normal (synchronous) PostgreSQL connection
    connection = connection_manager.get_database_connection(config_file_path, 'normal')

    # Use the connection
    with connection as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT 1;')
            print(cur.fetchone())

if __name__ == "__main__":
    main()
2. Pooled Synchronous Connection
$ pgmonkey pgconfig generate-code --connconfig config.yaml --connection-type pool
Generated Code:

# Example: Pooled synchronous connection using pgmonkey
# One config file serves all connection types - just pass the type you need.
from pgmonkey import PGConnectionManager

def main():
    connection_manager = PGConnectionManager()
    config_file_path = 'config.yaml'

    # Get a pooled PostgreSQL connection
    pool_connection = connection_manager.get_database_connection(config_file_path, 'pool')

    # Use the pool - each 'with' block acquires and releases a connection
    with pool_connection as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT 1;')
            print(cur.fetchone())

if __name__ == "__main__":
    main()
3. Asynchronous Connection
$ pgmonkey pgconfig generate-code --connconfig config.yaml --connection-type async
Generated Code:

# Example: Asynchronous connection using pgmonkey
# One config file serves all connection types - just pass the type you need.
import asyncio
from pgmonkey import PGConnectionManager

async def main():
    connection_manager = PGConnectionManager()
    config_file_path = 'config.yaml'

    # Get an async PostgreSQL connection
    connection = await connection_manager.get_database_connection(config_file_path, 'async')

    # Use the connection asynchronously
    async with connection as conn:
        async with conn.cursor() as cur:
            await cur.execute('SELECT 1;')
            result = await cur.fetchone()
            print(result)

if __name__ == "__main__":
    asyncio.run(main())
4. Asynchronous Pooled Connection
$ pgmonkey pgconfig generate-code --connconfig config.yaml --connection-type async_pool
Generated Code:

# Example: Asynchronous pooled connection using pgmonkey
# One config file serves all connection types - just pass the type you need.
import asyncio
from pgmonkey import PGConnectionManager

async def main():
    connection_manager = PGConnectionManager()
    config_file_path = 'config.yaml'

    # Get an async pooled PostgreSQL connection
    pool_connection = await connection_manager.get_database_connection(config_file_path, 'async_pool')

    # Use the pool - each 'async with' cursor block acquires and releases a connection
    async with pool_connection as conn:
        async with conn.cursor() as cur:
            await cur.execute('SELECT 1;')
            result = await cur.fetchone()
            print(result)

if __name__ == "__main__":
    asyncio.run(main())
Native psycopg Code Generation
Use --library psycopg to generate code that uses psycopg and psycopg_pool directly, reading connection settings from the same pgmonkey YAML config file. This is useful if you want to use pgmonkey only for configuration management while writing your own connection code.
1. Normal Connection (psycopg)
$ pgmonkey pgconfig generate-code --connconfig config.yaml --connection-type normal --library psycopg
Generated Code:

# Example: Normal synchronous connection using psycopg
# Reads connection settings from your pgmonkey YAML config file.
import yaml
import psycopg

def main():
    config_file_path = 'config.yaml'
    with open(config_file_path, 'r') as f:
        config = yaml.safe_load(f)

    conn_settings = config['connection_settings']
    # Filter out empty values (e.g. blank SSL cert paths)
    conn_params = {k: v for k, v in conn_settings.items() if v}

    with psycopg.connect(**conn_params) as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT 1;')
            print(cur.fetchone())

if __name__ == "__main__":
    main()
2. Pooled Connection (psycopg_pool)
$ pgmonkey pgconfig generate-code --connconfig config.yaml --connection-type pool --library psycopg
Generated Code:

# Example: Pooled synchronous connection using psycopg_pool
# Reads connection and pool settings from your pgmonkey YAML config file.
import yaml
from psycopg import conninfo
from psycopg_pool import ConnectionPool

def main():
    config_file_path = 'config.yaml'
    with open(config_file_path, 'r') as f:
        config = yaml.safe_load(f)

    conn_settings = config['connection_settings']
    conn_params = {k: v for k, v in conn_settings.items() if v}
    pool_settings = config.get('pool_settings', {})

    # Remove pgmonkey-specific keys that psycopg_pool does not accept
    pool_settings.pop('check_on_checkout', None)

    conninfo_str = conninfo.make_conninfo(**conn_params)
    pool = ConnectionPool(conninfo=conninfo_str, **pool_settings)
    try:
        with pool.connection() as conn:
            with conn.cursor() as cur:
                cur.execute('SELECT 1;')
                print(cur.fetchone())
    finally:
        pool.close()

if __name__ == "__main__":
    main()
3. Async Connection (psycopg AsyncConnection)
$ pgmonkey pgconfig generate-code --connconfig config.yaml --connection-type async --library psycopg
Generated Code:

# Example: Asynchronous connection using psycopg (AsyncConnection)
# Reads connection and async settings from your pgmonkey YAML config file.
import asyncio
import yaml
from psycopg import AsyncConnection

async def main():
    config_file_path = 'config.yaml'
    with open(config_file_path, 'r') as f:
        config = yaml.safe_load(f)

    conn_settings = config['connection_settings']
    conn_params = {k: v for k, v in conn_settings.items() if v}
    async_settings = config.get('async_settings', {})

    async with await AsyncConnection.connect(**conn_params) as conn:
        # Apply GUC settings (statement_timeout, lock_timeout, etc.).
        # set_config() is used because SET cannot take bind parameters.
        for setting, value in async_settings.items():
            await conn.execute("SELECT set_config(%s, %s, false)", (setting, str(value)))

        async with conn.cursor() as cur:
            await cur.execute('SELECT 1;')
            result = await cur.fetchone()
            print(result)

if __name__ == "__main__":
    asyncio.run(main())
4. Async Pooled Connection (psycopg_pool AsyncConnectionPool)
$ pgmonkey pgconfig generate-code --connconfig config.yaml --connection-type async_pool --library psycopg
Generated Code:

# Example: Asynchronous pooled connection using psycopg_pool (AsyncConnectionPool)
# Reads connection, pool, and async settings from your pgmonkey YAML config file.
import asyncio
import yaml
from psycopg import conninfo
from psycopg_pool import AsyncConnectionPool

async def main():
    config_file_path = 'config.yaml'
    with open(config_file_path, 'r') as f:
        config = yaml.safe_load(f)

    conn_settings = config['connection_settings']
    conn_params = {k: v for k, v in conn_settings.items() if v}
    pool_settings = config.get('async_pool_settings', {})
    async_settings = config.get('async_settings', {})

    # Remove pgmonkey-specific keys that psycopg_pool does not accept
    pool_settings.pop('check_on_checkout', None)

    conninfo_str = conninfo.make_conninfo(**conn_params)

    # Optional: configure callback to apply GUC settings to each connection.
    # set_config() is used because SET cannot take bind parameters.
    async def configure(conn):
        for setting, value in async_settings.items():
            await conn.execute("SELECT set_config(%s, %s, false)", (setting, str(value)))

    pool = AsyncConnectionPool(
        conninfo=conninfo_str,
        configure=configure if async_settings else None,
        **pool_settings,
    )
    await pool.open()
    try:
        async with pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute('SELECT 1;')
                result = await cur.fetchone()
                print(result)
    finally:
        await pool.close()

if __name__ == "__main__":
    asyncio.run(main())
Compatibility
pgmonkey is tested and supported on the following:
| Dependency | Supported Versions | Notes |
|---|---|---|
| Python | 3.10, 3.11, 3.12, 3.13 | Python 3.9 reached end-of-life in October 2025 |
| psycopg[binary] | ≥ 3.1.20, < 4.0.0 | psycopg3 with binary driver for best performance |
| psycopg_pool | ≥ 3.1.9, < 4.0.0 | Provides ConnectionPool and AsyncConnectionPool |
| PyYAML | ≥ 6.0.2, < 7.0.0 | 6.0.2+ required for Python 3.12/3.13 compatibility |
| chardet | ≥ 5.2.0, < 6.0.0 | Character encoding detection for CSV import |
| tqdm | ≥ 4.64.0, < 5.0.0 | Progress bars for CSV import/export |
Install with test dependencies for development:
pip install "pgmonkey[test]"
This adds pytest, pytest-asyncio, and pytest-mock for running the unit test suite (180 tests, no database required).
Raspberry Pi Users: Installation via PiWheels
pgmonkey is available on PiWheels, providing pre-built packages optimized for Raspberry Pi.
pip3 install pgmonkey
Supported Raspberry Pi OS Versions
pgmonkey v3.0.0 requires Python 3.10+. Recommended Raspberry Pi OS:
- Bookworm (Python 3.11)
Note: Bullseye ships with Python 3.9, which is no longer supported. Upgrade to Bookworm or install Python 3.10+ manually.
Why Use pgmonkey on Raspberry Pi?
- Manage PostgreSQL connections using a single YAML configuration.
- Use async and connection pooling for performance-critical tasks.
- Run projects efficiently on lower-powered hardware.