python: Convert assignment type annotations to Python 3.6 style.

This commit was split by tabbott; this piece covers the vast majority
of files in Zulip, but excludes scripts/, tools/, and puppet/ to help
ensure we at least show the right error messages for Xenial systems.

We can likely further refine the remaining pieces with some testing.

Generated by com2ann, with whitespace fixes and various manual fixes
for runtime issues:

-    invoiced_through: Optional[LicenseLedger] = models.ForeignKey(
+    invoiced_through: Optional["LicenseLedger"] = models.ForeignKey(

-_apns_client: Optional[APNsClient] = None
+_apns_client: Optional["APNsClient"] = None

-    notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
-    signup_notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+    notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+    signup_notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)

-    author: Optional[UserProfile] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
+    author: Optional["UserProfile"] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)

-    bot_owner: Optional[UserProfile] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
+    bot_owner: Optional["UserProfile"] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)

-    default_sending_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
-    default_events_register_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+    default_sending_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+    default_events_register_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)

-descriptors_by_handler_id: Dict[int, ClientDescriptor] = {}
+descriptors_by_handler_id: Dict[int, "ClientDescriptor"] = {}

-worker_classes: Dict[str, Type[QueueProcessingWorker]] = {}
-queues: Dict[str, Dict[str, Type[QueueProcessingWorker]]] = {}
+worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {}
+queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {}

-AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional[LDAPSearch] = None
+AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional["LDAPSearch"] = None

Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
This commit is contained in:
Anders Kaseorg 2020-04-21 16:09:50 -07:00 committed by Tim Abbott
parent af1aef9b39
commit fead14951c
189 changed files with 1469 additions and 1398 deletions

View File

@ -303,7 +303,7 @@ def do_pull_minutes_active(property: str, start_time: datetime, end_time: dateti
).values_list(
'user_profile_id', 'user_profile__realm_id', 'start', 'end')
seconds_active = defaultdict(float) # type: Dict[Tuple[int, int], float]
seconds_active: Dict[Tuple[int, int], float] = defaultdict(float)
for user_id, realm_id, interval_start, interval_end in user_activity_intervals:
if realm is None or realm.id == realm_id:
start = max(start_time, interval_start)

View File

@ -27,15 +27,15 @@ def compute_stats(log_level: int) -> None:
"bitcoin@mit.edu", "lp@mit.edu", "clocks@mit.edu",
"root@mit.edu", "nagios@mit.edu",
"www-data|local-realm@mit.edu"])
user_counts = {} # type: Dict[str, Dict[str, int]]
user_counts: Dict[str, Dict[str, int]] = {}
for m in mit_query.select_related("sending_client", "sender"):
email = m.sender.email
user_counts.setdefault(email, {})
user_counts[email].setdefault(m.sending_client.name, 0)
user_counts[email][m.sending_client.name] += 1
total_counts = {} # type: Dict[str, int]
total_user_counts = {} # type: Dict[str, int]
total_counts: Dict[str, int] = {}
total_user_counts: Dict[str, int] = {}
for email, counts in user_counts.items():
total_user_counts.setdefault(email, 0)
for client_name, count in counts.items():
@ -44,7 +44,7 @@ def compute_stats(log_level: int) -> None:
total_user_counts[email] += count
logging.debug("%40s | %10s | %s" % ("User", "Messages", "Percentage Zulip"))
top_percents = {} # type: Dict[int, float]
top_percents: Dict[int, float] = {}
for size in [10, 25, 50, 100, 200, len(total_user_counts.keys())]:
top_percents[size] = 0.0
for i, email in enumerate(sorted(total_user_counts.keys(),

View File

@ -81,7 +81,7 @@ class Command(BaseCommand):
end_times = time_range(last_end_time, last_end_time, stat.frequency,
len(list(fixture_data.values())[0]))
if table == InstallationCount:
id_args = {} # type: Dict[str, Any]
id_args: Dict[str, Any] = {}
if table == RealmCount:
id_args = {'realm': realm}
if table == UserCount:
@ -96,13 +96,13 @@ class Command(BaseCommand):
for end_time, value in zip(end_times, values) if value != 0])
stat = COUNT_STATS['1day_actives::day']
realm_data = {
realm_data: Mapping[Optional[str], List[int]] = {
None: self.generate_fixture_data(stat, .08, .02, 3, .3, 6, partial_sum=True),
} # type: Mapping[Optional[str], List[int]]
}
insert_fixture_data(stat, realm_data, RealmCount)
installation_data = {
installation_data: Mapping[Optional[str], List[int]] = {
None: self.generate_fixture_data(stat, .8, .2, 4, .3, 6, partial_sum=True),
} # type: Mapping[Optional[str], List[int]]
}
insert_fixture_data(stat, installation_data, InstallationCount)
FillState.objects.create(property=stat.property, end_time=last_end_time,
state=FillState.DONE)
@ -132,8 +132,9 @@ class Command(BaseCommand):
state=FillState.DONE)
stat = COUNT_STATS['messages_sent:is_bot:hour']
user_data = {'false': self.generate_fixture_data(
stat, 2, 1, 1.5, .6, 8, holiday_rate=.1)} # type: Mapping[Optional[str], List[int]]
user_data: Mapping[Optional[str], List[int]] = {
'false': self.generate_fixture_data(stat, 2, 1, 1.5, .6, 8, holiday_rate=.1),
}
insert_fixture_data(stat, user_data, UserCount)
realm_data = {'false': self.generate_fixture_data(stat, 35, 15, 6, .6, 4),
'true': self.generate_fixture_data(stat, 15, 15, 3, .4, 2)}
@ -209,8 +210,10 @@ class Command(BaseCommand):
realm_data = {'false': self.generate_fixture_data(stat, 30, 5, 6, .6, 4),
'true': self.generate_fixture_data(stat, 20, 2, 3, .2, 3)}
insert_fixture_data(stat, realm_data, RealmCount)
stream_data = {'false': self.generate_fixture_data(stat, 10, 7, 5, .6, 4),
'true': self.generate_fixture_data(stat, 5, 3, 2, .4, 2)} # type: Mapping[Optional[str], List[int]]
stream_data: Mapping[Optional[str], List[int]] = {
'false': self.generate_fixture_data(stat, 10, 7, 5, .6, 4),
'true': self.generate_fixture_data(stat, 5, 3, 2, .4, 2),
}
insert_fixture_data(stat, stream_data, StreamCount)
FillState.objects.create(property=stat.property, end_time=last_end_time,
state=FillState.DONE)

View File

@ -8,13 +8,13 @@ from zerver.lib.timestamp import floor_to_day
from zerver.models import Realm, Stream, UserProfile
class FillState(models.Model):
property = models.CharField(max_length=40, unique=True) # type: str
end_time = models.DateTimeField() # type: datetime.datetime
property: str = models.CharField(max_length=40, unique=True)
end_time: datetime.datetime = models.DateTimeField()
# Valid states are {DONE, STARTED}
DONE = 1
STARTED = 2
state = models.PositiveSmallIntegerField() # type: int
state: int = models.PositiveSmallIntegerField()
def __str__(self) -> str:
return "<FillState: %s %s %s>" % (self.property, self.end_time, self.state)
@ -37,10 +37,10 @@ class BaseCount(models.Model):
# Note: When inheriting from BaseCount, you may want to rearrange
# the order of the columns in the migration to make sure they
# match how you'd like the table to be arranged.
property = models.CharField(max_length=32) # type: str
subgroup = models.CharField(max_length=16, null=True) # type: Optional[str]
end_time = models.DateTimeField() # type: datetime.datetime
value = models.BigIntegerField() # type: int
property: str = models.CharField(max_length=32)
subgroup: Optional[str] = models.CharField(max_length=16, null=True)
end_time: datetime.datetime = models.DateTimeField()
value: int = models.BigIntegerField()
class Meta:
abstract = True

View File

@ -43,7 +43,7 @@ class AnalyticsTestCase(TestCase):
# used to generate unique names in self.create_*
self.name_counter = 100
# used as defaults in self.assertCountEquals
self.current_property = None # type: Optional[str]
self.current_property: Optional[str] = None
# Lightweight creation of users, streams, and messages
def create_user(self, **kwargs: Any) -> UserProfile:
@ -60,7 +60,7 @@ class AnalyticsTestCase(TestCase):
kwargs[key] = kwargs.get(key, value)
kwargs['delivery_email'] = kwargs['email']
with mock.patch("zerver.lib.create_user.timezone_now", return_value=kwargs['date_joined']):
pass_kwargs = {} # type: Dict[str, Any]
pass_kwargs: Dict[str, Any] = {}
if kwargs['is_bot']:
pass_kwargs['bot_type'] = UserProfile.DEFAULT_BOT
pass_kwargs['bot_owner'] = None
@ -147,7 +147,7 @@ class AnalyticsTestCase(TestCase):
'end_time': self.TIME_ZERO,
'value': 1}
for values in arg_values:
kwargs = {} # type: Dict[str, Any]
kwargs: Dict[str, Any] = {}
for i in range(len(values)):
kwargs[arg_keys[i]] = values[i]
for key, value in defaults.items():

View File

@ -617,7 +617,7 @@ class TestGetChartDataHelpers(ZulipTestCase):
self.assertEqual(last_successful_fill('property'), one_hour_before)
def test_sort_by_totals(self) -> None:
empty = [] # type: List[int]
empty: List[int] = []
value_arrays = {'c': [0, 1], 'a': [9], 'b': [1, 1, 1], 'd': empty}
self.assertEqual(sort_by_totals(value_arrays), ['a', 'b', 'c', 'd'])

View File

