mypy: Add pika-stubs.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2021-08-02 16:02:27 -07:00 committed by Tim Abbott
parent f87a0e912b
commit 04feadd917
7 changed files with 18 additions and 4 deletions

View File

@ -83,7 +83,6 @@ module = [
"onelogin.*", "onelogin.*",
"openapi_core.*", "openapi_core.*",
"openapi_schema_validator.*", "openapi_schema_validator.*",
"pika.*",
"PIL.*", "PIL.*",
"premailer.*", "premailer.*",
"psycopg2.*", "psycopg2.*",

View File

@ -880,6 +880,9 @@ pika==1.2.0 \
--hash=sha256:59da6701da1aeaf7e5e93bb521cc03129867f6e54b7dd352c4b3ecb2bd7ec624 \ --hash=sha256:59da6701da1aeaf7e5e93bb521cc03129867f6e54b7dd352c4b3ecb2bd7ec624 \
--hash=sha256:f023d6ac581086b124190cb3dc81dd581a149d216fa4540ac34f9be1e3970b89 --hash=sha256:f023d6ac581086b124190cb3dc81dd581a149d216fa4540ac34f9be1e3970b89
# via -r requirements/common.in # via -r requirements/common.in
https://github.com/andersk/pika-stubs/archive/87c5795741449e37bdbd2ceceee853fd56462440.zip#egg=pika-stubs==0.1.3+git \
--hash=sha256:b56b4210ba66c65e3c54236dc3ccfd79d02c1385b9d72146d3ea8af05ec7c5ff
# via -r requirements/mypy.in
pillow==8.3.0 \ pillow==8.3.0 \
--hash=sha256:063d17a02a0170c2f880fbd373b2738b089c6adcbd1f7418667bc9e97524c11b \ --hash=sha256:063d17a02a0170c2f880fbd373b2738b089c6adcbd1f7418667bc9e97524c11b \
--hash=sha256:1037288a22cc8ec9d2918a24ded733a1cc4342fd7f21d15d37e6bbe5fb4a7306 \ --hash=sha256:1037288a22cc8ec9d2918a24ded733a1cc4342fd7f21d15d37e6bbe5fb4a7306 \

View File

@ -3,6 +3,7 @@
# and requirements/mypy.txt. # and requirements/mypy.txt.
# See requirements/README.md for more detail. # See requirements/README.md for more detail.
mypy mypy
https://github.com/andersk/pika-stubs/archive/87c5795741449e37bdbd2ceceee853fd56462440.zip#egg=pika-stubs==0.1.3+git # https://github.com/hahow/pika-stubs/issues/1, https://github.com/hahow/pika-stubs/pull/4
sqlalchemy-stubs sqlalchemy-stubs
types-boto types-boto
types-certifi types-certifi

View File

@ -38,6 +38,9 @@ mypy-extensions==0.4.3 \
--hash=sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d \ --hash=sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d \
--hash=sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8 --hash=sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8
# via mypy # via mypy
https://github.com/andersk/pika-stubs/archive/87c5795741449e37bdbd2ceceee853fd56462440.zip#egg=pika-stubs==0.1.3+git \
--hash=sha256:b56b4210ba66c65e3c54236dc3ccfd79d02c1385b9d72146d3ea8af05ec7c5ff
# via -r requirements/mypy.in
sqlalchemy-stubs==0.4 \ sqlalchemy-stubs==0.4 \
--hash=sha256:5eec7aa110adf9b957b631799a72fef396b23ff99fe296df726645d01e312aa5 \ --hash=sha256:5eec7aa110adf9b957b631799a72fef396b23ff99fe296df726645d01e312aa5 \
--hash=sha256:c665d6dd4482ef642f01027fa06c3d5e91befabb219dc71fc2a09e7d7695f7ae --hash=sha256:c665d6dd4482ef642f01027fa06c3d5e91befabb219dc71fc2a09e7d7695f7ae

View File

@ -48,4 +48,4 @@ API_FEATURE_LEVEL = 89
# historical commits sharing the same major version, in which case a # historical commits sharing the same major version, in which case a
# minor version bump suffices. # minor version bump suffices.
PROVISION_VERSION = "153.4" PROVISION_VERSION = "153.5"

View File

@ -131,6 +131,8 @@ class QueueClient(Generic[ChannelT], metaclass=ABCMeta):
class SimpleQueueClient(QueueClient[BlockingChannel]): class SimpleQueueClient(QueueClient[BlockingChannel]):
connection: Optional[pika.BlockingConnection]
def _connect(self) -> None: def _connect(self) -> None:
start = time.time() start = time.time()
self.connection = pika.BlockingConnection(self._get_parameters()) self.connection = pika.BlockingConnection(self._get_parameters())
@ -183,11 +185,13 @@ class SimpleQueueClient(QueueClient[BlockingChannel]):
# batch of events it has. # batch of events it has.
for method, properties, body in channel.consume(queue_name, inactivity_timeout=timeout): for method, properties, body in channel.consume(queue_name, inactivity_timeout=timeout):
if body is not None: if body is not None:
assert method is not None
events.append(orjson.loads(body)) events.append(orjson.loads(body))
max_processed = method.delivery_tag max_processed = method.delivery_tag
now = time.time() now = time.time()
if len(events) >= batch_size or (timeout and now >= last_process + timeout): if len(events) >= batch_size or (timeout and now >= last_process + timeout):
if events: if events:
assert max_processed is not None
try: try:
callback(events) callback(events)
channel.basic_ack(max_processed, multiple=True) channel.basic_ack(max_processed, multiple=True)
@ -203,7 +207,9 @@ class SimpleQueueClient(QueueClient[BlockingChannel]):
def local_queue_size(self) -> int: def local_queue_size(self) -> int:
assert self.channel is not None assert self.channel is not None
return self.channel.get_waiting_message_count() + len(self.channel._pending_events) return self.channel.get_waiting_message_count() + len(
self.channel._pending_events # type: ignore[attr-defined] # private member missing from stubs
)
def stop_consuming(self) -> None: def stop_consuming(self) -> None:
assert self.channel is not None assert self.channel is not None
@ -218,7 +224,7 @@ class SimpleQueueClient(QueueClient[BlockingChannel]):
class ExceptionFreeTornadoConnection(pika.adapters.tornado_connection.TornadoConnection): class ExceptionFreeTornadoConnection(pika.adapters.tornado_connection.TornadoConnection):
def _adapter_disconnect(self) -> None: def _adapter_disconnect(self) -> None:
try: try:
super()._adapter_disconnect() super()._adapter_disconnect() # type: ignore[misc] # private method missing from stubs
except ( except (
pika.exceptions.ProbableAuthenticationError, pika.exceptions.ProbableAuthenticationError,
pika.exceptions.ProbableAccessDeniedError, pika.exceptions.ProbableAccessDeniedError,
@ -352,6 +358,7 @@ class TornadoQueueClient(QueueClient[Channel]):
properties: pika.BasicProperties, properties: pika.BasicProperties,
body: bytes, body: bytes,
) -> None: ) -> None:
assert method.delivery_tag is not None
callback([orjson.loads(body)]) callback([orjson.loads(body)])
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)

View File

@ -81,6 +81,7 @@ class TestQueueImplementation(ZulipTestCase):
@override_settings(USING_RABBITMQ=True) @override_settings(USING_RABBITMQ=True)
def test_queue_error_json(self) -> None: def test_queue_error_json(self) -> None:
queue_client = get_queue_client() queue_client = get_queue_client()
assert isinstance(queue_client, SimpleQueueClient)
actual_publish = queue_client.publish actual_publish = queue_client.publish
self.counter = 0 self.counter = 0