connector-telegram/__init__.py

473 lines
16 KiB
Python

"""A connector for Telegram."""
import json
import logging
import secrets
import aiohttp
import emoji
from voluptuous import Required
from opsdroid.connector import Connector, register_event
from opsdroid.events import (
EditedMessage,
File,
Image,
JoinGroup,
LeaveGroup,
Message,
PinMessage,
Reply,
)
from . import events as telegram_events
_LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = {
Required("token"): str,
"whitelisted-users": list,
"bot-name": str,
"reply-unauthorized": bool,
"parse_mode": str,
}
class ConnectorTelegram(Connector):
"""A connector for the chat service Telegram."""
def __init__(self, config, opsdroid=None):
"""Create the connector.
Args:
config (dict): configuration settings from the
file config.yaml.
opsdroid (OpsDroid): An instance of opsdroid.core.
"""
_LOGGER.debug(_("Loaded Telegram Connector"))
super().__init__(config, opsdroid=opsdroid)
self.name = config.get("name", "telegram")
self.bot_name = config.get("bot-name", "opsdroid")
self.opsdroid = opsdroid
self.whitelisted_users = config.get("whitelisted-users", None)
self.webhook_secret = secrets.token_urlsafe(32)
self.webhook_endpoint = f"/connector/{self.name}/{self.webhook_secret}"
self.token = config["token"]
self.parse_mode = config.get("parse_mode","MarkdownV2")
try:
self.base_url = opsdroid.config["web"]["base-url"]
except KeyError:
self.base_url = None
_LOGGER.warning(
_(
"Breaking changes introduced in 0.20.0 - you must expose opsdroid to the web and add 'base-url' to the 'web' section of your configuration. Read more on the docs: https://docs.opsdroid.dev/en/stable/connectors/telegram.html"
)
)
@staticmethod
def get_user(response, bot_name):
"""Get user from response.
The API response is different depending on how
the bot is set up and where the message is coming
from.
Since Telegram sends different payloads, depending of where the message is
being sent from, this method tries to handle all the cases.
If the message came from a channel, we use either the ``author_signature``
or the bot name for the user and use the ``message_id`` for the ``user_id``,
this is because channel posts don't contain users.
Similarly, if a message was posted on a channel, Telegram will forward it to
a group - if it was created from the channel. So we handle the case where there
is a ``forward_signature`` in the payload otherwise we use the bot name.
Args:
response (dict): Response returned by aiohttp.ClientSession.
bot_name (str): Name of the bot used in opsdroid configuration.
Return:
string, string: Extracted username and user id
"""
user = None
user_id = None
channel_post = response.get("channel_post")
message = response.get("message")
if channel_post:
user = channel_post.get("author_signature", bot_name)
user_id = channel_post.get("message_id", 0)
if message:
from_user = message.get("from")
user_id = from_user.get("id")
if message.get("forward_from_chat"):
user = message.get("forward_signature", bot_name)
return user, user_id
if from_user:
if "username" in from_user:
user = from_user.get("username")
elif "first_name" in from_user:
user = from_user.get("first_name")
return user, user_id
def handle_user_permission(self, response, user, user_id):
"""Handle user permissions.
This will check if the user that tried to talk with
the bot is allowed to do so. It will also work with
userid to improve security.
"""
if (
not self.whitelisted_users
or user in self.whitelisted_users
or user_id in self.whitelisted_users
):
return True
return False
def build_url(self, method):
"""Build the url to connect to the API.
Args:
method (string): API call end point.
Return:
String that represents the full API url.
"""
return "https://api.telegram.org/bot{}/{}".format(self.token, method)
async def connect(self):
"""Create route and subscribe to Telegram webhooks.
The first thing we do on connect is to set up the route to
receive events from Telegram, we also pass some arguments to
the webhook to get events from messages, edited messages, channel
posts and update id which is basically the event id.
One thing that is worth mentioning here, is that Telegram doesn't
implement a request authenticity policy, instead they suggest that we
use our token on the webhook route, but using the token on the url doesn't
seem like a good idea, we are instead generating a strong pseudo-random string
using the ``secrets`` library and add that string to our webhook route.
"""
self.opsdroid.web_server.web_app.router.add_post(
self.webhook_endpoint, self.telegram_webhook_handler
)
async with aiohttp.ClientSession() as session:
if self.base_url:
payload = {
"url": f"{self.base_url}{self.webhook_endpoint}",
"allowed_updates": [
"messages",
"edited_message",
"channel_post",
"edited_channel_post",
"update_id",
],
}
response = await session.post(
self.build_url("setWebhook"), params=payload
)
if response.status >= 400:
_LOGGER.error(
_("Error when connecting to Telegram Webhook: - %s - %s"),
response.status,
response.text,
)
async def telegram_webhook_handler(self, request):
"""Handle event from Telegram webhooks.
This method will try to handle three different kinds of events:
- Edited messages
- Messages
- Channel posts
Since the payload is pretty much the same both for channel posts and
messages we are using the same method to handle both of these events.
We also check the permissions of the user that talked with the bot, if
the user has permissions then the event is parsed, if not we either send
a message saying that the user can't talk with the bot or just keep silent.
Args:
request (aiohttp.web.Request): Request made to the post route created for webhook subscription.
Return:
aiohttp.web.Response: Send a ``received`` message and a status 200 back to Telegram.
"""
payload = await request.json()
user, user_id = self.get_user(payload, self.bot_name)
if payload.get("edited_message"):
event = EditedMessage(
text=payload["edited_message"]["text"],
target=payload["edited_message"]["chat"]["id"],
user=user,
user_id=user_id,
connector=self,
)
if payload.get("message"):
event = await self.handle_messages(
payload["message"], user, user_id, payload["update_id"]
)
if payload.get("channel_post"):
event = await self.handle_messages(
payload["channel_post"], user, user_id, payload["update_id"]
)
if self.handle_user_permission(payload, user, user_id):
await self.opsdroid.parse(event)
else:
if self.config.get("reply-unauthorized"):
await self.send_message(
Message(
text=f"Sorry, {user} you're not allowed to speak with this bot.",
connector=self,
user=user,
user_id=user_id,
)
)
return aiohttp.web.Response(text=json.dumps("Received"), status=200)
async def handle_messages(self, message, user, user_id, update_id):
"""Handle text messages received from Telegram.
Here we create our opsdroid events depending of the type of message
that we get from Telegram.
Unfortunately, telegram doesn't give much information when the message
is an image, video, sticker or documents. It only give us back the file id,
sizes, formats and that's it. Since we can't really use any of this information
to make opsdroid parse the message, we decided to just log a message in debug mode
with the payload and return None.
Args:
message (dict): The payload received from Telegram
user (string): The name of the user that sent the message
user_id (int): The unique user id from the user that send the message
update_id (int): The unique id for the event sent by Telegram
Return:
opsdroid.event or None: Will only return none if it's an event we can't parse.
"""
event = None
if message.get("new_chat_member"):
event = JoinGroup(
user=user,
user_id=user_id,
event_id=update_id,
target=message["chat"]["id"],
connector=self,
raw_event=message,
)
if message.get("left_chat_member"):
event = LeaveGroup(
user=user,
user_id=user_id,
event_id=update_id,
target=message["chat"]["id"],
connector=self,
raw_event=message,
)
if message.get("pinned_message"):
event = PinMessage(
user=user,
user_id=user_id,
event_id=update_id,
target=message["chat"]["id"],
connector=self,
raw_event=message,
)
if message.get("reply_to_message"):
event = Reply(
text=emoji.demojize(message.get("text", "")),
user=user,
user_id=user_id,
event_id=message["message_id"],
linked_event=message["reply_to_message"]["message_id"],
target=message["chat"]["id"],
connector=self,
raw_event=message,
)
if message.get("text"):
event = Message(
text=emoji.demojize(message.get("text", "")),
user=user,
user_id=user_id,
target=message["chat"]["id"],
connector=self,
)
if message.get("location"):
event = telegram_events.Location(
user=user,
user_id=user_id,
event_id=update_id,
target=message["chat"]["id"],
location=message["location"],
latitude=message["location"]["latitude"],
longitude=message["location"]["longitude"],
connector=self,
raw_event=message,
)
if message.get("poll"):
event = telegram_events.Poll(
user=user,
user_id=user_id,
event_id=update_id,
target=message["chat"]["id"],
poll=message["poll"],
question=message["poll"]["question"],
options=message["poll"]["options"],
total_votes=message["poll"]["total_voter_count"],
connector=self,
raw_event=message,
)
if message.get("contact"):
event = telegram_events.Contact(
user=user,
user_id=user_id,
event_id=update_id,
target=message["chat"]["id"],
contact=message["contact"],
phone_number=message["contact"]["phone_number"],
first_name=message["contact"]["first_name"],
connector=self,
raw_event=message,
)
if event:
return event
_LOGGER.debug(
_("Received unparsable event from Telegram. Payload: %s"), message
)
async def listen(self):
"""Listen method of the connector.
Since we are using webhooks, we don't need to implement the listen
method.
"""
@register_event(Message)
async def send_message(self, message):
"""Respond with a message.
Args:
message (object): An instance of Message.
"""
_LOGGER.debug(
_("Responding with: '%s' at target: '%s'"), message.text, message.target
)
data = dict()
data["chat_id"] = message.target
data["text"] = message.text
data["parse_mode"] = self.parse_mode
async with aiohttp.ClientSession() as session:
resp = await session.post(self.build_url("sendMessage"), data=data)
if resp.status == 200:
_LOGGER.debug(_("Successfully responded."))
else:
_LOGGER.error(_("Unable to respond."))
@register_event(Image)
async def send_image(self, file_event):
"""Send Image to Telegram.
Gets the chat id from the channel and then
sends the bytes of the image as multipart/form-data.
"""
data = aiohttp.FormData()
data.add_field(
"chat_id", str(file_event.target["id"]), content_type="multipart/form-data"
)
data.add_field(
"photo",
await file_event.get_file_bytes(),
content_type="multipart/form-data",
)
async with aiohttp.ClientSession() as session:
resp = await session.post(self.build_url("sendPhoto"), data=data)
if resp.status == 200:
_LOGGER.debug(_("Sent %s image successfully."), file_event.name)
else:
_LOGGER.debug(_("Unable to send image - Status Code %s."), resp.status)
@register_event(File)
async def send_file(self, file_event):
"""Send File to Telegram.
Gets the chat id from the channel and then
sends the bytes of the file as multipart/form-data.
"""
data = aiohttp.FormData()
data.add_field(
"chat_id", str(file_event.target["id"]), content_type="multipart/form-data"
)
data.add_field(
"document",
await file_event.get_file_bytes(),
content_type="multipart/form-data",
)
async with aiohttp.ClientSession() as session:
resp = await session.post(self.build_url("sendDocument"), data=data)
if resp.status == 200:
_LOGGER.debug(_("Sent %s file successfully."), file_event.name)
else:
_LOGGER.debug(_("Unable to send file - Status Code %s."), resp.status)
async def disconnect(self):
"""Delete active webhook.
If we terminate opsdroid, we should delete the active webhook, otherwise
Telegram will keep pinging out webhook for a few minutes before giving up.
"""
_LOGGER.debug(_("Sending deleteWebhook request to Telegram..."))
async with aiohttp.ClientSession() as session:
resp = await session.get(self.build_url("deleteWebhook"))
if resp.status == 200:
_LOGGER.debug(_("Telegram webhook deleted successfully."))
else:
_LOGGER.debug(_("Unable to delete webhook..."))