# Zulip's OpenAPI-based API documentation system is documented at # https://zulip.readthedocs.io/en/latest/documentation/api.html # # This file defines the Python code examples that appears in Zulip's # REST API documentation, and also contains a system for running the # example code as part of the `tools/test-api` test suite. # # The actual documentation appears within these blocks: # # {code_example|start} # Code here # # {code_example|end} # # Whereas the surrounding code is test setup logic. import json import os import sys from email.headerregistry import Address from functools import wraps from typing import Any, Callable, TypeVar from typing_extensions import ParamSpec from zulip import Client from zerver.models.realms import get_realm from zerver.models.users import get_user from zerver.openapi.openapi import validate_against_openapi_schema ZULIP_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) TEST_FUNCTIONS: dict[str, Callable[..., object]] = {} REGISTERED_TEST_FUNCTIONS: set[str] = set() CALLED_TEST_FUNCTIONS: set[str] = set() ParamT = ParamSpec("ParamT") ReturnT = TypeVar("ReturnT") def openapi_test_function( endpoint: str, ) -> Callable[[Callable[ParamT, ReturnT]], Callable[ParamT, ReturnT]]: """This decorator is used to register an OpenAPI test function with its endpoint. Example usage: @openapi_test_function("/messages/render:post") def ... """ def wrapper(test_func: Callable[ParamT, ReturnT]) -> Callable[ParamT, ReturnT]: @wraps(test_func) def _record_calls_wrapper(*args: ParamT.args, **kwargs: ParamT.kwargs) -> ReturnT: CALLED_TEST_FUNCTIONS.add(test_func.__name__) return test_func(*args, **kwargs) REGISTERED_TEST_FUNCTIONS.add(test_func.__name__) TEST_FUNCTIONS[endpoint] = _record_calls_wrapper return _record_calls_wrapper return wrapper def ensure_users(ids_list: list[int], user_names: list[str]) -> None: # Ensure that the list of user ids (ids_list) # matches the users we want to refer to (user_names). realm = get_realm("zulip") user_ids = [ get_user(Address(username=name, domain="zulip.com").addr_spec, realm).id for name in user_names ] assert ids_list == user_ids def assert_success_response(response: dict[str, Any]) -> None: assert "result" in response assert response["result"] == "success" def assert_error_response(response: dict[str, Any], code: str = "BAD_REQUEST") -> None: assert "result" in response assert response["result"] == "error" assert "code" in response assert response["code"] == code def get_subscribed_stream_ids(client: Client) -> list[int]: streams = client.get_subscriptions() stream_ids = [stream["stream_id"] for stream in streams["subscriptions"]] return stream_ids def validate_message(client: Client, message_id: int, content: Any) -> None: url = "messages/" + str(message_id) result = client.call_endpoint( url=url, method="GET", ) assert result["raw_content"] == content @openapi_test_function("/users/me/subscriptions:post") def add_subscriptions(client: Client) -> None: # {code_example|start} # Create and subscribe to channel "python-test". result = client.add_subscriptions( streams=[ { "name": "python-test", "description": "Channel for testing Python", }, ], ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/subscriptions", "post", "200") user_id = 25 ensure_users([user_id], ["newbie"]) # {code_example|start} # To subscribe other users to a channel, you may pass # the `principals` argument, like so: result = client.add_subscriptions( streams=[ {"name": "python-test"}, ], principals=[user_id], ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/subscriptions", "post", "200") assert "newbie@zulip.com" in result["subscribed"] def test_add_subscriptions_already_subscribed(client: Client) -> None: result = client.add_subscriptions( streams=[ {"name": "python-test"}, ], principals=["newbie@zulip.com"], ) assert_success_response(result) validate_against_openapi_schema(result, "/users/me/subscriptions", "post", "200") def test_authorization_errors_fatal(client: Client, nonadmin_client: Client) -> None: client.add_subscriptions( streams=[ {"name": "private-channel"}, ], ) stream_id = client.get_stream_id("private-channel")["stream_id"] client.call_endpoint( f"streams/{stream_id}", method="PATCH", request={"is_private": True}, ) result = nonadmin_client.add_subscriptions( streams=[ {"name": "private-channel"}, ], authorization_errors_fatal=False, ) assert_success_response(result) validate_against_openapi_schema(result, "/users/me/subscriptions", "post", "200") result = nonadmin_client.add_subscriptions( streams=[ {"name": "private-channel"}, ], authorization_errors_fatal=True, ) assert_error_response(result) validate_against_openapi_schema(result, "/users/me/subscriptions", "post", "400") @openapi_test_function("/realm/presence:get") def get_presence(client: Client) -> None: # {code_example|start} # Get presence information of all the users in an organization. result = client.get_realm_presence() # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/presence", "get", "200") @openapi_test_function("/default_streams:post") def add_default_stream(client: Client) -> None: client.add_subscriptions( streams=[ { "name": "test channel", "description": "New channel for testing", }, ], ) stream_id = client.get_stream_id("test channel")["stream_id"] # {code_example|start} # Add a channel to the set of default channels for new users. result = client.add_default_stream(stream_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/default_streams", "post", "200") @openapi_test_function("/default_streams:delete") def remove_default_stream(client: Client) -> None: stream_id = client.get_stream_id("test channel")["stream_id"] # {code_example|start} # Remove a channel from the set of default channels for new users. request = {"stream_id": stream_id} result = client.call_endpoint( url="/default_streams", method="DELETE", request=request, ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/default_streams", "delete", "200") @openapi_test_function("/users/{user_id_or_email}/presence:get") def get_user_presence(client: Client) -> None: # {code_example|start} # Get presence information for "iago@zulip.com". result = client.get_user_presence("iago@zulip.com") # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/{user_id_or_email}/presence", "get", "200") @openapi_test_function("/users/{user_id}/status:get") def get_user_status(client: Client) -> None: user_id = 11 ensure_users([user_id], ["iago"]) # {code_example|start} # Get the status currently set by a user. result = client.call_endpoint( url=f"/users/{user_id}/status", method="GET", ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/{user_id}/status", "get", "200") @openapi_test_function("/users/me/presence:post") def update_presence(client: Client) -> None: # {code_example|start} # Update your presence. request = { "status": "active", "ping_only": False, "new_user_input": False, "last_update_id": -1, } result = client.update_presence(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/presence", "post", "200") @openapi_test_function("/users:post") def create_user(client: Client) -> None: # {code_example|start} # Create a user. request = { "email": "newbie@zulip.com", "password": "temp", "full_name": "New User", } result = client.create_user(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users", "post", "200") # Test "Email already used error". result = client.create_user(request) assert_error_response(result) validate_against_openapi_schema(result, "/users", "post", "400") @openapi_test_function("/users/me/status:post") def update_status(client: Client) -> None: # {code_example|start} # The request contains the new status and "away" boolean. request = { "status_text": "on vacation", "away": False, "emoji_name": "car", "emoji_code": "1f697", "reaction_type": "unicode_emoji", } result = client.call_endpoint(url="/users/me/status", method="POST", request=request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/status", "post", "200") # Test "status_text is too long error". request = { "status_text": "This is a message that exceeds 60 characters, and so should throw an error.", "away": "false", } result = client.call_endpoint(url="/users/me/status", method="POST", request=request) assert_error_response(result) validate_against_openapi_schema(result, "/users/me/status", "post", "400") @openapi_test_function("/users:get") def get_members(client: Client) -> None: # {code_example|start} # Get all users in the organization. result = client.get_members() # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users", "get", "200") members = [m for m in result["members"] if m["email"] == "newbie@zulip.com"] assert len(members) == 1 newbie = members[0] assert not newbie["is_admin"] assert newbie["full_name"] == "New User" # {code_example|start} # You may pass the `client_gravatar` query parameter as follows: result = client.get_members({"client_gravatar": False}) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users", "get", "200") assert result["members"][0]["avatar_url"] is not None # {code_example|start} # You may pass the `include_custom_profile_fields` query parameter as follows: result = client.get_members({"include_custom_profile_fields": True}) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users", "get", "200") for member in result["members"]: if member["is_bot"]: assert member.get("profile_data", None) is None else: assert member.get("profile_data", None) is not None assert member["avatar_url"] is None @openapi_test_function("/users/{email}:get") def get_user_by_email(client: Client) -> None: email = "iago@zulip.com" # {code_example|start} result = client.call_endpoint( url=f"/users/{email}", method="GET", ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/{email}", "get", "200") @openapi_test_function("/invites:get") def get_invitations(client: Client) -> None: # {code_example|start} # Get all invitations. result = client.call_endpoint(url="/invites", method="GET") # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/invites", "get", "200") @openapi_test_function("/invites:post") def send_invitations(client: Client) -> None: stream_ids = get_subscribed_stream_ids(client)[:3] # {code_example|start} # Send invitations. request = { "invitee_emails": "example@zulip.com, logan@zulip.com", "invite_expires_in_minutes": 60 * 24 * 10, # 10 days "invite_as": 400, "stream_ids": stream_ids, } result = client.call_endpoint(url="/invites", method="POST", request=request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/invites", "post", "200") @openapi_test_function("/invites/multiuse:post") def create_reusable_invitation_link(client: Client) -> None: stream_ids = get_subscribed_stream_ids(client)[:3] # {code_example|start} # Create a reusable invitation link. request = { "invite_expires_in_minutes": 60 * 24 * 10, # 10 days "invite_as": 400, "stream_ids": stream_ids, } result = client.call_endpoint(url="/invites/multiuse", method="POST", request=request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/invites/multiuse", "post", "200") @openapi_test_function("/invites/{invite_id}:delete") def revoke_email_invitation(client: Client) -> None: # Send email invitation. email = "delete-invite@zulip.com" request = { "invitee_emails": email, "stream_ids": [], } client.call_endpoint(url="/invites", method="POST", request=request) # Get invitation ID. invites = client.call_endpoint(url="/invites", method="GET")["invites"] invite = [s for s in invites if not s["is_multiuse"] and s["email"] == email] assert len(invite) == 1 invite_id = invite[0]["id"] # {code_example|start} # Revoke email invitation. result = client.call_endpoint(url=f"/invites/{invite_id}", method="DELETE") # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/invites/{invite_id}", "delete", "200") @openapi_test_function("/invites/multiuse/{invite_id}:delete") def revoke_reusable_invitation_link(client: Client) -> None: # Create multiuse invitation link. invite_url = client.call_endpoint(url="/invites/multiuse", method="POST", request={})[ "invite_link" ] # Get invitation ID. invites = client.call_endpoint(url="/invites", method="GET")["invites"] invite = [s for s in invites if s["is_multiuse"] and s["link_url"] == invite_url] assert len(invite) == 1 invite_id = invite[0]["id"] # {code_example|start} # Revoke reusable invitation link. result = client.call_endpoint(url=f"/invites/multiuse/{invite_id}", method="DELETE") # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/invites/multiuse/{invite_id}", "delete", "200") @openapi_test_function("/invites/{invite_id}/resend:post") def resend_email_invitation(client: Client) -> None: invites = client.call_endpoint(url="/invites", method="GET")["invites"] email_invites = [s for s in invites if not s["is_multiuse"]] assert len(email_invites) > 0 invite_id = email_invites[0]["id"] # {code_example|start} # Resend email invitation. result = client.call_endpoint(url=f"/invites/{invite_id}/resend", method="POST") # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/invites/{invite_id}/resend", "post", "200") @openapi_test_function("/users/{user_id}:get") def get_single_user(client: Client) -> None: user_id = 8 ensure_users([user_id], ["cordelia"]) # {code_example|start} # Fetch details on a user given a user ID. result = client.get_user_by_id(user_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/{user_id}", "get", "200") # {code_example|start} # If you'd like data on custom profile fields, you can request them as follows: result = client.get_user_by_id(user_id, include_custom_profile_fields=True) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/{user_id}", "get", "200") @openapi_test_function("/users/{user_id}:delete") def deactivate_user(client: Client) -> None: user_id = 8 ensure_users([user_id], ["cordelia"]) # {code_example|start} # Deactivate a user given a user ID. result = client.deactivate_user_by_id(user_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/{user_id}", "delete", "200") @openapi_test_function("/users/{user_id}/reactivate:post") def reactivate_user(client: Client) -> None: user_id = 8 ensure_users([user_id], ["cordelia"]) # {code_example|start} # Reactivate a user given a user ID. result = client.reactivate_user_by_id(user_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/{user_id}/reactivate", "post", "200") @openapi_test_function("/users/{user_id}:patch") def update_user(client: Client) -> None: ensure_users([8, 10], ["cordelia", "hamlet"]) user_id = 10 # {code_example|start} # Change a user's full name given a user ID. result = client.update_user_by_id(user_id, full_name="New Name") # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/{user_id}", "patch", "200") user_id = 8 # {code_example|start} # Change value of the custom profile field with ID 9. result = client.update_user_by_id(user_id, profile_data=[{"id": 9, "value": "some data"}]) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/{user_id}", "patch", "200") @openapi_test_function("/users/{user_id}/subscriptions/{stream_id}:get") def get_subscription_status(client: Client) -> None: user_id = 7 ensure_users([user_id], ["zoe"]) stream_id = client.get_subscriptions()["subscriptions"][0]["stream_id"] # {code_example|start} # Check whether a user is a subscriber to a given channel. result = client.call_endpoint( url=f"/users/{user_id}/subscriptions/{stream_id}", method="GET", ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema( result, "/users/{user_id}/subscriptions/{stream_id}", "get", "200" ) @openapi_test_function("/realm/linkifiers:get") def get_realm_linkifiers(client: Client) -> None: # {code_example|start} # Fetch all the linkifiers in this organization. result = client.call_endpoint( url="/realm/linkifiers", method="GET", ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/linkifiers", "get", "200") @openapi_test_function("/realm/linkifiers:patch") def reorder_realm_linkifiers(client: Client) -> None: realm_linkifiers = client.call_endpoint( url="/realm/linkifiers", method="GET", ) reordered_linkifiers = [linkifier["id"] for linkifier in realm_linkifiers["linkifiers"]][::-1] # {code_example|start} # Reorder the linkifiers in the user's organization. request = {"ordered_linkifier_ids": json.dumps(reordered_linkifiers)} result = client.call_endpoint(url="/realm/linkifiers", method="PATCH", request=request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/linkifiers", "patch", "200") @openapi_test_function("/realm/profile_fields:get") def get_realm_profile_fields(client: Client) -> None: # {code_example|start} # Fetch all the custom profile fields in the user's organization. result = client.call_endpoint( url="/realm/profile_fields", method="GET", ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/profile_fields", "get", "200") @openapi_test_function("/realm/profile_fields:patch") def reorder_realm_profile_fields(client: Client) -> None: realm_profile_fields = client.call_endpoint( url="/realm/profile_fields", method="GET", ) realm_profile_field_ids = [field["id"] for field in realm_profile_fields["custom_fields"]] reordered_profile_fields = realm_profile_field_ids[::-1] # {code_example|start} # Reorder the custom profile fields in the user's organization. request = {"order": json.dumps(reordered_profile_fields)} result = client.call_endpoint(url="/realm/profile_fields", method="PATCH", request=request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/profile_fields", "patch", "200") @openapi_test_function("/realm/profile_fields:post") def create_realm_profile_field(client: Client) -> None: # {code_example|start} # Create a custom profile field in the user's organization. request = {"name": "Phone", "hint": "Contact no.", "field_type": 1} result = client.call_endpoint(url="/realm/profile_fields", method="POST", request=request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/profile_fields", "post", "200") @openapi_test_function("/realm/filters:post") def add_realm_filter(client: Client) -> int: # TODO: Switch back to using client.add_realm_filter when python-zulip-api # begins to support url_template. # {code_example|start} # Add a filter to automatically linkify # to the corresponding # issue in Zulip's server repository. request = { "pattern": "#(?P[0-9]+)", "url_template": "https://github.com/zulip/zulip/issues/{id}", } result = client.call_endpoint("/realm/filters", method="POST", request=request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/filters", "post", "200") return result["id"] @openapi_test_function("/realm/filters/{filter_id}:patch") def update_realm_filter(client: Client, filter_id: int) -> None: # {code_example|start} # Update a linkifier. request = { "pattern": "#(?P[0-9]+)", "url_template": "https://github.com/zulip/zulip/issues/{id}", } result = client.call_endpoint( url=f"/realm/filters/{filter_id}", method="PATCH", request=request ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/filters/{filter_id}", "patch", "200") @openapi_test_function("/realm/filters/{filter_id}:delete") def remove_realm_filter(client: Client, filter_id: int) -> None: # {code_example|start} # Remove a linkifier. result = client.remove_realm_filter(filter_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/filters/{filter_id}", "delete", "200") @openapi_test_function("/realm/playgrounds:post") def add_realm_playground(client: Client) -> None: # {code_example|start} # Add a code playground for Python. request = { "name": "Python playground", "pygments_language": "Python", "url_template": "https://python.example.com?code={code}", } result = client.call_endpoint(url="/realm/playgrounds", method="POST", request=request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/playgrounds", "post", "200") @openapi_test_function("/realm/playgrounds/{playground_id}:delete") def remove_realm_playground(client: Client) -> None: # {code_example|start} # Remove the code playground with ID 1. result = client.call_endpoint(url="/realm/playgrounds/1", method="DELETE") # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/playgrounds/{playground_id}", "delete", "200") @openapi_test_function("/users/me:get") def get_profile(client: Client) -> None: # {code_example|start} # Get the profile of the user/bot that requests this endpoint, # which is `client` in this case. result = client.get_profile() # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me", "get", "200") @openapi_test_function("/users/me:delete") def deactivate_own_user(client: Client, owner_client: Client) -> None: user_id = client.get_profile()["user_id"] # {code_example|start} # Deactivate the account of the current user/bot. result = client.call_endpoint( url="/users/me", method="DELETE", ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me", "delete", "200") # Reactivate the account to avoid polluting other tests. owner_client.reactivate_user_by_id(user_id) @openapi_test_function("/get_stream_id:get") def get_stream_id(client: Client) -> int: name = "python-test" # {code_example|start} # Get the ID of a given channel name. result = client.get_stream_id(name) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/get_stream_id", "get", "200") return result["stream_id"] @openapi_test_function("/streams/{stream_id}:delete") def archive_stream(client: Client) -> None: client.add_subscriptions( streams=[ { "name": "example to archive", }, ], ) stream_id = client.get_stream_id("example to archive")["stream_id"] # {code_example|start} # Archive a channel, given the channel's ID. result = client.delete_stream(stream_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/streams/{stream_id}", "delete", "200") @openapi_test_function("/streams/{stream_id}/delete_topic:post") def delete_topic(client: Client, stream_id: int, topic: str) -> None: # {code_example|start} # Delete a topic in a channel, given the channel's ID. request = { "topic_name": topic, } result = client.call_endpoint( url=f"/streams/{stream_id}/delete_topic", method="POST", request=request ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/streams/{stream_id}/delete_topic", "post", "200") @openapi_test_function("/streams:get") def get_streams(client: Client) -> None: # {code_example|start} # Get all channels that the user has access to. result = client.get_streams() # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/streams", "get", "200") streams = [s for s in result["streams"] if s["name"] == "python-test"] assert streams[0]["description"] == "Channel for testing Python" # {code_example|start} # You may pass in one or more of the query parameters mentioned above # as keyword arguments, like so: result = client.get_streams(include_public=False) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/streams", "get", "200") assert len(result["streams"]) == 7 @openapi_test_function("/streams/{stream_id}:patch") def update_stream(client: Client, stream_id: int) -> None: # {code_example|start} # Update settings for the channel with a given ID. request = { "stream_id": stream_id, "stream_post_policy": 2, "is_private": True, } result = client.update_stream(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/streams/{stream_id}", "patch", "200") @openapi_test_function("/user_groups:get") def get_user_groups(client: Client) -> int: # {code_example|start} # Get all user groups of the organization. result = client.get_user_groups() # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/user_groups", "get", "200") [hamlet_user_group] = (u for u in result["user_groups"] if u["name"] == "hamletcharacters") assert hamlet_user_group["description"] == "Characters of Hamlet" [marketing_user_group] = (u for u in result["user_groups"] if u["name"] == "marketing") return marketing_user_group["id"] def test_user_not_authorized_error(nonadmin_client: Client) -> None: result = nonadmin_client.get_streams(include_all_active=True) assert_error_response(result) validate_against_openapi_schema(result, "/rest-error-handling", "post", "400") @openapi_test_function("/streams/{stream_id}/members:get") def get_subscribers(client: Client) -> None: user_ids = [11, 25] ensure_users(user_ids, ["iago", "newbie"]) # {code_example|start} # Get the subscribers to a channel. Note that `client.get_subscribers` # takes a `stream` parameter with the channel's name and not the # channel's ID. result = client.get_subscribers(stream="python-test") # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/streams/{stream_id}/members", "get", "200") assert result["subscribers"] == user_ids def get_user_agent(client: Client) -> None: result = client.get_user_agent() assert result.startswith("ZulipPython/") @openapi_test_function("/users/me/subscriptions:get") def get_subscriptions(client: Client) -> None: # {code_example|start} # Get all channels that the user is subscribed to. result = client.get_subscriptions() # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/subscriptions", "get", "200") streams = [s for s in result["subscriptions"] if s["name"] == "python-test"] assert streams[0]["description"] == "Channel for testing Python" @openapi_test_function("/users/me/subscriptions:delete") def remove_subscriptions(client: Client) -> None: # {code_example|start} # Unsubscribe from channel "python-test". result = client.remove_subscriptions( ["python-test"], ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/subscriptions", "delete", "200") # Confirm user is no longer subscribed to "python-test". subscriptions = client.get_subscriptions()["subscriptions"] streams = [s for s in subscriptions if s["name"] == "python-test"] assert len(streams) == 0 # {code_example|start} # Unsubscribe another user from channel "python-test". result = client.remove_subscriptions( ["python-test"], principals=["newbie@zulip.com"], ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/subscriptions", "delete", "200") @openapi_test_function("/users/me/subscriptions/muted_topics:patch") def toggle_mute_topic(client: Client) -> None: # Send a test message. message = { "type": "stream", "to": "Denmark", "topic": "boat party", } client.call_endpoint( url="messages", method="POST", request=message, ) # {code_example|start} # Mute the topic "boat party" in the channel named "Denmark". request = { "stream": "Denmark", "topic": "boat party", "op": "add", } result = client.mute_topic(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/subscriptions/muted_topics", "patch", "200") # {code_example|start} # Unmute the topic "boat party" in the channel named "Denmark". request = { "stream": "Denmark", "topic": "boat party", "op": "remove", } result = client.mute_topic(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/subscriptions/muted_topics", "patch", "200") @openapi_test_function("/user_topics:post") def update_user_topic(client: Client) -> None: stream_id = client.get_stream_id("Denmark")["stream_id"] # {code_example|start} # Mute the topic "dinner" in a channel, given the channel's ID. request = { "stream_id": stream_id, "topic": "dinner", "visibility_policy": 1, } result = client.call_endpoint( url="user_topics", method="POST", request=request, ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/user_topics", "post", "200") # {code_example|start} # Remove mute from the topic "dinner" in a channel, given the channel's ID. request = { "stream_id": stream_id, "topic": "dinner", "visibility_policy": 0, } result = client.call_endpoint( url="user_topics", method="POST", request=request, ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/user_topics", "post", "200") @openapi_test_function("/users/me/muted_users/{muted_user_id}:post") def add_user_mute(client: Client) -> None: muted_user_id = 10 ensure_users([muted_user_id], ["hamlet"]) # {code_example|start} # Mute a user, given the user's ID. result = client.call_endpoint(url=f"/users/me/muted_users/{muted_user_id}", method="POST") # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/muted_users/{muted_user_id}", "post", "200") @openapi_test_function("/users/me/muted_users/{muted_user_id}:delete") def remove_user_mute(client: Client) -> None: muted_user_id = 10 ensure_users([muted_user_id], ["hamlet"]) # {code_example|start} # Unmute a user, given the user's ID. result = client.call_endpoint(url=f"/users/me/muted_users/{muted_user_id}", method="DELETE") # {code_example|end} assert_success_response(result) validate_against_openapi_schema( result, "/users/me/muted_users/{muted_user_id}", "delete", "200" ) @openapi_test_function("/mark_all_as_read:post") def mark_all_as_read(client: Client) -> None: # {code_example|start} # Mark all of the user's unread messages as read. result = client.mark_all_as_read() # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/mark_all_as_read", "post", "200") @openapi_test_function("/mark_stream_as_read:post") def mark_stream_as_read(client: Client) -> None: stream_id = client.get_subscriptions()["subscriptions"][0]["stream_id"] # {code_example|start} # Mark the unread messages in a channel as read, given the channel's ID. result = client.mark_stream_as_read(stream_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/mark_stream_as_read", "post", "200") @openapi_test_function("/mark_topic_as_read:post") def mark_topic_as_read(client: Client) -> None: stream_id = client.get_subscriptions()["subscriptions"][0]["stream_id"] topic_name = client.get_stream_topics(stream_id)["topics"][0]["name"] # {code_example|start} # Mark unread messages in a given topic/channel as read. result = client.mark_topic_as_read(stream_id, topic_name) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/mark_stream_as_read", "post", "200") @openapi_test_function("/users/me/subscriptions/properties:post") def update_subscription_settings(client: Client) -> None: subscriptions = client.get_subscriptions()["subscriptions"] assert len(subscriptions) >= 2 stream_a_id = subscriptions[0]["stream_id"] stream_b_id = subscriptions[1]["stream_id"] # {code_example|start} # Update the user's subscription of the channel with ID `stream_a_id` # so that it's pinned to the top of the user's channel list, and in # the channel with ID `stream_b_id` so that it has the hex color "f00". request = [ { "stream_id": stream_a_id, "property": "pin_to_top", "value": True, }, { "stream_id": stream_b_id, "property": "color", "value": "#f00f00", }, ] result = client.update_subscription_settings(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/subscriptions/properties", "POST", "200") @openapi_test_function("/messages/render:post") def render_message(client: Client) -> None: # {code_example|start} # Render a message. request = { "content": "**foo**", } result = client.render_message(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages/render", "post", "200") @openapi_test_function("/messages:get") def get_messages(client: Client) -> None: # {code_example|start} # Get the 100 last messages sent by "iago@zulip.com" to # the channel named "Verona". request: dict[str, Any] = { "anchor": "newest", "num_before": 100, "num_after": 0, "narrow": [ {"operator": "sender", "operand": "iago@zulip.com"}, {"operator": "channel", "operand": "Verona"}, ], } result = client.get_messages(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages", "get", "200") assert len(result["messages"]) <= request["num_before"] @openapi_test_function("/messages/matches_narrow:get") def check_messages_match_narrow(client: Client) -> None: message = {"type": "stream", "to": "Verona", "topic": "test_topic", "content": "http://foo.com"} msg_ids = [] response = client.send_message(message) msg_ids.append(response["id"]) message["content"] = "no link here" response = client.send_message(message) msg_ids.append(response["id"]) # {code_example|start} # Check which messages, given the message IDs, match a narrow. request = { "msg_ids": msg_ids, "narrow": [{"operator": "has", "operand": "link"}], } result = client.call_endpoint(url="messages/matches_narrow", method="GET", request=request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages/matches_narrow", "get", "200") @openapi_test_function("/messages/{message_id}:get") def get_raw_message(client: Client, message_id: int) -> None: # {code_example|start} # Get the raw content of a message given the message's ID. result = client.get_raw_message(message_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages/{message_id}", "get", "200") @openapi_test_function("/attachments:get") def get_attachments(client: Client) -> int: # {code_example|start} # Get your attachments. result = client.get_attachments() # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/attachments", "get", "200") return result["attachments"][0]["id"] @openapi_test_function("/attachments/{attachment_id}:delete") def remove_attachment(client: Client, attachment_id: int) -> None: # {code_example|start} # Delete the attachment given the attachment's ID. url = "attachments/" + str(attachment_id) result = client.call_endpoint( url=url, method="DELETE", ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/attachments/{attachment_id}", "delete", "200") @openapi_test_function("/messages:post") def send_message(client: Client) -> int: request: dict[str, Any] = {} # {code_example|start} # Send a channel message. request = { "type": "stream", "to": "Denmark", "topic": "Castle", "content": "I come not, friends, to steal away your hearts.", } result = client.send_message(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages", "post", "200") # Confirm the message was actually sent. message_id = result["id"] validate_message(client, message_id, request["content"]) user_id = 10 ensure_users([user_id], ["hamlet"]) # {code_example|start} # Send a direct message. request = { "type": "private", "to": [user_id], "content": "With mirth and laughter let old wrinkles come.", } result = client.send_message(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages", "post", "200") # Confirm the message was actually sent. message_id = result["id"] validate_message(client, message_id, request["content"]) return message_id @openapi_test_function("/messages/{message_id}/reactions:post") def add_reaction(client: Client, message_id: int) -> None: request: dict[str, Any] = {} # {code_example|start} # Add an emoji reaction. request = { "message_id": message_id, "emoji_name": "octopus", } result = client.add_reaction(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages/{message_id}/reactions", "post", "200") @openapi_test_function("/messages/{message_id}/reactions:delete") def remove_reaction(client: Client, message_id: int) -> None: request: dict[str, Any] = {} # {code_example|start} # Remove an emoji reaction. request = { "message_id": message_id, "emoji_name": "octopus", } result = client.remove_reaction(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages/{message_id}/reactions", "delete", "200") @openapi_test_function("/messages/{message_id}/read_receipts:get") def get_read_receipts(client: Client, message_id: int) -> None: # {code_example|start} # Get read receipts for a message, given the message's ID. result = client.call_endpoint(f"/messages/{message_id}/read_receipts", method="GET") # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages/{message_id}/read_receipts", "get", "200") def test_nonexistent_stream_error(client: Client) -> None: request = { "type": "stream", "to": "nonexistent-channel", "topic": "Castle", "content": "I come not, friends, to steal away your hearts.", } result = client.send_message(request) assert_error_response(result, code="STREAM_DOES_NOT_EXIST") validate_against_openapi_schema(result, "/messages", "post", "400") def test_private_message_invalid_recipient(client: Client) -> None: request = { "type": "private", "to": "eeshan@zulip.com", "content": "With mirth and laughter let old wrinkles come.", } result = client.send_message(request) assert_error_response(result) validate_against_openapi_schema(result, "/messages", "post", "400") @openapi_test_function("/messages/{message_id}:patch") def update_message(client: Client, message_id: int) -> None: # {code_example|start} # Edit a message. Make sure that `message_id` is set to the ID of the # message you wish to update. request = { "message_id": message_id, "content": "New content", } result = client.update_message(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages/{message_id}", "patch", "200") # Confirm the message was actually updated. validate_message(client, message_id, request["content"]) def test_update_message_edit_permission_error(client: Client, nonadmin_client: Client) -> None: request = { "type": "stream", "to": "Denmark", "topic": "Castle", "content": "I come not, friends, to steal away your hearts.", } result = client.send_message(request) request = { "message_id": result["id"], "content": "New content", } result = nonadmin_client.update_message(request) assert_error_response(result) validate_against_openapi_schema(result, "/messages/{message_id}", "patch", "400") @openapi_test_function("/messages/{message_id}:delete") def delete_message(client: Client, message_id: int) -> None: # {code_example|start} # Delete a message, given the message's ID. result = client.delete_message(message_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages/{message_id}", "delete", "200") def test_delete_message_edit_permission_error(client: Client, nonadmin_client: Client) -> None: request = { "type": "stream", "to": "Denmark", "topic": "Castle", "content": "I come not, friends, to steal away your hearts.", } result = client.send_message(request) result = nonadmin_client.delete_message(result["id"]) assert_error_response(result) validate_against_openapi_schema(result, "/messages/{message_id}", "delete", "400") @openapi_test_function("/messages/{message_id}/history:get") def get_message_history(client: Client, message_id: int) -> None: # {code_example|start} # Get the edit history for a message, given the message's ID. result = client.get_message_history(message_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages/{message_id}/history", "get", "200") @openapi_test_function("/realm/emoji:get") def get_realm_emoji(client: Client) -> None: # {code_example|start} result = client.get_realm_emoji() # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/emoji", "GET", "200") @openapi_test_function("/messages/flags:post") def update_message_flags(client: Client) -> None: # Send a few test messages. request: dict[str, Any] = { "type": "stream", "to": "Denmark", "topic": "Castle", "content": "I come not, friends, to steal away your hearts.", } message_ids = [client.send_message(request)["id"] for i in range(3)] # {code_example|start} # Add the "read" flag to messages, given the messages' IDs. request = { "messages": message_ids, "op": "add", "flag": "read", } result = client.update_message_flags(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages/flags", "post", "200") # {code_example|start} # Remove the "starred" flag from messages, given the messages' IDs. request = { "messages": message_ids, "op": "remove", "flag": "starred", } result = client.update_message_flags(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/messages/flags", "post", "200") def register_queue_all_events(client: Client) -> str: # Register the queue and get all events. # Mainly for verifying schema of /register. result = client.register() assert_success_response(result) validate_against_openapi_schema(result, "/register", "post", "200") return result["queue_id"] @openapi_test_function("/register:post") def register_queue(client: Client) -> str: # {code_example|start} # Register the queue. result = client.register( event_types=["message", "realm_emoji"], ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/register", "post", "200") return result["queue_id"] @openapi_test_function("/events:delete") def deregister_queue(client: Client, queue_id: str) -> None: # {code_example|start} # Delete a queue, where `queue_id` is the ID of the queue # to be removed. result = client.deregister(queue_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/events", "delete", "200") # Test "BAD_EVENT_QUEUE_ID" error. result = client.deregister(queue_id) assert_error_response(result, code="BAD_EVENT_QUEUE_ID") validate_against_openapi_schema(result, "/events", "delete", "400") @openapi_test_function("/events:get") def get_queue(client: Client, queue_id: str) -> None: # {code_example|start} # If you already have a queue registered, and thus have a `queue_id` # on hand, you may use `client.get_events()` and pass in the below # parameters, like so: result = client.get_events(queue_id=queue_id, last_event_id=-1) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/events", "get", "200") @openapi_test_function("/server_settings:get") def get_server_settings(client: Client) -> None: # {code_example|start} # Fetch the settings for this server. result = client.get_server_settings() # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/server_settings", "get", "200") @openapi_test_function("/settings:patch") def update_settings(client: Client) -> None: # {code_example|start} # Enable push notifications even when online and change emoji set. request = { "enable_offline_push_notifications": True, "enable_online_push_notifications": True, "emojiset": "google", } result = client.call_endpoint("/settings", method="PATCH", request=request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/settings", "patch", "200") @openapi_test_function("/user_uploads:post") def upload_file(client: Client) -> None: path_to_file = os.path.join(ZULIP_DIR, "zerver", "tests", "images", "img.jpg") # {code_example|start} # Upload a file. with open(path_to_file, "rb") as fp: result = client.upload_file(fp) # Share the file by including it in a message. client.send_message( { "type": "stream", "to": "Denmark", "topic": "Castle", "content": "Check out [this picture]({}) of my castle!".format(result["uri"]), } ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/user_uploads", "post", "200") @openapi_test_function("/users/me/{stream_id}/topics:get") def get_stream_topics(client: Client, stream_id: int) -> None: # {code_example|start} result = client.get_stream_topics(stream_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/{stream_id}/topics", "get", "200") @openapi_test_function("/users/me/apns_device_token:post") def add_apns_token(client: Client) -> None: # {code_example|start} request = {"token": "apple-tokenbb", "appid": "org.zulip.Zulip"} result = client.call_endpoint(url="/users/me/apns_device_token", method="POST", request=request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/apns_device_token", "post", "200") @openapi_test_function("/users/me/apns_device_token:delete") def remove_apns_token(client: Client) -> None: # {code_example|start} request = { "token": "apple-tokenbb", } result = client.call_endpoint( url="/users/me/apns_device_token", method="DELETE", request=request ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/apns_device_token", "delete", "200") @openapi_test_function("/users/me/android_gcm_reg_id:post") def add_fcm_token(client: Client) -> None: # {code_example|start} request = {"token": "android-token"} result = client.call_endpoint( url="/users/me/android_gcm_reg_id", method="POST", request=request ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/android_gcm_reg_id", "post", "200") @openapi_test_function("/users/me/android_gcm_reg_id:delete") def remove_fcm_token(client: Client) -> None: # {code_example|start} request = { "token": "android-token", } result = client.call_endpoint( url="/users/me/android_gcm_reg_id", method="DELETE", request=request ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/android_gcm_reg_id", "delete", "200") @openapi_test_function("/typing:post") def set_typing_status(client: Client) -> None: ensure_users([10, 11], ["hamlet", "iago"]) user_a_id = 10 user_b_id = 11 # {code_example|start} # The user has started typing in the group direct message # with two users, "user_a" and "user_b". request = { "op": "start", "to": [user_a_id, user_b_id], } result = client.set_typing_status(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/typing", "post", "200") # {code_example|start} # The user has finished typing in the group direct message # with "user_a" and "user_b". request = { "op": "stop", "to": [user_a_id, user_b_id], } result = client.set_typing_status(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/typing", "post", "200") stream_id = client.get_stream_id("Denmark")["stream_id"] topic = "typing status" # {code_example|start} # The user has started typing in a topic/channel. request = { "type": "stream", "op": "start", "stream_id": stream_id, "topic": topic, } result = client.set_typing_status(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/typing", "post", "200") # {code_example|start} # The user has finished typing in a topic/channel. request = { "type": "stream", "op": "stop", "stream_id": stream_id, "topic": topic, } result = client.set_typing_status(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/typing", "post", "200") @openapi_test_function("/realm/emoji/{emoji_name}:post") def upload_custom_emoji(client: Client) -> None: emoji_path = os.path.join(ZULIP_DIR, "zerver", "tests", "images", "img.jpg") # {code_example|start} # Upload a custom emoji; assume `emoji_path` is the path to your image. with open(emoji_path, "rb") as fp: emoji_name = "my_custom_emoji" result = client.call_endpoint( f"realm/emoji/{emoji_name}", method="POST", files=[fp], ) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/emoji/{emoji_name}", "post", "200") @openapi_test_function("/realm/emoji/{emoji_name}:delete") def delete_custom_emoji(client: Client) -> None: emoji_name = "my_custom_emoji" # {code_example|start} # Delete a custom emoji. result = client.call_endpoint(f"realm/emoji/{emoji_name}", method="DELETE") # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/realm/emoji/{emoji_name}", "delete", "200") @openapi_test_function("/users/me/alert_words:get") def get_alert_words(client: Client) -> None: # {code_example|start} # Get all of the user's configured alert words. result = client.get_alert_words() # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/alert_words", "get", "200") @openapi_test_function("/users/me/alert_words:post") def add_alert_words(client: Client) -> None: words = ["foo", "bar"] # {code_example|start} # Add words (or phrases) to the user's set of configured alert words. result = client.add_alert_words(words) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/alert_words", "post", "200") @openapi_test_function("/users/me/alert_words:delete") def remove_alert_words(client: Client) -> None: words = client.get_alert_words()["alert_words"] assert len(words) > 0 # {code_example|start} # Remove words (or phrases) from the user's set of configured alert words. result = client.remove_alert_words(words) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/users/me/alert_words", "delete", "200") @openapi_test_function("/user_groups/create:post") def create_user_group(client: Client) -> None: user_ids = [6, 7, 8, 10] ensure_users(user_ids, ["aaron", "zoe", "cordelia", "hamlet"]) # {code_example|start} request = { "name": "marketing", "description": "The marketing team.", "members": user_ids, } result = client.create_user_group(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/user_groups/create", "post", "200") @openapi_test_function("/user_groups/{user_group_id}:patch") def update_user_group(client: Client, user_group_id: int) -> None: # {code_example|start} request = { "group_id": user_group_id, "name": "marketing", "description": "The marketing team.", } result = client.update_user_group(request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/user_groups/{user_group_id}", "patch", "200") @openapi_test_function("/user_groups/{user_group_id}:delete") def remove_user_group(client: Client, user_group_id: int) -> None: # {code_example|start} result = client.remove_user_group(user_group_id) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/user_groups/{user_group_id}", "delete", "200") @openapi_test_function("/user_groups/{user_group_id}/members:post") def update_user_group_members(client: Client, user_group_id: int) -> None: ensure_users([8, 10, 11], ["cordelia", "hamlet", "iago"]) user_ids_to_add = [11] user_ids_to_remove = [8, 10] # {code_example|start} request = { "delete": user_ids_to_remove, "add": user_ids_to_add, } result = client.update_user_group_members(user_group_id, request) # {code_example|end} assert_success_response(result) validate_against_openapi_schema(result, "/user_groups/{group_id}/members", "post", "200") def test_invalid_api_key(client_with_invalid_key: Client) -> None: result = client_with_invalid_key.get_subscriptions() assert_error_response(result, code="UNAUTHORIZED") validate_against_openapi_schema(result, "/rest-error-handling", "post", "400") def test_missing_request_argument(client: Client) -> None: result = client.render_message({}) assert_error_response(result, code="REQUEST_VARIABLE_MISSING") validate_against_openapi_schema(result, "/rest-error-handling", "post", "400") def test_user_account_deactivated(client: Client) -> None: request = { "content": "**foo**", } result = client.render_message(request) validate_against_openapi_schema(result, "/rest-error-handling", "post", "403") def test_realm_deactivated(client: Client) -> None: request = { "content": "**foo**", } result = client.render_message(request) validate_against_openapi_schema(result, "/rest-error-handling", "post", "403") def test_invalid_stream_error(client: Client) -> None: result = client.get_stream_id("nonexistent") assert_error_response(result) validate_against_openapi_schema(result, "/get_stream_id", "get", "400") # SETUP METHODS FOLLOW def test_messages(client: Client, nonadmin_client: Client) -> None: render_message(client) message_id = send_message(client) add_reaction(client, message_id) remove_reaction(client, message_id) update_message(client, message_id) get_raw_message(client, message_id) get_messages(client) check_messages_match_narrow(client) get_message_history(client, message_id) get_read_receipts(client, message_id) delete_message(client, message_id) mark_all_as_read(client) mark_stream_as_read(client) mark_topic_as_read(client) update_message_flags(client) test_nonexistent_stream_error(client) test_private_message_invalid_recipient(client) test_update_message_edit_permission_error(client, nonadmin_client) test_delete_message_edit_permission_error(client, nonadmin_client) def test_users(client: Client, owner_client: Client) -> None: create_user(client) get_members(client) get_single_user(client) deactivate_user(client) reactivate_user(client) update_user(client) update_status(client) get_user_status(client) get_user_by_email(client) get_subscription_status(client) get_profile(client) update_settings(client) upload_file(client) attachment_id = get_attachments(client) remove_attachment(client, attachment_id) set_typing_status(client) update_presence(client) get_user_presence(client) get_presence(client) create_user_group(client) user_group_id = get_user_groups(client) update_user_group(client, user_group_id) update_user_group_members(client, user_group_id) remove_user_group(client, user_group_id) get_alert_words(client) add_alert_words(client) remove_alert_words(client) deactivate_own_user(client, owner_client) add_user_mute(client) remove_user_mute(client) get_alert_words(client) add_alert_words(client) remove_alert_words(client) add_apns_token(client) remove_apns_token(client) add_fcm_token(client) remove_fcm_token(client) def test_streams(client: Client, nonadmin_client: Client) -> None: add_subscriptions(client) test_add_subscriptions_already_subscribed(client) get_subscriptions(client) stream_id = get_stream_id(client) update_stream(client, stream_id) get_streams(client) get_subscribers(client) remove_subscriptions(client) toggle_mute_topic(client) update_user_topic(client) update_subscription_settings(client) get_stream_topics(client, 1) delete_topic(client, 1, "test") archive_stream(client) add_default_stream(client) remove_default_stream(client) test_user_not_authorized_error(nonadmin_client) test_authorization_errors_fatal(client, nonadmin_client) def test_queues(client: Client) -> None: # Note that the example for api/get-events is not tested here. # # Since, methods such as client.get_events() or client.call_on_each_message # are blocking calls and since the event queue backend is already # thoroughly tested in zerver/tests/test_event_queue.py, it is not worth # the effort to come up with asynchronous logic for testing those here. # # We do validate endpoint example responses in zerver/tests/test_openapi.py, # as well as the example events returned by api/get-events. queue_id = register_queue(client) get_queue(client, queue_id) deregister_queue(client, queue_id) register_queue_all_events(client) def test_server_organizations(client: Client) -> None: get_realm_linkifiers(client) filter_id = add_realm_filter(client) update_realm_filter(client, filter_id) add_realm_playground(client) get_server_settings(client) reorder_realm_linkifiers(client) remove_realm_filter(client, filter_id) remove_realm_playground(client) get_realm_emoji(client) upload_custom_emoji(client) delete_custom_emoji(client) get_realm_profile_fields(client) reorder_realm_profile_fields(client) create_realm_profile_field(client) def test_errors(client: Client) -> None: test_missing_request_argument(client) test_invalid_stream_error(client) def test_invitations(client: Client) -> None: send_invitations(client) revoke_email_invitation(client) create_reusable_invitation_link(client) revoke_reusable_invitation_link(client) get_invitations(client) resend_email_invitation(client) def test_the_api(client: Client, nonadmin_client: Client, owner_client: Client) -> None: get_user_agent(client) test_users(client, owner_client) test_streams(client, nonadmin_client) test_messages(client, nonadmin_client) test_queues(client) test_server_organizations(client) test_errors(client) test_invitations(client) sys.stdout.flush() if REGISTERED_TEST_FUNCTIONS != CALLED_TEST_FUNCTIONS: print("Error! Some @openapi_test_function tests were never called:") print(" ", REGISTERED_TEST_FUNCTIONS - CALLED_TEST_FUNCTIONS) sys.exit(1)