import time
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, TypeVar, Union

from psycopg2.extensions import connection, cursor
from psycopg2.sql import Composable

CursorObj = TypeVar("CursorObj", bound=cursor)
Query = Union[str, Composable]
Params = Union[Sequence[object], Mapping[str, object]]
ParamsT = TypeVar("ParamsT")


# Similar to the tracking done in Django's CursorDebugWrapper, but done at the
# psycopg2 cursor level so it works with SQLAlchemy.
def wrapper_execute(
    self: CursorObj, action: Callable[[Query, ParamsT], CursorObj], sql: Query, params: ParamsT
) -> CursorObj:
    """Run ``action(sql, params)`` and log how long it took.

    One entry of the form ``{"time": "<seconds, 3 decimals>"}`` is appended
    to ``self.connection.queries`` for every call, including calls where
    ``action`` raises (the exception still propagates to the caller).

    Args:
        self: The cursor on whose connection the timing is recorded.
        action: The callable that actually executes the query.
        sql: The query passed through to ``action``.
        params: The query parameters passed through to ``action``.

    Returns:
        Whatever ``action`` returns.
    """
    # perf_counter() is monotonic, so the measured duration cannot go
    # negative or jump when the system wall clock is adjusted.
    start = time.perf_counter()
    try:
        return action(sql, params)
    finally:
        duration = time.perf_counter() - start
        self.connection.queries.append(
            {
                "time": f"{duration:.3f}",
            }
        )


class TimeTrackingCursor(cursor):
    """A psycopg2 cursor class that tracks the time spent executing queries."""

    def execute(self, query: Query, vars: Optional[Params] = None) -> "TimeTrackingCursor":
        """Execute ``query`` through ``wrapper_execute`` so its duration is logged."""
        return wrapper_execute(self, super().execute, query, vars)

    def executemany(
        self, query: Query, vars: Iterable[Params]
    ) -> "TimeTrackingCursor":  # nocoverage
        """Execute ``query`` for each parameter set; logged as a single timed entry."""
        return wrapper_execute(self, super().executemany, query, vars)


class TimeTrackingConnection(connection):
    """A psycopg2 connection class that uses TimeTrackingCursors."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Create the per-connection query log, then initialize the connection."""
        # One {"time": ...} dict per executed query, appended by wrapper_execute.
        self.queries: List[Dict[str, str]] = []
        super().__init__(*args, **kwargs)

    def cursor(self, *args: Any, **kwargs: Any) -> TimeTrackingCursor:
        """Return a cursor, defaulting ``cursor_factory`` to TimeTrackingCursor.

        A ``cursor_factory`` supplied explicitly by the caller is left as is.
        """
        kwargs.setdefault("cursor_factory", TimeTrackingCursor)
        return super().cursor(*args, **kwargs)


def reset_queries() -> None:
    """Replace the query log with a fresh empty list on every open Django connection.

    Django connection wrappers whose underlying ``connection`` is ``None``
    are skipped.
    """
    # NOTE(review): imported inside the function, presumably so that importing
    # this module does not require Django to be set up -- confirm.
    from django.db import connections

    for conn in connections.all():
        if conn.connection is not None:
            conn.connection.queries = []