commit bc71f913f828a94ce4e4f205eb8efe4952333013 Author: Andrey Sokolovskiy Date: Thu Jun 13 10:17:32 2024 +0700 test new telegram connector diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..d310632 --- /dev/null +++ b/__init__.py @@ -0,0 +1,472 @@ +"""A connector for Telegram.""" +import json +import logging +import secrets + +import aiohttp +import emoji +from voluptuous import Required + +from opsdroid.connector import Connector, register_event +from opsdroid.events import ( + EditedMessage, + File, + Image, + JoinGroup, + LeaveGroup, + Message, + PinMessage, + Reply, +) + +from . import events as telegram_events + +_LOGGER = logging.getLogger(__name__) + +CONFIG_SCHEMA = { + Required("token"): str, + "whitelisted-users": list, + "bot-name": str, + "reply-unauthorized": bool, + "parse_mode": str, +} + + +class ConnectorTelegram(Connector): + """A connector for the chat service Telegram.""" + + def __init__(self, config, opsdroid=None): + """Create the connector. + + Args: + config (dict): configuration settings from the + file config.yaml. + opsdroid (OpsDroid): An instance of opsdroid.core. + + """ + _LOGGER.debug(_("Loaded Telegram Connector")) + super().__init__(config, opsdroid=opsdroid) + self.name = config.get("name", "telegram") + self.bot_name = config.get("bot-name", "opsdroid") + self.opsdroid = opsdroid + self.whitelisted_users = config.get("whitelisted-users", None) + self.webhook_secret = secrets.token_urlsafe(32) + self.webhook_endpoint = f"/connector/{self.name}/{self.webhook_secret}" + self.token = config["token"] + self.parse_mode = config.get("parse_mode","MarkdownV2") + try: + self.base_url = opsdroid.config["web"]["base-url"] + except KeyError: + self.base_url = None + _LOGGER.warning( + _( + "Breaking changes introduced in 0.20.0 - you must expose opsdroid to the web and add 'base-url' to the 'web' section of your configuration. Read more on the docs: https://docs.opsdroid.dev/en/stable/connectors/telegram.html" + ) + ) + + @staticmethod + def get_user(response, bot_name): + """Get user from response. + + The API response is different depending on how + the bot is set up and where the message is coming + from. + + Since Telegram sends different payloads, depending of where the message is + being sent from, this method tries to handle all the cases. + + If the message came from a channel, we use either the ``author_signature`` + or the bot name for the user and use the ``message_id`` for the ``user_id``, + this is because channel posts don't contain users. + + Similarly, if a message was posted on a channel, Telegram will forward it to + a group - if it was created from the channel. So we handle the case where there + is a ``forward_signature`` in the payload otherwise we use the bot name. + + Args: + response (dict): Response returned by aiohttp.ClientSession. + bot_name (str): Name of the bot used in opsdroid configuration. + + Return: + string, string: Extracted username and user id + + """ + user = None + user_id = None + + channel_post = response.get("channel_post") + message = response.get("message") + + if channel_post: + user = channel_post.get("author_signature", bot_name) + user_id = channel_post.get("message_id", 0) + + if message: + from_user = message.get("from") + user_id = from_user.get("id") + + if message.get("forward_from_chat"): + user = message.get("forward_signature", bot_name) + + return user, user_id + + if from_user: + if "username" in from_user: + user = from_user.get("username") + + elif "first_name" in from_user: + user = from_user.get("first_name") + + return user, user_id + + def handle_user_permission(self, response, user, user_id): + """Handle user permissions. + + This will check if the user that tried to talk with + the bot is allowed to do so. It will also work with + userid to improve security. + + """ + if ( + not self.whitelisted_users + or user in self.whitelisted_users + or user_id in self.whitelisted_users + ): + return True + + return False + + def build_url(self, method): + """Build the url to connect to the API. + + Args: + method (string): API call end point. + + Return: + String that represents the full API url. + + """ + return "https://api.telegram.org/bot{}/{}".format(self.token, method) + + async def connect(self): + """Create route and subscribe to Telegram webhooks. + + The first thing we do on connect is to set up the route to + receive events from Telegram, we also pass some arguments to + the webhook to get events from messages, edited messages, channel + posts and update id which is basically the event id. + + One thing that is worth mentioning here, is that Telegram doesn't + implement a request authenticity policy, instead they suggest that we + use our token on the webhook route, but using the token on the url doesn't + seem like a good idea, we are instead generating a strong pseudo-random string + using the ``secrets`` library and add that string to our webhook route. + + """ + self.opsdroid.web_server.web_app.router.add_post( + self.webhook_endpoint, self.telegram_webhook_handler + ) + + async with aiohttp.ClientSession() as session: + if self.base_url: + payload = { + "url": f"{self.base_url}{self.webhook_endpoint}", + "allowed_updates": [ + "messages", + "edited_message", + "channel_post", + "edited_channel_post", + "update_id", + ], + } + + response = await session.post( + self.build_url("setWebhook"), params=payload + ) + + if response.status >= 400: + _LOGGER.error( + _("Error when connecting to Telegram Webhook: - %s - %s"), + response.status, + response.text, + ) + + async def telegram_webhook_handler(self, request): + """Handle event from Telegram webhooks. + + This method will try to handle three different kinds of events: + + - Edited messages + - Messages + - Channel posts + + Since the payload is pretty much the same both for channel posts and + messages we are using the same method to handle both of these events. + + We also check the permissions of the user that talked with the bot, if + the user has permissions then the event is parsed, if not we either send + a message saying that the user can't talk with the bot or just keep silent. + + Args: + request (aiohttp.web.Request): Request made to the post route created for webhook subscription. + + Return: + aiohttp.web.Response: Send a ``received`` message and a status 200 back to Telegram. + + """ + payload = await request.json() + user, user_id = self.get_user(payload, self.bot_name) + + if payload.get("edited_message"): + event = EditedMessage( + text=payload["edited_message"]["text"], + target=payload["edited_message"]["chat"]["id"], + user=user, + user_id=user_id, + connector=self, + ) + + if payload.get("message"): + event = await self.handle_messages( + payload["message"], user, user_id, payload["update_id"] + ) + + if payload.get("channel_post"): + event = await self.handle_messages( + payload["channel_post"], user, user_id, payload["update_id"] + ) + + if self.handle_user_permission(payload, user, user_id): + await self.opsdroid.parse(event) + else: + if self.config.get("reply-unauthorized"): + await self.send_message( + Message( + text=f"Sorry, {user} you're not allowed to speak with this bot.", + connector=self, + user=user, + user_id=user_id, + ) + ) + + return aiohttp.web.Response(text=json.dumps("Received"), status=200) + + async def handle_messages(self, message, user, user_id, update_id): + """Handle text messages received from Telegram. + + Here we create our opsdroid events depending of the type of message + that we get from Telegram. + + Unfortunately, telegram doesn't give much information when the message + is an image, video, sticker or documents. It only give us back the file id, + sizes, formats and that's it. Since we can't really use any of this information + to make opsdroid parse the message, we decided to just log a message in debug mode + with the payload and return None. + + Args: + message (dict): The payload received from Telegram + user (string): The name of the user that sent the message + user_id (int): The unique user id from the user that send the message + update_id (int): The unique id for the event sent by Telegram + + Return: + opsdroid.event or None: Will only return none if it's an event we can't parse. + + """ + event = None + + if message.get("new_chat_member"): + event = JoinGroup( + user=user, + user_id=user_id, + event_id=update_id, + target=message["chat"]["id"], + connector=self, + raw_event=message, + ) + + if message.get("left_chat_member"): + event = LeaveGroup( + user=user, + user_id=user_id, + event_id=update_id, + target=message["chat"]["id"], + connector=self, + raw_event=message, + ) + + if message.get("pinned_message"): + event = PinMessage( + user=user, + user_id=user_id, + event_id=update_id, + target=message["chat"]["id"], + connector=self, + raw_event=message, + ) + + if message.get("reply_to_message"): + event = Reply( + text=emoji.demojize(message.get("text", "")), + user=user, + user_id=user_id, + event_id=message["message_id"], + linked_event=message["reply_to_message"]["message_id"], + target=message["chat"]["id"], + connector=self, + raw_event=message, + ) + + if message.get("text"): + event = Message( + text=emoji.demojize(message.get("text", "")), + user=user, + user_id=user_id, + target=message["chat"]["id"], + connector=self, + ) + + if message.get("location"): + event = telegram_events.Location( + user=user, + user_id=user_id, + event_id=update_id, + target=message["chat"]["id"], + location=message["location"], + latitude=message["location"]["latitude"], + longitude=message["location"]["longitude"], + connector=self, + raw_event=message, + ) + + if message.get("poll"): + event = telegram_events.Poll( + user=user, + user_id=user_id, + event_id=update_id, + target=message["chat"]["id"], + poll=message["poll"], + question=message["poll"]["question"], + options=message["poll"]["options"], + total_votes=message["poll"]["total_voter_count"], + connector=self, + raw_event=message, + ) + + if message.get("contact"): + event = telegram_events.Contact( + user=user, + user_id=user_id, + event_id=update_id, + target=message["chat"]["id"], + contact=message["contact"], + phone_number=message["contact"]["phone_number"], + first_name=message["contact"]["first_name"], + connector=self, + raw_event=message, + ) + + if event: + return event + + _LOGGER.debug( + _("Received unparsable event from Telegram. Payload: %s"), message + ) + + async def listen(self): + """Listen method of the connector. + + Since we are using webhooks, we don't need to implement the listen + method. + + """ + + @register_event(Message) + async def send_message(self, message): + """Respond with a message. + + Args: + message (object): An instance of Message. + + """ + _LOGGER.debug( + _("Responding with: '%s' at target: '%s'"), message.text, message.target + ) + + data = dict() + data["chat_id"] = message.target + data["text"] = message.text + data["parse_mode"] = self.parse_mode + + async with aiohttp.ClientSession() as session: + resp = await session.post(self.build_url("sendMessage"), data=data) + if resp.status == 200: + _LOGGER.debug(_("Successfully responded.")) + else: + _LOGGER.error(_("Unable to respond.")) + + @register_event(Image) + async def send_image(self, file_event): + """Send Image to Telegram. + + Gets the chat id from the channel and then + sends the bytes of the image as multipart/form-data. + + """ + data = aiohttp.FormData() + data.add_field( + "chat_id", str(file_event.target["id"]), content_type="multipart/form-data" + ) + data.add_field( + "photo", + await file_event.get_file_bytes(), + content_type="multipart/form-data", + ) + + async with aiohttp.ClientSession() as session: + resp = await session.post(self.build_url("sendPhoto"), data=data) + if resp.status == 200: + _LOGGER.debug(_("Sent %s image successfully."), file_event.name) + else: + _LOGGER.debug(_("Unable to send image - Status Code %s."), resp.status) + + @register_event(File) + async def send_file(self, file_event): + """Send File to Telegram. + + Gets the chat id from the channel and then + sends the bytes of the file as multipart/form-data. + + """ + data = aiohttp.FormData() + data.add_field( + "chat_id", str(file_event.target["id"]), content_type="multipart/form-data" + ) + data.add_field( + "document", + await file_event.get_file_bytes(), + content_type="multipart/form-data", + ) + + async with aiohttp.ClientSession() as session: + resp = await session.post(self.build_url("sendDocument"), data=data) + if resp.status == 200: + _LOGGER.debug(_("Sent %s file successfully."), file_event.name) + else: + _LOGGER.debug(_("Unable to send file - Status Code %s."), resp.status) + + async def disconnect(self): + """Delete active webhook. + + If we terminate opsdroid, we should delete the active webhook, otherwise + Telegram will keep pinging out webhook for a few minutes before giving up. + + """ + _LOGGER.debug(_("Sending deleteWebhook request to Telegram...")) + async with aiohttp.ClientSession() as session: + resp = await session.get(self.build_url("deleteWebhook")) + + if resp.status == 200: + _LOGGER.debug(_("Telegram webhook deleted successfully.")) + else: + _LOGGER.debug(_("Unable to delete webhook...")) diff --git a/events.py b/events.py new file mode 100644 index 0000000..9c82016 --- /dev/null +++ b/events.py @@ -0,0 +1,68 @@ +"""Events for the Telegram Connector.""" +from opsdroid import events + + +class Poll(events.Event): + """Event class that triggers when a poll is sent.""" + + def __init__(self, poll, question, options, total_votes, *args, **kwargs): + """Contain some attributes that you can access. + + - ``poll`` - The extracted poll details from the payload + - ``question`` - The question asked in the poll + - ``options`` - An array containing all options in the poll + - ``total_votes`` - Sum of total votes that the poll received + + Telegram allows you to create polls or quizzes, this type of message also + contains a lot of different things that you can access with the ``poll`` + attribute, such as if the poll is closed, if it allows multiple answers, etc. + + """ + super().__init__(*args, **kwargs) + self.poll = poll + self.question = question + self.options = options + self.total_votes = total_votes + + +class Contact(events.Event): + """Event class that triggers when a contact is sent.""" + + def __init__(self, contact, phone_number, first_name, *args, **kwargs): + """Contain some attributes that you can access. + + - ``contact`` - The extracted contact details from the payload + - ``phone_numer`` - Extracted phone number from contact + - ``first_name`` - Extracted first name from contact + + Your contact event might contain other information such as the + contact last name or a ``vcard`` field, you can use the ``contact`` + attribute to access more information if available. + + """ + super().__init__(*args, **kwargs) + self.contact = contact + self.phone_number = phone_number + self.first_name = first_name + + +class Location(events.Event): + """Event class that triggers when a location message is sent.""" + + def __init__(self, location, latitude, longitude, *args, **kwargs): + """Contain some attributes that you can access. + + - ``location`` - The extracted location details from the payload + - ``latitude`` - Extracted latitude from the payload + - ``longitude`` - Extracted longitude from the payload + + Since Telegram doesn't add any information to the location other than + the latitude and longitude, you can probably just access these attributes, + we decided to include the location attribute in case Telegram adds more + useful things to his message type. + + """ + super().__init__(*args, **kwargs) + self.location = location + self.latitude = latitude + self.longitude = longitude diff --git a/tests/test_connector_telegram.py b/tests/test_connector_telegram.py new file mode 100644 index 0000000..1b51d6b --- /dev/null +++ b/tests/test_connector_telegram.py @@ -0,0 +1,1020 @@ +import logging +import asyncio +import pytest +import asynctest.mock as amock + + +from opsdroid.connector.telegram import ConnectorTelegram + +import opsdroid.connector.telegram.events as telegram_events +import opsdroid.events as opsdroid_events + + +connector_config = { + "token": "test:token", +} + + +def test_init_no_base_url(opsdroid, caplog): + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + caplog.set_level(logging.ERROR) + + assert connector.name == "telegram" + assert connector.token == "test:token" + assert connector.whitelisted_users is None + assert connector.webhook_secret is not None + assert connector.base_url is None + assert "Breaking changes introduced" in caplog.text + + +def test_init(opsdroid): + config = { + "token": "test:token", + "whitelisted-users": ["bob", 1234], + "bot-name": "bot McBotty", + } + + connector = ConnectorTelegram(config, opsdroid=opsdroid) + + opsdroid.config["web"] = {"base-url": "https://test.com"} + + assert connector.name == "telegram" + assert connector.token == "test:token" + assert connector.whitelisted_users == ["bob", 1234] + assert connector.bot_name == "bot McBotty" + assert connector.webhook_secret is not None + + +def test_get_user_from_channel_with_signature(opsdroid): + response = { + "update_id": 639974076, + "channel_post": { + "message_id": 15, + "author_signature": "Fabio Rosado", + "chat": {"id": -1001474700000, "title": "Opsdroid-test", "type": "channel"}, + "date": 1603827365, + "text": "hi", + }, + } + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + user, user_id = connector.get_user(response, "") + + assert user == "Fabio Rosado" + assert user_id == 15 + + +def test_get_user_from_channel_without_signature(opsdroid): + response = { + "update_id": 639974076, + "channel_post": { + "message_id": 16, + "chat": {"id": -1001474700000, "title": "Opsdroid-test", "type": "channel"}, + "date": 1603827365, + "text": "hi", + }, + } + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + user, user_id = connector.get_user(response, "Opsdroid!") + + assert user == "Opsdroid!" + assert user_id == 16 + + +def test_get_user_from_forwarded_message(opsdroid): + response = { + "update_id": 639974077, + "message": { + "message_id": 31, + "from": {"id": 100000, "is_bot": False, "first_name": "Telegram"}, + "chat": { + "id": -10014170000, + "title": "Opsdroid-test Chat", + "type": "supergroup", + }, + "date": 1603827368, + "forward_from_chat": { + "id": -10014740000, + "title": "Opsdroid-test", + "type": "channel", + }, + "forward_from_message_id": 15, + "forward_signature": "Fabio Rosado", + "forward_date": 1603827365, + "text": "hi", + }, + } + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + user, user_id = connector.get_user(response, "Opsdroid!") + + assert user == "Fabio Rosado" + assert user_id == 100000 + + +def test_get_user_from_first_name(opsdroid): + response = { + "update_id": 639974077, + "message": { + "message_id": 31, + "from": {"id": 100000, "is_bot": False, "first_name": "Fabio"}, + "chat": { + "id": -10014170000, + "title": "Opsdroid-test Chat", + "type": "supergroup", + }, + "date": 1603827368, + "text": "hi", + }, + } + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + user, user_id = connector.get_user(response, "") + + assert user == "Fabio" + assert user_id == 100000 + + +def test_get_user_from_username(opsdroid): + response = { + "update_id": 639974077, + "message": { + "message_id": 31, + "from": {"id": 100000, "is_bot": False, "username": "FabioRosado"}, + "chat": { + "id": -10014170000, + "title": "Opsdroid-test Chat", + "type": "supergroup", + }, + "date": 1603827368, + "text": "hi", + }, + } + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + user, user_id = connector.get_user(response, "") + + assert user == "FabioRosado" + assert user_id == 100000 + + +def test_handle_user_permission(opsdroid): + response = { + "update_id": 639974077, + "message": { + "message_id": 31, + "from": {"id": 100000, "is_bot": False, "username": "FabioRosado"}, + "chat": { + "id": -10014170000, + "title": "Opsdroid-test Chat", + "type": "supergroup", + }, + "date": 1603827368, + "text": "hi", + }, + } + + connector_config["whitelisted-users"] = ["FabioRosado"] + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + permission = connector.handle_user_permission(response, "FabioRosado", 100000) + + assert permission is True + + +def test_handle_user_id_permission(opsdroid): + response = { + "update_id": 639974077, + "message": { + "message_id": 31, + "from": {"id": 100000, "is_bot": False, "username": "FabioRosado"}, + "chat": { + "id": -10014170000, + "title": "Opsdroid-test Chat", + "type": "supergroup", + }, + "date": 1603827368, + "text": "hi", + }, + } + + connector_config["whitelisted-users"] = [100000] + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + permission = connector.handle_user_permission(response, "FabioRosado", 100000) + + assert permission is True + + +def test_handle_user_no_permission(opsdroid): + response = { + "update_id": 639974077, + "message": { + "message_id": 31, + "from": {"id": 100000, "is_bot": False, "username": "FabioRosado"}, + "chat": { + "id": -10014170000, + "title": "Opsdroid-test Chat", + "type": "supergroup", + }, + "date": 1603827368, + "text": "hi", + }, + } + + connector_config["whitelisted-users"] = [1, "AllowedUser"] + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + permission = connector.handle_user_permission(response, "FabioRosado", 100000) + + assert permission is False + + +def test_build_url(opsdroid): + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + url = connector.build_url("getUpdates") + + assert url == "https://api.telegram.org/bottest:token/getUpdates" + + +@pytest.mark.anyio +async def test_connect(opsdroid): + + opsdroid.config["web"] = {"base-url": "https://test.com"} + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + connector.webhook_secret = "test_secret" + + opsdroid.web_server = amock.Mock() + response = amock.Mock() + response.status = 200 + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as patched_request, amock.patch.object( + connector, "build_url" + ) as mocked_build_url: + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(response) + + await connector.connect() + + assert opsdroid.web_server.web_app.router.add_post.called + assert patched_request is not None + assert mocked_build_url.called + + +@pytest.mark.anyio +async def test_connect_failure(opsdroid, caplog): + caplog.set_level(logging.ERROR) + + opsdroid.config["web"] = {"base-url": "https://test.com"} + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + connector.webhook_secret = "test_secret" + + opsdroid.web_server = amock.Mock() + response = amock.Mock() + + response.status = 404 + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as patched_request, amock.patch.object( + connector, "build_url" + ) as mocked_build_url: + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(response) + + await connector.connect() + + assert opsdroid.web_server.web_app.router.add_post.called + assert patched_request is not None + assert mocked_build_url.called + assert "Error when connecting to Telegram" in caplog.text + + +@pytest.mark.anyio +async def test_respond(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + response = amock.Mock() + response.status = 200 + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as patched_request, amock.patch.object( + connector, "build_url" + ) as mocked_build_url: + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(response) + + assert opsdroid.__class__.instances + + test_message = opsdroid_events.Message( + text="This is a test", + user="opsdroid", + target={"id": 12404}, + connector=connector, + ) + + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(response) + + await test_message.respond("Response") + + assert patched_request.called + assert mocked_build_url.called + assert "Responding" in caplog.text + assert "Successfully responded" in caplog.text + + +@pytest.mark.anyio +async def test_respond_failure(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + response = amock.Mock() + response.status = 500 + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as patched_request, amock.patch.object( + connector, "build_url" + ) as mocked_build_url: + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(response) + + assert opsdroid.__class__.instances + + test_message = opsdroid_events.Message( + text="This is a test", + user="opsdroid", + target={"id": 12404}, + connector=connector, + ) + + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(response) + + await test_message.respond("Response") + + assert patched_request.called + assert mocked_build_url.called + assert "Responding" in caplog.text + assert "Unable to respond" in caplog.text + + +@pytest.mark.anyio +async def test_respond_image(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + post_response = amock.Mock() + post_response.status = 200 + + gif_bytes = ( + b"GIF89a\x01\x00\x01\x00\x00\xff\x00," + b"\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x00;" + ) + + image = opsdroid_events.Image(file_bytes=gif_bytes, target={"id": "123"}) + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as patched_request, amock.patch.object( + connector, "build_url" + ) as mocked_build_url: + + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(post_response) + + await connector.send_image(image) + + assert mocked_build_url.called + assert patched_request.called + assert "Sent" in caplog.text + + +@pytest.mark.anyio +async def test_respond_image_failure(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + post_response = amock.Mock() + post_response.status = 400 + + gif_bytes = ( + b"GIF89a\x01\x00\x01\x00\x00\xff\x00," + b"\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x00;" + ) + + image = opsdroid_events.Image(file_bytes=gif_bytes, target={"id": "123"}) + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as patched_request, amock.patch.object( + connector, "build_url" + ) as mocked_build_url: + + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(post_response) + + await connector.send_image(image) + + assert mocked_build_url.called + assert patched_request.called + assert "Unable to send image" in caplog.text + + +@pytest.mark.anyio +async def test_respond_file(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + post_response = amock.Mock() + post_response.status = 200 + + file_bytes = b"plain text file example" + + file = opsdroid_events.File(file_bytes=file_bytes, target={"id": "123"}) + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as patched_request, amock.patch.object( + connector, "build_url" + ) as mocked_build_url: + + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(post_response) + + await connector.send_file(file) + + assert mocked_build_url.called + assert patched_request.called + assert "Sent" in caplog.text + + +@pytest.mark.anyio +async def test_respond_file_failure(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + post_response = amock.Mock() + post_response.status = 400 + + file_bytes = b"plain text file example" + + file = opsdroid_events.File(file_bytes=file_bytes, target={"id": "123"}) + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as patched_request, amock.patch.object( + connector, "build_url" + ) as mocked_build_url: + + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(post_response) + + await connector.send_file(file) + + assert mocked_build_url.called + assert patched_request.called + assert "Unable to send file" in caplog.text + + +@pytest.mark.anyio +async def test_disconnect_successful(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + response = amock.Mock() + response.status = 200 + + with amock.patch( + "aiohttp.ClientSession.get", new=amock.CoroutineMock() + ) as patched_request, amock.patch.object( + connector, "build_url" + ) as mocked_build_url: + + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(response) + + await connector.disconnect() + + assert mocked_build_url.called + assert patched_request.called + assert "Sending deleteWebhook" in caplog.text + assert "Telegram webhook deleted" in caplog.text + + +@pytest.mark.anyio +async def test_disconnect_failure(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + response = amock.Mock() + response.status = 400 + + with amock.patch( + "aiohttp.ClientSession.get", new=amock.CoroutineMock() + ) as patched_request, amock.patch.object( + connector, "build_url" + ) as mocked_build_url: + + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(response) + + await connector.disconnect() + + assert mocked_build_url.called + assert patched_request.called + assert "Sending deleteWebhook" in caplog.text + assert "Unable to delete webhook" in caplog.text + + +@pytest.mark.anyio +async def test_edited_message_event(opsdroid): + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "update_id": 639974040, + "edited_message": { + "message_id": 1247, + "from": { + "id": 6399348, + "is_bot": False, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "language_code": "en", + }, + "chat": { + "id": 6399348, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "type": "private", + }, + "date": 1603818326, + "edit_date": 1603818330, + "text": "hi", + }, + } + + edited_message = opsdroid_events.EditedMessage("hi", 6399348, "Fabio", 6399348) + + await connector.telegram_webhook_handler(mock_request) + + assert "hi" in edited_message.text + assert "Fabio" in edited_message.user + assert edited_message.target == 6399348 + assert edited_message.user_id == 6399348 + + +@pytest.mark.anyio +async def test_join_group_event(opsdroid): + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "update_id": 639974040, + "message": { + "message_id": 1247, + "from": { + "id": 6399348, + "is_bot": False, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "language_code": "en", + }, + "chat": { + "id": 6399348, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "type": "private", + }, + "date": 1603818326, + "edit_date": 1603818330, + "new_chat_member": True, + }, + } + + join_message = opsdroid_events.JoinGroup(6399348, "Fabio", 6399348) + + await connector.telegram_webhook_handler(mock_request) + + assert "Fabio" in join_message.user + assert join_message.target == 6399348 + assert join_message.user_id == 6399348 + + +@pytest.mark.anyio +async def test_leave_group_event(opsdroid): + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "update_id": 639974040, + "message": { + "message_id": 1247, + "from": { + "id": 6399348, + "is_bot": False, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "language_code": "en", + }, + "chat": { + "id": 6399348, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "type": "private", + }, + "date": 1603818326, + "edit_date": 1603818330, + "left_chat_member": True, + }, + } + + left_message = opsdroid_events.LeaveGroup(6399348, "Fabio", 6399348) + + await connector.telegram_webhook_handler(mock_request) + + assert "Fabio" in left_message.user + assert left_message.target == 6399348 + assert left_message.user_id == 6399348 + + +@pytest.mark.anyio +async def test_pinned_message_event(opsdroid): + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "update_id": 639974040, + "message": { + "message_id": 1247, + "from": { + "id": 6399348, + "is_bot": False, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "language_code": "en", + }, + "chat": { + "id": 6399348, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "type": "private", + }, + "date": 1603818326, + "edit_date": 1603818330, + "pinned_message": True, + }, + } + + pinned_message = opsdroid_events.PinMessage(6399348, "Fabio", 6399348) + + await connector.telegram_webhook_handler(mock_request) + + assert "Fabio" in pinned_message.user + assert pinned_message.target == 6399348 + assert pinned_message.user_id == 6399348 + + +@pytest.mark.anyio +async def test_reply_to_message_event(opsdroid): + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "update_id": 639974084, + "message": { + "message_id": 1272, + "from": { + "id": 639348, + "is_bot": False, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "language_code": "en", + }, + "chat": { + "id": 639348, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "type": "private", + }, + "date": 1603834922, + "reply_to_message": { + "message_id": 1271, + "from": { + "id": 639348, + "is_bot": False, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "language_code": "en", + }, + "chat": { + "id": 63948, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "type": "private", + }, + "date": 1603834912, + "text": "Hi", + }, + "text": "This is a reply", + }, + } + + reply_message = opsdroid_events.Reply( + "This is a reply", 639348, "FabioRosado", 63948 + ) + + await connector.telegram_webhook_handler(mock_request) + + assert "This is a reply" in reply_message.text + assert "FabioRosado" in reply_message.user + assert reply_message.target == 63948 + + +@pytest.mark.anyio +async def test_location_event(opsdroid): + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "update_id": 639974101, + "message": { + "message_id": 42, + "from": { + "id": 1087968824, + "is_bot": True, + "first_name": "Group", + "username": "GroupAnonymousBot", + }, + "chat": { + "id": -1001417735217, + "title": "Opsdroid-test Chat", + "type": "supergroup", + }, + "date": 1603992829, + "location": {"latitude": 56.159849, "longitude": -5.230604}, + }, + } + + event_location = telegram_events.Location( + {"location": {"latitude": 56.159849, "longitude": -5.230604}}, + 56.159849, + -5.230604, + ) + + await connector.telegram_webhook_handler(mock_request) + + assert event_location.latitude == 56.159849 + assert event_location.longitude == -5.230604 + + +@pytest.mark.anyio +async def test_poll_event(opsdroid): + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "update_id": 639974103, + "message": { + "message_id": 44, + "from": { + "id": 1087968824, + "is_bot": True, + "first_name": "Group", + "username": "GroupAnonymousBot", + }, + "chat": { + "id": -1001417735217, + "title": "Opsdroid-test Chat", + "type": "supergroup", + }, + "date": 1603993170, + "poll": { + "id": "5825895662671101957", + "question": "Test", + "options": [ + {"text": "Test", "voter_count": 0}, + {"text": "Testing", "voter_count": 0}, + ], + "total_voter_count": 0, + "is_closed": False, + "is_anonymous": True, + "type": "regular", + "allows_multiple_answers": False, + }, + }, + } + + poll_event = telegram_events.Poll( + { + "question": "question", + "option": ["option1", "option2"], + "total_voter_count": 1, + }, + "question", + ["option1", "option2"], + 1, + ) + + await connector.telegram_webhook_handler(mock_request) + + assert poll_event.question == "question" + assert poll_event.options == ["option1", "option2"] + assert poll_event.total_votes == 1 + + +@pytest.mark.anyio +async def test_contact_event(opsdroid): + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "update_id": 1, + "message": { + "chat": {"id": 321}, + "from": {"id": 123}, + "contact": {"phone_number": 123456, "first_name": "opsdroid"}, + }, + } + + contact_event = telegram_events.Contact( + {"phone_number": 123456, "first_name": "opsdroid"}, 123456, "opsdroid" + ) + + await connector.telegram_webhook_handler(mock_request) + + assert contact_event.first_name == "opsdroid" + assert contact_event.phone_number == 123456 + + +@pytest.mark.anyio +async def test_unparseable_event(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + + opsdroid.config["web"] = {"base-url": "https://test.com"} + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + message = { + "update_id": 1, + "message": { + "message_id": 1279, + "from": { + "id": 639889348, + "is_bot": False, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "language_code": "en", + }, + "chat": { + "id": 639889348, + "first_name": "Fabio", + "last_name": "Rosado", + "username": "FabioRosado", + "type": "private", + }, + "date": 1604013500, + "sticker": { + "width": 512, + "height": 512, + "emoji": "👌", + "set_name": "HotCherry", + "is_animated": True, + "file_size": 42311, + }, + }, + } + event = await connector.handle_messages(message, "opsdroid", 0, 1) + + assert "Received unparsable event" in caplog.text + assert event is None + + +@pytest.mark.anyio +async def test_channel_post(opsdroid): + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "update_id": 639974037, + "channel_post": { + "message_id": 4, + "chat": {"id": -1001474709998, "title": "Opsdroid-test", "type": "channel"}, + "date": 1603817533, + "text": "dance", + }, + } + + message = opsdroid_events.Message("dance", 4, opsdroid) + await connector.telegram_webhook_handler(mock_request) + + assert message.text == "dance" + + +@pytest.mark.anyio +async def test_parse_user_no_permissions(opsdroid): + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "update_id": 639974077, + "message": { + "message_id": 31, + "from": {"id": 100000, "is_bot": False, "username": "FabioRosado"}, + "chat": { + "id": -10014170000, + "title": "Opsdroid-test Chat", + "type": "supergroup", + }, + "date": 1603827368, + "text": "hi", + }, + } + + connector_config["whitelisted-users"] = [1, "AllowedUser"] + connector_config["reply-unauthorized"] = True + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + with amock.patch.object(connector, "send_message") as mocked_send_message: + + await connector.telegram_webhook_handler(mock_request) + + assert mocked_send_message.called + + +@pytest.mark.anyio +async def test_parse_user_permissions(opsdroid): + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "update_id": 639974077, + "message": { + "message_id": 31, + "from": {"id": 100000, "is_bot": False, "username": "FabioRosado"}, + "chat": { + "id": -10014170000, + "title": "Opsdroid-test Chat", + "type": "supergroup", + }, + "date": 1603827368, + "text": "hi", + }, + } + + connector_config["whitelisted-users"] = ["FabioRosado", 100000] + + connector = ConnectorTelegram(connector_config, opsdroid=opsdroid) + + with amock.patch.object(connector.opsdroid, "parse") as mocked_parse: + + await connector.telegram_webhook_handler(mock_request) + + assert mocked_parse.called