mirror of https://github.com/zulip/zulip.git
views: Change use of typing.Text to str.
This is the first part of a general migration of our typing codebase to use the simpler `str` for strings.
This commit is contained in:
parent
2f937d81e2
commit
79e8bff8fa
|
@ -1,7 +1,7 @@
|
|||
|
||||
from django.http import HttpResponse, HttpRequest
|
||||
|
||||
from typing import List, Text
|
||||
from typing import List
|
||||
from zerver.models import UserProfile
|
||||
|
||||
from zerver.lib.request import has_request_variables, REQ
|
||||
|
@ -14,20 +14,20 @@ from zerver.lib.alert_words import user_alert_words
|
|||
def list_alert_words(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
|
||||
return json_success({'alert_words': user_alert_words(user_profile)})
|
||||
|
||||
def clean_alert_words(alert_words: List[Text]) -> List[Text]:
|
||||
def clean_alert_words(alert_words: List[str]) -> List[str]:
|
||||
alert_words = [w.strip() for w in alert_words]
|
||||
return [w for w in alert_words if w != ""]
|
||||
|
||||
@has_request_variables
|
||||
def add_alert_words(request: HttpRequest, user_profile: UserProfile,
|
||||
alert_words: List[Text]=REQ(validator=check_list(check_string), default=[])
|
||||
alert_words: List[str]=REQ(validator=check_list(check_string), default=[])
|
||||
) -> HttpResponse:
|
||||
do_add_alert_words(user_profile, clean_alert_words(alert_words))
|
||||
return json_success()
|
||||
|
||||
@has_request_variables
|
||||
def remove_alert_words(request: HttpRequest, user_profile: UserProfile,
|
||||
alert_words: List[Text]=REQ(validator=check_list(check_string), default=[])
|
||||
alert_words: List[str]=REQ(validator=check_list(check_string), default=[])
|
||||
) -> HttpResponse:
|
||||
do_remove_alert_words(user_profile, alert_words)
|
||||
return json_success()
|
||||
|
|
|
@ -19,7 +19,7 @@ from django.utils.translation import ugettext as _
|
|||
from django.utils.http import is_safe_url
|
||||
from django.core import signing
|
||||
import urllib
|
||||
from typing import Any, Dict, List, Optional, Tuple, Text
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from confirmation.models import Confirmation, create_confirmation_link
|
||||
from zerver.context_processors import zulip_default_context, get_realm_from_request
|
||||
|
@ -48,14 +48,14 @@ import requests
|
|||
import time
|
||||
import ujson
|
||||
|
||||
def get_safe_redirect_to(url: Text, redirect_host: Text) -> Text:
|
||||
def get_safe_redirect_to(url: str, redirect_host: str) -> str:
|
||||
is_url_safe = is_safe_url(url=url, host=redirect_host)
|
||||
if is_url_safe:
|
||||
return urllib.parse.urljoin(redirect_host, url)
|
||||
else:
|
||||
return redirect_host
|
||||
|
||||
def create_preregistration_user(email: Text, request: HttpRequest, realm_creation: bool=False,
|
||||
def create_preregistration_user(email: str, request: HttpRequest, realm_creation: bool=False,
|
||||
password_required: bool=True) -> HttpResponse:
|
||||
realm = None
|
||||
if not realm_creation:
|
||||
|
@ -65,7 +65,7 @@ def create_preregistration_user(email: Text, request: HttpRequest, realm_creatio
|
|||
password_required=password_required,
|
||||
realm=realm)
|
||||
|
||||
def maybe_send_to_registration(request: HttpRequest, email: Text, full_name: Text='',
|
||||
def maybe_send_to_registration(request: HttpRequest, email: str, full_name: str='',
|
||||
is_signup: bool=False, password_required: bool=True) -> HttpResponse:
|
||||
realm = get_realm(get_subdomain(request))
|
||||
from_multiuse_invite = False
|
||||
|
@ -130,11 +130,11 @@ def redirect_to_subdomain_login_url() -> HttpResponseRedirect:
|
|||
def redirect_to_config_error(error_type: str) -> HttpResponseRedirect:
|
||||
return HttpResponseRedirect("/config-error/%s" % (error_type,))
|
||||
|
||||
def login_or_register_remote_user(request: HttpRequest, remote_username: Optional[Text],
|
||||
user_profile: Optional[UserProfile], full_name: Text='',
|
||||
def login_or_register_remote_user(request: HttpRequest, remote_username: Optional[str],
|
||||
user_profile: Optional[UserProfile], full_name: str='',
|
||||
invalid_subdomain: bool=False, mobile_flow_otp: Optional[str]=None,
|
||||
is_signup: bool=False,
|
||||
redirect_to: Text='') -> HttpResponse:
|
||||
redirect_to: str='') -> HttpResponse:
|
||||
if user_profile is None or user_profile.is_mirror_dummy:
|
||||
# We have verified the user controls an email address
|
||||
# (remote_username) but there's no associated Zulip user
|
||||
|
@ -268,7 +268,7 @@ def google_oauth2_csrf(request: HttpRequest, value: str) -> str:
|
|||
def reverse_on_root(viewname: str, args: List[str]=None, kwargs: Dict[str, str]=None) -> str:
|
||||
return settings.ROOT_DOMAIN_URI + reverse(viewname, args=args, kwargs=kwargs)
|
||||
|
||||
def oauth_redirect_to_root(request: HttpRequest, url: Text, is_signup: bool=False) -> HttpResponse:
|
||||
def oauth_redirect_to_root(request: HttpRequest, url: str, is_signup: bool=False) -> HttpResponse:
|
||||
main_site_uri = settings.ROOT_DOMAIN_URI + url
|
||||
params = {
|
||||
'subdomain': get_subdomain(request),
|
||||
|
@ -298,7 +298,7 @@ def start_google_oauth2(request: HttpRequest) -> HttpResponse:
|
|||
is_signup = bool(request.GET.get('is_signup'))
|
||||
return oauth_redirect_to_root(request, url, is_signup=is_signup)
|
||||
|
||||
def start_social_login(request: HttpRequest, backend: Text) -> HttpResponse:
|
||||
def start_social_login(request: HttpRequest, backend: str) -> HttpResponse:
|
||||
backend_url = reverse('social:begin', args=[backend])
|
||||
if (backend == "github") and not (settings.SOCIAL_AUTH_GITHUB_KEY and
|
||||
settings.SOCIAL_AUTH_GITHUB_SECRET):
|
||||
|
@ -306,7 +306,7 @@ def start_social_login(request: HttpRequest, backend: Text) -> HttpResponse:
|
|||
|
||||
return oauth_redirect_to_root(request, backend_url)
|
||||
|
||||
def start_social_signup(request: HttpRequest, backend: Text) -> HttpResponse:
|
||||
def start_social_signup(request: HttpRequest, backend: str) -> HttpResponse:
|
||||
backend_url = reverse('social:begin', args=[backend])
|
||||
return oauth_redirect_to_root(request, backend_url, is_signup=True)
|
||||
|
||||
|
@ -444,7 +444,7 @@ def authenticate_remote_user(realm: Realm, email_address: str) -> Tuple[UserProf
|
|||
_subdomain_token_salt = 'zerver.views.auth.log_into_subdomain'
|
||||
|
||||
@log_view_func
|
||||
def log_into_subdomain(request: HttpRequest, token: Text) -> HttpResponse:
|
||||
def log_into_subdomain(request: HttpRequest, token: str) -> HttpResponse:
|
||||
try:
|
||||
data = signing.loads(token, salt=_subdomain_token_salt, max_age=15)
|
||||
except signing.SignatureExpired as e:
|
||||
|
@ -481,8 +481,8 @@ def log_into_subdomain(request: HttpRequest, token: Text) -> HttpResponse:
|
|||
full_name, invalid_subdomain=invalid_subdomain,
|
||||
is_signup=is_signup, redirect_to=redirect_to)
|
||||
|
||||
def redirect_and_log_into_subdomain(realm: Realm, full_name: Text, email_address: Text,
|
||||
is_signup: bool=False, redirect_to: Text='') -> HttpResponse:
|
||||
def redirect_and_log_into_subdomain(realm: Realm, full_name: str, email_address: str,
|
||||
is_signup: bool=False, redirect_to: str='') -> HttpResponse:
|
||||
data = {'name': full_name, 'email': email_address, 'subdomain': realm.subdomain,
|
||||
'is_signup': is_signup, 'next': redirect_to}
|
||||
token = signing.dumps(data, salt=_subdomain_token_salt)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
|
||||
from django.http import HttpResponse, HttpRequest
|
||||
from typing import Any, List, Dict, Optional, Text
|
||||
from typing import Any, List, Dict, Optional
|
||||
|
||||
from zerver.lib.response import json_error, json_success
|
||||
from zerver.lib.user_agent import parse_user_agent
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
from typing import Text, Union, List, Dict, Optional
|
||||
from typing import Union, List, Dict, Optional
|
||||
import logging
|
||||
|
||||
from django.core.exceptions import ValidationError
|
||||
|
@ -28,8 +28,8 @@ hint_validator = check_capped_string(CustomProfileField.HINT_MAX_LENGTH)
|
|||
@require_realm_admin
|
||||
@has_request_variables
|
||||
def create_realm_custom_profile_field(request: HttpRequest,
|
||||
user_profile: UserProfile, name: Text=REQ(),
|
||||
hint: Text=REQ(default=''),
|
||||
user_profile: UserProfile, name: str=REQ(),
|
||||
hint: str=REQ(default=''),
|
||||
field_type: int=REQ(validator=check_int)) -> HttpResponse:
|
||||
if not name.strip():
|
||||
return json_error(_("Name cannot be blank."))
|
||||
|
@ -67,8 +67,8 @@ def delete_realm_custom_profile_field(request: HttpRequest, user_profile: UserPr
|
|||
@require_realm_admin
|
||||
@has_request_variables
|
||||
def update_realm_custom_profile_field(request: HttpRequest, user_profile: UserProfile,
|
||||
field_id: int, name: Text=REQ(),
|
||||
hint: Text=REQ(default='')
|
||||
field_id: int, name: str=REQ(),
|
||||
hint: str=REQ(default='')
|
||||
) -> HttpResponse:
|
||||
if not name.strip():
|
||||
return json_error(_("Name cannot be blank."))
|
||||
|
@ -94,7 +94,7 @@ def update_realm_custom_profile_field(request: HttpRequest, user_profile: UserPr
|
|||
def update_user_custom_profile_data(
|
||||
request: HttpRequest,
|
||||
user_profile: UserProfile,
|
||||
data: List[Dict[str, Union[int, Text]]]=REQ(validator=check_list(
|
||||
data: List[Dict[str, Union[int, str]]]=REQ(validator=check_list(
|
||||
check_dict([('id', check_int)])))) -> HttpResponse:
|
||||
for item in data:
|
||||
field_id = item['id']
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from typing import Iterable, Optional, Sequence, Text
|
||||
from typing import Iterable, Optional, Sequence
|
||||
|
||||
from zerver.lib.events import do_events_register
|
||||
from zerver.lib.request import REQ, has_request_variables
|
||||
|
@ -16,13 +16,13 @@ def _default_all_public_streams(user_profile: UserProfile,
|
|||
return user_profile.default_all_public_streams
|
||||
|
||||
def _default_narrow(user_profile: UserProfile,
|
||||
narrow: Iterable[Sequence[Text]]) -> Iterable[Sequence[Text]]:
|
||||
narrow: Iterable[Sequence[str]]) -> Iterable[Sequence[str]]:
|
||||
default_stream = user_profile.default_events_register_stream # type: Optional[Stream]
|
||||
if not narrow and default_stream is not None:
|
||||
narrow = [['stream', default_stream.name]]
|
||||
return narrow
|
||||
|
||||
NarrowT = Iterable[Sequence[Text]]
|
||||
NarrowT = Iterable[Sequence[str]]
|
||||
@has_request_variables
|
||||
def events_register_backend(
|
||||
request: HttpRequest, user_profile: UserProfile,
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Any, List, Dict, Optional, Text, Iterator
|
||||
from typing import Any, List, Dict, Optional, Iterator
|
||||
|
||||
from django.conf import settings
|
||||
from django.urls import reverse
|
||||
|
@ -64,7 +64,7 @@ def sent_time_in_epoch_seconds(user_message: Optional[UserMessage]) -> Optional[
|
|||
# Return the epoch seconds in UTC.
|
||||
return calendar.timegm(user_message.message.pub_date.utctimetuple())
|
||||
|
||||
def get_bot_types(user_profile: UserProfile) -> List[Dict[Text, object]]:
|
||||
def get_bot_types(user_profile: UserProfile) -> List[Dict[str, object]]:
|
||||
bot_types = []
|
||||
for type_id, name in UserProfile.BOT_TYPES.items():
|
||||
bot_types.append({
|
||||
|
@ -105,7 +105,7 @@ def home_real(request: HttpRequest) -> HttpResponse:
|
|||
int(settings.TOS_VERSION.split('.')[0]) > user_profile.major_tos_version():
|
||||
return accounts_accept_terms(request)
|
||||
|
||||
narrow = [] # type: List[List[Text]]
|
||||
narrow = [] # type: List[List[str]]
|
||||
narrow_stream = None
|
||||
narrow_topic = request.GET.get("topic")
|
||||
if request.GET.get("stream"):
|
||||
|
@ -260,7 +260,7 @@ def home_real(request: HttpRequest) -> HttpResponse:
|
|||
def desktop_home(request: HttpRequest) -> HttpResponse:
|
||||
return HttpResponseRedirect(reverse('zerver.views.home.home'))
|
||||
|
||||
def apps_view(request: HttpRequest, _: Text) -> HttpResponse:
|
||||
def apps_view(request: HttpRequest, _: str) -> HttpResponse:
|
||||
if settings.ZILENCER_ENABLED:
|
||||
return render(request, 'zerver/apps.html')
|
||||
return HttpResponseRedirect('https://zulipchat.com/apps/', status=301)
|
||||
|
|
|
@ -3,7 +3,7 @@ from django.conf import settings
|
|||
from django.core.exceptions import ValidationError
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.utils.translation import ugettext as _
|
||||
from typing import List, Optional, Set, Text
|
||||
from typing import List, Optional, Set
|
||||
|
||||
from zerver.decorator import require_realm_admin, to_non_negative_int
|
||||
from zerver.lib.actions import do_invite_users, do_revoke_user_invite, do_resend_user_invite_email, \
|
||||
|
|
|
@ -6,7 +6,7 @@ from django.core import validators
|
|||
from django.core.exceptions import ValidationError
|
||||
from django.db import connection
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from typing import Dict, List, Set, Text, Any, Callable, Iterable, \
|
||||
from typing import Dict, List, Set, Any, Callable, Iterable, \
|
||||
Optional, Tuple, Union, Sequence
|
||||
from zerver.lib.exceptions import JsonableError, ErrorCode
|
||||
from zerver.lib.html_diff import highlight_html_differences
|
||||
|
@ -174,7 +174,7 @@ class NarrowBuilder:
|
|||
_alphanum = frozenset(
|
||||
'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
|
||||
|
||||
def _pg_re_escape(self, pattern: Text) -> Text:
|
||||
def _pg_re_escape(self, pattern: str) -> str:
|
||||
"""
|
||||
Escape user input to place in a regex
|
||||
|
||||
|
@ -397,7 +397,7 @@ class NarrowBuilder:
|
|||
# The offsets we get from PGroonga are counted in characters
|
||||
# whereas the offsets from tsearch_extras are in bytes, so we
|
||||
# have to account for both cases in the logic below.
|
||||
def highlight_string(text: Text, locs: Iterable[Tuple[int, int]]) -> Text:
|
||||
def highlight_string(text: str, locs: Iterable[Tuple[int, int]]) -> str:
|
||||
highlight_start = '<span class="highlight">'
|
||||
highlight_stop = '</span>'
|
||||
pos = 0
|
||||
|
@ -446,8 +446,8 @@ def highlight_string(text: Text, locs: Iterable[Tuple[int, int]]) -> Text:
|
|||
result += final_frag
|
||||
return result
|
||||
|
||||
def get_search_fields(rendered_content: Text, subject: Text, content_matches: Iterable[Tuple[int, int]],
|
||||
subject_matches: Iterable[Tuple[int, int]]) -> Dict[str, Text]:
|
||||
def get_search_fields(rendered_content: str, subject: str, content_matches: Iterable[Tuple[int, int]],
|
||||
subject_matches: Iterable[Tuple[int, int]]) -> Dict[str, str]:
|
||||
return dict(match_content=highlight_string(rendered_content, content_matches),
|
||||
match_subject=highlight_string(escape_html(subject), subject_matches))
|
||||
|
||||
|
@ -465,7 +465,7 @@ def narrow_parameter(json: str) -> Optional[List[Dict[str, Any]]]:
|
|||
# We have to support a legacy tuple format.
|
||||
if isinstance(elem, list):
|
||||
if (len(elem) != 2 or
|
||||
any(not isinstance(x, str) and not isinstance(x, Text)
|
||||
any(not isinstance(x, str) and not isinstance(x, str)
|
||||
for x in elem)):
|
||||
raise ValueError("element is not a string pair")
|
||||
return dict(operator=elem[0], operand=elem[1])
|
||||
|
@ -517,7 +517,7 @@ def ok_to_include_history(narrow: Optional[Iterable[Dict[str, Any]]], user_profi
|
|||
|
||||
return include_history
|
||||
|
||||
def get_stream_name_from_narrow(narrow: Optional[Iterable[Dict[str, Any]]]) -> Optional[Text]:
|
||||
def get_stream_name_from_narrow(narrow: Optional[Iterable[Dict[str, Any]]]) -> Optional[str]:
|
||||
if narrow is not None:
|
||||
for term in narrow:
|
||||
if term['operator'] == 'stream':
|
||||
|
@ -804,7 +804,7 @@ def get_messages_backend(request: HttpRequest, user_profile: UserProfile,
|
|||
user_message_flags[message_id] = UserMessage.flags_list_for_flags(flags)
|
||||
message_ids.append(message_id)
|
||||
|
||||
search_fields = dict() # type: Dict[int, Dict[str, Text]]
|
||||
search_fields = dict() # type: Dict[int, Dict[str, str]]
|
||||
if is_search:
|
||||
for row in rows:
|
||||
message_id = row[0]
|
||||
|
@ -966,7 +966,7 @@ def post_process_limited_query(rows: List[Any],
|
|||
@has_request_variables
|
||||
def update_message_flags(request: HttpRequest, user_profile: UserProfile,
|
||||
messages: List[int]=REQ(validator=check_list(check_int)),
|
||||
operation: Text=REQ('op'), flag: Text=REQ()) -> HttpResponse:
|
||||
operation: str=REQ('op'), flag: str=REQ()) -> HttpResponse:
|
||||
|
||||
count = do_update_message_flags(user_profile, operation, flag, messages)
|
||||
|
||||
|
@ -1005,7 +1005,7 @@ def mark_stream_as_read(request: HttpRequest,
|
|||
def mark_topic_as_read(request: HttpRequest,
|
||||
user_profile: UserProfile,
|
||||
stream_id: int=REQ(validator=check_int),
|
||||
topic_name: Text=REQ()) -> HttpResponse:
|
||||
topic_name: str=REQ()) -> HttpResponse:
|
||||
stream, recipient, sub = access_stream_by_id(user_profile, stream_id)
|
||||
|
||||
if topic_name:
|
||||
|
@ -1024,7 +1024,7 @@ def mark_topic_as_read(request: HttpRequest,
|
|||
'msg': ''})
|
||||
|
||||
def create_mirrored_message_users(request: HttpRequest, user_profile: UserProfile,
|
||||
recipients: Iterable[Text]) -> Tuple[bool, Optional[UserProfile]]:
|
||||
recipients: Iterable[str]) -> Tuple[bool, Optional[UserProfile]]:
|
||||
if "sender" not in request.POST:
|
||||
return (False, None)
|
||||
|
||||
|
@ -1059,7 +1059,7 @@ def create_mirrored_message_users(request: HttpRequest, user_profile: UserProfil
|
|||
sender = get_user_including_cross_realm(sender_email, user_profile.realm)
|
||||
return (True, sender)
|
||||
|
||||
def same_realm_zephyr_user(user_profile: UserProfile, email: Text) -> bool:
|
||||
def same_realm_zephyr_user(user_profile: UserProfile, email: str) -> bool:
|
||||
#
|
||||
# Are the sender and recipient both addresses in the same Zephyr
|
||||
# mirroring realm? We have to handle this specially, inferring
|
||||
|
@ -1078,7 +1078,7 @@ def same_realm_zephyr_user(user_profile: UserProfile, email: Text) -> bool:
|
|||
return user_profile.realm.is_zephyr_mirror_realm and \
|
||||
RealmDomain.objects.filter(realm=user_profile.realm, domain=domain).exists()
|
||||
|
||||
def same_realm_irc_user(user_profile: UserProfile, email: Text) -> bool:
|
||||
def same_realm_irc_user(user_profile: UserProfile, email: str) -> bool:
|
||||
# Check whether the target email address is an IRC user in the
|
||||
# same realm as user_profile, i.e. if the domain were example.com,
|
||||
# the IRC user would need to be username@irc.example.com
|
||||
|
@ -1093,7 +1093,7 @@ def same_realm_irc_user(user_profile: UserProfile, email: Text) -> bool:
|
|||
# these realms.
|
||||
return RealmDomain.objects.filter(realm=user_profile.realm, domain=domain).exists()
|
||||
|
||||
def same_realm_jabber_user(user_profile: UserProfile, email: Text) -> bool:
|
||||
def same_realm_jabber_user(user_profile: UserProfile, email: str) -> bool:
|
||||
try:
|
||||
validators.validate_email(email)
|
||||
except ValidationError:
|
||||
|
@ -1108,10 +1108,10 @@ def same_realm_jabber_user(user_profile: UserProfile, email: Text) -> bool:
|
|||
return RealmDomain.objects.filter(realm=user_profile.realm, domain=domain).exists()
|
||||
|
||||
def handle_deferred_message(sender: UserProfile, client: Client,
|
||||
message_type_name: Text, message_to: Sequence[Text],
|
||||
topic_name: Optional[Text],
|
||||
message_content: Text, delivery_type: Text,
|
||||
defer_until: Text, tz_guess: Text,
|
||||
message_type_name: str, message_to: Sequence[str],
|
||||
topic_name: Optional[str],
|
||||
message_content: str, delivery_type: str,
|
||||
defer_until: str, tz_guess: str,
|
||||
forwarder_user_profile: UserProfile,
|
||||
realm: Optional[Realm]) -> HttpResponse:
|
||||
deliver_at = None
|
||||
|
@ -1147,18 +1147,18 @@ def handle_deferred_message(sender: UserProfile, client: Client,
|
|||
# the user is logged in.
|
||||
@has_request_variables
|
||||
def send_message_backend(request: HttpRequest, user_profile: UserProfile,
|
||||
message_type_name: Text=REQ('type'),
|
||||
message_to: List[Text]=REQ('to', converter=extract_recipients, default=[]),
|
||||
message_type_name: str=REQ('type'),
|
||||
message_to: List[str]=REQ('to', converter=extract_recipients, default=[]),
|
||||
forged: bool=REQ(default=False),
|
||||
topic_name: Optional[Text]= REQ('subject',
|
||||
converter=lambda x: x.strip(), default=None),
|
||||
message_content: Text=REQ('content'),
|
||||
realm_str: Optional[Text]=REQ('realm_str', default=None),
|
||||
local_id: Optional[Text]=REQ(default=None),
|
||||
queue_id: Optional[Text]=REQ(default=None),
|
||||
delivery_type: Optional[Text]=REQ('delivery_type', default='send_now'),
|
||||
defer_until: Optional[Text]=REQ('deliver_at', default=None),
|
||||
tz_guess: Optional[Text]=REQ('tz_guess', default=None)) -> HttpResponse:
|
||||
topic_name: Optional[str]=REQ('subject',
|
||||
converter=lambda x: x.strip(), default=None),
|
||||
message_content: str=REQ('content'),
|
||||
realm_str: Optional[str]=REQ('realm_str', default=None),
|
||||
local_id: Optional[str]=REQ(default=None),
|
||||
queue_id: Optional[str]=REQ(default=None),
|
||||
delivery_type: Optional[str]=REQ('delivery_type', default='send_now'),
|
||||
defer_until: Optional[str]=REQ('deliver_at', default=None),
|
||||
tz_guess: Optional[str]=REQ('tz_guess', default=None)) -> HttpResponse:
|
||||
client = request.client
|
||||
is_super_user = request.user.is_api_super_user
|
||||
if forged and not is_super_user:
|
||||
|
@ -1278,9 +1278,9 @@ def get_message_edit_history(request: HttpRequest, user_profile: UserProfile,
|
|||
@has_request_variables
|
||||
def update_message_backend(request: HttpRequest, user_profile: UserMessage,
|
||||
message_id: int=REQ(converter=to_non_negative_int),
|
||||
subject: Optional[Text]=REQ(default=None),
|
||||
subject: Optional[str]=REQ(default=None),
|
||||
propagate_mode: Optional[str]=REQ(default="change_one"),
|
||||
content: Optional[Text]=REQ(default=None)) -> HttpResponse:
|
||||
content: Optional[str]=REQ(default=None)) -> HttpResponse:
|
||||
if not user_profile.realm.allow_message_editing:
|
||||
return json_error(_("Your organization has turned off message editing"))
|
||||
|
||||
|
@ -1328,7 +1328,7 @@ def update_message_backend(request: HttpRequest, user_profile: UserMessage,
|
|||
if subject == "":
|
||||
raise JsonableError(_("Topic can't be empty"))
|
||||
rendered_content = None
|
||||
links_for_embed = set() # type: Set[Text]
|
||||
links_for_embed = set() # type: Set[str]
|
||||
prior_mention_user_ids = set() # type: Set[int]
|
||||
mention_user_ids = set() # type: Set[int]
|
||||
if content is not None:
|
||||
|
@ -1391,7 +1391,7 @@ def json_fetch_raw_message(request: HttpRequest, user_profile: UserProfile,
|
|||
|
||||
@has_request_variables
|
||||
def render_message_backend(request: HttpRequest, user_profile: UserProfile,
|
||||
content: Text=REQ()) -> HttpResponse:
|
||||
content: str=REQ()) -> HttpResponse:
|
||||
message = Message()
|
||||
message.sender = user_profile
|
||||
message.content = content
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
|
||||
from django.http import HttpResponse, HttpRequest
|
||||
from typing import List, Text
|
||||
from typing import List
|
||||
|
||||
import ujson
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.utils.translation import ugettext as _
|
||||
from typing import Text
|
||||
|
||||
from zerver.decorator import to_non_negative_int
|
||||
from zerver.lib.actions import do_update_pointer
|
||||
|
|
|
@ -3,7 +3,7 @@ import datetime
|
|||
import time
|
||||
|
||||
from django.conf import settings
|
||||
from typing import Any, Dict, Text
|
||||
from typing import Any, Dict
|
||||
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.utils.timezone import now as timezone_now
|
||||
|
@ -22,7 +22,7 @@ def get_status_list(requesting_user_profile: UserProfile) -> Dict[str, Any]:
|
|||
'server_timestamp': time.time()}
|
||||
|
||||
def get_presence_backend(request: HttpRequest, user_profile: UserProfile,
|
||||
email: Text) -> HttpResponse:
|
||||
email: str) -> HttpResponse:
|
||||
try:
|
||||
target = get_user(email, user_profile.realm)
|
||||
except UserProfile.DoesNotExist:
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
import requests
|
||||
import json
|
||||
|
||||
from typing import Optional, Text
|
||||
from typing import Optional
|
||||
|
||||
from django.conf import settings
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.utils.translation import ugettext as _
|
||||
from typing import Text
|
||||
|
||||
from zerver.decorator import \
|
||||
has_request_variables, REQ, to_non_negative_int
|
||||
|
@ -98,7 +97,7 @@ def remove_reaction(request: HttpRequest, user_profile: UserProfile, message_id:
|
|||
|
||||
@has_request_variables
|
||||
def add_reaction_legacy(request: HttpRequest, user_profile: UserProfile,
|
||||
message_id: int, emoji_name: Text) -> HttpResponse:
|
||||
message_id: int, emoji_name: str) -> HttpResponse:
|
||||
|
||||
# access_message will throw a JsonableError exception if the user
|
||||
# cannot see the message (e.g. for messages to private streams).
|
||||
|
@ -122,7 +121,7 @@ def add_reaction_legacy(request: HttpRequest, user_profile: UserProfile,
|
|||
|
||||
@has_request_variables
|
||||
def remove_reaction_legacy(request: HttpRequest, user_profile: UserProfile,
|
||||
message_id: int, emoji_name: Text) -> HttpResponse:
|
||||
message_id: int, emoji_name: str) -> HttpResponse:
|
||||
|
||||
# access_message will throw a JsonableError exception if the user
|
||||
# cannot see the message (e.g. for messages to private streams).
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
from typing import Any, Dict, Optional, List, Text
|
||||
from typing import Any, Dict, Optional, List
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.utils.translation import ugettext as _
|
||||
from django.conf import settings
|
||||
|
@ -55,8 +55,8 @@ def update_realm(
|
|||
send_welcome_emails: Optional[bool]=REQ(validator=check_bool, default=None),
|
||||
bot_creation_policy: Optional[int]=REQ(converter=to_not_negative_int_or_none, default=None),
|
||||
default_twenty_four_hour_time: Optional[bool]=REQ(validator=check_bool, default=None),
|
||||
video_chat_provider: Optional[Text]=REQ(validator=check_string, default=None),
|
||||
google_hangouts_domain: Optional[Text]=REQ(validator=check_string, default=None)
|
||||
video_chat_provider: Optional[str]=REQ(validator=check_string, default=None),
|
||||
google_hangouts_domain: Optional[str]=REQ(validator=check_string, default=None)
|
||||
) -> HttpResponse:
|
||||
realm = user_profile.realm
|
||||
|
||||
|
@ -91,7 +91,7 @@ def update_realm(
|
|||
for k, v in list(req_vars.items()):
|
||||
if v is not None and getattr(realm, k) != v:
|
||||
do_set_realm_property(realm, k, v)
|
||||
if isinstance(v, Text):
|
||||
if isinstance(v, str):
|
||||
data[k] = 'updated'
|
||||
else:
|
||||
data[k] = v
|
||||
|
@ -123,7 +123,7 @@ def update_realm(
|
|||
data['message_content_edit_limit_seconds'] = message_content_edit_limit_seconds
|
||||
data['allow_community_topic_editing'] = allow_community_topic_editing
|
||||
# Realm.notifications_stream and Realm.signup_notifications_stream are not boolean,
|
||||
# Text or integer field, and thus doesn't fit into the do_set_realm_property framework.
|
||||
# str or integer field, and thus doesn't fit into the do_set_realm_property framework.
|
||||
if notifications_stream_id is not None:
|
||||
if realm.notifications_stream is None or (realm.notifications_stream.id !=
|
||||
notifications_stream_id):
|
||||
|
@ -156,7 +156,7 @@ def deactivate_realm(request: HttpRequest, user_profile: UserProfile) -> HttpRes
|
|||
return json_success()
|
||||
|
||||
@require_GET
|
||||
def check_subdomain_available(request: HttpRequest, subdomain: Text) -> HttpResponse:
|
||||
def check_subdomain_available(request: HttpRequest, subdomain: str) -> HttpResponse:
|
||||
try:
|
||||
check_subdomain(subdomain)
|
||||
return json_success({"msg": "available"})
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.utils.translation import ugettext as _
|
||||
|
@ -12,7 +11,6 @@ from zerver.lib.response import json_error, json_success
|
|||
from zerver.lib.validator import check_bool, check_string
|
||||
from zerver.models import RealmDomain, UserProfile, get_realm_domains
|
||||
|
||||
from typing import Text
|
||||
|
||||
def list_realm_domains(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
|
||||
domains = get_realm_domains(user_profile.realm)
|
||||
|
@ -21,7 +19,7 @@ def list_realm_domains(request: HttpRequest, user_profile: UserProfile) -> HttpR
|
|||
@require_realm_admin
|
||||
@has_request_variables
|
||||
def create_realm_domain(request: HttpRequest, user_profile: UserProfile,
|
||||
domain: Text=REQ(validator=check_string),
|
||||
domain: str=REQ(validator=check_string),
|
||||
allow_subdomains: bool=REQ(validator=check_bool)) -> HttpResponse:
|
||||
domain = domain.strip().lower()
|
||||
try:
|
||||
|
@ -36,7 +34,7 @@ def create_realm_domain(request: HttpRequest, user_profile: UserProfile,
|
|||
|
||||
@require_realm_admin
|
||||
@has_request_variables
|
||||
def patch_realm_domain(request: HttpRequest, user_profile: UserProfile, domain: Text,
|
||||
def patch_realm_domain(request: HttpRequest, user_profile: UserProfile, domain: str,
|
||||
allow_subdomains: bool=REQ(validator=check_bool)) -> HttpResponse:
|
||||
try:
|
||||
realm_domain = RealmDomain.objects.get(realm=user_profile.realm, domain=domain)
|
||||
|
@ -48,7 +46,7 @@ def patch_realm_domain(request: HttpRequest, user_profile: UserProfile, domain:
|
|||
@require_realm_admin
|
||||
@has_request_variables
|
||||
def delete_realm_domain(request: HttpRequest, user_profile: UserProfile,
|
||||
domain: Text) -> HttpResponse:
|
||||
domain: str) -> HttpResponse:
|
||||
try:
|
||||
realm_domain = RealmDomain.objects.get(realm=user_profile.realm, domain=domain)
|
||||
do_remove_realm_domain(realm_domain)
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.utils.translation import ugettext as _
|
||||
from typing import Text
|
||||
|
||||
from zerver.models import RealmEmoji, UserProfile
|
||||
from zerver.lib.emoji import check_emoji_admin, check_valid_emoji_name, check_valid_emoji
|
||||
|
@ -21,7 +19,7 @@ def list_emoji(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
|
|||
|
||||
@has_request_variables
|
||||
def upload_emoji(request: HttpRequest, user_profile: UserProfile,
|
||||
emoji_name: Text=REQ()) -> HttpResponse:
|
||||
emoji_name: str=REQ()) -> HttpResponse:
|
||||
check_valid_emoji_name(emoji_name)
|
||||
check_emoji_admin(user_profile)
|
||||
if RealmEmoji.objects.filter(realm=user_profile.realm,
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
|
||||
from typing import Text
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
|
@ -22,8 +20,8 @@ def list_filters(request: HttpRequest, user_profile: UserProfile) -> HttpRespons
|
|||
|
||||
@require_realm_admin
|
||||
@has_request_variables
|
||||
def create_filter(request: HttpRequest, user_profile: UserProfile, pattern: Text=REQ(),
|
||||
url_format_string: Text=REQ()) -> HttpResponse:
|
||||
def create_filter(request: HttpRequest, user_profile: UserProfile, pattern: str=REQ(),
|
||||
url_format_string: str=REQ()) -> HttpResponse:
|
||||
try:
|
||||
filter_id = do_add_realm_filter(
|
||||
realm=user_profile.realm,
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from typing import Any, List, Dict, Mapping, Optional, Text
|
||||
from typing import Any, List, Dict, Mapping, Optional
|
||||
|
||||
from django.utils.translation import ugettext as _
|
||||
from django.conf import settings
|
||||
|
@ -335,7 +335,7 @@ def redirect_to_email_login_url(email: str) -> HttpResponseRedirect:
|
|||
redirect_url = login_url + '?already_registered=' + email
|
||||
return HttpResponseRedirect(redirect_url)
|
||||
|
||||
def create_realm(request: HttpRequest, creation_key: Optional[Text]=None) -> HttpResponse:
|
||||
def create_realm(request: HttpRequest, creation_key: Optional[str]=None) -> HttpResponse:
|
||||
try:
|
||||
key_record = validate_key(creation_key)
|
||||
except RealmCreationKey.Invalid:
|
||||
|
@ -442,7 +442,7 @@ def generate_204(request: HttpRequest) -> HttpResponse:
|
|||
def find_account(request: HttpRequest) -> HttpResponse:
|
||||
url = reverse('zerver.views.registration.find_account')
|
||||
|
||||
emails = [] # type: List[Text]
|
||||
emails = [] # type: List[str]
|
||||
if request.method == 'POST':
|
||||
form = FindMyTeamForm(request.POST)
|
||||
if form.is_valid():
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# System documented in https://zulip.readthedocs.io/en/latest/subsystems/logging.html
|
||||
|
||||
from typing import Any, Dict, Optional, Text, Union
|
||||
from typing import Any, Dict, Optional, Union
|
||||
|
||||
from django.conf import settings
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
|
@ -92,9 +92,9 @@ def report_unnarrow_times(request: HttpRequest, user_profile: UserProfile,
|
|||
|
||||
@human_users_only
|
||||
@has_request_variables
|
||||
def report_error(request: HttpRequest, user_profile: UserProfile, message: Text=REQ(),
|
||||
stacktrace: Text=REQ(), ui_message: bool=REQ(validator=check_bool),
|
||||
user_agent: Text=REQ(), href: Text=REQ(), log: Text=REQ(),
|
||||
def report_error(request: HttpRequest, user_profile: UserProfile, message: str=REQ(),
|
||||
stacktrace: str=REQ(), ui_message: bool=REQ(validator=check_bool),
|
||||
user_agent: str=REQ(), href: str=REQ(), log: str=REQ(),
|
||||
more_info: Optional[Dict[str, Any]]=REQ(validator=check_dict([]), default=None)
|
||||
) -> HttpResponse:
|
||||
"""Accepts an error report and stores in a queue for processing. The
|
||||
|
@ -110,7 +110,7 @@ def report_error(request: HttpRequest, user_profile: UserProfile, message: Text=
|
|||
|
||||
try:
|
||||
version = subprocess.check_output(["git", "log", "HEAD^..HEAD", "--oneline"],
|
||||
universal_newlines=True) # type: Optional[Text]
|
||||
universal_newlines=True) # type: Optional[str]
|
||||
except Exception:
|
||||
version = None
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Any, Optional, Tuple, List, Set, Iterable, Mapping, Callable, Dict, Text, \
|
||||
from typing import Any, Optional, Tuple, List, Set, Iterable, Mapping, Callable, Dict, \
|
||||
Union
|
||||
|
||||
from django.utils.translation import ugettext as _
|
||||
|
@ -40,14 +40,14 @@ class PrincipalError(JsonableError):
|
|||
data_fields = ['principal']
|
||||
http_status_code = 403
|
||||
|
||||
def __init__(self, principal: Text) -> None:
|
||||
self.principal = principal # type: Text
|
||||
def __init__(self, principal: str) -> None:
|
||||
self.principal = principal # type: str
|
||||
|
||||
@staticmethod
|
||||
def msg_format() -> Text:
|
||||
def msg_format() -> str:
|
||||
return _("User not authorized to execute queries on behalf of '{principal}'")
|
||||
|
||||
def principal_to_user_profile(agent: UserProfile, principal: Text) -> UserProfile:
|
||||
def principal_to_user_profile(agent: UserProfile, principal: str) -> UserProfile:
|
||||
try:
|
||||
return get_user(principal, agent.realm)
|
||||
except UserProfile.DoesNotExist:
|
||||
|
@ -69,7 +69,7 @@ def deactivate_stream_backend(request: HttpRequest,
|
|||
@has_request_variables
|
||||
def add_default_stream(request: HttpRequest,
|
||||
user_profile: UserProfile,
|
||||
stream_name: Text=REQ()) -> HttpResponse:
|
||||
stream_name: str=REQ()) -> HttpResponse:
|
||||
(stream, recipient, sub) = access_stream_by_name(user_profile, stream_name)
|
||||
do_add_default_stream(stream)
|
||||
return json_success()
|
||||
|
@ -77,8 +77,8 @@ def add_default_stream(request: HttpRequest,
|
|||
@require_realm_admin
|
||||
@has_request_variables
|
||||
def create_default_stream_group(request: HttpRequest, user_profile: UserProfile,
|
||||
group_name: Text=REQ(), description: Text=REQ(),
|
||||
stream_names: List[Text]=REQ(validator=check_list(check_string))) -> None:
|
||||
group_name: str=REQ(), description: str=REQ(),
|
||||
stream_names: List[str]=REQ(validator=check_list(check_string))) -> None:
|
||||
streams = []
|
||||
for stream_name in stream_names:
|
||||
(stream, recipient, sub) = access_stream_by_name(user_profile, stream_name)
|
||||
|
@ -89,9 +89,9 @@ def create_default_stream_group(request: HttpRequest, user_profile: UserProfile,
|
|||
@require_realm_admin
|
||||
@has_request_variables
|
||||
def update_default_stream_group_info(request: HttpRequest, user_profile: UserProfile, group_id: int,
|
||||
new_group_name: Text=REQ(validator=check_string, default=None),
|
||||
new_description: Text=REQ(validator=check_string,
|
||||
default=None)) -> None:
|
||||
new_group_name: str=REQ(validator=check_string, default=None),
|
||||
new_description: str=REQ(validator=check_string,
|
||||
default=None)) -> None:
|
||||
if not new_group_name and not new_description:
|
||||
return json_error(_('You must pass "new_description" or "new_group_name".'))
|
||||
|
||||
|
@ -105,8 +105,8 @@ def update_default_stream_group_info(request: HttpRequest, user_profile: UserPro
|
|||
@require_realm_admin
|
||||
@has_request_variables
|
||||
def update_default_stream_group_streams(request: HttpRequest, user_profile: UserProfile,
|
||||
group_id: int, op: Text=REQ(),
|
||||
stream_names: List[Text]=REQ(
|
||||
group_id: int, op: str=REQ(),
|
||||
stream_names: List[str]=REQ(
|
||||
validator=check_list(check_string))) -> None:
|
||||
group = access_default_stream_group_by_id(user_profile.realm, group_id,)
|
||||
streams = []
|
||||
|
@ -134,7 +134,7 @@ def remove_default_stream_group(request: HttpRequest, user_profile: UserProfile,
|
|||
@has_request_variables
|
||||
def remove_default_stream(request: HttpRequest,
|
||||
user_profile: UserProfile,
|
||||
stream_name: Text=REQ()) -> HttpResponse:
|
||||
stream_name: str=REQ()) -> HttpResponse:
|
||||
(stream, recipient, sub) = access_stream_by_name(user_profile, stream_name)
|
||||
do_remove_default_stream(stream)
|
||||
return json_success()
|
||||
|
@ -144,9 +144,9 @@ def remove_default_stream(request: HttpRequest,
|
|||
def update_stream_backend(
|
||||
request: HttpRequest, user_profile: UserProfile,
|
||||
stream_id: int,
|
||||
description: Optional[Text]=REQ(validator=check_string, default=None),
|
||||
description: Optional[str]=REQ(validator=check_string, default=None),
|
||||
is_private: Optional[bool]=REQ(validator=check_bool, default=None),
|
||||
new_name: Optional[Text]=REQ(validator=check_string, default=None),
|
||||
new_name: Optional[str]=REQ(validator=check_string, default=None),
|
||||
) -> HttpResponse:
|
||||
# We allow realm administrators to to update the stream name and
|
||||
# description even for private streams.
|
||||
|
@ -178,7 +178,7 @@ FuncKwargPair = Tuple[Callable[..., HttpResponse], Dict[str, Union[int, Iterable
|
|||
@has_request_variables
|
||||
def update_subscriptions_backend(
|
||||
request: HttpRequest, user_profile: UserProfile,
|
||||
delete: Iterable[Text]=REQ(validator=check_list(check_string), default=[]),
|
||||
delete: Iterable[str]=REQ(validator=check_list(check_string), default=[]),
|
||||
add: Iterable[Mapping[str, Any]]=REQ(
|
||||
validator=check_list(check_dict([('name', check_string)])), default=[]),
|
||||
) -> HttpResponse:
|
||||
|
@ -215,8 +215,8 @@ def compose_views(request, user_profile, method_kwarg_pairs):
|
|||
@has_request_variables
|
||||
def remove_subscriptions_backend(
|
||||
request: HttpRequest, user_profile: UserProfile,
|
||||
streams_raw: Iterable[Text]=REQ("subscriptions", validator=check_list(check_string)),
|
||||
principals: Optional[Iterable[Text]]=REQ(validator=check_list(check_string), default=None),
|
||||
streams_raw: Iterable[str]=REQ("subscriptions", validator=check_list(check_string)),
|
||||
principals: Optional[Iterable[str]]=REQ(validator=check_list(check_string), default=None),
|
||||
) -> HttpResponse:
|
||||
|
||||
removing_someone_else = principals and \
|
||||
|
@ -239,7 +239,7 @@ def remove_subscriptions_backend(
|
|||
else:
|
||||
people_to_unsub = set([user_profile])
|
||||
|
||||
result = dict(removed=[], not_subscribed=[]) # type: Dict[str, List[Text]]
|
||||
result = dict(removed=[], not_subscribed=[]) # type: Dict[str, List[str]]
|
||||
(removed, not_subscribed) = bulk_remove_subscriptions(people_to_unsub, streams,
|
||||
acting_user=user_profile)
|
||||
|
||||
|
@ -251,8 +251,8 @@ def remove_subscriptions_backend(
|
|||
return json_success(result)
|
||||
|
||||
def you_were_just_subscribed_message(acting_user: UserProfile,
|
||||
stream_names: Set[Text],
|
||||
private_stream_names: Set[Text]) -> Text:
|
||||
stream_names: Set[str],
|
||||
private_stream_names: Set[str]) -> str:
|
||||
|
||||
# stream_names is the list of streams for which we should send notifications.
|
||||
#
|
||||
|
@ -285,11 +285,11 @@ def you_were_just_subscribed_message(acting_user: UserProfile,
|
|||
@has_request_variables
|
||||
def add_subscriptions_backend(
|
||||
request: HttpRequest, user_profile: UserProfile,
|
||||
streams_raw: Iterable[Mapping[str, Text]]=REQ(
|
||||
streams_raw: Iterable[Mapping[str, str]]=REQ(
|
||||
"subscriptions", validator=check_list(check_dict([('name', check_string)]))),
|
||||
invite_only: bool=REQ(validator=check_bool, default=False),
|
||||
announce: bool=REQ(validator=check_bool, default=False),
|
||||
principals: List[Text]=REQ(validator=check_list(check_string), default=[]),
|
||||
principals: List[str]=REQ(validator=check_list(check_string), default=[]),
|
||||
authorization_errors_fatal: bool=REQ(validator=check_bool, default=True),
|
||||
) -> HttpResponse:
|
||||
stream_dicts = []
|
||||
|
@ -326,7 +326,7 @@ def add_subscriptions_backend(
|
|||
|
||||
# We can assume unique emails here for now, but we should eventually
|
||||
# convert this function to be more id-centric.
|
||||
email_to_user_profile = dict() # type: Dict[Text, UserProfile]
|
||||
email_to_user_profile = dict() # type: Dict[str, UserProfile]
|
||||
|
||||
result = dict(subscribed=defaultdict(list), already_subscribed=defaultdict(list)) # type: Dict[str, Any]
|
||||
for (subscriber, stream) in subscribed:
|
||||
|
@ -447,7 +447,7 @@ def get_topics_backend(request: HttpRequest, user_profile: UserProfile,
|
|||
|
||||
@authenticated_json_post_view
|
||||
@has_request_variables
|
||||
def json_stream_exists(request: HttpRequest, user_profile: UserProfile, stream_name: Text=REQ("stream"),
|
||||
def json_stream_exists(request: HttpRequest, user_profile: UserProfile, stream_name: str=REQ("stream"),
|
||||
autosubscribe: bool=REQ(validator=check_bool, default=False)) -> HttpResponse:
|
||||
check_stream_name(stream_name)
|
||||
|
||||
|
@ -472,7 +472,7 @@ def json_stream_exists(request: HttpRequest, user_profile: UserProfile, stream_n
|
|||
@has_request_variables
|
||||
def json_get_stream_id(request: HttpRequest,
|
||||
user_profile: UserProfile,
|
||||
stream_name: Text=REQ('stream')) -> HttpResponse:
|
||||
stream_name: str=REQ('stream')) -> HttpResponse:
|
||||
(stream, recipient, sub) = access_stream_by_name(user_profile, stream_name)
|
||||
return json_success({'stream_id': stream.id})
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from typing import List, Text
|
||||
from typing import List
|
||||
|
||||
from zerver.decorator import has_request_variables, REQ, JsonableError
|
||||
from zerver.lib.actions import check_send_typing_notification, \
|
||||
|
@ -11,8 +11,8 @@ from zerver.models import UserProfile
|
|||
@has_request_variables
|
||||
def send_notification_backend(
|
||||
request: HttpRequest, user_profile: UserProfile,
|
||||
operator: Text=REQ('op'),
|
||||
notification_to: List[Text]=REQ('to', converter=extract_recipients, default=[]),
|
||||
operator: str=REQ('op'),
|
||||
notification_to: List[str]=REQ('to', converter=extract_recipients, default=[]),
|
||||
) -> HttpResponse:
|
||||
check_send_typing_notification(user_profile, notification_to, operator)
|
||||
return json_success()
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
from django.http import HttpResponse, HttpRequest
|
||||
from django.utils.translation import ugettext as _
|
||||
|
||||
from typing import List, Text
|
||||
from typing import List
|
||||
|
||||
from zerver.context_processors import get_realm_from_request
|
||||
from zerver.lib.actions import check_add_user_group, do_update_user_group_name, \
|
||||
|
@ -19,9 +19,9 @@ from zerver.views.streams import compose_views, FuncKwargPair
|
|||
|
||||
@has_request_variables
|
||||
def add_user_group(request: HttpRequest, user_profile: UserProfile,
|
||||
name: Text=REQ(),
|
||||
name: str=REQ(),
|
||||
members: List[int]=REQ(validator=check_list(check_int), default=[]),
|
||||
description: Text=REQ()) -> HttpResponse:
|
||||
description: str=REQ()) -> HttpResponse:
|
||||
user_profiles = user_ids_to_users(members, user_profile.realm)
|
||||
check_add_user_group(user_profile.realm, name, user_profiles, description)
|
||||
return json_success()
|
||||
|
@ -29,7 +29,7 @@ def add_user_group(request: HttpRequest, user_profile: UserProfile,
|
|||
@has_request_variables
|
||||
def edit_user_group(request: HttpRequest, user_profile: UserProfile,
|
||||
user_group_id: int=REQ(validator=check_int),
|
||||
name: Text=REQ(default=""), description: Text=REQ(default="")
|
||||
name: str=REQ(default=""), description: str=REQ(default="")
|
||||
) -> HttpResponse:
|
||||
if not (name or description):
|
||||
return json_error(_("No new data supplied"))
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Optional, Any, Dict, Text
|
||||
from typing import Optional, Any, Dict
|
||||
|
||||
from django.utils.translation import ugettext as _
|
||||
from django.conf import settings
|
||||
|
@ -70,10 +70,10 @@ def json_change_ui_settings(
|
|||
@human_users_only
|
||||
@has_request_variables
|
||||
def json_change_settings(request: HttpRequest, user_profile: UserProfile,
|
||||
full_name: Text=REQ(default=""),
|
||||
email: Text=REQ(default=""),
|
||||
old_password: Text=REQ(default=""),
|
||||
new_password: Text=REQ(default="")) -> HttpResponse:
|
||||
full_name: str=REQ(default=""),
|
||||
email: str=REQ(default=""),
|
||||
old_password: str=REQ(default=""),
|
||||
new_password: str=REQ(default="")) -> HttpResponse:
|
||||
if not (full_name or new_password or email):
|
||||
return json_error(_("Please fill out all fields."))
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Callable, Text, Union, Optional, Dict, Any, List, Tuple
|
||||
from typing import Callable, Union, Optional, Dict, Any, List, Tuple
|
||||
|
||||
import os
|
||||
import ujson
|
||||
|
@ -37,7 +37,7 @@ from zerver.lib.create_user import random_api_key
|
|||
|
||||
|
||||
def deactivate_user_backend(request: HttpRequest, user_profile: UserProfile,
|
||||
email: Text) -> HttpResponse:
|
||||
email: str) -> HttpResponse:
|
||||
try:
|
||||
target = get_user(email, user_profile.realm)
|
||||
except UserProfile.DoesNotExist:
|
||||
|
@ -60,7 +60,7 @@ def check_last_admin(user_profile: UserProfile) -> bool:
|
|||
return user_profile.is_realm_admin and len(admins) == 1
|
||||
|
||||
def deactivate_bot_backend(request: HttpRequest, user_profile: UserProfile,
|
||||
email: Text) -> HttpResponse:
|
||||
email: str) -> HttpResponse:
|
||||
try:
|
||||
target = get_user(email, user_profile.realm)
|
||||
except UserProfile.DoesNotExist:
|
||||
|
@ -78,7 +78,7 @@ def _deactivate_user_profile_backend(request: HttpRequest, user_profile: UserPro
|
|||
return json_success()
|
||||
|
||||
def reactivate_user_backend(request: HttpRequest, user_profile: UserProfile,
|
||||
email: Text) -> HttpResponse:
|
||||
email: str) -> HttpResponse:
|
||||
try:
|
||||
target = get_user(email, user_profile.realm)
|
||||
except UserProfile.DoesNotExist:
|
||||
|
@ -91,8 +91,8 @@ def reactivate_user_backend(request: HttpRequest, user_profile: UserProfile,
|
|||
return json_success()
|
||||
|
||||
@has_request_variables
|
||||
def update_user_backend(request: HttpRequest, user_profile: UserProfile, email: Text,
|
||||
full_name: Optional[Text]=REQ(default="", validator=check_string),
|
||||
def update_user_backend(request: HttpRequest, user_profile: UserProfile, email: str,
|
||||
full_name: Optional[str]=REQ(default="", validator=check_string),
|
||||
is_admin: Optional[bool]=REQ(default=None, validator=check_bool)) -> HttpResponse:
|
||||
try:
|
||||
target = get_user(email, user_profile.realm)
|
||||
|
@ -149,22 +149,22 @@ def avatar(request: HttpRequest, email_or_id: str, medium: bool=False) -> HttpRe
|
|||
url += '&' + request.META['QUERY_STRING']
|
||||
return redirect(url)
|
||||
|
||||
def get_stream_name(stream: Optional[Stream]) -> Optional[Text]:
|
||||
def get_stream_name(stream: Optional[Stream]) -> Optional[str]:
|
||||
if stream:
|
||||
return stream.name
|
||||
return None
|
||||
|
||||
@has_request_variables
|
||||
def patch_bot_backend(
|
||||
request: HttpRequest, user_profile: UserProfile, email: Text,
|
||||
full_name: Optional[Text]=REQ(default=None),
|
||||
bot_owner: Optional[Text]=REQ(default=None),
|
||||
config_data: Optional[Dict[Text, Text]]=REQ(default=None,
|
||||
validator=check_dict(value_validator=check_string)),
|
||||
service_payload_url: Optional[Text]=REQ(validator=check_url, default=None),
|
||||
request: HttpRequest, user_profile: UserProfile, email: str,
|
||||
full_name: Optional[str]=REQ(default=None),
|
||||
bot_owner: Optional[str]=REQ(default=None),
|
||||
config_data: Optional[Dict[str, str]]=REQ(default=None,
|
||||
validator=check_dict(value_validator=check_string)),
|
||||
service_payload_url: Optional[str]=REQ(validator=check_url, default=None),
|
||||
service_interface: Optional[int]=REQ(validator=check_int, default=1),
|
||||
default_sending_stream: Optional[Text]=REQ(default=None),
|
||||
default_events_register_stream: Optional[Text]=REQ(default=None),
|
||||
default_sending_stream: Optional[str]=REQ(default=None),
|
||||
default_events_register_stream: Optional[str]=REQ(default=None),
|
||||
default_all_public_streams: Optional[bool]=REQ(default=None, validator=check_bool)
|
||||
) -> HttpResponse:
|
||||
try:
|
||||
|
@ -243,7 +243,7 @@ def patch_bot_backend(
|
|||
return json_success(json_result)
|
||||
|
||||
@has_request_variables
|
||||
def regenerate_bot_api_key(request: HttpRequest, user_profile: UserProfile, email: Text) -> HttpResponse:
|
||||
def regenerate_bot_api_key(request: HttpRequest, user_profile: UserProfile, email: str) -> HttpResponse:
|
||||
try:
|
||||
bot = get_user(email, user_profile.realm)
|
||||
except UserProfile.DoesNotExist:
|
||||
|
@ -259,8 +259,8 @@ def regenerate_bot_api_key(request: HttpRequest, user_profile: UserProfile, emai
|
|||
return json_success(json_result)
|
||||
|
||||
# Adds an outgoing webhook or embedded bot service.
|
||||
def add_service(name: Text, user_profile: UserProfile, base_url: Text=None,
|
||||
interface: int=None, token: Text=None) -> None:
|
||||
def add_service(name: str, user_profile: UserProfile, base_url: str=None,
|
||||
interface: int=None, token: str=None) -> None:
|
||||
Service.objects.create(name=name,
|
||||
user_profile=user_profile,
|
||||
base_url=base_url,
|
||||
|
@ -270,16 +270,16 @@ def add_service(name: Text, user_profile: UserProfile, base_url: Text=None,
|
|||
@has_request_variables
|
||||
def add_bot_backend(
|
||||
request: HttpRequest, user_profile: UserProfile,
|
||||
full_name_raw: Text=REQ("full_name"), short_name_raw: Text=REQ("short_name"),
|
||||
full_name_raw: str=REQ("full_name"), short_name_raw: str=REQ("short_name"),
|
||||
bot_type: int=REQ(validator=check_int, default=UserProfile.DEFAULT_BOT),
|
||||
payload_url: Optional[Text]=REQ(validator=check_url, default=""),
|
||||
service_name: Optional[Text]=REQ(default=None),
|
||||
config_data: Dict[Text, Text]=REQ(default={},
|
||||
validator=check_dict(value_validator=check_string)),
|
||||
payload_url: Optional[str]=REQ(validator=check_url, default=""),
|
||||
service_name: Optional[str]=REQ(default=None),
|
||||
config_data: Dict[str, str]=REQ(default={},
|
||||
validator=check_dict(value_validator=check_string)),
|
||||
interface_type: int=REQ(validator=check_int, default=Service.GENERIC),
|
||||
default_sending_stream_name: Optional[Text]=REQ('default_sending_stream', default=None),
|
||||
default_events_register_stream_name: Optional[Text]=REQ('default_events_register_stream',
|
||||
default=None),
|
||||
default_sending_stream_name: Optional[str]=REQ('default_sending_stream', default=None),
|
||||
default_events_register_stream_name: Optional[str]=REQ('default_events_register_stream',
|
||||
default=None),
|
||||
default_all_public_streams: Optional[bool]=REQ(validator=check_bool, default=None)
|
||||
) -> HttpResponse:
|
||||
short_name = check_short_name(short_name_raw)
|
||||
|
@ -449,8 +449,8 @@ def get_members_backend(request: HttpRequest, user_profile: UserProfile,
|
|||
@require_realm_admin
|
||||
@has_request_variables
|
||||
def create_user_backend(request: HttpRequest, user_profile: UserProfile,
|
||||
email: Text=REQ(), password: Text=REQ(), full_name_raw: Text=REQ("full_name"),
|
||||
short_name: Text=REQ()) -> HttpResponse:
|
||||
email: str=REQ(), password: str=REQ(), full_name_raw: str=REQ("full_name"),
|
||||
short_name: str=REQ()) -> HttpResponse:
|
||||
full_name = check_full_name(full_name_raw)
|
||||
form = CreateUserForm({'full_name': full_name, 'email': email})
|
||||
if not form.is_valid():
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Any, List, Dict, Optional, Callable, Tuple, Iterable, Sequence, Text
|
||||
from typing import Any, List, Dict, Optional, Callable, Tuple, Iterable, Sequence
|
||||
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse, HttpRequest
|
||||
|
@ -25,7 +25,7 @@ kerberos_alter_egos = {
|
|||
@authenticated_json_view
|
||||
@has_request_variables
|
||||
def webathena_kerberos_login(request: HttpRequest, user_profile: UserProfile,
|
||||
cred: Text=REQ(default=None)) -> HttpResponse:
|
||||
cred: str=REQ(default=None)) -> HttpResponse:
|
||||
global kerberos_alter_egos
|
||||
if cred is None:
|
||||
return json_error(_("Could not find Kerberos credential"))
|
||||
|
|
Loading…
Reference in New Issue