mirror of https://github.com/zulip/zulip.git
55 lines
2.4 KiB
Python
55 lines
2.4 KiB
Python
|
# Webhooks for external integrations.
|
||
|
from typing import Text, Dict, Any
|
||
|
from django.http import HttpRequest, HttpResponse
|
||
|
from zerver.decorator import api_key_only_webhook_view
|
||
|
from zerver.lib.actions import check_send_stream_message, check_send_private_message
|
||
|
from zerver.lib.request import REQ, has_request_variables
|
||
|
from zerver.lib.response import json_success
|
||
|
from zerver.models import UserProfile, get_user_profile_by_email
|
||
|
import time
|
||
|
# Wall-clock time in seconds, sampled once when this module is imported.
# NOTE(review): the value is never refreshed afterwards, so any code that
# subtracts it from a later timestamp is measuring from import time, not "now".
current_time = time.time()
|
||
|
|
||
|
def get_user_name(email: str) -> str:
    """Return the ``short_name`` of the user profile registered under *email*.

    The profile is fetched with ``get_user_profile_by_email``; any exception
    that lookup raises propagates to the caller unchanged.
    """
    return get_user_profile_by_email(email).short_name
|
||
|
|
||
|
@api_key_only_webhook_view("beeminder")
@has_request_variables
def api_beeminder_webhook(request: HttpRequest, user_profile: UserProfile,
                          payload: Dict[str, Any]=REQ(argument_type='body'),
                          stream: Text=REQ(default="beeminder"),
                          email: str=REQ(default='foo@gmail.com'),
                          topic: Text=REQ(default='beekeeper')) -> HttpResponse:
    """Turn a Beeminder derail notification into a Zulip message.

    The handler reads ``secret``, ``slug``, ``losedate``, ``limsum`` and
    ``pledge`` from ``payload["goal"]``. ``losedate`` is treated as a Unix
    timestamp in seconds: the current time is subtracted from it and the
    difference is reported in hours.

    Args:
        request: Incoming HTTP request; its ``client`` is forwarded to the
            message-sending helpers.
        user_profile: Profile the message is sent as.
        payload: Parsed request body; must contain a ``"goal"`` mapping.
        stream: Stream used when the goal is not secret.
        email: Address used to look up the user to greet (stream message)
            or to receive the private message.
        topic: Topic used for the stream message.

    Returns:
        The result of ``json_success()`` once the message has been sent.

    Raises:
        KeyError: If ``payload`` has no ``"goal"`` entry or the goal is
            missing one of the fields listed above.
    """
    goal = payload["goal"]
    secret = goal["secret"]
    goal_name = goal["slug"]
    losedate = goal["losedate"]
    limsum = goal["limsum"]
    pledge = goal["pledge"]

    # Read the clock per request. A timestamp captured at module import goes
    # stale in a long-running server and inflates the reported hours.
    time_remain = (losedate - time.time()) / 3600  # time in hours

    # To show user's probable reaction by looking at pledge amount
    expression = ':worried:' if pledge > 0 else ':relieved:'

    # The templates are built with implicit string concatenation instead of a
    # backslash continuation inside the literal, so source indentation can
    # never leak into the message text.
    if not secret:
        # Goal is not secret: notify the stream and greet the user by name.
        name = get_user_name(email)
        template = ("Hello **{}**! I am the Beeminder bot! :octopus: \n"
                    " You are going to derail from goal **{}** in **{:0.1f} hours** \n"
                    " You need **{}** to avoid derailing \n"
                    " * Pledge: **{}$** {}")
        body = template.format(name, goal_name, time_remain, limsum, pledge, expression)
        check_send_stream_message(user_profile, request.client, stream, topic, body)
    else:
        # Goal is secret: send a private message to the user instead.
        recipient = get_user_profile_by_email(email)
        template = ("I am the Beeminder bot! :octopus: \n"
                    " You are going to derail from goal **{}** in **{:0.1f} hours** \n"
                    " You need **{}** to avoid derailing \n"
                    " * Pledge: **{}$**{}")
        body = template.format(goal_name, time_remain, limsum, pledge, expression)
        check_send_private_message(user_profile, request.client, recipient, body)

    return json_success()
|