mirror of https://github.com/zulip/zulip.git
billing: Remove BillingProcessor.
Leaving the model in place, so that we can do the database migration by hand.
This commit is contained in:
parent
72576b3a77
commit
84a31ca800
|
@ -18,7 +18,7 @@ from zerver.lib.timestamp import datetime_to_timestamp, timestamp_to_datetime
|
|||
from zerver.lib.utils import generate_random_token
|
||||
from zerver.lib.actions import do_change_plan_type
|
||||
from zerver.models import Realm, UserProfile, RealmAuditLog
|
||||
from corporate.models import Customer, Plan, Coupon, BillingProcessor
|
||||
from corporate.models import Customer, Plan, Coupon
|
||||
from zproject.settings import get_secret
|
||||
|
||||
STRIPE_PUBLISHABLE_KEY = get_secret('stripe_publishable_key')
|
||||
|
@ -320,105 +320,3 @@ def process_downgrade(user: UserProfile) -> None:
|
|||
# Keeping it out of the transaction.atomic block because it will
|
||||
# eventually have a lot of stuff going on.
|
||||
do_change_plan_type(user.realm, Realm.LIMITED)
|
||||
|
||||
## Process RealmAuditLog
|
||||
|
||||
def do_set_subscription_quantity(
|
||||
customer: Customer, timestamp: int, idempotency_key: str, quantity: int) -> None:
|
||||
stripe_customer = stripe_get_customer(customer.stripe_customer_id)
|
||||
stripe_subscription = extract_current_subscription(stripe_customer)
|
||||
stripe_subscription.quantity = quantity
|
||||
stripe_subscription.proration_date = timestamp
|
||||
stripe.Subscription.save(stripe_subscription, idempotency_key=idempotency_key)
|
||||
|
||||
def do_adjust_subscription_quantity(
|
||||
customer: Customer, timestamp: int, idempotency_key: str, delta: int) -> None:
|
||||
stripe_customer = stripe_get_customer(customer.stripe_customer_id)
|
||||
stripe_subscription = extract_current_subscription(stripe_customer)
|
||||
stripe_subscription.quantity = stripe_subscription.quantity + delta
|
||||
stripe_subscription.proration_date = timestamp
|
||||
stripe.Subscription.save(stripe_subscription, idempotency_key=idempotency_key)
|
||||
|
||||
def increment_subscription_quantity(
|
||||
customer: Customer, timestamp: int, idempotency_key: str) -> None:
|
||||
return do_adjust_subscription_quantity(customer, timestamp, idempotency_key, 1)
|
||||
|
||||
def decrement_subscription_quantity(
|
||||
customer: Customer, timestamp: int, idempotency_key: str) -> None:
|
||||
return do_adjust_subscription_quantity(customer, timestamp, idempotency_key, -1)
|
||||
|
||||
@catch_stripe_errors
|
||||
def process_billing_log_entry(processor: BillingProcessor, log_row: RealmAuditLog) -> None:
|
||||
processor.state = BillingProcessor.STARTED
|
||||
processor.log_row = log_row
|
||||
processor.save()
|
||||
|
||||
customer = Customer.objects.get(realm=log_row.realm)
|
||||
timestamp = datetime_to_timestamp(log_row.event_time)
|
||||
idempotency_key = 'process_billing_log_entry:%s' % (log_row.id,)
|
||||
if settings.TEST_SUITE:
|
||||
idempotency_key += '+' + generate_random_token(10)
|
||||
extra_args = {} # type: Dict[str, Any]
|
||||
if log_row.extra_data is not None:
|
||||
extra_args = ujson.loads(log_row.extra_data)
|
||||
processing_functions = {
|
||||
RealmAuditLog.STRIPE_PLAN_QUANTITY_RESET: do_set_subscription_quantity,
|
||||
RealmAuditLog.USER_CREATED: increment_subscription_quantity,
|
||||
RealmAuditLog.USER_ACTIVATED: increment_subscription_quantity,
|
||||
RealmAuditLog.USER_DEACTIVATED: decrement_subscription_quantity,
|
||||
RealmAuditLog.USER_REACTIVATED: increment_subscription_quantity,
|
||||
} # type: Dict[str, Callable[..., None]]
|
||||
processing_functions[log_row.event_type](customer, timestamp, idempotency_key, **extra_args)
|
||||
|
||||
processor.state = BillingProcessor.DONE
|
||||
processor.save()
|
||||
|
||||
def get_next_billing_log_entry(processor: BillingProcessor) -> Optional[RealmAuditLog]:
|
||||
if processor.state == BillingProcessor.STARTED:
|
||||
return processor.log_row
|
||||
assert processor.state != BillingProcessor.STALLED
|
||||
if processor.state not in [BillingProcessor.DONE, BillingProcessor.SKIPPED]:
|
||||
raise BillingError(
|
||||
'unknown processor state',
|
||||
"Check for typos, since this value is sometimes set by hand: %s" % (processor.state,))
|
||||
|
||||
if processor.realm is None:
|
||||
realms_with_processors = BillingProcessor.objects.exclude(
|
||||
realm=None).values_list('realm', flat=True)
|
||||
query = RealmAuditLog.objects.exclude(realm__in=realms_with_processors)
|
||||
else:
|
||||
global_processor = BillingProcessor.objects.get(realm=None)
|
||||
query = RealmAuditLog.objects.filter(
|
||||
realm=processor.realm, id__lt=global_processor.log_row.id)
|
||||
return query.filter(id__gt=processor.log_row.id,
|
||||
requires_billing_update=True).order_by('id').first()
|
||||
|
||||
def run_billing_processor_one_step(processor: BillingProcessor) -> bool:
|
||||
# Returns True if a row was processed, or if processing was attempted
|
||||
log_row = get_next_billing_log_entry(processor)
|
||||
if log_row is None:
|
||||
if processor.realm is not None:
|
||||
processor.delete()
|
||||
return False
|
||||
try:
|
||||
process_billing_log_entry(processor, log_row)
|
||||
return True
|
||||
except Exception as e:
|
||||
# Possible errors include processing subscription quantity entries
|
||||
# after downgrade, since the downgrade code doesn't check that
|
||||
# billing processor is up to date
|
||||
billing_logger.error("Error on log_row.realm=%s, event_type=%s, log_row.id=%s, "
|
||||
"processor.id=%s, processor.realm=%s" % (
|
||||
processor.log_row.realm.string_id, processor.log_row.event_type,
|
||||
processor.log_row.id, processor.id, processor.realm))
|
||||
if isinstance(e, StripeCardError):
|
||||
if processor.realm is None:
|
||||
BillingProcessor.objects.create(log_row=processor.log_row,
|
||||
realm=processor.log_row.realm,
|
||||
state=BillingProcessor.STALLED)
|
||||
processor.state = BillingProcessor.SKIPPED
|
||||
else:
|
||||
processor.state = BillingProcessor.STALLED
|
||||
processor.save()
|
||||
return True
|
||||
raise
|
||||
|
|
|
@ -1,47 +0,0 @@
|
|||
"""\
|
||||
Run BillingProcessors.
|
||||
|
||||
This management command is run via supervisor. Do not run on multiple
|
||||
machines, as the code has not been made robust to race conditions from doing
|
||||
so. (Alternatively, you can set `BILLING_PROCESSOR_ENABLED=False` on all but
|
||||
one machine to make the command have no effect.)
|
||||
"""
|
||||
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from zerver.lib.context_managers import lockfile
|
||||
from zerver.lib.management import sleep_forever
|
||||
from corporate.lib.stripe import StripeConnectionError, \
|
||||
run_billing_processor_one_step
|
||||
from corporate.models import BillingProcessor
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = """Run BillingProcessors, to sync billing-relevant updates into Stripe.
|
||||
|
||||
Run this command under supervisor.
|
||||
|
||||
Usage: ./manage.py process_billing_updates
|
||||
"""
|
||||
|
||||
def handle(self, *args: Any, **options: Any) -> None:
|
||||
if not settings.BILLING_PROCESSOR_ENABLED:
|
||||
sleep_forever()
|
||||
|
||||
with lockfile("/tmp/zulip_billing_processor.lockfile"):
|
||||
while True:
|
||||
for processor in BillingProcessor.objects.exclude(
|
||||
state=BillingProcessor.STALLED):
|
||||
try:
|
||||
entry_processed = run_billing_processor_one_step(processor)
|
||||
except StripeConnectionError:
|
||||
time.sleep(5*60)
|
||||
# Less load on the db during times of activity
|
||||
# and more responsiveness when the load is low
|
||||
if entry_processed:
|
||||
time.sleep(10)
|
||||
else:
|
||||
time.sleep(2)
|
|
@ -29,6 +29,7 @@ class Coupon(models.Model):
|
|||
def __str__(self) -> str:
|
||||
return '<Coupon: %s %s %s>' % (self.percent_off, self.stripe_coupon_id, self.id)
|
||||
|
||||
# legacy
|
||||
class BillingProcessor(models.Model):
|
||||
log_row = models.ForeignKey(RealmAuditLog, on_delete=models.CASCADE) # RealmAuditLog
|
||||
# Exactly one processor, the global processor, has realm=None.
|
||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -26,11 +26,10 @@ from zerver.models import Realm, UserProfile, get_realm, RealmAuditLog
|
|||
from corporate.lib.stripe import catch_stripe_errors, \
|
||||
do_subscribe_customer_to_plan, attach_discount_to_realm, \
|
||||
get_seat_count, extract_current_subscription, sign_string, unsign_string, \
|
||||
get_next_billing_log_entry, run_billing_processor_one_step, \
|
||||
BillingError, StripeCardError, StripeConnectionError, stripe_get_customer, \
|
||||
DEFAULT_INVOICE_DAYS_UNTIL_DUE, MIN_INVOICED_SEAT_COUNT, do_create_customer, \
|
||||
process_downgrade
|
||||
from corporate.models import Customer, Plan, Coupon, BillingProcessor
|
||||
from corporate.models import Customer, Plan, Coupon
|
||||
from corporate.views import payment_method_string
|
||||
import corporate.urls
|
||||
|
||||
|
@ -210,13 +209,6 @@ class Kandra(object):
|
|||
def __eq__(self, other: Any) -> bool:
|
||||
return True
|
||||
|
||||
def process_all_billing_log_entries() -> None:
|
||||
assert not RealmAuditLog.objects.get(pk=1).requires_billing_update
|
||||
processor = BillingProcessor.objects.create(
|
||||
log_row=RealmAuditLog.objects.get(pk=1), realm=None, state=BillingProcessor.DONE)
|
||||
while run_billing_processor_one_step(processor):
|
||||
pass
|
||||
|
||||
class StripeTest(ZulipTestCase):
|
||||
@mock_stripe(generate=False)
|
||||
def setUp(self, *mocks: Mock) -> None:
|
||||
|
@ -517,7 +509,6 @@ class StripeTest(ZulipTestCase):
|
|||
user = self.example_user("hamlet")
|
||||
self.login(user.email)
|
||||
self.upgrade(invoice=True)
|
||||
process_all_billing_log_entries()
|
||||
|
||||
# Check that we correctly created a Customer in Stripe
|
||||
stripe_customer = stripe_get_customer(Customer.objects.get(realm=user.realm).stripe_customer_id)
|
||||
|
@ -536,7 +527,8 @@ class StripeTest(ZulipTestCase):
|
|||
self.assertEqual(stripe_subscription.days_until_due, DEFAULT_INVOICE_DAYS_UNTIL_DUE)
|
||||
self.assertEqual(stripe_subscription.plan.id,
|
||||
Plan.objects.get(nickname=Plan.CLOUD_ANNUAL).stripe_plan_id)
|
||||
self.assertEqual(stripe_subscription.quantity, get_seat_count(user.realm))
|
||||
# In the middle of migrating off of this billing algorithm
|
||||
# self.assertEqual(stripe_subscription.quantity, get_seat_count(user.realm))
|
||||
self.assertEqual(stripe_subscription.status, 'active')
|
||||
# Check that we correctly created an initial Invoice in Stripe
|
||||
for stripe_invoice in stripe.Invoice.list(customer=stripe_customer.id, limit=1):
|
||||
|
@ -796,48 +788,6 @@ class StripeTest(ZulipTestCase):
|
|||
self.assertEqual(number_of_sources, 1)
|
||||
self.assertFalse(RealmAuditLog.objects.filter(event_type=RealmAuditLog.STRIPE_CARD_CHANGED).exists())
|
||||
|
||||
@mock_stripe()
|
||||
def test_billing_quantity_changes_end_to_end(self, *mocks: Mock) -> None:
|
||||
# A full end to end check would check the InvoiceItems, but this test is partway there
|
||||
self.login(self.example_email("hamlet"))
|
||||
processor = BillingProcessor.objects.create(
|
||||
log_row=RealmAuditLog.objects.order_by('id').first(), state=BillingProcessor.DONE)
|
||||
|
||||
def check_billing_processor_update(event_type: str, quantity: int) -> None:
|
||||
def check_subscription_save(subscription: stripe.Subscription, idempotency_key: str) -> None:
|
||||
self.assertEqual(subscription.quantity, quantity)
|
||||
log_row = RealmAuditLog.objects.filter(
|
||||
event_type=event_type, requires_billing_update=True).order_by('-id').first()
|
||||
self.assertEqual(idempotency_key.split('+')[0],
|
||||
'process_billing_log_entry:%s' % (log_row.id,))
|
||||
self.assertEqual(subscription.proration_date, datetime_to_timestamp(log_row.event_time))
|
||||
with patch('stripe.Subscription.save', side_effect=check_subscription_save):
|
||||
run_billing_processor_one_step(processor)
|
||||
|
||||
# Test STRIPE_PLAN_QUANTITY_RESET
|
||||
new_seat_count = 123
|
||||
# change the seat count while the user is going through the upgrade flow
|
||||
with patch('corporate.lib.stripe.get_seat_count', return_value=new_seat_count):
|
||||
self.upgrade()
|
||||
check_billing_processor_update(RealmAuditLog.STRIPE_PLAN_QUANTITY_RESET, new_seat_count)
|
||||
|
||||
# Test USER_CREATED
|
||||
user = do_create_user('newuser@zulip.com', 'password', get_realm('zulip'), 'full name', 'short name')
|
||||
check_billing_processor_update(RealmAuditLog.USER_CREATED, self.seat_count + 1)
|
||||
|
||||
# Test USER_DEACTIVATED
|
||||
do_deactivate_user(user)
|
||||
check_billing_processor_update(RealmAuditLog.USER_DEACTIVATED, self.seat_count - 1)
|
||||
|
||||
# Test USER_REACTIVATED
|
||||
do_reactivate_user(user)
|
||||
check_billing_processor_update(RealmAuditLog.USER_REACTIVATED, self.seat_count + 1)
|
||||
|
||||
# Test USER_ACTIVATED
|
||||
# Not a proper use of do_activate_user, but it's fine to call it like this for this test
|
||||
do_activate_user(user)
|
||||
check_billing_processor_update(RealmAuditLog.USER_ACTIVATED, self.seat_count + 1)
|
||||
|
||||
class RequiresBillingUpdateTest(ZulipTestCase):
|
||||
def test_activity_change_requires_seat_update(self) -> None:
|
||||
# Realm doesn't have a seat based plan
|
||||
|
@ -921,142 +871,3 @@ class RequiresBillingAccessTest(ZulipTestCase):
|
|||
json_endpoints.remove("json/billing/upgrade")
|
||||
|
||||
self.assertEqual(len(json_endpoints), len(params))
|
||||
|
||||
class BillingProcessorTest(ZulipTestCase):
|
||||
def add_log_entry(self, realm: Realm=get_realm('zulip'),
|
||||
event_type: str=RealmAuditLog.USER_CREATED,
|
||||
requires_billing_update: bool=True) -> RealmAuditLog:
|
||||
return RealmAuditLog.objects.create(
|
||||
realm=realm, event_time=datetime.datetime(2001, 2, 3, 4, 5, 6).replace(tzinfo=timezone_utc),
|
||||
event_type=event_type, requires_billing_update=requires_billing_update)
|
||||
|
||||
def test_get_next_billing_log_entry(self) -> None:
|
||||
second_realm = Realm.objects.create(string_id='second', name='second')
|
||||
entry1 = self.add_log_entry(realm=second_realm)
|
||||
realm_processor = BillingProcessor.objects.create(
|
||||
realm=second_realm, log_row=entry1, state=BillingProcessor.DONE)
|
||||
entry2 = self.add_log_entry()
|
||||
# global processor
|
||||
processor = BillingProcessor.objects.create(
|
||||
log_row=entry2, state=BillingProcessor.STARTED)
|
||||
|
||||
# Test STARTED, STALLED, and typo'ed state entry
|
||||
self.assertEqual(entry2, get_next_billing_log_entry(processor))
|
||||
processor.state = BillingProcessor.STALLED
|
||||
processor.save()
|
||||
with self.assertRaises(AssertionError):
|
||||
get_next_billing_log_entry(processor)
|
||||
processor.state = 'typo'
|
||||
processor.save()
|
||||
with self.assertRaisesRegex(BillingError, 'unknown processor state'):
|
||||
get_next_billing_log_entry(processor)
|
||||
|
||||
# Test global processor is handled correctly
|
||||
processor.state = BillingProcessor.DONE
|
||||
processor.save()
|
||||
# test it ignores entries with requires_billing_update=False
|
||||
entry3 = self.add_log_entry(requires_billing_update=False)
|
||||
# test it ignores entries with realm processors
|
||||
entry4 = self.add_log_entry(realm=second_realm)
|
||||
self.assertIsNone(get_next_billing_log_entry(processor))
|
||||
# test it does catch entries it should
|
||||
entry5 = self.add_log_entry()
|
||||
self.assertEqual(entry5, get_next_billing_log_entry(processor))
|
||||
|
||||
# Test realm processor is handled correctly
|
||||
# test it gets the entry with its realm, and ignores the entry with
|
||||
# requires_billing_update=False, when global processor is up ahead
|
||||
processor.log_row = entry5
|
||||
processor.save()
|
||||
self.assertEqual(entry4, get_next_billing_log_entry(realm_processor))
|
||||
|
||||
# test it doesn't run past the global processor
|
||||
processor.log_row = entry3
|
||||
processor.save()
|
||||
self.assertIsNone(get_next_billing_log_entry(realm_processor))
|
||||
|
||||
def test_run_billing_processor_logic_when_no_errors(self) -> None:
|
||||
second_realm = Realm.objects.create(string_id='second', name='second')
|
||||
entry1 = self.add_log_entry(realm=second_realm)
|
||||
realm_processor = BillingProcessor.objects.create(
|
||||
realm=second_realm, log_row=entry1, state=BillingProcessor.DONE)
|
||||
entry2 = self.add_log_entry()
|
||||
# global processor
|
||||
processor = BillingProcessor.objects.create(
|
||||
log_row=entry2, state=BillingProcessor.DONE)
|
||||
|
||||
# Test nothing to process
|
||||
# test nothing changes, for global processor
|
||||
self.assertFalse(run_billing_processor_one_step(processor))
|
||||
self.assertEqual(2, BillingProcessor.objects.count())
|
||||
# test realm processor gets deleted
|
||||
self.assertFalse(run_billing_processor_one_step(realm_processor))
|
||||
self.assertEqual(1, BillingProcessor.objects.count())
|
||||
self.assertEqual(1, BillingProcessor.objects.filter(realm=None).count())
|
||||
|
||||
# Test something to process
|
||||
processor.state = BillingProcessor.STARTED
|
||||
processor.save()
|
||||
realm_processor = BillingProcessor.objects.create(
|
||||
realm=second_realm, log_row=entry1, state=BillingProcessor.STARTED)
|
||||
Customer.objects.create(realm=get_realm('zulip'), stripe_customer_id='cust_1')
|
||||
Customer.objects.create(realm=second_realm, stripe_customer_id='cust_2')
|
||||
with patch('corporate.lib.stripe.do_adjust_subscription_quantity'):
|
||||
# test return values
|
||||
self.assertTrue(run_billing_processor_one_step(processor))
|
||||
self.assertTrue(run_billing_processor_one_step(realm_processor))
|
||||
# test no processors get added or deleted
|
||||
self.assertEqual(2, BillingProcessor.objects.count())
|
||||
|
||||
@patch("corporate.lib.stripe.billing_logger.error")
|
||||
def test_run_billing_processor_with_card_error(self, mock_billing_logger_error: Mock) -> None:
|
||||
second_realm = Realm.objects.create(string_id='second', name='second')
|
||||
entry1 = self.add_log_entry(realm=second_realm)
|
||||
# global processor
|
||||
processor = BillingProcessor.objects.create(
|
||||
log_row=entry1, state=BillingProcessor.STARTED)
|
||||
Customer.objects.create(realm=second_realm, stripe_customer_id='cust_2')
|
||||
|
||||
# card error on global processor should create a new realm processor
|
||||
with patch('corporate.lib.stripe.do_adjust_subscription_quantity',
|
||||
side_effect=stripe.error.CardError('message', 'param', 'code', json_body={})):
|
||||
self.assertTrue(run_billing_processor_one_step(processor))
|
||||
self.assertEqual(2, BillingProcessor.objects.count())
|
||||
self.assertTrue(BillingProcessor.objects.filter(
|
||||
realm=None, log_row=entry1, state=BillingProcessor.SKIPPED).exists())
|
||||
self.assertTrue(BillingProcessor.objects.filter(
|
||||
realm=second_realm, log_row=entry1, state=BillingProcessor.STALLED).exists())
|
||||
mock_billing_logger_error.assert_called()
|
||||
|
||||
# card error on realm processor should change state to STALLED
|
||||
realm_processor = BillingProcessor.objects.filter(realm=second_realm).first()
|
||||
realm_processor.state = BillingProcessor.STARTED
|
||||
realm_processor.save()
|
||||
with patch('corporate.lib.stripe.do_adjust_subscription_quantity',
|
||||
side_effect=stripe.error.CardError('message', 'param', 'code', json_body={})):
|
||||
self.assertTrue(run_billing_processor_one_step(realm_processor))
|
||||
self.assertEqual(2, BillingProcessor.objects.count())
|
||||
self.assertTrue(BillingProcessor.objects.filter(
|
||||
realm=second_realm, log_row=entry1, state=BillingProcessor.STALLED).exists())
|
||||
mock_billing_logger_error.assert_called()
|
||||
|
||||
@patch("corporate.lib.stripe.billing_logger.error")
|
||||
def test_run_billing_processor_with_uncaught_error(self, mock_billing_logger_error: Mock) -> None:
|
||||
# This tests three different things:
|
||||
# * That run_billing_processor_one_step passes through exceptions that
|
||||
# are not StripeCardError
|
||||
# * That process_billing_log_entry catches StripeErrors and re-raises them as BillingErrors
|
||||
# * That processor.state=STARTED for non-StripeCardError exceptions
|
||||
entry1 = self.add_log_entry()
|
||||
entry2 = self.add_log_entry()
|
||||
processor = BillingProcessor.objects.create(
|
||||
log_row=entry1, state=BillingProcessor.DONE)
|
||||
Customer.objects.create(realm=get_realm('zulip'), stripe_customer_id='cust_1')
|
||||
with patch('corporate.lib.stripe.do_adjust_subscription_quantity',
|
||||
side_effect=stripe.error.StripeError('message', json_body={})):
|
||||
with self.assertRaises(BillingError):
|
||||
run_billing_processor_one_step(processor)
|
||||
mock_billing_logger_error.assert_called()
|
||||
# check processor.state is STARTED
|
||||
self.assertTrue(BillingProcessor.objects.filter(
|
||||
log_row=entry2, state=BillingProcessor.STARTED).exists())
|
||||
|
|
|
@ -454,10 +454,6 @@ DEFAULT_SETTINGS.update({
|
|||
# Enables billing pages and plan-based feature gates. If False, all features
|
||||
# are available to all realms.
|
||||
'BILLING_ENABLED': False,
|
||||
|
||||
# Controls whether we run the worker that syncs billing-related updates
|
||||
# into Stripe. Should be True on at most one machine.
|
||||
'BILLING_PROCESSOR_ENABLED': False,
|
||||
})
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue