update_analytics_count: Use a correct lock mechanism.

Adds a re-usable lockfile_nonblocking helper to context_managers.

Relying on naive `os.mkdir` is not enough especially now that the
successful operation of this command is necessary for push notifications
to work for many servers.

We can't use `lockfile` context manager from
`zerver.lib.context_managers`, because we want the custom behavior of
failing if the lock can't be acquired, instead of waiting.
That's because if an instance of this gets stuck, we don't want to start
queueing up more processes waiting forever whenever the cronjob runs
again and fail->exit is preferrable instead.
This commit is contained in:
Mateusz Mandera 2024-02-26 23:44:52 +01:00 committed by Tim Abbott
parent fb7d77545f
commit 634015411a
3 changed files with 30 additions and 15 deletions

View File

@ -1,5 +1,4 @@
import hashlib import hashlib
import os
import time import time
from argparse import ArgumentParser from argparse import ArgumentParser
from datetime import timezone from datetime import timezone
@ -13,6 +12,7 @@ from typing_extensions import override
from analytics.lib.counts import ALL_COUNT_STATS, logger, process_count_stat from analytics.lib.counts import ALL_COUNT_STATS, logger, process_count_stat
from scripts.lib.zulip_tools import ENDC, WARNING from scripts.lib.zulip_tools import ENDC, WARNING
from zerver.lib.context_managers import lockfile_nonblocking
from zerver.lib.remote_server import send_server_data_to_push_bouncer from zerver.lib.remote_server import send_server_data_to_push_bouncer
from zerver.lib.timestamp import floor_to_hour from zerver.lib.timestamp import floor_to_hour
from zerver.models import Realm from zerver.models import Realm
@ -42,19 +42,16 @@ class Command(BaseCommand):
@override @override
def handle(self, *args: Any, **options: Any) -> None: def handle(self, *args: Any, **options: Any) -> None:
try: with lockfile_nonblocking(
os.mkdir(settings.ANALYTICS_LOCK_DIR) settings.ANALYTICS_LOCK_FILE,
except OSError: ) as lock_acquired:
if lock_acquired:
self.run_update_analytics_counts(options)
else:
print( print(
f"{WARNING}Analytics lock {settings.ANALYTICS_LOCK_DIR} is unavailable;" f"{WARNING}Analytics lock {settings.ANALYTICS_LOCK_FILE} is unavailable;"
f" exiting.{ENDC}" f" exiting.{ENDC}"
) )
return
try:
self.run_update_analytics_counts(options)
finally:
os.rmdir(settings.ANALYTICS_LOCK_DIR)
def run_update_analytics_counts(self, options: Dict[str, Any]) -> None: def run_update_analytics_counts(self, options: Dict[str, Any]) -> None:
# installation_epoch relies on there being at least one realm; we # installation_epoch relies on there being at least one realm; we

View File

@ -30,3 +30,21 @@ def lockfile(filename: str, shared: bool = False) -> Iterator[None]:
with open(filename, "w") as lock: with open(filename, "w") as lock:
with flock(lock, shared=shared): with flock(lock, shared=shared):
yield yield
@contextmanager
def lockfile_nonblocking(filename: str) -> Iterator[bool]: # nocoverage
"""Lock a file using flock(2) for the duration of a 'with' statement.
Doesn't block, yields False immediately if the lock can't be acquired."""
with open(filename) as f:
lock_acquired = False
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
lock_acquired = True
yield lock_acquired
except BlockingIOError:
yield False
finally:
if lock_acquired:
fcntl.flock(f, fcntl.LOCK_UN)

View File

@ -676,7 +676,7 @@ QUEUE_ERROR_DIR = zulip_path("/var/log/zulip/queue_error")
QUEUE_STATS_DIR = zulip_path("/var/log/zulip/queue_stats") QUEUE_STATS_DIR = zulip_path("/var/log/zulip/queue_stats")
DIGEST_LOG_PATH = zulip_path("/var/log/zulip/digest.log") DIGEST_LOG_PATH = zulip_path("/var/log/zulip/digest.log")
ANALYTICS_LOG_PATH = zulip_path("/var/log/zulip/analytics.log") ANALYTICS_LOG_PATH = zulip_path("/var/log/zulip/analytics.log")
ANALYTICS_LOCK_DIR = zulip_path("/home/zulip/deployments/analytics-lock-dir") ANALYTICS_LOCK_FILE = zulip_path("/home/zulip/deployments/analytics-lock.lock")
WEBHOOK_LOG_PATH = zulip_path("/var/log/zulip/webhooks_errors.log") WEBHOOK_LOG_PATH = zulip_path("/var/log/zulip/webhooks_errors.log")
WEBHOOK_ANOMALOUS_PAYLOADS_LOG_PATH = zulip_path("/var/log/zulip/webhooks_anomalous_payloads.log") WEBHOOK_ANOMALOUS_PAYLOADS_LOG_PATH = zulip_path("/var/log/zulip/webhooks_anomalous_payloads.log")
WEBHOOK_UNSUPPORTED_EVENTS_LOG_PATH = zulip_path("/var/log/zulip/webhooks_unsupported_events.log") WEBHOOK_UNSUPPORTED_EVENTS_LOG_PATH = zulip_path("/var/log/zulip/webhooks_unsupported_events.log")