pep8: Add compliance with rule E261 event_queue.py.

This commit is contained in:
Aditya Bansal 2017-06-04 15:45:23 +05:30 committed by Tim Abbott
parent 4679da87c4
commit 5b2dec0845
1 changed files with 48 additions and 48 deletions

View File

@ -72,8 +72,8 @@ class ClientDescriptor(object):
self.user_profile_id = user_profile_id
self.user_profile_email = user_profile_email
self.realm_id = realm_id
self.current_handler_id = None # type: Optional[int]
self.current_client_name = None # type: Optional[Text]
self.current_handler_id = None # type: Optional[int]
self.current_client_name = None # type: Optional[Text]
self.event_queue = event_queue
self.queue_timeout = lifespan_secs
self.event_types = event_types
@ -81,7 +81,7 @@ class ClientDescriptor(object):
self.apply_markdown = apply_markdown
self.all_public_streams = all_public_streams
self.client_type_name = client_type_name
self._timeout_handle = None # type: Any # TODO: should be return type of ioloop.add_timeout
self._timeout_handle = None # type: Any # TODO: should be return type of ioloop.add_timeout
self.narrow = narrow
self.narrow_filter = build_narrow_filter(narrow)
@ -232,10 +232,10 @@ def compute_full_event_type(event):
class EventQueue(object):
def __init__(self, id):
# type: (str) -> None
self.queue = deque() # type: ignore # type signature should Deque[Dict[str, Any]] but we need https://github.com/python/mypy/pull/2845 to be merged
self.next_event_id = 0 # type: int
self.id = id # type: str
self.virtual_events = {} # type: Dict[str, Dict[str, Any]]
self.queue = deque() # type: ignore # type signature should Deque[Dict[str, Any]] but we need https://github.com/python/mypy/pull/2845 to be merged
self.next_event_id = 0 # type: int
self.id = id # type: str
self.virtual_events = {} # type: Dict[str, Dict[str, Any]]
def to_dict(self):
# type: () -> Dict[str, Any]
@ -299,8 +299,8 @@ class EventQueue(object):
def contents(self):
# type: () -> List[Dict[str, Any]]
contents = [] # type: List[Dict[str, Any]]
virtual_id_map = {} # type: Dict[str, Dict[str, Any]]
contents = [] # type: List[Dict[str, Any]]
virtual_id_map = {} # type: Dict[str, Dict[str, Any]]
for event_type in self.virtual_events:
virtual_id_map[self.virtual_events[event_type]["id"]] = self.virtual_events[event_type]
virtual_ids = sorted(list(virtual_id_map.keys()))
@ -322,18 +322,18 @@ class EventQueue(object):
return contents
# maps queue ids to client descriptors
clients = {} # type: Dict[str, ClientDescriptor]
clients = {} # type: Dict[str, ClientDescriptor]
# maps user id to list of client descriptors
user_clients = {} # type: Dict[int, List[ClientDescriptor]]
user_clients = {} # type: Dict[int, List[ClientDescriptor]]
# maps realm id to list of client descriptors with all_public_streams=True
realm_clients_all_streams = {} # type: Dict[int, List[ClientDescriptor]]
realm_clients_all_streams = {} # type: Dict[int, List[ClientDescriptor]]
# list of registered gc hooks.
# each one will be called with a user profile id, queue, and bool
# last_for_client that is true if this is the last queue pertaining
# to this user_profile_id
# that is about to be deleted
gc_hooks = [] # type: List[Callable[[int, ClientDescriptor, bool], None]]
gc_hooks = [] # type: List[Callable[[int, ClientDescriptor, bool], None]]
next_queue_id = 0
@ -397,9 +397,9 @@ def do_gc_event_queues(to_remove, affected_users, affected_realms):
def gc_event_queues():
# type: () -> None
start = time.time()
to_remove = set() # type: Set[str]
affected_users = set() # type: Set[int]
affected_realms = set() # type: Set[int]
to_remove = set() # type: Set[str]
affected_users = set() # type: Set[int]
affected_realms = set() # type: Set[int]
for (id, client) in six.iteritems(clients):
if client.idle(start):
to_remove.add(id)
@ -458,7 +458,7 @@ def load_event_queues():
def send_restart_events(immediate=False):
# type: (bool) -> None
event = dict(type='restart', server_generation=settings.SERVER_GENERATION) # type: Dict[str, Any]
event = dict(type='restart', server_generation=settings.SERVER_GENERATION) # type: Dict[str, Any]
if immediate:
event['immediate'] = True
for client in six.itervalues(clients):
@ -472,7 +472,7 @@ def setup_event_queue():
atexit.register(dump_event_queues)
# Make sure we dump event queues even if we exit via signal
signal.signal(signal.SIGTERM, lambda signum, stack: sys.exit(1)) # type: ignore # https://github.com/python/mypy/issues/2955
tornado.autoreload.add_reload_hook(dump_event_queues) # type: ignore # TODO: Fix missing tornado.autoreload stub
tornado.autoreload.add_reload_hook(dump_event_queues) # type: ignore # TODO: Fix missing tornado.autoreload stub
try:
os.rename(settings.JSON_PERSISTENT_QUEUE_FILENAME, "/var/tmp/event_queues.json.last")
@ -489,14 +489,14 @@ def setup_event_queue():
def fetch_events(query):
# type: (Mapping[str, Any]) -> Dict[str, Any]
queue_id = query["queue_id"] # type: str
dont_block = query["dont_block"] # type: bool
last_event_id = query["last_event_id"] # type: int
user_profile_id = query["user_profile_id"] # type: int
new_queue_data = query.get("new_queue_data") # type: Optional[MutableMapping[str, Any]]
user_profile_email = query["user_profile_email"] # type: Text
client_type_name = query["client_type_name"] # type: Text
handler_id = query["handler_id"] # type: int
queue_id = query["queue_id"] # type: str
dont_block = query["dont_block"] # type: bool
last_event_id = query["last_event_id"] # type: int
user_profile_id = query["user_profile_id"] # type: int
new_queue_data = query.get("new_queue_data") # type: Optional[MutableMapping[str, Any]]
user_profile_email = query["user_profile_email"] # type: Text
client_type_name = query["client_type_name"] # type: Text
handler_id = query["handler_id"] # type: int
try:
was_connected = False
@ -521,7 +521,7 @@ def fetch_events(query):
if not client.event_queue.empty() or dont_block:
response = dict(events=client.event_queue.contents(),
handler_id=handler_id) # type: Dict[str, Any]
handler_id=handler_id) # type: Dict[str, Any]
if orig_queue_id is None:
response['queue_id'] = queue_id
if len(response["events"]) == 1:
@ -557,7 +557,7 @@ def extract_json_response(resp):
if requests_json_is_function:
return resp.json()
else:
return resp.json # type: ignore # mypy trusts the stub, not the runtime type checking of this fn
return resp.json # type: ignore # mypy trusts the stub, not the runtime type checking of this fn
def request_event_queue(user_profile, user_client, apply_markdown,
queue_lifespan_secs, event_types=None, all_public_streams=False,
@ -625,7 +625,7 @@ def missedmessage_hook(user_profile_id, queue, last_for_client):
if not last_for_client:
return
message_ids_to_notify = [] # type: List[Dict[str, Any]]
message_ids_to_notify = [] # type: List[Dict[str, Any]]
for event in queue.event_queue.contents():
if not event['type'] == 'message' or not event['flags']:
continue
@ -684,20 +684,20 @@ def receiver_is_idle(user_profile_id, realm_presences):
def process_message_event(event_template, users):
# type: (Mapping[str, Any], Iterable[Mapping[str, Any]]) -> None
realm_presences = {int(k): v for k, v in event_template['presences'].items()} # type: Dict[int, Dict[Text, Dict[str, Any]]]
sender_queue_id = event_template.get('sender_queue_id', None) # type: Optional[str]
message_dict_markdown = event_template['message_dict_markdown'] # type: Dict[str, Any]
message_dict_no_markdown = event_template['message_dict_no_markdown'] # type: Dict[str, Any]
sender_id = message_dict_markdown['sender_id'] # type: int
message_id = message_dict_markdown['id'] # type: int
message_type = message_dict_markdown['type'] # type: str
sending_client = message_dict_markdown['client'] # type: Text
realm_presences = {int(k): v for k, v in event_template['presences'].items()} # type: Dict[int, Dict[Text, Dict[str, Any]]]
sender_queue_id = event_template.get('sender_queue_id', None) # type: Optional[str]
message_dict_markdown = event_template['message_dict_markdown'] # type: Dict[str, Any]
message_dict_no_markdown = event_template['message_dict_no_markdown'] # type: Dict[str, Any]
sender_id = message_dict_markdown['sender_id'] # type: int
message_id = message_dict_markdown['id'] # type: int
message_type = message_dict_markdown['type'] # type: str
sending_client = message_dict_markdown['client'] # type: Text
# To remove duplicate clients: Maps queue ID to {'client': Client, 'flags': flags}
send_to_clients = {} # type: Dict[str, Dict[str, Any]]
send_to_clients = {} # type: Dict[str, Dict[str, Any]]
# Extra user-specific data to include
extra_user_data = {} # type: Dict[int, Any]
extra_user_data = {} # type: Dict[int, Any]
if 'stream_name' in event_template and not event_template.get("invite_only"):
for client in get_client_descriptors_for_realm_all_streams(event_template['realm_id']):
@ -706,8 +706,8 @@ def process_message_event(event_template, users):
send_to_clients[client.event_queue.id]['is_sender'] = True
for user_data in users:
user_profile_id = user_data['id'] # type: int
flags = user_data.get('flags', []) # type: Iterable[str]
user_profile_id = user_data['id'] # type: int
flags = user_data.get('flags', []) # type: Iterable[str]
for client in get_client_descriptors_for_user(user_profile_id):
send_to_clients[client.event_queue.id] = {'client': client, 'flags': flags}
@ -723,7 +723,7 @@ def process_message_event(event_template, users):
if (received_pm or mentioned) and (idle or always_push_notify):
notice = build_offline_notification(user_profile_id, message_id)
queue_json_publish("missedmessage_mobile_notifications", notice, lambda notice: None)
notified = dict(push_notified=True) # type: Dict[str, bool]
notified = dict(push_notified=True) # type: Dict[str, bool]
# Don't send missed message emails if always_push_notify is True
if idle:
# We require RabbitMQ to do this, as we can't call the email handler
@ -736,8 +736,8 @@ def process_message_event(event_template, users):
for client_data in six.itervalues(send_to_clients):
client = client_data['client']
flags = client_data['flags']
is_sender = client_data.get('is_sender', False) # type: bool
extra_data = extra_user_data.get(client.user_profile_id, None) # type: Optional[Mapping[str, bool]]
is_sender = client_data.get('is_sender', False) # type: bool
extra_data = extra_user_data.get(client.user_profile_id, None) # type: Optional[Mapping[str, bool]]
if not client.accepts_messages():
# The actual check is the accepts_event() check below;
@ -757,7 +757,7 @@ def process_message_event(event_template, users):
if flags is not None:
message_dict['is_mentioned'] = 'mentioned' in flags
user_event = dict(type='message', message=message_dict, flags=flags) # type: Dict[str, Any]
user_event = dict(type='message', message=message_dict, flags=flags) # type: Dict[str, Any]
if extra_data is not None:
user_event.update(extra_data)
@ -786,7 +786,7 @@ def process_userdata_event(event_template, users):
# type: (Mapping[str, Any], Iterable[Mapping[str, Any]]) -> None
for user_data in users:
user_profile_id = user_data['id']
user_event = dict(event_template) # shallow copy, but deep enough for our needs
user_event = dict(event_template) # shallow copy, but deep enough for our needs
for key in user_data.keys():
if key != "id":
user_event[key] = user_data[key]
@ -797,8 +797,8 @@ def process_userdata_event(event_template, users):
def process_notification(notice):
# type: (Mapping[str, Any]) -> None
event = notice['event'] # type: Mapping[str, Any]
users = notice['users'] # type: Union[Iterable[int], Iterable[Mapping[str, Any]]]
event = notice['event'] # type: Mapping[str, Any]
users = notice['users'] # type: Union[Iterable[int], Iterable[Mapping[str, Any]]]
if event['type'] in ["update_message", "delete_message"]:
process_userdata_event(event, cast(Iterable[Mapping[str, Any]], users))
elif event['type'] == "message":