mirror of https://github.com/zulip/zulip.git
python: Replace None defaults with empty collections where appropriate.
Use read-only types (List ↦ Sequence, Dict ↦ Mapping, Set ↦ AbstractSet) to guard against accidental mutation of the default value. Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
parent
cfcbf58cd1
commit
91a86c24f5
|
@ -4,7 +4,7 @@ __revision__ = '$Id: models.py 28 2009-10-22 15:03:02Z jarek.zgoda $'
|
||||||
import datetime
|
import datetime
|
||||||
import string
|
import string
|
||||||
from random import SystemRandom
|
from random import SystemRandom
|
||||||
from typing import Dict, Optional, Union
|
from typing import Mapping, Optional, Union
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.contrib.contenttypes.fields import GenericForeignKey
|
from django.contrib.contenttypes.fields import GenericForeignKey
|
||||||
|
@ -65,7 +65,7 @@ def get_object_from_key(confirmation_key: str,
|
||||||
|
|
||||||
def create_confirmation_link(obj: ContentType, host: str,
|
def create_confirmation_link(obj: ContentType, host: str,
|
||||||
confirmation_type: int,
|
confirmation_type: int,
|
||||||
url_args: Optional[Dict[str, str]]=None) -> str:
|
url_args: Mapping[str, str] = {}) -> str:
|
||||||
key = generate_key()
|
key = generate_key()
|
||||||
realm = None
|
realm = None
|
||||||
if hasattr(obj, 'realm'):
|
if hasattr(obj, 'realm'):
|
||||||
|
@ -79,9 +79,8 @@ def create_confirmation_link(obj: ContentType, host: str,
|
||||||
|
|
||||||
def confirmation_url(confirmation_key: str, host: str,
|
def confirmation_url(confirmation_key: str, host: str,
|
||||||
confirmation_type: int,
|
confirmation_type: int,
|
||||||
url_args: Optional[Dict[str, str]]=None) -> str:
|
url_args: Mapping[str, str] = {}) -> str:
|
||||||
if url_args is None:
|
url_args = dict(url_args)
|
||||||
url_args = {}
|
|
||||||
url_args['confirmation_key'] = confirmation_key
|
url_args['confirmation_key'] = confirmation_key
|
||||||
return '%s%s%s' % (settings.EXTERNAL_URI_SCHEME, host,
|
return '%s%s%s' % (settings.EXTERNAL_URI_SCHEME, host,
|
||||||
reverse(_properties[confirmation_type].url_name, kwargs=url_args))
|
reverse(_properties[confirmation_type].url_name, kwargs=url_args))
|
||||||
|
|
|
@ -24,7 +24,7 @@ sys.path.append('/home/zulip/deployments/current/zerver')
|
||||||
|
|
||||||
django.setup()
|
django.setup()
|
||||||
|
|
||||||
from typing import Any, Dict, Optional, Set
|
from typing import AbstractSet, Any, Dict
|
||||||
|
|
||||||
from zerver.models import UserActivity
|
from zerver.models import UserActivity
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@ states: Dict[str, int] = {
|
||||||
"UNKNOWN": 3,
|
"UNKNOWN": 3,
|
||||||
}
|
}
|
||||||
|
|
||||||
def report(state: str, short_msg: str, too_old: Optional[Set[Any]] = None) -> None:
|
def report(state: str, short_msg: str, too_old: AbstractSet[Any] = set()) -> None:
|
||||||
too_old_data = ""
|
too_old_data = ""
|
||||||
if too_old:
|
if too_old:
|
||||||
too_old_data = "\nLast call to get_message for recently out of date mirrors:\n" + "\n".join(
|
too_old_data = "\nLast call to get_message for recently out of date mirrors:\n" + "\n".join(
|
||||||
|
|
|
@ -129,7 +129,6 @@ def send_bot_payload_message(bot: UserProfile, integration: WebhookIntegration,
|
||||||
data = ''
|
data = ''
|
||||||
|
|
||||||
headers = get_requests_headers(integration.name, fixture_name)
|
headers = get_requests_headers(integration.name, fixture_name)
|
||||||
if config.custom_headers:
|
|
||||||
headers.update(config.custom_headers)
|
headers.update(config.custom_headers)
|
||||||
if config.use_basic_auth:
|
if config.use_basic_auth:
|
||||||
credentials = base64.b64encode(f'{bot.email}:{bot.api_key}'.encode('utf8')).decode('utf8')
|
credentials = base64.b64encode(f'{bot.email}:{bot.api_key}'.encode('utf8')).decode('utf8')
|
||||||
|
@ -140,7 +139,6 @@ def send_bot_payload_message(bot: UserProfile, integration: WebhookIntegration,
|
||||||
stream = integration.stream_name or 'devel'
|
stream = integration.stream_name or 'devel'
|
||||||
url = f"{bot.bot_owner.realm.uri}/{integration.url}"
|
url = f"{bot.bot_owner.realm.uri}/{integration.url}"
|
||||||
params = {'api_key': bot.api_key, 'stream': stream}
|
params = {'api_key': bot.api_key, 'stream': stream}
|
||||||
if config.extra_params:
|
|
||||||
params.update(config.extra_params)
|
params.update(config.extra_params)
|
||||||
|
|
||||||
extra_args = {}
|
extra_args = {}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import re
|
import re
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from typing import Dict, List, Optional, Set
|
from typing import Dict, List, Optional, Sequence, Set
|
||||||
|
|
||||||
from .template_parser import FormattedException, Token, tokenize
|
from .template_parser import FormattedException, Token, tokenize
|
||||||
|
|
||||||
|
@ -132,12 +132,9 @@ def html_branches(text: str, fn: Optional[str] = None) -> List[HtmlTreeBranch]:
|
||||||
tree = html_tag_tree(text, fn)
|
tree = html_tag_tree(text, fn)
|
||||||
branches: List[HtmlTreeBranch] = []
|
branches: List[HtmlTreeBranch] = []
|
||||||
|
|
||||||
def walk(node: Node, tag_info_list: Optional[List[TagInfo]] = None) -> None:
|
def walk(node: Node, tag_info_list: Sequence[TagInfo] = []) -> None:
|
||||||
info = get_tag_info(node.token)
|
info = get_tag_info(node.token)
|
||||||
if tag_info_list is None:
|
tag_info_list = [*tag_info_list, info]
|
||||||
tag_info_list = [info]
|
|
||||||
else:
|
|
||||||
tag_info_list = tag_info_list[:] + [info]
|
|
||||||
|
|
||||||
if node.children:
|
if node.children:
|
||||||
for child in node.children:
|
for child in node.children:
|
||||||
|
@ -147,7 +144,7 @@ def html_branches(text: str, fn: Optional[str] = None) -> List[HtmlTreeBranch]:
|
||||||
branches.append(tree_branch)
|
branches.append(tree_branch)
|
||||||
|
|
||||||
for node in tree.children:
|
for node in tree.children:
|
||||||
walk(node, None)
|
walk(node, [])
|
||||||
|
|
||||||
return branches
|
return branches
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import signal
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
|
from typing import Any, Callable, Generator, List, Sequence
|
||||||
from urllib.parse import urlunparse
|
from urllib.parse import urlunparse
|
||||||
|
|
||||||
# check for the venv
|
# check for the venv
|
||||||
|
@ -18,7 +19,6 @@ from tornado.ioloop import IOLoop
|
||||||
|
|
||||||
TOOLS_DIR = os.path.dirname(os.path.abspath(__file__))
|
TOOLS_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||||
sys.path.insert(0, os.path.dirname(TOOLS_DIR))
|
sys.path.insert(0, os.path.dirname(TOOLS_DIR))
|
||||||
from typing import Any, Callable, Generator, List, Optional
|
|
||||||
|
|
||||||
from tools.lib.test_script import assert_provisioning_status_ok
|
from tools.lib.test_script import assert_provisioning_status_ok
|
||||||
|
|
||||||
|
@ -212,9 +212,8 @@ class BaseHandler(web.RequestHandler):
|
||||||
target_port: int
|
target_port: int
|
||||||
|
|
||||||
def _add_request_headers(
|
def _add_request_headers(
|
||||||
self, exclude_lower_headers_list: Optional[List[str]] = None,
|
self, exclude_lower_headers_list: Sequence[str] = [],
|
||||||
) -> httputil.HTTPHeaders:
|
) -> httputil.HTTPHeaders:
|
||||||
exclude_lower_headers_list = exclude_lower_headers_list or []
|
|
||||||
headers = httputil.HTTPHeaders()
|
headers = httputil.HTTPHeaders()
|
||||||
for header, v in self.request.headers.get_all():
|
for header, v in self.request.headers.get_all():
|
||||||
if header.lower() not in exclude_lower_headers_list:
|
if header.lower() not in exclude_lower_headers_list:
|
||||||
|
|
|
@ -3,7 +3,7 @@ import os
|
||||||
import random
|
import random
|
||||||
import shutil
|
import shutil
|
||||||
import traceback
|
import traceback
|
||||||
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, TypeVar
|
from typing import AbstractSet, Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, TypeVar
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
import ujson
|
import ujson
|
||||||
|
@ -363,12 +363,9 @@ def build_usermessages(zerver_usermessage: List[ZerverFieldsT],
|
||||||
mentioned_user_ids: List[int],
|
mentioned_user_ids: List[int],
|
||||||
message_id: int,
|
message_id: int,
|
||||||
is_private: bool,
|
is_private: bool,
|
||||||
long_term_idle: Optional[Set[int]]=None) -> Tuple[int, int]:
|
long_term_idle: AbstractSet[int] = set()) -> Tuple[int, int]:
|
||||||
user_ids = subscriber_map.get(recipient_id, set())
|
user_ids = subscriber_map.get(recipient_id, set())
|
||||||
|
|
||||||
if long_term_idle is None:
|
|
||||||
long_term_idle = set()
|
|
||||||
|
|
||||||
user_messages_created = 0
|
user_messages_created = 0
|
||||||
user_messages_skipped = 0
|
user_messages_skipped = 0
|
||||||
if user_ids:
|
if user_ids:
|
||||||
|
|
|
@ -1059,7 +1059,7 @@ class RecipientInfoResult(TypedDict):
|
||||||
def get_recipient_info(recipient: Recipient,
|
def get_recipient_info(recipient: Recipient,
|
||||||
sender_id: int,
|
sender_id: int,
|
||||||
stream_topic: Optional[StreamTopicTarget],
|
stream_topic: Optional[StreamTopicTarget],
|
||||||
possibly_mentioned_user_ids: Optional[Set[int]]=None,
|
possibly_mentioned_user_ids: AbstractSet[int]=set(),
|
||||||
possible_wildcard_mention: bool=True) -> RecipientInfoResult:
|
possible_wildcard_mention: bool=True) -> RecipientInfoResult:
|
||||||
stream_push_user_ids: Set[int] = set()
|
stream_push_user_ids: Set[int] = set()
|
||||||
stream_email_user_ids: Set[int] = set()
|
stream_email_user_ids: Set[int] = set()
|
||||||
|
@ -1151,7 +1151,6 @@ def get_recipient_info(recipient: Recipient,
|
||||||
message_to_user_id_set = set(message_to_user_ids)
|
message_to_user_id_set = set(message_to_user_ids)
|
||||||
|
|
||||||
user_ids = set(message_to_user_id_set)
|
user_ids = set(message_to_user_id_set)
|
||||||
if possibly_mentioned_user_ids:
|
|
||||||
# Important note: Because we haven't rendered bugdown yet, we
|
# Important note: Because we haven't rendered bugdown yet, we
|
||||||
# don't yet know which of these possibly-mentioned users was
|
# don't yet know which of these possibly-mentioned users was
|
||||||
# actually mentioned in the message (in other words, the
|
# actually mentioned in the message (in other words, the
|
||||||
|
@ -2759,7 +2758,7 @@ def get_last_message_id() -> int:
|
||||||
SubT = Tuple[List[Tuple[UserProfile, Stream]], List[Tuple[UserProfile, Stream]]]
|
SubT = Tuple[List[Tuple[UserProfile, Stream]], List[Tuple[UserProfile, Stream]]]
|
||||||
def bulk_add_subscriptions(streams: Iterable[Stream],
|
def bulk_add_subscriptions(streams: Iterable[Stream],
|
||||||
users: Iterable[UserProfile],
|
users: Iterable[UserProfile],
|
||||||
color_map: Optional[Dict[str, str]]=None,
|
color_map: Mapping[str, str]={},
|
||||||
from_stream_creation: bool=False,
|
from_stream_creation: bool=False,
|
||||||
acting_user: Optional[UserProfile]=None) -> SubT:
|
acting_user: Optional[UserProfile]=None) -> SubT:
|
||||||
users = list(users)
|
users = list(users)
|
||||||
|
@ -2799,7 +2798,7 @@ def bulk_add_subscriptions(streams: Iterable[Stream],
|
||||||
|
|
||||||
subs_to_add: List[Tuple[Subscription, Stream]] = []
|
subs_to_add: List[Tuple[Subscription, Stream]] = []
|
||||||
for (user_profile, recipient_id, stream) in new_subs:
|
for (user_profile, recipient_id, stream) in new_subs:
|
||||||
if color_map is not None and stream.name in color_map:
|
if stream.name in color_map:
|
||||||
color = color_map[stream.name]
|
color = color_map[stream.name]
|
||||||
else:
|
else:
|
||||||
color = pick_color(user_profile, subs_by_user[user_profile.id])
|
color = pick_color(user_profile, subs_by_user[user_profile.id])
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Mapping
|
||||||
|
|
||||||
import markdown
|
import markdown
|
||||||
from django.utils.html import escape as escape_html
|
from django.utils.html import escape as escape_html
|
||||||
|
@ -14,9 +14,7 @@ REGEXP = re.compile(r'\{generate_api_arguments_table\|\s*(.+?)\s*\|\s*(.+)\s*\}'
|
||||||
|
|
||||||
|
|
||||||
class MarkdownArgumentsTableGenerator(Extension):
|
class MarkdownArgumentsTableGenerator(Extension):
|
||||||
def __init__(self, configs: Optional[Dict[str, Any]]=None) -> None:
|
def __init__(self, configs: Mapping[str, Any] = {}) -> None:
|
||||||
if configs is None:
|
|
||||||
configs = {}
|
|
||||||
self.config = {
|
self.config = {
|
||||||
'base_path': ['.', 'Default location from which to evaluate relative paths for the JSON files.'],
|
'base_path': ['.', 'Default location from which to evaluate relative paths for the JSON files.'],
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import re
|
import re
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Mapping, Optional
|
||||||
|
|
||||||
import markdown
|
import markdown
|
||||||
from markdown.extensions import Extension
|
from markdown.extensions import Extension
|
||||||
|
@ -11,7 +11,7 @@ REGEXP = re.compile(r'\{generate_return_values_table\|\s*(.+?)\s*\|\s*(.+)\s*\}'
|
||||||
|
|
||||||
|
|
||||||
class MarkdownReturnValuesTableGenerator(Extension):
|
class MarkdownReturnValuesTableGenerator(Extension):
|
||||||
def __init__(self, configs: Optional[Dict[str, Any]]=None) -> None:
|
def __init__(self, configs: Mapping[str, Any] = {}) -> None:
|
||||||
self.config: Dict[str, Any] = {}
|
self.config: Dict[str, Any] = {}
|
||||||
|
|
||||||
def extendMarkdown(self, md: markdown.Markdown, md_globals: Dict[str, Any]) -> None:
|
def extendMarkdown(self, md: markdown.Markdown, md_globals: Dict[str, Any]) -> None:
|
||||||
|
|
|
@ -76,7 +76,7 @@ Dependencies:
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import re
|
import re
|
||||||
from typing import Any, Dict, Iterable, List, MutableSequence, Optional
|
from typing import Any, Dict, Iterable, List, Mapping, MutableSequence, Optional
|
||||||
|
|
||||||
import markdown
|
import markdown
|
||||||
from django.utils.html import escape
|
from django.utils.html import escape
|
||||||
|
@ -128,9 +128,7 @@ CODE_VALIDATORS = {
|
||||||
}
|
}
|
||||||
|
|
||||||
class FencedCodeExtension(markdown.Extension):
|
class FencedCodeExtension(markdown.Extension):
|
||||||
def __init__(self, config: Optional[Dict[str, Any]]=None) -> None:
|
def __init__(self, config: Mapping[str, Any] = {}) -> None:
|
||||||
if config is None:
|
|
||||||
config = {}
|
|
||||||
self.config = {
|
self.config = {
|
||||||
'run_content_validators': [
|
'run_content_validators': [
|
||||||
config.get('run_content_validators', False),
|
config.get('run_content_validators', False),
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from typing import Dict, Optional
|
from typing import Mapping
|
||||||
|
|
||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
from django.http import HttpRequest
|
from django.http import HttpRequest
|
||||||
|
@ -7,7 +7,7 @@ from django.utils.html import escape
|
||||||
from zerver.lib.cache import cache_with_key, open_graph_description_cache_key
|
from zerver.lib.cache import cache_with_key, open_graph_description_cache_key
|
||||||
|
|
||||||
|
|
||||||
def html_to_text(content: str, tags: Optional[Dict[str, str]]=None) -> str:
|
def html_to_text(content: str, tags: Mapping[str, str] = {'p': ' | '}) -> str:
|
||||||
bs = BeautifulSoup(content, features='lxml')
|
bs = BeautifulSoup(content, features='lxml')
|
||||||
# Skip any admonition (warning) blocks, since they're
|
# Skip any admonition (warning) blocks, since they're
|
||||||
# usually something about users needing to be an
|
# usually something about users needing to be an
|
||||||
|
@ -21,8 +21,6 @@ def html_to_text(content: str, tags: Optional[Dict[str, str]]=None) -> str:
|
||||||
tag.clear()
|
tag.clear()
|
||||||
|
|
||||||
text = ''
|
text = ''
|
||||||
if tags is None:
|
|
||||||
tags = {'p': ' | '}
|
|
||||||
for element in bs.find_all(tags.keys()):
|
for element in bs.find_all(tags.keys()):
|
||||||
# Ignore empty elements
|
# Ignore empty elements
|
||||||
if not element.text:
|
if not element.text:
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import os
|
import os
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass, field
|
||||||
from typing import Any, Dict, List, Optional, Tuple
|
from typing import Any, Dict, List, Optional, Sequence, Tuple
|
||||||
|
|
||||||
from django.conf.urls import url
|
from django.conf.urls import url
|
||||||
from django.contrib.staticfiles.storage import staticfiles_storage
|
from django.contrib.staticfiles.storage import staticfiles_storage
|
||||||
|
@ -56,7 +56,7 @@ class Integration:
|
||||||
logo: Optional[str]=None, secondary_line_text: Optional[str]=None,
|
logo: Optional[str]=None, secondary_line_text: Optional[str]=None,
|
||||||
display_name: Optional[str]=None, doc: Optional[str]=None,
|
display_name: Optional[str]=None, doc: Optional[str]=None,
|
||||||
stream_name: Optional[str]=None, legacy: bool=False,
|
stream_name: Optional[str]=None, legacy: bool=False,
|
||||||
config_options: Optional[List[Tuple[str, str, Validator]]]=None) -> None:
|
config_options: Sequence[Tuple[str, str, Validator]]=[]) -> None:
|
||||||
self.name = name
|
self.name = name
|
||||||
self.client_name = client_name
|
self.client_name = client_name
|
||||||
self.secondary_line_text = secondary_line_text
|
self.secondary_line_text = secondary_line_text
|
||||||
|
@ -66,8 +66,6 @@ class Integration:
|
||||||
# Note: Currently only incoming webhook type bots use this list for
|
# Note: Currently only incoming webhook type bots use this list for
|
||||||
# defining how the bot's BotConfigData should be. Embedded bots follow
|
# defining how the bot's BotConfigData should be. Embedded bots follow
|
||||||
# a different approach.
|
# a different approach.
|
||||||
if config_options is None:
|
|
||||||
config_options = []
|
|
||||||
self.config_options = config_options
|
self.config_options = config_options
|
||||||
|
|
||||||
for category in categories:
|
for category in categories:
|
||||||
|
@ -160,7 +158,7 @@ class WebhookIntegration(Integration):
|
||||||
function: Optional[str]=None, url: Optional[str]=None,
|
function: Optional[str]=None, url: Optional[str]=None,
|
||||||
display_name: Optional[str]=None, doc: Optional[str]=None,
|
display_name: Optional[str]=None, doc: Optional[str]=None,
|
||||||
stream_name: Optional[str]=None, legacy: bool=False,
|
stream_name: Optional[str]=None, legacy: bool=False,
|
||||||
config_options: Optional[List[Tuple[str, str, Validator]]]=None) -> None:
|
config_options: Sequence[Tuple[str, str, Validator]]=[]) -> None:
|
||||||
if client_name is None:
|
if client_name is None:
|
||||||
client_name = self.DEFAULT_CLIENT_NAME.format(name=name.title())
|
client_name = self.DEFAULT_CLIENT_NAME.format(name=name.title())
|
||||||
super().__init__(
|
super().__init__(
|
||||||
|
@ -210,9 +208,9 @@ class ScreenshotConfig:
|
||||||
bot_name: Optional[str] = None
|
bot_name: Optional[str] = None
|
||||||
payload_as_query_param: bool = False
|
payload_as_query_param: bool = False
|
||||||
payload_param_name: str = 'payload'
|
payload_param_name: str = 'payload'
|
||||||
extra_params: Optional[Dict[str, str]] = None
|
extra_params: Dict[str, str] = field(default_factory=dict)
|
||||||
use_basic_auth: bool = False
|
use_basic_auth: bool = False
|
||||||
custom_headers: Optional[Dict[str, str]] = None
|
custom_headers: Dict[str, str] = field(default_factory=dict)
|
||||||
|
|
||||||
def get_fixture_and_image_paths(integration: WebhookIntegration,
|
def get_fixture_and_image_paths(integration: WebhookIntegration,
|
||||||
screenshot_config: ScreenshotConfig) -> Tuple[str, str]:
|
screenshot_config: ScreenshotConfig) -> Tuple[str, str]:
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import logging
|
import logging
|
||||||
import urllib
|
import urllib
|
||||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
from typing import Any, Dict, List, Mapping, Tuple, Union
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
import ujson
|
import ujson
|
||||||
|
@ -24,7 +24,7 @@ class PushNotificationBouncerRetryLaterError(JsonableError):
|
||||||
def send_to_push_bouncer(method: str,
|
def send_to_push_bouncer(method: str,
|
||||||
endpoint: str,
|
endpoint: str,
|
||||||
post_data: Union[str, Dict[str, Any]],
|
post_data: Union[str, Dict[str, Any]],
|
||||||
extra_headers: Optional[Dict[str, Any]]=None) -> Dict[str, Any]:
|
extra_headers: Mapping[str, Any] = {}) -> Dict[str, Any]:
|
||||||
"""While it does actually send the notice, this function has a lot of
|
"""While it does actually send the notice, this function has a lot of
|
||||||
code and comments around error handling for the push notifications
|
code and comments around error handling for the push notifications
|
||||||
bouncer. There are several classes of failures, each with its own
|
bouncer. There are several classes of failures, each with its own
|
||||||
|
@ -47,7 +47,6 @@ def send_to_push_bouncer(method: str,
|
||||||
settings.ZULIP_ORG_KEY)
|
settings.ZULIP_ORG_KEY)
|
||||||
|
|
||||||
headers = {"User-agent": f"ZulipServer/{ZULIP_VERSION}"}
|
headers = {"User-agent": f"ZulipServer/{ZULIP_VERSION}"}
|
||||||
if extra_headers is not None:
|
|
||||||
headers.update(extra_headers)
|
headers.update(extra_headers)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -1,7 +1,19 @@
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from types import FunctionType
|
from types import FunctionType
|
||||||
from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union, cast, overload
|
from typing import (
|
||||||
|
Any,
|
||||||
|
Callable,
|
||||||
|
Dict,
|
||||||
|
Generic,
|
||||||
|
List,
|
||||||
|
Optional,
|
||||||
|
Sequence,
|
||||||
|
TypeVar,
|
||||||
|
Union,
|
||||||
|
cast,
|
||||||
|
overload,
|
||||||
|
)
|
||||||
|
|
||||||
import ujson
|
import ujson
|
||||||
from django.http import HttpRequest, HttpResponse
|
from django.http import HttpRequest, HttpResponse
|
||||||
|
@ -69,7 +81,7 @@ class _REQ(Generic[ResultT]):
|
||||||
argument_type: Optional[str] = None,
|
argument_type: Optional[str] = None,
|
||||||
intentionally_undocumented: bool=False,
|
intentionally_undocumented: bool=False,
|
||||||
documentation_pending: bool=False,
|
documentation_pending: bool=False,
|
||||||
aliases: Optional[List[str]] = None,
|
aliases: Sequence[str] = [],
|
||||||
path_only: bool=False
|
path_only: bool=False
|
||||||
) -> None:
|
) -> None:
|
||||||
"""whence: the name of the request variable that should be used
|
"""whence: the name of the request variable that should be used
|
||||||
|
@ -134,7 +146,7 @@ def REQ(
|
||||||
default: ResultT = ...,
|
default: ResultT = ...,
|
||||||
intentionally_undocumented: bool = ...,
|
intentionally_undocumented: bool = ...,
|
||||||
documentation_pending: bool = ...,
|
documentation_pending: bool = ...,
|
||||||
aliases: Optional[List[str]] = ...,
|
aliases: Sequence[str] = ...,
|
||||||
path_only: bool = ...
|
path_only: bool = ...
|
||||||
) -> ResultT:
|
) -> ResultT:
|
||||||
...
|
...
|
||||||
|
@ -148,7 +160,7 @@ def REQ(
|
||||||
validator: Validator,
|
validator: Validator,
|
||||||
intentionally_undocumented: bool = ...,
|
intentionally_undocumented: bool = ...,
|
||||||
documentation_pending: bool = ...,
|
documentation_pending: bool = ...,
|
||||||
aliases: Optional[List[str]] = ...,
|
aliases: Sequence[str] = ...,
|
||||||
path_only: bool = ...
|
path_only: bool = ...
|
||||||
) -> ResultT:
|
) -> ResultT:
|
||||||
...
|
...
|
||||||
|
@ -162,7 +174,7 @@ def REQ(
|
||||||
str_validator: Optional[Validator] = ...,
|
str_validator: Optional[Validator] = ...,
|
||||||
intentionally_undocumented: bool = ...,
|
intentionally_undocumented: bool = ...,
|
||||||
documentation_pending: bool = ...,
|
documentation_pending: bool = ...,
|
||||||
aliases: Optional[List[str]] = ...,
|
aliases: Sequence[str] = ...,
|
||||||
path_only: bool = ...
|
path_only: bool = ...
|
||||||
) -> str:
|
) -> str:
|
||||||
...
|
...
|
||||||
|
@ -176,7 +188,7 @@ def REQ(
|
||||||
str_validator: Optional[Validator] = ...,
|
str_validator: Optional[Validator] = ...,
|
||||||
intentionally_undocumented: bool = ...,
|
intentionally_undocumented: bool = ...,
|
||||||
documentation_pending: bool = ...,
|
documentation_pending: bool = ...,
|
||||||
aliases: Optional[List[str]] = ...,
|
aliases: Sequence[str] = ...,
|
||||||
path_only: bool = ...
|
path_only: bool = ...
|
||||||
) -> Optional[str]:
|
) -> Optional[str]:
|
||||||
...
|
...
|
||||||
|
@ -191,7 +203,7 @@ def REQ(
|
||||||
argument_type: Literal["body"],
|
argument_type: Literal["body"],
|
||||||
intentionally_undocumented: bool = ...,
|
intentionally_undocumented: bool = ...,
|
||||||
documentation_pending: bool = ...,
|
documentation_pending: bool = ...,
|
||||||
aliases: Optional[List[str]] = ...,
|
aliases: Sequence[str] = ...,
|
||||||
path_only: bool = ...
|
path_only: bool = ...
|
||||||
) -> ResultT:
|
) -> ResultT:
|
||||||
...
|
...
|
||||||
|
@ -207,7 +219,7 @@ def REQ(
|
||||||
argument_type: Optional[str] = None,
|
argument_type: Optional[str] = None,
|
||||||
intentionally_undocumented: bool=False,
|
intentionally_undocumented: bool=False,
|
||||||
documentation_pending: bool=False,
|
documentation_pending: bool=False,
|
||||||
aliases: Optional[List[str]] = None,
|
aliases: Sequence[str] = [],
|
||||||
path_only: bool = False
|
path_only: bool = False
|
||||||
) -> ResultT:
|
) -> ResultT:
|
||||||
return cast(ResultT, _REQ(
|
return cast(ResultT, _REQ(
|
||||||
|
@ -295,7 +307,6 @@ def has_request_variables(view_func: ViewFuncT) -> ViewFuncT:
|
||||||
raise Exception(_("Invalid argument type"))
|
raise Exception(_("Invalid argument type"))
|
||||||
|
|
||||||
post_var_names = [param.post_var_name]
|
post_var_names = [param.post_var_name]
|
||||||
if param.aliases:
|
|
||||||
post_var_names += param.aliases
|
post_var_names += param.aliases
|
||||||
|
|
||||||
default_assigned = False
|
default_assigned = False
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, List, Mapping, Optional
|
||||||
|
|
||||||
import ujson
|
import ujson
|
||||||
from django.http import HttpResponse, HttpResponseNotAllowed
|
from django.http import HttpResponse, HttpResponseNotAllowed
|
||||||
|
@ -37,15 +37,14 @@ def json_method_not_allowed(methods: List[str]) -> HttpResponseNotAllowed:
|
||||||
|
|
||||||
def json_response(res_type: str="success",
|
def json_response(res_type: str="success",
|
||||||
msg: str="",
|
msg: str="",
|
||||||
data: Optional[Dict[str, Any]]=None,
|
data: Mapping[str, Any]={},
|
||||||
status: int=200) -> HttpResponse:
|
status: int=200) -> HttpResponse:
|
||||||
content = {"result": res_type, "msg": msg}
|
content = {"result": res_type, "msg": msg}
|
||||||
if data is not None:
|
|
||||||
content.update(data)
|
content.update(data)
|
||||||
return HttpResponse(content=ujson.dumps(content) + "\n",
|
return HttpResponse(content=ujson.dumps(content) + "\n",
|
||||||
content_type='application/json', status=status)
|
content_type='application/json', status=status)
|
||||||
|
|
||||||
def json_success(data: Optional[Dict[str, Any]]=None) -> HttpResponse:
|
def json_success(data: Mapping[str, Any]={}) -> HttpResponse:
|
||||||
return json_response(data=data)
|
return json_response(data=data)
|
||||||
|
|
||||||
def json_response_from_error(exception: JsonableError) -> HttpResponse:
|
def json_response_from_error(exception: JsonableError) -> HttpResponse:
|
||||||
|
@ -61,5 +60,5 @@ def json_response_from_error(exception: JsonableError) -> HttpResponse:
|
||||||
data=exception.data,
|
data=exception.data,
|
||||||
status=exception.http_status_code)
|
status=exception.http_status_code)
|
||||||
|
|
||||||
def json_error(msg: str, data: Optional[Dict[str, Any]]=None, status: int=400) -> HttpResponse:
|
def json_error(msg: str, data: Mapping[str, Any]={}, status: int=400) -> HttpResponse:
|
||||||
return json_response(res_type="error", msg=msg, data=data, status=status)
|
return json_response(res_type="error", msg=msg, data=data, status=status)
|
||||||
|
|
|
@ -55,7 +55,7 @@ class FromAddress:
|
||||||
def build_email(template_prefix: str, to_user_ids: Optional[List[int]]=None,
|
def build_email(template_prefix: str, to_user_ids: Optional[List[int]]=None,
|
||||||
to_emails: Optional[List[str]]=None, from_name: Optional[str]=None,
|
to_emails: Optional[List[str]]=None, from_name: Optional[str]=None,
|
||||||
from_address: Optional[str]=None, reply_to_email: Optional[str]=None,
|
from_address: Optional[str]=None, reply_to_email: Optional[str]=None,
|
||||||
language: Optional[str]=None, context: Optional[Dict[str, Any]]=None,
|
language: Optional[str]=None, context: Mapping[str, Any]={},
|
||||||
) -> EmailMultiAlternatives:
|
) -> EmailMultiAlternatives:
|
||||||
# Callers should pass exactly one of to_user_id and to_email.
|
# Callers should pass exactly one of to_user_id and to_email.
|
||||||
assert (to_user_ids is None) ^ (to_emails is None)
|
assert (to_user_ids is None) ^ (to_emails is None)
|
||||||
|
@ -63,14 +63,12 @@ def build_email(template_prefix: str, to_user_ids: Optional[List[int]]=None,
|
||||||
to_users = [get_user_profile_by_id(to_user_id) for to_user_id in to_user_ids]
|
to_users = [get_user_profile_by_id(to_user_id) for to_user_id in to_user_ids]
|
||||||
to_emails = [formataddr((to_user.full_name, to_user.delivery_email)) for to_user in to_users]
|
to_emails = [formataddr((to_user.full_name, to_user.delivery_email)) for to_user in to_users]
|
||||||
|
|
||||||
if context is None:
|
context = {
|
||||||
context = {}
|
**context,
|
||||||
|
|
||||||
context.update({
|
|
||||||
'support_email': FromAddress.SUPPORT,
|
'support_email': FromAddress.SUPPORT,
|
||||||
'email_images_base_uri': settings.ROOT_DOMAIN_URI + '/static/images/emails',
|
'email_images_base_uri': settings.ROOT_DOMAIN_URI + '/static/images/emails',
|
||||||
'physical_address': settings.PHYSICAL_ADDRESS,
|
'physical_address': settings.PHYSICAL_ADDRESS,
|
||||||
})
|
}
|
||||||
|
|
||||||
def render_templates() -> Tuple[str, str, str]:
|
def render_templates() -> Tuple[str, str, str]:
|
||||||
email_subject = loader.render_to_string(template_prefix + '.subject.txt',
|
email_subject = loader.render_to_string(template_prefix + '.subject.txt',
|
||||||
|
|
|
@ -1058,18 +1058,18 @@ def test_invalid_stream_error(client: Client) -> None:
|
||||||
|
|
||||||
|
|
||||||
# SETUP METHODS FOLLOW
|
# SETUP METHODS FOLLOW
|
||||||
def test_against_fixture(result: Dict[str, Any], fixture: Dict[str, Any], check_if_equal: Optional[Iterable[str]] = [], check_if_exists: Optional[Iterable[str]] = []) -> None:
|
def test_against_fixture(result: Dict[str, Any], fixture: Dict[str, Any], check_if_equal: Optional[Iterable[str]] = None, check_if_exists: Optional[Iterable[str]] = None) -> None:
|
||||||
assertLength(result, fixture)
|
assertLength(result, fixture)
|
||||||
|
|
||||||
if not check_if_equal and not check_if_exists:
|
if check_if_equal is None and check_if_exists is None:
|
||||||
for key, value in fixture.items():
|
for key, value in fixture.items():
|
||||||
assertEqual(key, result, fixture)
|
assertEqual(key, result, fixture)
|
||||||
|
|
||||||
if check_if_equal:
|
if check_if_equal is not None:
|
||||||
for key in check_if_equal:
|
for key in check_if_equal:
|
||||||
assertEqual(key, result, fixture)
|
assertEqual(key, result, fixture)
|
||||||
|
|
||||||
if check_if_exists:
|
if check_if_exists is not None:
|
||||||
for key in check_if_exists:
|
for key in check_if_exists:
|
||||||
assertIn(key, result)
|
assertIn(key, result)
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, List, Mapping, Optional
|
||||||
|
|
||||||
import markdown
|
import markdown
|
||||||
import markdown.extensions.admonition
|
import markdown.extensions.admonition
|
||||||
|
@ -75,7 +75,7 @@ docs_without_macros = [
|
||||||
@items_tuple_to_dict
|
@items_tuple_to_dict
|
||||||
@register.filter(name='render_markdown_path', is_safe=True)
|
@register.filter(name='render_markdown_path', is_safe=True)
|
||||||
def render_markdown_path(markdown_file_path: str,
|
def render_markdown_path(markdown_file_path: str,
|
||||||
context: Optional[Dict[Any, Any]]=None,
|
context: Mapping[str, Any]={},
|
||||||
pure_markdown: bool=False) -> str:
|
pure_markdown: bool=False) -> str:
|
||||||
"""Given a path to a markdown file, return the rendered html.
|
"""Given a path to a markdown file, return the rendered html.
|
||||||
|
|
||||||
|
@ -83,9 +83,6 @@ def render_markdown_path(markdown_file_path: str,
|
||||||
trusted; it is intended to be used for documentation, not user
|
trusted; it is intended to be used for documentation, not user
|
||||||
data."""
|
data."""
|
||||||
|
|
||||||
if context is None:
|
|
||||||
context = {}
|
|
||||||
|
|
||||||
# We set this global hackishly
|
# We set this global hackishly
|
||||||
from zerver.lib.bugdown.help_settings_links import set_relative_settings_links
|
from zerver.lib.bugdown.help_settings_links import set_relative_settings_links
|
||||||
set_relative_settings_links(bool(context.get('html_settings_links')))
|
set_relative_settings_links(bool(context.get('html_settings_links')))
|
||||||
|
|
|
@ -140,12 +140,10 @@ class AuthBackendTest(ZulipTestCase):
|
||||||
|
|
||||||
return username
|
return username
|
||||||
|
|
||||||
def verify_backend(self, backend: Any, good_kwargs: Optional[Dict[str, Any]]=None, bad_kwargs: Optional[Dict[str, Any]]=None) -> None:
|
def verify_backend(self, backend: Any, *, good_kwargs: Dict[str, Any], bad_kwargs: Optional[Dict[str, Any]]=None) -> None:
|
||||||
clear_supported_auth_backends_cache()
|
clear_supported_auth_backends_cache()
|
||||||
user_profile = self.example_user('hamlet')
|
user_profile = self.example_user('hamlet')
|
||||||
|
|
||||||
assert good_kwargs is not None
|
|
||||||
|
|
||||||
# If bad_kwargs was specified, verify auth fails in that case
|
# If bad_kwargs was specified, verify auth fails in that case
|
||||||
if bad_kwargs is not None:
|
if bad_kwargs is not None:
|
||||||
self.assertIsNone(backend.authenticate(**bad_kwargs))
|
self.assertIsNone(backend.authenticate(**bad_kwargs))
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import random
|
import random
|
||||||
import re
|
import re
|
||||||
from email.utils import formataddr
|
from email.utils import formataddr
|
||||||
from typing import List, Optional
|
from typing import List, Sequence
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
import ldap
|
import ldap
|
||||||
|
@ -221,7 +221,7 @@ class TestMissedMessages(ZulipTestCase):
|
||||||
def _test_cases(self, msg_id: int, verify_body_include: List[str], email_subject: str,
|
def _test_cases(self, msg_id: int, verify_body_include: List[str], email_subject: str,
|
||||||
send_as_user: bool, verify_html_body: bool=False,
|
send_as_user: bool, verify_html_body: bool=False,
|
||||||
show_message_content: bool=True,
|
show_message_content: bool=True,
|
||||||
verify_body_does_not_include: Optional[List[str]]=None,
|
verify_body_does_not_include: Sequence[str]=[],
|
||||||
trigger: str='') -> None:
|
trigger: str='') -> None:
|
||||||
othello = self.example_user('othello')
|
othello = self.example_user('othello')
|
||||||
hamlet = self.example_user('hamlet')
|
hamlet = self.example_user('hamlet')
|
||||||
|
@ -248,7 +248,6 @@ class TestMissedMessages(ZulipTestCase):
|
||||||
else:
|
else:
|
||||||
for text in verify_body_include:
|
for text in verify_body_include:
|
||||||
self.assertIn(text, self.normalize_string(msg.body))
|
self.assertIn(text, self.normalize_string(msg.body))
|
||||||
if verify_body_does_not_include is not None:
|
|
||||||
for text in verify_body_does_not_include:
|
for text in verify_body_does_not_include:
|
||||||
self.assertNotIn(text, self.normalize_string(msg.body))
|
self.assertNotIn(text, self.normalize_string(msg.body))
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import random
|
import random
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from typing import Any, Dict, List, Mapping, Optional, Set, Union
|
from typing import Any, Dict, List, Mapping, Optional, Sequence, Set, Union
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
import ujson
|
import ujson
|
||||||
|
@ -976,7 +976,7 @@ class StreamAdminTest(ZulipTestCase):
|
||||||
def attempt_unsubscribe_of_principal(self, query_count: int, target_users: List[UserProfile],
|
def attempt_unsubscribe_of_principal(self, query_count: int, target_users: List[UserProfile],
|
||||||
is_admin: bool=False, is_subbed: bool=True, invite_only: bool=False,
|
is_admin: bool=False, is_subbed: bool=True, invite_only: bool=False,
|
||||||
target_users_subbed: bool=True, using_legacy_emails: bool=False,
|
target_users_subbed: bool=True, using_legacy_emails: bool=False,
|
||||||
other_sub_users: Optional[List[UserProfile]]=None) -> HttpResponse:
|
other_sub_users: Sequence[UserProfile]=[]) -> HttpResponse:
|
||||||
|
|
||||||
# Set up the main user, who is in most cases an admin.
|
# Set up the main user, who is in most cases an admin.
|
||||||
if is_admin:
|
if is_admin:
|
||||||
|
@ -1004,7 +1004,6 @@ class StreamAdminTest(ZulipTestCase):
|
||||||
if target_users_subbed:
|
if target_users_subbed:
|
||||||
for user in target_users:
|
for user in target_users:
|
||||||
self.subscribe(user, stream_name)
|
self.subscribe(user, stream_name)
|
||||||
if other_sub_users:
|
|
||||||
for user in other_sub_users:
|
for user in other_sub_users:
|
||||||
self.subscribe(user, stream_name)
|
self.subscribe(user, stream_name)
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
# System documented in https://zulip.readthedocs.io/en/latest/subsystems/logging.html
|
# System documented in https://zulip.readthedocs.io/en/latest/subsystems/logging.html
|
||||||
import logging
|
import logging
|
||||||
import subprocess
|
import subprocess
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Mapping, Optional
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.http import HttpRequest, HttpResponse
|
from django.http import HttpRequest, HttpResponse
|
||||||
|
@ -89,14 +89,13 @@ def report_unnarrow_times(request: HttpRequest, user_profile: UserProfile,
|
||||||
def report_error(request: HttpRequest, user_profile: UserProfile, message: str=REQ(),
|
def report_error(request: HttpRequest, user_profile: UserProfile, message: str=REQ(),
|
||||||
stacktrace: str=REQ(), ui_message: bool=REQ(validator=check_bool),
|
stacktrace: str=REQ(), ui_message: bool=REQ(validator=check_bool),
|
||||||
user_agent: str=REQ(), href: str=REQ(), log: str=REQ(),
|
user_agent: str=REQ(), href: str=REQ(), log: str=REQ(),
|
||||||
more_info: Optional[Dict[str, Any]]=REQ(validator=check_dict([]), default=None),
|
more_info: Mapping[str, Any]=REQ(validator=check_dict([]), default={}),
|
||||||
) -> HttpResponse:
|
) -> HttpResponse:
|
||||||
"""Accepts an error report and stores in a queue for processing. The
|
"""Accepts an error report and stores in a queue for processing. The
|
||||||
actual error reports are later handled by do_report_error"""
|
actual error reports are later handled by do_report_error"""
|
||||||
if not settings.BROWSER_ERROR_REPORTING:
|
if not settings.BROWSER_ERROR_REPORTING:
|
||||||
return json_success()
|
return json_success()
|
||||||
if more_info is None:
|
more_info = dict(more_info)
|
||||||
more_info = {}
|
|
||||||
|
|
||||||
js_source_map = get_js_source_map()
|
js_source_map = get_js_source_map()
|
||||||
if js_source_map:
|
if js_source_map:
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from typing import Any, Dict, Mapping, Optional, Tuple
|
from typing import Any, Mapping, Optional, Tuple
|
||||||
|
|
||||||
from zerver.lib.exceptions import UnexpectedWebhookEventType
|
from zerver.lib.exceptions import UnexpectedWebhookEventType
|
||||||
|
|
||||||
|
@ -71,9 +71,10 @@ def get_change_name_body(payload: Mapping[str, Any], action_type: str) -> str:
|
||||||
|
|
||||||
def fill_appropriate_message_content(payload: Mapping[str, Any],
|
def fill_appropriate_message_content(payload: Mapping[str, Any],
|
||||||
action_type: str,
|
action_type: str,
|
||||||
data: Optional[Dict[str, Any]]=None) -> str:
|
data: Mapping[str, Any] = {}) -> str:
|
||||||
data = {} if data is None else data
|
data = dict(data)
|
||||||
data['board_url_template'] = data.get('board_url_template', get_filled_board_url_template(payload))
|
if 'board_url_template' not in data:
|
||||||
|
data['board_url_template'] = get_filled_board_url_template(payload)
|
||||||
message_body = get_message_body(action_type)
|
message_body = get_message_body(action_type)
|
||||||
return message_body.format(**data)
|
return message_body.format(**data)
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from typing import Any, Dict, Mapping, Optional, Tuple
|
from typing import Any, Mapping, Optional, Tuple
|
||||||
|
|
||||||
from zerver.lib.exceptions import UnexpectedWebhookEventType
|
from zerver.lib.exceptions import UnexpectedWebhookEventType
|
||||||
|
|
||||||
|
@ -205,9 +205,10 @@ def get_body_by_action_type_without_data(payload: Mapping[str, Any], action_type
|
||||||
|
|
||||||
def fill_appropriate_message_content(payload: Mapping[str, Any],
|
def fill_appropriate_message_content(payload: Mapping[str, Any],
|
||||||
action_type: str,
|
action_type: str,
|
||||||
data: Optional[Dict[str, Any]]=None) -> str:
|
data: Mapping[str, Any] = {}) -> str:
|
||||||
data = {} if data is None else data
|
data = dict(data)
|
||||||
data['card_url_template'] = data.get('card_url_template', get_filled_card_url_template(payload))
|
if 'card_url_template' not in data:
|
||||||
|
data['card_url_template'] = get_filled_card_url_template(payload)
|
||||||
message_body = get_message_body(action_type)
|
message_body = get_message_body(action_type)
|
||||||
return message_body.format(**data)
|
return message_body.format(**data)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue