mirror of https://github.com/zulip/zulip.git
[manual] Add worker process for processing asynchronous FTS updates
Unlike the other workers, this process runs on the database server. To prevent duplicate work, only the worker on the master actually processes updates. To deploy, we in theory just need to apply the new puppet config. In practice, the database servers aren't on wheezy yet, so the changes should be applied by hand to postgres0 and postgres1. This only needs to be done on deploy to staging. (imported from commit d679ffc0838f9fc8c7f0bede08a5568b339b7ddb)
This commit is contained in:
parent
d05ddce624
commit
0008edc9d5
|
@ -0,0 +1,54 @@
|
|||
#!/usr/bin/python
|
||||
import psycopg2
|
||||
import psycopg2.extensions
|
||||
import select
|
||||
import time
|
||||
import logging
|
||||
|
||||
def update_fts_columns(cursor):
|
||||
cursor.execute("SELECT id, message_id FROM fts_update_log;")
|
||||
ids = []
|
||||
for (id, message_id) in cursor.fetchall():
|
||||
cursor.execute("UPDATE zephyr_message SET "
|
||||
"search_tsvector = to_tsvector('humbug.english_us_search', "
|
||||
"subject || rendered_content) "
|
||||
"WHERE id = %s", (message_id,))
|
||||
ids.append(id)
|
||||
cursor.execute("DELETE FROM fts_update_log WHERE id = ANY(%s)", (ids,))
|
||||
|
||||
|
||||
def am_master(cursor):
|
||||
cursor.execute("SELECT pg_is_in_recovery()")
|
||||
return not cursor.fetchall()[0][0]
|
||||
|
||||
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s")
|
||||
logger = logging.getLogger("process_fts_updates")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
logger.info("process_fts_updates starting")
|
||||
|
||||
conn = psycopg2.connect("host=localhost user=humbug")
|
||||
cursor = conn.cursor()
|
||||
|
||||
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
|
||||
first_check = True
|
||||
while not am_master(cursor):
|
||||
if first_check:
|
||||
first_check = False
|
||||
logger.info("In recovery; sleeping")
|
||||
time.sleep(5)
|
||||
|
||||
logger.info("Not in recovery; listening for FTS updates")
|
||||
|
||||
cursor.execute("LISTEN fts_update_log;")
|
||||
update_fts_columns(cursor)
|
||||
|
||||
# TODO: If we go back into recovery, we should stop processing updates
|
||||
while True:
|
||||
if select.select([conn], [], [], 30) != ([], [], []):
|
||||
conn.poll()
|
||||
while conn.notifies:
|
||||
conn.notifies.pop()
|
||||
update_fts_columns(cursor)
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
[program:process-fts-updates]
|
||||
command=/usr/local/bin/process_fts_updates
|
||||
priority=600 ; the relative start priority (default 999)
|
||||
autostart=true ; start at supervisord start (default: true)
|
||||
autorestart=true ; whether/when to restart (default: unexpected)
|
||||
stopsignal=TERM ; signal used to kill process (default TERM)
|
||||
stopwaitsecs=30 ; max num secs to wait b4 SIGKILL (default 10)
|
||||
user=humbug ; setuid to this UNIX account to run the program
|
||||
redirect_stderr=true ; redirect proc stderr to stdout (default false)
|
||||
stdout_logfile=/var/log/humbug/fts-updates.log ; stdout log path, NONE for none; default AUTO
|
|
@ -1,6 +1,28 @@
|
|||
class humbug::postgres-appdb {
|
||||
class { 'humbug::base': }
|
||||
class { 'humbug::postgres-common': }
|
||||
class { 'humbug::supervisor': }
|
||||
|
||||
$appdb_packages = [ "python-psycopg2",]
|
||||
package { $appdb_packages: ensure => "installed" }
|
||||
|
||||
file { "/usr/local/bin/process_fts_updates":
|
||||
ensure => file,
|
||||
owner => "root",
|
||||
group => "root",
|
||||
mode => 755,
|
||||
source => "puppet:///modules/humbug/postgresql/process_fts_updates",
|
||||
}
|
||||
|
||||
file { "/etc/supervisor/conf.d/zulip_db.conf":
|
||||
require => Package[supervisor],
|
||||
ensure => file,
|
||||
owner => "root",
|
||||
group => "root",
|
||||
mode => 644,
|
||||
source => "puppet:///modules/humbug/supervisord/conf.d/zulip_db.conf",
|
||||
notify => Service[supervisor],
|
||||
}
|
||||
|
||||
file { "/etc/postgresql/9.1/main/pg_hba.conf":
|
||||
require => Package["postgresql-9.1"],
|
||||
|
|
Loading…
Reference in New Issue