integrations: Create incoming webhook for Airbyte.

Note about the documentation: There are currently two "Save changes"
buttons on the Airbyte "Notifications" settings page, so the
instructions specify which one to use for clarity.
This commit is contained in:
aniebietafia 2024-10-08 23:02:24 +01:00 committed by Tim Abbott
parent e506c0c612
commit 257ce8bca2
12 changed files with 370 additions and 0 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.2 KiB

View File

@ -343,6 +343,7 @@ EMBEDDED_BOTS: list[EmbeddedBotIntegration] = [
WEBHOOK_INTEGRATIONS: list[WebhookIntegration] = [
WebhookIntegration("airbrake", ["monitoring"]),
WebhookIntegration("airbyte", ["monitoring"]),
WebhookIntegration(
"alertmanager",
["monitoring"],
@ -712,6 +713,7 @@ NO_SCREENSHOT_WEBHOOKS = {
DOC_SCREENSHOT_CONFIG: dict[str, list[BaseScreenshotConfig]] = {
"airbrake": [ScreenshotConfig("error_message.json")],
"airbyte": [ScreenshotConfig("airbyte_job_payload_success.json")],
"alertmanager": [
ScreenshotConfig("alert.json", extra_params={"name": "topic", "desc": "description"})
],

View File

View File

@ -0,0 +1,28 @@
# Zulip Airbyte integration
Get Zulip notifications from Airbyte.
{start_tabs}
1. {!create-channel.md!}
1. {!create-an-incoming-webhook.md!}
1. {!generate-webhook-url-basic.md!}
1. In Airbyte, go to your project settings. Click **Notifications**,
and toggle the **Webhook** button for the notifications you'd like
to receive.
1. Enter the URL generated above in the **Webhook URL** field. Click the
**Save changes** button at the bottom of the page.
{end_tabs}
{!congrats.md!}
![](/static/images/integrations/airbyte/001.png)
### Related documentation
{!webhooks-url-specification.md!}

View File

@ -0,0 +1,83 @@
{
"text": "Your connection Google Sheets → Postgres from Google Sheets to Postgres failed\nThis happened with Checking source connection failed - please review this connection's configuration to prevent future syncs from failing\n\nYou can access its logs here: https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720\n\nJob ID: 20441143",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "Sync completed: <https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720|Google Sheets \u2192 Postgres>"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Source:*"
},
{
"type": "mrkdwn",
"text": "<https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/source/363c0ea3-e989-4051-9f54-d41b794d6621|Google Sheets>"
},
{
"type": "mrkdwn",
"text": "*Destination:*"
},
{
"type": "mrkdwn",
"text": "<https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/destination/b3a05072-e3c8-435a-8e6e-4a5c601039c6|Postgres>"
},
{
"type": "mrkdwn",
"text": "*Duration:*"
},
{
"type": "mrkdwn",
"text": "1 min 23 sec"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Sync Summary:*\n1400 record(s) extracted / 1400 record(s) loaded\n281 kB extracted / 281 kB loaded\n"
}
}
],
"data": {
"workspace": {
"id": "84d2dd6e-82aa-406e-91f3-bf8dbf176e69",
"name": "Zulip Airbyte Integration",
"url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69"
},
"connection": {
"id": "aa941643-07ea-48a2-9035-024575491720",
"name": "Google Sheets → Postgres",
"url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720"
},
"source": {
"id": "363c0ea3-e989-4051-9f54-d41b794d6621",
"name": "Google Sheets",
"url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/source/363c0ea3-e989-4051-9f54-d41b794d6621"
},
"destination": {
"id": "b3a05072-e3c8-435a-8e6e-4a5c601039c6",
"name": "Postgres",
"url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/destination/b3a05072-e3c8-435a-8e6e-4a5c601039c6"
},
"jobId": 20441143,
"startedAt": "2024-10-22T20:27:59Z",
"finishedAt": "2024-10-22T20:29:22Z",
"bytesEmitted": 0,
"bytesCommitted": 0,
"recordsEmitted": 0,
"recordsCommitted": 0,
"errorMessage": "Checking source connection failed - please review this connection's configuration to prevent future syncs from failing",
"durationFormatted": "28 sec",
"bytesEmittedFormatted": "0 B",
"bytesCommittedFormatted": "0 B",
"success": false,
"durationInSeconds": 28
}
}

View File

@ -0,0 +1,83 @@
{
"text": "Your connection Google Sheets → Postgres from Google Sheets to Postgres succeeded\nThis was for null\n\nYou can access its logs here: https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720\n\nJob ID: 20441143",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "Sync completed: <https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720|Google Sheets \u2192 Postgres>"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Source:*"
},
{
"type": "mrkdwn",
"text": "<https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/source/363c0ea3-e989-4051-9f54-d41b794d6621|Google Sheets>"
},
{
"type": "mrkdwn",
"text": "*Destination:*"
},
{
"type": "mrkdwn",
"text": "<https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/destination/b3a05072-e3c8-435a-8e6e-4a5c601039c6|Postgres>"
},
{
"type": "mrkdwn",
"text": "*Duration:*"
},
{
"type": "mrkdwn",
"text": "1 min 23 sec"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Sync Summary:*\n1400 record(s) extracted / 1400 record(s) loaded\n281 kB extracted / 281 kB loaded\n"
}
}
],
"data": {
"workspace": {
"id": "84d2dd6e-82aa-406e-91f3-bf8dbf176e69",
"name": "Zulip Airbyte Integration",
"url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69"
},
"connection": {
"id": "aa941643-07ea-48a2-9035-024575491720",
"name": "Google Sheets → Postgres",
"url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720"
},
"source": {
"id": "363c0ea3-e989-4051-9f54-d41b794d6621",
"name": "Google Sheets",
"url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/source/363c0ea3-e989-4051-9f54-d41b794d6621"
},
"destination": {
"id": "b3a05072-e3c8-435a-8e6e-4a5c601039c6",
"name": "Postgres",
"url": "https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/destination/b3a05072-e3c8-435a-8e6e-4a5c601039c6"
},
"jobId": 20441143,
"startedAt": "2024-10-22T20:27:59Z",
"finishedAt": "2024-10-22T20:29:22Z",
"bytesEmitted": 288179,
"bytesCommitted": 288179,
"recordsEmitted": 1400,
"recordsCommitted": 1400,
"errorMessage": null,
"durationFormatted": "1 min 23 sec",
"bytesEmittedFormatted": "281 kB",
"bytesCommittedFormatted": "281 kB",
"success": true,
"durationInSeconds": 83
}
}

