billing: Add RealmAuditLogProcessor.

This commit is contained in:
Rishi Gupta 2018-07-03 12:49:55 -07:00
parent e804c563a8
commit 27d4b1a5f0
8 changed files with 427 additions and 7 deletions

View File

@ -41,3 +41,40 @@ Notes:
* 4242424242424242 is Stripe's test credit card, also useful for manually
testing. You can put anything in the address fields, any future expiry
date, and anything for the CVV code.
## BillingProcessor
The general strategy here is that billing-relevant events get written to
RealmAuditLog with `requires_billing_update = True`, and then a worker
goes through, reads RealmAuditLog row by row, and makes the appropriate
updates in Stripe (in order), keeping track of its state in
`BillingProcessor`. An invariant is that it cannot be important when
exactly the worker gets around to making the update in Stripe, as long
as the updates for each customer (realm) are made in `RealmAuditLog.id` order.
Almost all the complexity in the code is due to error handling. We
distinguish three kinds of errors:
* Transient errors, like rate limiting or network failures, where we just
wait a bit and try again.
* Card decline errors (see below)
* Everything else (e.g. misconfigured API keys, errors thrown by buggy code,
etc.), where we just throw an exception and stop the worker.
We use the following strategy for card decline errors. There is a global
BillingProcessor (with `realm=None`) that processes RealmAuditLog
entries for every customer (realm). If it runs into a card decline error on
some entry, it gives up on that entry and (temporarily) all future entries
of that realm, and spins off a realm-specific BillingProcessor that
marks that realm as needing manual attention. When whatever issue has been
corrected, the realm-specific BillingProcessor completes any
realm-specific RealmAuditLog entries, and then deletes itself.
Notes for manually resolving errors:
* `BillingProcessor.objects.filter(state='stalled')` is always safe to
handle manually.
* `BillingProcessor.objects.filter(state='started')` is safe to handle
manually only if the billing process worker is not running.
* After resolving the issue, set the processor's state to `done`.
* Stripe's idempotency keys are only valid for 24 hours. So be mindful of
that if manually cleaning something up more than 24 hours after the error
occured.

View File

