zulip/zerver/openapi/python_examples.py

1270 lines
39 KiB
Python
Raw Normal View History

from typing import Dict, Any, Optional, Iterable, Callable, Set, List
import json
import os
import sys
from functools import wraps
2017-01-13 13:50:39 +01:00
from zerver.lib import mdiff
from zerver.openapi.openapi import validate_against_openapi_schema
from zerver.models import get_realm, get_user
from zulip import Client
2017-01-13 13:50:39 +01:00
# Directory three levels above this file's absolute path.
ZULIP_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Endpoint key (e.g. "/messages/render:post") -> wrapped test function.
TEST_FUNCTIONS = {}  # type: Dict[str, Callable[..., None]]
# Names of all functions decorated with @openapi_test_function.
REGISTERED_TEST_FUNCTIONS = set()  # type: Set[str]
# Names of decorated functions that have been called at least once.
CALLED_TEST_FUNCTIONS = set()  # type: Set[str]
def openapi_test_function(endpoint: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Return a decorator that registers a test function for `endpoint`.

    The decorated function is stored in TEST_FUNCTIONS under `endpoint`,
    its name is added to REGISTERED_TEST_FUNCTIONS, and every call to it
    adds its name to CALLED_TEST_FUNCTIONS.  Example usage:

    @openapi_test_function("/messages/render:post")
    def ...
    """
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def call_and_record(*args: Any, **kwargs: Any) -> Any:
            # Record the call before delegating, so it is counted even
            # when the wrapped function raises.
            CALLED_TEST_FUNCTIONS.add(func.__name__)
            return func(*args, **kwargs)

        TEST_FUNCTIONS[endpoint] = call_and_record
        REGISTERED_TEST_FUNCTIONS.add(func.__name__)
        return call_and_record

    return decorator
def ensure_users(ids_list: List[int], user_names: List[str]) -> None:
    """Assert that `ids_list` matches the IDs of the users in `user_names`.

    Each name is looked up as `<name>@zulip.com` in the "zulip" realm;
    the resulting IDs must equal `ids_list`, in the same order.
    """
    realm = get_realm("zulip")
    emails = [name + '@zulip.com' for name in user_names]
    looked_up_ids = [get_user(email, realm).id for email in emails]
    assert ids_list == looked_up_ids
@openapi_test_function("/users/me/subscriptions:post")
def add_subscriptions(client: Client) -> None:
    """Subscribe to "new stream", then subscribe newbie@zulip.com to it."""

    # {code_example|start}
    # Subscribe to the stream "new stream"
    result = client.add_subscriptions(
        streams=[
            {
                'name': 'new stream',
                'description': 'New stream for testing'
            }
        ]
    )
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/users/me/subscriptions', 'post', '200_without_principals')

    # {code_example|start}
    # To subscribe another user to a stream, you may pass in
    # the `principals` argument, like so:
    result = client.add_subscriptions(
        streams=[
            {'name': 'new stream', 'description': 'New stream for testing'}
        ],
        principals=['newbie@zulip.com']
    )
    # {code_example|end}

    # The other user must show up in the `subscribed` part of the response.
    assert result['result'] == 'success'
    assert 'newbie@zulip.com' in result['subscribed']
def test_add_subscriptions_already_subscribed(client: Client) -> None:
    """Repeat the newbie@zulip.com subscription and validate the
    '200_already_subscribed' response."""
    stream = {'name': 'new stream', 'description': 'New stream for testing'}
    result = client.add_subscriptions(
        streams=[stream],
        principals=['newbie@zulip.com'],
    )

    validate_against_openapi_schema(
        result, '/users/me/subscriptions', 'post', '200_already_subscribed')
def test_authorization_errors_fatal(client: Client, nonadmin_client: Client) -> None:
    """Validate both error shapes returned when `nonadmin_client`
    subscribes to a stream that `client` has made private."""
    # Set-up: create the stream with `client`, then flip it to private.
    client.add_subscriptions(streams=[{'name': 'private_stream'}])
    stream_id = client.get_stream_id('private_stream')['stream_id']
    client.call_endpoint(
        'streams/{}'.format(stream_id),
        method='PATCH',
        request={'is_private': True}
    )

    # Same request twice, differing only in `authorization_errors_fatal`.
    result = nonadmin_client.add_subscriptions(
        streams=[{'name': 'private_stream'}],
        authorization_errors_fatal=False,
    )
    validate_against_openapi_schema(
        result, '/users/me/subscriptions', 'post',
        '400_unauthorized_errors_fatal_false')

    result = nonadmin_client.add_subscriptions(
        streams=[{'name': 'private_stream'}],
        authorization_errors_fatal=True,
    )
    validate_against_openapi_schema(
        result, '/users/me/subscriptions', 'post',
        '400_unauthorized_errors_fatal_true')
@openapi_test_function("/users/{email}/presence:get")
def get_user_presence(client: Client) -> None:
    """Fetch presence for iago@zulip.com and validate the response."""

    # {code_example|start}
    # Get presence information for "iago@zulip.com"
    result = client.get_user_presence('iago@zulip.com')
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/users/{email}/presence', 'get', '200')
@openapi_test_function("/users/me/presence:post")
def update_presence(client: Client) -> None:
    """Send an 'active' presence update and check that it succeeds."""
    result = client.update_presence({
        'status': 'active',
        'ping_only': False,
        'new_user_input': False
    })

    assert result['result'] == 'success'
@openapi_test_function("/users:post")
def create_user(client: Client) -> None:
    """Create newbie@zulip.com, then repeat the request to validate the
    400 response."""

    # {code_example|start}
    # Create a user
    request = {
        'email': 'newbie@zulip.com',
        'password': 'temp',
        'full_name': 'New User',
        'short_name': 'newbie'
    }
    result = client.create_user(request)
    # {code_example|end}

    validate_against_openapi_schema(result, '/users', 'post', '200')

    # Test "Email already used error"
    duplicate_result = client.create_user(request)
    validate_against_openapi_schema(duplicate_result, '/users', 'post', '400')
@openapi_test_function("/users:get")
def get_members(client: Client) -> None:
    """List the realm's users, plain and with the `client_gravatar` and
    `include_custom_profile_fields` parameters, validating each response."""

    # {code_example|start}
    # Get all users in the realm
    result = client.get_members()
    # {code_example|end}

    validate_against_openapi_schema(result, '/users', 'get', '200')

    # Exactly one entry for newbie@zulip.com, with the expected attributes.
    matching = [user for user in result['members']
                if user['email'] == 'newbie@zulip.com']
    assert len(matching) == 1
    newbie = matching[0]
    assert not newbie['is_admin']
    assert newbie['full_name'] == 'New User'

    # {code_example|start}
    # You may pass the `client_gravatar` query parameter as follows:
    result = client.get_members({'client_gravatar': True})
    # {code_example|end}

    validate_against_openapi_schema(result, '/users', 'get', '200')
    assert result['members'][0]['avatar_url'] is None

    # {code_example|start}
    # You may pass the `include_custom_profile_fields` query parameter as follows:
    result = client.get_members({'include_custom_profile_fields': True})
    # {code_example|end}

    validate_against_openapi_schema(result, '/users', 'get', '200')
    # Bots must not carry profile data; every other member must.
    for member in result['members']:
        lacks_profile_data = member.get('profile_data', None) is None
        if member["is_bot"]:
            assert lacks_profile_data
        else:
            assert not lacks_profile_data
@openapi_test_function("/users/{user_id}:get")
def get_single_user(client: Client) -> None:
    """Fetch user 8, without and with custom profile fields, validating
    both responses."""

    # {code_example|start}
    # Fetch details on a user given a user ID
    user_id = 8
    result = client.get_user_by_id(user_id)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/users/{user_id}', 'get', '200')

    # {code_example|start}
    # If you'd like data on custom profile fields, you can request them as follows:
    result = client.get_user_by_id(user_id, {'include_custom_profile_fields': True})
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/users/{user_id}', 'get', '200')
@openapi_test_function("/users/{user_id}:delete")
def deactivate_user(client: Client) -> None:
    """Send DELETE for user 8 and validate the response."""

    # {code_example|start}
    # Deactivate a user
    user_id = 8
    url = 'users/' + str(user_id)
    result = client.call_endpoint(
        url=url,
        method='DELETE',
    )
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/users/{user_id}', 'delete', '200')
@openapi_test_function("/users/{user_id}:patch")
def update_user(client: Client) -> None:
    """PATCH a user's full name (validated as 200), then PATCH profile
    data for user 8 (validated against the 400 schema)."""

    # {code_example|start}
    # Change a user's full name.
    user_id = 10
    full_name = "New Name"
    url = 'users/' + str(user_id)
    result = client.call_endpoint(
        url=url,
        method='PATCH',
        request={'full_name': json.dumps(full_name)}
    )
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/users/{user_id}', 'patch', '200')

    # {code_example|start}
    # Change value of the custom profile field with ID 9.
    user_id = 8
    profile_data = [{'id': 9, 'value': 'some data'}]
    url = 'users/' + str(user_id)
    result = client.call_endpoint(
        url=url,
        method='PATCH',
        request={'profile_data': json.dumps(profile_data)}
    )
    # {code_example|end}

    # This second request is expected to match the error schema.
    validate_against_openapi_schema(
        result, '/users/{user_id}', 'patch', '400')
@openapi_test_function("/realm/filters:get")
def get_realm_filters(client: Client) -> None:
    """Fetch the organization's filters and validate the response."""

    # {code_example|start}
    # Fetch all the filters in this organization
    result = client.get_realm_filters()
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/realm/filters', 'get', '200')
@openapi_test_function("/realm/filters:post")
def add_realm_filter(client: Client) -> None:
    """Add a filter linking #<number> to a GitHub issue URL and validate
    the response."""

    # {code_example|start}
    # Add a filter to automatically linkify #<number> to the corresponding
    # issue in Zulip's server repo
    result = client.add_realm_filter('#(?P<id>[0-9]+)',
                                     'https://github.com/zulip/zulip/issues/%(id)s')
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/realm/filters', 'post', '200')
@openapi_test_function("/realm/filters/{filter_id}:delete")
def remove_realm_filter(client: Client) -> None:
    """Remove the filter with ID 42 and validate the response."""

    # {code_example|start}
    # Remove the organization filter with ID 42
    result = client.remove_realm_filter(42)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/realm/filters/{filter_id}', 'delete', '200')
@openapi_test_function("/users/me:get")
def get_profile(client: Client) -> None:
    """Fetch the requesting user's own profile and validate the response."""

    # {code_example|start}
    # Get the profile of the user/bot that requests this endpoint,
    # which is `client` in this case:
    result = client.get_profile()
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/users/me', 'get', '200')
@openapi_test_function("/get_stream_id:get")
def get_stream_id(client: Client) -> int:
    """Look up the ID of 'new stream', validate the response, and return
    the `stream_id` field."""

    # {code_example|start}
    # Get the ID of a given stream
    stream_name = 'new stream'
    result = client.get_stream_id(stream_name)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/get_stream_id', 'get', '200')

    return result['stream_id']
@openapi_test_function("/streams/{stream_id}:delete")
def delete_stream(client: Client, stream_id: int) -> None:
    """Create 'stream to be deleted', delete it, and validate the response.

    The `stream_id` argument is kept for the callers' sake but is not
    used: it is overwritten inside the example with the ID of the
    freshly created stream.
    """
    # Set-up: create a throwaway stream; the response is not inspected.
    client.add_subscriptions(
        streams=[
            {
                'name': 'stream to be deleted',
                'description': 'New stream for testing'
            }
        ]
    )

    # {code_example|start}
    # Delete the stream named 'stream to be deleted'
    stream_id = client.get_stream_id('stream to be deleted')['stream_id']
    result = client.delete_stream(stream_id)
    # {code_example|end}

    validate_against_openapi_schema(result, '/streams/{stream_id}', 'delete', '200')

    assert result['result'] == 'success'
@openapi_test_function("/streams:get")
def get_streams(client: Client) -> None:
    """List streams, with defaults and with `include_public=False`,
    validating both responses."""

    # {code_example|start}
    # Get all streams that the user has access to
    result = client.get_streams()
    # {code_example|end}

    validate_against_openapi_schema(result, '/streams', 'get', '200')
    matching = [stream for stream in result['streams']
                if stream['name'] == 'new stream']
    assert matching[0]['description'] == 'New stream for testing'

    # {code_example|start}
    # You may pass in one or more of the query parameters mentioned above
    # as keyword arguments, like so:
    result = client.get_streams(include_public=False)
    # {code_example|end}

    validate_against_openapi_schema(result, '/streams', 'get', '200')
    # The filtered listing is expected to contain exactly four streams.
    assert len(result['streams']) == 4
@openapi_test_function("/streams/{stream_id}:patch")
def update_stream(client: Client, stream_id: int) -> None:
    """Set `stream_post_policy` and `is_private` on the stream `stream_id`
    and validate the response."""

    # {code_example|start}
    # Update the stream by a given ID
    request = {
        'stream_id': stream_id,
        'stream_post_policy': 2,
        'is_private': True,
    }

    result = client.update_stream(request)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/streams/{stream_id}', 'patch', '200')

    assert result['result'] == 'success'
@openapi_test_function("/user_groups:get")
def get_user_groups(client: Client) -> int:
    """List the realm's user groups, validate the response, and return
    the ID of the "marketing" group."""

    # {code_example|start}
    # Get all user groups of the realm
    result = client.get_user_groups()
    # {code_example|end}

    validate_against_openapi_schema(result, '/user_groups', 'get', '200')

    def first_group_named(name: str) -> Dict[str, Any]:
        # Indexing the filtered list raises IndexError if no group matches.
        return [group for group in result['user_groups']
                if group['name'] == name][0]

    hamlet_group = first_group_named("hamletcharacters")
    assert hamlet_group['description'] == 'Characters of Hamlet'

    return first_group_named("marketing")['id']
def test_user_not_authorized_error(nonadmin_client: Client) -> None:
    """Request all active streams as a non-admin and validate the
    '400_user_not_authorized_error' response."""
    result = nonadmin_client.get_streams(include_all_active=True)

    validate_against_openapi_schema(
        result, '/rest-error-handling', 'post',
        '400_user_not_authorized_error')
def get_subscribers(client: Client) -> None:
    """Check that 'new stream' has exactly the two expected subscribers,
    in order."""
    expected_subscribers = ['iago@zulip.com', 'newbie@zulip.com']
    result = client.get_subscribers(stream='new stream')
    assert result['subscribers'] == expected_subscribers
def get_user_agent(client: Client) -> None:
    """Check that the client's user agent string starts with 'ZulipPython/'."""
    user_agent = client.get_user_agent()
    assert user_agent.startswith('ZulipPython/')
@openapi_test_function("/users/me/subscriptions:get")
def list_subscriptions(client: Client) -> None:
    """List the user's subscriptions, validate the response, and check
    the description of 'new stream'."""

    # {code_example|start}
    # Get all streams that the user is subscribed to
    result = client.list_subscriptions()
    # {code_example|end}

    validate_against_openapi_schema(result, '/users/me/subscriptions', 'get', '200')

    matching = [sub for sub in result['subscriptions']
                if sub['name'] == 'new stream']
    assert matching[0]['description'] == 'New stream for testing'
@openapi_test_function("/users/me/subscriptions:delete")
def remove_subscriptions(client: Client) -> None:
    """Unsubscribe from 'new stream', confirm it is gone, then
    unsubscribe newbie@zulip.com as well."""

    # {code_example|start}
    # Unsubscribe from the stream "new stream"
    result = client.remove_subscriptions(
        ['new stream']
    )
    # {code_example|end}

    validate_against_openapi_schema(result, '/users/me/subscriptions', 'delete', '200')

    # test it was actually removed
    listing = client.list_subscriptions()
    assert listing['result'] == 'success'
    remaining = [sub for sub in listing['subscriptions']
                 if sub['name'] == 'new stream']
    assert len(remaining) == 0

    # {code_example|start}
    # Unsubscribe another user from the stream "new stream"
    result = client.remove_subscriptions(
        ['new stream'],
        principals=['newbie@zulip.com']
    )
    # {code_example|end}

    validate_against_openapi_schema(result, '/users/me/subscriptions', 'delete', '200')
@openapi_test_function("/users/me/subscriptions/muted_topics:patch")
def toggle_mute_topic(client: Client) -> None:
    """Mute and then unmute the topic "boat party" in "Denmark",
    validating both responses."""

    # Send a test message
    client.call_endpoint(
        url='messages',
        method='POST',
        request={
            'type': 'stream',
            'to': 'Denmark',
            'topic': 'boat party'
        }
    )

    # {code_example|start}
    # Mute the topic "boat party" in the stream "Denmark"
    request = {
        'stream': 'Denmark',
        'topic': 'boat party',
        'op': 'add'
    }
    result = client.mute_topic(request)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/users/me/subscriptions/muted_topics', 'patch', '200')

    # {code_example|start}
    # Unmute the topic "boat party" in the stream "Denmark"
    request = {
        'stream': 'Denmark',
        'topic': 'boat party',
        'op': 'remove'
    }

    result = client.mute_topic(request)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/users/me/subscriptions/muted_topics', 'patch', '200')
@openapi_test_function("/mark_all_as_read:post")
def mark_all_as_read(client: Client) -> None:
    """Mark all messages as read and validate the response."""

    # {code_example|start}
    # Mark all of the user's unread messages as read
    result = client.mark_all_as_read()
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/mark_all_as_read', 'post', '200')
@openapi_test_function("/mark_stream_as_read:post")
def mark_stream_as_read(client: Client) -> None:
    """Mark stream 1 as read and validate the response."""

    # {code_example|start}
    # Mark the unread messages in stream with ID "1" as read
    result = client.mark_stream_as_read(1)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/mark_stream_as_read', 'post', '200')
@openapi_test_function("/mark_topic_as_read:post")
def mark_topic_as_read(client: Client) -> None:
    """Mark one topic of stream 1 as read and validate the response.

    The response is validated against the '/mark_topic_as_read' schema,
    i.e. the endpoint this function is registered for.
    """

    # Grab an existing topic name
    topic_name = client.get_stream_topics(1)['topics'][0]['name']

    # {code_example|start}
    # Mark the unread messages in stream 1's topic "topic_name" as read
    result = client.mark_topic_as_read(1, topic_name)
    # {code_example|end}

    validate_against_openapi_schema(result, '/mark_topic_as_read', 'post', '200')
@openapi_test_function("/users/me/subscriptions/properties:post")
def update_subscription_settings(client: Client) -> None:
    """Update two subscription properties and validate the response.

    The HTTP method is passed to the validator in lowercase, matching
    every other validation call in this file.
    """

    # {code_example|start}
    # Update the user's subscription in stream #1 to pin it to the top of the
    # stream list; and in stream #3 to have the hex color "f00"
    request = [{
        'stream_id': 1,
        'property': 'pin_to_top',
        'value': True
    }, {
        'stream_id': 3,
        'property': 'color',
        'value': 'f00'
    }]
    result = client.update_subscription_settings(request)
    # {code_example|end}

    validate_against_openapi_schema(result,
                                    '/users/me/subscriptions/properties',
                                    'post', '200')
@openapi_test_function("/messages/render:post")
def render_message(client: Client) -> None:
    """Render the content '**foo**' and validate the response."""

    # {code_example|start}
    # Render a message
    request = {
        'content': '**foo**'
    }
    result = client.render_message(request)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/messages/render', 'post', '200')
@openapi_test_function("/messages:get")
def get_messages(client: Client) -> None:
    """Fetch up to 100 messages matching a sender + stream narrow and
    validate the response."""

    # {code_example|start}
    # Get the 100 last messages sent by "iago@zulip.com" to the stream "Verona"
    request = {
        'anchor': 'newest',
        'num_before': 100,
        'num_after': 0,
        'narrow': [{'operator': 'sender', 'operand': 'iago@zulip.com'},
                   {'operator': 'stream', 'operand': 'Verona'}],
    }  # type: Dict[str, Any]
    result = client.get_messages(request)
    # {code_example|end}

    validate_against_openapi_schema(result, '/messages', 'get', '200')
    # No more messages may come back than were requested.
    max_expected = request['num_before']
    assert len(result['messages']) <= max_expected
@openapi_test_function("/messages/{message_id}:get")
def get_raw_message(client: Client, message_id: int) -> None:
    """Fetch the raw content of message `message_id` and validate the
    response."""

    # Sanity check: the ID must convert to a non-zero integer.
    assert int(message_id)

    # {code_example|start}
    # Get the raw content of the message with ID "message_id"
    result = client.get_raw_message(message_id)
    # {code_example|end}

    validate_against_openapi_schema(result, '/messages/{message_id}', 'get', '200')
@openapi_test_function("/messages:post")
def send_message(client: Client) -> int:
    """Send a stream message and a private message, verifying each by
    fetching it back.

    Returns the ID of the private message (the second one sent).
    """
    request = {}  # type: Dict[str, Any]

    # {code_example|start}
    # Send a stream message
    request = {
        "type": "stream",
        "to": "Denmark",
        "topic": "Castle",
        "content": "I come not, friends, to steal away your hearts."
    }
    result = client.send_message(request)
    # {code_example|end}

    validate_against_openapi_schema(result, '/messages', 'post', '200')

    # test that the message was actually sent
    message_id = result['id']
    fetched = client.call_endpoint(
        url='messages/' + str(message_id),
        method='GET'
    )
    assert fetched['result'] == 'success'
    assert fetched['raw_content'] == request['content']

    ensure_users([9], ['hamlet'])

    # {code_example|start}
    # Send a private message
    user_id = 9
    request = {
        "type": "private",
        "to": [user_id],
        "content": "With mirth and laughter let old wrinkles come."
    }
    result = client.send_message(request)
    # {code_example|end}

    validate_against_openapi_schema(result, '/messages', 'post', '200')

    # test that the message was actually sent
    message_id = result['id']
    fetched = client.call_endpoint(
        url='messages/' + str(message_id),
        method='GET'
    )
    assert fetched['result'] == 'success'
    assert fetched['raw_content'] == request['content']

    return message_id
@openapi_test_function("/messages/{message_id}/reactions:post")
def add_reaction(client: Client, message_id: int) -> None:
    """Add an 'octopus' reaction to message `message_id` and validate
    the response."""

    # {code_example|start}
    # Add an emoji reaction
    request = {
        'message_id': str(message_id),
        'emoji_name': 'octopus',
    }

    result = client.add_reaction(request)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/messages/{message_id}/reactions', 'post', '200')
@openapi_test_function("/messages/{message_id}/reactions:delete")
def remove_reaction(client: Client, message_id: int) -> None:
    """Remove the 'octopus' reaction from message `message_id` and
    validate the response."""

    # {code_example|start}
    # Remove an emoji reaction
    request = {
        'message_id': str(message_id),
        'emoji_name': 'octopus',
    }

    result = client.remove_reaction(request)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/messages/{message_id}/reactions', 'delete', '200')
def test_nonexistent_stream_error(client: Client) -> None:
    """Send to "nonexistent_stream" and validate the
    '400_non_existing_stream' response."""
    result = client.send_message({
        "type": "stream",
        "to": "nonexistent_stream",
        "topic": "Castle",
        "content": "I come not, friends, to steal away your hearts."
    })

    validate_against_openapi_schema(
        result, '/messages', 'post', '400_non_existing_stream')
def test_private_message_invalid_recipient(client: Client) -> None:
    """Send a private message to eeshan@zulip.com and validate the
    '400_non_existing_user' response."""
    result = client.send_message({
        "type": "private",
        "to": "eeshan@zulip.com",
        "content": "With mirth and laughter let old wrinkles come."
    })

    validate_against_openapi_schema(
        result, '/messages', 'post', '400_non_existing_user')
@openapi_test_function("/messages/{message_id}:patch")
def update_message(client: Client, message_id: int) -> None:
    """Edit message `message_id`, validate the response, and confirm the
    new content by fetching the message back."""

    # Sanity check: the ID must convert to a non-zero integer.
    assert int(message_id)

    # {code_example|start}
    # Edit a message
    # (make sure that message_id below is set to the ID of the
    # message you wish to update)
    request = {
        "message_id": message_id,
        "content": "New content"
    }
    result = client.update_message(request)
    # {code_example|end}

    validate_against_openapi_schema(result, '/messages/{message_id}', 'patch', '200')

    # test it was actually updated
    fetched = client.call_endpoint(
        url='messages/' + str(message_id),
        method='GET'
    )
    assert fetched['result'] == 'success'
    assert fetched['raw_content'] == request['content']
def test_update_message_edit_permission_error(client: Client, nonadmin_client: Client) -> None:
    """Send a message as `client`, try to edit it as `nonadmin_client`,
    and validate the 400 response."""
    sent = client.send_message({
        "type": "stream",
        "to": "Denmark",
        "topic": "Castle",
        "content": "I come not, friends, to steal away your hearts."
    })

    result = nonadmin_client.update_message({
        "message_id": sent["id"],
        "content": "New content"
    })

    validate_against_openapi_schema(
        result, '/messages/{message_id}', 'patch', '400')
@openapi_test_function("/messages/{message_id}:delete")
def delete_message(client: Client, message_id: int) -> None:
    """Delete message `message_id` and validate the response."""

    # {code_example|start}
    # Delete the message with ID "message_id"
    result = client.delete_message(message_id)
    # {code_example|end}

    validate_against_openapi_schema(result, '/messages/{message_id}', 'delete', '200')
def test_delete_message_edit_permission_error(client: Client, nonadmin_client: Client) -> None:
    """Send a message as `client`, try to delete it as `nonadmin_client`,
    and validate the '400_not_admin' response."""
    sent = client.send_message({
        "type": "stream",
        "to": "Denmark",
        "topic": "Castle",
        "content": "I come not, friends, to steal away your hearts."
    })

    result = nonadmin_client.delete_message(sent['id'])

    validate_against_openapi_schema(
        result, '/messages/{message_id}', 'delete', '400_not_admin')
@openapi_test_function("/messages/{message_id}/history:get")
def get_message_history(client: Client, message_id: int) -> None:
    """Fetch the edit history of message `message_id` and validate the
    response."""

    # {code_example|start}
    # Get the edit history for message with ID "message_id"
    result = client.get_message_history(message_id)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/messages/{message_id}/history', 'get', '200')
@openapi_test_function("/realm/emoji:get")
def get_realm_emoji(client: Client) -> None:
    """Fetch the realm's emoji and validate the response.

    The HTTP method is passed to the validator in lowercase, matching
    every other validation call in this file.
    """

    # {code_example|start}
    result = client.get_realm_emoji()
    # {code_example|end}

    validate_against_openapi_schema(result, '/realm/emoji', 'get', '200')
@openapi_test_function("/messages/flags:post")
def update_message_flags(client: Client) -> None:
    """Send three messages, add the "read" flag to them, then remove the
    "starred" flag, validating both responses."""

    # Send a few test messages
    request = {
        "type": "stream",
        "to": "Denmark",
        "topic": "Castle",
        "content": "I come not, friends, to steal away your hearts."
    }  # type: Dict[str, Any]
    message_ids = [client.send_message(request)['id'] for _ in range(3)]

    # {code_example|start}
    # Add the "read" flag to the messages with IDs in "message_ids"
    request = {
        'messages': message_ids,
        'op': 'add',
        'flag': 'read'
    }
    result = client.update_message_flags(request)
    # {code_example|end}

    validate_against_openapi_schema(result, '/messages/flags', 'post', '200')

    # {code_example|start}
    # Remove the "starred" flag from the messages with IDs in "message_ids"
    request = {
        'messages': message_ids,
        'op': 'remove',
        'flag': 'starred'
    }
    result = client.update_message_flags(request)
    # {code_example|end}

    validate_against_openapi_schema(result, '/messages/flags', 'post', '200')
@openapi_test_function("/register:post")
def register_queue(client: Client) -> str:
    """Register a queue for 'message' and 'realm_emoji' events, validate
    the response, and return its `queue_id`."""

    # {code_example|start}
    # Register the queue
    result = client.register(
        event_types=['message', 'realm_emoji']
    )
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/register', 'post', '200')

    return result['queue_id']
@openapi_test_function("/events:delete")
def deregister_queue(client: Client, queue_id: str) -> None:
    """Delete queue `queue_id`, then repeat the deletion to validate the
    400 response."""

    # {code_example|start}
    # Delete a queue (queue_id is the ID of the queue
    # to be removed)
    result = client.deregister(queue_id)
    # {code_example|end}

    validate_against_openapi_schema(result, '/events', 'delete', '200')

    # Test "BAD_EVENT_QUEUE_ID" error
    repeat_result = client.deregister(queue_id)
    validate_against_openapi_schema(repeat_result, '/events', 'delete', '400')
@openapi_test_function("/server_settings:get")
def get_server_settings(client: Client) -> None:
    """Fetch the server settings and validate the response."""

    # {code_example|start}
    # Fetch the settings for this server
    result = client.get_server_settings()
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/server_settings', 'get', '200')
@openapi_test_function("/settings/notifications:patch")
def update_notification_settings(client: Client) -> None:
    """Enable offline and online push notifications and validate the
    response."""

    # {code_example|start}
    # Enable push notifications even when online
    request = {
        'enable_offline_push_notifications': True,
        'enable_online_push_notifications': True,
    }

    result = client.update_notification_settings(request)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/settings/notifications', 'patch', '200')
@openapi_test_function("/user_uploads:post")
def upload_file(client: Client) -> None:
    """Upload a test image, post a message linking to it, and validate
    the upload response."""
    image_parts = ('zerver', 'tests', 'images', 'img.jpg')
    path_to_file = os.path.join(ZULIP_DIR, *image_parts)

    # {code_example|start}
    # Upload a file
    with open(path_to_file, 'rb') as fp:
        result = client.call_endpoint(
            'user_uploads',
            method='POST',
            files=[fp]
        )

    client.send_message({
        "type": "stream",
        "to": "Denmark",
        "topic": "Castle",
        "content": "Check out [this picture](%s) of my castle!" % (result['uri'],)
    })
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/user_uploads', 'post', '200')
@openapi_test_function("/users/me/{stream_id}/topics:get")
def get_stream_topics(client: Client, stream_id: int) -> None:
    """Fetch the topics of stream `stream_id` and validate the response."""

    # {code_example|start}
    result = client.get_stream_topics(stream_id)
    # {code_example|end}

    validate_against_openapi_schema(
        result, '/users/me/{stream_id}/topics', 'get', '200')
@openapi_test_function("/typing:post")
def set_typing_status(client: Client) -> None:
    """Send 'start' and then 'stop' typing notifications to users 9 and
    10, validating both responses.

    Recipients are given as user IDs; `ensure_users` first asserts that
    IDs 9 and 10 belong to hamlet and iago, which is what the example
    comments describe.
    """
    ensure_users([9, 10], ['hamlet', 'iago'])

    # {code_example|start}
    # The user has started to type in the group PM with Hamlet and Iago
    user_id1 = 9
    user_id2 = 10

    request = {
        'op': 'start',
        'to': [user_id1, user_id2],
    }
    result = client.set_typing_status(request)
    # {code_example|end}

    validate_against_openapi_schema(result, '/typing', 'post', '200')

    # {code_example|start}
    # The user has finished typing in the group PM with Hamlet and Iago
    user_id1 = 9
    user_id2 = 10

    request = {
        'op': 'stop',
        'to': [user_id1, user_id2],
    }
    result = client.set_typing_status(request)
    # {code_example|end}

    validate_against_openapi_schema(result, '/typing', 'post', '200')
@openapi_test_function("/realm/emoji/{emoji_name}:post")
def upload_custom_emoji(client):
    # type: (Client) -> None
    """Upload a test image as a custom emoji and validate the response.

    Opens `zerver/tests/images/img.jpg` (resolved under ZULIP_DIR) in binary
    mode, POSTs it to `realm/emoji/my_custom_emoji` through
    `client.call_endpoint`, and checks the result against the OpenAPI schema
    entry for `/realm/emoji/{emoji_name}`, method `post`, status `200`.
    """
    # Defined before the example markers: the example only refers to
    # `emoji_path` by name.
    emoji_path = os.path.join(ZULIP_DIR, 'zerver', 'tests', 'images', 'img.jpg')

    # NOTE(review): the {code_example|...} markers presumably delimit text
    # that is extracted verbatim for the API docs -- confirm before editing
    # anything between them.
    # {code_example|start}
    # Upload a custom emoji; assume `emoji_path` is the path to your image.
    with open(emoji_path, 'rb') as fp:
        emoji_name = 'my_custom_emoji'
        result = client.call_endpoint(
            'realm/emoji/{}'.format(emoji_name),
            method='POST',
            files=[fp]
        )
    # {code_example|end}

    validate_against_openapi_schema(result,
                                    '/realm/emoji/{emoji_name}',
                                    'post', '200')
2018-08-09 20:27:12 +02:00
@openapi_test_function("/users/me/alert_words:get")
def get_alert_words(client):
    # type: (Client) -> None
    """Fetch the alert words through `client` and require a success result."""
    response = client.get_alert_words()
    status = response['result']

    assert status == 'success'
@openapi_test_function("/users/me/alert_words:post")
def add_alert_words(client):
    # type: (Client) -> None
    """Add the alert words 'foo' and 'bar' and require a success result."""
    response = client.add_alert_words(['foo', 'bar'])
    status = response['result']

    assert status == 'success'
@openapi_test_function("/users/me/alert_words:delete")
def remove_alert_words(client):
    # type: (Client) -> None
    """Remove the alert word 'foo' and require a success result."""
    response = client.remove_alert_words(['foo'])
    status = response['result']

    assert status == 'success'
@openapi_test_function("/user_groups/create:post")
def create_user_group(client):
    # type: (Client) -> None
    """Create the 'marketing' user group and validate the response.

    The request names four members by user ID.  The result is checked
    against the OpenAPI schema for `/user_groups/create` (`post`, `200`)
    and must also report `'result': 'success'`.
    """
    # The IDs hard-coded in the example must belong to these users;
    # ensure_users asserts that mapping before the request is sent.
    ensure_users([6, 7, 8, 9], ['aaron', 'zoe', 'cordelia', 'hamlet'])

    # NOTE(review): the {code_example|...} markers presumably delimit text
    # that is extracted verbatim for the API docs -- confirm before editing
    # anything between them.
    # {code_example|start}
    request = {
        'name': 'marketing',
        'description': 'The marketing team.',
        'members': [6, 7, 8, 9],
    }

    result = client.create_user_group(request)
    # {code_example|end}

    validate_against_openapi_schema(result, '/user_groups/create', 'post', '200')

    assert result['result'] == 'success'
@openapi_test_function("/user_groups/{group_id}:patch")
def update_user_group(client, group_id):
    # type: (Client, int) -> None
    """Update the name and description of the user group `group_id`.

    Sends `group_id` together with the name 'marketing' and the description
    'The marketing team.' via `client.update_user_group`, then requires
    `'result': 'success'` in the response.
    """
    # NOTE(review): the {code_example|...} markers presumably delimit text
    # that is extracted verbatim for the API docs -- confirm before editing
    # anything between them.
    # {code_example|start}
    request = {
        'group_id': group_id,
        'name': 'marketing',
        'description': 'The marketing team.',
    }

    result = client.update_user_group(request)
    # {code_example|end}

    # NOTE(review): unlike create_user_group and remove_user_group, this
    # function does not call validate_against_openapi_schema; only the
    # success flag is checked.
    assert result['result'] == 'success'
@openapi_test_function("/user_groups/{group_id}:delete")
def remove_user_group(client, group_id):
    # type: (Client, int) -> None
    """Remove the user group `group_id` and validate the response.

    The result is checked against the OpenAPI schema for
    `/user_groups/{group_id}` (`delete`, `200`) and must also report
    `'result': 'success'`.
    """
    # NOTE(review): the {code_example|...} markers presumably delimit text
    # that is extracted verbatim for the API docs -- confirm before editing
    # anything between them.
    # {code_example|start}
    result = client.remove_user_group(group_id)
    # {code_example|end}

    validate_against_openapi_schema(result, '/user_groups/{group_id}', 'delete', '200')

    assert result['result'] == 'success'
@openapi_test_function("/user_groups/{group_id}/members:post")
def update_user_group_members(client, group_id):
    # type: (Client, int) -> None
    """Change the membership of user group `group_id`.

    Removes users 8 and 9 and adds user 10 in a single request, then
    requires a success result.
    """
    # The hard-coded IDs in the request must belong to these users.
    ensure_users([8, 9, 10], ['cordelia', 'hamlet', 'iago'])

    membership_changes = dict(group_id=group_id, delete=[8, 9], add=[10])
    response = client.update_user_group_members(membership_changes)

    assert response['result'] == 'success'
def test_invalid_api_key(client_with_invalid_key):
    # type: (Client) -> None
    """Validate the error returned for a request made with an invalid API key.

    Lists subscriptions through `client_with_invalid_key` and checks the
    response against the `400_invalid_api_key` schema entry.
    """
    error_response = client_with_invalid_key.list_subscriptions()

    validate_against_openapi_schema(
        error_response,
        '/rest-error-handling',
        'post',
        '400_invalid_api_key',
    )
def test_missing_request_argument(client):
    # type: (Client) -> None
    """Validate the error returned when a request argument is missing.

    Calls `client.render_message` with an empty request and checks the
    response against the `400_missing_request_argument_error` schema entry.
    """
    empty_request = {}  # type: Dict[str, Any]
    error_response = client.render_message(empty_request)

    validate_against_openapi_schema(
        error_response,
        '/rest-error-handling',
        'post',
        '400_missing_request_argument_error',
    )
def test_invalid_stream_error(client):
    # type: (Client) -> None
    """Validate the error returned when looking up the stream 'nonexistent'.

    The response is checked against the `400` schema entry for
    `/get_stream_id` (`get`).
    """
    error_response = client.get_stream_id('nonexistent')

    validate_against_openapi_schema(
        error_response,
        '/get_stream_id',
        'get',
        '400',
    )
2017-01-13 13:50:39 +01:00
# SETUP METHODS FOLLOW
def test_against_fixture(result, fixture, check_if_equal=None, check_if_exists=None):
    # type: (Dict[str, Any], Dict[str, Any], Optional[Iterable[str]], Optional[Iterable[str]]) -> None
    """Compare an API `result` against an expected `fixture`.

    The two dicts must always have the same number of keys.  Beyond that:

    * With neither `check_if_equal` nor `check_if_exists` given (or both
      empty), every key of `fixture` must have an equal value in `result`.
    * Keys listed in `check_if_equal` must have equal values in both dicts.
    * Keys listed in `check_if_exists` only need to be present in `result`.

    Raises AssertionError (via the assert* helpers) on the first mismatch.
    """
    # The defaults are None rather than shared mutable lists; None and an
    # empty list are both falsy, so the checks below treat them alike.
    assertLength(result, fixture)

    if not check_if_equal and not check_if_exists:
        # Only the keys are needed; assertEqual looks the values up itself.
        for key in fixture:
            assertEqual(key, result, fixture)

    if check_if_equal:
        for key in check_if_equal:
            assertEqual(key, result, fixture)

    if check_if_exists:
        for key in check_if_exists:
            assertIn(key, result)
def assertEqual(key, result, fixture):
    # type: (str, Dict[str, Any], Dict[str, Any]) -> None
    """Require `result[key]` and `fixture[key]` to be equal.

    Raises AssertionError, with a diff of the two `key = value` renderings,
    when the values differ.  Raises KeyError if `key` is missing from
    either dict.
    """
    # Look each value up once; `result` is read first so a key missing
    # from both dicts is still reported against `result`.
    actual = result[key]
    expected = fixture[key]

    if actual != expected:
        first = "{key} = {value}".format(key=key, value=actual)
        second = "{key} = {value}".format(key=key, value=expected)
        raise AssertionError("Actual and expected outputs do not match; showing diff:\n" +
                             mdiff.diff_strings(first, second))
def assertLength(result, fixture):
    # type: (Dict[str, Any], Dict[str, Any]) -> None
    """Require `result` and `fixture` to contain the same number of keys.

    Raises AssertionError, with a diff of both dicts rendered as sorted,
    indented JSON, when the sizes differ.
    """
    if len(result) == len(fixture):
        return

    rendered_result, rendered_fixture = [
        json.dumps(payload, indent=4, sort_keys=True)
        for payload in (result, fixture)
    ]
    raise AssertionError("The lengths of the actual and expected outputs do not match; showing diff:\n" +
                         mdiff.diff_strings(rendered_result, rendered_fixture))
def assertIn(key, result):
    # type: (str, Dict[str, Any]) -> None
    """Require `key` to be present in `result`.

    Raises AssertionError naming the missing key otherwise.
    """
    # Membership is tested on the dict itself; no `.keys()` view is needed.
    if key not in result:
        raise AssertionError(
            "The actual output does not contain the key `{key}`.".format(key=key)
        )
def test_messages(client, nonadmin_client):
    # type: (Client, Client) -> None
    """Run the message-related API examples in order.

    A message is sent first, and its ID is reused by the steps that act on
    a specific message.  The error-case checks run last; two of them also
    use `nonadmin_client`.
    """
    render_message(client)
    message_id = send_message(client)

    # Steps that act on the message that was just sent.
    for message_step in (add_reaction, remove_reaction, update_message, get_raw_message):
        message_step(client, message_id)

    get_messages(client)
    get_message_history(client, message_id)
    delete_message(client, message_id)

    # Steps that need nothing but the client.
    for client_step in (mark_all_as_read, mark_stream_as_read, mark_topic_as_read,
                        update_message_flags,
                        test_nonexistent_stream_error,
                        test_private_message_invalid_recipient):
        client_step(client)

    # Permission errors need both clients.
    for permission_check in (test_update_message_edit_permission_error,
                             test_delete_message_edit_permission_error):
        permission_check(client, nonadmin_client)
2017-01-13 13:50:39 +01:00
def test_users(client):
    # type: (Client) -> None
    """Run the user-related API examples in order.

    Covers user, profile, notification-settings, upload, typing, presence,
    user-group and alert-word calls.  The group ID returned by
    `get_user_groups` is passed to the steps that modify or remove a group.
    """
    for user_step in (create_user, get_members, get_single_user,
                      deactivate_user, update_user,
                      get_profile, update_notification_settings, upload_file,
                      set_typing_status, get_user_presence, update_presence,
                      create_user_group):
        user_step(client)

    group_id = get_user_groups(client)
    for group_step in (update_user_group, update_user_group_members, remove_user_group):
        group_step(client, group_id)

    for alert_word_step in (get_alert_words, add_alert_words, remove_alert_words):
        alert_word_step(client)
2017-01-13 13:50:39 +01:00
def test_streams(client, nonadmin_client):
    # type: (Client, Client) -> None
    """Run the stream-related API examples in order.

    The stream ID returned by `get_stream_id` is used to update and,
    near the end, delete that stream.  The authorization error checks
    run last and involve `nonadmin_client`.
    """
    add_subscriptions(client)
    test_add_subscriptions_already_subscribed(client)
    list_subscriptions(client)

    stream_id = get_stream_id(client)
    update_stream(client, stream_id)

    # Steps that need nothing but the client.
    for client_step in (get_streams, get_subscribers, remove_subscriptions,
                        toggle_mute_topic, update_subscription_settings,
                        update_notification_settings):
        client_step(client)

    get_stream_topics(client, 1)
    delete_stream(client, stream_id)

    test_user_not_authorized_error(nonadmin_client)
    test_authorization_errors_fatal(client, nonadmin_client)
def test_queues(client):
    # type: (Client) -> None
    """Register an event queue and immediately deregister it."""
    # The example for api/get-events-from-queue is deliberately not tested
    # here: client.get_events() and client.call_on_each_message are blocking
    # calls, and the event queue backend is already thoroughly tested in
    # zerver/tests/test_event_queue.py, so asynchronous test logic for them
    # is not worth the effort.
    deregister_queue(client, register_queue(client))
def test_server_organizations(client):
    # type: (Client) -> None
    """Run the realm filter, server settings and realm emoji examples in order."""
    for organization_step in (get_realm_filters, add_realm_filter,
                              get_server_settings, remove_realm_filter,
                              get_realm_emoji, upload_custom_emoji):
        organization_step(client)
def test_errors(client):
    # type: (Client) -> None
    """Run the generic error-response checks that only need `client`."""
    for error_check in (test_missing_request_argument, test_invalid_stream_error):
        error_check(client)
def test_the_api(client, nonadmin_client):
    # type: (Client, Client) -> None
    """Run every API example suite, then verify all registered tests ran.

    After the suites finish, the set of functions registered with
    @openapi_test_function is compared with the set that was actually
    called.  If they differ, the uncalled names are printed and the
    process exits with status 1.
    """
    get_user_agent(client)
    test_users(client)
    test_streams(client, nonadmin_client)
    test_messages(client, nonadmin_client)
    for suite in (test_queues, test_server_organizations, test_errors):
        suite(client)

    sys.stdout.flush()
    if REGISTERED_TEST_FUNCTIONS == CALLED_TEST_FUNCTIONS:
        return

    never_called = REGISTERED_TEST_FUNCTIONS - CALLED_TEST_FUNCTIONS
    print("Error! Some @openapi_test_function tests were never called:")
    print(" ", never_called)
    sys.exit(1)