Files
app/tests/test_email_handler.py
2026-03-13 10:55:28 +01:00

538 lines
18 KiB
Python

import random
from email.message import EmailMessage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from typing import List
import arrow
import pytest
from aiosmtpd.smtp import Envelope
import email_handler
from app import config
from app.config import EMAIL_DOMAIN, ALERT_DMARC_FAILED_REPLY_PHASE
from app.db import Session
from app.email import headers, status
from app.email_utils import generate_verp_email
from app.mail_sender import mail_sender
from app.models import (
Alias,
IgnoredEmail,
EmailLog,
Notification,
VerpType,
Contact,
SentAlert,
)
from app.utils import random_string, canonicalize_email
from email_handler import (
should_ignore,
is_automatic_out_of_office,
)
from tests.utils import load_eml_file, create_new_user, random_email
def test_should_ignore(flask_client):
IgnoredEmail.filter_by(rcpt_to="rcpt_to").delete()
assert should_ignore("mail_from", []) is False
assert not should_ignore("mail_from", ["rcpt_to"])
IgnoredEmail.create(mail_from="mail_from", rcpt_to="rcpt_to", commit=True)
assert should_ignore("mail_from", ["rcpt_to"])
def test_is_automatic_out_of_office():
msg = EmailMessage()
assert not is_automatic_out_of_office(msg)
msg[headers.AUTO_SUBMITTED] = "auto-replied"
assert is_automatic_out_of_office(msg)
del msg[headers.AUTO_SUBMITTED]
assert not is_automatic_out_of_office(msg)
msg[headers.AUTO_SUBMITTED] = "auto-generated"
assert is_automatic_out_of_office(msg)
def test_dmarc_forward_quarantine(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
msg = load_eml_file("dmarc_quarantine.eml", {"alias_email": alias.email})
envelope = Envelope()
envelope.mail_from = msg["from"]
envelope.rcpt_tos = [msg["to"]]
result = email_handler.handle(envelope, msg)
assert result == status.E215
email_logs = (
EmailLog.filter_by(user_id=user.id, alias_id=alias.id)
.order_by(EmailLog.id.desc())
.all()
)
assert len(email_logs) == 1
email_log = email_logs[0]
assert email_log.blocked
assert email_log.refused_email_id
notifications = Notification.filter_by(user_id=user.id).all()
assert len(notifications) == 1
assert f"{alias.email} has a new mail in quarantine" == notifications[0].title
def test_gmail_dmarc_softfail(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
msg = load_eml_file("dmarc_gmail_softfail.eml", {"alias_email": alias.email})
envelope = Envelope()
envelope.mail_from = msg["from"]
envelope.rcpt_tos = [msg["to"]]
result = email_handler.handle(envelope, msg)
assert result == status.E200
# Enable when we can verify that the actual message sent has this content
# payload = msg.get_payload()
# assert payload.find("failed anti-phishing checks") > -1
def test_prevent_5xx_from_spf(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
msg = load_eml_file(
"5xx_overwrite_spf.eml",
{"alias_email": alias.email, "spf_result": "R_SPF_FAIL"},
)
envelope = Envelope()
envelope.mail_from = msg["from"]
# Ensure invalid email log
envelope.rcpt_tos = [generate_verp_email(VerpType.bounce_forward, 99999999999999)]
result = email_handler.MailHandler()._handle(envelope, msg)
assert status.E216 == result
def test_preserve_5xx_with_valid_spf(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
msg = load_eml_file(
"5xx_overwrite_spf.eml",
{"alias_email": alias.email, "spf_result": "R_SPF_ALLOW"},
)
envelope = Envelope()
envelope.mail_from = msg["from"]
# Ensure invalid email log
envelope.rcpt_tos = [generate_verp_email(VerpType.bounce_forward, 99999999999999)]
result = email_handler.MailHandler()._handle(envelope, msg)
assert status.E512 == result
def test_preserve_5xx_with_no_header(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
msg = load_eml_file(
"no_spamd_header.eml",
{"alias_email": alias.email},
)
envelope = Envelope()
envelope.mail_from = msg["from"]
# Ensure invalid email log
envelope.rcpt_tos = [generate_verp_email(VerpType.bounce_forward, 99999999999999)]
result = email_handler.MailHandler()._handle(envelope, msg)
assert status.E512 == result
def generate_dmarc_result() -> List:
return ["DMARC_POLICY_QUARANTINE", "DMARC_POLICY_REJECT", "DMARC_POLICY_SOFTFAIL"]
@pytest.mark.parametrize("dmarc_result", generate_dmarc_result())
def test_dmarc_reply_quarantine(flask_client, dmarc_result):
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
user_id=alias.user_id,
alias_id=alias.id,
website_email="random-{}@nowhere.net".format(int(random.random())),
name="Name {}".format(int(random.random())),
reply_email="random-{}@{}".format(random.random(), EMAIL_DOMAIN),
)
Session.commit()
msg = load_eml_file(
"dmarc_reply_check.eml",
{
"alias_email": alias.email,
"contact_email": contact.reply_email,
"dmarc_result": dmarc_result,
},
)
envelope = Envelope()
envelope.mail_from = msg["from"]
envelope.rcpt_tos = [msg["to"]]
result = email_handler.handle(envelope, msg)
assert result == status.E215
alerts = SentAlert.filter_by(
user_id=user.id, alert_type=ALERT_DMARC_FAILED_REPLY_PHASE
).all()
assert len(alerts) == 1
def test_add_alias_to_header_if_needed():
msg = EmailMessage()
user = create_new_user()
alias = Alias.filter_by(user_id=user.id).first()
assert msg[headers.TO] is None
email_handler.add_alias_to_header_if_needed(msg, alias)
assert msg[headers.TO] == alias.email
def test_append_alias_to_header_if_needed_existing_to():
msg = EmailMessage()
original_to = "noone@nowhere.no"
msg[headers.TO] = original_to
user = create_new_user()
alias = Alias.filter_by(user_id=user.id).first()
email_handler.add_alias_to_header_if_needed(msg, alias)
assert msg[headers.TO] == f"{original_to}, {alias.email}"
def test_avoid_add_to_header_already_present():
msg = EmailMessage()
user = create_new_user()
alias = Alias.filter_by(user_id=user.id).first()
msg[headers.TO] = alias.email
email_handler.add_alias_to_header_if_needed(msg, alias)
assert msg[headers.TO] == alias.email
def test_avoid_add_to_header_already_present_in_cc():
msg = EmailMessage()
create_new_user()
alias = Alias.first()
msg[headers.CC] = alias.email
email_handler.add_alias_to_header_if_needed(msg, alias)
assert msg[headers.TO] is None
assert msg[headers.CC] == alias.email
def test_email_sent_to_noreply(flask_client):
msg = EmailMessage()
envelope = Envelope()
envelope.mail_from = "from@domain.lan"
envelope.rcpt_tos = [config.NOREPLY]
result = email_handler.handle(envelope, msg)
assert result == status.E200
def test_email_sent_to_noreplies(flask_client):
msg = EmailMessage()
envelope = Envelope()
envelope.mail_from = "from@domain.lan"
config.NOREPLIES = ["other-no-reply@sl.lan"]
envelope.rcpt_tos = ["other-no-reply@sl.lan"]
result = email_handler.handle(envelope, msg)
assert result == status.E200
# NOREPLY isn't used anymore
envelope.rcpt_tos = [config.NOREPLY]
result = email_handler.handle(envelope, msg)
assert result == status.E515
def test_references_header(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
msg = load_eml_file("reference_encoded.eml", {"alias_email": alias.email})
envelope = Envelope()
envelope.mail_from = "somewhere@rainbow.com"
envelope.rcpt_tos = [alias.email]
result = email_handler.handle(envelope, msg)
assert result == status.E200
@mail_sender.store_emails_test_decorator
def test_replace_contacts_and_user_in_reply_phase(flask_client):
user = create_new_user()
user.replace_reverse_alias = True
alias = Alias.create_new_random(user)
Session.flush()
contact = Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email=random_email(),
reply_email=f"{random.random()}@{EMAIL_DOMAIN}",
commit=True,
)
contact_real_mail = contact.website_email
contact2 = Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email=random_email(),
reply_email=f"{random.random()}@{EMAIL_DOMAIN}",
commit=True,
)
contact2_real_mail = contact2.website_email
msg = load_eml_file(
"replacement_on_reply_phase.eml",
{
"contact_reply_email": contact.reply_email,
"other_contact_reply_email": contact2.reply_email,
},
)
envelope = Envelope()
envelope.mail_from = alias.mailbox.email
envelope.rcpt_tos = [contact.reply_email]
result = email_handler.handle(envelope, msg)
assert result == status.E200
sent_mails = mail_sender.get_stored_emails()
assert len(sent_mails) == 1
payload = sent_mails[0].msg.get_payload()[0].get_payload()
assert payload.find("Contact is {}".format(contact_real_mail)) > -1
assert payload.find("Other contact is {}".format(contact2_real_mail)) > -1
@mail_sender.store_emails_test_decorator
def test_send_email_from_non_canonical_address_on_reply(flask_client):
email_address = f"{random_string(10)}.suf@gmail.com"
user = create_new_user(email=canonicalize_email(email_address))
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email=random_email(),
reply_email=f"{random_string(10)}@{EMAIL_DOMAIN}",
commit=True,
)
envelope = Envelope()
envelope.mail_from = email_address
envelope.rcpt_tos = [contact.reply_email]
msg = EmailMessage()
msg[headers.TO] = contact.reply_email
msg[headers.SUBJECT] = random_string()
result = email_handler.handle(envelope, msg)
assert result == status.E200
sent_mails = mail_sender.get_stored_emails()
assert len(sent_mails) == 1
email_logs = EmailLog.filter_by(user_id=user.id).all()
assert len(email_logs) == 1
assert email_logs[0].alias_id == alias.id
assert email_logs[0].mailbox_id == user.default_mailbox_id
@mail_sender.store_emails_test_decorator
def test_send_email_from_non_canonical_matches_already_existing_user(flask_client):
email_address = f"{random_string(10)}.suf@gmail.com"
create_new_user(email=canonicalize_email(email_address))
user = create_new_user(email=email_address)
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email=random_email(),
reply_email=f"{random_string(10)}@{EMAIL_DOMAIN}",
commit=True,
)
envelope = Envelope()
envelope.mail_from = email_address
envelope.rcpt_tos = [contact.reply_email]
msg = EmailMessage()
msg[headers.TO] = contact.reply_email
msg[headers.SUBJECT] = random_string()
result = email_handler.handle(envelope, msg)
assert result == status.E200
sent_mails = mail_sender.get_stored_emails()
assert len(sent_mails) == 1
email_logs = EmailLog.filter_by(user_id=user.id).all()
assert len(email_logs) == 1
assert email_logs[0].alias_id == alias.id
assert email_logs[0].mailbox_id == user.default_mailbox_id
@mail_sender.store_emails_test_decorator
def test_break_loop_alias_as_mailbox(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
user.default_mailbox.email = alias.email
Session.commit()
envelope = Envelope()
envelope.mail_from = random_email()
envelope.rcpt_tos = [alias.email]
msg = EmailMessage()
msg[headers.TO] = alias.email
msg[headers.SUBJECT] = random_string()
result = email_handler.handle(envelope, msg)
assert result == status.E525
@mail_sender.store_emails_test_decorator
def test_preserve_headers(flask_client):
headers_to_keep = [
headers.SUBJECT,
headers.DATE,
headers.MESSAGE_ID,
headers.REFERENCES,
headers.IN_REPLY_TO,
headers.SL_QUEUE_ID,
] + headers.MIME_HEADERS
user = create_new_user()
alias = Alias.create_new_random(user)
envelope = Envelope()
envelope.mail_from = "somewhere@lo.cal"
envelope.rcpt_tos = [alias.email]
msg = EmailMessage()
for header in headers_to_keep:
msg[header] = header + "keep"
result = email_handler.handle(envelope, msg)
assert result == status.E200
sent_mails = mail_sender.get_stored_emails()
assert len(sent_mails) == 1
msg = sent_mails[0].msg
for header in headers_to_keep:
assert msg[header] == header + "keep"
def test_not_send_to_pending_to_delete_users(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
user.delete_on = arrow.utcnow()
envelope = Envelope()
envelope.mail_from = "somewhere@lo.cal"
envelope.rcpt_tos = [alias.email]
msg = EmailMessage()
result = email_handler.handle(envelope, msg)
assert result == status.E504
@mail_sender.store_emails_test_decorator
def test_forward_preserves_pgp_key_attachment(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()
msg = MIMEMultipart("mixed")
msg[headers.FROM] = "sender@external.com"
msg[headers.TO] = alias.email
msg[headers.SUBJECT] = "Signed message with public key"
msg.attach(MIMEText("Hello, here is my signed message.", "plain"))
pgp_key = MIMEBase("application", "pgp-keys")
pgp_key.set_payload(
b"-----BEGIN PGP PUBLIC KEY BLOCK-----\nfake\n-----END PGP PUBLIC KEY BLOCK-----"
)
pgp_key.add_header(
"Content-Disposition",
"attachment",
filename="publickey - sender@external.com.asc",
)
msg.attach(pgp_key)
envelope = Envelope()
envelope.mail_from = "sender@external.com"
envelope.rcpt_tos = [alias.email]
result = email_handler.handle(envelope, msg)
assert result == status.E200
sent_mails = mail_sender.get_stored_emails()
assert len(sent_mails) == 1
forwarded_msg = sent_mails[0].msg
# The PGP public key attachment should still be present in the forwarded email
attachments = [
part
for part in forwarded_msg.walk()
if part.get_content_type() == "application/pgp-keys"
]
assert len(attachments) == 1
assert attachments[0].get_filename() == "publickey - sender@external.com.asc"
def _create_reply_with_pgp_key_attachment(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()
contact = Contact.create(
user_id=user.id,
alias_id=alias.id,
website_email=random_email(),
reply_email=f"{random_string(10)}@{EMAIL_DOMAIN}",
commit=True,
)
msg = MIMEMultipart("mixed")
msg[headers.TO] = contact.reply_email
msg[headers.SUBJECT] = "Re: signed reply"
msg.attach(MIMEText("Here is my reply.", "plain"))
pgp_key = MIMEBase("application", "pgp-keys")
pgp_key.set_payload(
b"-----BEGIN PGP PUBLIC KEY BLOCK-----\nfake\n-----END PGP PUBLIC KEY BLOCK-----"
)
pgp_key.add_header(
"Content-Disposition",
"attachment",
filename=f"publickey - {user.email}.asc",
)
msg.attach(pgp_key)
envelope = Envelope()
envelope.mail_from = user.email
envelope.rcpt_tos = [contact.reply_email]
return user, msg, envelope
@mail_sender.store_emails_test_decorator
def test_reply_strips_pgp_key_when_config_enabled(flask_client):
# Verify that when DROP_PGP_KEY_ATTACHMENTS_ON_REPLY is enabled,
# PGP public key attachments should be removed from replies
original = config.DROP_PGP_KEY_ATTACHMENTS_ON_REPLY
config.DROP_PGP_KEY_ATTACHMENTS_ON_REPLY = True
try:
_, msg, envelope = _create_reply_with_pgp_key_attachment(flask_client)
result = email_handler.handle(envelope, msg)
assert result == status.E200
sent_mails = mail_sender.get_stored_emails()
assert len(sent_mails) == 1
reply_msg = sent_mails[0].msg
pgp_attachments = [
part
for part in reply_msg.walk()
if part.get_content_type() == "application/pgp-keys"
]
assert len(pgp_attachments) == 0
finally:
config.DROP_PGP_KEY_ATTACHMENTS_ON_REPLY = original
@mail_sender.store_emails_test_decorator
def test_reply_preserves_pgp_key_when_config_disabled(flask_client):
# Verify that when DROP_PGP_KEY_ATTACHMENTS_ON_REPLY is disabled (default),
# PGP public key attachments should be preserved in replies
original = config.DROP_PGP_KEY_ATTACHMENTS_ON_REPLY
config.DROP_PGP_KEY_ATTACHMENTS_ON_REPLY = False
try:
user, msg, envelope = _create_reply_with_pgp_key_attachment(flask_client)
result = email_handler.handle(envelope, msg)
assert result == status.E200
sent_mails = mail_sender.get_stored_emails()
assert len(sent_mails) == 1
reply_msg = sent_mails[0].msg
pgp_attachments = [
part
for part in reply_msg.walk()
if part.get_content_type() == "application/pgp-keys"
]
assert len(pgp_attachments) == 1
assert pgp_attachments[0].get_filename() == f"publickey - {user.email}.asc"
finally:
config.DROP_PGP_KEY_ATTACHMENTS_ON_REPLY = original