@ -33,6 +33,8 @@ class Subscription:
status: str
canceled_at: int
cancel_at_period_end: bool
proration_date: int
quantity: int
@staticmethod
def create(customer: str, billing: str, items: List[Dict[str, Any]],

View File

@ -2,7 +2,7 @@ import datetime
from functools import wraps
import logging
import os
from typing import Any, Callable, Optional, TypeVar, Tuple
from typing import Any, Callable, Dict, Optional, TypeVar, Tuple
import ujson
from django.conf import settings
@ -16,7 +16,7 @@ from zerver.lib.logging_util import log_to_file
from zerver.lib.timestamp import datetime_to_timestamp, timestamp_to_datetime
from zerver.lib.utils import generate_random_token
from zerver.models import Realm, UserProfile, RealmAuditLog
from zilencer.models import Customer, Plan
from zilencer.models import Customer, Plan, BillingProcessor
from zproject.settings import get_secret
STRIPE_PUBLISHABLE_KEY = get_secret('stripe_publishable_key')
@ -205,3 +205,100 @@ def process_initial_upgrade(user: UserProfile, plan: Plan, seat_count: int, stri
# TODO: billing address details are passed to us in the request;
# use that to calculate taxes.
tax_percent=0)
## Process RealmAuditLog
def do_set_subscription_quantity(
customer: Customer, timestamp: int, idempotency_key: str, quantity: int) -> None:
stripe_customer = stripe_get_customer(customer.stripe_customer_id)
stripe_subscription = extract_current_subscription(stripe_customer)
stripe_subscription.quantity = quantity
stripe_subscription.proration_date = timestamp
stripe_subscription.save(idempotency_key=idempotency_key)
def do_adjust_subscription_quantity(
customer: Customer, timestamp: int, idempotency_key: str, delta: int) -> None:
stripe_customer = stripe_get_customer(customer.stripe_customer_id)
stripe_subscription = extract_current_subscription(stripe_customer)
stripe_subscription.quantity = stripe_subscription.quantity + delta
stripe_subscription.proration_date = timestamp
stripe_subscription.save(idempotency_key=idempotency_key)
def increment_subscription_quantity(
customer: Customer, timestamp: int, idempotency_key: str) -> None:
return do_adjust_subscription_quantity(customer, timestamp, idempotency_key, 1)
def decrement_subscription_quantity(
customer: Customer, timestamp: int, idempotency_key: str) -> None:
return do_adjust_subscription_quantity(customer, timestamp, idempotency_key, -1)
@catch_stripe_errors
def process_billing_log_entry(processor: BillingProcessor, log_row: RealmAuditLog) -> None:
processor.state = BillingProcessor.STARTED
processor.log_row = log_row
processor.save()
customer = Customer.objects.get(realm=log_row.realm)
timestamp = datetime_to_timestamp(log_row.event_time)
idempotency_key = 'process_billing_log_entry:%s' % (log_row.id,)
extra_args = {} # type: Dict[str, Any]
if log_row.extra_data is not None:
extra_args = ujson.loads(log_row.extra_data)
processing_functions = {
RealmAuditLog.STRIPE_PLAN_QUANTITY_RESET: do_set_subscription_quantity,
RealmAuditLog.USER_CREATED: increment_subscription_quantity,
RealmAuditLog.USER_ACTIVATED: increment_subscription_quantity,
RealmAuditLog.USER_DEACTIVATED: decrement_subscription_quantity,
RealmAuditLog.USER_REACTIVATED: increment_subscription_quantity,
} # type: Dict[str, Callable[..., None]]
processing_functions[log_row.event_type](customer, timestamp, idempotency_key, **extra_args)
processor.state = BillingProcessor.DONE
processor.save()
def get_next_billing_log_entry(processor: BillingProcessor) -> Optional[RealmAuditLog]:
if processor.state == BillingProcessor.STARTED:
return processor.log_row
assert processor.state != BillingProcessor.STALLED
if processor.state not in [BillingProcessor.DONE, BillingProcessor.SKIPPED]:
raise BillingError(
'unknown processor state',
"Check for typos, since this value is sometimes set by hand: %s" % (processor.state,))
if processor.realm is None:
realms_with_processors = BillingProcessor.objects.exclude(
realm=None).values_list('realm', flat=True)
query = RealmAuditLog.objects.exclude(realm__in=realms_with_processors)
else:
global_processor = BillingProcessor.objects.get(realm=None)
query = RealmAuditLog.objects.filter(
realm=processor.realm, id__lt=global_processor.log_row.id)
return query.filter(id__gt=processor.log_row.id,
requires_billing_update=True).order_by('id').first()
def run_billing_processor_one_step(processor: BillingProcessor) -> bool:
# Returns True if a row was processed, or if processing was attempted
log_row = get_next_billing_log_entry(processor)
if log_row is None:
if processor.realm is not None:
processor.delete()
return False
try:
process_billing_log_entry(processor, log_row)
return True
except Exception as e:
billing_logger.error("Error on log_row.realm=%s, event_type=%s, log_row.id=%s, "
"processor.id=%s, processor.realm=%s" % (
processor.log_row.realm.string_id, processor.log_row.event_type,
processor.log_row.id, processor.id, processor.realm))
if isinstance(e, StripeCardError):
if processor.realm is None:
BillingProcessor.objects.create(log_row=processor.log_row,
realm=processor.log_row.realm,
state=BillingProcessor.STALLED)
processor.state = BillingProcessor.SKIPPED
else:
processor.state = BillingProcessor.STALLED
processor.save()
return True
raise

View File

@ -0,0 +1,46 @@
"""\
Run BillingProcessors.
This management command is run via supervisor. Do not run on multiple
machines, as the code has not been made robust to race conditions from doing
so. (Alternatively, you can set `BILLING_PROCESSOR_ENABLED=False` on all but
one machine to make the command have no effect.)
"""
import time
from typing import Any
from django.conf import settings
from django.core.management.base import BaseCommand
from zerver.lib.context_managers import lockfile
from zilencer.lib.stripe import StripeConnectionError, \
run_billing_processor_one_step
from zilencer.models import BillingProcessor
class Command(BaseCommand):
help = """Run BillingProcessors, to sync billing-relevant updates into Stripe.
Run this command under supervisor.
Usage: ./manage.py process_billing_updates
"""
def handle(self, *args: Any, **options: Any) -> None:
if not settings.BILLING_PROCESSOR_ENABLED:
time.sleep(10**9) # "infinity seconds"
with lockfile("/tmp/zulip_billing_processor.lockfile"):
while True:
for processor in BillingProcessor.objects.exclude(
state=BillingProcessor.STALLED):
try:
entry_processed = run_billing_processor_one_step(processor)
except StripeConnectionError:
time.sleep(5*60)
# Less load on the db during times of activity
# and more responsiveness when the load is low
if entry_processed:
time.sleep(10)
else:
time.sleep(2)

View File

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.14 on 2018-08-13 23:15
from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('zerver', '0182_set_initial_value_is_private_flag'),
('zilencer', '0009_plan'),
]
operations = [
migrations.CreateModel(
name='BillingProcessor',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('state', models.CharField(max_length=20)),
('last_modified', models.DateTimeField(auto_now=True)),
('log_row', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='zerver.RealmAuditLog')),
('realm', models.OneToOneField(null=True, on_delete=django.db.models.deletion.CASCADE, to='zerver.Realm')),
],
),
]

