diff --git a/static/images/integrations/airbyte/001.png b/static/images/integrations/airbyte/001.png new file mode 100644 index 0000000000..f90bf6e885 Binary files /dev/null and b/static/images/integrations/airbyte/001.png differ diff --git a/static/images/integrations/bot_avatars/airbyte.png b/static/images/integrations/bot_avatars/airbyte.png new file mode 100644 index 0000000000..1c563d017d Binary files /dev/null and b/static/images/integrations/bot_avatars/airbyte.png differ diff --git a/static/images/integrations/logos/airbyte.svg b/static/images/integrations/logos/airbyte.svg new file mode 100644 index 0000000000..b81818d577 Binary files /dev/null and b/static/images/integrations/logos/airbyte.svg differ diff --git a/zerver/lib/integrations.py b/zerver/lib/integrations.py index 6c94734bb4..a8274eb413 100644 --- a/zerver/lib/integrations.py +++ b/zerver/lib/integrations.py @@ -343,6 +343,7 @@ EMBEDDED_BOTS: list[EmbeddedBotIntegration] = [ WEBHOOK_INTEGRATIONS: list[WebhookIntegration] = [ WebhookIntegration("airbrake", ["monitoring"]), + WebhookIntegration("airbyte", ["monitoring"]), WebhookIntegration( "alertmanager", ["monitoring"], @@ -712,6 +713,7 @@ NO_SCREENSHOT_WEBHOOKS = { DOC_SCREENSHOT_CONFIG: dict[str, list[BaseScreenshotConfig]] = { "airbrake": [ScreenshotConfig("error_message.json")], + "airbyte": [ScreenshotConfig("airbyte_job_payload_success.json")], "alertmanager": [ ScreenshotConfig("alert.json", extra_params={"name": "topic", "desc": "description"}) ], diff --git a/zerver/webhooks/airbyte/__init__.py b/zerver/webhooks/airbyte/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zerver/webhooks/airbyte/doc.md b/zerver/webhooks/airbyte/doc.md new file mode 100644 index 0000000000..488384a035 --- /dev/null +++ b/zerver/webhooks/airbyte/doc.md @@ -0,0 +1,28 @@ +# Zulip Airbyte integration + +Get Zulip notifications from Airbyte. + +{start_tabs} + +1. {!create-channel.md!} + +1. {!create-an-incoming-webhook.md!} + +1. {!generate-webhook-url-basic.md!} + +1. In Airbyte, go to your project settings. Click **Notifications**, + and toggle the **Webhook** button for the notifications you'd like + to receive. + +1. Enter the URL generated above in the **Webhook URL** field. Click the + **Save changes** button at the bottom of the page. + +{end_tabs} + +{!congrats.md!} + +![](/static/images/integrations/airbyte/001.png) + +### Related documentation + +{!webhooks-url-specification.md!} diff --git a/zerver/webhooks/airbyte/fixtures/airbyte_job_payload_failure.json b/zerver/webhooks/airbyte/fixtures/airbyte_job_payload_failure.json new file mode 100644 index 0000000000..bb8a76f9d7 --- /dev/null +++ b/zerver/webhooks/airbyte/fixtures/airbyte_job_payload_failure.json @@ -0,0 +1,83 @@ +{ + "text": "Your connection Google Sheets → Postgres from Google Sheets to Postgres failed\nThis happened with Checking source connection failed - please review this connection's configuration to prevent future syncs from failing\n\nYou can access its logs here: https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720\n\nJob ID: 20441143", + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "Sync completed: " + } + }, + { + "type": "section", + "fields": [ + { + "type": "mrkdwn", + "text": "*Source:*" + }, + { + "type": "mrkdwn", + "text": "" + }, + { + "type": "mrkdwn", + "text": "*Destination:*" + }, + { + "type": "mrkdwn", + "text": "" + }, + { + "type": "mrkdwn", + "text": "*Duration:*" + }, + { + "type": "mrkdwn", + "text": "1 min 23 sec" + } + ] + }, + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "*Sync Summary:*\n1400 record(s) extracted / 1400 record(s) loaded\n281 kB extracted / 281 kB loaded\n" + } + } + ], + "data": { + "workspace": { + "id": "84d2dd6e-82aa-406e-91f3-bf8dbf176e69", + "name": "Zulip Airbyte Integration", + "url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69" + }, + "connection": { + "id": "aa941643-07ea-48a2-9035-024575491720", + "name": "Google Sheets → Postgres", + "url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720" + }, + "source": { + "id": "363c0ea3-e989-4051-9f54-d41b794d6621", + "name": "Google Sheets", + "url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/source/363c0ea3-e989-4051-9f54-d41b794d6621" + }, + "destination": { + "id": "b3a05072-e3c8-435a-8e6e-4a5c601039c6", + "name": "Postgres", + "url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/destination/b3a05072-e3c8-435a-8e6e-4a5c601039c6" + }, + "jobId": 20441143, + "startedAt": "2024-10-22T20:27:59Z", + "finishedAt": "2024-10-22T20:29:22Z", + "bytesEmitted": 0, + "bytesCommitted": 0, + "recordsEmitted": 0, + "recordsCommitted": 0, + "errorMessage": "Checking source connection failed - please review this connection's configuration to prevent future syncs from failing", + "durationFormatted": "28 sec", + "bytesEmittedFormatted": "0 B", + "bytesCommittedFormatted": "0 B", + "success": false, + "durationInSeconds": 28 + } +} diff --git a/zerver/webhooks/airbyte/fixtures/airbyte_job_payload_success.json b/zerver/webhooks/airbyte/fixtures/airbyte_job_payload_success.json new file mode 100644 index 0000000000..d573f8b63d --- /dev/null +++ b/zerver/webhooks/airbyte/fixtures/airbyte_job_payload_success.json @@ -0,0 +1,83 @@ +{ + "text": "Your connection Google Sheets → Postgres from Google Sheets to Postgres succeeded\nThis was for null\n\nYou can access its logs here: https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720\n\nJob ID: 20441143", + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "Sync completed: " + } + }, + { + "type": "section", + "fields": [ + { + "type": "mrkdwn", + "text": "*Source:*" + }, + { + "type": "mrkdwn", + "text": "" + }, + { + "type": "mrkdwn", + "text": "*Destination:*" + }, + { + "type": "mrkdwn", + "text": "" + }, + { + "type": "mrkdwn", + "text": "*Duration:*" + }, + { + "type": "mrkdwn", + "text": "1 min 23 sec" + } + ] + }, + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "*Sync Summary:*\n1400 record(s) extracted / 1400 record(s) loaded\n281 kB extracted / 281 kB loaded\n" + } + } + ], + "data": { + "workspace": { + "id": "84d2dd6e-82aa-406e-91f3-bf8dbf176e69", + "name": "Zulip Airbyte Integration", + "url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69" + }, + "connection": { + "id": "aa941643-07ea-48a2-9035-024575491720", + "name": "Google Sheets → Postgres", + "url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720" + }, + "source": { + "id": "363c0ea3-e989-4051-9f54-d41b794d6621", + "name": "Google Sheets", + "url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/source/363c0ea3-e989-4051-9f54-d41b794d6621" + }, + "destination": { + "id": "b3a05072-e3c8-435a-8e6e-4a5c601039c6", + "name": "Postgres", + "url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/destination/b3a05072-e3c8-435a-8e6e-4a5c601039c6" + }, + "jobId": 20441143, + "startedAt": "2024-10-22T20:27:59Z", + "finishedAt": "2024-10-22T20:29:22Z", + "bytesEmitted": 288179, + "bytesCommitted": 288179, + "recordsEmitted": 1400, + "recordsCommitted": 1400, + "errorMessage": null, + "durationFormatted": "1 min 23 sec", + "bytesEmittedFormatted": "281 kB", + "bytesCommittedFormatted": "281 kB", + "success": true, + "durationInSeconds": 83 + } +} diff --git a/zerver/webhooks/airbyte/fixtures/test_airbyte_job_hello_world_failure.json b/zerver/webhooks/airbyte/fixtures/test_airbyte_job_hello_world_failure.json new file mode 100644 index 0000000000..5df5ad4aed --- /dev/null +++ b/zerver/webhooks/airbyte/fixtures/test_airbyte_job_hello_world_failure.json @@ -0,0 +1,3 @@ +{ + "text": "Hello World! This is a test from Airbyte to try slack notification settings for sync failures." +} diff --git a/zerver/webhooks/airbyte/fixtures/test_airbyte_job_hello_world_success.json b/zerver/webhooks/airbyte/fixtures/test_airbyte_job_hello_world_success.json new file mode 100644 index 0000000000..1323fd41c0 --- /dev/null +++ b/zerver/webhooks/airbyte/fixtures/test_airbyte_job_hello_world_success.json @@ -0,0 +1,4 @@ +{ + "text": "Hello World! This is a test from Airbyte to try slack notification settings for sync successes." +} + diff --git a/zerver/webhooks/airbyte/tests.py b/zerver/webhooks/airbyte/tests.py new file mode 100644 index 0000000000..1a997bc292 --- /dev/null +++ b/zerver/webhooks/airbyte/tests.py @@ -0,0 +1,70 @@ +from zerver.lib.test_classes import WebhookTestCase + + +class AirbyteHookTests(WebhookTestCase): + STREAM_NAME = "airbyte" + URL_TEMPLATE = "/api/v1/external/airbyte?api_key={api_key}&stream={stream}" + FIXTURE_DIR_NAME = "airbyte" + CHANNEL_NAME = "test" + WEBHOOK_DIR_NAME = "airbyte" + + def test_airbyte_job_success(self) -> None: + expected_topic = "Zulip Airbyte Integration - Google Sheets → Postgres" + + expected_message = """:green_circle: Airbyte sync **succeeded** for [Google Sheets → Postgres](https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720). + + +* **Source:** [Google Sheets](https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/source/363c0ea3-e989-4051-9f54-d41b794d6621) +* **Destination:** [Postgres](https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/destination/b3a05072-e3c8-435a-8e6e-4a5c601039c6) +* **Records:** 1400 emitted, 1400 committed +* **Bytes:** 281 kB emitted, 281 kB committed +* **Duration:** 1 min 23 sec""" + + self.check_webhook( + "airbyte_job_payload_success", + expected_topic, + expected_message, + content_type="application/json", + ) + + def test_airbyte_job_failure(self) -> None: + expected_topic = "Zulip Airbyte Integration - Google Sheets → Postgres" + expected_message = """:red_circle: Airbyte sync **failed** for [Google Sheets → Postgres](https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720). + + +* **Source:** [Google Sheets](https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/source/363c0ea3-e989-4051-9f54-d41b794d6621) +* **Destination:** [Postgres](https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/destination/b3a05072-e3c8-435a-8e6e-4a5c601039c6) +* **Records:** 0 emitted, 0 committed +* **Bytes:** 0 B emitted, 0 B committed +* **Duration:** 28 sec + +**Error message:** Checking source connection failed - please review this connection's configuration to prevent future syncs from failing""" + + self.check_webhook( + "airbyte_job_payload_failure", + expected_topic, + expected_message, + content_type="application/json", + ) + + def test_airbyte_job_hello_world_success(self) -> None: + expected_topic = "Airbyte notification" + expected_message = """Hello World! This is a test from Airbyte to try slack notification settings for sync successes.""" + + self.check_webhook( + "test_airbyte_job_hello_world_success", + expected_topic, + expected_message, + content_type="application/json", + ) + + def test_airbyte_job_hello_world_failure(self) -> None: + expected_topic = "Airbyte notification" + expected_message = """Hello World! This is a test from Airbyte to try slack notification settings for sync failures.""" + + self.check_webhook( + "test_airbyte_job_hello_world_failure", + expected_topic, + expected_message, + content_type="application/json", + ) diff --git a/zerver/webhooks/airbyte/view.py b/zerver/webhooks/airbyte/view.py new file mode 100644 index 0000000000..f422d0b144 --- /dev/null +++ b/zerver/webhooks/airbyte/view.py @@ -0,0 +1,97 @@ +# Webhooks for external integrations. + +from django.http import HttpRequest, HttpResponse + +from zerver.decorator import webhook_view +from zerver.lib.response import json_success +from zerver.lib.typed_endpoint import JsonBodyPayload, typed_endpoint +from zerver.lib.validator import WildValue, check_bool, check_int, check_string +from zerver.lib.webhooks.common import check_send_webhook_message +from zerver.models import UserProfile + +AIRBYTE_TOPIC_TEMPLATE = "{workspace} - {connection}" + +AIRBYTE_MESSAGE_TEMPLATE = """\ +{sync_status_emoji} Airbyte sync **{status}** for [{connection_name}]({connection_url}). + + +* **Source:** [{source_name}]({source_url}) +* **Destination:** [{destination_name}]({destination_url}) +* **Records:** {records_emitted} emitted, {records_committed} committed +* **Bytes:** {bytes_emitted} emitted, {bytes_committed} committed +* **Duration:** {duration} +""" + + +def extract_data_from_payload(payload_data: WildValue) -> dict[str, str | int | bool]: + data: dict[str, str | int | bool] = { + "workspace_name": payload_data["workspace"]["name"].tame(check_string), + "connection_name": payload_data["connection"]["name"].tame(check_string), + "source_name": payload_data["source"]["name"].tame(check_string), + "destination_name": payload_data["destination"]["name"].tame(check_string), + "connection_url": payload_data["connection"]["url"].tame(check_string), + "source_url": payload_data["source"]["url"].tame(check_string), + "destination_url": payload_data["destination"]["url"].tame(check_string), + "successful_sync": payload_data["success"].tame(check_bool), + "duration_formatted": payload_data["durationFormatted"].tame(check_string), + "records_emitted": payload_data["recordsEmitted"].tame(check_int), + "records_committed": payload_data["recordsCommitted"].tame(check_int), + "bytes_emitted_formatted": payload_data["bytesEmittedFormatted"].tame(check_string), + "bytes_committed_formatted": payload_data["bytesCommittedFormatted"].tame(check_string), + } + + if not data["successful_sync"]: + data["error_message"] = payload_data["errorMessage"].tame(check_string) + + return data + + +def format_message_from_data(data: dict[str, str | int | bool]) -> str: + content = AIRBYTE_MESSAGE_TEMPLATE.format( + sync_status_emoji=":green_circle:" if data["successful_sync"] else ":red_circle:", + status="succeeded" if data["successful_sync"] else "failed", + connection_name=data["connection_name"], + connection_url=data["connection_url"], + source_name=data["source_name"], + source_url=data["source_url"], + destination_name=data["destination_name"], + destination_url=data["destination_url"], + duration=data["duration_formatted"], + records_emitted=data["records_emitted"], + records_committed=data["records_committed"], + bytes_emitted=data["bytes_emitted_formatted"], + bytes_committed=data["bytes_committed_formatted"], + ) + + if not data["successful_sync"]: + error_message = data["error_message"] + content += f"\n**Error message:** {error_message}" + + return content + + +def create_topic_from_data(data: dict[str, str | int | bool]) -> str: + return AIRBYTE_TOPIC_TEMPLATE.format( + workspace=data["workspace_name"], + connection=data["connection_name"], + ) + + +@webhook_view("Airbyte") +@typed_endpoint +def api_airbyte_webhook( + request: HttpRequest, + user_profile: UserProfile, + *, + payload: JsonBodyPayload[WildValue], +) -> HttpResponse: + if "data" in payload: + data = extract_data_from_payload(payload["data"]) + content = format_message_from_data(data) + topic = create_topic_from_data(data) + else: + # Test Airbyte notification payloads only contain this field. + content = payload["text"].tame(check_string) + topic = "Airbyte notification" + check_send_webhook_message(request, user_profile, topic, content) + return json_success(request)