zulip/zerver/webhooks/beeminder/view.py

55 lines
2.4 KiB
Python

# Webhooks for external integrations.
from typing import Text, Dict, Any
from django.http import HttpRequest, HttpResponse
from zerver.decorator import api_key_only_webhook_view
from zerver.lib.actions import check_send_stream_message, check_send_private_message
from zerver.lib.request import REQ, has_request_variables
from zerver.lib.response import json_success
from zerver.models import UserProfile, get_user_profile_by_email
import time
current_time = time.time()
def get_user_name(email: str) -> str:
profile = get_user_profile_by_email(email)
name = profile.short_name
return name
@api_key_only_webhook_view("beeminder")
@has_request_variables
def api_beeminder_webhook(request: HttpRequest, user_profile: UserProfile,
payload: Dict[str, Any]=REQ(argument_type='body'),
stream: Text=REQ(default="beeminder"),
email: str=REQ(default='foo@gmail.com'),
topic: Text=REQ(default='beekeeper')) -> HttpResponse:
secret = payload["goal"]["secret"]
goal_name = payload["goal"]["slug"]
losedate = payload["goal"]["losedate"]
limsum = payload["goal"]["limsum"]
pledge = payload["goal"]["pledge"]
time_remain = (losedate - current_time)/3600 # time in hours
# To show user's probable reaction by looking at pledge amount
if pledge > 0:
expression = ':worried:'
else:
expression = ':relieved:'
if not secret:
# In this case notifications will be sent to stream
name = get_user_name(email)
body = u"Hello **{}**! I am the Beeminder bot! :octopus: \n You are going to derail \
from goal **{}** in **{:0.1f} hours** \n You need **{}** to avoid derailing \n * Pledge: **{}$** {}"
body = body.format(name, goal_name, time_remain, limsum, pledge, expression)
check_send_stream_message(user_profile, request.client, stream, topic, body)
return json_success()
else:
# In this case PM will be sent to user
p = get_user_profile_by_email(email)
body = u"I am the Beeminder bot! :octopus: \n You are going to derail from \
goal **{}** in **{:0.1f} hours** \n You need **{}** to avoid derailing \n * Pledge: **{}$**{}"
body = body.format(goal_name, time_remain, limsum, pledge, expression)
check_send_private_message(user_profile, request.client, p, body)
return json_success()