zerver/views: Use Python 3 syntax for typing.

Edited by tabbott to remove state.py and streams.py, because of
problems with the original PR's changes, and wrap some long lines.
This commit is contained in:
rht 2017-11-27 08:28:57 +00:00 committed by Tim Abbott
parent 92888a0cde
commit e538f4dd44
13 changed files with 154 additions and 258 deletions

View File

@ -7,13 +7,11 @@ from zerver.lib.attachments import user_attachments, remove_attachment, \
access_attachment_by_id
def list_by_user(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
def list_by_user(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
return json_success({"attachments": user_attachments(user_profile)})
def remove(request, user_profile, attachment_id):
# type: (HttpRequest, UserProfile, int) -> HttpResponse
def remove(request: HttpRequest, user_profile: UserProfile, attachment_id: int) -> HttpResponse:
attachment = access_attachment_by_id(user_profile, attachment_id,
needs_owner=True)
remove_attachment(user_profile, attachment)

View File

@ -56,8 +56,8 @@ def create_preregistration_user(email, request, realm_creation=False,
password_required=password_required,
realm=realm)
def maybe_send_to_registration(request, email, full_name='', password_required=True):
# type: (HttpRequest, Text, Text, bool) -> HttpResponse
def maybe_send_to_registration(request: HttpRequest, email: Text, full_name: Text='',
password_required: bool=True) -> HttpResponse:
realm = get_realm_from_request(request)
from_multiuse_invite = False
@ -108,14 +108,12 @@ def maybe_send_to_registration(request, email, full_name='', password_required=T
'from_multiuse_invite': from_multiuse_invite},
)
def redirect_to_subdomain_login_url():
# type: () -> HttpResponseRedirect
def redirect_to_subdomain_login_url() -> HttpResponseRedirect:
login_url = reverse('django.contrib.auth.views.login')
redirect_url = login_url + '?subdomain=1'
return HttpResponseRedirect(redirect_url)
def redirect_to_config_error(error_type):
# type: (str) -> HttpResponseRedirect
def redirect_to_config_error(error_type: str) -> HttpResponseRedirect:
return HttpResponseRedirect("/config-error/%s" % (error_type,))
def login_or_register_remote_user(request, remote_username, user_profile, full_name='',
@ -179,8 +177,7 @@ def login_or_register_remote_user(request, remote_username, user_profile, full_n
return HttpResponseRedirect(user_profile.realm.uri)
@log_view_func
def remote_user_sso(request):
# type: (HttpRequest) -> HttpResponse
def remote_user_sso(request: HttpRequest) -> HttpResponse:
try:
remote_user = request.META["REMOTE_USER"]
except KeyError:
@ -201,8 +198,7 @@ def remote_user_sso(request):
@csrf_exempt
@log_view_func
def remote_user_jwt(request):
# type: (HttpRequest) -> HttpResponse
def remote_user_jwt(request: HttpRequest) -> HttpResponse:
subdomain = get_subdomain(request)
try:
auth_key = settings.JWT_AUTH_KEYS[subdomain]
@ -245,20 +241,17 @@ def remote_user_jwt(request):
return login_or_register_remote_user(request, email, user_profile, remote_user)
def google_oauth2_csrf(request, value):
# type: (HttpRequest, str) -> str
def google_oauth2_csrf(request: HttpRequest, value: str) -> str:
# In Django 1.10, get_token returns a salted token which changes
# every time get_token is called.
from django.middleware.csrf import _unsalt_cipher_token
token = _unsalt_cipher_token(get_token(request))
return hmac.new(token.encode('utf-8'), value.encode("utf-8"), hashlib.sha256).hexdigest()
def reverse_on_root(viewname, args=None, kwargs=None):
# type: (str, List[str], Dict[str, str]) -> str
def reverse_on_root(viewname: str, args: List[str]=None, kwargs: Dict[str, str]=None) -> str:
return settings.ROOT_DOMAIN_URI + reverse(viewname, args=args, kwargs=kwargs)
def oauth_redirect_to_root(request, url, is_signup=False):
# type: (HttpRequest, Text, bool) -> HttpResponse
def oauth_redirect_to_root(request: HttpRequest, url: Text, is_signup: bool=False) -> HttpResponse:
main_site_uri = settings.ROOT_DOMAIN_URI + url
params = {
'subdomain': get_subdomain(request),
@ -275,8 +268,7 @@ def oauth_redirect_to_root(request, url, is_signup=False):
return redirect(main_site_uri + '?' + urllib.parse.urlencode(params))
def start_google_oauth2(request):
# type: (HttpRequest) -> HttpResponse
def start_google_oauth2(request: HttpRequest) -> HttpResponse:
url = reverse('zerver.views.auth.send_oauth_request_to_google')
if not (settings.GOOGLE_OAUTH2_CLIENT_ID and settings.GOOGLE_OAUTH2_CLIENT_SECRET):
@ -285,8 +277,7 @@ def start_google_oauth2(request):
is_signup = bool(request.GET.get('is_signup'))
return oauth_redirect_to_root(request, url, is_signup=is_signup)
def start_social_login(request, backend):
# type: (HttpRequest, Text) -> HttpResponse
def start_social_login(request: HttpRequest, backend: Text) -> HttpResponse:
backend_url = reverse('social:begin', args=[backend])
if (backend == "github") and not (settings.SOCIAL_AUTH_GITHUB_KEY and
settings.SOCIAL_AUTH_GITHUB_SECRET):
@ -294,13 +285,11 @@ def start_social_login(request, backend):
return oauth_redirect_to_root(request, backend_url)
def start_social_signup(request, backend):
# type: (HttpRequest, Text) -> HttpResponse
def start_social_signup(request: HttpRequest, backend: Text) -> HttpResponse:
backend_url = reverse('social:begin', args=[backend])
return oauth_redirect_to_root(request, backend_url, is_signup=True)
def send_oauth_request_to_google(request):
# type: (HttpRequest) -> HttpResponse
def send_oauth_request_to_google(request: HttpRequest) -> HttpResponse:
subdomain = request.GET.get('subdomain', '')
is_signup = request.GET.get('is_signup', '')
mobile_flow_otp = request.GET.get('mobile_flow_otp', '0')
@ -326,8 +315,7 @@ def send_oauth_request_to_google(request):
return redirect(google_uri + urllib.parse.urlencode(params))
@log_view_func
def finish_google_oauth2(request):
# type: (HttpRequest) -> HttpResponse
def finish_google_oauth2(request: HttpRequest) -> HttpResponse:
error = request.GET.get('error')
if error == 'access_denied':
return redirect('/')
@ -413,8 +401,7 @@ def finish_google_oauth2(request):
return redirect_and_log_into_subdomain(
realm, full_name, email_address, is_signup=is_signup)
def authenticate_remote_user(realm, email_address):
# type: (Realm, str) -> Tuple[UserProfile, Dict[str, Any]]
def authenticate_remote_user(realm: Realm, email_address: str) -> Tuple[UserProfile, Dict[str, Any]]:
return_data = {} # type: Dict[str, bool]
if email_address is None:
# No need to authenticate if email address is None. We already
@ -434,8 +421,7 @@ def authenticate_remote_user(realm, email_address):
_subdomain_token_salt = 'zerver.views.auth.log_into_subdomain'
@log_view_func
def log_into_subdomain(request, token):
# type: (HttpRequest, Text) -> HttpResponse
def log_into_subdomain(request: HttpRequest, token: Text) -> HttpResponse:
try:
data = signing.loads(token, salt=_subdomain_token_salt, max_age=15)
except signing.SignatureExpired as e:
@ -481,8 +467,7 @@ def redirect_and_log_into_subdomain(realm, full_name, email_address,
+ reverse('zerver.views.auth.log_into_subdomain', args=[token]))
return redirect(subdomain_login_uri)
def get_dev_users(realm=None, extra_users_count=10):
# type: (Optional[Realm], int) -> List[UserProfile]
def get_dev_users(realm: Optional[Realm]=None, extra_users_count: int=10) -> List[UserProfile]:
# Development environments usually have only a few users, but
# it still makes sense to limit how many extra users we render to
# support performance testing with DevAuthBackend.
@ -498,8 +483,7 @@ def get_dev_users(realm=None, extra_users_count=10):
users = list(shakespearian_users) + list(extra_users)
return users
def redirect_to_misconfigured_ldap_notice(error_type):
# type: (int) -> HttpResponse
def redirect_to_misconfigured_ldap_notice(error_type: int) -> HttpResponse:
if error_type == ZulipLDAPAuthBackend.REALM_IS_NONE_ERROR:
url = reverse('ldap_error_realm_is_none')
else:
@ -507,8 +491,7 @@ def redirect_to_misconfigured_ldap_notice(error_type):
return HttpResponseRedirect(url)
def show_deactivation_notice(request):
# type: (HttpRequest) -> HttpResponse
def show_deactivation_notice(request: HttpRequest) -> HttpResponse:
realm = get_realm_from_request(request)
if realm and realm.deactivated:
return render(request, "zerver/deactivated.html",
@ -516,12 +499,10 @@ def show_deactivation_notice(request):
return HttpResponseRedirect(reverse('zerver.views.auth.login_page'))
def redirect_to_deactivation_notice():
# type: () -> HttpResponse
def redirect_to_deactivation_notice() -> HttpResponse:
return HttpResponseRedirect(reverse('zerver.views.auth.show_deactivation_notice'))
def login_page(request, **kwargs):
# type: (HttpRequest, **Any) -> HttpResponse
def login_page(request: HttpRequest, **kwargs: Any) -> HttpResponse:
if request.user.is_authenticated:
return HttpResponseRedirect(request.user.realm.uri)
if is_subdomain_root_or_alias(request) and settings.ROOT_DOMAIN_LANDING_PAGE:
@ -579,8 +560,7 @@ def login_page(request, **kwargs):
return template_response
@csrf_exempt
def dev_direct_login(request, **kwargs):
# type: (HttpRequest, **Any) -> HttpResponse
def dev_direct_login(request: HttpRequest, **kwargs: Any) -> HttpResponse:
# This function allows logging in without a password and should only be called
# in development environments. It may be called if the DevAuthBackend is included
# in settings.AUTHENTICATION_BACKENDS
@ -600,8 +580,7 @@ def dev_direct_login(request, **kwargs):
@csrf_exempt
@require_post
@has_request_variables
def api_dev_fetch_api_key(request, username=REQ()):
# type: (HttpRequest, str) -> HttpResponse
def api_dev_fetch_api_key(request: HttpRequest, username: str=REQ()) -> HttpResponse:
"""This function allows logging in without a password on the Zulip
mobile apps when connecting to a Zulip development environment. It
requires DevAuthBackend to be included in settings.AUTHENTICATION_BACKENDS.
@ -635,8 +614,7 @@ def api_dev_fetch_api_key(request, username=REQ()):
return json_success({"api_key": user_profile.api_key, "email": user_profile.email})
@csrf_exempt
def api_dev_get_emails(request):
# type: (HttpRequest) -> HttpResponse
def api_dev_get_emails(request: HttpRequest) -> HttpResponse:
if not dev_auth_enabled() or settings.PRODUCTION:
return json_error(_("Dev environment not enabled."))
users = get_dev_users()
@ -646,8 +624,7 @@ def api_dev_get_emails(request):
@csrf_exempt
@require_post
@has_request_variables
def api_fetch_api_key(request, username=REQ(), password=REQ()):
# type: (HttpRequest, str, str) -> HttpResponse
def api_fetch_api_key(request: HttpRequest, username: str=REQ(), password: str=REQ()) -> HttpResponse:
return_data = {} # type: Dict[str, bool]
subdomain = get_subdomain(request)
realm = get_realm(subdomain)
@ -697,8 +674,7 @@ def api_fetch_api_key(request, username=REQ(), password=REQ()):
return json_success({"api_key": user_profile.api_key, "email": user_profile.email})
def get_auth_backends_data(request):
# type: (HttpRequest) -> Dict[str, Any]
def get_auth_backends_data(request: HttpRequest) -> Dict[str, Any]:
"""Returns which authentication methods are enabled on the server"""
subdomain = get_subdomain(request)
try:
@ -725,8 +701,7 @@ def get_auth_backends_data(request):
}
@csrf_exempt
def api_get_auth_backends(request):
# type: (HttpRequest) -> HttpResponse
def api_get_auth_backends(request: HttpRequest) -> HttpResponse:
"""Deprecated route; this is to be replaced by api_get_server_settings"""
auth_backends = get_auth_backends_data(request)
auth_backends['zulip_version'] = ZULIP_VERSION
@ -734,8 +709,7 @@ def api_get_auth_backends(request):
@require_GET
@csrf_exempt
def api_get_server_settings(request):
# type: (HttpRequest) -> HttpResponse
def api_get_server_settings(request: HttpRequest) -> HttpResponse:
result = dict(
authentication_methods=get_auth_backends_data(request),
zulip_version=ZULIP_VERSION,
@ -759,8 +733,8 @@ def api_get_server_settings(request):
@authenticated_json_post_view
@has_request_variables
def json_fetch_api_key(request, user_profile, password=REQ(default='')):
# type: (HttpRequest, UserProfile, str) -> HttpResponse
def json_fetch_api_key(request: HttpRequest, user_profile: UserProfile,
password: str=REQ(default='')) -> HttpResponse:
subdomain = get_subdomain(request)
realm = get_realm(subdomain)
if password_auth_enabled(user_profile.realm):
@ -770,15 +744,13 @@ def json_fetch_api_key(request, user_profile, password=REQ(default='')):
return json_success({"api_key": user_profile.api_key})
@csrf_exempt
def api_fetch_google_client_id(request):
# type: (HttpRequest) -> HttpResponse
def api_fetch_google_client_id(request: HttpRequest) -> HttpResponse:
if not settings.GOOGLE_CLIENT_ID:
return json_error(_("GOOGLE_CLIENT_ID is not configured"), status=400)
return json_success({"google_client_id": settings.GOOGLE_CLIENT_ID})
@require_post
def logout_then_login(request, **kwargs):
# type: (HttpRequest, **Any) -> HttpResponse
def logout_then_login(request: HttpRequest, **kwargs: Any) -> HttpResponse:
return django_logout_then_login(request, kwargs)
def password_reset(request: HttpRequest, **kwargs: Any) -> HttpResponse:

View File

@ -21,8 +21,7 @@ import datetime
ZULIP_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../../')
client = Client()
def email_page(request):
# type: (HttpRequest) -> HttpResponse
def email_page(request: HttpRequest) -> HttpResponse:
if request.method == 'POST':
set_forward_address(request.POST["forward_address"])
return json_success()
@ -35,8 +34,7 @@ def email_page(request):
{'log': content,
'forward_address': get_forward_address()})
def clear_emails(request):
# type: (HttpRequest) -> HttpResponse
def clear_emails(request: HttpRequest) -> HttpResponse:
try:
os.remove(settings.EMAIL_CONTENT_LOG_PATH)
except FileNotFoundError: # nocoverage
@ -44,8 +42,7 @@ def clear_emails(request):
return redirect(email_page)
@require_GET
def generate_all_emails(request):
# type: (HttpRequest) -> HttpResponse
def generate_all_emails(request: HttpRequest) -> HttpResponse:
# write fake data for all variables
registered_email = "hamlet@zulip.com"

View File

@ -36,8 +36,7 @@ import re
import time
@zulip_login_required
def accounts_accept_terms(request):
# type: (HttpRequest) -> HttpResponse
def accounts_accept_terms(request: HttpRequest) -> HttpResponse:
if request.method == "POST":
form = ToSForm(request.POST)
if form.is_valid():
@ -58,16 +57,14 @@ def accounts_accept_terms(request):
'special_message_template': special_message_template},
)
def sent_time_in_epoch_seconds(user_message):
# type: (Optional[UserMessage]) -> Optional[float]
def sent_time_in_epoch_seconds(user_message: Optional[UserMessage]) -> Optional[float]:
if user_message is None:
return None
# We have USE_TZ = True, so our datetime objects are timezone-aware.
# Return the epoch seconds in UTC.
return calendar.timegm(user_message.message.pub_date.utctimetuple())
def home(request):
# type: (HttpRequest) -> HttpResponse
def home(request: HttpRequest) -> HttpResponse:
if settings.DEVELOPMENT and os.path.exists('var/handlebars-templates/compile.error'):
response = render(request, 'zerver/handlebars_compilation_failed.html')
response.status_code = 500
@ -85,8 +82,7 @@ def home(request):
return render(request, 'zerver/hello.html')
@zulip_login_required
def home_real(request):
# type: (HttpRequest) -> HttpResponse
def home_real(request: HttpRequest) -> HttpResponse:
# We need to modify the session object every two weeks or it will expire.
# This line makes reloading the page a sufficient action to keep the
# session alive.
@ -243,18 +239,15 @@ def home_real(request):
return response
@zulip_login_required
def desktop_home(request):
# type: (HttpRequest) -> HttpResponse
def desktop_home(request: HttpRequest) -> HttpResponse:
return HttpResponseRedirect(reverse('zerver.views.home.home'))
def apps_view(request, _):
# type: (HttpRequest, Text) -> HttpResponse
def apps_view(request: HttpRequest, _: Text) -> HttpResponse:
if settings.ZILENCER_ENABLED:
return render(request, 'zerver/apps.html')
return HttpResponseRedirect('https://zulipchat.com/apps/', status=301)
def is_buggy_ua(agent):
# type: (str) -> bool
def is_buggy_ua(agent: str) -> bool:
"""Discrimiate CSS served to clients based on User Agent
Due to QTBUG-3467, @font-face is not supported in QtWebKit.

View File

@ -17,8 +17,7 @@ from zerver.lib.subdomains import get_subdomain
from zerver.models import Realm
from zerver.templatetags.app_filters import render_markdown_path
def add_api_uri_context(context, request):
# type: (Dict[str, Any], HttpRequest) -> None
def add_api_uri_context(context: Dict[str, Any], request: HttpRequest) -> None:
subdomain = get_subdomain(request)
if (subdomain != Realm.SUBDOMAIN_FOR_ROOT_DOMAIN
or not settings.ROOT_DOMAIN_LANDING_PAGE):
@ -37,8 +36,7 @@ def add_api_uri_context(context, request):
context["html_settings_links"] = html_settings_links
class ApiURLView(TemplateView):
def get_context_data(self, **kwargs):
# type: (**Any) -> Dict[str, str]
def get_context_data(self, **kwargs: Any) -> Dict[str, str]:
context = super().get_context_data(**kwargs)
add_api_uri_context(context, self.request)
return context
@ -50,16 +48,14 @@ class APIView(ApiURLView):
class MarkdownDirectoryView(ApiURLView):
path_template = ""
def get_path(self, article):
# type: (str) -> str
def get_path(self, article: str) -> str:
if article == "":
article = "index"
elif "/" in article:
article = "missing"
return self.path_template % (article,)
def get_context_data(self, **kwargs):
# type: (**Any) -> Dict[str, Any]
def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
article = kwargs["article"]
context = super().get_context_data() # type: Dict[str, Any]
path = self.get_path(article)
@ -81,8 +77,7 @@ class MarkdownDirectoryView(ApiURLView):
context["api_uri_context"] = api_uri_context
return context
def get(self, request, article=""):
# type: (HttpRequest, str) -> HttpResponse
def get(self, request: HttpRequest, article: str="") -> HttpResponse:
path = self.get_path(article)
result = super().get(self, article=article)
try:
@ -95,8 +90,7 @@ class MarkdownDirectoryView(ApiURLView):
return result
def add_integrations_context(context):
# type: (Dict[str, Any]) -> None
def add_integrations_context(context: Dict[str, Any]) -> None:
alphabetical_sorted_categories = OrderedDict(sorted(CATEGORIES.items()))
alphabetical_sorted_integration = OrderedDict(sorted(INTEGRATIONS.items()))
context['categories_dict'] = alphabetical_sorted_categories
@ -113,8 +107,7 @@ def add_integrations_context(context):
context['subscriptions_html'] = subscriptions_html
def add_context_for_single_integration(context, name, request):
# type: (Dict[str, Any], str, HttpRequest) -> None
def add_context_for_single_integration(context: Dict[str, Any], name: str, request: HttpRequest) -> None:
add_api_uri_context(context, request)
if "html_settings_links" in context and context["html_settings_links"]:
@ -131,16 +124,14 @@ def add_context_for_single_integration(context, name, request):
class IntegrationView(ApiURLView):
template_name = 'zerver/integrations/index.html'
def get_context_data(self, **kwargs):
# type: (**Any) -> Dict[str, Any]
def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
context = super().get_context_data(**kwargs) # type: Dict[str, Any]
add_integrations_context(context)
return context
@has_request_variables
def integration_doc(request, integration_name=REQ(default=None)):
# type: (HttpRequest, str) -> HttpResponse
def integration_doc(request: HttpRequest, integration_name: str=REQ(default=None)) -> HttpResponse:
try:
integration = INTEGRATIONS[integration_name]
except KeyError:
@ -162,8 +153,7 @@ def integration_doc(request, integration_name=REQ(default=None)):
return HttpResponse(doc_html_str)
def api_endpoint_docs(request):
# type: (HttpRequest) -> HttpResponse
def api_endpoint_docs(request: HttpRequest) -> HttpResponse:
context = {} # type: Dict[str, Any]
add_api_uri_context(context, request)

View File

@ -55,8 +55,7 @@ def invite_users_backend(request, user_profile,
do_invite_users(user_profile, invitee_emails, streams, invite_as_admin, body)
return json_success()
def get_invitee_emails_set(invitee_emails_raw):
# type: (str) -> Set[str]
def get_invitee_emails_set(invitee_emails_raw: str) -> Set[str]:
invitee_emails_list = set(re.split(r'[,\n]', invitee_emails_raw))
invitee_emails = set()
for email in invitee_emails_list:
@ -67,21 +66,20 @@ def get_invitee_emails_set(invitee_emails_raw):
return invitee_emails
@require_realm_admin
def get_user_invites(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
def get_user_invites(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
all_users = do_get_user_invites(user_profile)
return json_success({'invites': all_users})
@require_realm_admin
@has_request_variables
def revoke_user_invite(request, user_profile, prereg_id):
# type: (HttpRequest, UserProfile, int) -> HttpResponse
def revoke_user_invite(request: HttpRequest, user_profile: UserProfile,
prereg_id: int) -> HttpResponse:
do_revoke_user_invite(prereg_id, user_profile.realm_id)
return json_success()
@require_realm_admin
@has_request_variables
def resend_user_invite_email(request, user_profile, prereg_id):
# type: (HttpRequest, UserProfile, int) -> HttpResponse
def resend_user_invite_email(request: HttpRequest, user_profile: UserProfile,
prereg_id: int) -> HttpResponse:
timestamp = do_resend_user_invite_email(prereg_id, user_profile.realm_id)
return json_success({'timestamp': timestamp})

View File

@ -52,13 +52,11 @@ class BadNarrowOperator(JsonableError):
code = ErrorCode.BAD_NARROW
data_fields = ['desc']
def __init__(self, desc):
# type: (str) -> None
def __init__(self, desc: str) -> None:
self.desc = desc # type: str
@staticmethod
def msg_format():
# type: () -> str
def msg_format() -> str:
return _('Invalid narrow operator: {desc}')
# TODO: Should be Select, but sqlalchemy stubs are busted
@ -90,14 +88,12 @@ class NarrowBuilder:
# * anything that would pull in additional rows, or information on
# other messages.
def __init__(self, user_profile, msg_id_column):
# type: (UserProfile, str) -> None
def __init__(self, user_profile: UserProfile, msg_id_column: str) -> None:
self.user_profile = user_profile
self.msg_id_column = msg_id_column
self.user_realm = user_profile.realm
def add_term(self, query, term):
# type: (Query, Dict[str, Any]) -> Query
def add_term(self, query: Query, term: Dict[str, Any]) -> Query:
"""
Extend the given query to one narrowed by the given term, and return the result.
@ -130,16 +126,14 @@ class NarrowBuilder:
return method(query, operand, maybe_negate)
def by_has(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def by_has(self, query: Query, operand: str, maybe_negate: ConditionTransform) -> Query:
if operand not in ['attachment', 'image', 'link']:
raise BadNarrowOperator("unknown 'has' operand " + operand)
col_name = 'has_' + operand
cond = column(col_name)
return query.where(maybe_negate(cond))
def by_in(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def by_in(self, query: Query, operand: str, maybe_negate: ConditionTransform) -> Query:
if operand == 'home':
conditions = exclude_muting_conditions(self.user_profile, [])
return query.where(and_(*conditions))
@ -148,8 +142,7 @@ class NarrowBuilder:
raise BadNarrowOperator("unknown 'in' operand " + operand)
def by_is(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def by_is(self, query: Query, operand: str, maybe_negate: ConditionTransform) -> Query:
if operand == 'private':
# The `.select_from` method extends the query with a join.
query = query.select_from(join(query.froms[0], table("zerver_recipient"),
@ -177,8 +170,7 @@ class NarrowBuilder:
_alphanum = frozenset(
'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
def _pg_re_escape(self, pattern):
# type: (Text) -> Text
def _pg_re_escape(self, pattern: Text) -> Text:
"""
Escape user input to place in a regex
@ -197,8 +189,7 @@ class NarrowBuilder:
s[i] = '\\' + c
return ''.join(s)
def by_stream(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def by_stream(self, query: Query, operand: str, maybe_negate: ConditionTransform) -> Query:
try:
# Because you can see your own message history for
# private streams you are no longer subscribed to, we
@ -235,8 +226,7 @@ class NarrowBuilder:
cond = column("recipient_id") == recipient.id
return query.where(maybe_negate(cond))
def by_topic(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def by_topic(self, query: Query, operand: str, maybe_negate: ConditionTransform) -> Query:
if self.user_profile.realm.is_zephyr_mirror_realm:
# MIT users expect narrowing to topic "foo" to also show messages to /^foo(.d)*$/
# (foo, foo.d, foo.d.d, etc)
@ -281,8 +271,7 @@ class NarrowBuilder:
cond = func.upper(column("subject")) == func.upper(literal(operand))
return query.where(maybe_negate(cond))
def by_sender(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def by_sender(self, query: Query, operand: str, maybe_negate: ConditionTransform) -> Query:
try:
sender = get_user_including_cross_realm(operand, self.user_realm)
except UserProfile.DoesNotExist:
@ -291,17 +280,14 @@ class NarrowBuilder:
cond = column("sender_id") == literal(sender.id)
return query.where(maybe_negate(cond))
def by_near(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def by_near(self, query: Query, operand: str, maybe_negate: ConditionTransform) -> Query:
return query
def by_id(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def by_id(self, query: Query, operand: str, maybe_negate: ConditionTransform) -> Query:
cond = self.msg_id_column == literal(operand)
return query.where(maybe_negate(cond))
def by_pm_with(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def by_pm_with(self, query: Query, operand: str, maybe_negate: ConditionTransform) -> Query:
if ',' in operand:
# Huddle
try:
@ -335,8 +321,8 @@ class NarrowBuilder:
column("recipient_id") == narrow_recipient.id))
return query.where(maybe_negate(cond))
def by_group_pm_with(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def by_group_pm_with(self, query: Query, operand: str,
maybe_negate: ConditionTransform) -> Query:
try:
narrow_profile = get_user_including_cross_realm(operand, self.user_realm)
except UserProfile.DoesNotExist:
@ -359,15 +345,14 @@ class NarrowBuilder:
cond = column("recipient_id").in_(recipient_ids)
return query.where(maybe_negate(cond))
def by_search(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def by_search(self, query: Query, operand: str, maybe_negate: ConditionTransform) -> Query:
if settings.USING_PGROONGA:
return self._by_search_pgroonga(query, operand, maybe_negate)
else:
return self._by_search_tsearch(query, operand, maybe_negate)
def _by_search_pgroonga(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def _by_search_pgroonga(self, query: Query, operand: str,
maybe_negate: ConditionTransform) -> Query:
match_positions_character = func.pgroonga.match_positions_character
query_extract_keywords = func.pgroonga.query_extract_keywords
keywords = query_extract_keywords(operand)
@ -378,8 +363,8 @@ class NarrowBuilder:
condition = column("search_pgroonga").op("@@")(operand)
return query.where(maybe_negate(condition))
def _by_search_tsearch(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
def _by_search_tsearch(self, query: Query, operand: str,
maybe_negate: ConditionTransform) -> Query:
tsquery = func.plainto_tsquery(literal("zulip.english_us_search"), literal(operand))
ts_locs_array = func.ts_match_locs_array
query = query.column(ts_locs_array(literal("zulip.english_us_search"),
@ -408,8 +393,7 @@ class NarrowBuilder:
# The offsets we get from PGroonga are counted in characters
# whereas the offsets from tsearch_extras are in bytes, so we
# have to account for both cases in the logic below.
def highlight_string(text, locs):
# type: (Text, Iterable[Tuple[int, int]]) -> Text
def highlight_string(text: Text, locs: Iterable[Tuple[int, int]]) -> Text:
highlight_start = u'<span class="highlight">'
highlight_stop = u'</span>'
pos = 0
@ -458,13 +442,12 @@ def highlight_string(text, locs):
result += final_frag
return result
def get_search_fields(rendered_content, subject, content_matches, subject_matches):
# type: (Text, Text, Iterable[Tuple[int, int]], Iterable[Tuple[int, int]]) -> Dict[str, Text]
def get_search_fields(rendered_content: Text, subject: Text, content_matches: Iterable[Tuple[int, int]],
subject_matches: Iterable[Tuple[int, int]]) -> Dict[str, Text]:
return dict(match_content=highlight_string(rendered_content, content_matches),
match_subject=highlight_string(escape_html(subject), subject_matches))
def narrow_parameter(json):
# type: (str) -> Optional[List[Dict[str, Any]]]
def narrow_parameter(json: str) -> Optional[List[Dict[str, Any]]]:
data = ujson.loads(json)
if not isinstance(data, list):
@ -473,8 +456,7 @@ def narrow_parameter(json):
# The "empty narrow" should be None, and not []
return None
def convert_term(elem):
# type: (Union[Dict[str, Any], List[str]]) -> Dict[str, Any]
def convert_term(elem: Union[Dict[str, Any], List[str]]) -> Dict[str, Any]:
# We have to support a legacy tuple format.
if isinstance(elem, list):
@ -505,8 +487,7 @@ def narrow_parameter(json):
return list(map(convert_term, data))
def ok_to_include_history(narrow, realm):
# type: (Optional[Iterable[Dict[str, Any]]], Realm) -> bool
def ok_to_include_history(narrow: Optional[Iterable[Dict[str, Any]]], realm: Realm) -> bool:
# There are occasions where we need to find Message rows that
# have no corresponding UserMessage row, because the user is
@ -533,16 +514,15 @@ def ok_to_include_history(narrow, realm):
return include_history
def get_stream_name_from_narrow(narrow):
# type: (Optional[Iterable[Dict[str, Any]]]) -> Optional[Text]
def get_stream_name_from_narrow(narrow: Optional[Iterable[Dict[str, Any]]]) -> Optional[Text]:
if narrow is not None:
for term in narrow:
if term['operator'] == 'stream':
return term['operand'].lower()
return None
def exclude_muting_conditions(user_profile, narrow):
# type: (UserProfile, Optional[Iterable[Dict[str, Any]]]) -> List[Selectable]
def exclude_muting_conditions(user_profile: UserProfile,
narrow: Optional[Iterable[Dict[str, Any]]]) -> List[Selectable]:
conditions = []
stream_name = get_stream_name_from_narrow(narrow)
@ -791,8 +771,7 @@ def update_message_flags(request, user_profile,
'msg': ''})
@has_request_variables
def mark_all_as_read(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
def mark_all_as_read(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
count = do_mark_all_as_read(user_profile)
log_data_str = "[%s updated]" % (count,)
@ -838,8 +817,8 @@ def mark_topic_as_read(request,
return json_success({'result': 'success',
'msg': ''})
def create_mirrored_message_users(request, user_profile, recipients):
# type: (HttpRequest, UserProfile, Iterable[Text]) -> Tuple[bool, Optional[UserProfile]]
def create_mirrored_message_users(request: HttpRequest, user_profile: UserProfile,
recipients: Iterable[Text]) -> Tuple[bool, Optional[UserProfile]]:
if "sender" not in request.POST:
return (False, None)
@ -874,8 +853,7 @@ def create_mirrored_message_users(request, user_profile, recipients):
sender = get_user_including_cross_realm(sender_email, user_profile.realm)
return (True, sender)
def same_realm_zephyr_user(user_profile, email):
# type: (UserProfile, Text) -> bool
def same_realm_zephyr_user(user_profile: UserProfile, email: Text) -> bool:
#
# Are the sender and recipient both addresses in the same Zephyr
# mirroring realm? We have to handle this specially, inferring
@ -894,8 +872,7 @@ def same_realm_zephyr_user(user_profile, email):
return user_profile.realm.is_zephyr_mirror_realm and \
RealmDomain.objects.filter(realm=user_profile.realm, domain=domain).exists()
def same_realm_irc_user(user_profile, email):
# type: (UserProfile, Text) -> bool
def same_realm_irc_user(user_profile: UserProfile, email: Text) -> bool:
# Check whether the target email address is an IRC user in the
# same realm as user_profile, i.e. if the domain were example.com,
# the IRC user would need to be username@irc.example.com
@ -910,8 +887,7 @@ def same_realm_irc_user(user_profile, email):
# these realms.
return RealmDomain.objects.filter(realm=user_profile.realm, domain=domain).exists()
def same_realm_jabber_user(user_profile, email):
# type: (UserProfile, Text) -> bool
def same_realm_jabber_user(user_profile: UserProfile, email: Text) -> bool:
try:
validators.validate_email(email)
except ValidationError:
@ -996,8 +972,7 @@ def send_message_backend(request, user_profile,
local_id=local_id, sender_queue_id=queue_id)
return json_success({"id": ret})
def fill_edit_history_entries(message_history, message):
# type: (List[Dict[str, Any]], Message) -> None
def fill_edit_history_entries(message_history: List[Dict[str, Any]], message: Message) -> None:
"""This fills out the message edit history entries from the database,
which are designed to have the minimum data possible, to instead
have the current topic + content as of that time, plus data on
@ -1141,8 +1116,8 @@ def update_message_backend(request, user_profile,
@has_request_variables
def delete_message_backend(request, user_profile, message_id=REQ(converter=to_non_negative_int)):
# type: (HttpRequest, UserProfile, int) -> HttpResponse
def delete_message_backend(request: HttpRequest, user_profile: UserProfile,
message_id: int=REQ(converter=to_non_negative_int)) -> HttpResponse:
message, ignored_user_message = access_message(user_profile, message_id)
is_user_allowed_to_delete_message = user_profile.is_realm_admin or \
(message.sender == user_profile and user_profile.realm.allow_message_deleting)
@ -1159,8 +1134,8 @@ def json_fetch_raw_message(request, user_profile,
return json_success({"raw_content": message.content})
@has_request_variables
def render_message_backend(request, user_profile, content=REQ()):
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
def render_message_backend(request: HttpRequest, user_profile: UserProfile,
content: Text=REQ()) -> HttpResponse:
message = Message()
message.sender = user_profile
message.content = content

View File

@ -13,8 +13,7 @@ from zerver.lib.request import JsonableError
from zerver.lib.response import json_success
from zerver.models import Message, Reaction, UserMessage, UserProfile
def create_historical_message(user_profile, message):
# type: (UserProfile, Message) -> None
def create_historical_message(user_profile: UserProfile, message: Message) -> None:
# Users can see and react to messages sent to streams they
# were not a subscriber to; in order to receive events for
# those, we give the user a `historical` UserMessage objects
@ -98,8 +97,8 @@ def remove_reaction(request: HttpRequest, user_profile: UserProfile, message_id:
return json_success()
@has_request_variables
def add_reaction_legacy(request, user_profile, message_id, emoji_name):
# type: (HttpRequest, UserProfile, int, Text) -> HttpResponse
def add_reaction_legacy(request: HttpRequest, user_profile: UserProfile,
message_id: int, emoji_name: Text) -> HttpResponse:
# access_message will throw a JsonableError exception if the user
# cannot see the message (e.g. for messages to private streams).
@ -122,8 +121,8 @@ def add_reaction_legacy(request, user_profile, message_id, emoji_name):
return json_success()
@has_request_variables
def remove_reaction_legacy(request, user_profile, message_id, emoji_name):
# type: (HttpRequest, UserProfile, int, Text) -> HttpResponse
def remove_reaction_legacy(request: HttpRequest, user_profile: UserProfile,
message_id: int, emoji_name: Text) -> HttpResponse:
# access_message will throw a JsonableError exception if the user
# cannot see the message (e.g. for messages to private streams).

View File

@ -48,8 +48,7 @@ import ujson
import urllib
@require_post
def accounts_register(request):
# type: (HttpRequest) -> HttpResponse
def accounts_register(request: HttpRequest) -> HttpResponse:
key = request.POST['key']
confirmation = Confirmation.objects.get(confirmation_key=key)
prereg_user = confirmation.content_object
@ -266,16 +265,16 @@ def accounts_register(request):
}
)
def login_and_go_to_home(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
def login_and_go_to_home(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
# Mark the user as having been just created, so no "new login" email is sent
user_profile.just_registered = True
do_login(request, user_profile)
return HttpResponseRedirect(user_profile.realm.uri + reverse('zerver.views.home.home'))
def send_registration_completion_email(email, request, realm_creation=False, streams=None):
# type: (str, HttpRequest, bool, Optional[List[Stream]]) -> None
def send_registration_completion_email(email: str, request: HttpRequest,
realm_creation: bool=False,
streams: Optional[List[Stream]]=None) -> None:
"""
Send an email with a confirmation link to the provided e-mail so the user
can complete their registration.
@ -293,15 +292,13 @@ def send_registration_completion_email(email, request, realm_creation=False, str
if settings.DEVELOPMENT and realm_creation:
request.session['confirmation_key'] = {'confirmation_key': activation_url.split('/')[-1]}
def redirect_to_email_login_url(email):
# type: (str) -> HttpResponseRedirect
def redirect_to_email_login_url(email: str) -> HttpResponseRedirect:
login_url = reverse('django.contrib.auth.views.login')
email = urllib.parse.quote_plus(email)
redirect_url = login_url + '?already_registered=' + email
return HttpResponseRedirect(redirect_url)
def create_realm(request, creation_key=None):
# type: (HttpRequest, Optional[Text]) -> HttpResponse
def create_realm(request: HttpRequest, creation_key: Optional[Text]=None) -> HttpResponse:
if not settings.OPEN_REALM_CREATION:
if creation_key is None:
return render(request, "zerver/realm_creation_failed.html",
@ -340,12 +337,10 @@ def create_realm(request, creation_key=None):
)
# This is used only by the casper test in 00-realm-creation.js.
def confirmation_key(request):
# type: (HttpRequest) -> HttpResponse
def confirmation_key(request: HttpRequest) -> HttpResponse:
return json_success(request.session.get('confirmation_key'))
def accounts_home(request, multiuse_object=None):
# type: (HttpRequest, Optional[MultiuseInvite]) -> HttpResponse
def accounts_home(request: HttpRequest, multiuse_object: Optional[MultiuseInvite]=None) -> HttpResponse:
realm = get_realm(get_subdomain(request))
if realm is None:
@ -386,8 +381,7 @@ def accounts_home(request, multiuse_object=None):
'from_multiuse_invite': from_multiuse_invite},
)
def accounts_home_from_multiuse_invite(request, confirmation_key):
# type: (HttpRequest, str) -> HttpResponse
def accounts_home_from_multiuse_invite(request: HttpRequest, confirmation_key: str) -> HttpResponse:
multiuse_object = None
try:
multiuse_object = get_object_from_key(confirmation_key, Confirmation.MULTIUSE_INVITE)
@ -399,12 +393,10 @@ def accounts_home_from_multiuse_invite(request, confirmation_key):
return render_confirmation_key_error(request, exception)
return accounts_home(request, multiuse_object=multiuse_object)
def generate_204(request):
# type: (HttpRequest) -> HttpResponse
def generate_204(request: HttpRequest) -> HttpResponse:
return HttpResponse(content=None, status=204)
def find_account(request):
# type: (HttpRequest) -> HttpResponse
def find_account(request: HttpRequest) -> HttpResponse:
url = reverse('zerver.views.registration.find_account')
emails = [] # type: List[Text]

View File

@ -13,14 +13,12 @@ from zerver.lib.validator import check_bool
from zerver.models import UserProfile, validate_attachment_request
from django.conf import settings
def serve_s3(request, url_path):
# type: (HttpRequest, str) -> HttpResponse
def serve_s3(request: HttpRequest, url_path: str) -> HttpResponse:
uri = get_signed_upload_url(url_path)
return redirect(uri)
# TODO: Rewrite this once we have django-sendfile
def serve_local(request, path_id):
# type: (HttpRequest, str) -> FileResponse
def serve_local(request: HttpRequest, path_id: str) -> FileResponse:
import os
import mimetypes
local_path = get_local_file_path(path_id)
@ -32,8 +30,8 @@ def serve_local(request, path_id):
return response
@has_request_variables
def serve_file_backend(request, user_profile, realm_id_str, filename):
# type: (HttpRequest, UserProfile, str, str) -> HttpResponse
def serve_file_backend(request: HttpRequest, user_profile: UserProfile,
realm_id_str: str, filename: str) -> HttpResponse:
path_id = "%s/%s" % (realm_id_str, filename)
is_authorized = validate_attachment_request(user_profile, path_id)
@ -46,8 +44,7 @@ def serve_file_backend(request, user_profile, realm_id_str, filename):
return serve_s3(request, path_id)
def upload_file_backend(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
def upload_file_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
if len(request.FILES) == 0:
return json_error(_("You must specify a file to upload"))
if len(request.FILES) != 1:

View File

@ -49,8 +49,8 @@ def edit_user_group(request, user_profile,
return json_success(result)
@has_request_variables
def delete_user_group(request, user_profile, user_group_id=REQ(validator=check_int)):
# type: (HttpRequest, UserProfile, int) -> HttpResponse
def delete_user_group(request: HttpRequest, user_profile: UserProfile,
user_group_id: int=REQ(validator=check_int)) -> HttpResponse:
check_delete_user_group(user_group_id, user_profile.realm)
return json_success()
@ -71,8 +71,8 @@ def update_user_group_backend(request, user_profile,
] # type: List[FuncKwargPair]
return compose_views(request, user_profile, method_kwarg_pairs)
def add_members_to_group_backend(request, user_profile, user_group_id, members):
# type: (HttpRequest, UserProfile, int, List[int]) -> HttpResponse
def add_members_to_group_backend(request: HttpRequest, user_profile: UserProfile,
user_group_id: int, members: List[int]) -> HttpResponse:
if not members:
return json_success()
@ -87,8 +87,8 @@ def add_members_to_group_backend(request, user_profile, user_group_id, members):
bulk_add_members_to_user_group(user_group, user_profiles)
return json_success()
def remove_members_from_group_backend(request, user_profile, user_group_id, members):
# type: (HttpRequest, UserProfile, int, List[int]) -> HttpResponse
def remove_members_from_group_backend(request: HttpRequest, user_profile: UserProfile,
user_group_id: int, members: List[int]) -> HttpResponse:
if not members:
return json_success()

View File

@ -28,8 +28,7 @@ from zerver.models import UserProfile, Realm, name_changes_disabled, \
from confirmation.models import get_object_from_key, render_confirmation_key_error, \
ConfirmationKeyException, Confirmation
def confirm_email_change(request, confirmation_key):
# type: (HttpRequest, str) -> HttpResponse
def confirm_email_change(request: HttpRequest, confirmation_key: str) -> HttpResponse:
try:
email_change_object = get_object_from_key(confirmation_key, Confirmation.EMAIL_CHANGE)
except ConfirmationKeyException as exception:
@ -207,8 +206,7 @@ def json_change_notify_settings(request, user_profile,
return json_success(result)
def set_avatar_backend(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
def set_avatar_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
if len(request.FILES) != 1:
return json_error(_("You must upload exactly one avatar."))
@ -225,8 +223,7 @@ def set_avatar_backend(request, user_profile):
)
return json_success(json_result)
def delete_avatar_backend(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
def delete_avatar_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
do_change_avatar_fields(user_profile, UserProfile.AVATAR_FROM_GRAVATAR)
gravatar_url = avatar_url(user_profile)
@ -238,8 +235,7 @@ def delete_avatar_backend(request, user_profile):
# We don't use @human_users_only here, because there are use cases for
# a bot regenerating its own API key.
@has_request_variables
def regenerate_api_key(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
def regenerate_api_key(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
do_regenerate_api_key(user_profile, user_profile)
json_result = dict(
api_key = user_profile.api_key

View File

@ -32,8 +32,8 @@ from zerver.models import UserProfile, Stream, Message, email_allowed_for_realm,
from zerver.lib.create_user import random_api_key
def deactivate_user_backend(request, user_profile, email):
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
def deactivate_user_backend(request: HttpRequest, user_profile: UserProfile,
email: Text) -> HttpResponse:
try:
target = get_user(email, user_profile.realm)
except UserProfile.DoesNotExist:
@ -44,21 +44,19 @@ def deactivate_user_backend(request, user_profile, email):
return json_error(_('Cannot deactivate the only organization administrator'))
return _deactivate_user_profile_backend(request, user_profile, target)
def deactivate_user_own_backend(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
def deactivate_user_own_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
if user_profile.is_realm_admin and check_last_admin(user_profile):
return json_error(_('Cannot deactivate the only organization administrator'))
do_deactivate_user(user_profile, acting_user=user_profile)
return json_success()
def check_last_admin(user_profile):
# type: (UserProfile) -> bool
def check_last_admin(user_profile: UserProfile) -> bool:
admins = set(user_profile.realm.get_admin_users())
return user_profile.is_realm_admin and len(admins) == 1
def deactivate_bot_backend(request, user_profile, email):
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
def deactivate_bot_backend(request: HttpRequest, user_profile: UserProfile,
email: Text) -> HttpResponse:
try:
target = get_user(email, user_profile.realm)
except UserProfile.DoesNotExist:
@ -67,16 +65,16 @@ def deactivate_bot_backend(request, user_profile, email):
return json_error(_('No such bot'))
return _deactivate_user_profile_backend(request, user_profile, target)
def _deactivate_user_profile_backend(request, user_profile, target):
# type: (HttpRequest, UserProfile, UserProfile) -> HttpResponse
def _deactivate_user_profile_backend(request: HttpRequest, user_profile: UserProfile,
target: UserProfile) -> HttpResponse:
if not user_profile.can_admin_user(target):
return json_error(_('Insufficient permission'))
do_deactivate_user(target, acting_user=user_profile)
return json_success()
def reactivate_user_backend(request, user_profile, email):
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
def reactivate_user_backend(request: HttpRequest, user_profile: UserProfile,
email: Text) -> HttpResponse:
try:
target = get_user(email, user_profile.realm)
except UserProfile.DoesNotExist:
@ -118,8 +116,7 @@ def update_user_backend(request, user_profile, email,
# different organizations, we'll eventually want this to be a
# logged-in endpoint so that we can access the realm_id.
@zulip_login_required
def avatar(request, email_or_id, medium=False):
# type: (HttpRequest, str, bool) -> HttpResponse
def avatar(request: HttpRequest, email_or_id: str, medium: bool=False) -> HttpResponse:
"""Accepts an email address or user ID and returns the avatar"""
is_email = False
try:
@ -149,8 +146,7 @@ def avatar(request, email_or_id, medium=False):
url += '&' + request.META['QUERY_STRING']
return redirect(url)
def get_stream_name(stream):
# type: (Optional[Stream]) -> Optional[Text]
def get_stream_name(stream: Optional[Stream]) -> Optional[Text]:
if stream:
return stream.name
return None
@ -219,8 +215,7 @@ def patch_bot_backend(request, user_profile, email,
return json_success(json_result)
@has_request_variables
def regenerate_bot_api_key(request, user_profile, email):
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
def regenerate_bot_api_key(request: HttpRequest, user_profile: UserProfile, email: Text) -> HttpResponse:
try:
bot = get_user(email, user_profile.realm)
except UserProfile.DoesNotExist:
@ -236,8 +231,8 @@ def regenerate_bot_api_key(request, user_profile, email):
return json_success(json_result)
# Adds an outgoing webhook or embedded bot service.
def add_service(name, user_profile, base_url=None, interface=None, token=None):
# type: (Text, UserProfile, Text, int, Text) -> None
def add_service(name: Text, user_profile: UserProfile, base_url: Text=None,
interface: int=None, token: Text=None) -> None:
Service.objects.create(name=name,
user_profile=user_profile,
base_url=base_url,
@ -324,15 +319,13 @@ def add_bot_backend(request, user_profile, full_name_raw=REQ("full_name"), short
)
return json_success(json_result)
def get_bots_backend(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
def get_bots_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
bot_profiles = UserProfile.objects.filter(is_bot=True, is_active=True,
bot_owner=user_profile)
bot_profiles = bot_profiles.select_related('default_sending_stream', 'default_events_register_stream')
bot_profiles = bot_profiles.order_by('date_joined')
def bot_info(bot_profile):
# type: (UserProfile) -> Dict[str, Any]
def bot_info(bot_profile: UserProfile) -> Dict[str, Any]:
default_sending_stream = get_stream_name(bot_profile.default_sending_stream)
default_events_register_stream = get_stream_name(bot_profile.default_events_register_stream)
@ -377,8 +370,7 @@ def get_members_backend(request, user_profile,
'bot_owner__email',
)
def get_member(row):
# type: (Dict[str, Any]) -> Dict[str, Any]
def get_member(row: Dict[str, Any]) -> Dict[str, Any]:
email = row['email']
user_id = row['id']
@ -439,12 +431,10 @@ def create_user_backend(request, user_profile, email=REQ(), password=REQ(),
do_create_user(email, password, realm, full_name, short_name)
return json_success()
def generate_client_id():
# type: () -> str
def generate_client_id() -> str:
return generate_random_token(32)
def get_profile_backend(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
def get_profile_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
result = dict(pointer = user_profile.pointer,
client_id = generate_client_id(),
max_message_id = -1,
@ -461,8 +451,7 @@ def get_profile_backend(request, user_profile):
return json_success(result)
def team_view(request):
# type: (HttpRequest) -> HttpResponse
def team_view(request: HttpRequest) -> HttpResponse:
with open(settings.CONTRIBUTORS_DATA) as f:
data = ujson.load(f)