View File

@ -2,7 +2,8 @@ import datetime
from django.db import models
from zerver.models import AbstractPushDeviceToken, Realm, UserProfile
from zerver.models import AbstractPushDeviceToken, Realm, UserProfile, \
RealmAuditLog
def get_remote_server_by_uuid(uuid: str) -> 'RemoteZulipServer':
return RemoteZulipServer.objects.get(uuid=uuid)
@ -50,3 +51,19 @@ class Plan(models.Model):
nickname = models.CharField(max_length=40, unique=True) # type: str
stripe_plan_id = models.CharField(max_length=255, unique=True) # type: str
class BillingProcessor(models.Model):
log_row = models.ForeignKey(RealmAuditLog, on_delete=models.CASCADE) # RealmAuditLog
# Exactly one processor, the global processor, has realm=None.
realm = models.OneToOneField(Realm, null=True, on_delete=models.CASCADE) # type: Realm
DONE = 'done'
STARTED = 'started'
SKIPPED = 'skipped' # global processor only
STALLED = 'stalled' # realm processors only
state = models.CharField(max_length=20) # type: str
last_modified = models.DateTimeField(auto_now=True) # type: datetime.datetime
def __str__(self) -> str:
return '<BillingProcessor: %s %s %s>' % (self.realm, self.log_row, self.id)

View File

