2017-11-16 00:43:27 +01:00
|
|
|
import sys
|
2016-06-04 16:52:18 +02:00
|
|
|
from argparse import ArgumentParser
|
2017-11-16 00:43:27 +01:00
|
|
|
from typing import IO, Any
|
2013-10-30 17:36:21 +01:00
|
|
|
|
2020-08-07 01:09:47 +02:00
|
|
|
import orjson
|
2017-11-16 00:43:27 +01:00
|
|
|
from django.core.management.base import BaseCommand
|
2023-10-12 19:43:45 +02:00
|
|
|
from typing_extensions import override
|
2013-10-30 17:36:21 +01:00
|
|
|
|
2017-11-16 00:43:27 +01:00
|
|
|
from zerver.lib.queue import queue_json_publish
|
2013-10-30 17:36:21 +01:00
|
|
|
|
2020-01-14 21:59:46 +01:00
|
|
|
|
2017-10-26 11:35:57 +02:00
|
|
|
def error(*args: Any) -> None:
|
2021-02-12 08:20:45 +01:00
|
|
|
raise Exception("We cannot enqueue because settings.USING_RABBITMQ is False.")
|
2013-10-30 17:36:21 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2023-01-18 04:06:51 +01:00
|
|
|
def enqueue_file(queue_name: str, f: IO[str]) -> None:
|
|
|
|
for line in f:
|
|
|
|
line = line.strip()
|
|
|
|
try:
|
|
|
|
payload = line.split("\t")[1]
|
|
|
|
except IndexError:
|
|
|
|
payload = line
|
|
|
|
|
|
|
|
print(f"Queueing to queue {queue_name}: {payload}")
|
|
|
|
|
|
|
|
# Verify that payload is valid json.
|
|
|
|
data = orjson.loads(payload)
|
|
|
|
|
|
|
|
# This is designed to use the `error` method rather than
|
|
|
|
# the call_consume_in_tests flow.
|
|
|
|
queue_json_publish(queue_name, data, error)
|
|
|
|
|
|
|
|
|
2013-10-30 17:36:21 +01:00
|
|
|
class Command(BaseCommand):
|
|
|
|
help = """Read JSON lines from a file and enqueue them to a worker queue.
|
|
|
|
|
|
|
|
Each line in the file should either be a JSON payload or two tab-separated
|
2017-01-12 03:33:10 +01:00
|
|
|
fields, the second of which is a JSON payload. (The latter is to accommodate
|
2013-10-30 17:36:21 +01:00
|
|
|
the format of error files written by queue workers that catch exceptions--their
|
|
|
|
first field is a timestamp that we ignore.)
|
|
|
|
|
|
|
|
You can use "-" to represent stdin.
|
|
|
|
"""
|
|
|
|
|
2023-10-12 19:43:45 +02:00
|
|
|
@override
|
2017-10-26 11:35:57 +02:00
|
|
|
def add_arguments(self, parser: ArgumentParser) -> None:
|
2021-02-12 08:19:30 +01:00
|
|
|
parser.add_argument(
|
2021-02-12 08:20:45 +01:00
|
|
|
"queue_name", metavar="<queue>", help="name of worker queue to enqueue to"
|
2021-02-12 08:19:30 +01:00
|
|
|
)
|
|
|
|
parser.add_argument(
|
2021-02-12 08:20:45 +01:00
|
|
|
"file_name", metavar="<file>", help="name of file containing JSON lines"
|
2021-02-12 08:19:30 +01:00
|
|
|
)
|
2013-10-30 17:36:21 +01:00
|
|
|
|
2023-10-12 19:43:45 +02:00
|
|
|
@override
|
2017-10-26 11:35:57 +02:00
|
|
|
def handle(self, *args: Any, **options: str) -> None:
|
2021-02-12 08:20:45 +01:00
|
|
|
queue_name = options["queue_name"]
|
|
|
|
file_name = options["file_name"]
|
2013-10-30 17:36:21 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
if file_name == "-":
|
2023-01-18 04:06:51 +01:00
|
|
|
enqueue_file(queue_name, sys.stdin)
|
2013-10-30 17:36:21 +01:00
|
|
|
else:
|
2023-01-18 04:06:51 +01:00
|
|
|
with open(file_name) as f:
|
|
|
|
enqueue_file(queue_name, f)
|