"""A connector for Telegram.""" import json import logging import secrets import aiohttp import emoji from voluptuous import Required from opsdroid.connector import Connector, register_event from opsdroid.events import ( EditedMessage, File, Image, JoinGroup, LeaveGroup, Message, PinMessage, Reply, ) from . import events as telegram_events _LOGGER = logging.getLogger(__name__) CONFIG_SCHEMA = { Required("token"): str, "whitelisted-users": list, "bot-name": str, "reply-unauthorized": bool, "parse_mode": str, } class ConnectorTelegram(Connector): """A connector for the chat service Telegram.""" def __init__(self, config, opsdroid=None): """Create the connector. Args: config (dict): configuration settings from the file config.yaml. opsdroid (OpsDroid): An instance of opsdroid.core. """ _LOGGER.debug(_("Loaded Telegram Connector")) super().__init__(config, opsdroid=opsdroid) self.name = config.get("name", "telegram") self.bot_name = config.get("bot-name", "opsdroid") self.opsdroid = opsdroid self.whitelisted_users = config.get("whitelisted-users", None) self.webhook_secret = secrets.token_urlsafe(32) self.webhook_endpoint = f"/connector/{self.name}/{self.webhook_secret}" self.token = config["token"] self.parse_mode = config.get("parse_mode","MarkdownV2") try: self.base_url = opsdroid.config["web"]["base-url"] except KeyError: self.base_url = None _LOGGER.warning( _( "Breaking changes introduced in 0.20.0 - you must expose opsdroid to the web and add 'base-url' to the 'web' section of your configuration. Read more on the docs: https://docs.opsdroid.dev/en/stable/connectors/telegram.html" ) ) @staticmethod def get_user(response, bot_name): """Get user from response. The API response is different depending on how the bot is set up and where the message is coming from. Since Telegram sends different payloads, depending of where the message is being sent from, this method tries to handle all the cases. If the message came from a channel, we use either the ``author_signature`` or the bot name for the user and use the ``message_id`` for the ``user_id``, this is because channel posts don't contain users. Similarly, if a message was posted on a channel, Telegram will forward it to a group - if it was created from the channel. So we handle the case where there is a ``forward_signature`` in the payload otherwise we use the bot name. Args: response (dict): Response returned by aiohttp.ClientSession. bot_name (str): Name of the bot used in opsdroid configuration. Return: string, string: Extracted username and user id """ user = None user_id = None channel_post = response.get("channel_post") message = response.get("message") if channel_post: user = channel_post.get("author_signature", bot_name) user_id = channel_post.get("message_id", 0) if message: from_user = message.get("from") user_id = from_user.get("id") if message.get("forward_from_chat"): user = message.get("forward_signature", bot_name) return user, user_id if from_user: if "username" in from_user: user = from_user.get("username") elif "first_name" in from_user: user = from_user.get("first_name") return user, user_id def handle_user_permission(self, response, user, user_id): """Handle user permissions. This will check if the user that tried to talk with the bot is allowed to do so. It will also work with userid to improve security. """ if ( not self.whitelisted_users or user in self.whitelisted_users or user_id in self.whitelisted_users ): return True return False def build_url(self, method): """Build the url to connect to the API. Args: method (string): API call end point. Return: String that represents the full API url. """ return "https://api.telegram.org/bot{}/{}".format(self.token, method) async def connect(self): """Create route and subscribe to Telegram webhooks. The first thing we do on connect is to set up the route to receive events from Telegram, we also pass some arguments to the webhook to get events from messages, edited messages, channel posts and update id which is basically the event id. One thing that is worth mentioning here, is that Telegram doesn't implement a request authenticity policy, instead they suggest that we use our token on the webhook route, but using the token on the url doesn't seem like a good idea, we are instead generating a strong pseudo-random string using the ``secrets`` library and add that string to our webhook route. """ self.opsdroid.web_server.web_app.router.add_post( self.webhook_endpoint, self.telegram_webhook_handler ) async with aiohttp.ClientSession() as session: if self.base_url: payload = { "url": f"{self.base_url}{self.webhook_endpoint}", "allowed_updates": [ "messages", "edited_message", "channel_post", "edited_channel_post", "update_id", ], } response = await session.post( self.build_url("setWebhook"), params=payload ) if response.status >= 400: _LOGGER.error( _("Error when connecting to Telegram Webhook: - %s - %s"), response.status, response.text, ) async def telegram_webhook_handler(self, request): """Handle event from Telegram webhooks. This method will try to handle three different kinds of events: - Edited messages - Messages - Channel posts Since the payload is pretty much the same both for channel posts and messages we are using the same method to handle both of these events. We also check the permissions of the user that talked with the bot, if the user has permissions then the event is parsed, if not we either send a message saying that the user can't talk with the bot or just keep silent. Args: request (aiohttp.web.Request): Request made to the post route created for webhook subscription. Return: aiohttp.web.Response: Send a ``received`` message and a status 200 back to Telegram. """ payload = await request.json() user, user_id = self.get_user(payload, self.bot_name) if payload.get("edited_message"): event = EditedMessage( text=payload["edited_message"]["text"], target=payload["edited_message"]["chat"]["id"], user=user, user_id=user_id, connector=self, ) if payload.get("message"): event = await self.handle_messages( payload["message"], user, user_id, payload["update_id"] ) if payload.get("channel_post"): event = await self.handle_messages( payload["channel_post"], user, user_id, payload["update_id"] ) if self.handle_user_permission(payload, user, user_id): await self.opsdroid.parse(event) else: if self.config.get("reply-unauthorized"): await self.send_message( Message( text=f"Sorry, {user} you're not allowed to speak with this bot.", connector=self, user=user, user_id=user_id, ) ) return aiohttp.web.Response(text=json.dumps("Received"), status=200) async def handle_messages(self, message, user, user_id, update_id): """Handle text messages received from Telegram. Here we create our opsdroid events depending of the type of message that we get from Telegram. Unfortunately, telegram doesn't give much information when the message is an image, video, sticker or documents. It only give us back the file id, sizes, formats and that's it. Since we can't really use any of this information to make opsdroid parse the message, we decided to just log a message in debug mode with the payload and return None. Args: message (dict): The payload received from Telegram user (string): The name of the user that sent the message user_id (int): The unique user id from the user that send the message update_id (int): The unique id for the event sent by Telegram Return: opsdroid.event or None: Will only return none if it's an event we can't parse. """ event = None if message.get("new_chat_member"): event = JoinGroup( user=user, user_id=user_id, event_id=update_id, target=message["chat"]["id"], connector=self, raw_event=message, ) if message.get("left_chat_member"): event = LeaveGroup( user=user, user_id=user_id, event_id=update_id, target=message["chat"]["id"], connector=self, raw_event=message, ) if message.get("pinned_message"): event = PinMessage( user=user, user_id=user_id, event_id=update_id, target=message["chat"]["id"], connector=self, raw_event=message, ) if message.get("reply_to_message"): event = Reply( text=emoji.demojize(message.get("text", "")), user=user, user_id=user_id, event_id=message["message_id"], linked_event=message["reply_to_message"]["message_id"], target=message["chat"]["id"], connector=self, raw_event=message, ) if message.get("text"): event = Message( text=emoji.demojize(message.get("text", "")), user=user, user_id=user_id, target=message["chat"]["id"], connector=self, ) if message.get("location"): event = telegram_events.Location( user=user, user_id=user_id, event_id=update_id, target=message["chat"]["id"], location=message["location"], latitude=message["location"]["latitude"], longitude=message["location"]["longitude"], connector=self, raw_event=message, ) if message.get("poll"): event = telegram_events.Poll( user=user, user_id=user_id, event_id=update_id, target=message["chat"]["id"], poll=message["poll"], question=message["poll"]["question"], options=message["poll"]["options"], total_votes=message["poll"]["total_voter_count"], connector=self, raw_event=message, ) if message.get("contact"): event = telegram_events.Contact( user=user, user_id=user_id, event_id=update_id, target=message["chat"]["id"], contact=message["contact"], phone_number=message["contact"]["phone_number"], first_name=message["contact"]["first_name"], connector=self, raw_event=message, ) if event: return event _LOGGER.debug( _("Received unparsable event from Telegram. Payload: %s"), message ) async def listen(self): """Listen method of the connector. Since we are using webhooks, we don't need to implement the listen method. """ @register_event(Message) async def send_message(self, message): """Respond with a message. Args: message (object): An instance of Message. """ _LOGGER.debug( _("Responding with: '%s' at target: '%s'"), message.text, message.target ) data = dict() data["chat_id"] = message.target data["text"] = message.text data["parse_mode"] = self.parse_mode async with aiohttp.ClientSession() as session: resp = await session.post(self.build_url("sendMessage"), data=data) if resp.status == 200: _LOGGER.debug(_("Successfully responded.")) else: _LOGGER.error(_("Unable to respond.")) @register_event(Image) async def send_image(self, file_event): """Send Image to Telegram. Gets the chat id from the channel and then sends the bytes of the image as multipart/form-data. """ data = aiohttp.FormData() data.add_field( "chat_id", str(file_event.target["id"]), content_type="multipart/form-data" ) data.add_field( "photo", await file_event.get_file_bytes(), content_type="multipart/form-data", ) async with aiohttp.ClientSession() as session: resp = await session.post(self.build_url("sendPhoto"), data=data) if resp.status == 200: _LOGGER.debug(_("Sent %s image successfully."), file_event.name) else: _LOGGER.debug(_("Unable to send image - Status Code %s."), resp.status) @register_event(File) async def send_file(self, file_event): """Send File to Telegram. Gets the chat id from the channel and then sends the bytes of the file as multipart/form-data. """ data = aiohttp.FormData() data.add_field( "chat_id", str(file_event.target["id"]), content_type="multipart/form-data" ) data.add_field( "document", await file_event.get_file_bytes(), content_type="multipart/form-data", ) async with aiohttp.ClientSession() as session: resp = await session.post(self.build_url("sendDocument"), data=data) if resp.status == 200: _LOGGER.debug(_("Sent %s file successfully."), file_event.name) else: _LOGGER.debug(_("Unable to send file - Status Code %s."), resp.status) async def disconnect(self): """Delete active webhook. If we terminate opsdroid, we should delete the active webhook, otherwise Telegram will keep pinging out webhook for a few minutes before giving up. """ _LOGGER.debug(_("Sending deleteWebhook request to Telegram...")) async with aiohttp.ClientSession() as session: resp = await session.get(self.build_url("deleteWebhook")) if resp.status == 200: _LOGGER.debug(_("Telegram webhook deleted successfully.")) else: _LOGGER.debug(_("Unable to delete webhook..."))