View File

@ -0,0 +1,3 @@
{
"text": "Hello World! This is a test from Airbyte to try slack notification settings for sync failures."
}

View File

@ -0,0 +1,4 @@
{
"text": "Hello World! This is a test from Airbyte to try slack notification settings for sync successes."
}

View File

@ -0,0 +1,70 @@
from zerver.lib.test_classes import WebhookTestCase
class AirbyteHookTests(WebhookTestCase):
STREAM_NAME = "airbyte"
URL_TEMPLATE = "/api/v1/external/airbyte?api_key={api_key}&stream={stream}"
FIXTURE_DIR_NAME = "airbyte"
CHANNEL_NAME = "test"
WEBHOOK_DIR_NAME = "airbyte"
def test_airbyte_job_success(self) -> None:
expected_topic = "Zulip Airbyte Integration - Google Sheets → Postgres"
expected_message = """:green_circle: Airbyte sync **succeeded** for [Google Sheets → Postgres](https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720).
* **Source:** [Google Sheets](https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/source/363c0ea3-e989-4051-9f54-d41b794d6621)
* **Destination:** [Postgres](https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/destination/b3a05072-e3c8-435a-8e6e-4a5c601039c6)
* **Records:** 1400 emitted, 1400 committed
* **Bytes:** 281 kB emitted, 281 kB committed
* **Duration:** 1 min 23 sec"""
self.check_webhook(
"airbyte_job_payload_success",
expected_topic,
expected_message,
content_type="application/json",
)
def test_airbyte_job_failure(self) -> None:
expected_topic = "Zulip Airbyte Integration - Google Sheets → Postgres"
expected_message = """:red_circle: Airbyte sync **failed** for [Google Sheets → Postgres](https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/connections/aa941643-07ea-48a2-9035-024575491720).
* **Source:** [Google Sheets](https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/source/363c0ea3-e989-4051-9f54-d41b794d6621)
* **Destination:** [Postgres](https://cloud.airbyte.com/workspaces/84d2dd6e-82aa-406e-91f3-bf8dbf176e69/destination/b3a05072-e3c8-435a-8e6e-4a5c601039c6)
* **Records:** 0 emitted, 0 committed
* **Bytes:** 0 B emitted, 0 B committed
* **Duration:** 28 sec
**Error message:** Checking source connection failed - please review this connection's configuration to prevent future syncs from failing"""
self.check_webhook(
"airbyte_job_payload_failure",
expected_topic,
expected_message,
content_type="application/json",
)
def test_airbyte_job_hello_world_success(self) -> None:
expected_topic = "Airbyte notification"
expected_message = """Hello World! This is a test from Airbyte to try slack notification settings for sync successes."""
self.check_webhook(
"test_airbyte_job_hello_world_success",
expected_topic,
expected_message,
content_type="application/json",
)
def test_airbyte_job_hello_world_failure(self) -> None:
expected_topic = "Airbyte notification"
expected_message = """Hello World! This is a test from Airbyte to try slack notification settings for sync failures."""
self.check_webhook(
"test_airbyte_job_hello_world_failure",
expected_topic,
expected_message,
content_type="application/json",
)

