2013-10-30 17:36:21 +01:00
|
|
|
from __future__ import absolute_import
|
2015-11-01 17:11:06 +01:00
|
|
|
from __future__ import print_function
|
2013-10-30 17:36:21 +01:00
|
|
|
|
2016-06-04 16:52:18 +02:00
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
from argparse import ArgumentParser
|
2013-10-30 17:36:21 +01:00
|
|
|
from django.core.management.base import BaseCommand
|
|
|
|
from zerver.lib.queue import queue_json_publish
|
|
|
|
|
|
|
|
import sys
|
|
|
|
import ujson
|
|
|
|
|
|
|
|
|
|
|
|
def error(*args):
|
2016-06-04 16:52:18 +02:00
|
|
|
# type: (*Any) -> None
|
2013-10-30 17:36:21 +01:00
|
|
|
raise Exception('We cannot enqueue because settings.USING_RABBITMQ is False.')
|
|
|
|
|
|
|
|
class Command(BaseCommand):
|
|
|
|
help = """Read JSON lines from a file and enqueue them to a worker queue.
|
|
|
|
|
|
|
|
Each line in the file should either be a JSON payload or two tab-separated
|
|
|
|
fields, the second of which is a JSON payload. (The latter is to accomodate
|
|
|
|
the format of error files written by queue workers that catch exceptions--their
|
|
|
|
first field is a timestamp that we ignore.)
|
|
|
|
|
|
|
|
You can use "-" to represent stdin.
|
|
|
|
"""
|
|
|
|
|
2015-08-21 02:10:41 +02:00
|
|
|
def add_arguments(self, parser):
|
2016-06-04 16:52:18 +02:00
|
|
|
# type: (ArgumentParser) -> None
|
2015-08-21 02:10:41 +02:00
|
|
|
parser.add_argument('queue_name', metavar='<queue>', type=str,
|
|
|
|
help="name of worker queue to enqueue to")
|
|
|
|
parser.add_argument('file_name', metavar='<file>', type=str,
|
|
|
|
help="name of file containing JSON lines")
|
2013-10-30 17:36:21 +01:00
|
|
|
|
2015-08-21 02:10:41 +02:00
|
|
|
def handle(self, *args, **options):
|
2016-06-04 16:52:18 +02:00
|
|
|
# type: (*Any, **str) -> None
|
2015-08-21 02:10:41 +02:00
|
|
|
queue_name = options['queue_name']
|
|
|
|
file_name = options['file_name']
|
2013-10-30 17:36:21 +01:00
|
|
|
|
|
|
|
if file_name == '-':
|
|
|
|
f = sys.stdin
|
|
|
|
else:
|
|
|
|
f = open(file_name)
|
|
|
|
|
|
|
|
while True:
|
|
|
|
line = f.readline()
|
|
|
|
if not line:
|
|
|
|
break
|
|
|
|
|
|
|
|
line = line.strip()
|
|
|
|
try:
|
|
|
|
payload = line.split('\t')[1]
|
|
|
|
except IndexError:
|
|
|
|
payload = line
|
|
|
|
|
2015-11-01 17:11:06 +01:00
|
|
|
print('Queueing to queue %s: %s' % (queue_name, payload))
|
2013-10-30 17:36:21 +01:00
|
|
|
|
|
|
|
# Verify that payload is valid json.
|
|
|
|
data = ujson.loads(payload)
|
|
|
|
|
|
|
|
queue_json_publish(queue_name, data, error)
|