2018-11-13 23:03:18 +01:00
|
|
|
from typing import Any, Callable, Dict
|
2018-10-13 15:13:35 +02:00
|
|
|
|
|
|
|
'''
This module helps you set up a bunch
of sequences, similar to how database
sequences work.

You need to be a bit careful here, since
you're dealing with a big singleton, but
for data imports that's usually easy to
manage.  See hipchat.py for example usage.
'''
|
|
|
|
|
|
|
|
def _seq() -> Callable[[], int]:
|
|
|
|
i = 0
|
|
|
|
|
|
|
|
def next_one() -> int:
|
|
|
|
nonlocal i
|
|
|
|
i += 1
|
|
|
|
return i
|
|
|
|
|
|
|
|
return next_one
|
|
|
|
|
|
|
|
def sequencer() -> Callable[[str], int]:
    '''
    Create a family of named counters.

    The returned callable takes a sequence name and returns the next
    integer for that name, starting at 1.  Each name is counted
    independently of every other name.

    Use like this:

        NEXT_ID = sequencer()
        message_id = NEXT_ID('message')
    '''
    counters: Dict[str, Callable[[], int]] = {}

    def next_one(name: str) -> int:
        try:
            counter = counters[name]
        except KeyError:
            # First request for this name: start a fresh counter for it.
            counter = _seq()
            counters[name] = counter
        return counter()

    return next_one
|
2018-10-23 21:38:37 +02:00
|
|
|
|
|
|
|
'''
NEXT_ID is a singleton used by an entire process, which is
almost always reasonable. If you want to have two parallel
sequences, just use different `name` values.

This object gets created once and only once during the first
import of the file.
'''

# Shared by every importer of this module: NEXT_ID(name) returns the next
# integer of the sequence registered under `name`.
NEXT_ID = sequencer()
|
2018-11-13 23:03:18 +01:00
|
|
|
|
|
|
|
def is_int(key: Any) -> bool:
    '''
    Report whether `key` can be converted to an integer that is at
    most 999999999.

    Returns False, rather than raising, for any value that int()
    cannot convert: malformed strings, values of an unsupported type
    such as None or a list, and infinite floats.
    '''
    try:
        n = int(key)
    except (TypeError, ValueError, OverflowError):
        # int() raises ValueError for malformed strings, TypeError for
        # unsupported types and OverflowError for infinite floats.  A
        # predicate accepting Any should answer False for all of them.
        return False

    return n <= 999999999
|
|
|
|
|
|
|
|
class IdMapper:
    '''
    Map externally supplied ids ("their" ids) to integer ids ("our"
    ids), remembering every assignment.

    Two key styles are supported, but not within the same mapper:

    * keys that is_int() accepts are mapped to int(key);
    * all other keys are numbered 1, 2, 3, ... in the order they are
      first seen.

    Mixing the two styles raises Exception('mixed key styles'),
    whichever style was seen first, because a sequentially assigned
    id could otherwise collide with an id taken from an integer key.
    '''

    def __init__(self) -> None:
        # their_id -> our_id for every key resolved so far.
        self.map: Dict[Any, int] = dict()
        # Number of ids handed out sequentially (non-integer keys).
        self.cnt = 0
        # True once any integer-style key has been mapped.
        self._has_int_keys = False

    def has(self, their_id: Any) -> bool:
        '''Return True if `their_id` has already been assigned an id.'''
        return their_id in self.map

    def get(self, their_id: Any) -> int:
        '''
        Return our id for `their_id`, assigning one on first use.

        Raises Exception('mixed key styles') when `their_id` uses a
        different key style than the keys mapped previously.
        '''
        if their_id in self.map:
            return self.map[their_id]

        if is_int(their_id):
            if self.cnt > 0:
                raise Exception('mixed key styles')
            our_id = int(their_id)
            self._has_int_keys = True
        else:
            # The check must be symmetric: once integer-style keys
            # exist, a sequential id could duplicate one of them.
            if self._has_int_keys:
                raise Exception('mixed key styles')
            self.cnt += 1
            our_id = self.cnt

        self.map[their_id] = our_id
        return our_id
|