View File

@ -0,0 +1,97 @@
# Webhooks for external integrations.
from django.http import HttpRequest, HttpResponse
from zerver.decorator import webhook_view
from zerver.lib.response import json_success
from zerver.lib.typed_endpoint import JsonBodyPayload, typed_endpoint
from zerver.lib.validator import WildValue, check_bool, check_int, check_string
from zerver.lib.webhooks.common import check_send_webhook_message
from zerver.models import UserProfile
AIRBYTE_TOPIC_TEMPLATE = "{workspace} - {connection}"
AIRBYTE_MESSAGE_TEMPLATE = """\
{sync_status_emoji} Airbyte sync **{status}** for [{connection_name}]({connection_url}).
* **Source:** [{source_name}]({source_url})
* **Destination:** [{destination_name}]({destination_url})
* **Records:** {records_emitted} emitted, {records_committed} committed
* **Bytes:** {bytes_emitted} emitted, {bytes_committed} committed
* **Duration:** {duration}
"""
def extract_data_from_payload(payload_data: WildValue) -> dict[str, str | int | bool]:
data: dict[str, str | int | bool] = {
"workspace_name": payload_data["workspace"]["name"].tame(check_string),
"connection_name": payload_data["connection"]["name"].tame(check_string),
"source_name": payload_data["source"]["name"].tame(check_string),
"destination_name": payload_data["destination"]["name"].tame(check_string),
"connection_url": payload_data["connection"]["url"].tame(check_string),
"source_url": payload_data["source"]["url"].tame(check_string),
"destination_url": payload_data["destination"]["url"].tame(check_string),
"successful_sync": payload_data["success"].tame(check_bool),
"duration_formatted": payload_data["durationFormatted"].tame(check_string),
"records_emitted": payload_data["recordsEmitted"].tame(check_int),
"records_committed": payload_data["recordsCommitted"].tame(check_int),
"bytes_emitted_formatted": payload_data["bytesEmittedFormatted"].tame(check_string),
"bytes_committed_formatted": payload_data["bytesCommittedFormatted"].tame(check_string),
}
if not data["successful_sync"]:
data["error_message"] = payload_data["errorMessage"].tame(check_string)
return data
def format_message_from_data(data: dict[str, str | int | bool]) -> str:
content = AIRBYTE_MESSAGE_TEMPLATE.format(
sync_status_emoji=":green_circle:" if data["successful_sync"] else ":red_circle:",
status="succeeded" if data["successful_sync"] else "failed",
connection_name=data["connection_name"],
connection_url=data["connection_url"],
source_name=data["source_name"],
source_url=data["source_url"],
destination_name=data["destination_name"],
destination_url=data["destination_url"],
duration=data["duration_formatted"],
records_emitted=data["records_emitted"],
records_committed=data["records_committed"],
bytes_emitted=data["bytes_emitted_formatted"],
bytes_committed=data["bytes_committed_formatted"],
)
if not data["successful_sync"]:
error_message = data["error_message"]
content += f"\n**Error message:** {error_message}"
return content
def create_topic_from_data(data: dict[str, str | int | bool]) -> str:
return AIRBYTE_TOPIC_TEMPLATE.format(
workspace=data["workspace_name"],
connection=data["connection_name"],
)
@webhook_view("Airbyte")
@typed_endpoint
def api_airbyte_webhook(
request: HttpRequest,
user_profile: UserProfile,
*,
payload: JsonBodyPayload[WildValue],
) -> HttpResponse:
if "data" in payload:
data = extract_data_from_payload(payload["data"])
content = format_message_from_data(data)
topic = create_topic_from_data(data)
else:
# Test Airbyte notification payloads only contain this field.
content = payload["text"].tame(check_string)
topic = "Airbyte notification"
check_send_webhook_message(request, user_profile, topic, content)
return json_success(request)