2017-11-16 00:43:27 +01:00
|
|
|
import sys
|
2016-06-04 16:52:18 +02:00
|
|
|
from argparse import ArgumentParser
|
2017-11-16 00:43:27 +01:00
|
|
|
from typing import IO, Any
|
2013-10-30 17:36:21 +01:00
|
|
|
|
|
|
|
import ujson
|
2017-11-16 00:43:27 +01:00
|
|
|
from django.core.management.base import BaseCommand
|
2013-10-30 17:36:21 +01:00
|
|
|
|
2017-11-16 00:43:27 +01:00
|
|
|
from zerver.lib.queue import queue_json_publish
|
2013-10-30 17:36:21 +01:00
|
|
|
|
2020-01-14 21:59:46 +01:00
|
|
|
|
2017-10-26 11:35:57 +02:00
|
|
|
def error(*args: Any) -> None:
|
2013-10-30 17:36:21 +01:00
|
|
|
raise Exception('We cannot enqueue because settings.USING_RABBITMQ is False.')
|
|
|
|
|
|
|
|
class Command(BaseCommand):
|
|
|
|
help = """Read JSON lines from a file and enqueue them to a worker queue.
|
|
|
|
|
|
|
|
Each line in the file should either be a JSON payload or two tab-separated
|
2017-01-12 03:33:10 +01:00
|
|
|
fields, the second of which is a JSON payload. (The latter is to accommodate
|
2013-10-30 17:36:21 +01:00
|
|
|
the format of error files written by queue workers that catch exceptions--their
|
|
|
|
first field is a timestamp that we ignore.)
|
|
|
|
|
|
|
|
You can use "-" to represent stdin.
|
|
|
|
"""
|
|
|
|
|
2017-10-26 11:35:57 +02:00
|
|
|
def add_arguments(self, parser: ArgumentParser) -> None:
|
2015-08-21 02:10:41 +02:00
|
|
|
parser.add_argument('queue_name', metavar='<queue>', type=str,
|
|
|
|
help="name of worker queue to enqueue to")
|
|
|
|
parser.add_argument('file_name', metavar='<file>', type=str,
|
|
|
|
help="name of file containing JSON lines")
|
2013-10-30 17:36:21 +01:00
|
|
|
|
2017-10-26 11:35:57 +02:00
|
|
|
def handle(self, *args: Any, **options: str) -> None:
|
2015-08-21 02:10:41 +02:00
|
|
|
queue_name = options['queue_name']
|
|
|
|
file_name = options['file_name']
|
2013-10-30 17:36:21 +01:00
|
|
|
|
|
|
|
if file_name == '-':
|
python: Convert assignment type annotations to Python 3.6 style.
This commit was split by tabbott; this piece covers the vast majority
of files in Zulip, but excludes scripts/, tools/, and puppet/ to help
ensure we at least show the right error messages for Xenial systems.
We can likely further refine the remaining pieces with some testing.
Generated by com2ann, with whitespace fixes and various manual fixes
for runtime issues:
- invoiced_through: Optional[LicenseLedger] = models.ForeignKey(
+ invoiced_through: Optional["LicenseLedger"] = models.ForeignKey(
-_apns_client: Optional[APNsClient] = None
+_apns_client: Optional["APNsClient"] = None
- notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
- signup_notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+ notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+ signup_notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
- author: Optional[UserProfile] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
+ author: Optional["UserProfile"] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
- bot_owner: Optional[UserProfile] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
+ bot_owner: Optional["UserProfile"] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
- default_sending_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
- default_events_register_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+ default_sending_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+ default_events_register_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
-descriptors_by_handler_id: Dict[int, ClientDescriptor] = {}
+descriptors_by_handler_id: Dict[int, "ClientDescriptor"] = {}
-worker_classes: Dict[str, Type[QueueProcessingWorker]] = {}
-queues: Dict[str, Dict[str, Type[QueueProcessingWorker]]] = {}
+worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {}
+queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {}
-AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional[LDAPSearch] = None
+AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional["LDAPSearch"] = None
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-22 01:09:50 +02:00
|
|
|
f: IO[str] = sys.stdin
|
2013-10-30 17:36:21 +01:00
|
|
|
else:
|
|
|
|
f = open(file_name)
|
|
|
|
|
|
|
|
while True:
|
|
|
|
line = f.readline()
|
|
|
|
if not line:
|
|
|
|
break
|
|
|
|
|
|
|
|
line = line.strip()
|
|
|
|
try:
|
|
|
|
payload = line.split('\t')[1]
|
|
|
|
except IndexError:
|
|
|
|
payload = line
|
|
|
|
|
2020-06-10 06:41:04 +02:00
|
|
|
print(f'Queueing to queue {queue_name}: {payload}')
|
2013-10-30 17:36:21 +01:00
|
|
|
|
|
|
|
# Verify that payload is valid json.
|
|
|
|
data = ujson.loads(payload)
|
|
|
|
|
2017-10-19 15:02:03 +02:00
|
|
|
# This is designed to use the `error` method rather than
|
|
|
|
# the call_consume_in_tests flow.
|
2013-10-30 17:36:21 +01:00
|
|
|
queue_json_publish(queue_name, data, error)
|