DB CONNECTOR IN PYTHON:
While setup on your laptop and onboarding are in progress, work on the task below for the next week, since you already have Python installed on your laptop. Use Microsoft Copilot with detailed prompts.
Create a pip-installable package for connecting to a PostgreSQL database. The database server information must be supplied by the caller, in the following form:
databaseServer:
  protocol: database
  url: "${ENV:DB_URL}" # e.g. postgresql://user:pass@host:port/db_name
  driver: "postgres"
  description: Primary Database Server
  securitySchemes:
    dbAuth:
      - type: basic
        description: Database Authentication
        username: "${ENV:DB_USERNAME}"
        password: "${ENV:DB_PASSWORD}"
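The `${ENV:...}` placeholders above have to be resolved before a connection can be opened. A minimal sketch of that step follows; the function name `resolve_env` and the choice to fail on unset variables are assumptions for illustration, not part of the original task.

```python
import os
import re
from typing import Mapping, Optional

# Matches placeholders of the form ${ENV:NAME}, as used in the server definition.
_ENV_PATTERN = re.compile(r"\$\{ENV:([A-Za-z_][A-Za-z0-9_]*)\}")


def resolve_env(value: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Replace every ${ENV:NAME} in `value` with the matching environment variable.

    `env` defaults to os.environ; passing a dict makes the function easy to test.
    Raising on a missing variable is an assumption: failing early seems safer
    than connecting with an empty credential.
    """
    source = os.environ if env is None else env

    def _substitute(match):
        name = match.group(1)
        if name not in source:
            raise KeyError(f"environment variable {name} is not set")
        return source[name]

    return _ENV_PATTERN.sub(_substitute, value)
```

For example, `resolve_env("${ENV:DB_URL}")` would return the value of `DB_URL` from the process environment.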
Connection info and DML (query/insert/update/delete on tables) must be passed dynamically to this function. The inputs to the query/insert/update/delete commands (the ? placeholders in the DSL usage below) must also be passed dynamically.
- name: query-loyalty-score
  type: database
  operation: executeSql
  server: databaseServer
  description: "Calculate loyalty score based on order history"
  query: "SELECT COUNT(*) AS loyaltyScore FROM loyaltyPoints WHERE customer_id = ? AND ORDERDATE BETWEEN ? AND ? GROUP BY customer_id"
  parameters: ["$.receive-customer-kafka.customerId", "$.calculate-min-start-date.minStartDate", "currentdatetime()"] # MANDATORY; currentdatetime() function to get current date as per driver
  response:
    loyaltyScore: "$.rows[0].loyaltyScore"
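The `parameters` list above mixes path expressions into earlier pipeline steps with the `currentdatetime()` function. A rough sketch of how such a list could be turned into concrete values is shown below. The names `resolve_parameter` and `resolve_parameters`, the shape of the `context` dict, and resolving `currentdatetime()` in Python (rather than in the driver) are all assumptions made for illustration.

```python
from datetime import datetime, timezone
from typing import Any, List, Mapping, Sequence


def resolve_parameter(expr: str, context: Mapping[str, Any]) -> Any:
    """Resolve one parameter expression against the outputs of earlier steps."""
    if expr == "currentdatetime()":
        # Assumption: the current time is taken on the client, in UTC.
        return datetime.now(timezone.utc)
    if expr.startswith("$."):
        # "$.step-name.field" walks the context one key at a time.
        value: Any = context
        for key in expr[2:].split("."):
            value = value[key]
        return value
    # Anything else is treated as a literal value.
    return expr


def resolve_parameters(exprs: Sequence[str], context: Mapping[str, Any]) -> List[Any]:
    """Resolve all expressions, preserving their order for positional binding."""
    return [resolve_parameter(e, context) for e in exprs]
```

The resolved list keeps the same order as the `?` placeholders in the query, so it can be handed to the connector unchanged.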
Build engine-plugin-sdk interfaces and engine-builtins for this database connector with the following operations:
operations:
  dbConnection:
    type: database
    description: Operations for database interactions
    server: databaseServer
  executeSql:
    type: database
    description: Execute SQL queries
    server: databaseServer
  commitSql:
    type: database
    description: Commit transactions
    server: databaseServer
  rollbackSql:
    type: database
    description: Rollback transactions
    server: databaseServer
Python Package structure:
packages/
├── engine-plugin-sdk/ # Plugin interfaces and base classes
├── engine-dsl/ # YAML parsing and validation
├── engine-core/ # Pipeline execution engine
├── engine-builtins/ # Standard components (Kafka, Redis, HTTP, DB, etc)
├── engine-runtime/ # CLI application (deployed as containers)
└── engine-operator/ # Kubernetes operator (future, optional)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
These stories assume:
engine-core will call the connector directly (passing connection info, SQL, and params).
Only the PostgreSQL connector is in scope.
Includes:
✔ Connection Pooling
✔ Async psycopg
✔ Pydantic validation
✔ Retry strategy
✔ Transaction support
✔ Parameter substitution
✔ Query execution
✔ Error handling
📌 GOAL: Build a High‑Performance PostgreSQL Database Connector
This connector can be invoked directly by engine-core with:
connection info
SQL string with placeholders
parameter list
operation type (query/update/transaction)
🟦 US‑001: Define Database Connector Interface
Description
Create a Python interface (protocol/abstract class) defining the required methods for any database connector implementation.
Acceptance Criteria
Interface includes synchronous and asynchronous operations:
connect()
execute(sql, params)
begin()
commit()
rollback()
close()
Method signatures are stable and documented.
Engine-core can import the interface and treat the connector polymorphically.
Unit Test Scenarios
Any class implementing the interface must provide all required methods.
Method signature mismatch should fail static type checking.
Mock connector implementing interface behaves correctly when called by engine-core.
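One possible shape for the interface in US-001 is a `typing.Protocol`, which lets static type checkers flag signature mismatches without forcing inheritance. This is only a sketch: the names `DatabaseConnector`, `QueryResult`, and `MockConnector` are assumptions, and an abstract base class would work equally well.

```python
from typing import Any, Dict, List, Protocol, Sequence, runtime_checkable

# Assumed result shape: {"rowcount": int, "rows": [dict, ...]}
QueryResult = Dict[str, Any]


@runtime_checkable
class DatabaseConnector(Protocol):
    """Methods every synchronous connector is expected to provide."""

    def connect(self) -> None: ...
    def execute(self, sql: str, params: Sequence[Any] = ()) -> QueryResult: ...
    def begin(self) -> None: ...
    def commit(self) -> None: ...
    def rollback(self) -> None: ...
    def close(self) -> None: ...


class MockConnector:
    """In-memory stand-in that records calls; useful in engine-core unit tests."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def connect(self) -> None:
        self.calls.append("connect")

    def execute(self, sql: str, params: Sequence[Any] = ()) -> QueryResult:
        self.calls.append("execute")
        return {"rowcount": 0, "rows": []}

    def begin(self) -> None:
        self.calls.append("begin")

    def commit(self) -> None:
        self.calls.append("commit")

    def rollback(self) -> None:
        self.calls.append("rollback")

    def close(self) -> None:
        self.calls.append("close")
```

Note that `isinstance(obj, DatabaseConnector)` only checks that the methods exist at runtime; signature mismatches are caught by a static checker such as mypy, not by `isinstance`.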
🟦 US‑002: Pydantic-Based Connection Configuration Validation
Description
Implement a Pydantic model to validate user-provided connection configuration before opening a PostgreSQL connection.
Acceptance Criteria
Validates:
host, port, user, password, dbname
optional: min_pool_size, max_pool_size, timeout_ms
Invalid configuration returns clear validation error messages.
All fields have proper types and default values where appropriate.
Unit Test Scenarios
Valid configuration passes.
Missing or invalid fields raise Pydantic validation errors.
Configuration with invalid port (string, negative) is rejected.
Boundary tests on pool sizes (min ≤ max).
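The validation rules in US-002 could be expressed roughly as follows. This sketch assumes Pydantic v2 (`model_validator`, `Field(strict=True)`); the class name `PostgresConfig` and every default value are illustrative assumptions, not requirements from the task.

```python
from pydantic import BaseModel, Field, SecretStr, ValidationError, model_validator


class PostgresConfig(BaseModel):
    """Connection settings validated before any pool is opened."""

    host: str = Field(min_length=1)
    # strict=True rejects strings such as "5432"; the range rejects negatives.
    port: int = Field(default=5432, ge=1, le=65535, strict=True)
    user: str = Field(min_length=1)
    # SecretStr keeps the password out of repr() and log output.
    password: SecretStr
    dbname: str = Field(min_length=1)
    min_pool_size: int = Field(default=1, ge=0)
    max_pool_size: int = Field(default=10, ge=1)
    timeout_ms: int = Field(default=30_000, gt=0)

    @model_validator(mode="after")
    def check_pool_bounds(self) -> "PostgresConfig":
        # Cross-field rule from the unit test scenarios: min <= max.
        if self.min_pool_size > self.max_pool_size:
            raise ValueError("min_pool_size must be <= max_pool_size")
        return self
```

Invalid input raises `pydantic.ValidationError`, whose message lists each offending field.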
🟦 US‑003: Implement Sync PostgreSQL Connector (psycopg)
Description
Create a synchronous connector using psycopg (v3) with connection pooling.
Acceptance Criteria
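Uses psycopg_pool.ConnectionPool (in psycopg 3 the pool ships in the separate psycopg_pool package, installed via psycopg[pool]).
execute() accepts an SQL string with positional placeholders (?) and converts them to %s.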
Returns:
rowcount
list of dict rows
Connector supports:
queries (SELECT)
DML (INSERT/UPDATE/DELETE)
Pool is created once and reused per engine instance.
Unit Test Scenarios
Pool initializes correctly.
Connection checkout & return works.
Executes queries and returns correct rows.
Handles empty result sets.
Validates placeholder conversion from ? → %s.
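A minimal sketch of the synchronous connector described in US-003 is given below. It assumes `psycopg_pool.ConnectionPool` from psycopg 3; the `pool_factory` argument is a hypothetical hook added so the class can be unit-tested with a fake pool, and the naive `?` to `%s` replacement does not handle `?` inside string literals or literal `%` characters.

```python
from typing import Any, Callable, Dict, Sequence


def _default_pool_factory(conninfo: str, min_size: int, max_size: int) -> Any:
    # Imported lazily so the module can be imported without psycopg installed.
    from psycopg_pool import ConnectionPool
    return ConnectionPool(conninfo, min_size=min_size, max_size=max_size, open=True)


class PostgresSyncConnector:
    def __init__(self, conninfo: str, min_size: int = 1, max_size: int = 10,
                 pool_factory: Callable[[str, int, int], Any] = _default_pool_factory) -> None:
        self._conninfo = conninfo
        self._min_size = min_size
        self._max_size = max_size
        self._pool_factory = pool_factory
        self._pool: Any = None

    def connect(self) -> None:
        # The pool is created once and reused for every later call.
        if self._pool is None:
            self._pool = self._pool_factory(self._conninfo, self._min_size, self._max_size)

    def execute(self, sql: str, params: Sequence[Any] = ()) -> Dict[str, Any]:
        self.connect()
        # pool.connection() checks a connection out and returns it on exit.
        with self._pool.connection() as conn:
            with conn.cursor() as cur:
                cur.execute(sql.replace("?", "%s"), tuple(params))
                rows = []
                if cur.description is not None:  # None for statements without a result set
                    columns = [col[0] for col in cur.description]
                    rows = [dict(zip(columns, row)) for row in cur.fetchall()]
                return {"rowcount": cur.rowcount, "rows": rows}

    def close(self) -> None:
        # Safe to call repeatedly; a later connect() builds a fresh pool.
        if self._pool is not None:
            self._pool.close()
            self._pool = None
```

With psycopg installed, `PostgresSyncConnector("postgresql://...").execute("SELECT 1 AS one")` would be expected to return one row as a dict keyed by column name.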
🟦 US‑004: Implement Async PostgreSQL Connector (async psycopg)
Description
Add an async implementation using psycopg_pool.AsyncConnectionPool for high-throughput workloads.
Acceptance Criteria
Async versions of all operations:
async_connect()
async_execute()
async_begin()
async_commit()
async_rollback()
async_close()
Can be used independently from the sync connector.
Row output identical to synchronous version (list of dicts).
Unit Test Scenarios
Async pool initializes properly.
Concurrent async queries run in parallel and return expected results.
Ensure proper await/async usage.
Async error propagation works correctly.
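The async connector in US-004 might look roughly like this. It assumes `psycopg_pool.AsyncConnectionPool`; `pool_factory` and `run_concurrently` are hypothetical helpers introduced here for testing and illustration. The sketch requires `async_connect()` to be awaited once before queries are issued, which sidesteps the question of two coroutines racing to create the pool.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Sequence


async def _default_async_pool_factory(conninfo: str, min_size: int, max_size: int) -> Any:
    # Imported lazily so the module can be imported without psycopg installed.
    from psycopg_pool import AsyncConnectionPool
    pool = AsyncConnectionPool(conninfo, min_size=min_size, max_size=max_size, open=False)
    await pool.open()
    return pool


class PostgresAsyncConnector:
    def __init__(self, conninfo: str, min_size: int = 1, max_size: int = 10,
                 pool_factory: Callable[[str, int, int], Awaitable[Any]] = _default_async_pool_factory) -> None:
        self._conninfo = conninfo
        self._min_size = min_size
        self._max_size = max_size
        self._pool_factory = pool_factory
        self._pool: Any = None

    async def async_connect(self) -> None:
        if self._pool is None:
            self._pool = await self._pool_factory(self._conninfo, self._min_size, self._max_size)

    async def async_execute(self, sql: str, params: Sequence[Any] = ()) -> Dict[str, Any]:
        if self._pool is None:
            raise RuntimeError("call async_connect() before async_execute()")
        async with self._pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(sql.replace("?", "%s"), tuple(params))
                rows = []
                if cur.description is not None:
                    columns = [col[0] for col in cur.description]
                    rows = [dict(zip(columns, row)) for row in await cur.fetchall()]
                return {"rowcount": cur.rowcount, "rows": rows}

    async def async_close(self) -> None:
        if self._pool is not None:
            await self._pool.close()
            self._pool = None


async def run_concurrently(connector: PostgresAsyncConnector, sql: str,
                           param_sets: Sequence[Sequence[Any]]) -> List[Dict[str, Any]]:
    """Issue the same statement with several parameter sets at once."""
    return await asyncio.gather(*(connector.async_execute(sql, p) for p in param_sets))
```

Each concurrent call checks out its own connection from the pool, so the effective parallelism is bounded by `max_size`.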
🟦 US‑005: SQL Parameter Binding & Placeholder Mapping
Description
Implement mapping of engine-core parameter placeholders ? to psycopg %s.
Acceptance Criteria
The number of ? placeholders must equal the length of the parameters list.
Parameters are passed in the correct order.
Supports mixing literal expressions (e.g., CURRENT_TIMESTAMP) with bound parameters.
Unit Test Scenarios
Valid placeholder count → query executes successfully.
Too many/too few parameters → raises clear error.
SQL returned to engine-core includes correct %s substitutions.
Literal-only parameters do not consume placeholder slots.
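The placeholder mapping in US-005 can be sketched as a small scanner. The function name `convert_placeholders` is an assumption. This version skips `?` inside single-quoted string literals and doubles literal `%` characters (psycopg treats `%` as a placeholder marker when parameters are passed); it does not handle dollar-quoted strings, comments, quoted identifiers, or the PostgreSQL JSONB `?` operators.

```python
from typing import Any, Sequence


def convert_placeholders(sql: str, params: Sequence[Any]) -> str:
    """Convert positional ? placeholders to %s and verify the parameter count."""
    out = []
    count = 0
    in_string = False
    for ch in sql:
        if ch == "'":
            # A doubled quote ('') toggles twice, so it stays inside the literal.
            in_string = not in_string
            out.append(ch)
        elif ch == "?" and not in_string:
            out.append("%s")
            count += 1
        elif ch == "%":
            # Escape literal percent signs so psycopg does not parse them.
            out.append("%%")
        else:
            out.append(ch)
    if count != len(params):
        raise ValueError(
            f"SQL has {count} placeholder(s) but {len(params)} parameter(s) were given"
        )
    return "".join(out)
```

A literal expression such as `CURRENT_TIMESTAMP` written directly in the SQL text is left untouched and therefore consumes no entry from the parameter list.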
🟦 US‑006: Retry Strategy for Transient Failures
Description
Implement retry logic for common transient DB errors (e.g., network blips, serialization failures).
Acceptance Criteria
Retries triggered for:
network connection dropped
SQLSTATE 40001 (serialization failure)
connection timeout
Exponential backoff with configurable retry count & delay.
Retries are logged or tracked without exposing sensitive data.
Unit Test Scenarios
Simulate transient DB exception → connector retries.
After max retries → appropriate exception raised.
Non-retryable errors (syntax errors) do not retry.
Backoff delays applied correctly.
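The retry behaviour in US-006 could be sketched as a small helper with exponential backoff. `TransientDatabaseError` and `retry_with_backoff` are hypothetical names; in a real connector the `retryable` tuple would presumably include driver exceptions such as `psycopg.OperationalError` and `psycopg.errors.SerializationFailure`. The `sleep` argument is injectable so tests do not have to wait.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class TransientDatabaseError(Exception):
    """Placeholder for errors that are worth retrying (assumed name)."""


def retry_with_backoff(
    operation: Callable[[], T],
    *,
    max_retries: int = 3,
    base_delay: float = 0.1,
    factor: float = 2.0,
    retryable: Tuple[Type[BaseException], ...] = (TransientDatabaseError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `operation`, retrying retryable errors up to `max_retries` times.

    The delay before retry n (0-based) is base_delay * factor ** n.
    Errors outside `retryable` (for example syntax errors) propagate at once.
    """
    attempt = 0
    while True:
        try:
            return operation()
        except retryable:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            sleep(base_delay * factor ** attempt)
            attempt += 1
```

Adding random jitter to the delay is a common refinement when many workers may retry at the same moment; it is left out here to keep the sketch short.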
🟦 US‑007: Synchronous Transaction Support
Description
Add transaction support for sync mode.
Acceptance Criteria
Support:
begin()
commit()
rollback()
Multiple SQL statements can run inside a transaction.
Autocommit disabled when a transaction is explicitly started.
Unit Test Scenarios
Begin → execute → commit persists changes.
Begin → execute → rollback discards changes.
Starting a new transaction while one is active returns an error.
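One way to implement the explicit begin/commit/rollback flow of US-007 is to pin a single pooled connection for the lifetime of the transaction. The sketch assumes a pool exposing `getconn()` and `putconn()` (as `psycopg_pool.ConnectionPool` does) and connections in psycopg's default non-autocommit mode, where the first statement opens a transaction implicitly. The class name `SyncTransaction` and the local `DatabaseTransactionError` are illustrative.

```python
from typing import Any, Sequence


class DatabaseTransactionError(Exception):
    """Raised when begin/commit/rollback are called in the wrong order."""


class SyncTransaction:
    def __init__(self, pool: Any) -> None:
        self._pool = pool
        self._conn: Any = None

    def begin(self) -> None:
        if self._conn is not None:
            raise DatabaseTransactionError("a transaction is already active")
        # Hold one connection so every statement runs in the same transaction.
        self._conn = self._pool.getconn()

    def execute(self, sql: str, params: Sequence[Any] = ()) -> int:
        if self._conn is None:
            raise DatabaseTransactionError("execute called without begin")
        with self._conn.cursor() as cur:
            cur.execute(sql, tuple(params))
            return cur.rowcount

    def commit(self) -> None:
        self._finish("commit")

    def rollback(self) -> None:
        self._finish("rollback")

    def _finish(self, action: str) -> None:
        if self._conn is None:
            raise DatabaseTransactionError(f"{action} called without begin")
        conn, self._conn = self._conn, None
        try:
            getattr(conn, action)()
        finally:
            # Always hand the connection back, even if commit/rollback fails.
            self._pool.putconn(conn)
```

An instance holds per-transaction state, so it should not be shared between threads.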
🟦 US‑008: Asynchronous Transaction Support
Description
Implement async equivalents for transaction management.
Acceptance Criteria
Async versions of:
async_begin()
async_commit()
async_rollback()
Works with async connection pool.
Unit Test Scenarios
Parallel transactional operations do not interfere.
Rollback discards intermediate async writes.
Commit persists results as expected.
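As an alternative shape to explicit `async_begin()`/`async_commit()`/`async_rollback()` calls, an async transaction can be scoped with an async context manager. The sketch below assumes an async pool whose `connection()` returns an async context manager and connections that offer `transaction()`, as psycopg 3's `AsyncConnectionPool` and `AsyncConnection` do; the helper name `async_transaction` is hypothetical.

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def async_transaction(pool: Any) -> AsyncIterator[Any]:
    """Yield a connection whose statements run inside one transaction.

    Leaving the block normally commits; leaving it with an exception rolls
    back and re-raises. Each caller gets its own pooled connection, so
    transactions running in parallel tasks do not share state.
    """
    async with pool.connection() as conn:
        async with conn.transaction():
            yield conn
```

A caller would write `async with async_transaction(pool) as conn:` and run its statements on `conn` inside the block.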
🟦 US‑009: Error Handling & Normalization
Description
Implement consistent exceptions and error normalization for engine-core consumers.
Acceptance Criteria
Normalize database exceptions to structured Python exceptions:
DatabaseConnectionError
DatabaseExecutionError
DatabaseTimeoutError
DatabaseTransactionError
Return actionable messages without exposing sensitive DB details.
Unit Test Scenarios
Invalid SQL → raises DatabaseExecutionError.
Lost connection → raises DatabaseConnectionError.
Timeout → raises DatabaseTimeoutError.
Running commit without begin → DatabaseTransactionError.
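The normalization in US-009 could be driven by the SQLSTATE code that PostgreSQL attaches to errors (psycopg exposes it as the `sqlstate` attribute on its exceptions). The mapping below is a sketch under assumptions: the base class `DatabaseError`, the function `normalize_error`, and the exact grouping of SQLSTATE classes are illustrative choices. Messages are deliberately generic so that SQL text and connection details do not leak.

```python
from typing import Optional


class DatabaseError(Exception):
    """Base class for normalized connector errors (assumed name)."""

    def __init__(self, message: str, sqlstate: Optional[str] = None) -> None:
        super().__init__(message)
        self.sqlstate = sqlstate


class DatabaseConnectionError(DatabaseError): ...
class DatabaseExecutionError(DatabaseError): ...
class DatabaseTimeoutError(DatabaseError): ...
class DatabaseTransactionError(DatabaseError): ...


def normalize_error(exc: BaseException) -> DatabaseError:
    """Map a driver or OS exception to one of the connector's error types."""
    sqlstate = getattr(exc, "sqlstate", None)
    code = sqlstate or ""
    # 57014 = query_canceled, which is what a statement timeout produces.
    if isinstance(exc, TimeoutError) or code == "57014":
        return DatabaseTimeoutError("database operation timed out", sqlstate)
    # Class 08 = connection exception.
    if isinstance(exc, ConnectionError) or code.startswith("08"):
        return DatabaseConnectionError("database connection failed", sqlstate)
    # Class 25 = invalid transaction state, class 40 = transaction rollback.
    if code[:2] in ("25", "40"):
        return DatabaseTransactionError("transaction could not be completed", sqlstate)
    # Everything else (e.g. 42601 syntax_error) is reported as an execution error.
    return DatabaseExecutionError("SQL statement failed", sqlstate)
```

A connector would typically catch the driver exception at its boundary and raise the normalized error in its place, keeping the SQLSTATE for diagnostics.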
🟦 US‑010: Connection Lifecycle Management
Description
Ensure correct behavior for connector startup and shutdown.
Acceptance Criteria
Pool created on first connect().
close() shuts down pool and all connections.
Safe to call close() multiple times.
Unit Test Scenarios
Pool initializes once.
Calling close() releases connections.
Reconnecting after close reinitializes pool.
No resource leaks after operations.
🟦 US‑012: Packaging & Distribution
Description
Package connector as a pip-installable module.
Acceptance Criteria
Valid pyproject.toml.
Installable via pip install engine-db-postgres.
Exposes:
PostgresSyncConnector
PostgresAsyncConnector
Pydantic config models
Retry configuration
Unit Test Scenarios
Package installs into fresh environment.
Import path works as documented.
Engine-core can import connector and run basic tests.
✅ Summary of Deliverables (in scope)
Feature                        Included
Sync PostgreSQL connector      ✔
Async PostgreSQL connector     ✔
Connection pooling             ✔
Pydantic validation            ✔
Retry strategy                 ✔
Transaction support            ✔
Parameter binding              ✔
Error normalization            ✔
High-performance patterns      ✔
This is the actual task. Guide me according to it and improve the code as you see fit.
