zulip/zerver/lib/generate_test_data.py


import itertools
import ujson
import random
from typing import List, Dict, Any
import os
from scripts.lib.zulip_tools import get_or_create_dev_uuid_var_path

def load_config() -> Dict[str, Any]:
    with open("zerver/tests/fixtures/config.generate_data.json") as infile:
        config = ujson.load(infile)

    return config

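# A minimal sketch of the config shape this module expects, inferred from how
# the keys are consumed below (the real values live in
# zerver/tests/fixtures/config.generate_data.json; the sample words here are
# purely hypothetical):
#
#   {
#     "gen_fodder": {
#       "nouns": ["fish"], "adjectives": ["red"], "connectors": ["of"],
#       "verbs": ["jumps"], "adverbs": ["quickly"],
#       "emoji": [":smile:"], "links": ["http://example.com"],
#       "maths": ["$$x^2$$"], "inline-code": ["`x = 5`"],
#       "code-blocks": ["```\npass\n```"], "quote-blocks": ["> quoted"],
#       "lists": ["* item one\n* item two"]
#     },
#     "dist_percentages": {"bold": 10, "italic": 10},
#     "corpus": {"filename": "path/to/corpus.txt"}
#   }
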
def get_stream_title(gens: Dict[str, Any]) -> str:
    return next(gens["adjectives"]) + " " + next(gens["nouns"]) + " " + \
        next(gens["connectors"]) + " " + next(gens["verbs"]) + " " + \
        next(gens["adverbs"])

def load_generators(config: Dict[str, Any]) -> Dict[str, Any]:
    results = {}
    cfg = config["gen_fodder"]

    results["nouns"] = itertools.cycle(cfg["nouns"])
    results["adjectives"] = itertools.cycle(cfg["adjectives"])
    results["connectors"] = itertools.cycle(cfg["connectors"])
    results["verbs"] = itertools.cycle(cfg["verbs"])
    results["adverbs"] = itertools.cycle(cfg["adverbs"])
    results["emojis"] = itertools.cycle(cfg["emoji"])
    results["links"] = itertools.cycle(cfg["links"])

    results["maths"] = itertools.cycle(cfg["maths"])
    results["inline-code"] = itertools.cycle(cfg["inline-code"])
    results["code-blocks"] = itertools.cycle(cfg["code-blocks"])
    results["quote-blocks"] = itertools.cycle(cfg["quote-blocks"])
    results["lists"] = itertools.cycle(cfg["lists"])

    return results

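# Each value is an endless itertools.cycle over its fodder list, so
# next(gens["nouns"]) and friends never raise StopIteration; the entries
# simply repeat in order once the list is exhausted.
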
def parse_file(config: Dict[str, Any], gens: Dict[str, Any], corpus_file: str) -> List[str]:
    # First, load the entire file into a list of paragraphs,
    # then apply our custom filters to it as needed.
    paragraphs: List[str] = []

    with open(corpus_file) as infile:
        # OUR DATA: we need to separate the person talking and what they say
        paragraphs = remove_line_breaks(infile)
        paragraphs = add_flair(paragraphs, gens)

    return paragraphs

def get_flair_gen(length: int) -> List[str]:
    # Grab the percentages from the config file
    # create a list that we can consume that will guarantee the distribution
    result = []

    for k, v in config["dist_percentages"].items():
        result.extend([k] * int(v * length / 100))

    result.extend(["None"] * (length - len(result)))

    random.shuffle(result)
    return result

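# For example, with dist_percentages of {"bold": 10, "italic": 10}
# (hypothetical numbers) and 50 paragraphs, this returns a shuffled list
# containing five "bold" entries, five "italic" entries, and forty "None"
# entries, so the flair distribution matches the configured percentages.
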
def add_flair(paragraphs: List[str], gens: Dict[str, Any]) -> List[str]:
    # roll the dice and see what kind of flair we should add, if any
    results = []

    flair = get_flair_gen(len(paragraphs))

    for i in range(len(paragraphs)):
        key = flair[i]
        if key == "None":
            txt = paragraphs[i]
        elif key == "italic":
            txt = add_md("*", paragraphs[i])
        elif key == "bold":
            txt = add_md("**", paragraphs[i])
        elif key == "strike-thru":
            txt = add_md("~~", paragraphs[i])
        elif key == "quoted":
            txt = ">" + paragraphs[i]
        elif key == "quote-block":
            txt = paragraphs[i] + "\n" + next(gens["quote-blocks"])
        elif key == "inline-code":
            txt = paragraphs[i] + "\n" + next(gens["inline-code"])
        elif key == "code-block":
            txt = paragraphs[i] + "\n" + next(gens["code-blocks"])
        elif key == "math":
            txt = paragraphs[i] + "\n" + next(gens["maths"])
        elif key == "list":
            txt = paragraphs[i] + "\n" + next(gens["lists"])
        elif key == "emoji":
            txt = add_emoji(paragraphs[i], next(gens["emojis"]))
        elif key == "link":
            txt = add_link(paragraphs[i], next(gens["links"]))
        elif key == "picture":
            # TODO: implement pictures; pass the paragraph through unchanged
            # for now (a bare "txt = txt" here could reference an unbound name)
            txt = paragraphs[i]

        results.append(txt)

    return results

def add_md(mode: str, text: str) -> str:
    # mode means: bold, italic, etc.
    # to add a list at the end of a paragraph, use: * item one\n * item two
    # find out how long the line is, then insert the mode before the end

    vals = text.split()
    start = random.randrange(len(vals))
    end = random.randrange(len(vals) - start) + start
    vals[start] = mode + vals[start]
    vals[end] = vals[end] + mode

    return " ".join(vals).strip()

def add_emoji(text: str, emoji: str) -> str:
    vals = text.split()
    start = random.randrange(len(vals))

    vals[start] = vals[start] + " " + emoji + " "
    return " ".join(vals)

def add_link(text: str, link: str) -> str:
    vals = text.split()
    start = random.randrange(len(vals))

    vals[start] = vals[start] + " " + link + " "
    return " ".join(vals)

def remove_line_breaks(fh: Any) -> List[str]:
    # We're going to remove line breaks from paragraphs
    results = []    # the joined paragraphs end up here
    para = []       # we'll store the lines here to form a paragraph

    for line in fh:
        text = line.strip()
        if text != "":
            para.append(text)
        else:
            if para:
                results.append(" ".join(para))
            # reset the paragraph
            para = []
    if para:
        results.append(" ".join(para))

    return results

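# For example, a corpus file containing the lines
#   "Call me Ishmael.", "Some years ago -", "", "never mind how long"
# collapses to ["Call me Ishmael. Some years ago -", "never mind how long"]:
# blank lines delimit paragraphs, and each paragraph becomes one string.
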
def write_file(paragraphs: List[str], filename: str) -> None:
    with open(filename, "w") as outfile:
        outfile.write(ujson.dumps(paragraphs))

def create_test_data() -> None:
    gens = load_generators(config)   # returns a dictionary of generators

    paragraphs = parse_file(config, gens, config["corpus"]["filename"])

    write_file(paragraphs, os.path.join(get_or_create_dev_uuid_var_path('test-backend'),
                                        "test_messages.json"))

config = load_config()

if __name__ == "__main__":
    create_test_data()