PostgreSQL (Async)
Async PostgreSQL helpers using asyncpg.
Installation
Dependencies
Quick Start
import asyncpg
from tracktolib.pg import insert_many, insert_one, insert_returning
async def main():
conn = await asyncpg.connect('postgresql://user:pass@localhost/db')
# Insert single row
await insert_one(conn, 'users', {'name': 'John', 'email': 'john@example.com'})
# Insert multiple rows
users = [
{'name': 'Alice', 'email': 'alice@example.com'},
{'name': 'Bob', 'email': 'bob@example.com'},
]
await insert_many(conn, 'users', users)
# Insert and return the inserted ID
user_id = await insert_returning(conn, 'users', {'name': 'Charlie'}, 'id')
Insert Functions
insert_one
Insert a single row into a table.
await insert_one(
conn,
'users',
{'name': 'John', 'email': 'john@example.com'},
on_conflict='ON CONFLICT DO NOTHING'
)
insert_many
Insert multiple rows into a table.
users = [
{'name': 'Alice', 'email': 'alice@example.com'},
{'name': 'Bob', 'email': 'bob@example.com'},
]
await insert_many(conn, 'users', users)
# With returning
records = await insert_many(conn, 'users', users, returning='id')
insert_returning
Insert and return values from the inserted row.
# Return single value
user_id = await insert_returning(conn, 'users', {'name': 'John'}, 'id')
# Return multiple values
record = await insert_returning(conn, 'users', {'name': 'John'}, ['id', 'created_at'])
Update Functions
update_one
Update a single row.
await update_one(
conn,
'users',
{'id': 1, 'name': 'John Updated'},
keys=['id'] # WHERE clause keys
)
update_many
Update multiple rows.
updates = [
{'id': 1, 'status': 'active'},
{'id': 2, 'status': 'inactive'},
]
await update_many(conn, 'users', updates, keys=['id'])
update_returning
Update and return values.
record = await update_returning(
conn,
'users',
{'id': 1, 'name': 'Updated'},
keys=['id'],
returning=['name', 'updated_at']
)
Query Builders
PGInsertQuery
Build complex INSERT queries with conflict handling and returning clauses.
from tracktolib.pg import PGInsertQuery, PGConflictQuery, PGReturningQuery
query = PGInsertQuery(
table='users',
items=[{'name': 'John', 'email': 'john@example.com'}],
on_conflict=PGConflictQuery(keys=['email']),
returning=PGReturningQuery.load(keys=['id'])
)
# Execute
await query.run(conn)
# Or fetch results
result = await query.fetchrow(conn)
PGUpdateQuery
Build complex UPDATE queries.
from tracktolib.pg import PGUpdateQuery
query = PGUpdateQuery(
table='users',
items=[{'id': 1, 'name': 'Updated', 'status': 'active'}],
where_keys=['id'],
returning=['name', 'updated_at']
)
result = await conn.fetchrow(query.query, *query.values)
Conflict Handling
Using Conflict helper
from tracktolib.pg import insert_many, Conflict
await insert_many(
conn,
'users',
users,
on_conflict=Conflict(keys=['email'], ignore_keys=['created_at'])
)
Using PGConflictQuery
from tracktolib.pg import PGConflictQuery
conflict = PGConflictQuery(
keys=['email'], # Conflict detection keys
ignore_keys=['id'], # Keys to ignore in update
where='t.status != $1', # Additional WHERE clause
merge_keys=['metadata'] # JSONB merge (a || b)
)
Utility Functions
fetch_count
Count rows from a query.
from tracktolib.pg import fetch_count
count = await fetch_count(conn, 'SELECT * FROM users WHERE status = $1', 'active')
insert_pg
Factory function to create PGInsertQuery instances.
from tracktolib.pg import insert_pg
query = insert_pg(
'users',
[{'name': 'John'}],
on_conflict='ON CONFLICT DO NOTHING',
returning=['id'],
fill=True # Fill missing keys with None
)
Utilities Module
Additional utilities from tracktolib.pg.utils:
iterate_pg
Iterate over large result sets efficiently.
from tracktolib.pg import iterate_pg
async for batch in iterate_pg(conn, 'SELECT * FROM large_table', batch_size=1000):
for record in batch:
process(record)
safe_pg / safe_pg_context
Handle PostgreSQL errors gracefully.