@ -184,10 +184,10 @@ def get_chart_data(request: HttpRequest, user_profile: UserProfile, chart_name:
COUNT_STATS['realm_active_humans::day'],
COUNT_STATS['active_users_audit:is_bot:day']]
tables = [aggregate_table]
subgroup_to_label = {
subgroup_to_label: Dict[CountStat, Dict[Optional[str], str]] = {
stats[0]: {None: '_1day'},
stats[1]: {None: '_15day'},
stats[2]: {'false': 'all_time'}} # type: Dict[CountStat, Dict[Optional[str], str]]
stats[2]: {'false': 'all_time'}}
labels_sort_function = None
include_empty_subgroups = True
elif chart_name == 'messages_sent_over_time':
@ -263,7 +263,7 @@ def get_chart_data(request: HttpRequest, user_profile: UserProfile, chart_name:
assert len({stat.frequency for stat in stats}) == 1
end_times = time_range(start, end, stats[0].frequency, min_length)
data = {'end_times': end_times, 'frequency': stats[0].frequency} # type: Dict[str, Any]
data: Dict[str, Any] = {'end_times': end_times, 'frequency': stats[0].frequency}
aggregation_level = {
InstallationCount: 'everyone',
@ -308,7 +308,7 @@ def sort_by_totals(value_arrays: Dict[str, List[int]]) -> List[str]:
def sort_client_labels(data: Dict[str, Dict[str, List[int]]]) -> List[str]:
realm_order = sort_by_totals(data['everyone'])
user_order = sort_by_totals(data['user'])
label_sort_values = {} # type: Dict[str, float]
label_sort_values: Dict[str, float] = {}
for i, label in enumerate(realm_order):
label_sort_values[label] = i
for i, label in enumerate(user_order):
@ -352,7 +352,7 @@ def client_label_map(name: str) -> str:
return name
def rewrite_client_arrays(value_arrays: Dict[str, List[int]]) -> Dict[str, List[int]]:
mapped_arrays = {} # type: Dict[str, List[int]]
mapped_arrays: Dict[str, List[int]] = {}
for label, array in value_arrays.items():
mapped_label = client_label_map(label)
if mapped_label in mapped_arrays:
@ -370,7 +370,7 @@ def get_time_series_by_subgroup(stat: CountStat,
include_empty_subgroups: bool) -> Dict[str, List[int]]:
queryset = table_filtered_to_id(table, key_id).filter(property=stat.property) \
.values_list('subgroup', 'end_time', 'value')
value_dicts = defaultdict(lambda: defaultdict(int)) # type: Dict[Optional[str], Dict[datetime, int]]
value_dicts: Dict[Optional[str], Dict[datetime, int]] = defaultdict(lambda: defaultdict(int))
for subgroup, end_time, value in queryset:
value_dicts[subgroup][end_time] = value
value_arrays = {}
@ -441,7 +441,7 @@ def get_realm_day_counts() -> Dict[str, Dict[str, str]]:
rows = dictfetchall(cursor)
cursor.close()
counts = defaultdict(dict) # type: Dict[str, Dict[int, int]]
counts: Dict[str, Dict[int, int]] = defaultdict(dict)
for row in rows:
counts[row['string_id']][row['age']] = row['cnt']
@ -585,7 +585,7 @@ def realm_summary_table(realm_minutes: Dict[str, float]) -> str:
cursor.close()
# Fetch all the realm administrator users
realm_admins = defaultdict(list) # type: Dict[str, List[str]]
realm_admins: Dict[str, List[str]] = defaultdict(list)
for up in UserProfile.objects.select_related("realm").filter(
role=UserProfile.ROLE_REALM_ADMINISTRATOR,
is_active=True
@ -1024,7 +1024,7 @@ def ad_hoc_queries() -> List[Dict[str, str]]:
@has_request_variables
def get_activity(request: HttpRequest) -> HttpResponse:
duration_content, realm_minutes = user_activity_intervals() # type: Tuple[mark_safe, Dict[str, float]]
counts_content = realm_summary_table(realm_minutes) # type: str
counts_content: str = realm_summary_table(realm_minutes)
data = [
('Counts', counts_content),
('Durations', duration_content),
@ -1082,7 +1082,7 @@ def get_confirmations(types: List[int], object_ids: List[int],
@require_server_admin
def support(request: HttpRequest) -> HttpResponse:
context = {} # type: Dict[str, Any]
context: Dict[str, Any] = {}
if settings.BILLING_ENABLED and request.method == "POST":
realm_id = request.POST.get("realm_id", None)
realm = Realm.objects.get(id=realm_id)
@ -1145,7 +1145,7 @@ def support(request: HttpRequest) -> HttpResponse:
context["realms"] = realms
confirmations = [] # type: List[Dict[str, Any]]
confirmations: List[Dict[str, Any]] = []
preregistration_users = PreregistrationUser.objects.filter(email__in=key_words)
confirmations += get_confirmations([Confirmation.USER_REGISTRATION, Confirmation.INVITATION,
@ -1229,7 +1229,7 @@ def get_user_activity_summary(records: List[QuerySet]) -> Dict[str, Dict[str, An
#: We could use something like:
# `Union[Dict[str, Dict[str, int]], Dict[str, Dict[str, datetime]]]`
#: but that would require this long `Union` to carry on throughout inner functions.
summary = {} # type: Dict[str, Dict[str, Any]]
summary: Dict[str, Dict[str, Any]] = {}
def update(action: str, record: QuerySet) -> None:
if action not in summary:
@ -1440,8 +1440,8 @@ def realm_user_summary_table(all_records: List[QuerySet],
@require_server_admin
def get_realm_activity(request: HttpRequest, realm_str: str) -> HttpResponse:
data = [] # type: List[Tuple[str, str]]
all_user_records = {} # type: Dict[str, Any]
data: List[Tuple[str, str]] = []
all_user_records: Dict[str, Any] = {}
try:
admins = Realm.objects.get(string_id=realm_str).get_human_admin_users()
@ -1477,7 +1477,7 @@ def get_realm_activity(request: HttpRequest, realm_str: str) -> HttpResponse:
def get_user_activity(request: HttpRequest, email: str) -> HttpResponse:
records = get_user_activity_records_for_email(email)
data = [] # type: List[Tuple[str, str]]
data: List[Tuple[str, str]] = []
user_summary = get_user_activity_summary(records)
content = user_activity_summary_table(user_summary)

View File

@ -88,11 +88,11 @@ def confirmation_url(confirmation_key: str, host: str,
class Confirmation(models.Model):
content_type = models.ForeignKey(ContentType, on_delete=CASCADE)
object_id = models.PositiveIntegerField(db_index=True) # type: int
object_id: int = models.PositiveIntegerField(db_index=True)
content_object = GenericForeignKey('content_type', 'object_id')
date_sent = models.DateTimeField(db_index=True) # type: datetime.datetime
confirmation_key = models.CharField(max_length=40, db_index=True) # type: str
realm = models.ForeignKey(Realm, null=True, on_delete=CASCADE) # type: Optional[Realm]
date_sent: datetime.datetime = models.DateTimeField(db_index=True)
confirmation_key: str = models.CharField(max_length=40, db_index=True)
realm: Optional[Realm] = models.ForeignKey(Realm, null=True, on_delete=CASCADE)
# The following list is the set of valid types
USER_REGISTRATION = 1
@ -103,7 +103,7 @@ class Confirmation(models.Model):
MULTIUSE_INVITE = 6
REALM_CREATION = 7
REALM_REACTIVATION = 8
type = models.PositiveSmallIntegerField() # type: int
type: int = models.PositiveSmallIntegerField()
def __str__(self) -> str:
return '<Confirmation: %s>' % (self.content_object,)
@ -177,7 +177,7 @@ class RealmCreationKey(models.Model):
# True just if we should presume the email address the user enters
# is theirs, and skip sending mail to it to confirm that.
presume_email_valid = models.BooleanField(default=False) # type: bool
presume_email_valid: bool = models.BooleanField(default=False)
class Invalid(Exception):
pass

View File

@ -398,7 +398,7 @@ def invoice_plan(plan: CustomerPlan, event_time: datetime) -> None:
invoice_item_created = False
for ledger_entry in LicenseLedger.objects.filter(plan=plan, id__gt=plan.invoiced_through.id,
event_time__lte=event_time).order_by('id'):
price_args = {} # type: Dict[str, int]
price_args: Dict[str, int] = {}
if ledger_entry.is_renewal:
if plan.fixed_price is not None:
price_args = {'amount': plan.fixed_price}
@ -423,7 +423,7 @@ def invoice_plan(plan: CustomerPlan, event_time: datetime) -> None:
plan.invoiced_through = ledger_entry
plan.invoicing_status = CustomerPlan.STARTED
plan.save(update_fields=['invoicing_status', 'invoiced_through'])
idempotency_key = 'ledger_entry:{}'.format(ledger_entry.id) # type: Optional[str]
idempotency_key: Optional[str] = 'ledger_entry:{}'.format(ledger_entry.id)
if settings.TEST_SUITE:
idempotency_key = None
stripe.InvoiceItem.create(

View File

@ -8,10 +8,10 @@ from django.db.models import CASCADE
from zerver.models import Realm
class Customer(models.Model):
realm = models.OneToOneField(Realm, on_delete=CASCADE) # type: Realm
stripe_customer_id = models.CharField(max_length=255, null=True, unique=True) # type: str
realm: Realm = models.OneToOneField(Realm, on_delete=CASCADE)
stripe_customer_id: str = models.CharField(max_length=255, null=True, unique=True)
# A percentage, like 85.
default_discount = models.DecimalField(decimal_places=4, max_digits=7, null=True) # type: Optional[Decimal]
default_discount: Optional[Decimal] = models.DecimalField(decimal_places=4, max_digits=7, null=True)
def __str__(self) -> str:
return "<Customer %s %s>" % (self.realm, self.stripe_customer_id)
@ -20,35 +20,35 @@ def get_customer_by_realm(realm: Realm) -> Optional[Customer]:
return Customer.objects.filter(realm=realm).first()
class CustomerPlan(models.Model):
customer = models.ForeignKey(Customer, on_delete=CASCADE) # type: Customer
automanage_licenses = models.BooleanField(default=False) # type: bool
charge_automatically = models.BooleanField(default=False) # type: bool
customer: Customer = models.ForeignKey(Customer, on_delete=CASCADE)
automanage_licenses: bool = models.BooleanField(default=False)
charge_automatically: bool = models.BooleanField(default=False)
# Both of these are in cents. Exactly one of price_per_license or
# fixed_price should be set. fixed_price is only for manual deals, and
# can't be set via the self-serve billing system.
price_per_license = models.IntegerField(null=True) # type: Optional[int]
fixed_price = models.IntegerField(null=True) # type: Optional[int]
price_per_license: Optional[int] = models.IntegerField(null=True)
fixed_price: Optional[int] = models.IntegerField(null=True)
# Discount that was applied. For display purposes only.
discount = models.DecimalField(decimal_places=4, max_digits=6, null=True) # type: Optional[Decimal]
discount: Optional[Decimal] = models.DecimalField(decimal_places=4, max_digits=6, null=True)
billing_cycle_anchor = models.DateTimeField() # type: datetime.datetime
billing_cycle_anchor: datetime.datetime = models.DateTimeField()
ANNUAL = 1
MONTHLY = 2
billing_schedule = models.SmallIntegerField() # type: int
billing_schedule: int = models.SmallIntegerField()
next_invoice_date = models.DateTimeField(db_index=True, null=True) # type: Optional[datetime.datetime]
invoiced_through = models.ForeignKey(
'LicenseLedger', null=True, on_delete=CASCADE, related_name='+') # type: Optional[LicenseLedger]
next_invoice_date: Optional[datetime.datetime] = models.DateTimeField(db_index=True, null=True)
invoiced_through: Optional["LicenseLedger"] = models.ForeignKey(
'LicenseLedger', null=True, on_delete=CASCADE, related_name='+')
DONE = 1
STARTED = 2
invoicing_status = models.SmallIntegerField(default=DONE) # type: int
invoicing_status: int = models.SmallIntegerField(default=DONE)
STANDARD = 1
PLUS = 2 # not available through self-serve signup
ENTERPRISE = 10
tier = models.SmallIntegerField() # type: int
tier: int = models.SmallIntegerField()
ACTIVE = 1
DOWNGRADE_AT_END_OF_CYCLE = 2
@ -57,7 +57,7 @@ class CustomerPlan(models.Model):
LIVE_STATUS_THRESHOLD = 10
ENDED = 11
NEVER_STARTED = 12
status = models.SmallIntegerField(default=ACTIVE) # type: int
status: int = models.SmallIntegerField(default=ACTIVE)
# TODO maybe override setattr to ensure billing_cycle_anchor, etc are immutable
@ -72,11 +72,11 @@ def get_current_plan_by_realm(realm: Realm) -> Optional[CustomerPlan]:
return get_current_plan_by_customer(customer)
class LicenseLedger(models.Model):
plan = models.ForeignKey(CustomerPlan, on_delete=CASCADE) # type: CustomerPlan
plan: CustomerPlan = models.ForeignKey(CustomerPlan, on_delete=CASCADE)
# Also True for the initial upgrade.
is_renewal = models.BooleanField(default=False) # type: bool
event_time = models.DateTimeField() # type: datetime.datetime
licenses = models.IntegerField() # type: int
is_renewal: bool = models.BooleanField(default=False)
event_time: datetime.datetime = models.DateTimeField()
licenses: int = models.IntegerField()
# None means the plan does not automatically renew.
# This cannot be None if plan.automanage_licenses.
licenses_at_next_renewal = models.IntegerField(null=True) # type: Optional[int]
licenses_at_next_renewal: Optional[int] = models.IntegerField(null=True)

View File

@ -149,8 +149,9 @@ def normalize_fixture_data(decorated_function: CallableT,
'"%s": 1[5-9][0-9]{8}(?![0-9-])' % (timestamp_field,)
] = '"%s": 1%02d%%07d' % (timestamp_field, i+1)
normalized_values = {pattern: {}
for pattern in pattern_translations.keys()} # type: Dict[str, Dict[str, str]]
normalized_values: Dict[str, Dict[str, str]] = {
pattern: {} for pattern in pattern_translations.keys()
}
for fixture_file in fixture_files_for_function(decorated_function):
with open(fixture_file) as f:
file_content = f.read()
@ -258,10 +259,10 @@ class StripeTestCase(ZulipTestCase):
if realm is not None: # nocoverage: TODO
host_args['HTTP_HOST'] = realm.host
response = self.client_get("/upgrade/", **host_args)
params = {
params: Dict[str, Any] = {
'schedule': 'annual',
'signed_seat_count': self.get_signed_seat_count_from_response(response),
'salt': self.get_salt_from_response(response)} # type: Dict[str, Any]
'salt': self.get_salt_from_response(response)}
if invoice: # send_invoice
params.update({
'billing_modality': 'send_invoice',
@ -1110,10 +1111,10 @@ class RequiresBillingAccessTest(ZulipTestCase):
self.assert_json_error_contains(response, "Must be a billing administrator or an organization")
def test_non_admins_blocked_from_json_endpoints(self) -> None:
params = [
params: List[Tuple[str, Dict[str, Any]]] = [
("/json/billing/sources/change", {'stripe_token': ujson.dumps('token')}),
("/json/billing/plan/change", {'status': ujson.dumps(1)}),
] # type: List[Tuple[str, Dict[str, Any]]]
]
for (url, data) in params:
self.verify_non_admins_blocked_from_endpoint(url, data)

View File

@ -6,7 +6,7 @@ from django.conf.urls import include, url
import corporate.views
from zerver.lib.rest import rest_dispatch
i18n_urlpatterns = [
i18n_urlpatterns: Any = [
# Zephyr/MIT
url(r'^zephyr/$', TemplateView.as_view(template_name='corporate/zephyr.html')),
url(r'^zephyr-mirror/$', TemplateView.as_view(template_name='corporate/zephyr-mirror.html')),
@ -16,7 +16,7 @@ i18n_urlpatterns = [
# Billing
url(r'^billing/$', corporate.views.billing_home, name='corporate.views.billing_home'),
url(r'^upgrade/$', corporate.views.initial_upgrade, name='corporate.views.initial_upgrade'),
] # type: Any
]
v1_api_and_json_patterns = [
url(r'^billing/upgrade$', rest_dispatch,

View File

@ -57,7 +57,7 @@ def check_upgrade_parameters(
# Should only be called if the customer is being charged automatically
def payment_method_string(stripe_customer: stripe.Customer) -> str:
stripe_source = stripe_customer.default_source # type: Optional[Union[stripe.Card, stripe.Source]]
stripe_source: Optional[Union[stripe.Card, stripe.Source]] = stripe_customer.default_source
# In case of e.g. an expired card
if stripe_source is None: # nocoverage
return _("No payment method on file")
@ -128,7 +128,7 @@ def initial_upgrade(request: HttpRequest) -> HttpResponse:
seat_count = get_latest_seat_count(user.realm)
signed_seat_count, salt = sign_string(str(seat_count))
context = {
context: Dict[str, Any] = {
'publishable_key': STRIPE_PUBLISHABLE_KEY,
'email': user.delivery_email,
'seat_count': seat_count,
@ -143,7 +143,7 @@ def initial_upgrade(request: HttpRequest) -> HttpResponse:
'monthly_price': 800,
'percent_off': float(percent_off),
},
} # type: Dict[str, Any]
}
response = render(request, 'corporate/upgrade.html', context=context)
return response
@ -157,7 +157,7 @@ def billing_home(request: HttpRequest) -> HttpResponse:
return HttpResponseRedirect(reverse('corporate.views.initial_upgrade'))
if not user.is_realm_admin and not user.is_billing_admin:
context = {'admin_access': False} # type: Dict[str, Any]
context: Dict[str, Any] = {'admin_access': False}
return render(request, 'corporate/billing.html', context=context)
context = {

View File

@ -32,8 +32,8 @@ from version import ZULIP_VERSION
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
] # type: List[str]
extensions: List[str] = [
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
@ -63,7 +63,7 @@ release = ZULIP_VERSION
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None # type: Optional[str]
language: Optional[str] = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
@ -217,7 +217,7 @@ htmlhelp_basename = 'zulip-contributor-docsdoc'
# -- Options for LaTeX output ---------------------------------------------
latex_elements = {
latex_elements: Dict[str, str] = {
# The paper size ('letterpaper' or 'a4paper').
#'papersize': 'letterpaper',
@ -229,7 +229,7 @@ latex_elements = {
# Latex figure (float) alignment
#'figure_align': 'htbp',
} # type: Dict[str, str]
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,

View File

@ -104,15 +104,15 @@ def run_tests(files: Iterable[str], external_host: str) -> None:
else:
loop_cnt = None
remote_debug = [] # type: List[str]
remote_debug: List[str] = []
if options.remote_debug:
remote_debug = ["--remote-debugger-port=7777", "--remote-debugger-autorun=yes"]
verbose = [] # type: List[str]
verbose: List[str] = []
if options.verbose:
verbose = ["--verbose", "--log-level=debug"]
xunit_export = [] # type: List[str]
xunit_export: List[str] = []
if options.xunit_export:
xunit_export = ["--xunit=var/xunit-test-results/casper/result.xml"]

View File

@ -12,7 +12,7 @@ def flush_cache(sender: AppConfig, **kwargs: Any) -> None:
class ZerverConfig(AppConfig):
name = "zerver" # type: str
name: str = "zerver"
def ready(self) -> None:
# We import zerver.signals here for the side effect of

View File

@ -152,14 +152,14 @@ def login_context(request: HttpRequest) -> Dict[str, Any]:
realm_description = get_realm_rendered_description(realm)
realm_invite_required = realm.invite_required
context = {
context: Dict[str, Any] = {
'realm_invite_required': realm_invite_required,
'realm_description': realm_description,
'require_email_format_usernames': require_email_format_usernames(realm),
'password_auth_enabled': password_auth_enabled(realm),
'any_social_backend_enabled': any_social_backend_enabled(realm),
'two_factor_authentication_enabled': settings.TWO_FACTOR_AUTHENTICATION_ENABLED,
} # type: Dict[str, Any]
}
if realm is not None and realm.description:
context['OPEN_GRAPH_TITLE'] = realm.name

View File

@ -32,7 +32,7 @@ def gitter_workspace_to_realm(domain_name: str, gitter_data: GitterDataT,
3. user_map, which is a dictionary to map from gitter user id to zulip user id
"""
NOW = float(timezone_now().timestamp())
zerver_realm = build_zerver_realm(realm_id, realm_subdomain, NOW, 'Gitter') # type: List[ZerverFieldsT]
zerver_realm: List[ZerverFieldsT] = build_zerver_realm(realm_id, realm_subdomain, NOW, 'Gitter')
realm = build_realm(zerver_realm, realm_id, domain_name)
zerver_userprofile, avatars, user_map = build_userprofile(int(NOW), domain_name, gitter_data)
@ -60,8 +60,8 @@ def build_userprofile(timestamp: Any, domain_name: str,
"""
logging.info('######### IMPORTING USERS STARTED #########\n')
zerver_userprofile = []
avatar_list = [] # type: List[ZerverFieldsT]
user_map = {} # type: Dict[str, int]
avatar_list: List[ZerverFieldsT] = []
user_map: Dict[str, int] = {}
user_id = 0
for data in gitter_data:
@ -169,7 +169,7 @@ def convert_gitter_workspace_messages(gitter_data: GitterDataT, output_dir: str,
while True:
message_json = {}
zerver_message = []
zerver_usermessage = [] # type: List[ZerverFieldsT]
zerver_usermessage: List[ZerverFieldsT] = []
message_data = gitter_data[low_index: upper_index]
if len(message_data) == 0:
break
@ -262,7 +262,7 @@ def do_convert_data(gitter_data_file: str, output_dir: str, threads: int=6) -> N
os.makedirs(avatar_realm_folder, exist_ok=True)
avatar_records = process_avatars(avatar_list, avatar_folder, realm_id, threads)
attachment = {"zerver_attachment": []} # type: Dict[str, List[Any]]
attachment: Dict[str, List[Any]] = {"zerver_attachment": []}
# IO realm.json
create_converted_data_files(realm, output_dir, '/realm.json')

View File

@ -244,11 +244,11 @@ def convert_room_data(raw_data: List[ZerverFieldsT],
)
if invite_only:
users = {
users: Set[int] = {
user_id_mapper.get(key)
for key in in_dict['members']
if user_id_mapper.has(key)
} # type: Set[int]
}
if user_id_mapper.has(in_dict['owner']):
owner = user_id_mapper.get(in_dict['owner'])
@ -671,7 +671,7 @@ def process_raw_message_batch(realm_id: int,
content = content.replace('@here', '@**all**')
return content
mention_map = dict() # type: Dict[int, Set[int]]
mention_map: Dict[int, Set[int]] = dict()
zerver_message = []
@ -807,7 +807,7 @@ def do_convert_data(input_tar_file: str,
if api_token is None:
if slim_mode:
public_stream_subscriptions = [] # type: List[ZerverFieldsT]
public_stream_subscriptions: List[ZerverFieldsT] = []
else:
public_stream_subscriptions = build_public_stream_subscriptions(
zerver_userprofile=normal_users,

View File

@ -11,7 +11,7 @@ from typing import Any, Dict, List, Optional
class AttachmentHandler:
def __init__(self) -> None:
self.info_dict = dict() # type: Dict[str, Dict[str, Any]]
self.info_dict: Dict[str, Dict[str, Any]] = dict()
def handle_message_data(self,
realm_id: int,
@ -77,8 +77,8 @@ class AttachmentHandler:
return content
def write_info(self, output_dir: str, realm_id: int) -> None:
attachments = [] # type: List[Dict[str, Any]]
uploads_records = [] # type: List[Dict[str, Any]]
attachments: List[Dict[str, Any]] = []
uploads_records: List[Dict[str, Any]] = []
def add_attachment(info: Dict[str, Any]) -> None:
build_attachment(

View File

@ -23,8 +23,8 @@ class UserHandler:
'''
def __init__(self) -> None:
self.id_to_user_map = dict() # type: Dict[int, Dict[str, Any]]
self.name_to_mirror_user_map = dict() # type: Dict[str, Dict[str, Any]]
self.id_to_user_map: Dict[int, Dict[str, Any]] = dict()
self.name_to_mirror_user_map: Dict[str, Dict[str, Any]] = dict()
self.mirror_user_id = 1
def add_user(self, user: Dict[str, Any]) -> None:

View File

@ -21,8 +21,8 @@ ZerverFieldsT = Dict[str, Any]
class SubscriberHandler:
def __init__(self) -> None:
self.stream_info = dict() # type: Dict[int, Set[int]]
self.huddle_info = dict() # type: Dict[int, Set[int]]
self.stream_info: Dict[int, Set[int]] = dict()
self.huddle_info: Dict[int, Set[int]] = dict()
def set_info(self,
users: Set[int],
@ -105,7 +105,7 @@ def make_subscriber_map(zerver_subscription: List[ZerverFieldsT]) -> Dict[int, S
This can be convenient for building up UserMessage
rows.
'''
subscriber_map = dict() # type: Dict[int, Set[int]]
subscriber_map: Dict[int, Set[int]] = dict()
for sub in zerver_subscription:
user_id = sub['user_profile']
recipient_id = sub['recipient']
@ -163,7 +163,7 @@ def build_public_stream_subscriptions(
users to every public stream. This returns a list of Subscription
dicts.
'''
subscriptions = [] # type: List[ZerverFieldsT]
subscriptions: List[ZerverFieldsT] = []
public_stream_ids = {
stream['id']
@ -199,7 +199,7 @@ def build_stream_subscriptions(
zerver_recipient: List[ZerverFieldsT],
zerver_stream: List[ZerverFieldsT]) -> List[ZerverFieldsT]:
subscriptions = [] # type: List[ZerverFieldsT]
subscriptions: List[ZerverFieldsT] = []
stream_ids = {stream['id'] for stream in zerver_stream}
@ -227,7 +227,7 @@ def build_huddle_subscriptions(
zerver_recipient: List[ZerverFieldsT],
zerver_huddle: List[ZerverFieldsT]) -> List[ZerverFieldsT]:
subscriptions = [] # type: List[ZerverFieldsT]
subscriptions: List[ZerverFieldsT] = []
huddle_ids = {huddle['id'] for huddle in zerver_huddle}
@ -252,7 +252,7 @@ def build_huddle_subscriptions(
def build_personal_subscriptions(zerver_recipient: List[ZerverFieldsT]) -> List[ZerverFieldsT]:
subscriptions = [] # type: List[ZerverFieldsT]
subscriptions: List[ZerverFieldsT] = []
personal_recipients = [
recipient
@ -586,7 +586,7 @@ def run_parallel_wrapper(f: Callable[[ListJobData], None], full_items: List[List
if count % 1000 == 0:
logging.info("A download thread finished %s items" % (count,))
return 0
job_lists = [full_items[i::threads] for i in range(threads)] # type: List[List[ListJobData]]
job_lists: List[List[ListJobData]] = [full_items[i::threads] for i in range(threads)]
return run_parallel(wrapping_function, job_lists, threads=threads)
def process_uploads(upload_list: List[ZerverFieldsT], upload_dir: str,

View File

@ -124,8 +124,8 @@ def convert_channel_data(channel_data: List[ZerverFieldsT],
if d['team'] == team_name
]
channel_members_map = {} # type: Dict[str, List[str]]
channel_admins_map = {} # type: Dict[str, List[str]]
channel_members_map: Dict[str, List[str]] = {}
channel_admins_map: Dict[str, List[str]] = {}
def initialize_stream_membership_dicts() -> None:
for channel in channel_data:
@ -310,7 +310,7 @@ def process_raw_message_batch(realm_id: int,
content = content.replace('@here', '@**all**')
return content
mention_map = dict() # type: Dict[int, Set[int]]
mention_map: Dict[int, Set[int]] = dict()
zerver_message = []
import html2text
@ -672,7 +672,7 @@ def reset_mirror_dummy_users(username_to_user: Dict[str, Dict[str, Any]]) -> Non
user["is_mirror_dummy"] = False
def mattermost_data_file_to_dict(mattermost_data_file: str) -> Dict[str, Any]:
mattermost_data = {} # type: Dict[str, Any]
mattermost_data: Dict[str, Any] = {}
mattermost_data["version"] = []
mattermost_data["team"] = []
mattermost_data["channel"] = []
@ -694,7 +694,7 @@ def mattermost_data_file_to_dict(mattermost_data_file: str) -> Dict[str, Any]:
return mattermost_data
def do_convert_data(mattermost_data_dir: str, output_dir: str, masking_content: bool) -> None:
username_to_user = {} # type: Dict[str, Dict[str, Any]]
username_to_user: Dict[str, Dict[str, Any]] = {}
os.makedirs(output_dir, exist_ok=True)
if os.listdir(output_dir): # nocoverage
@ -741,7 +741,7 @@ def do_convert_data(mattermost_data_dir: str, output_dir: str, masking_content:
)
realm['zerver_stream'] = zerver_stream
zerver_huddle = [] # type: List[ZerverFieldsT]
zerver_huddle: List[ZerverFieldsT] = []
if len(mattermost_data["team"]) == 1:
zerver_huddle = convert_huddle_data(
huddle_data=mattermost_data["direct_channel"],
@ -796,7 +796,7 @@ def do_convert_data(mattermost_data_dir: str, output_dir: str, masking_content:
zerver_subscription=zerver_subscription,
)
total_reactions = [] # type: List[Dict[str, Any]]
total_reactions: List[Dict[str, Any]] = []
write_message_data(
num_teams=len(mattermost_data["team"]),
team_name=team_name,
@ -825,7 +825,7 @@ def do_convert_data(mattermost_data_dir: str, output_dir: str, masking_content:
create_converted_data_files([], realm_output_dir, '/uploads/records.json')
# Mattermost currently doesn't support exporting attachments
attachment = {"zerver_attachment": []} # type: Dict[str, List[Any]]
attachment: Dict[str, List[Any]] = {"zerver_attachment": []}
create_converted_data_files(attachment, realm_output_dir, '/attachment.json')
logging.info('Start making tarball')

View File

@ -11,7 +11,7 @@ class UserHandler:
'''
def __init__(self) -> None:
self.id_to_user_map = dict() # type: Dict[int, Dict[str, Any]]
self.id_to_user_map: Dict[int, Dict[str, Any]] = dict()
def add_user(self, user: Dict[str, Any]) -> None:
user_id = user['id']

View File

@ -28,7 +28,7 @@ def sequencer() -> Callable[[str], int]:
NEXT_ID = sequencer()
message_id = NEXT_ID('message')
'''
seq_dict = dict() # type: Dict[str, Callable[[], int]]
seq_dict: Dict[str, Callable[[], int]] = dict()
def next_one(name: str) -> int:
if name not in seq_dict:
@ -59,7 +59,7 @@ def is_int(key: Any) -> bool:
class IdMapper:
def __init__(self) -> None:
self.map = dict() # type: Dict[Any, int]
self.map: Dict[Any, int] = dict()
self.cnt = 0
def has(self, their_id: Any) -> bool:

View File

@ -62,7 +62,7 @@ def slack_workspace_to_realm(domain_name: str, realm_id: int, user_list: List[Ze
"""
NOW = float(timezone_now().timestamp())
zerver_realm = build_zerver_realm(realm_id, realm_subdomain, NOW, 'Slack') # type: List[ZerverFieldsT]
zerver_realm: List[ZerverFieldsT] = build_zerver_realm(realm_id, realm_subdomain, NOW, 'Slack')
realm = build_realm(zerver_realm, realm_id, domain_name)
zerver_userprofile, avatars, slack_user_id_to_zulip_user_id, zerver_customprofilefield, \
@ -127,17 +127,17 @@ def users_to_zerver_userprofile(slack_data_dir: str, users: List[ZerverFieldsT],
"""
logging.info('######### IMPORTING USERS STARTED #########\n')
zerver_userprofile = []
zerver_customprofilefield = [] # type: List[ZerverFieldsT]
zerver_customprofilefield_values = [] # type: List[ZerverFieldsT]
avatar_list = [] # type: List[ZerverFieldsT]
zerver_customprofilefield: List[ZerverFieldsT] = []
zerver_customprofilefield_values: List[ZerverFieldsT] = []
avatar_list: List[ZerverFieldsT] = []
slack_user_id_to_zulip_user_id = {}
# The user data we get from the slack api does not contain custom profile data
# Hence we get it from the slack zip file
slack_data_file_user_list = get_data_file(slack_data_dir + '/users.json')
slack_user_id_to_custom_profile_fields = {} # type: ZerverFieldsT
slack_custom_field_name_to_zulip_custom_field_id = {} # type: ZerverFieldsT
slack_user_id_to_custom_profile_fields: ZerverFieldsT = {}
slack_custom_field_name_to_zulip_custom_field_id: ZerverFieldsT = {}
for user in slack_data_file_user_list:
process_slack_custom_fields(user, slack_user_id_to_custom_profile_fields)
@ -498,8 +498,8 @@ def process_long_term_idle_users(slack_data_dir: str, users: List[ZerverFieldsT]
"""
all_messages = get_messages_iterator(slack_data_dir, added_channels, added_mpims, dm_members)
sender_counts = defaultdict(int) # type: Dict[str, int]
recent_senders = set() # type: Set[str]
sender_counts: Dict[str, int] = defaultdict(int)
recent_senders: Set[str] = set()
NOW = float(timezone_now().timestamp())
for message in all_messages:
timestamp = float(message['ts'])
@ -563,9 +563,9 @@ def convert_slack_workspace_messages(slack_data_dir: str, users: List[ZerverFiel
all_messages = get_messages_iterator(slack_data_dir, added_channels, added_mpims, dm_members)
logging.info('######### IMPORTING MESSAGES STARTED #########\n')
total_reactions = [] # type: List[ZerverFieldsT]
total_attachments = [] # type: List[ZerverFieldsT]
total_uploads = [] # type: List[ZerverFieldsT]
total_reactions: List[ZerverFieldsT] = []
total_attachments: List[ZerverFieldsT] = []
total_uploads: List[ZerverFieldsT] = []
dump_file_id = 1
@ -615,7 +615,7 @@ def get_messages_iterator(slack_data_dir: str, added_channels: Dict[str, Any],
large imports that can OOM kill."""
dir_names = list(added_channels.keys()) + list(added_mpims.keys()) + list(dm_members.keys())
all_json_names = defaultdict(list) # type: Dict[str, List[str]]
all_json_names: Dict[str, List[str]] = defaultdict(list)
for dir_name in dir_names:
dir_path = os.path.join(slack_data_dir, dir_name)
json_names = os.listdir(dir_path)
@ -624,7 +624,7 @@ def get_messages_iterator(slack_data_dir: str, added_channels: Dict[str, Any],
# Sort json_name by date
for json_name in sorted(all_json_names.keys()):
messages_for_one_day = [] # type: List[ZerverFieldsT]
messages_for_one_day: List[ZerverFieldsT] = []
for dir_path in all_json_names[json_name]:
message_dir = os.path.join(dir_path, json_name)
dir_name = os.path.basename(dir_path)
@ -675,10 +675,10 @@ def channel_message_to_zerver_message(realm_id: int,
5. reaction_list, which is a list of all user reactions
"""
zerver_message = []
zerver_usermessage = [] # type: List[ZerverFieldsT]
uploads_list = [] # type: List[ZerverFieldsT]
zerver_attachment = [] # type: List[ZerverFieldsT]
reaction_list = [] # type: List[ZerverFieldsT]
zerver_usermessage: List[ZerverFieldsT] = []
uploads_list: List[ZerverFieldsT] = []
zerver_attachment: List[ZerverFieldsT] = []
reaction_list: List[ZerverFieldsT] = []
total_user_messages = 0
total_skipped_user_messages = 0
@ -947,7 +947,7 @@ def fetch_shared_channel_users(user_list: List[ZerverFieldsT], slack_data_dir: s
normal_user_ids = set()
mirror_dummy_user_ids = set()
added_channels = {}
team_id_to_domain = {} # type: Dict[str, str]
team_id_to_domain: Dict[str, str] = {}
for user in user_list:
user["is_mirror_dummy"] = False
normal_user_ids.add(user["id"])

View File

@ -61,7 +61,7 @@ log_to_file(webhook_unexpected_events_logger,
settings.WEBHOOK_UNEXPECTED_EVENTS_LOG_PATH)
def cachify(method: Callable[..., ReturnT]) -> Callable[..., ReturnT]:
dct = {} # type: Dict[Tuple[Any, ...], ReturnT]
dct: Dict[Tuple[Any, ...], ReturnT] = {}
def cache_wrapper(*args: Any) -> ReturnT:
tup = tuple(args)
@ -131,7 +131,7 @@ def get_client_name(request: HttpRequest) -> str:
if 'client' in request.POST:
return request.POST['client']
if "HTTP_USER_AGENT" in request.META:
user_agent = parse_user_agent(request.META["HTTP_USER_AGENT"]) # type: Optional[Dict[str, str]]
user_agent: Optional[Dict[str, str]] = parse_user_agent(request.META["HTTP_USER_AGENT"])
else:
user_agent = None
if user_agent is not None:
@ -167,7 +167,7 @@ class InvalidZulipServerError(JsonableError):
data_fields = ['role']
def __init__(self, role: str) -> None:
self.role = role # type: str
self.role: str = role
@staticmethod
def msg_format() -> str:

View File

@ -264,7 +264,7 @@ class ZulipPasswordResetForm(PasswordResetForm):
logging.info("Too many password reset attempts for email %s" % (email,))
return
user = None # type: Optional[UserProfile]
user: Optional[UserProfile] = None
try:
user = get_user_by_delivery_email(email, realm)
except UserProfile.DoesNotExist:
@ -333,7 +333,7 @@ class OurAuthenticationForm(AuthenticationForm):
(username, subdomain))
raise ValidationError("Realm does not exist")
return_data = {} # type: Dict[str, Any]
return_data: Dict[str, Any] = {}
try:
self.user_cache = authenticate(request=self.request, username=username, password=password,
realm=realm, return_data=return_data)

View File

@ -363,7 +363,7 @@ def process_new_human_user(user_profile: UserProfile,
mit_beta_user = user_profile.realm.is_zephyr_mirror_realm
if prereg_user is not None:
streams = prereg_user.streams.all()
acting_user = prereg_user.referred_by # type: Optional[UserProfile]
acting_user: Optional[UserProfile] = prereg_user.referred_by
else:
streams = []
acting_user = None
@ -447,7 +447,7 @@ def notify_created_user(user_profile: UserProfile) -> None:
# values are expected to be added in a
# later event.
custom_profile_field_data={})
event = dict(type="realm_user", op="add", person=person) # type: Dict[str, Any]
event: Dict[str, Any] = dict(type="realm_user", op="add", person=person)
send_event(user_profile.realm, event, active_user_ids(user_profile.realm_id))
def created_bot_event(user_profile: UserProfile) -> Dict[str, Any]:
@ -992,9 +992,9 @@ def get_recipient_info(recipient: Recipient,
stream_topic: Optional[StreamTopicTarget],
possibly_mentioned_user_ids: Optional[Set[int]]=None,
possible_wildcard_mention: bool=True) -> RecipientInfoResult:
stream_push_user_ids = set() # type: Set[int]
stream_email_user_ids = set() # type: Set[int]
wildcard_mention_user_ids = set() # type: Set[int]
stream_push_user_ids: Set[int] = set()
stream_email_user_ids: Set[int] = set()
wildcard_mention_user_ids: Set[int] = set()
if recipient.type == Recipient.PERSONAL:
# The sender and recipient may be the same id, so
@ -1174,7 +1174,7 @@ def get_recipient_info(recipient: Recipient,
if is_service_bot(row)
]
info = dict(
info: RecipientInfoResult = dict(
active_user_ids=active_user_ids,
push_notify_user_ids=push_notify_user_ids,
stream_push_user_ids=stream_push_user_ids,
@ -1184,14 +1184,14 @@ def get_recipient_info(recipient: Recipient,
long_term_idle_user_ids=long_term_idle_user_ids,
default_bot_user_ids=default_bot_user_ids,
service_bot_tuples=service_bot_tuples
) # type: RecipientInfoResult
)
return info
def get_service_bot_events(sender: UserProfile, service_bot_tuples: List[Tuple[int, int]],
mentioned_user_ids: Set[int], active_user_ids: Set[int],
recipient_type: int) -> Dict[str, List[Dict[str, Any]]]:
event_dict = defaultdict(list) # type: Dict[str, List[Dict[str, Any]]]
event_dict: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
# Avoid infinite loops by preventing messages sent by bots from generating
# Service events.
@ -1247,7 +1247,7 @@ def get_service_bot_events(sender: UserProfile, service_bot_tuples: List[Tuple[i
return event_dict
def do_schedule_messages(messages: Sequence[Mapping[str, Any]]) -> List[int]:
scheduled_messages = [] # type: List[ScheduledMessage]
scheduled_messages: List[ScheduledMessage] = []
for message in messages:
scheduled_message = ScheduledMessage()
@ -1283,8 +1283,8 @@ def do_send_messages(messages_maybe_none: Sequence[Optional[MutableMapping[str,
messages = [message for message in messages_maybe_none if message is not None]
# Filter out zephyr mirror anomalies where the message was already sent
already_sent_ids = [] # type: List[int]
new_messages = [] # type: List[MutableMapping[str, Any]]
already_sent_ids: List[int] = []
new_messages: List[MutableMapping[str, Any]] = []
for message in messages:
if isinstance(message['message'], int):
already_sent_ids.append(message['message'])
@ -1292,7 +1292,7 @@ def do_send_messages(messages_maybe_none: Sequence[Optional[MutableMapping[str,
new_messages.append(message)
messages = new_messages
links_for_embed = set() # type: Set[str]
links_for_embed: Set[str] = set()
# For consistency, changes to the default values for these gets should also be applied
# to the default args in do_send_message
for message in messages:
@ -1310,10 +1310,10 @@ def do_send_messages(messages_maybe_none: Sequence[Optional[MutableMapping[str,
if message['message'].is_stream_message():
stream_id = message['message'].recipient.type_id
stream_topic = StreamTopicTarget(
stream_topic: Optional[StreamTopicTarget] = StreamTopicTarget(
stream_id=stream_id,
topic_name=message['message'].topic_name()
) # type: Optional[StreamTopicTarget]
)
else:
stream_topic = None
@ -1375,7 +1375,7 @@ def do_send_messages(messages_maybe_none: Sequence[Optional[MutableMapping[str,
message['um_eligible_user_ids'] |= mentioned_bot_user_ids
# Save the message receipts in the database
user_message_flags = defaultdict(dict) # type: Dict[int, Dict[int, List[str]]]
user_message_flags: Dict[int, Dict[int, List[str]]] = defaultdict(dict)
with transaction.atomic():
Message.objects.bulk_create([message['message'] for message in messages])
@ -1386,7 +1386,7 @@ def do_send_messages(messages_maybe_none: Sequence[Optional[MutableMapping[str,
message['message'].has_attachment = True
message['message'].save(update_fields=['has_attachment'])
ums = [] # type: List[UserMessageLite]
ums: List[UserMessageLite] = []
for message in messages:
# Service bots (outgoing webhook bots and embedded bots) don't store UserMessage rows;
# they will be processed later.
@ -1661,13 +1661,15 @@ def notify_reaction_update(user_profile: UserProfile, message: Message,
'email': user_profile.email,
'full_name': user_profile.full_name}
event = {'type': 'reaction',
'op': op,
'user': user_dict,
'message_id': message.id,
'emoji_name': reaction.emoji_name,
'emoji_code': reaction.emoji_code,
'reaction_type': reaction.reaction_type} # type: Dict[str, Any]
event: Dict[str, Any] = {
'type': 'reaction',
'op': op,
'user': user_dict,
'message_id': message.id,
'emoji_name': reaction.emoji_name,
'emoji_code': reaction.emoji_code,
'reaction_type': reaction.reaction_type,
}
# Update the cached message since new reaction is added.
update_to_dict_cache([message])
@ -1860,13 +1862,13 @@ def get_recipient_from_user_profiles(recipient_profiles: Sequence[UserProfile],
# Otherwise, we need a huddle. Make sure the sender is included in huddle messages
recipient_profiles_map[sender.id] = sender
user_ids = {user_id for user_id in recipient_profiles_map} # type: Set[int]
user_ids: Set[int] = {user_id for user_id in recipient_profiles_map}
return get_huddle_recipient(user_ids)
def validate_recipient_user_profiles(user_profiles: Sequence[UserProfile],
sender: UserProfile,
allow_deactivated: bool=False) -> Sequence[UserProfile]:
recipient_profiles_map = {} # type: Dict[int, UserProfile]
recipient_profiles_map: Dict[int, UserProfile] = {}
# We exempt cross-realm bots from the check that all the recipients
# are in the same realm.
@ -1896,7 +1898,7 @@ def user_ids_for_emails(
we still have to support mobile sending emails
in typing notifications.
'''
user_ids = [] # type: List[int]
user_ids: List[int] = []
for email in emails:
try:
user_profile = get_user_including_cross_realm(email, realm)
@ -2537,7 +2539,7 @@ def bulk_get_subscriber_user_ids(stream_dicts: Iterable[Mapping[str, Any]],
for stream_id in stream_ids
])
result = {stream["id"]: [] for stream in stream_dicts} # type: Dict[int, List[int]]
result: Dict[int, List[int]] = {stream["id"]: [] for stream in stream_dicts}
if not recipient_ids:
return result
@ -2687,7 +2689,7 @@ def get_user_ids_for_streams(streams: Iterable[Stream]) -> Dict[int, List[int]]:
get_stream_id = itemgetter('recipient__type_id')
all_subscribers_by_stream = defaultdict(list) # type: Dict[int, List[int]]
all_subscribers_by_stream: Dict[int, List[int]] = defaultdict(list)
for stream_id, rows in itertools.groupby(all_subs, get_stream_id):
user_ids = [row['user_profile_id'] for row in rows]
all_subscribers_by_stream[stream_id] = user_ids
@ -2714,25 +2716,25 @@ def bulk_add_subscriptions(streams: Iterable[Stream],
acting_user: Optional[UserProfile]=None) -> SubT:
users = list(users)
recipients_map = {stream.id: stream.recipient_id for stream in streams} # type: Dict[int, int]
recipient_ids = [recipient_id for recipient_id in recipients_map.values()] # type: List[int]
recipients_map: Dict[int, int] = {stream.id: stream.recipient_id for stream in streams}
recipient_ids: List[int] = [recipient_id for recipient_id in recipients_map.values()]
stream_map = {} # type: Dict[int, Stream]
stream_map: Dict[int, Stream] = {}
for stream in streams:
stream_map[recipients_map[stream.id]] = stream
subs_by_user = defaultdict(list) # type: Dict[int, List[Subscription]]
subs_by_user: Dict[int, List[Subscription]] = defaultdict(list)
all_subs_query = get_stream_subscriptions_for_users(users).select_related('user_profile')
for sub in all_subs_query:
subs_by_user[sub.user_profile_id].append(sub)
realm = users[0].realm
already_subscribed = [] # type: List[Tuple[UserProfile, Stream]]
subs_to_activate = [] # type: List[Tuple[Subscription, Stream]]
new_subs = [] # type: List[Tuple[UserProfile, int, Stream]]
already_subscribed: List[Tuple[UserProfile, Stream]] = []
subs_to_activate: List[Tuple[Subscription, Stream]] = []
new_subs: List[Tuple[UserProfile, int, Stream]] = []
for user_profile in users:
needs_new_sub = set(recipient_ids) # type: Set[int]
needs_new_sub: Set[int] = set(recipient_ids)
for sub in subs_by_user[user_profile.id]:
if sub.recipient_id in needs_new_sub:
needs_new_sub.remove(sub.recipient_id)
@ -2747,7 +2749,7 @@ def bulk_add_subscriptions(streams: Iterable[Stream],
for recipient_id in needs_new_sub:
new_subs.append((user_profile, recipient_id, stream_map[recipient_id]))
subs_to_add = [] # type: List[Tuple[Subscription, Stream]]
subs_to_add: List[Tuple[Subscription, Stream]] = []
for (user_profile, recipient_id, stream) in new_subs:
if color_map is not None and stream.name in color_map:
color = color_map[stream.name]
@ -2772,7 +2774,7 @@ def bulk_add_subscriptions(streams: Iterable[Stream],
event_time = timezone_now()
event_last_message_id = get_last_message_id()
all_subscription_logs = [] # type: (List[RealmAuditLog])
all_subscription_logs: (List[RealmAuditLog]) = []
for (sub, stream) in subs_to_add:
all_subscription_logs.append(RealmAuditLog(realm=realm,
acting_user=acting_user,
@ -2814,8 +2816,8 @@ def bulk_add_subscriptions(streams: Iterable[Stream],
user_ids = all_subscribers_by_stream[stream.id]
return user_ids
sub_tuples_by_user = defaultdict(list) # type: Dict[int, List[Tuple[Subscription, Stream]]]
new_streams = set() # type: Set[Tuple[int, int]]
sub_tuples_by_user: Dict[int, List[Tuple[Subscription, Stream]]] = defaultdict(list)
new_streams: Set[Tuple[int, int]] = set()
for (sub, stream) in subs_to_add + subs_to_activate:
sub_tuples_by_user[sub.user_profile.id].append((sub, stream))
new_streams.add((sub.user_profile.id, stream.id))
@ -2918,7 +2920,7 @@ def bulk_remove_subscriptions(users: Iterable[UserProfile],
def get_non_subscribed_tups() -> List[Tuple[UserProfile, Stream]]:
stream_ids = {stream.id for stream in streams}
not_subscribed = [] # type: List[Tuple[UserProfile, Stream]]
not_subscribed: List[Tuple[UserProfile, Stream]] = []
for user_profile in users:
user_sub_stream_info = existing_subs_by_user[user_profile.id]
@ -2937,8 +2939,8 @@ def bulk_remove_subscriptions(users: Iterable[UserProfile],
not_subscribed = get_non_subscribed_tups()
subs_to_deactivate = [] # type: List[Tuple[Subscription, Stream]]
sub_ids_to_deactivate = [] # type: List[int]
subs_to_deactivate: List[Tuple[Subscription, Stream]] = []
sub_ids_to_deactivate: List[int] = []
# This loop just flattens out our data into big lists for
# bulk operations.
@ -2961,7 +2963,7 @@ def bulk_remove_subscriptions(users: Iterable[UserProfile],
# Log Subscription Activities in RealmAuditLog
event_time = timezone_now()
event_last_message_id = get_last_message_id()
all_subscription_logs = [] # type: (List[RealmAuditLog])
all_subscription_logs: (List[RealmAuditLog]) = []
for (sub, stream) in subs_to_deactivate:
all_subscription_logs.append(RealmAuditLog(realm=sub.user_profile.realm,
modified_user=sub.user_profile,
@ -2972,8 +2974,8 @@ def bulk_remove_subscriptions(users: Iterable[UserProfile],
# Now since we have all log objects generated we can do a bulk insert
RealmAuditLog.objects.bulk_create(all_subscription_logs)
altered_user_dict = defaultdict(list) # type: Dict[int, List[UserProfile]]
streams_by_user = defaultdict(list) # type: Dict[int, List[Stream]]
altered_user_dict: Dict[int, List[UserProfile]] = defaultdict(list)
streams_by_user: Dict[int, List[Stream]] = defaultdict(list)
for (sub, stream) in subs_to_deactivate:
streams_by_user[sub.user_profile_id].append(stream)
altered_user_dict[stream.id].append(sub.user_profile)
@ -3179,14 +3181,14 @@ def do_change_bot_owner(user_profile: UserProfile, bot_owner: UserProfile,
# Since `bot_owner_id` is included in the user profile dict we need
# to update the users dict with the new bot owner id
event = dict(
event: Dict[str, Any] = dict(
type="realm_user",
op="update",
person=dict(
user_id=user_profile.id,
bot_owner_id=user_profile.bot_owner.id,
),
) # type: Dict[str, Any]
)
send_event(user_profile.realm, event, active_user_ids(user_profile.realm_id))
def do_change_tos_version(user_profile: UserProfile, tos_version: str) -> None:
@ -3354,7 +3356,7 @@ def do_change_default_sending_stream(user_profile: UserProfile, stream: Optional
'stream': str(stream)})
if user_profile.is_bot:
if stream:
stream_name = stream.name # type: Optional[str]
stream_name: Optional[str] = stream.name
else:
stream_name = None
send_event(user_profile.realm,
@ -3377,7 +3379,7 @@ def do_change_default_events_register_stream(user_profile: UserProfile,
'stream': str(stream)})
if user_profile.is_bot:
if stream:
stream_name = stream.name # type: Optional[str]
stream_name: Optional[str] = stream.name
else:
stream_name = None
send_event(user_profile.realm,
@ -3611,7 +3613,7 @@ def do_create_realm(string_id: str, name: str,
logging.info("Server not yet initialized. Creating the internal realm first.")
create_internal_realm()
kwargs = {} # type: Dict[str, Any]
kwargs: Dict[str, Any] = {}
if emails_restricted_to_domains is not None:
kwargs['emails_restricted_to_domains'] = emails_restricted_to_domains
realm = Realm(string_id=string_id, name=name, **kwargs)
@ -4267,7 +4269,7 @@ def update_user_message_flags(message: Message, ums: Iterable[UserMessage]) -> N
wildcard = message.mentions_wildcard
mentioned_ids = message.mentions_user_ids
ids_with_alert_words = message.user_ids_with_alert_words
changed_ums = set() # type: Set[UserMessage]
changed_ums: Set[UserMessage] = set()
def update_flag(um: UserMessage, should_set: bool, flag: int) -> None:
if should_set:
@ -4311,10 +4313,10 @@ def do_update_embedded_data(user_profile: UserProfile,
message: Message,
content: Optional[str],
rendered_content: Optional[str]) -> None:
event = {
event: Dict[str, Any] = {
'type': 'update_message',
'sender': user_profile.email,
'message_id': message.id} # type: Dict[str, Any]
'message_id': message.id}
changed_messages = [message]
ums = UserMessage.objects.filter(message=message.id)
@ -4359,15 +4361,17 @@ def do_update_message(user_profile: UserProfile, message: Message,
timestamp = timezone_now()
message.last_edit_time = timestamp
event = {'type': 'update_message',
'user_id': user_profile.id,
'edit_timestamp': datetime_to_timestamp(timestamp),
'message_id': message.id} # type: Dict[str, Any]
event: Dict[str, Any] = {
'type': 'update_message',
'user_id': user_profile.id,
'edit_timestamp': datetime_to_timestamp(timestamp),
'message_id': message.id,
}
edit_history_event = {
edit_history_event: Dict[str, Any] = {
'user_id': user_profile.id,
'timestamp': event['edit_timestamp'],
} # type: Dict[str, Any]
}
changed_messages = [message]
@ -4417,10 +4421,10 @@ def do_update_message(user_profile: UserProfile, message: Message,
else:
new_topic_name = message.topic_name()
stream_topic = StreamTopicTarget(
stream_topic: Optional[StreamTopicTarget] = StreamTopicTarget(
stream_id=stream_id,
topic_name=new_topic_name,
) # type: Optional[StreamTopicTarget]
)
else:
stream_topic = None
@ -4545,7 +4549,7 @@ def do_delete_messages(realm: Realm, messages: Iterable[Message]) -> None:
return
usermessages = UserMessage.objects.filter(message_id__in=message_ids)
message_id_to_notifiable_users = {} # type: Dict[int, List[int]]
message_id_to_notifiable_users: Dict[int, List[int]] = {}
for um in usermessages:
if um.message_id not in message_id_to_notifiable_users:
message_id_to_notifiable_users[um.message_id] = []
@ -4557,12 +4561,13 @@ def do_delete_messages(realm: Realm, messages: Iterable[Message]) -> None:
if not message.is_stream_message():
message_type = "private"
event = {
event: Dict[str, Any] = {
'type': 'delete_message',
'sender': message.sender.email,
'sender_id': message.sender_id,
'message_id': message.id,
'message_type': message_type, } # type: Dict[str, Any]
'message_type': message_type,
}
if message_type == "stream":
event['stream_id'] = message.recipient.type_id
event['topic'] = message.topic_name()
@ -4685,7 +4690,7 @@ def gather_subscriptions_helper(user_profile: UserProfile,
stream_recipient = StreamRecipientMap()
stream_recipient.populate_for_recipient_ids(sub_recipient_ids)
stream_ids = set() # type: Set[int]
stream_ids: Set[int] = set()
for sub in sub_dicts:
sub['stream_id'] = stream_recipient.stream_id_for(sub['recipient_id'])
stream_ids.add(sub['stream_id'])
@ -4724,12 +4729,12 @@ def gather_subscriptions_helper(user_profile: UserProfile,
streams_subscribed_map.update({stream['id']: False for stream in all_streams if stream not in streams})
if include_subscribers:
subscriber_map = bulk_get_subscriber_user_ids(
subscriber_map: Mapping[int, Optional[List[int]]] = bulk_get_subscriber_user_ids(
all_streams,
user_profile,
streams_subscribed_map,
stream_recipient
) # type: Mapping[int, Optional[List[int]]]
)
else:
# If we're not including subscribers, always return None,
# which the below code needs to check for anyway.
@ -4774,7 +4779,7 @@ def gather_subscriptions_helper(user_profile: UserProfile,
stream["name"], stream["email_token"], show_sender=True)
# Construct and add subscribers data
subscribers = subscriber_map[stream["id"]] # type: Optional[List[int]]
subscribers: Optional[List[int]] = subscriber_map[stream["id"]]
# Important: don't show the subscribers if the stream is invite only
# and this user isn't on it anymore (or a realm administrator).
if stream["invite_only"] and not (sub["active"] or user_profile.is_realm_admin):
@ -4873,7 +4878,7 @@ def get_active_presence_idle_user_ids(realm: Realm,
user_ids = set()
for user_id in active_user_ids:
flags = user_flags.get(user_id, []) # type: Iterable[str]
flags: Iterable[str] = user_flags.get(user_id, [])
mentioned = 'mentioned' in flags or 'wildcard_mentioned' in flags
private_message = is_pm and user_id != sender_id
alerted = 'has_alert_word' in flags
@ -4941,9 +4946,9 @@ class InvitationError(JsonableError):
def __init__(self, msg: str, errors: List[Tuple[str, str, bool]],
sent_invitations: bool) -> None:
self._msg = msg # type: str
self.errors = errors # type: List[Tuple[str, str, bool]]
self.sent_invitations = sent_invitations # type: bool
self._msg: str = msg
self.errors: List[Tuple[str, str, bool]] = errors
self.sent_invitations: bool = sent_invitations
def estimate_recent_invites(realms: Iterable[Realm], *, days: int) -> int:
'''An upper bound on the number of invites sent in the last `days` days'''
@ -5009,8 +5014,8 @@ def do_invite_users(user_profile: UserProfile,
"Ask an organization admin, or a more experienced user."),
[], sent_invitations=False)
good_emails = set() # type: Set[str]
errors = [] # type: List[Tuple[str, str, bool]]
good_emails: Set[str] = set()
errors: List[Tuple[str, str, bool]] = []
validate_email_allowed_in_realm = get_realm_email_validator(user_profile.realm)
for email in invitee_emails:
if email == '':
@ -5032,7 +5037,7 @@ def do_invite_users(user_profile: UserProfile,
'''
error_dict = get_existing_user_errors(user_profile.realm, good_emails)
skipped = [] # type: List[Tuple[str, str, bool]]
skipped: List[Tuple[str, str, bool]] = []
for email in error_dict:
msg, deactivated = error_dict[email]
skipped.append((email, msg, deactivated))
@ -5324,7 +5329,7 @@ def do_get_streams(
else:
# We construct a query as the or (|) of the various sources
# this user requested streams from.
query_filter = None # type: Optional[Q]
query_filter: Optional[Q] = None
def add_filter_option(option: Q) -> None:
nonlocal query_filter
@ -5631,7 +5636,7 @@ def do_update_bot_config_data(bot_profile: UserProfile,
def get_service_dicts_for_bot(user_profile_id: str) -> List[Dict[str, Any]]:
user_profile = get_user_profile_by_id(user_profile_id)
services = get_bot_services(user_profile_id)
service_dicts = [] # type: List[Dict[str, Any]]
service_dicts: List[Dict[str, Any]] = []
if user_profile.bot_type == UserProfile.OUTGOING_WEBHOOK_BOT:
service_dicts = [{'base_url': service.base_url,
'interface': service.interface,
@ -5651,7 +5656,7 @@ def get_service_dicts_for_bot(user_profile_id: str) -> List[Dict[str, Any]]:
def get_service_dicts_for_bots(bot_dicts: List[Dict[str, Any]],
realm: Realm) -> Dict[int, List[Dict[str, Any]]]:
bot_profile_ids = [bot_dict['id'] for bot_dict in bot_dicts]
bot_services_by_uid = defaultdict(list) # type: Dict[int, List[Service]]
bot_services_by_uid: Dict[int, List[Service]] = defaultdict(list)
for service in Service.objects.filter(user_profile_id__in=bot_profile_ids):
bot_services_by_uid[service.user_profile_id].append(service)
@ -5659,12 +5664,12 @@ def get_service_dicts_for_bots(bot_dicts: List[Dict[str, Any]],
if bot_dict['bot_type'] == UserProfile.EMBEDDED_BOT]
embedded_bot_configs = get_bot_configs(embedded_bot_ids)
service_dicts_by_uid = {} # type: Dict[int, List[Dict[str, Any]]]
service_dicts_by_uid: Dict[int, List[Dict[str, Any]]] = {}
for bot_dict in bot_dicts:
bot_profile_id = bot_dict["id"]
bot_type = bot_dict["bot_type"]
services = bot_services_by_uid[bot_profile_id]
service_dicts = [] # type: List[Dict[str, Any]]
service_dicts: List[Dict[str, Any]] = []
if bot_type == UserProfile.OUTGOING_WEBHOOK_BOT:
service_dicts = [{'base_url': service.base_url,
'interface': service.interface,

View File

@ -30,7 +30,7 @@ def raw_pm_with_emails_by_ids(user_ids: Iterable[int], my_email: str,
return emails
def get_user_profiles(emails: Iterable[str], realm: Realm) -> List[UserProfile]:
user_profiles = [] # type: List[UserProfile]
user_profiles: List[UserProfile] = []
for email in emails:
try:
user_profile = get_user_including_cross_realm(email, realm)
@ -40,7 +40,7 @@ def get_user_profiles(emails: Iterable[str], realm: Realm) -> List[UserProfile]:
return user_profiles
def get_user_profiles_by_ids(user_ids: Iterable[int], realm: Realm) -> List[UserProfile]:
user_profiles = [] # type: List[UserProfile]
user_profiles: List[UserProfile] = []
for user_id in user_ids:
try:
user_profile = get_user_by_id_in_realm_including_cross_realm(user_id, realm)

View File

@ -26,7 +26,7 @@ def get_bot_configs(bot_profile_ids: List[int]) -> Dict[int, Dict[str, str]]:
if not bot_profile_ids:
return {}
entries = BotConfigData.objects.filter(bot_profile_id__in=bot_profile_ids)
entries_by_uid = defaultdict(dict) # type: Dict[int, Dict[str, str]]
entries_by_uid: Dict[int, Dict[str, str]] = defaultdict(dict)
for entry in entries:
entries_by_uid[entry.bot_profile_id].update({entry.key: entry.value})
return entries_by_uid

View File

@ -28,12 +28,12 @@ def get_bot_handler(service_name: str) -> Any:
if not configured_service:
return None
bot_module_name = 'zulip_bots.bots.%s.%s' % (configured_service, configured_service)
bot_module = importlib.import_module(bot_module_name) # type: Any
bot_module: Any = importlib.import_module(bot_module_name)
return bot_module.handler_class()
class StateHandler:
storage_size_limit = 10000000 # type: int # TODO: Store this in the server configuration model.
storage_size_limit: int = 10000000 # TODO: Store this in the server configuration model.
def __init__(self, user_profile: UserProfile) -> None:
self.user_profile = user_profile

View File

@ -129,7 +129,7 @@ STREAM_TOPIC_LINK_REGEX = r"""
def get_compiled_stream_topic_link_regex() -> Pattern:
return verbose_compile(STREAM_TOPIC_LINK_REGEX)
LINK_REGEX = None # type: Pattern
LINK_REGEX: Pattern = None
def get_web_link_regex() -> str:
# We create this one time, but not at startup. So the
@ -814,7 +814,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
Finally we add any remaining text to the last node.
"""
to_process = [] # type: List[Dict[str, Any]]
to_process: List[Dict[str, Any]] = []
# Build dicts for URLs
for url_data in urls:
short_url = url_data["url"]
@ -907,7 +907,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
res = fetch_tweet_data(tweet_id)
if res is None:
return None
user = res['user'] # type: Dict[str, Any]
user: Dict[str, Any] = res['user']
tweet = markdown.util.etree.Element("div")
tweet.set("class", "twitter-tweet")
img_a = markdown.util.etree.SubElement(tweet, 'a')
@ -925,7 +925,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
text = html.unescape(res['full_text'])
urls = res.get('urls', [])
user_mentions = res.get('user_mentions', [])
media = res.get('media', []) # type: List[Dict[str, Any]]
media: List[Dict[str, Any]] = res.get('media', [])
p = self.twitter_text(text, urls, user_mentions, media)
tweet.append(p)
@ -1083,7 +1083,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
if len(unique_previewable_urls) > self.INLINE_PREVIEW_LIMIT_PER_MESSAGE:
return
processed_urls = set() # type: Set[str]
processed_urls: Set[str] = set()
rendered_tweet_count = 0
for found_url in found_urls:
@ -1303,7 +1303,7 @@ class Emoji(markdown.inlinepatterns.Pattern):
orig_syntax = match.group("syntax")
name = orig_syntax[1:-1]
active_realm_emoji = {} # type: Dict[str, Dict[str, str]]
active_realm_emoji: Dict[str, Dict[str, str]] = {}
db_data = self.markdown.zulip_db_data
if db_data is not None:
active_realm_emoji = db_data['active_realm_emoji']
@ -1516,8 +1516,8 @@ class BugdownListPreprocessor(markdown.preprocessors.Preprocessor):
])
inserts = 0
in_code_fence = False # type: bool
open_fences = [] # type: List[Fence]
in_code_fence: bool = False
open_fences: List[Fence] = []
copy = lines[:]
for i in range(len(lines) - 1):
# Ignore anything that is inside a fenced code block but not quoted.
@ -1968,8 +1968,8 @@ class Bugdown(markdown.Markdown):
self.preprocessors = get_sub_registry(self.preprocessors, ['custom_text_notifications'])
self.parser.blockprocessors = get_sub_registry(self.parser.blockprocessors, ['paragraph'])
md_engines = {} # type: Dict[Tuple[int, bool], markdown.Markdown]
realm_filter_data = {} # type: Dict[int, List[Tuple[str, str, int]]]
md_engines: Dict[Tuple[int, bool], markdown.Markdown] = {}
realm_filter_data: Dict[int, List[Tuple[str, str, int]]] = {}
def make_md_engine(realm_filters_key: int, email_gateway: bool) -> None:
md_engine_key = (realm_filters_key, email_gateway)
@ -2009,7 +2009,7 @@ basic_link_splitter = re.compile(r'[ !;\?\),\'\"]')
# rendered by clients (just as links rendered into message bodies
# are validated and escaped inside `url_to_a`).
def topic_links(realm_filters_key: int, topic_name: str) -> List[str]:
matches = [] # type: List[str]
matches: List[str] = []
realm_filters = realm_filters_for_realm(realm_filters_key)
@ -2154,7 +2154,7 @@ class MentionData:
content: str) -> None:
user_group_names = possible_user_group_mentions(content)
self.user_group_name_info = get_user_group_name_info(realm_id, user_group_names)
self.user_group_members = defaultdict(list) # type: Dict[int, List[int]]
self.user_group_members: Dict[int, List[int]] = defaultdict(list)
group_ids = [group.id for group in self.user_group_name_info.values()]
if not group_ids:

View File

@ -55,7 +55,7 @@ class APIArgumentsTablePreprocessor(Preprocessor):
if is_openapi_format:
endpoint, method = doc_name.rsplit(':', 1)
arguments = [] # type: List[Dict[str, Any]]
arguments: List[Dict[str, Any]] = []
try:
arguments = get_openapi_parameters(endpoint, method)

View File

@ -242,7 +242,7 @@ def render_curl_example(function: str, api_url: str,
parts = function.split(":")
endpoint = parts[0]
method = parts[1]
kwargs = dict() # type: Dict[str, Any]
kwargs: Dict[str, Any] = dict()
if len(parts) > 2:
kwargs["auth_email"] = parts[2]
if len(parts) > 3:
@ -252,7 +252,7 @@ def render_curl_example(function: str, api_url: str,
kwargs["include"] = include
return generate_curl_example(endpoint, method, **kwargs)
SUPPORTED_LANGUAGES = {
SUPPORTED_LANGUAGES: Dict[str, Any] = {
'python': {
'client_config': PYTHON_CLIENT_CONFIG,
'admin_config': PYTHON_CLIENT_ADMIN_CONFIG,
@ -261,7 +261,7 @@ SUPPORTED_LANGUAGES = {
'curl': {
'render': render_curl_example
}
} # type: Dict[str, Any]
}
class APICodeExamplesGenerator(Extension):
def __init__(self, api_url: Optional[str]) -> None:

View File

@ -203,7 +203,7 @@ class CodeHandler(BaseHandler):
self.output = output
self.fence = fence
self.lang = lang
self.lines = [] # type: List[str]
self.lines: List[str] = []
self.run_content_validators = run_content_validators
def handle_line(self, line: str) -> None:
@ -233,7 +233,7 @@ class QuoteHandler(BaseHandler):
self.processor = processor
self.output = output
self.fence = fence
self.lines = [] # type: List[str]
self.lines: List[str] = []
def handle_line(self, line: str) -> None:
if line.rstrip() == self.fence:
@ -255,7 +255,7 @@ class TexHandler(BaseHandler):
self.processor = processor
self.output = output
self.fence = fence
self.lines = [] # type: List[str]
self.lines: List[str] = []
def handle_line(self, line: str) -> None:
if line.rstrip() == self.fence:
@ -280,7 +280,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
self.checked_for_codehilite = False
self.run_content_validators = run_content_validators
self.codehilite_conf = {} # type: Dict[str, List[Any]]
self.codehilite_conf: Dict[str, List[Any]] = {}
def push(self, handler: BaseHandler) -> None:
self.handlers.append(handler)
@ -291,10 +291,10 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
def run(self, lines: Iterable[str]) -> List[str]:
""" Match and store Fenced Code Blocks in the HtmlStash. """
output = [] # type: List[str]
output: List[str] = []
processor = self
self.handlers = [] # type: List[BaseHandler]
self.handlers: List[BaseHandler] = []
handler = OuterHandler(processor, output, self.run_content_validators)
self.push(handler)

View File

@ -71,7 +71,7 @@ class RelativeLinksHelpExtension(markdown.Extension):
md.registerExtension(self)
md.preprocessors.add('help_relative_links', RelativeLinks(), '_begin')
relative_help_links = None # type: Optional[bool]
relative_help_links: Optional[bool] = None
def set_relative_help_links(value: bool) -> None:
global relative_help_links

View File

@ -67,7 +67,7 @@ class SettingHelpExtension(markdown.Extension):
md.registerExtension(self)
md.preprocessors.add('setting', Setting(), '_begin')
relative_settings_links = None # type: Optional[bool]
relative_settings_links: Optional[bool] = None
def set_relative_settings_links(value: bool) -> None:
global relative_settings_links

View File

@ -37,8 +37,8 @@ class NestedCodeBlocksRendererTreeProcessor(markdown.treeprocessors.Treeprocesso
) -> List[ResultWithFamily]:
nested_code_blocks = []
for code_tag in code_tags:
parent = code_tag.family.parent # type: Any
grandparent = code_tag.family.grandparent # type: Any
parent: Any = code_tag.family.parent
grandparent: Any = code_tag.family.grandparent
if parent.tag == "p" and grandparent.tag == "li":
# if the parent (<p>) has no text, and no children,
# that means that the <code> element inside is its

View File

@ -138,7 +138,7 @@ class TabbedSectionsPreprocessor(Preprocessor):
return NAV_BAR_TEMPLATE.format(tabs='\n'.join(li_elements))
def parse_tabs(self, lines: List[str]) -> Optional[Dict[str, Any]]:
block = {} # type: Dict[str, Any]
block: Dict[str, Any] = {}
for index, line in enumerate(lines):
start_match = START_TABBED_SECTION_REGEX.search(line)
if start_match:

View File

@ -24,7 +24,7 @@ def bulk_create_users(realm: Realm,
users = sorted([user_raw for user_raw in users_raw if user_raw[0] not in existing_users])
# Now create user_profiles
profiles_to_create = [] # type: List[UserProfile]
profiles_to_create: List[UserProfile] = []
for (email, full_name, short_name, active) in users:
profile = create_user_profile(realm, email,
initial_password(email), active, bot_type,
@ -52,7 +52,7 @@ def bulk_create_users(realm: Realm,
event_type=RealmAuditLog.USER_CREATED, event_time=profile_.date_joined)
for profile_ in profiles_to_create])
recipients_to_create = [] # type: List[Recipient]
recipients_to_create: List[Recipient] = []
for user_id in user_ids:
recipient = Recipient(type_id=user_id, type=Recipient.PERSONAL)
recipients_to_create.append(recipient)
@ -61,11 +61,11 @@ def bulk_create_users(realm: Realm,
bulk_set_users_or_streams_recipient_fields(UserProfile, profiles_to_create, recipients_to_create)
recipients_by_user_id = {} # type: Dict[int, Recipient]
recipients_by_user_id: Dict[int, Recipient] = {}
for recipient in recipients_to_create:
recipients_by_user_id[recipient.type_id] = recipient
subscriptions_to_create = [] # type: List[Subscription]
subscriptions_to_create: List[Subscription] = []
for user_id in user_ids:
recipient = recipients_by_user_id[user_id]
subscription = Subscription(user_profile_id=user_id, recipient=recipient)
@ -106,7 +106,7 @@ def bulk_create_streams(realm: Realm,
existing_streams = frozenset([name.lower() for name in
Stream.objects.filter(realm=realm)
.values_list('name', flat=True)])
streams_to_create = [] # type: List[Stream]
streams_to_create: List[Stream] = []
for name, options in stream_dict.items():
if 'history_public_to_subscribers' not in options:
options['history_public_to_subscribers'] = (
@ -137,7 +137,7 @@ def bulk_create_streams(realm: Realm,
streams_to_create.sort(key=lambda x: x.name)
Stream.objects.bulk_create(streams_to_create)
recipients_to_create = [] # type: List[Recipient]
recipients_to_create: List[Recipient] = []
for stream in Stream.objects.filter(realm=realm).values('id', 'name'):
if stream['name'].lower() not in existing_streams:
recipients_to_create.append(Recipient(type_id=stream['id'],

View File

@ -101,7 +101,7 @@ def get_or_create_key_prefix() -> str:
return prefix
KEY_PREFIX = get_or_create_key_prefix() # type: str
KEY_PREFIX: str = get_or_create_key_prefix()
def bounce_key_prefix_for_testing(test_name: str) -> None:
global KEY_PREFIX
@ -377,14 +377,15 @@ def generic_bulk_cached_fetch(
# Nothing to fetch.
return {}
cache_keys = {} # type: Dict[ObjKT, str]
cache_keys: Dict[ObjKT, str] = {}
for object_id in object_ids:
cache_keys[object_id] = cache_key_function(object_id)
cached_objects_compressed = safe_cache_get_many([cache_keys[object_id]
for object_id in object_ids]) # type: Dict[str, Tuple[CompressedItemT]]
cached_objects_compressed: Dict[str, Tuple[CompressedItemT]] = safe_cache_get_many(
[cache_keys[object_id] for object_id in object_ids]
)
cached_objects = {} # type: Dict[str, CacheItemT]
cached_objects: Dict[str, CacheItemT] = {}
for (key, val) in cached_objects_compressed.items():
cached_objects[key] = extractor(cached_objects_compressed[key][0])
needed_ids = [object_id for object_id in object_ids if
@ -396,7 +397,7 @@ def generic_bulk_cached_fetch(
else:
db_objects = []
items_for_remote_cache = {} # type: Dict[str, Tuple[CompressedItemT]]
items_for_remote_cache: Dict[str, Tuple[CompressedItemT]] = {}
for obj in db_objects:
key = cache_keys[id_fetcher(obj)]
item = cache_transformer(obj)
@ -439,13 +440,13 @@ def user_profile_by_id_cache_key(user_profile_id: int) -> str:
def user_profile_by_api_key_cache_key(api_key: str) -> str:
return "user_profile_by_api_key:%s" % (api_key,)
realm_user_dict_fields = [
realm_user_dict_fields: List[str] = [
'id', 'full_name', 'short_name', 'email',
'avatar_source', 'avatar_version', 'is_active',
'role', 'is_bot', 'realm_id', 'timezone',
'date_joined', 'bot_owner_id', 'delivery_email',
'bot_type'
] # type: List[str]
]
def realm_user_dicts_cache_key(realm_id: int) -> str:
return "realm_user_dicts:%s" % (realm_id,)
@ -459,13 +460,15 @@ def active_user_ids_cache_key(realm_id: int) -> str:
def active_non_guest_user_ids_cache_key(realm_id: int) -> str:
return "active_non_guest_user_ids:%s" % (realm_id,)
bot_dict_fields = ['id', 'full_name', 'short_name', 'bot_type', 'email',
'is_active', 'default_sending_stream__name',
'realm_id',
'default_events_register_stream__name',
'default_all_public_streams', 'api_key',
'bot_owner__email', 'avatar_source',
'avatar_version'] # type: List[str]
bot_dict_fields: List[str] = [
'id', 'full_name', 'short_name', 'bot_type', 'email',
'is_active', 'default_sending_stream__name',
'realm_id',
'default_events_register_stream__name',
'default_all_public_streams', 'api_key',
'bot_owner__email', 'avatar_source',
'avatar_version',
]
def bot_dicts_in_realm_cache_key(realm: 'Realm') -> str:
return "bot_dicts_in_realm:%s" % (realm.id,)

View File

@ -108,7 +108,7 @@ def get_users() -> List[UserProfile]:
# doing any setup for things we're unlikely to use (without the lambda
# wrapper the below adds an extra 3ms or so to startup time for
# anything importing this file).
cache_fillers = {
cache_fillers: Dict[str, Tuple[Callable[[], List[Any]], Callable[[Dict[str, Any], Any], None], int, int]] = {
'user': (get_users, user_cache_items, 3600*24*7, 10000),
'client': (lambda: Client.objects.select_related().all(), client_cache_items, 3600*24*7, 10000),
'stream': (get_streams, stream_cache_items, 3600*24*7, 10000),
@ -118,12 +118,12 @@ cache_fillers = {
# 'message': (message_fetch_objects, message_cache_items, 3600 * 24, 1000),
'huddle': (lambda: Huddle.objects.select_related().all(), huddle_cache_items, 3600*24*7, 10000),
'session': (lambda: Session.objects.all(), session_cache_items, 3600*24*7, 10000),
} # type: Dict[str, Tuple[Callable[[], List[Any]], Callable[[Dict[str, Any], Any], None], int, int]]
}
def fill_remote_cache(cache: str) -> None:
remote_cache_time_start = get_remote_cache_time()
remote_cache_requests_start = get_remote_cache_requests()
items_for_remote_cache = {} # type: Dict[str, Any]
items_for_remote_cache: Dict[str, Any] = {}
(objects, items_filler, timeout, batch_size) = cache_fillers[cache]
count = 0
for obj in objects():

View File

@ -38,7 +38,7 @@ class TimeTrackingConnection(connection):
"""A psycopg2 connection class that uses TimeTrackingCursors."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
self.queries = [] # type: List[Dict[str, str]]
self.queries: List[Dict[str, str]] = []
super().__init__(*args, **kwargs)
def cursor(self, *args: Any, **kwargs: Any) -> TimeTrackingCursor:

View File

@ -68,7 +68,7 @@ def tracemalloc_listen_sock(sock: socket.socket) -> None:
sock.recv(1)
tracemalloc_dump()
listener_pid = None # type: Optional[int]
listener_pid: Optional[int] = None
def tracemalloc_listen() -> None:
global listener_pid

View File

@ -81,9 +81,9 @@ def gather_hot_conversations(user_profile: UserProfile, messages: List[Message])
# Returns a list of dictionaries containing the templating
# information for each hot conversation.
conversation_length = defaultdict(int) # type: Dict[Tuple[int, str], int]
conversation_messages = defaultdict(list) # type: Dict[Tuple[int, str], List[Message]]
conversation_diversity = defaultdict(set) # type: Dict[Tuple[int, str], Set[str]]
conversation_length: Dict[Tuple[int, str], int] = defaultdict(int)
conversation_messages: Dict[Tuple[int, str], List[Message]] = defaultdict(list)
conversation_diversity: Dict[Tuple[int, str], Set[str]] = defaultdict(set)
for message in messages:
key = (message.recipient.type_id,
message.topic_name())

View File

@ -92,13 +92,13 @@ def bulk_fetch_display_recipients(recipient_tuples: Set[Tuple[int, int, int]]
return stream['name']
# ItemT = Stream, CacheItemT = str (name), ObjKT = int (recipient_id)
stream_display_recipients = generic_bulk_cached_fetch(
stream_display_recipients: Dict[int, str] = generic_bulk_cached_fetch(
cache_key_function=display_recipient_cache_key,
query_function=stream_query_function,
object_ids=[recipient[0] for recipient in stream_recipients],
id_fetcher=stream_id_fetcher,
cache_transformer=stream_cache_transformer,
) # type: Dict[int, str]
)
# Now we have to create display_recipients for personal and huddle messages.
# We do this via generic_bulk_cached_fetch, supplying apprioprate functions to it.
@ -121,8 +121,8 @@ def bulk_fetch_display_recipients(recipient_tuples: Set[Tuple[int, int, int]]
) for recipient_id in recipient_ids]
# Find all user ids whose UserProfiles we will need to fetch:
user_ids_to_fetch = set() # type: Set[int]
huddle_user_ids = {} # type: Dict[int, List[int]]
user_ids_to_fetch: Set[int] = set()
huddle_user_ids: Dict[int, List[int]] = {}
huddle_user_ids = bulk_get_huddle_user_ids([recipient for recipient in recipients
if recipient.type == Recipient.HUDDLE])
for recipient in recipients:
@ -132,10 +132,10 @@ def bulk_fetch_display_recipients(recipient_tuples: Set[Tuple[int, int, int]]
user_ids_to_fetch = user_ids_to_fetch.union(huddle_user_ids[recipient.id])
# Fetch the needed UserProfiles:
user_profiles = bulk_get_user_profile_by_id(list(user_ids_to_fetch)) # type: Dict[int, UserDisplayRecipient]
user_profiles: Dict[int, UserDisplayRecipient] = bulk_get_user_profile_by_id(list(user_ids_to_fetch))
# Build the return value:
result = [] # type: List[Tuple[int, List[UserDisplayRecipient]]]
result: List[Tuple[int, List[UserDisplayRecipient]]] = []
for recipient in recipients:
if recipient.type == Recipient.PERSONAL:
result.append((recipient.id, [user_profiles[recipient.type_id]]))

View File

@ -393,7 +393,7 @@ def process_missed_message(to: str, message: message.Message) -> None:
user_profile.id, recipient_str))
def process_message(message: message.Message, rcpt_to: Optional[str]=None) -> None:
to = None # type: Optional[str]
to: Optional[str] = None
try:
if rcpt_to is not None:

View File

@ -86,7 +86,7 @@ def decode_email_address(email: str) -> Tuple[str, Dict[str, bool]]:
msg_string = msg_string.replace('.', '+')
parts = msg_string.split('+')
options = {} # type: Dict[str, bool]
options: Dict[str, bool] = {}
for part in parts:
if part in optional_address_tokens:
optional_address_tokens[part](options)

View File

@ -126,7 +126,7 @@ def build_message_list(user_profile: UserProfile, messages: List[Message]) -> Li
The messages are collapsed into per-recipient and per-sender blocks, like
our web interface
"""
messages_to_render = [] # type: List[Dict[str, Any]]
messages_to_render: List[Dict[str, Any]] = []
def sender_string(message: Message) -> str:
if message.recipient.type in (Recipient.STREAM, Recipient.HUDDLE):
@ -417,7 +417,7 @@ def do_send_missedmessage_events_reply_in_zulip(user_profile: UserProfile,
})
with override_language(user_profile.default_language):
from_name = _("Zulip missed messages") # type: str
from_name: str = _("Zulip missed messages")
from_address = FromAddress.NOREPLY
if len(senders) == 1 and settings.SEND_MISSED_MESSAGE_EMAILS_AS_USER:
# If this setting is enabled, you can reply to the Zulip
@ -470,7 +470,7 @@ def handle_missedmessage_emails(user_profile_id: int,
# We bucket messages by tuples that identify similar messages.
# For streams it's recipient_id and topic.
# For PMs it's recipient id and sender.
messages_by_bucket = defaultdict(list) # type: Dict[Tuple[int, str], List[Message]]
messages_by_bucket: Dict[Tuple[int, str], List[Message]] = defaultdict(list)
for msg in messages:
if msg.recipient.type == Recipient.PERSONAL:
# For PM's group using (recipient, sender).
@ -491,7 +491,7 @@ def handle_missedmessage_emails(user_profile_id: int,
msg_list.extend(filtered_context_messages)
# Sort emails by least recently-active discussion.
bucket_tups = [] # type: List[Tuple[Tuple[int, str], int]]
bucket_tups: List[Tuple[Tuple[int, str], int]] = []
for bucket_tup, msg_list in messages_by_bucket.items():
max_message_id = max(msg_list, key=lambda msg: msg.id).id
bucket_tups.append((bucket_tup, max_message_id))

View File

@ -127,7 +127,7 @@ def get_existing_user_errors(
to cross-realm bots and mirror dummies too.
'''
errors = {} # type: Dict[str, Tuple[str, bool]]
errors: Dict[str, Tuple[str, bool]] = {}
users = get_users_by_delivery_email(emails, target_realm).only(
'delivery_email',

View File

@ -88,12 +88,12 @@ def fetch_initial_state_data(user_profile: UserProfile,
queue_id: str, client_gravatar: bool,
slim_presence: bool = False,
include_subscribers: bool = True) -> Dict[str, Any]:
state = {'queue_id': queue_id} # type: Dict[str, Any]
state: Dict[str, Any] = {'queue_id': queue_id}
realm = user_profile.realm
if event_types is None:
# return True always
want = always_want # type: Callable[[str], bool]
want: Callable[[str], bool] = always_want
else:
want = set(event_types).__contains__
@ -803,7 +803,7 @@ def do_events_register(user_profile: UserProfile, user_client: Client,
raise JsonableError(_("Could not allocate event queue"))
if fetch_event_types is not None:
event_types_set = set(fetch_event_types) # type: Optional[Set[str]]
event_types_set: Optional[Set[str]] = set(fetch_event_types)
elif event_types is not None:
event_types_set = set(event_types)
else:

View File

@ -83,18 +83,18 @@ class JsonableError(Exception):
'''
# Override this in subclasses, as needed.
code = ErrorCode.BAD_REQUEST # type: ErrorCode
code: ErrorCode = ErrorCode.BAD_REQUEST
# Override this in subclasses if providing structured data.
data_fields = [] # type: List[str]
data_fields: List[str] = []
# Optionally override this in subclasses to return a different HTTP status,
# like 403 or 404.
http_status_code = 400 # type: int
http_status_code: int = 400
def __init__(self, msg: str) -> None:
# `_msg` is an implementation detail of `JsonableError` itself.
self._msg = msg # type: str
self._msg: str = msg
@staticmethod
def msg_format() -> str:
@ -190,7 +190,7 @@ class InvalidJSONError(JsonableError):
return _("Malformed JSON")
class OrganizationAdministratorRequired(JsonableError):
code = ErrorCode.UNAUTHORIZED_PRINCIPAL # type: ErrorCode
code: ErrorCode = ErrorCode.UNAUTHORIZED_PRINCIPAL
ADMIN_REQUIRED_MESSAGE = _("Must be an organization administrator")

View File

@ -244,7 +244,7 @@ ANALYTICS_TABLES = {
#
# TODO: This data structure could likely eventually be replaced by
# inspecting the corresponding Django models
DATE_FIELDS = {
DATE_FIELDS: Dict[TableName, List[Field]] = {
'zerver_attachment': ['create_time'],
'zerver_message': ['last_edit_time', 'date_sent'],
'zerver_mutedtopic': ['date_muted'],
@ -260,7 +260,7 @@ DATE_FIELDS = {
'analytics_realmcount': ['end_time'],
'analytics_usercount': ['end_time'],
'analytics_streamcount': ['end_time'],
} # type: Dict[TableName, List[Field]]
}
def sanity_check_output(data: TableData) -> None:
# First, we verify that the export tool has a declared
@ -389,10 +389,10 @@ class Config:
self.concat_and_destroy = concat_and_destroy
self.id_source = id_source
self.source_filter = source_filter
self.children = [] # type: List[Config]
self.children: List[Config] = []
if normal_parent is not None:
self.parent = normal_parent # type: Optional[Config]
self.parent: Optional[Config] = normal_parent
else:
self.parent = None
@ -471,7 +471,7 @@ def export_from_config(response: TableData, config: Config, seed_object: Optiona
# When we concat_and_destroy, we are working with
# temporary "tables" that are lists of records that
# should already be ready to export.
data = [] # type: List[Record]
data: List[Record] = []
for t in config.concat_and_destroy:
data += response[t]
del response[t]
@ -494,7 +494,7 @@ def export_from_config(response: TableData, config: Config, seed_object: Optiona
assert parent.table is not None
assert config.parent_key is not None
parent_ids = [r['id'] for r in response[parent.table]]
filter_parms = {config.parent_key: parent_ids} # type: Dict[str, Any]
filter_parms: Dict[str, Any] = {config.parent_key: parent_ids}
if config.filter_args is not None:
filter_parms.update(config.filter_args)
assert model is not None
@ -807,8 +807,8 @@ def fetch_user_profile(response: TableData, config: Config, context: Context) ->
exclude = ['password', 'api_key']
rows = make_raw(list(query), exclude=exclude)
normal_rows = [] # type: List[Record]
dummy_rows = [] # type: List[Record]
normal_rows: List[Record] = []
dummy_rows: List[Record] = []
for row in rows:
if exportable_user_ids is not None:
@ -1003,7 +1003,7 @@ def export_partial_message_files(realm: Realm,
response['zerver_userprofile_mirrordummy'] +
response['zerver_userprofile_crossrealm'])
consented_user_ids = set() # type: Set[int]
consented_user_ids: Set[int] = set()
if consent_message_id is not None:
consented_user_ids = get_consented_user_ids(consent_message_id)
@ -1070,7 +1070,7 @@ def export_partial_message_files(realm: Realm,
messages_we_sent_to_them,
]
all_message_ids = set() # type: Set[int]
all_message_ids: Set[int] = set()
dump_file_id = 1
for message_query in message_queries:
@ -1109,14 +1109,14 @@ def write_message_partial_for_query(realm: Realm, message_query: Any, dump_file_
logging.info("Fetched Messages for %s" % (message_filename,))
# Clean up our messages.
table_data = {} # type: TableData
table_data: TableData = {}
table_data['zerver_message'] = message_chunk
floatify_datetime_fields(table_data, 'zerver_message')
# Build up our output for the .partial file, which needs
# a list of user_profile_ids to search for (as well as
# the realm id).
output = {} # type: MessageOutput
output: MessageOutput = {}
output['zerver_message'] = table_data['zerver_message']
output['zerver_userprofile_ids'] = list(user_profile_ids)
output['realm_id'] = realm.id
@ -1270,7 +1270,7 @@ def export_files_from_s3(realm: Realm, bucket_name: str, output_dir: Path,
bucket_list = bucket.list(prefix="%s/" % (realm.id,))
if settings.EMAIL_GATEWAY_BOT is not None:
email_gateway_bot = get_system_bot(settings.EMAIL_GATEWAY_BOT) # type: Optional[UserProfile]
email_gateway_bot: Optional[UserProfile] = get_system_bot(settings.EMAIL_GATEWAY_BOT)
else:
email_gateway_bot = None
@ -1451,7 +1451,7 @@ def do_export_realm(realm: Realm, output_dir: Path, threads: int,
exportable_user_ids: Optional[Set[int]]=None,
public_only: bool=False,
consent_message_id: Optional[int]=None) -> str:
response = {} # type: TableData
response: TableData = {}
# We need at least one thread running to export
# UserMessage rows. The management command should
@ -1489,7 +1489,7 @@ def do_export_realm(realm: Realm, output_dir: Path, threads: int,
logging.info('%d messages were exported' % (len(message_ids),))
# zerver_reaction
zerver_reaction = {} # type: TableData
zerver_reaction: TableData = {}
fetch_reaction_data(response=zerver_reaction, message_ids=message_ids)
response.update(zerver_reaction)
@ -1524,7 +1524,7 @@ def do_export_realm(realm: Realm, output_dir: Path, threads: int,
return tarball_path
def export_attachment_table(realm: Realm, output_dir: Path, message_ids: Set[int]) -> None:
response = {} # type: TableData
response: TableData = {}
fetch_attachment_data(response=response, realm_id=realm.id, message_ids=message_ids)
output_file = os.path.join(output_dir, "attachment.json")
logging.info('Writing attachment table data to %s' % (output_file,))
@ -1576,7 +1576,7 @@ def launch_user_message_subprocesses(threads: int, output_dir: Path,
print('Shard %s finished, status %s' % (shard, status))
def do_export_user(user_profile: UserProfile, output_dir: Path) -> None:
response = {} # type: TableData
response: TableData = {}
export_single_user(user_profile, response)
export_file = os.path.join(output_dir, "user.json")
@ -1672,14 +1672,14 @@ def export_messages_single_user(user_profile: UserProfile, output_dir: Path,
output = {'zerver_message': message_chunk}
floatify_datetime_fields(output, 'zerver_message')
message_output = dict(output) # type: MessageOutput
message_output: MessageOutput = dict(output)
write_message_export(message_filename, message_output)
min_id = max(user_message_ids)
dump_file_id += 1
def export_analytics_tables(realm: Realm, output_dir: Path) -> None:
response = {} # type: TableData
response: TableData = {}
export_file = os.path.join(output_dir, "analytics.json")
logging.info("Writing analytics table data to %s", (export_file))

View File

@ -45,7 +45,7 @@ def parse_file(config: Dict[str, Any], gens: Dict[str, Any], corpus_file: str) -
# First, load the entire file into a dictionary,
# then apply our custom filters to it as needed.
paragraphs = [] # type: List[str]
paragraphs: List[str] = []
with open(corpus_file) as infile:
# OUR DATA: we need to separate the person talking and what they say

View File

@ -7,7 +7,7 @@ from zerver.models import UserProfile, UserHotspot
from typing import List, Dict
ALL_HOTSPOTS = {
ALL_HOTSPOTS: Dict[str, Dict[str, str]] = {
'intro_reply': {
'title': _('Reply to a message'),
'description': _('Click anywhere on a message to reply.'),
@ -33,7 +33,7 @@ ALL_HOTSPOTS = {
'description': _('Click here to start a new conversation. Pick a topic '
'(2-3 words is best), and give it a go!'),
},
} # type: Dict[str, Dict[str, str]]
}
def get_next_hotspots(user: UserProfile) -> List[Dict[str, object]]:
# For manual testing, it can be convenient to set

View File

@ -54,7 +54,7 @@ realm_tables = [("zerver_defaultstream", DefaultStream, "defaultstream"),
#
# Code reviewers: give these tables extra scrutiny, as we need to
# make sure to reload related tables AFTER we re-map the ids.
ID_MAP = {
ID_MAP: Dict[str, Dict[int, int]] = {
'client': {},
'user_profile': {},
'huddle': {},
@ -87,15 +87,15 @@ ID_MAP = {
'analytics_realmcount': {},
'analytics_streamcount': {},
'analytics_usercount': {},
} # type: Dict[str, Dict[int, int]]
}
id_map_to_list = {
id_map_to_list: Dict[str, Dict[int, List[int]]] = {
'huddle_to_user_list': {},
} # type: Dict[str, Dict[int, List[int]]]
}
path_maps = {
path_maps: Dict[str, Dict[str, str]] = {
'attachment_path': {},
} # type: Dict[str, Dict[str, str]]
}
def update_id_map(table: TableName, old_id: int, new_id: int) -> None:
if table not in ID_MAP:
@ -296,7 +296,7 @@ def fix_message_rendered_content(realm: Realm,
# platforms, since they generally don't have an "alert
# words" type feature, and notifications aren't important anyway.
realm_alert_words_automaton = None
message_user_ids = set() # type: Set[int]
message_user_ids: Set[int] = set()
rendered_content = do_render_markdown(
message=cast(Message, message_object),
@ -604,7 +604,7 @@ def import_uploads(realm: Realm, import_dir: Path, processes: int, processing_av
records_filename = os.path.join(import_dir, "records.json")
with open(records_filename) as records_file:
records = ujson.load(records_file) # type: List[Dict[str, Any]]
records: List[Dict[str, Any]] = ujson.load(records_file)
timestamp = datetime_to_timestamp(timezone_now())
re_map_foreign_keys_internal(records, 'records', 'realm_id', related_table="realm",
@ -696,7 +696,7 @@ def import_uploads(realm: Realm, import_dir: Path, processes: int, processing_av
# set; that is OK, because those are never served
# directly anyway.
content_type = 'application/octet-stream'
headers = {'Content-Type': content_type} # type: Dict[str, Any]
headers: Dict[str, Any] = {'Content-Type': content_type}
key.set_contents_from_filename(os.path.join(import_dir, record['path']), headers=headers)
else:
@ -804,12 +804,12 @@ def do_import_realm(import_dir: Path, subdomain: str, processes: int=1) -> Realm
realm = Realm(**data['zerver_realm'][0])
if realm.notifications_stream_id is not None:
notifications_stream_id = int(realm.notifications_stream_id) # type: Optional[int]
notifications_stream_id: Optional[int] = int(realm.notifications_stream_id)
else:
notifications_stream_id = None
realm.notifications_stream_id = None
if realm.signup_notifications_stream_id is not None:
signup_notifications_stream_id = int(realm.signup_notifications_stream_id) # type: Optional[int]
signup_notifications_stream_id: Optional[int] = int(realm.signup_notifications_stream_id)
else:
signup_notifications_stream_id = None
realm.signup_notifications_stream_id = None
@ -1138,9 +1138,9 @@ def get_incoming_message_ids(import_dir: Path,
'''
if sort_by_date:
tups = list() # type: List[Tuple[int, int]]
tups: List[Tuple[int, int]] = list()
else:
message_ids = [] # type: List[int]
message_ids: List[int] = []
dump_file_id = 1
while True:
@ -1263,16 +1263,16 @@ def import_attachments(data: TableData) -> None:
# We do this in a slightly convoluted way to anticipate
# a future where we may need to call re_map_foreign_keys.
m2m_rows = [] # type: List[Record]
m2m_rows: List[Record] = []
for parent_row in data[parent_db_table_name]:
for fk_id in parent_row[child_plural]:
m2m_row = {} # type: Record
m2m_row: Record = {}
m2m_row[parent_singular] = parent_row['id']
m2m_row[child_singular] = ID_MAP['message'][fk_id]
m2m_rows.append(m2m_row)
# Create our table data for insert.
m2m_data = {m2m_table_name: m2m_rows} # type: TableData
m2m_data: TableData = {m2m_table_name: m2m_rows}
convert_to_id_fields(m2m_data, m2m_table_name, parent_singular)
convert_to_id_fields(m2m_data, m2m_table_name, child_singular)
m2m_rows = m2m_data[m2m_table_name]

View File

@ -29,7 +29,7 @@ Over time, we expect this registry to grow additional convenience
features for writing and configuring integrations efficiently.
"""
CATEGORIES = {
CATEGORIES: Dict[str, str] = {
'meta-integration': _('Integration frameworks'),
'continuous-integration': _('Continuous integration'),
'customer-support': _('Customer support'),
@ -44,7 +44,7 @@ CATEGORIES = {
'productivity': _('Productivity'),
'version-control': _('Version control'),
'bots': _('Interactive bots'),
} # type: Dict[str, str]
}
class Integration:
DEFAULT_LOGO_STATIC_PATH_PNG = 'images/integrations/logos/{name}.png'
@ -222,16 +222,16 @@ class EmbeddedBotIntegration(Integration):
super().__init__(
name, client_name, *args, **kwargs)
EMBEDDED_BOTS = [
EMBEDDED_BOTS: List[EmbeddedBotIntegration] = [
EmbeddedBotIntegration('converter', []),
EmbeddedBotIntegration('encrypt', []),
EmbeddedBotIntegration('helloworld', []),
EmbeddedBotIntegration('virtual_fs', []),
EmbeddedBotIntegration('giphy', []),
EmbeddedBotIntegration('followup', []),
] # type: List[EmbeddedBotIntegration]
]
WEBHOOK_INTEGRATIONS = [
WEBHOOK_INTEGRATIONS: List[WebhookIntegration] = [
WebhookIntegration('airbrake', ['monitoring']),
WebhookIntegration(
'alertmanager',
@ -358,9 +358,9 @@ WEBHOOK_INTEGRATIONS = [
WebhookIntegration('zabbix', ['monitoring'], display_name='Zabbix'),
WebhookIntegration('gci', ['misc'], display_name='Google Code-in',
stream_name='gci'),
] # type: List[WebhookIntegration]
]
INTEGRATIONS = {
INTEGRATIONS: Dict[str, Integration] = {
'asana': Integration('asana', 'asana', ['project-management'], doc='zerver/integrations/asana.md'),
'capistrano': Integration(
'capistrano',
@ -452,16 +452,16 @@ INTEGRATIONS = {
# _ needed to get around adblock plus
logo='images/integrations/logos/twitte_r.svg',
doc='zerver/integrations/twitter.md'),
} # type: Dict[str, Integration]
}
BOT_INTEGRATIONS = [
BOT_INTEGRATIONS: List[BotIntegration] = [
BotIntegration('github_detail', ['version-control', 'bots'],
display_name='GitHub Detail'),
BotIntegration('xkcd', ['bots', 'misc'], display_name='xkcd',
logo='images/integrations/logos/xkcd.png'),
] # type: List[BotIntegration]
]
HUBOT_INTEGRATIONS = [
HUBOT_INTEGRATIONS: List[HubotIntegration] = [
HubotIntegration('assembla', ['version-control', 'project-management'],
display_name='Assembla', logo_alt='Assembla'),
HubotIntegration('bonusly', ['hr']),
@ -480,7 +480,7 @@ HUBOT_INTEGRATIONS = [
HubotIntegration('youtube', ['misc'], display_name='YouTube',
# _ needed to get around adblock plus
logo='images/integrations/logos/youtub_e.svg'),
] # type: List[HubotIntegration]
]
for hubot_integration in HUBOT_INTEGRATIONS:
INTEGRATIONS[hubot_integration.name] = hubot_integration

View File

@ -113,7 +113,7 @@ def messages_for_ids(message_ids: List[int],
extractor=extract_message_dict,
setter=stringify_message_dict)
message_list = [] # type: List[Dict[str, Any]]
message_list: List[Dict[str, Any]] = []
for message_id in message_ids:
msg_dict = message_dicts[message_id]
@ -499,11 +499,13 @@ class MessageDict:
if len(display_recipient) == 1:
# add the sender in if this isn't a message between
# someone and themself, preserving ordering
recip = {'email': sender_email,
'full_name': sender_full_name,
'short_name': sender_short_name,
'id': sender_id,
'is_mirror_dummy': sender_is_mirror_dummy} # type: UserDisplayRecipient
recip: UserDisplayRecipient = {
'email': sender_email,
'full_name': sender_full_name,
'short_name': sender_short_name,
'id': sender_id,
'is_mirror_dummy': sender_is_mirror_dummy,
}
if recip['email'] < display_recipient[0]['email']:
display_recipient = [recip, display_recipient[0]]
elif recip['email'] > display_recipient[0]['email']:
@ -658,7 +660,7 @@ def render_markdown(message: Message,
'''
if user_ids is None:
message_user_ids = set() # type: Set[int]
message_user_ids: Set[int] = set()
else:
message_user_ids = user_ids
@ -719,21 +721,21 @@ def do_render_markdown(message: Message,
return rendered_content
def huddle_users(recipient_id: int) -> str:
display_recipient = get_display_recipient_by_id(recipient_id,
Recipient.HUDDLE,
None) # type: DisplayRecipientT
display_recipient: DisplayRecipientT = get_display_recipient_by_id(
recipient_id, Recipient.HUDDLE, None
)
# str is for streams.
assert not isinstance(display_recipient, str)
user_ids = [obj['id'] for obj in display_recipient] # type: List[int]
user_ids: List[int] = [obj['id'] for obj in display_recipient]
user_ids = sorted(user_ids)
return ','.join(str(uid) for uid in user_ids)
def aggregate_message_dict(input_dict: Dict[int, Dict[str, Any]],
lookup_fields: List[str],
collect_senders: bool) -> List[Dict[str, Any]]:
lookup_dict = dict() # type: Dict[Tuple[Any, ...], Dict[str, Any]]
lookup_dict: Dict[Tuple[Any, ...], Dict[str, Any]] = dict()
'''
A concrete example might help explain the inputs here:
@ -862,7 +864,7 @@ def get_raw_unread_data(user_profile: UserProfile) -> RawUnreadMessagesResult:
return False
huddle_cache = {} # type: Dict[int, str]
huddle_cache: Dict[int, str] = {}
def get_huddle_users(recipient_id: int) -> str:
if recipient_id in huddle_cache:
@ -975,12 +977,12 @@ def aggregate_unread_data(raw_data: RawUnreadMessagesResult) -> UnreadMessagesRe
collect_senders=False,
)
result = dict(
result: UnreadMessagesResult = dict(
pms=pm_objects,
streams=stream_objects,
huddles=huddle_objects,
mentions=mentions,
count=count) # type: UnreadMessagesResult
count=count)
return result

View File

@ -9,7 +9,7 @@ from django.utils.translation import ugettext as _
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence
stop_words_list = None # type: Optional[List[str]]
stop_words_list: Optional[List[str]] = None
def read_stop_words() -> List[str]:
global stop_words_list
if stop_words_list is None:

View File

@ -74,7 +74,7 @@ def send_initial_realm_messages(realm: Realm) -> None:
# Make sure each stream created in the realm creation process has at least one message below
# Order corresponds to the ordering of the streams on the left sidebar, to make the initial Home
# view slightly less overwhelming
welcome_messages = [
welcome_messages: List[Dict[str, str]] = [
{'stream': Realm.INITIAL_PRIVATE_STREAM_NAME,
'topic': "private streams",
'content': "This is a private stream, as indicated by the "
@ -96,7 +96,7 @@ def send_initial_realm_messages(realm: Realm) -> None:
"\n\n[](/static/images/cute/turtle.png)"
"\n\n[Start a new topic](/help/start-a-new-topic) any time you're not replying to a "
"previous message."},
] # type: List[Dict[str, str]]
]
messages = [internal_prep_stream_message_by_name(
realm, welcome_bot, message['stream'], message['topic'],
message['content'] % {

View File

@ -21,9 +21,9 @@ from version import ZULIP_VERSION
class OutgoingWebhookServiceInterface:
def __init__(self, token: str, user_profile: UserProfile, service_name: str) -> None:
self.token = token # type: str
self.user_profile = user_profile # type: UserProfile
self.service_name = service_name # type: str
self.token: str = token
self.user_profile: UserProfile = user_profile
self.service_name: str = service_name
class GenericOutgoingWebhookService(OutgoingWebhookServiceInterface):
@ -118,10 +118,10 @@ class SlackOutgoingWebhookService(OutgoingWebhookServiceInterface):
return None
AVAILABLE_OUTGOING_WEBHOOK_INTERFACES = {
AVAILABLE_OUTGOING_WEBHOOK_INTERFACES: Dict[str, Any] = {
GENERIC_INTERFACE: GenericOutgoingWebhookService,
SLACK_INTERFACE: SlackOutgoingWebhookService,
} # type: Dict[str, Any]
}
def get_service_interface_class(interface: str) -> Any:
if interface is None or interface not in AVAILABLE_OUTGOING_WEBHOOK_INTERFACES:

View File

@ -10,7 +10,7 @@ JobData = TypeVar('JobData')
def run_parallel(job: Callable[[JobData], int],
data: Iterable[JobData],
threads: int=6) -> Iterator[Tuple[int, JobData]]:
pids = {} # type: Dict[int, JobData]
pids: Dict[int, JobData] = {}
def wait_for_one() -> Tuple[int, JobData]:
while True:

View File

@ -40,7 +40,7 @@ def get_status_dicts_for_rows(all_rows: List[Dict[str, Any]],
get_user_key = lambda row: row['user_profile__email']
get_user_info = get_legacy_user_info
user_statuses = dict() # type: Dict[str, Dict[str, Any]]
user_statuses: Dict[str, Dict[str, Any]] = dict()
for user_key, presence_rows in itertools.groupby(all_rows, get_user_key):
info = get_user_info(
@ -137,7 +137,7 @@ def get_presence_for_user(user_profile_id: int,
)
presence_rows = list(query)
mobile_user_ids = set() # type: Set[int]
mobile_user_ids: Set[int] = set()
if PushDeviceToken.objects.filter(user_id=user_profile_id).exists(): # nocoverage
# TODO: Add a test, though this is low priority, since we don't use mobile_user_ids yet.
mobile_user_ids.add(user_profile_id)

View File

@ -26,7 +26,7 @@ def profiled(func: Callable[..., ReturnT]) -> Callable[..., ReturnT]:
def wrapped_func(*args: Any, **kwargs: Any) -> ReturnT:
fn = func.__name__ + ".profile"
prof = cProfile.Profile()
retval = prof.runcall(func, *args, **kwargs) # type: ReturnT
retval: ReturnT = prof.runcall(func, *args, **kwargs)
prof.dump_stats(fn)
return retval
return wrapped_func

View File

@ -52,7 +52,7 @@ def hex_to_b64(data: str) -> str:
# Sending to APNs, for iOS
#
_apns_client = None # type: Optional[APNsClient]
_apns_client: Optional["APNsClient"] = None
_apns_client_initialized = False
def get_apns_client() -> 'Optional[APNsClient]':
@ -547,7 +547,7 @@ def truncate_content(content: str) -> Tuple[str, bool]:
def get_base_payload(user_profile: UserProfile) -> Dict[str, Any]:
'''Common fields for all notification payloads.'''
data = {} # type: Dict[str, Any]
data: Dict[str, Any] = {}
# These will let the app support logging into multiple realms and servers.
data['server'] = settings.EXTERNAL_HOST

View File

@ -28,9 +28,9 @@ class SimpleQueueClient:
rabbitmq_heartbeat: Optional[int] = 0,
) -> None:
self.log = logging.getLogger('zulip.queue')
self.queues = set() # type: Set[str]
self.channel = None # type: Optional[BlockingChannel]
self.consumers = defaultdict(set) # type: Dict[str, Set[Consumer]]
self.queues: Set[str] = set()
self.channel: Optional[BlockingChannel] = None
self.consumers: Dict[str, Set[Consumer]] = defaultdict(set)
self.rabbitmq_heartbeat = rabbitmq_heartbeat
self._connect()
@ -205,7 +205,7 @@ class TornadoQueueClient(SimpleQueueClient):
super().__init__(
# TornadoConnection can process heartbeats, so enable them.
rabbitmq_heartbeat=None)
self._on_open_cbs = [] # type: List[Callable[[], None]]
self._on_open_cbs: List[Callable[[], None]] = []
self._connection_failure_count = 0
def _connect(self) -> None:
@ -308,7 +308,7 @@ class TornadoQueueClient(SimpleQueueClient):
lambda: self.channel.basic_consume(queue_name, wrapped_consumer,
consumer_tag=self._generate_ctag(queue_name)))
queue_client = None # type: Optional[SimpleQueueClient]
queue_client: Optional[SimpleQueueClient] = None
def get_queue_client() -> SimpleQueueClient:
global queue_client
if queue_client is None:

View File

@ -19,7 +19,7 @@ import time
# https://www.domaintools.com/resources/blog/rate-limiting-with-redis
client = get_redis_client()
rules = settings.RATE_LIMITING_RULES # type: Dict[str, List[Tuple[int, int]]]
rules: Dict[str, List[Tuple[int, int]]] = settings.RATE_LIMITING_RULES
KEY_PREFIX = ''
@ -31,7 +31,7 @@ class RateLimiterLockingException(Exception):
class RateLimitedObject(ABC):
def __init__(self, backend: Optional['Type[RateLimiterBackend]']=None) -> None:
if backend is not None:
self.backend = backend # type: Type[RateLimiterBackend]
self.backend: Type[RateLimiterBackend] = backend
elif settings.RUNNING_INSIDE_TORNADO:
self.backend = TornadoInMemoryRateLimiterBackend
else:
@ -119,7 +119,7 @@ class RateLimitedUser(RateLimitedObject):
def rules(self) -> List[Tuple[int, int]]:
# user.rate_limits are general limits, applicable to the domain 'api_by_user'
if self.user.rate_limits != "" and self.domain == 'api_by_user':
result = [] # type: List[Tuple[int, int]]
result: List[Tuple[int, int]] = []
for limit in self.user.rate_limits.split(','):
(seconds, requests) = limit.split(':', 2)
result.append((int(seconds), int(requests)))
@ -179,15 +179,15 @@ class RateLimiterBackend(ABC):
class TornadoInMemoryRateLimiterBackend(RateLimiterBackend):
# reset_times[rule][key] is the time at which the event
# request from the rate-limited key will be accepted.
reset_times = {} # type: Dict[Tuple[int, int], Dict[str, float]]
reset_times: Dict[Tuple[int, int], Dict[str, float]] = {}
# last_gc_time is the last time when the garbage was
# collected from reset_times for rule (time_window, max_count).
last_gc_time = {} # type: Dict[Tuple[int, int], float]
last_gc_time: Dict[Tuple[int, int], float] = {}
# timestamps_blocked_until[key] contains the timestamp
# up to which the key has been blocked manually.
timestamps_blocked_until = {} # type: Dict[str, float]
timestamps_blocked_until: Dict[str, float] = {}
@classmethod
def _garbage_collect_for_rule(cls, now: float, time_window: int, max_count: int) -> None:
@ -333,8 +333,8 @@ class RedisRateLimiterBackend(RateLimiterBackend):
results = pipe.execute()
count = results[0] # type: int
newest_call = results[1] # type: Optional[bytes]
count: int = results[0]
newest_call: Optional[bytes] = results[1]
calls_left = max_calls - count
if newest_call is not None:
@ -361,7 +361,7 @@ class RedisRateLimiterBackend(RateLimiterBackend):
pipe.get(blocking_key)
pipe.ttl(blocking_key)
rule_timestamps = pipe.execute() # type: List[Optional[bytes]]
rule_timestamps: List[Optional[bytes]] = pipe.execute()
# Check if there is a manual block on this API key
blocking_ttl_b = rule_timestamps.pop()

View File

@ -19,8 +19,8 @@ class RequestConfusingParmsError(JsonableError):
data_fields = ['var_name1', 'var_name2']
def __init__(self, var_name1: str, var_name2: str) -> None:
self.var_name1 = var_name1 # type: str
self.var_name2 = var_name2 # type: str
self.var_name1: str = var_name1
self.var_name2: str = var_name2
@staticmethod
def msg_format() -> str:
@ -31,7 +31,7 @@ class RequestVariableMissingError(JsonableError):
data_fields = ['var_name']
def __init__(self, var_name: str) -> None:
self.var_name = var_name # type: str
self.var_name: str = var_name
@staticmethod
def msg_format() -> str:
@ -42,7 +42,7 @@ class RequestVariableConversionError(JsonableError):
data_fields = ['var_name', 'bad_value']
def __init__(self, var_name: str, bad_value: Any) -> None:
self.var_name = var_name # type: str
self.var_name: str = var_name
self.bad_value = bad_value
@staticmethod
@ -106,7 +106,7 @@ class _REQ(Generic[ResultT]):
"""
self.post_var_name = whence
self.func_var_name = None # type: Optional[str]
self.func_var_name: Optional[str] = None
self.converter = converter
self.validator = validator
self.str_validator = str_validator
@ -237,7 +237,7 @@ def REQ(
path_only=path_only,
))
arguments_map = defaultdict(list) # type: Dict[str, List[str]]
arguments_map: Dict[str, List[str]] = defaultdict(list)
# Extracts variables from the request object and passes them as
# named function arguments. The request object must be the first
@ -314,7 +314,7 @@ def has_request_variables(view_func: ViewFuncT) -> ViewFuncT:
default_assigned = False
post_var_name = None # type: Optional[str]
post_var_name: Optional[str] = None
for req_var in post_var_names:
if req_var in request.POST:

View File

@ -57,7 +57,7 @@ def rest_dispatch(request: HttpRequest, **kwargs: Any) -> HttpResponse:
Never make a urls.py pattern put user input into a variable called GET, POST,
etc, as that is where we route HTTP verbs to target functions.
"""
supported_methods = {} # type: Dict[str, Any]
supported_methods: Dict[str, Any] = {}
if hasattr(request, "saved_response"):
# For completing long-polled Tornado requests, we skip the

View File

@ -20,7 +20,7 @@ log_to_file(logger, settings.RETENTION_LOG_PATH)
MESSAGE_BATCH_SIZE = 1000
models_with_message_key = [
models_with_message_key: List[Dict[str, Any]] = [
{
'class': Reaction,
'archive_class': ArchivedReaction,
@ -39,7 +39,7 @@ models_with_message_key = [
'table_name': 'zerver_usermessage',
'archive_table_name': 'zerver_archivedusermessage'
},
] # type: List[Dict[str, Any]]
]
@transaction.atomic(savepoint=False)
def move_rows(base_model: Model, raw_query: str, src_db_table: str='', returning_id: bool=False,
@ -280,7 +280,7 @@ def archive_stream_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) ->
if not realm.message_retention_days:
streams = streams.exclude(message_retention_days__isnull=True)
retention_policy_dict = {} # type: Dict[int, int]
retention_policy_dict: Dict[int, int] = {}
for stream in streams:
# if stream.message_retention_days is null, use the realm's policy
if stream.message_retention_days:

View File

@ -20,7 +20,7 @@ def filter_by_subscription_history(user_profile: UserProfile,
all_stream_messages: DefaultDict[int, List[Message]],
all_stream_subscription_logs: DefaultDict[int, List[RealmAuditLog]],
) -> List[UserMessage]:
user_messages_to_insert = [] # type: List[UserMessage]
user_messages_to_insert: List[UserMessage] = []
def store_user_message_to_insert(message: Message) -> None:
message = UserMessage(user_profile=user_profile,
@ -149,7 +149,7 @@ def add_missing_messages(user_profile: UserProfile) -> None:
modified_stream__id__in=stream_ids,
event_type__in=events).order_by('event_last_message_id', 'id'))
all_stream_subscription_logs = defaultdict(list) # type: DefaultDict[int, List[RealmAuditLog]]
all_stream_subscription_logs: DefaultDict[int, List[RealmAuditLog]] = defaultdict(list)
for log in subscription_logs:
all_stream_subscription_logs[log.modified_stream_id].append(log)
@ -177,7 +177,7 @@ def add_missing_messages(user_profile: UserProfile) -> None:
all_stream_msgs = [msg for msg in all_stream_msgs
if msg['id'] not in already_created_ums]
stream_messages = defaultdict(list) # type: DefaultDict[int, List[Message]]
stream_messages: DefaultDict[int, List[Message]] = defaultdict(list)
for msg in all_stream_msgs:
stream_messages[msg['recipient__type_id']].append(msg)
@ -240,7 +240,7 @@ def do_soft_deactivate_users(users: List[UserProfile]) -> List[UserProfile]:
return users_soft_deactivated
def do_auto_soft_deactivate_users(inactive_for_days: int, realm: Optional[Realm]) -> List[UserProfile]:
filter_kwargs = {} # type: Dict[str, Realm]
filter_kwargs: Dict[str, Realm] = {}
if realm is not None:
filter_kwargs = dict(user_profile__realm=realm)
users_to_deactivate = get_users_for_soft_deactivation(inactive_for_days, filter_kwargs)

View File

@ -23,7 +23,7 @@ class NonClosingPool(sqlalchemy.pool.NullPool):
logging_name=self._orig_logging_name,
_dispatch=self.dispatch)
sqlalchemy_engine = None # type: Optional[Any]
sqlalchemy_engine: Optional[Any] = None
def get_sqlalchemy_connection() -> sqlalchemy.engine.base.Connection:
global sqlalchemy_engine
if sqlalchemy_engine is None:

View File

@ -18,8 +18,8 @@ class StreamRecipientMap:
'''
def __init__(self) -> None:
self.recip_to_stream = dict() # type: Dict[int, int]
self.stream_to_recip = dict() # type: Dict[int, int]
self.recip_to_stream: Dict[int, int] = dict()
self.stream_to_recip: Dict[int, int] = dict()
def populate_with(self, *, stream_id: int, recipient_id: int) -> None:
# We use * to enforce using named arguments when calling this function,

View File

@ -51,10 +51,10 @@ def get_bulk_stream_subscriber_info(
stream_ids = stream_dict.keys()
result = {
result: Dict[int, List[Tuple[Subscription, Stream]]] = {
user_profile.id: []
for user_profile in user_profiles
} # type: Dict[int, List[Tuple[Subscription, Stream]]]
}
subs = Subscription.objects.filter(
user_profile__in=user_profiles,

View File

@ -86,8 +86,8 @@ def create_streams_if_needed(realm: Realm,
stream_dicts: List[Mapping[str, Any]]) -> Tuple[List[Stream], List[Stream]]:
"""Note that stream_dict["name"] is assumed to already be stripped of
whitespace"""
added_streams = [] # type: List[Stream]
existing_streams = [] # type: List[Stream]
added_streams: List[Stream] = []
existing_streams: List[Stream] = []
for stream_dict in stream_dicts:
stream, created = create_stream_if_needed(
realm,
@ -361,7 +361,7 @@ def can_access_stream_history_by_id(user_profile: UserProfile, stream_id: int) -
def filter_stream_authorization(user_profile: UserProfile,
streams: Iterable[Stream]) -> Tuple[List[Stream], List[Stream]]:
streams_subscribed = set() # type: Set[int]
streams_subscribed: Set[int] = set()
recipient_ids = [stream.recipient_id for stream in streams]
subs = Subscription.objects.filter(user_profile=user_profile,
recipient_id__in=recipient_ids,
@ -370,7 +370,7 @@ def filter_stream_authorization(user_profile: UserProfile,
for sub in subs:
streams_subscribed.add(sub.recipient.type_id)
unauthorized_streams = [] # type: List[Stream]
unauthorized_streams: List[Stream] = []
for stream in streams:
# The user is authorized for their own streams
if stream.id in streams_subscribed:
@ -411,8 +411,8 @@ def list_to_streams(streams_raw: Iterable[Mapping[str, Any]],
assert stream_name == stream_name.strip()
check_stream_name(stream_name)
existing_streams = [] # type: List[Stream]
missing_stream_dicts = [] # type: List[Mapping[str, Any]]
existing_streams: List[Stream] = []
missing_stream_dicts: List[Mapping[str, Any]] = []
existing_stream_map = bulk_get_streams(user_profile.realm, stream_set)
member_creating_announcement_only_stream = False
@ -438,7 +438,7 @@ def list_to_streams(streams_raw: Iterable[Mapping[str, Any]],
if len(missing_stream_dicts) == 0:
# This is the happy path for callers who expected all of these
# streams to exist already.
created_streams = [] # type: List[Stream]
created_streams: List[Stream] = []
else:
# autocreate=True path starts here
if not user_profile.can_create_streams():

View File

@ -96,11 +96,11 @@ class UploadSerializeMixin(SerializeMixin):
class ZulipTestCase(TestCase):
# Ensure that the test system just shows us diffs
maxDiff = None # type: Optional[int]
maxDiff: Optional[int] = None
def setUp(self) -> None:
super().setUp()
self.API_KEYS = {} # type: Dict[str, str]
self.API_KEYS: Dict[str, str] = {}
def tearDown(self) -> None:
super().tearDown()
@ -868,7 +868,7 @@ class ZulipTestCase(TestCase):
if binary:
with open(attr_value, "rb") as f:
# attr_value should be a path to the file with the binary data
data = f.read() # type: Union[str, bytes]
data: Union[str, bytes] = f.read()
else:
data = attr_value
@ -893,10 +893,10 @@ class WebhookTestCase(ZulipTestCase):
If you create your url in uncommon way you can override build_webhook_url method
In case that you need modify body or create it without using fixture you can also override get_body method
"""
STREAM_NAME = None # type: Optional[str]
STREAM_NAME: Optional[str] = None
TEST_USER_EMAIL = 'webhook-bot@zulip.com'
URL_TEMPLATE = None # type: Optional[str]
FIXTURE_DIR_NAME = None # type: Optional[str]
URL_TEMPLATE: Optional[str] = None
FIXTURE_DIR_NAME: Optional[str] = None
@property
def test_user(self) -> UserProfile:
@ -989,14 +989,14 @@ class MigrationsTestCase(ZulipTestCase): # nocoverage
def app(self) -> str:
return apps.get_containing_app_config(type(self).__module__).name
migrate_from = None # type: Optional[str]
migrate_to = None # type: Optional[str]
migrate_from: Optional[str] = None
migrate_to: Optional[str] = None
def setUp(self) -> None:
assert self.migrate_from and self.migrate_to, \
"TestCase '{}' must define migrate_from and migrate_to properties".format(type(self).__name__)
migrate_from = [(self.app, self.migrate_from)] # type: List[Tuple[str, str]]
migrate_to = [(self.app, self.migrate_to)] # type: List[Tuple[str, str]]
migrate_from: List[Tuple[str, str]] = [(self.app, self.migrate_from)]
migrate_to: List[Tuple[str, str]] = [(self.app, self.migrate_to)]
executor = MigrationExecutor(connection)
old_apps = executor.loader.project_state(migrate_from).apps

View File

@ -321,7 +321,7 @@ def destroy_leaked_test_databases(expiry_time: int = 60 * 60) -> int:
while also ensuring we will eventually delete all leaked databases.
"""
files = glob.glob(os.path.join(UUID_VAR_DIR, TEMPLATE_DATABASE_DIR, "*"))
test_databases = set() # type: Set[str]
test_databases: Set[str] = set()
try:
with connection.cursor() as cursor:
cursor.execute("SELECT datname FROM pg_database;")
@ -332,7 +332,7 @@ def destroy_leaked_test_databases(expiry_time: int = 60 * 60) -> int:
except ProgrammingError:
pass
databases_in_use = set() # type: Set[str]
databases_in_use: Set[str] = set()
for file in files:
if round(time.time()) - os.path.getmtime(file) < expiry_time:
with open(file) as f:

View File

@ -117,7 +117,7 @@ def capture_event(event_info: EventInfo) -> Iterator[None]:
@contextmanager
def simulated_empty_cache() -> Generator[
List[Tuple[str, Union[str, List[str]], str]], None, None]:
cache_queries = [] # type: List[Tuple[str, Union[str, List[str]], str]]
cache_queries: List[Tuple[str, Union[str, List[str]], str]] = []
def my_cache_get(key: str, cache_name: Optional[str]=None) -> Optional[Dict[str, Any]]:
cache_queries.append(('get', key, cache_name))
@ -143,7 +143,7 @@ def queries_captured(include_savepoints: Optional[bool]=False) -> Generator[
the with statement.
'''
queries = [] # type: List[Dict[str, Union[str, bytes]]]
queries: List[Dict[str, Union[str, bytes]]] = []
def wrapper_execute(self: TimeTrackingCursor,
action: Callable[[str, Iterable[Any]], None],
@ -258,19 +258,19 @@ class POSTRequestMock:
method = "POST"
def __init__(self, post_data: Dict[str, Any], user_profile: Optional[UserProfile]) -> None:
self.GET = {} # type: Dict[str, Any]
self.GET: Dict[str, Any] = {}
# Convert any integer parameters passed into strings, even
# though of course the HTTP API would do so. Ideally, we'd
# get rid of this abstraction entirely and just use the HTTP
# API directly, but while it exists, we need this code.
self.POST = {} # type: Dict[str, str]
self.POST: Dict[str, str] = {}
for key in post_data:
self.POST[key] = str(post_data[key])
self.user = user_profile
self._tornado_handler = DummyHandler()
self._log_data = {} # type: Dict[str, Any]
self._log_data: Dict[str, Any] = {}
self.META = {'PATH_INFO': 'test'}
self.path = ''
@ -280,8 +280,8 @@ class HostRequestMock:
def __init__(self, user_profile: UserProfile=None, host: str=settings.EXTERNAL_HOST) -> None:
self.host = host
self.GET = {} # type: Dict[str, Any]
self.POST = {} # type: Dict[str, Any]
self.GET: Dict[str, Any] = {}
self.POST: Dict[str, Any] = {}
self.META = {'PATH_INFO': 'test'}
self.path = ''
self.user = user_profile
@ -309,7 +309,7 @@ class MockPythonResponse:
INSTRUMENTING = os.environ.get('TEST_INSTRUMENT_URL_COVERAGE', '') == 'TRUE'
INSTRUMENTED_CALLS = [] # type: List[Dict[str, Any]]
INSTRUMENTED_CALLS: List[Dict[str, Any]] = []
UrlFuncT = Callable[..., HttpResponse] # TODO: make more specific
@ -350,7 +350,7 @@ def write_instrumentation_reports(full_suite: bool, include_webhooks: bool) -> N
from zproject.urls import urlpatterns, v1_api_and_json_patterns
# Find our untested urls.
pattern_cnt = collections.defaultdict(int) # type: Dict[str, int]
pattern_cnt: Dict[str, int] = collections.defaultdict(int)
def re_strip(r: Any) -> str:
return str(r).lstrip('^').rstrip('$')

View File

@ -80,7 +80,7 @@ def get_test_method(test: TestCase) -> Callable[[], None]:
return getattr(test, test._testMethodName)
# Each tuple is delay, test_name, slowness_reason
TEST_TIMINGS = [] # type: List[Tuple[float, str, str]]
TEST_TIMINGS: List[Tuple[float, str, str]] = []
def report_slow_tests() -> None:
@ -154,7 +154,7 @@ class TextTestResult(runner.TextTestResult):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.failed_tests = [] # type: List[str]
self.failed_tests: List[str] = []
def addInfo(self, test: TestCase, msg: str) -> None:
self.stream.write(msg) # type: ignore[attr-defined] # https://github.com/python/typeshed/issues/3139
@ -416,10 +416,10 @@ class Runner(DiscoverRunner):
# `templates_rendered` holds templates which were rendered
# in proper logical tests.
self.templates_rendered = set() # type: Set[str]
self.templates_rendered: Set[str] = set()
# `shallow_tested_templates` holds templates which were rendered
# in `zerver.tests.test_templates`.
self.shallow_tested_templates = set() # type: Set[str]
self.shallow_tested_templates: Set[str] = set()
template_rendered.connect(self.on_template_rendered)
def get_resultclass(self) -> Type[TestResult]:

View File

@ -35,8 +35,12 @@ def timeout(timeout: float, func: Callable[..., ResultT], *args: Any, **kwargs:
class TimeoutThread(threading.Thread):
def __init__(self) -> None:
threading.Thread.__init__(self)
self.result = None # type: Optional[ResultT]
self.exc_info = (None, None, None) # type: Tuple[Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]]
self.result: Optional[ResultT] = None
self.exc_info: Tuple[
Optional[Type[BaseException]],
Optional[BaseException],
Optional[TracebackType],
] = (None, None, None)
# Don't block the whole program from exiting
# if this is the only thread left.

View File

@ -161,7 +161,7 @@ def update_messages_for_topic_edit(message: Message,
return messages_list
def generate_topic_history_from_db_rows(rows: List[Tuple[str, int]]) -> List[Dict[str, Any]]:
canonical_topic_names = {} # type: Dict[str, Tuple[int, str]]
canonical_topic_names: Dict[str, Tuple[int, str]] = {}
# Sort rows by max_message_id so that if a topic
# has many different casings, we use the most

View File

@ -10,7 +10,7 @@ class SourceMap:
def __init__(self, sourcemap_dirs: List[str]) -> None:
self._dirs = sourcemap_dirs
self._indices = {} # type: Dict[str, sourcemap.SourceMapDecoder]
self._indices: Dict[str, sourcemap.SourceMapDecoder] = {}
def _index_for(self, minified_src: str) -> sourcemap.SourceMapDecoder:
'''Return the source map index for minified_src, loading it if not
@ -26,7 +26,7 @@ class SourceMap:
return self._indices[minified_src]
def annotate_stacktrace(self, stacktrace: str) -> str:
out = '' # type: str
out: str = ''
for ln in stacktrace.splitlines():
out += ln + '\n'
match = re.search(r'/static/webpack-bundles/([^:]+):(\d+):(\d+)', ln)

View File

@ -337,7 +337,7 @@ def get_signed_upload_url(path: str) -> str:
def get_realm_for_filename(path: str) -> Optional[int]:
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
key = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET).get_key(path) # type: Optional[Key]
key: Optional[Key] = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET).get_key(path)
if key is None:
# This happens if the key does not exist.
return None
@ -351,7 +351,7 @@ class S3UploadBackend(ZulipUploadBackend):
bucket = get_bucket(self.connection, bucket_name)
# check if file exists
key = bucket.get_key(path_id) # type: Optional[Key]
key: Optional[Key] = bucket.get_key(path_id)
if key is not None:
bucket.delete_key(key)
return True
@ -860,7 +860,7 @@ class LocalUploadBackend(ZulipUploadBackend):
# Common and wrappers
if settings.LOCAL_UPLOADS_DIR is not None:
upload_backend = LocalUploadBackend() # type: ZulipUploadBackend
upload_backend: ZulipUploadBackend = LocalUploadBackend()
else:
upload_backend = S3UploadBackend() # nocoverage

View File

@ -26,7 +26,7 @@ def user_groups_in_realm_serialized(realm: Realm) -> List[Dict[str, Any]]:
UserGroup and UserGroupMembership that we need.
"""
realm_groups = UserGroup.objects.filter(realm=realm)
group_dicts = {} # type: Dict[str, Any]
group_dicts: Dict[str, Any] = {}
for user_group in realm_groups:
group_dicts[user_group.id] = dict(
id=user_group.id,

View File

@ -20,7 +20,7 @@ def get_user_info_dict(realm_id: int) -> Dict[int, Dict[str, Any]]:
'status_text',
)
user_dict = dict() # type: Dict[int, Dict[str, Any]]
user_dict: Dict[int, Dict[str, Any]] = dict()
for row in rows:
away = row['status'] == UserStatus.AWAY
status_text = row['status_text']

View File

@ -169,11 +169,11 @@ def user_ids_to_users(user_ids: List[int], realm: Realm) -> List[UserProfile]:
def fetch_users_by_id(user_ids: List[int]) -> List[UserProfile]:
return list(UserProfile.objects.filter(id__in=user_ids).select_related())
user_profiles_by_id = generic_bulk_cached_fetch(
user_profiles_by_id: Dict[int, UserProfile] = generic_bulk_cached_fetch(
cache_key_function=user_profile_by_id_cache_key,
query_function=fetch_users_by_id,
object_ids=user_ids
) # type: Dict[int, UserProfile]
)
found_user_ids = user_profiles_by_id.keys()
missed_user_ids = [user_id for user_id in user_ids if user_id not in found_user_ids]
@ -380,7 +380,7 @@ def get_cross_realm_dicts() -> List[Dict[str, Any]]:
def get_custom_profile_field_values(custom_profile_field_values:
List[CustomProfileFieldValue]) -> Dict[int, Dict[str, Any]]:
profiles_by_user_id = defaultdict(dict) # type: Dict[int, Dict[str, Any]]
profiles_by_user_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
for profile_field in custom_profile_field_values:
user_id = profile_field.user_profile_id
if profile_field.field.is_renderable():

View File

@ -210,7 +210,7 @@ def check_dict(required_keys: Iterable[Tuple[str, Validator]]=[],
optional_keys: Iterable[Tuple[str, Validator]]=[],
value_validator: Optional[Validator]=None,
_allow_only_listed_keys: bool=False) -> Validator:
type_structure = {} # type: Dict[str, Any]
type_structure: Dict[str, Any] = {}
@set_type_structure(type_structure)
def f(var_name: str, val: object) -> Optional[str]:

View File

@ -77,7 +77,7 @@ def get_push_commits_event_message(user_name: str, compare_url: Optional[str],
number_of_commits=len(commits_data),
commit_or_commits=COMMIT_OR_COMMITS.format('s' if len(commits_data) > 1 else ''))
committers_items = get_all_committers(commits_data) # type: List[Tuple[str, int]]
committers_items: List[Tuple[str, int]] = get_all_committers(commits_data)
if len(committers_items) == 1 and user_name == committers_items[0][0]:
return PUSH_COMMITS_MESSAGE_TEMPLATE_WITHOUT_COMMITTERS.format(
user_name=user_name,
@ -279,15 +279,16 @@ def get_short_sha(sha: str) -> str:
return sha[:7]
def get_all_committers(commits_data: List[Dict[str, Any]]) -> List[Tuple[str, int]]:
committers = defaultdict(int) # type: Dict[str, int]
committers: Dict[str, int] = defaultdict(int)
for commit in commits_data:
committers[commit['name']] += 1
# Sort by commit count, breaking ties alphabetically.
committers_items = sorted(list(committers.items()),
key=lambda item: (-item[1], item[0])) # type: List[Tuple[str, int]]
committers_values = [c_i[1] for c_i in committers_items] # type: List[int]
committers_items: List[Tuple[str, int]] = sorted(
list(committers.items()), key=lambda item: (-item[1], item[0])
)
committers_values: List[int] = [c_i[1] for c_i in committers_items]
if len(committers) > PUSH_COMMITTERS_LIMIT_INFO:
others_number_of_commits = sum(committers_values[PUSH_COMMITTERS_LIMIT_INFO:])

View File

@ -86,7 +86,7 @@ class AdminNotifyHandler(logging.Handler):
logging.Handler.__init__(self)
def emit(self, record: logging.LogRecord) -> None:
report = {} # type: Dict[str, Any]
report: Dict[str, Any] = {}
# This parameter determines whether Zulip should attempt to
# send Zulip messages containing the error report. If there's

View File

@ -50,7 +50,9 @@ than max_api_calls! (trying to trim) %s %s" % (key, count))
wildcard_list = "ratelimit:*:*:list"
wildcard_zset = "ratelimit:*:*:zset"
trim_func = lambda key, max_calls: client.ltrim(key, 0, max_calls - 1) # type: Optional[Callable[[str, int], None]]
trim_func: Optional[
Callable[[str, int], None]
] = lambda key, max_calls: client.ltrim(key, 0, max_calls - 1)
if not options['trim']:
trim_func = None

View File

@ -88,7 +88,7 @@ class Command(compilemessages.Command):
locale_path = "{}/locale".format(settings.DEPLOY_ROOT)
output_path = "{}/language_options.json".format(locale_path)
data = {'languages': []} # type: Dict[str, List[Dict[str, Any]]]
data: Dict[str, List[Dict[str, Any]]] = {'languages': []}
try:
locales = self.get_locales()
@ -114,7 +114,7 @@ class Command(compilemessages.Command):
# Not a locale.
continue
info = {} # type: Dict[str, Any]
info: Dict[str, Any] = {}
code = to_language(locale)
percentage = self.get_translation_percentage(locale_path, locale)
try:

View File

@ -33,7 +33,7 @@ You can use "-" to represent stdin.
file_name = options['file_name']
if file_name == '-':
f = sys.stdin # type: IO[str]
f: IO[str] = sys.stdin
else:
f = open(file_name)

View File

@ -30,7 +30,7 @@ class Command(ZulipBaseCommand):
realm = self.get_realm(options)
assert realm is not None # Should be ensured by parser
streams = [] # type: List[Stream]
streams: List[Stream] = []
if options["streams"]:
stream_names = {stream.strip() for stream in options["streams"].split(",")}
for stream_name in set(stream_names):

View File

@ -140,7 +140,7 @@ class Command(makemessages.Command):
template.constant_re = old_constant_re
def extract_strings(self, data: str) -> List[str]:
translation_strings = [] # type: List[str]
translation_strings: List[str] = []
for regex in frontend_compiled_regexes:
for match in regex.findall(data):
match = match.strip()
@ -158,7 +158,7 @@ class Command(makemessages.Command):
return data
def get_translation_strings(self) -> List[str]:
translation_strings = [] # type: List[str]
translation_strings: List[str] = []
dirname = self.get_template_dir()
for dirpath, dirnames, filenames in os.walk(dirname):

View File

@ -70,7 +70,7 @@ Example:
message = self._parse_email_fixture(full_fixture_path)
self._prepare_message(message, realm, stream)
data = {} # type: Dict[str, str]
data: Dict[str, str] = {}
data['recipient'] = str(message['To']) # Need str() here to avoid mypy throwing an error
data['msg_text'] = message.as_string()
mirror_email_message(data)

View File

@ -57,7 +57,7 @@ class Command(ZulipBaseCommand):
activate = options['activate']
deactivate = options['deactivate']
filter_kwargs = {} # type: Dict[str, Realm]
filter_kwargs: Dict[str, Realm] = {}
if realm is not None:
filter_kwargs = dict(realm=realm)

View File

@ -332,7 +332,7 @@ class CsrfFailureError(JsonableError):
data_fields = ['reason']
def __init__(self, reason: str) -> None:
self.reason = reason # type: str
self.reason: str = reason
@staticmethod
def msg_format() -> str:

View File

@ -38,7 +38,7 @@ def sync_filesizes(apps: StateApps, schema_editor: DatabaseSchemaEditor) -> None
bucket = conn.get_bucket(bucket_name, validate=False)
for attachment in attachments.objects.all():
if attachment.size is None:
file_key = bucket.get_key(attachment.path_id) # type: Optional[Key]
file_key: Optional[Key] = bucket.get_key(attachment.path_id)
if file_key is None:
new_size = 0
else:

View File

@ -111,7 +111,7 @@ class S3Uploader(Uploader):
def upload_files(self, response: Response, resized_image: bytes,
dst_path_id: str) -> None:
headers = None # type: Optional[Dict[str, str]]
headers: Optional[Dict[str, str]] = None
content_type = response.headers.get("Content-Type") or guess_type(dst_path_id)[0]
if content_type:
headers = {'Content-Type': content_type}
@ -129,7 +129,7 @@ def get_uploader() -> Uploader:
def upload_emoji_to_storage(apps: StateApps, schema_editor: DatabaseSchemaEditor) -> None:
realm_emoji_model = apps.get_model('zerver', 'RealmEmoji')
uploader = get_uploader() # type: Uploader
uploader: Uploader = get_uploader()
for emoji in realm_emoji_model.objects.all():
file_name = uploader.upload_emoji(emoji.realm_id, emoji.img_url, emoji.name)
if file_name is None:

Some files were not shown because too many files have changed in this diff Show More