2017-11-01 20:51:12 +01:00
|
|
|
from django.conf import settings
|
|
|
|
from django.db.models import Sum
|
|
|
|
from django.db.models.query import F
|
|
|
|
from django.db.models.functions import Length
|
2018-01-06 11:01:22 +01:00
|
|
|
from zerver.models import BotConfigData, UserProfile
|
2017-11-01 20:51:12 +01:00
|
|
|
|
2018-05-10 19:13:36 +02:00
|
|
|
from typing import List, Dict, Optional
|
2018-04-02 19:52:54 +02:00
|
|
|
|
|
|
|
from collections import defaultdict
|
2017-11-01 20:51:12 +01:00
|
|
|
|
2018-01-07 19:21:04 +01:00
|
|
|
import os
|
|
|
|
|
|
|
|
import configparser
|
|
|
|
import importlib
|
|
|
|
|
2017-11-01 20:51:12 +01:00
|
|
|
class ConfigError(Exception):
    """Error raised by the bot configuration helpers in this module.

    Used when a bot has no stored configuration and when a write would
    push a bot's configuration past the configured size limit.
    """
|
|
|
|
|
2018-05-10 19:13:36 +02:00
|
|
|
def get_bot_config(bot_profile: UserProfile) -> Dict[str, str]:
    """Return the stored configuration of one bot as a ``{key: value}`` dict.

    Raises:
        ConfigError: if no ``BotConfigData`` row exists for ``bot_profile``.
    """
    rows = BotConfigData.objects.filter(bot_profile=bot_profile)
    if rows:
        return {row.key: row.value for row in rows}
    raise ConfigError("No config data available.")
|
|
|
|
|
2018-05-10 19:13:36 +02:00
|
|
|
def get_bot_configs(bot_profile_ids: List[int]) -> Dict[int, Dict[str, str]]:
    """Return the stored configuration of several bots, grouped by bot id.

    An empty ``bot_profile_ids`` yields a plain empty dict without building
    a query.  Otherwise the result is a ``defaultdict(dict)`` that maps each
    ``bot_profile_id`` found in the matching rows to its ``{key: value}``
    dict.
    """
    if not bot_profile_ids:
        return {}

    configs_by_bot = defaultdict(dict)  # type: Dict[int, Dict[str, str]]
    rows = BotConfigData.objects.filter(bot_profile_id__in=bot_profile_ids)
    for row in rows:
        configs_by_bot[row.bot_profile_id][row.key] = row.value
    return configs_by_bot
|
|
|
|
|
2018-05-10 19:13:36 +02:00
|
|
|
def get_bot_config_size(bot_profile: UserProfile, key: Optional[str]=None) -> int:
    """Return the size of a bot's stored configuration.

    With ``key`` given, the result is ``len(key)`` plus the length of the
    value stored under that key, or 0 when the bot has no such entry.
    Without ``key``, the result is the sum over all of the bot's entries of
    key length plus value length, computed by the database, or 0 when the
    aggregate yields nothing.
    """
    if key is not None:
        key_length = len(key)
        try:
            entry = BotConfigData.objects.get(bot_profile=bot_profile, key=key)
        except BotConfigData.DoesNotExist:
            return 0
        return key_length + len(entry.value)

    sized_rows = BotConfigData.objects.filter(bot_profile=bot_profile).annotate(
        key_size=Length('key'), value_size=Length('value'))
    totals = sized_rows.aggregate(sum=Sum(F('key_size') + F('value_size')))
    # The aggregate is None when there are no rows; report that as 0.
    return totals['sum'] or 0
|
|
|
|
|
2018-05-10 19:13:36 +02:00
|
|
|
def set_bot_config(bot_profile: UserProfile, key: str, value: str) -> None:
    """Store ``value`` under ``key`` in a bot's configuration.

    The size the configuration would have after the write is computed
    first: the current total, minus the size of any entry already stored
    under ``key``, plus ``len(key) + len(value)``.  An existing entry is
    overwritten; otherwise a new one is created.

    Raises:
        ConfigError: if the resulting size would exceed
            ``settings.BOT_CONFIG_SIZE_LIMIT``.
    """
    limit = settings.BOT_CONFIG_SIZE_LIMIT
    replaced_size = get_bot_config_size(bot_profile, key)
    incoming_size = len(key) + len(value)
    current_total = get_bot_config_size(bot_profile)
    projected_total = current_total + (incoming_size - replaced_size)

    if projected_total > limit:
        template = ("Cannot store configuration. Request would require {} characters. "
                    "The current configuration size limit is {} characters.")
        raise ConfigError(template.format(projected_total, limit))

    entry, was_created = BotConfigData.objects.get_or_create(
        bot_profile=bot_profile, key=key, defaults={'value': value})
    if was_created:
        # The new row already carries the value via ``defaults``.
        return
    entry.value = value
    entry.save()
|
2018-01-07 19:21:04 +01:00
|
|
|
|
|
|
|
def load_bot_config_template(bot: str) -> Dict[str, str]:
    """Load the configuration template shipped with a ``zulip_bots`` bot.

    Imports ``zulip_bots.bots.<bot>`` and looks for a file named
    ``<bot>.conf`` in that module's directory.  When the file exists, it is
    parsed with ``configparser`` and the options of the section named
    ``bot`` are returned as a dict.  When it does not exist, an empty dict
    is returned.

    Raises:
        ImportError: if ``zulip_bots.bots.<bot>`` cannot be imported.
        configparser.NoSectionError: if the file exists but has no section
            named ``bot``.
    """
    bot_module_name = 'zulip_bots.bots.{}'.format(bot)
    bot_module = importlib.import_module(bot_module_name)
    bot_module_path = os.path.dirname(bot_module.__file__)
    config_path = os.path.join(bot_module_path, '{}.conf'.format(bot))

    if not os.path.isfile(config_path):
        return {}

    config = configparser.ConfigParser()
    with open(config_path) as conf:
        # read_file() replaces readfp(), which was deprecated in Python 3.2
        # and removed in Python 3.12.
        config.read_file(conf)
    return dict(config.items(bot))
|