zephyr_mirror: Use zephyr's bulk subscription functionality.

Previously we were spending 15 seconds on linerva (and more like 2.5
minutes on the not-yet-operational zmirror.humbughq.com) to subscribe
to all of our streams.

(imported from commit c36cb1c26868f142683d9c92d4875fcd4931886e)
This commit is contained in:
Tim Abbott 2012-11-07 14:32:02 -05:00
parent 7aaa08d9b2
commit 935632428b
1 changed files with 10 additions and 7 deletions

View File

@ -130,11 +130,9 @@ def username_to_fullname(username):
return fullnames[username] return fullnames[username]
current_zephyr_subs = set() current_zephyr_subs = set()
def ensure_subscribed(sub): def zephyr_bulk_subscribe(subs):
if sub in current_zephyr_subs:
return
try: try:
zephyr.Subscriptions().add((sub, '*', '*')) zephyr._z.subAll(subs)
except IOError: except IOError:
# Since we haven't added the subscription to # Since we haven't added the subscription to
# current_zephyr_subs yet, we can just return (so that we'll # current_zephyr_subs yet, we can just return (so that we'll
@ -142,9 +140,10 @@ def ensure_subscribed(sub):
# retrying the next time the bot checks its subscriptions are # retrying the next time the bot checks its subscriptions are
# up to date. # up to date.
traceback.print_exc() traceback.print_exc()
print "Error subscribing to stream %s; will retry later." % (sub,) print "Error subscribing to streams; will retry later."
return return
current_zephyr_subs.add(sub) for (cls, instance, recipient) in subs:
current_zephyr_subs.add(cls)
def update_subscriptions_from_humbug(): def update_subscriptions_from_humbug():
try: try:
@ -154,8 +153,12 @@ def update_subscriptions_from_humbug():
print "%s: Error getting public streams:" % (datetime.datetime.now()) print "%s: Error getting public streams:" % (datetime.datetime.now())
traceback.print_exc() traceback.print_exc()
return return
streams_to_subscribe = []
for stream in streams: for stream in streams:
ensure_subscribed(stream) if stream in current_zephyr_subs:
continue
streams_to_subscribe.append((stream, "*", "*"))
zephyr_bulk_subscribe(streams_to_subscribe)
def maybe_restart_mirroring_script(): def maybe_restart_mirroring_script():
if os.stat(os.path.join(options.root_path, "stamps", "restart_stamp")).st_mtime > start_time or \ if os.stat(os.path.join(options.root_path, "stamps", "restart_stamp")).st_mtime > start_time or \