From 04feadd9179303dcfd225cce7c2a7eaa94fad68f Mon Sep 17 00:00:00 2001 From: Anders Kaseorg Date: Mon, 2 Aug 2021 16:02:27 -0700 Subject: [PATCH] mypy: Add pika-stubs. Signed-off-by: Anders Kaseorg --- pyproject.toml | 1 - requirements/dev.txt | 3 +++ requirements/mypy.in | 1 + requirements/mypy.txt | 3 +++ version.py | 2 +- zerver/lib/queue.py | 11 +++++++++-- zerver/tests/test_queue.py | 1 + 7 files changed, 18 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a84ebc0071..6dff8e2afd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,7 +83,6 @@ module = [ "onelogin.*", "openapi_core.*", "openapi_schema_validator.*", - "pika.*", "PIL.*", "premailer.*", "psycopg2.*", diff --git a/requirements/dev.txt b/requirements/dev.txt index 749bbcbebe..6b64fda0ce 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -880,6 +880,9 @@ pika==1.2.0 \ --hash=sha256:59da6701da1aeaf7e5e93bb521cc03129867f6e54b7dd352c4b3ecb2bd7ec624 \ --hash=sha256:f023d6ac581086b124190cb3dc81dd581a149d216fa4540ac34f9be1e3970b89 # via -r requirements/common.in +https://github.com/andersk/pika-stubs/archive/87c5795741449e37bdbd2ceceee853fd56462440.zip#egg=pika-stubs==0.1.3+git \ + --hash=sha256:b56b4210ba66c65e3c54236dc3ccfd79d02c1385b9d72146d3ea8af05ec7c5ff + # via -r requirements/mypy.in pillow==8.3.0 \ --hash=sha256:063d17a02a0170c2f880fbd373b2738b089c6adcbd1f7418667bc9e97524c11b \ --hash=sha256:1037288a22cc8ec9d2918a24ded733a1cc4342fd7f21d15d37e6bbe5fb4a7306 \ diff --git a/requirements/mypy.in b/requirements/mypy.in index d5251488c1..0ea329645e 100644 --- a/requirements/mypy.in +++ b/requirements/mypy.in @@ -3,6 +3,7 @@ # and requirements/mypy.txt. # See requirements/README.md for more detail. mypy +https://github.com/andersk/pika-stubs/archive/87c5795741449e37bdbd2ceceee853fd56462440.zip#egg=pika-stubs==0.1.3+git # https://github.com/hahow/pika-stubs/issues/1, https://github.com/hahow/pika-stubs/pull/4 sqlalchemy-stubs types-boto types-certifi diff --git a/requirements/mypy.txt b/requirements/mypy.txt index 8cadb6625a..bc9b0d092b 100644 --- a/requirements/mypy.txt +++ b/requirements/mypy.txt @@ -38,6 +38,9 @@ mypy-extensions==0.4.3 \ --hash=sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d \ --hash=sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8 # via mypy +https://github.com/andersk/pika-stubs/archive/87c5795741449e37bdbd2ceceee853fd56462440.zip#egg=pika-stubs==0.1.3+git \ + --hash=sha256:b56b4210ba66c65e3c54236dc3ccfd79d02c1385b9d72146d3ea8af05ec7c5ff + # via -r requirements/mypy.in sqlalchemy-stubs==0.4 \ --hash=sha256:5eec7aa110adf9b957b631799a72fef396b23ff99fe296df726645d01e312aa5 \ --hash=sha256:c665d6dd4482ef642f01027fa06c3d5e91befabb219dc71fc2a09e7d7695f7ae diff --git a/version.py b/version.py index f14d012c28..e7694377bd 100644 --- a/version.py +++ b/version.py @@ -48,4 +48,4 @@ API_FEATURE_LEVEL = 89 # historical commits sharing the same major version, in which case a # minor version bump suffices. -PROVISION_VERSION = "153.4" +PROVISION_VERSION = "153.5" diff --git a/zerver/lib/queue.py b/zerver/lib/queue.py index d9a7f47dd3..ac98ac4fe2 100644 --- a/zerver/lib/queue.py +++ b/zerver/lib/queue.py @@ -131,6 +131,8 @@ class QueueClient(Generic[ChannelT], metaclass=ABCMeta): class SimpleQueueClient(QueueClient[BlockingChannel]): + connection: Optional[pika.BlockingConnection] + def _connect(self) -> None: start = time.time() self.connection = pika.BlockingConnection(self._get_parameters()) @@ -183,11 +185,13 @@ class SimpleQueueClient(QueueClient[BlockingChannel]): # batch of events it has. for method, properties, body in channel.consume(queue_name, inactivity_timeout=timeout): if body is not None: + assert method is not None events.append(orjson.loads(body)) max_processed = method.delivery_tag now = time.time() if len(events) >= batch_size or (timeout and now >= last_process + timeout): if events: + assert max_processed is not None try: callback(events) channel.basic_ack(max_processed, multiple=True) @@ -203,7 +207,9 @@ class SimpleQueueClient(QueueClient[BlockingChannel]): def local_queue_size(self) -> int: assert self.channel is not None - return self.channel.get_waiting_message_count() + len(self.channel._pending_events) + return self.channel.get_waiting_message_count() + len( + self.channel._pending_events # type: ignore[attr-defined] # private member missing from stubs + ) def stop_consuming(self) -> None: assert self.channel is not None @@ -218,7 +224,7 @@ class SimpleQueueClient(QueueClient[BlockingChannel]): class ExceptionFreeTornadoConnection(pika.adapters.tornado_connection.TornadoConnection): def _adapter_disconnect(self) -> None: try: - super()._adapter_disconnect() + super()._adapter_disconnect() # type: ignore[misc] # private method missing from stubs except ( pika.exceptions.ProbableAuthenticationError, pika.exceptions.ProbableAccessDeniedError, @@ -352,6 +358,7 @@ class TornadoQueueClient(QueueClient[Channel]): properties: pika.BasicProperties, body: bytes, ) -> None: + assert method.delivery_tag is not None callback([orjson.loads(body)]) ch.basic_ack(delivery_tag=method.delivery_tag) diff --git a/zerver/tests/test_queue.py b/zerver/tests/test_queue.py index 89e328eb54..fad39d43d2 100644 --- a/zerver/tests/test_queue.py +++ b/zerver/tests/test_queue.py @@ -81,6 +81,7 @@ class TestQueueImplementation(ZulipTestCase): @override_settings(USING_RABBITMQ=True) def test_queue_error_json(self) -> None: queue_client = get_queue_client() + assert isinstance(queue_client, SimpleQueueClient) actual_publish = queue_client.publish self.counter = 0