@ -1,3 +1,4 @@
import datetime
import mock
import os
from typing import Any, Optional
@ -6,19 +7,21 @@ import re
from django.core import signing
from django.http import HttpResponse
from django.utils.timezone import utc as timezone_utc
import stripe
from zerver.lib.actions import do_deactivate_user, do_create_user, \
do_activate_user, do_reactivate_user, activity_change_requires_seat_update
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.timestamp import timestamp_to_datetime
from zerver.lib.timestamp import timestamp_to_datetime, datetime_to_timestamp
from zerver.models import Realm, UserProfile, get_realm, RealmAuditLog
from zilencer.lib.stripe import catch_stripe_errors, \
do_create_customer_with_payment_source, do_subscribe_customer_to_plan, \
get_seat_count, extract_current_subscription, sign_string, unsign_string, \
BillingError
from zilencer.models import Customer, Plan
get_next_billing_log_entry, run_billing_processor_one_step, \
BillingError, StripeCardError, StripeConnectionError
from zilencer.models import Customer, Plan, BillingProcessor
fixture_data_file = open(os.path.join(os.path.dirname(__file__), 'stripe_fixtures.json'), 'r')
fixture_data = ujson.load(fixture_data_file)
@ -286,7 +289,55 @@ class StripeTest(ZulipTestCase):
with self.assertRaises(signing.BadSignature):
unsign_string(signed_string, "randomsalt")
class BillingUpdateTest(ZulipTestCase):
@mock.patch("stripe.Customer.create", side_effect=mock_create_customer)
@mock.patch("stripe.Subscription.create", side_effect=mock_create_subscription)
@mock.patch("stripe.Customer.retrieve", side_effect=mock_customer_with_subscription)
def test_billing_quantity_changes_end_to_end(
self, mock_customer_with_subscription: mock.Mock, mock_create_subscription: mock.Mock,
mock_create_customer: mock.Mock) -> None:
self.login(self.example_email("hamlet"))
processor = BillingProcessor.objects.create(
log_row=RealmAuditLog.objects.order_by('id').first(), state=BillingProcessor.DONE)
def check_billing_processor_update(event_type: str, quantity: int) -> None:
def check_subscription_save(subscription: stripe.Subscription, idempotency_key: str) -> None:
self.assertEqual(subscription.quantity, quantity)
log_row = RealmAuditLog.objects.filter(
event_type=event_type, requires_billing_update=True).order_by('-id').first()
self.assertEqual(idempotency_key, 'process_billing_log_entry:%s' % (log_row.id,))
self.assertEqual(subscription.proration_date, datetime_to_timestamp(log_row.event_time))
with mock.patch.object(stripe.Subscription, 'save', autospec=True,
side_effect=check_subscription_save):
run_billing_processor_one_step(processor)
# Test STRIPE_PLAN_QUANTITY_RESET
new_seat_count = 123
# change the seat count while the user is going through the upgrade flow
with mock.patch('zilencer.lib.stripe.get_seat_count', return_value=new_seat_count):
self.client_post("/upgrade/", {'stripeToken': self.token,
'signed_seat_count': self.signed_seat_count,
'salt': self.salt,
'plan': Plan.CLOUD_ANNUAL})
check_billing_processor_update(RealmAuditLog.STRIPE_PLAN_QUANTITY_RESET, new_seat_count)
# Test USER_CREATED
user = do_create_user('newuser@zulip.com', 'password', get_realm('zulip'), 'full name', 'short name')
check_billing_processor_update(RealmAuditLog.USER_CREATED, self.quantity + 1)
# Test USER_DEACTIVATED
do_deactivate_user(user)
check_billing_processor_update(RealmAuditLog.USER_DEACTIVATED, self.quantity - 1)
# Test USER_REACTIVATED
do_reactivate_user(user)
check_billing_processor_update(RealmAuditLog.USER_REACTIVATED, self.quantity + 1)
# Test USER_ACTIVATED
# Not a proper use of do_activate_user, but it's fine to call it like this for this test
do_activate_user(user)
check_billing_processor_update(RealmAuditLog.USER_ACTIVATED, self.quantity + 1)
class RequiresBillingUpdateTest(ZulipTestCase):
def test_activity_change_requires_seat_update(self) -> None:
# Realm doesn't have a seat based plan
self.assertFalse(activity_change_requires_seat_update(self.example_user("hamlet")))
@ -319,3 +370,142 @@ class BillingUpdateTest(ZulipTestCase):
do_reactivate_user(user2)
do_activate_user(user2)
self.assertEqual(4, RealmAuditLog.objects.filter(requires_billing_update=True).count())
class BillingProcessorTest(ZulipTestCase):
def add_log_entry(self, realm: Realm=get_realm('zulip'),
event_type: str=RealmAuditLog.USER_CREATED,
requires_billing_update: bool=True) -> RealmAuditLog:
return RealmAuditLog.objects.create(
realm=realm, event_time=datetime.datetime(2001, 2, 3, 4, 5, 6).replace(tzinfo=timezone_utc),
event_type=event_type, requires_billing_update=requires_billing_update)
def test_get_next_billing_log_entry(self) -> None:
second_realm = Realm.objects.create(string_id='second', name='second')
entry1 = self.add_log_entry(realm=second_realm)
realm_processor = BillingProcessor.objects.create(
realm=second_realm, log_row=entry1, state=BillingProcessor.DONE)
entry2 = self.add_log_entry()
# global processor
processor = BillingProcessor.objects.create(
log_row=entry2, state=BillingProcessor.STARTED)
# Test STARTED, STALLED, and typo'ed state entry
self.assertEqual(entry2, get_next_billing_log_entry(processor))
processor.state = BillingProcessor.STALLED
processor.save()
with self.assertRaises(AssertionError):
get_next_billing_log_entry(processor)
processor.state = 'typo'
processor.save()
with self.assertRaisesRegex(BillingError, 'unknown processor state'):
get_next_billing_log_entry(processor)
# Test global processor is handled correctly
processor.state = BillingProcessor.DONE
processor.save()
# test it ignores entries with requires_billing_update=False
entry3 = self.add_log_entry(requires_billing_update=False)
# test it ignores entries with realm processors
entry4 = self.add_log_entry(realm=second_realm)
self.assertIsNone(get_next_billing_log_entry(processor))
# test it does catch entries it should
entry5 = self.add_log_entry()
self.assertEqual(entry5, get_next_billing_log_entry(processor))
# Test realm processor is handled correctly
# test it gets the entry with its realm, and ignores the entry with
# requires_billing_update=False, when global processor is up ahead
processor.log_row = entry5
processor.save()
self.assertEqual(entry4, get_next_billing_log_entry(realm_processor))
# test it doesn't run past the global processor
processor.log_row = entry3
processor.save()
self.assertIsNone(get_next_billing_log_entry(realm_processor))
def test_run_billing_processor_logic_when_no_errors(self) -> None:
second_realm = Realm.objects.create(string_id='second', name='second')
entry1 = self.add_log_entry(realm=second_realm)
realm_processor = BillingProcessor.objects.create(
realm=second_realm, log_row=entry1, state=BillingProcessor.DONE)
entry2 = self.add_log_entry()
# global processor
processor = BillingProcessor.objects.create(
log_row=entry2, state=BillingProcessor.DONE)
# Test nothing to process
# test nothing changes, for global processor
self.assertFalse(run_billing_processor_one_step(processor))
self.assertEqual(2, BillingProcessor.objects.count())
# test realm processor gets deleted
self.assertFalse(run_billing_processor_one_step(realm_processor))
self.assertEqual(1, BillingProcessor.objects.count())
self.assertEqual(1, BillingProcessor.objects.filter(realm=None).count())
# Test something to process
processor.state = BillingProcessor.STARTED
processor.save()
realm_processor = BillingProcessor.objects.create(
realm=second_realm, log_row=entry1, state=BillingProcessor.STARTED)
Customer.objects.create(realm=get_realm('zulip'), stripe_customer_id='cust_1')
Customer.objects.create(realm=second_realm, stripe_customer_id='cust_2')
with mock.patch('zilencer.lib.stripe.do_adjust_subscription_quantity'):
# test return values
self.assertTrue(run_billing_processor_one_step(processor))
self.assertTrue(run_billing_processor_one_step(realm_processor))
# test no processors get added or deleted
self.assertEqual(2, BillingProcessor.objects.count())
@mock.patch("zilencer.lib.stripe.billing_logger.error")
def test_run_billing_processor_with_card_error(self, mock_billing_logger_error: mock.Mock) -> None:
second_realm = Realm.objects.create(string_id='second', name='second')
entry1 = self.add_log_entry(realm=second_realm)
# global processor
processor = BillingProcessor.objects.create(
log_row=entry1, state=BillingProcessor.STARTED)
Customer.objects.create(realm=second_realm, stripe_customer_id='cust_2')
# card error on global processor should create a new realm processor
with mock.patch('zilencer.lib.stripe.do_adjust_subscription_quantity',
side_effect=stripe.error.CardError('message', 'param', 'code', json_body={})):
self.assertTrue(run_billing_processor_one_step(processor))
self.assertEqual(2, BillingProcessor.objects.count())
self.assertTrue(BillingProcessor.objects.filter(
realm=None, log_row=entry1, state=BillingProcessor.SKIPPED).exists())
self.assertTrue(BillingProcessor.objects.filter(
realm=second_realm, log_row=entry1, state=BillingProcessor.STALLED).exists())
mock_billing_logger_error.assert_called()
# card error on realm processor should change state to STALLED
realm_processor = BillingProcessor.objects.filter(realm=second_realm).first()
realm_processor.state = BillingProcessor.STARTED
realm_processor.save()
with mock.patch('zilencer.lib.stripe.do_adjust_subscription_quantity',
side_effect=stripe.error.CardError('message', 'param', 'code', json_body={})):
self.assertTrue(run_billing_processor_one_step(realm_processor))
self.assertEqual(2, BillingProcessor.objects.count())
self.assertTrue(BillingProcessor.objects.filter(
realm=second_realm, log_row=entry1, state=BillingProcessor.STALLED).exists())
mock_billing_logger_error.assert_called()
@mock.patch("zilencer.lib.stripe.billing_logger.error")
def test_run_billing_processor_with_uncaught_error(self, mock_billing_logger_error: mock.Mock) -> None:
# This tests three different things:
# * That run_billing_processor_one_step passes through exceptions that
# are not StripeCardError
# * That process_billing_log_entry catches StripeErrors and re-raises them as BillingErrors
# * That processor.state=STARTED for non-StripeCardError exceptions
entry1 = self.add_log_entry()
entry2 = self.add_log_entry()
processor = BillingProcessor.objects.create(
log_row=entry1, state=BillingProcessor.DONE)
Customer.objects.create(realm=get_realm('zulip'), stripe_customer_id='cust_1')
with mock.patch('zilencer.lib.stripe.do_adjust_subscription_quantity',
side_effect=stripe.error.StripeError('message', 'param', 'code', json_body={})):
with self.assertRaises(BillingError):
run_billing_processor_one_step(processor)
mock_billing_logger_error.assert_called()
# check processor.state is STARTED
self.assertTrue(BillingProcessor.objects.filter(
log_row=entry2, state=BillingProcessor.STARTED).exists())

View File

@ -436,6 +436,10 @@ DEFAULT_SETTINGS.update({
# value in static/js/presence.js. Also, probably move it out of
# DEFAULT_SETTINGS, since it likely isn't usefully user-configurable.
'OFFLINE_THRESHOLD_SECS': 5 * 60,
# Controls whether we run the worker that syncs billing-related updates
# into Stripe. Should be True on at most one machine.
'BILLING_PROCESSOR_ENABLED': False,
})