from typing import Dict, Any, Optional, Iterable from io import StringIO import json import os from zerver.lib import mdiff from zerver.lib.openapi import validate_against_openapi_schema if False: from zulip import Client ZULIP_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) FIXTURE_PATH = os.path.join(ZULIP_DIR, 'templates', 'zerver', 'api', 'fixtures.json') def load_api_fixtures(): # type: () -> Dict[str, Any] with open(FIXTURE_PATH, 'r') as fp: json_dict = json.loads(fp.read()) return json_dict FIXTURES = load_api_fixtures() def add_subscriptions(client): # type: (Client) -> None # {code_example|start} # Subscribe to the stream "new stream" result = client.add_subscriptions( streams=[ { 'name': 'new stream', 'description': 'New stream for testing' } ] ) # {code_example|end} fixture = FIXTURES['add-subscriptions']['without_principals'] test_against_fixture(result, fixture) # {code_example|start} # To subscribe another user to a stream, you may pass in # the `principals` argument, like so: result = client.add_subscriptions( streams=[ {'name': 'new stream', 'description': 'New stream for testing'} ], principals=['newbie@zulip.com'] ) # {code_example|end} assert result['result'] == 'success' assert 'newbie@zulip.com' in result['subscribed'] def test_add_subscriptions_already_subscribed(client): # type: (Client) -> None result = client.add_subscriptions( streams=[ {'name': 'new stream', 'description': 'New stream for testing'} ], principals=['newbie@zulip.com'] ) fixture = FIXTURES['add-subscriptions']['already_subscribed'] test_against_fixture(result, fixture) def test_authorization_errors_fatal(client, nonadmin_client): # type: (Client, Client) -> None client.add_subscriptions( streams=[ {'name': 'private_stream'} ], ) stream_id = client.get_stream_id('private_stream')['stream_id'] client.call_endpoint( 'streams/{}'.format(stream_id), method='PATCH', request={'is_private': True} ) result = nonadmin_client.add_subscriptions( streams=[ {'name': 'private_stream'} ], authorization_errors_fatal=False, ) fixture = FIXTURES['add-subscriptions']['unauthorized_errors_fatal_false'] test_against_fixture(result, fixture) result = nonadmin_client.add_subscriptions( streams=[ {'name': 'private_stream'} ], authorization_errors_fatal=True, ) fixture = FIXTURES['add-subscriptions']['unauthorized_errors_fatal_true'] test_against_fixture(result, fixture) def create_user(client): # type: (Client) -> None # {code_example|start} # Create a user request = { 'email': 'newbie@zulip.com', 'password': 'temp', 'full_name': 'New User', 'short_name': 'newbie' } result = client.create_user(request) # {code_example|end} fixture = FIXTURES['successful-response-empty'] test_against_fixture(result, fixture) # Test "Email already used error" result = client.create_user(request) fixture = FIXTURES['email_already_used_error'] test_against_fixture(result, fixture) def get_members(client): # type: (Client) -> None # {code_example|start} # Get all users in the realm result = client.get_members() # {code_example|end} fixture = FIXTURES['get-all-users'] test_against_fixture(result, fixture, check_if_equal=['msg', 'result'], check_if_exists=['members']) members = [m for m in result['members'] if m['email'] == 'newbie@zulip.com'] assert len(members) == 1 newbie = members[0] assert not newbie['is_admin'] assert newbie['full_name'] == 'New User' member_fixture = fixture['members'][0] member_result = result['members'][0] test_against_fixture(member_result, member_fixture, check_if_exists=member_fixture.keys()) # {code_example|start} # You may pass the `client_gravatar` query parameter as follows: result = client.call_endpoint( url='users?client_gravatar=true', method='GET', ) # {code_example|end} test_against_fixture(result, fixture, check_if_equal=['msg', 'result'], check_if_exists=['members']) assert result['members'][0]['avatar_url'] is None def get_profile(client): # type: (Client) -> None # {code_example|start} # Get the profile of the user/bot that requests this endpoint, # which is `client` in this case: result = client.get_profile() # {code_example|end} fixture = FIXTURES['get-profile'] check_if_equal = ['email', 'full_name', 'msg', 'result', 'short_name'] check_if_exists = ['client_id', 'is_admin', 'is_bot', 'max_message_id', 'pointer', 'user_id'] test_against_fixture(result, fixture, check_if_equal=check_if_equal, check_if_exists=check_if_exists) def get_stream_id(client): # type: (Client) -> None # {code_example|start} # Get the ID of a given stream stream_name = 'new stream' result = client.get_stream_id(stream_name) # {code_example|end} fixture = FIXTURES['get-stream-id'] test_against_fixture(result, fixture, check_if_equal=['msg', 'result'], check_if_exists=['stream_id']) def get_streams(client): # type: (Client) -> None # {code_example|start} # Get all streams that the user has access to result = client.get_streams() # {code_example|end} fixture = FIXTURES['get-all-streams'] test_against_fixture(result, fixture, check_if_equal=['msg', 'result'], check_if_exists=['streams']) assert len(result['streams']) == len(fixture['streams']) streams = [s for s in result['streams'] if s['name'] == 'new stream'] assert streams[0]['description'] == 'New stream for testing' # {code_example|start} # You may pass in one or more of the query parameters mentioned above # as keyword arguments, like so: result = client.get_streams(include_public=False) # {code_example|end} test_against_fixture(result, fixture, check_if_equal=['msg', 'result'], check_if_exists=['streams']) assert len(result['streams']) == 4 def test_user_not_authorized_error(nonadmin_client): # type: (Client) -> None result = nonadmin_client.get_streams(include_all_active=True) fixture = FIXTURES['user-not-authorized-error'] test_against_fixture(result, fixture) def get_subscribers(client): # type: (Client) -> None result = client.get_subscribers(stream='new stream') assert result['subscribers'] == ['iago@zulip.com', 'newbie@zulip.com'] def get_user_agent(client): # type: (Client) -> None result = client.get_user_agent() assert result.startswith('ZulipPython/') def list_subscriptions(client): # type: (Client) -> None # {code_example|start} # Get all streams that the user is subscribed to result = client.list_subscriptions() # {code_example|end} fixture = FIXTURES['get-subscribed-streams'] test_against_fixture(result, fixture, check_if_equal=['msg', 'result'], check_if_exists=['subscriptions']) streams = [s for s in result['subscriptions'] if s['name'] == 'new stream'] assert streams[0]['description'] == 'New stream for testing' def remove_subscriptions(client): # type: (Client) -> None # {code_example|start} # Unsubscribe from the stream "new stream" result = client.remove_subscriptions( ['new stream'] ) # {code_example|end} fixture = FIXTURES['remove-subscriptions'] test_against_fixture(result, fixture) # test it was actually removed result = client.list_subscriptions() assert result['result'] == 'success' streams = [s for s in result['subscriptions'] if s['name'] == 'new stream'] assert len(streams) == 0 # {code_example|start} # Unsubscribe another user from the stream "new stream" result = client.remove_subscriptions( ['new stream'], principals=['newbie@zulip.com'] ) # {code_example|end} test_against_fixture(result, fixture) def render_message(client): # type: (Client) -> None # {code_example|start} # Render a message request = { 'content': '**foo**' } result = client.render_message(request) # {code_example|end} fixture = FIXTURES['render-message'] test_against_fixture(result, fixture) def stream_message(client): # type: (Client) -> int # {code_example|start} # Send a stream message request = { "type": "stream", "to": "Denmark", "subject": "Castle", "content": "Something is rotten in the state of Denmark." } result = client.send_message(request) # {code_example|end} fixture = FIXTURES['stream-message'] test_against_fixture(result, fixture, check_if_equal=['result'], check_if_exists=['id']) # test it was actually sent message_id = result['id'] url = 'messages/' + str(message_id) result = client.call_endpoint( url=url, method='GET' ) assert result['result'] == 'success' assert result['raw_content'] == request['content'] return message_id def test_nonexistent_stream_error(client): # type: (Client) -> None request = { "type": "stream", "to": "nonexistent_stream", "subject": "Castle", "content": "Something is rotten in the state of Denmark." } result = client.send_message(request) fixture = FIXTURES['nonexistent-stream-error'] test_against_fixture(result, fixture) def private_message(client): # type: (Client) -> None # {code_example|start} # Send a private message request = { "type": "private", "to": "iago@zulip.com", "content": "I come not, friends, to steal away your hearts." } result = client.send_message(request) # {code_example|end} fixture = FIXTURES['private-message'] test_against_fixture(result, fixture, check_if_equal=['result'], check_if_exists=['id']) # test it was actually sent message_id = result['id'] url = 'messages/' + str(message_id) result = client.call_endpoint( url=url, method='GET' ) assert result['result'] == 'success' assert result['raw_content'] == request['content'] def test_private_message_invalid_recipient(client): # type: (Client) -> None request = { "type": "private", "to": "eeshan@zulip.com", "content": "I come not, friends, to steal away your hearts." } result = client.send_message(request) fixture = FIXTURES['invalid-pm-recipient-error'] test_against_fixture(result, fixture) def update_message(client, message_id): # type: (Client, int) -> None assert int(message_id) # {code_example|start} # Edit a message # (make sure that message_id below is set to the ID of the # message you wish to update) request = { "message_id": message_id, "content": "New content" } result = client.update_message(request) # {code_example|end} validate_against_openapi_schema(result, '/messages/{message_id}', 'patch') # test it was actually updated url = 'messages/' + str(message_id) result = client.call_endpoint( url=url, method='GET' ) assert result['result'] == 'success' assert result['raw_content'] == request['content'] def test_update_message_edit_permission_error(client, nonadmin_client): # type: (Client, Client) -> None request = { "type": "stream", "to": "Denmark", "subject": "Castle", "content": "Something is rotten in the state of Denmark." } result = client.send_message(request) request = { "message_id": result["id"], "content": "New content" } result = nonadmin_client.update_message(request) fixture = FIXTURES['update-message-edit-permission-error'] test_against_fixture(result, fixture) def register_queue(client): # type: (Client) -> str # {code_example|start} # Register the queue result = client.register( event_types=['messages', 'realm_emoji'] ) # {code_example|end} fixture = FIXTURES['register-queue'] test_against_fixture(result, fixture, check_if_equal=['msg', 'result'], check_if_exists=['last_event_id', 'queue_id']) return result['queue_id'] def deregister_queue(client, queue_id): # type: (Client, str) -> None # {code_example|start} # Delete a queue (queue_id is the ID of the queue # to be removed) result = client.deregister(queue_id) # {code_example|end} fixture = FIXTURES['successful-response-empty'] test_against_fixture(result, fixture) # Test "BAD_EVENT_QUEUE_ID" error result = client.deregister(queue_id) fixture = FIXTURES['bad_event_queue_id_error'] test_against_fixture(result, fixture, check_if_equal=['code', 'result'], check_if_exists=['queue_id', 'msg']) def upload_file(client): # type: (Client) -> None fp = StringIO("zulip") fp.name = "zulip.txt" # {code_example|start} # Upload a file # (Make sure that 'fp' is a file object) result = client.call_endpoint( 'user_uploads', method='POST', files=[fp] ) # {code_example|end} fixture = FIXTURES['upload-file'] test_against_fixture(result, fixture, check_if_equal=['msg', 'result'], check_if_exists=['uri']) def get_stream_topics(client, stream_id): # type: (Client, int) -> None # {code_example|start} result = client.get_stream_topics(stream_id) # {code_example|end} validate_against_openapi_schema(result, '/users/me/{stream_id}/topics', 'get') def test_invalid_api_key(client_with_invalid_key): # type: (Client) -> None result = client_with_invalid_key.list_subscriptions() fixture = FIXTURES['invalid-api-key'] test_against_fixture(result, fixture) def test_missing_request_argument(client): # type: (Client) -> None result = client.render_message({}) fixture = FIXTURES['missing-request-argument-error'] test_against_fixture(result, fixture) def test_invalid_stream_error(client): # type: (Client) -> None result = client.get_stream_id('nonexistent') fixture = FIXTURES['invalid-stream-error'] test_against_fixture(result, fixture) TEST_FUNCTIONS = { 'render-message': render_message, 'stream-message': stream_message, 'private-message': private_message, '/messages/{message_id}:patch': update_message, 'get-stream-id': get_stream_id, 'get-subscribed-streams': list_subscriptions, 'get-all-streams': get_streams, 'create-user': create_user, 'get-profile': get_profile, 'add-subscriptions': add_subscriptions, 'remove-subscriptions': remove_subscriptions, 'get-all-users': get_members, 'register-queue': register_queue, 'delete-queue': deregister_queue, 'upload-file': upload_file, '/users/me/{stream_id}/topics:get': get_stream_topics } # SETUP METHODS FOLLOW def test_against_fixture(result, fixture, check_if_equal=[], check_if_exists=[]): # type: (Dict[str, Any], Dict[str, Any], Optional[Iterable[str]], Optional[Iterable[str]]) -> None assertLength(result, fixture) if not check_if_equal and not check_if_exists: for key, value in fixture.items(): assertEqual(key, result, fixture) if check_if_equal: for key in check_if_equal: assertEqual(key, result, fixture) if check_if_exists: for key in check_if_exists: assertIn(key, result) def assertEqual(key, result, fixture): # type: (str, Dict[str, Any], Dict[str, Any]) -> None if result[key] != fixture[key]: first = "{key} = {value}".format(key=key, value=result[key]) second = "{key} = {value}".format(key=key, value=fixture[key]) raise AssertionError("Actual and expected outputs do not match; showing diff:\n" + mdiff.diff_strings(first, second)) else: assert result[key] == fixture[key] def assertLength(result, fixture): # type: (Dict[str, Any], Dict[str, Any]) -> None if len(result) != len(fixture): result_string = json.dumps(result, indent=4, sort_keys=True) fixture_string = json.dumps(fixture, indent=4, sort_keys=True) raise AssertionError("The lengths of the actual and expected outputs do not match; showing diff:\n" + mdiff.diff_strings(result_string, fixture_string)) else: assert len(result) == len(fixture) def assertIn(key, result): # type: (str, Dict[str, Any]) -> None if key not in result.keys(): raise AssertionError( "The actual output does not contain the the key `{key}`.".format(key=key) ) else: assert key in result def test_messages(client): # type: (Client) -> None render_message(client) message_id = stream_message(client) update_message(client, message_id) private_message(client) test_nonexistent_stream_error(client) test_private_message_invalid_recipient(client) def test_users(client): # type: (Client) -> None create_user(client) get_members(client) get_profile(client) upload_file(client) def test_streams(client): # type: (Client) -> None add_subscriptions(client) test_add_subscriptions_already_subscribed(client) list_subscriptions(client) get_stream_id(client) get_streams(client) get_subscribers(client) remove_subscriptions(client) get_stream_topics(client, 1) def test_queues(client): # type: (Client) -> None # Note that the example for api/get-events-from-queue is not tested. # Since, methods such as client.get_events() or client.call_on_each_message # are blocking calls and since the event queue backend is already # thoroughly tested in zerver/tests/test_event_queue.py, it is not worth # the effort to come up with asynchronous logic for testing those here. queue_id = register_queue(client) deregister_queue(client, queue_id) def test_errors(client): # type: (Client) -> None test_missing_request_argument(client) test_invalid_stream_error(client) def test_the_api(client): # type: (Client) -> None get_user_agent(client) test_users(client) test_streams(client) test_messages(client) test_queues(client) test_errors(client)