mirror of https://github.com/zulip/zulip.git
95 lines
2.4 KiB
Python
95 lines
2.4 KiB
Python
import time
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
Dict,
|
|
Iterable,
|
|
List,
|
|
Mapping,
|
|
Optional,
|
|
Sequence,
|
|
TypeVar,
|
|
Union,
|
|
overload,
|
|
)
|
|
|
|
from psycopg2.extensions import connection, cursor
|
|
from psycopg2.sql import Composable
|
|
|
|
CursorObj = TypeVar("CursorObj", bound=cursor)
|
|
Query = Union[str, Composable]
|
|
Params = Union[Sequence[object], Mapping[str, object]]
|
|
ParamsT = TypeVar("ParamsT")
|
|
|
|
# Similar to the tracking done in Django's CursorDebugWrapper, but done at the
|
|
# psycopg2 cursor level so it works with SQLAlchemy.
|
|
def wrapper_execute(
|
|
self: CursorObj, action: Callable[[Query, ParamsT], CursorObj], sql: Query, params: ParamsT
|
|
) -> CursorObj:
|
|
start = time.time()
|
|
try:
|
|
return action(sql, params)
|
|
finally:
|
|
stop = time.time()
|
|
duration = stop - start
|
|
self.connection.queries.append(
|
|
{
|
|
"time": f"{duration:.3f}",
|
|
}
|
|
)
|
|
|
|
|
|
class TimeTrackingCursor(cursor):
|
|
"""A psycopg2 cursor class that tracks the time spent executing queries."""
|
|
|
|
def execute(self, query: Query, vars: Optional[Params] = None) -> "TimeTrackingCursor":
|
|
return wrapper_execute(self, super().execute, query, vars)
|
|
|
|
def executemany(
|
|
self, query: Query, vars: Iterable[Params]
|
|
) -> "TimeTrackingCursor": # nocoverage
|
|
return wrapper_execute(self, super().executemany, query, vars)
|
|
|
|
|
|
CursorT = TypeVar("CursorT", bound=cursor)
|
|
|
|
|
|
class TimeTrackingConnection(connection):
|
|
"""A psycopg2 connection class that uses TimeTrackingCursors."""
|
|
|
|
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
|
self.queries: List[Dict[str, str]] = []
|
|
super().__init__(*args, **kwargs)
|
|
|
|
@overload
|
|
def cursor(
|
|
self,
|
|
name: str = ...,
|
|
*,
|
|
scrollable: Optional[bool] = ...,
|
|
withhold: bool = ...,
|
|
) -> TimeTrackingCursor:
|
|
...
|
|
|
|
@overload
|
|
def cursor(
|
|
self,
|
|
name: str = ...,
|
|
cursor_factory: Callable[..., CursorT] = ...,
|
|
scrollable: Optional[bool] = ...,
|
|
withhold: bool = ...,
|
|
) -> CursorT:
|
|
...
|
|
|
|
def cursor(self, *args: object, **kwargs: object) -> cursor:
|
|
kwargs.setdefault("cursor_factory", TimeTrackingCursor)
|
|
return super().cursor(*args, **kwargs)
|
|
|
|
|
|
def reset_queries() -> None:
|
|
from django.db import connections
|
|
|
|
for conn in connections.all():
|
|
if conn.connection is not None:
|
|
conn.connection.queries = []
|