Upate adminpanel to be able to search via regex

This commit is contained in:
Adrià Casajús
2026-01-15 16:42:51 +01:00
committed by Adrià Casajús
parent 8d524e5ccd
commit d937cc34e2
26 changed files with 2231 additions and 1643 deletions
+72
View File
@@ -0,0 +1,72 @@
"""Admin models for Flask-Admin views."""
from app.admin.index import init_admin
from app.admin.base import SLModelView, SLAdminIndexView
from app.admin.user import UserAdmin
from app.admin.email_log import EmailLogAdmin
from app.admin.alias import AliasAdmin
from app.admin.mailbox import MailboxAdmin
from app.admin.coupon import CouponAdmin
from app.admin.manual_subscription import ManualSubscriptionAdmin
from app.admin.custom_domain import CustomDomainAdmin
from app.admin.referral import ReferralAdmin
from app.admin.admin_audit_log import AdminAuditLogAdmin
from app.admin.provider_complaint import ProviderComplaintAdmin
from app.admin.newsletter import NewsletterAdmin, NewsletterUserAdmin
from app.admin.metrics import DailyMetricAdmin, MetricAdmin
from app.admin.invalid_mailbox_domain import InvalidMailboxDomainAdmin
from app.admin.forbidden_mx_ip import ForbiddenMxIpAdmin
from app.admin.email_search import (
EmailSearchResult,
EmailSearchHelpers,
EmailSearchAdmin,
)
from app.admin.custom_domain_search import (
CustomDomainWithValidationData,
CustomDomainSearchResult,
CustomDomainSearchAdmin,
)
from app.admin.abuser_lookup import AbuserLookupResult, AbuserLookupAdmin
from app.admin.mailbox_search import MailboxSearchResult, MailboxSearchAdmin
from app.admin.email_domain_search import (
EmailDomainSearchResult,
EmailDomainSearchAdmin,
)
__all__ = [
# Initialization
"init_admin",
# Base classes
"SLModelView",
"SLAdminIndexView",
# Model views
"UserAdmin",
"EmailLogAdmin",
"AliasAdmin",
"MailboxAdmin",
"CouponAdmin",
"ManualSubscriptionAdmin",
"CustomDomainAdmin",
"ReferralAdmin",
"AdminAuditLogAdmin",
"ProviderComplaintAdmin",
"NewsletterAdmin",
"NewsletterUserAdmin",
"DailyMetricAdmin",
"MetricAdmin",
"InvalidMailboxDomainAdmin",
"ForbiddenMxIpAdmin",
# Search views
"EmailSearchResult",
"EmailSearchHelpers",
"EmailSearchAdmin",
"CustomDomainWithValidationData",
"CustomDomainSearchResult",
"CustomDomainSearchAdmin",
"AbuserLookupResult",
"AbuserLookupAdmin",
"MailboxSearchResult",
"MailboxSearchAdmin",
"EmailDomainSearchResult",
"EmailDomainSearchAdmin",
]
+123
View File
@@ -0,0 +1,123 @@
from __future__ import annotations
import json
from datetime import datetime
from typing import Optional, List, Dict
from flask import redirect, url_for, request, flash
from flask_admin import BaseView, expose
from flask_login import current_user
from app.abuser_audit_log_utils import AbuserAuditLog
from app.abuser_utils import get_abuser_bundles_for_address
from app.models import User
from app.utils import sanitize_email
class AbuserLookupResult:
def __init__(self):
self.no_match: bool = False
self.query: Optional[str | int] = None
self.bundles: Optional[List[Dict]] = None
self.audit_log: Optional[List[Dict]] = None
@staticmethod
def from_email_or_user_id(query: str) -> AbuserLookupResult:
out = AbuserLookupResult()
email: str
audit_log: List[AbuserAuditLog] = []
if query is None or query == "":
out.no_match = True
return out
if query.isnumeric():
user_id = int(query)
user = User.get(user_id)
if not user:
out.no_match = True
return out
email = user.email
audit_log = AbuserAuditLog.filter(AbuserAuditLog.user_id == user.id).all()
else:
email = sanitize_email(query)
user = User.get_by(email=email)
if user:
audit_log = AbuserAuditLog.filter(
AbuserAuditLog.user_id == user.id
).all()
out.query = query
bundles = get_abuser_bundles_for_address(
target_address=email,
admin_id=current_user.id,
)
if not bundles:
out.no_match = True
return out
for bundle in bundles:
bundle_json = json.dumps(bundle)
bundle["json"] = bundle_json
user = User.get(int(bundle.get("account_id")))
bundle["user"] = user
AbuserLookupResult.convert_dt(bundle, "user_created_at")
for mailbox_item in bundle.get("mailboxes", []):
AbuserLookupResult.convert_dt(mailbox_item)
for alias_item in bundle.get("aliases", []):
AbuserLookupResult.convert_dt(alias_item)
out.bundles = bundles
out.audit_log = [
{
"admin_id": alog.admin_id,
"action": alog.action,
"message": alog.message,
"created_at": alog.created_at,
}
for alog in audit_log
]
return out
@staticmethod
def convert_dt(item: Dict, key: str = "created_at"):
raw_date = item.get(key, "")
if raw_date:
item[key] = datetime.fromisoformat(raw_date)
class AbuserLookupAdmin(BaseView):
def is_accessible(self):
return current_user.is_authenticated and current_user.is_admin
def inaccessible_callback(self, name, **kwargs):
flash("You don't have access to the admin page", "error")
return redirect(url_for("dashboard.index", next=request.url))
@expose("/", methods=["GET", "POST"])
def index(self):
query: Optional[str] = request.args.get("search")
if query is None:
result = AbuserLookupResult()
else:
result = AbuserLookupResult.from_email_or_user_id(query)
return self.render(
"admin/abuser_lookup.html",
data=result,
query=query,
)
+20
View File
@@ -0,0 +1,20 @@
from flask_admin.form import SecureForm
from app.admin.base import SLModelView, _admin_action_formatter, _admin_date_formatter
class AdminAuditLogAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["admin.id", "admin.email", "model_id", "created_at"]
column_filters = ["admin.id", "admin.email", "model_id", "created_at"]
column_exclude_list = ["id"]
column_hide_backrefs = False
can_edit = False
can_create = False
can_delete = False
column_formatters = {
"action": _admin_action_formatter,
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
+38
View File
@@ -0,0 +1,38 @@
from flask import flash
from flask_admin.actions import action
from flask_admin.form import SecureForm
from app.admin.base import SLModelView, _admin_date_formatter
from app.db import Session
from app.models import Alias
class AliasAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["id", "user.email", "email", "mailbox.email"]
column_filters = ["id", "user.email", "email", "mailbox.email"]
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
@action(
"disable_email_spoofing_check",
"Disable email spoofing protection",
"Disable email spoofing protection?",
)
def disable_email_spoofing_check_for(self, ids):
for alias in Alias.filter(Alias.id.in_(ids)):
if alias.disable_email_spoofing_check:
flash(
f"Email spoofing protection is already disabled on {alias.email}",
"warning",
)
else:
alias.disable_email_spoofing_check = True
flash(
f"Email spoofing protection is disabled on {alias.email}", "success"
)
Session.commit()
+89
View File
@@ -0,0 +1,89 @@
from __future__ import annotations
import sqlalchemy
from flask import redirect, url_for, request, flash
from flask_admin import expose, AdminIndexView
from flask_admin.contrib import sqla
from flask_login import current_user
from markupsafe import Markup
from app import models
from app.models import AdminAuditLog, AuditLogActionEnum
def _admin_action_formatter(view, context, model, name):
action_name = AuditLogActionEnum.get_name(model.action)
return "{} ({})".format(action_name, model.action)
def _admin_date_formatter(view, context, model, name):
return model.created_at.format()
def _user_upgrade_channel_formatter(view, context, model, name):
return Markup(model.upgrade_channel)
class SLModelView(sqla.ModelView):
column_default_sort = ("id", True)
column_display_pk = True
page_size = 100
can_edit = False
can_create = False
can_delete = False
edit_modal = True
def is_accessible(self):
return current_user.is_authenticated and current_user.is_admin
def inaccessible_callback(self, name, **kwargs):
# redirect to login page if user doesn't have access
flash("You don't have access to the admin page", "error")
return redirect(url_for("dashboard.index", next=request.url))
def on_model_change(self, form, model, is_created):
changes = {}
for attr in sqlalchemy.inspect(model).attrs:
if attr.history.has_changes() and attr.key not in (
"created_at",
"updated_at",
):
value = attr.value
# If it's a model reference, get the source id
if issubclass(type(value), models.Base):
value = value.id
# otherwise, if its a generic object stringify it
if issubclass(type(value), object):
value = str(value)
changes[attr.key] = value
auditAction = (
AuditLogActionEnum.create_object
if is_created
else AuditLogActionEnum.update_object
)
AdminAuditLog.create(
admin_user_id=current_user.id,
model=model.__class__.__name__,
model_id=model.id,
action=auditAction.value,
data=changes,
)
def on_model_delete(self, model):
AdminAuditLog.create(
admin_user_id=current_user.id,
model=model.__class__.__name__,
model_id=model.id,
action=AuditLogActionEnum.delete_object.value,
data={},
)
class SLAdminIndexView(AdminIndexView):
@expose("/")
def index(self):
if not current_user.is_authenticated or not current_user.is_admin:
return redirect(url_for("auth.login", next=request.url))
return redirect(url_for("admin.email_search.index"))
+14
View File
@@ -0,0 +1,14 @@
from flask_admin.form import SecureForm
from app.admin.base import SLModelView, _admin_date_formatter
class CouponAdmin(SLModelView):
form_base_class = SecureForm
can_edit = False
can_create = True
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
+15
View File
@@ -0,0 +1,15 @@
from flask_admin.form import SecureForm
from app.admin.base import SLModelView, _admin_date_formatter
class CustomDomainAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["domain", "user.email", "user.id"]
column_exclude_list = ["ownership_txt_token"]
can_edit = False
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
+154
View File
@@ -0,0 +1,154 @@
from __future__ import annotations
from typing import Optional
from flask import redirect, url_for, request, flash
from flask_admin import BaseView, expose
from flask_login import current_user
from app import config
from app.custom_domain_validation import (
CustomDomainValidation,
DomainValidationResult,
ExpectedValidationRecords,
)
from app.db import Session
from app.dns_utils import get_network_dns_client
from app.models import User, CustomDomain, AdminAuditLog, AuditLogActionEnum
class CustomDomainWithValidationData:
def __init__(self, domain: CustomDomain):
self.domain: CustomDomain = domain
self.ownership_expected: Optional[ExpectedValidationRecords] = None
self.ownership_validation: Optional[DomainValidationResult] = None
self.mx_expected: Optional[dict[int, ExpectedValidationRecords]] = None
self.mx_validation: Optional[DomainValidationResult] = None
self.spf_expected: Optional[ExpectedValidationRecords] = None
self.spf_validation: Optional[DomainValidationResult] = None
self.dkim_expected: {str: ExpectedValidationRecords} = {}
self.dkim_validation: {str: str} = {}
class CustomDomainSearchResult:
def __init__(self):
self.no_match: bool = False
self.user: Optional[User] = None
self.domains: list[CustomDomainWithValidationData] = []
@staticmethod
def from_user(user: Optional[User]) -> CustomDomainSearchResult:
out = CustomDomainSearchResult()
if user is None:
out.no_match = True
return out
out.user = user
dns_client = get_network_dns_client()
validator = CustomDomainValidation(
dkim_domain=config.EMAIL_DOMAIN,
partner_domains=config.PARTNER_DNS_CUSTOM_DOMAINS,
partner_domains_validation_prefixes=config.PARTNER_CUSTOM_DOMAIN_VALIDATION_PREFIXES,
dns_client=dns_client,
)
for custom_domain in user.custom_domains:
validation_data = CustomDomainWithValidationData(custom_domain)
if not custom_domain.ownership_verified:
validation_data.ownership_expected = (
validator.get_ownership_verification_record(custom_domain)
)
validation_data.ownership_validation = (
validator.validate_domain_ownership(custom_domain)
)
if not custom_domain.verified:
validation_data.mx_expected = validator.get_expected_mx_records(
custom_domain
)
validation_data.mx_validation = validator.validate_mx_records(
custom_domain
)
if not custom_domain.spf_verified:
validation_data.spf_expected = validator.get_expected_spf_record(
custom_domain
)
validation_data.spf_validation = validator.validate_spf_records(
custom_domain
)
if not custom_domain.dkim_verified:
validation_data.dkim_expected = validator.get_dkim_records(
custom_domain
)
validation_data.dkim_validation = validator.validate_dkim_records(
custom_domain
)
out.domains.append(validation_data)
return out
class CustomDomainSearchAdmin(BaseView):
def is_accessible(self):
return current_user.is_authenticated and current_user.is_admin
def inaccessible_callback(self, name, **kwargs):
# redirect to login page if user doesn't have access
flash("You don't have access to the admin page", "error")
return redirect(url_for("dashboard.index", next=request.url))
@expose("/", methods=["GET", "POST"])
def index(self):
query = request.args.get("user")
if query is None:
search = CustomDomainSearchResult()
else:
try:
user_id = int(query)
user = User.get_by(id=user_id)
except ValueError:
user = User.get_by(email=query)
if user is None:
cd = CustomDomain.get_by(domain=query)
if cd is not None:
user = cd.user
search = CustomDomainSearchResult.from_user(user)
return self.render(
"admin/custom_domain_search.html",
data=search,
query=query,
)
@expose("/delete_domain", methods=["POST"])
def delete_custom_domain(self):
domain_id = request.form.get("domain_id")
if not domain_id:
flash("Missing domain_id", "error")
return redirect(url_for("admin.custom_domain_search.index"))
try:
domain_id = int(domain_id)
except ValueError:
flash("Missing domain_id", "error")
return redirect(url_for("admin.custom_domain_search.index"))
domain: Optional[CustomDomain] = CustomDomain.get(domain_id)
if domain is None:
flash("Domain not found", "error")
return redirect(url_for("admin.custom_domain_search.index"))
domain_user_email = domain.user.email
domain_domain = domain.domain
from app.custom_domain_utils import delete_custom_domain
delete_custom_domain(domain)
AdminAuditLog.create(
admin_user_id=current_user.id,
model=CustomDomain.__class__.__name__,
model_id=domain_id,
action=AuditLogActionEnum.delete_custom_domain.value,
data={"domain": domain_domain},
)
Session.commit()
flash("Scheduled deletion of custom domain", "success")
return redirect(
url_for("admin.custom_domain_search.index", user=domain_user_email)
)
+88
View File
@@ -0,0 +1,88 @@
from __future__ import annotations
from typing import List
from flask import redirect, url_for, request, flash
from flask_admin import BaseView, expose
from flask_login import current_user
from sqlalchemy import or_
from app.abuser import mark_user_as_abuser
from app.models import User, Mailbox
from app.admin.email_search import EmailSearchHelpers
class EmailDomainSearchResult:
def __init__(self):
self.no_match: bool = False
self.users: List[User] = []
@staticmethod
def from_mailboxes(users: List[User]) -> EmailDomainSearchResult:
out = EmailDomainSearchResult()
if not users:
out.no_match = True
return out
out.users = users
return out
class EmailDomainSearchAdmin(BaseView):
def is_accessible(self):
return current_user.is_authenticated and current_user.is_admin
def inaccessible_callback(self, name, **kwargs):
# redirect to login page if user doesn't have access
flash("You don't have access to the admin page", "error")
return redirect(url_for("dashboard.index", next=request.url))
@expose("/disable_user", methods=["POST"])
def disable_user(self):
query = request.form.get("query")
user_id = request.form.get("user_id")
if not user_id:
return redirect(url_for("admin.email_domain_search.index", query=query))
user = User.get(int(user_id))
if not user:
flash(
f"Cannot find user with {user_id}",
"warning",
)
return redirect(url_for("admin.email_domain_search.index", query=query))
mark_user_as_abuser(
user,
f"Marked as abuser from the email domain search in the admin panel while searching for '{query}'",
)
flash(
f"Marked user {user.email} ({user.id}) as abuser",
"warning",
)
return redirect(url_for("admin.email_domain_search.index", query=query))
@expose("/", methods=["GET", "POST"])
def index(self):
query = request.args.get("query")
if not query:
search = EmailDomainSearchResult()
else:
users = (
User.query()
.join(Mailbox, User.id == Mailbox.user_id, isouter=True)
.filter(
User.disabled.isnot(True),
or_(
User.email.like(f"%@{query}"), Mailbox.email.like(f"%@{query}")
),
)
.order_by(User.id.asc())
.limit(50)
.all()
)
search = EmailDomainSearchResult.from_mailboxes(users)
return self.render(
"admin/mailbox_domain_search.html",
data=search,
query=query,
helper=EmailSearchHelpers,
)
+17
View File
@@ -0,0 +1,17 @@
from flask_admin.form import SecureForm
from app.admin.base import SLModelView, _admin_date_formatter
class EmailLogAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["id"]
column_filters = ["id", "user.email", "mailbox.email", "contact.website_email"]
can_edit = False
can_create = False
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
+460
View File
@@ -0,0 +1,460 @@
from __future__ import annotations
from typing import Optional, List
from flask import redirect, url_for, request, flash
from flask_admin import BaseView, expose
from flask_login import current_user
from app.db import Session
from app.errors import ProtonPartnerNotSetUp
from app.models import (
User,
AdminAuditLog,
Alias,
Mailbox,
DeletedAlias,
DomainDeletedAlias,
PartnerUser,
AliasMailbox,
AliasAuditLog,
UserAuditLog,
AuditLogActionEnum,
)
from app.proton.proton_partner import get_proton_partner
from app.proton.proton_unlink import perform_proton_account_unlink
class EmailSearchResult:
SEARCH_TYPE_ALIAS = "alias"
SEARCH_TYPE_EMAIL = "email"
def __init__(self):
self.no_match: bool = True
self.search_type: str = self.SEARCH_TYPE_EMAIL
self.query: str = ""
# Alias search results
self.aliases: List[Alias] = []
self.aliases_found_by_regex: bool = False
self.alias_audit_log: Optional[List[AliasAuditLog]] = None
self.deleted_aliases: List[DeletedAlias] = []
self.deleted_aliases_found_by_regex: bool = False
self.deleted_alias_audit_log: Optional[List[AliasAuditLog]] = None
self.domain_deleted_aliases: List[DomainDeletedAlias] = []
self.domain_deleted_aliases_found_by_regex: bool = False
self.domain_deleted_alias_audit_log: Optional[List[AliasAuditLog]] = None
# Email search results
self.mailboxes: List[Mailbox] = []
self.mailboxes_found_by_regex: bool = False
self.mailbox_count: int = 0
self.users: List[User] = []
self.users_found_by_regex: bool = False
self.user_audit_log: Optional[List[UserAuditLog]] = None
self.partner_users: List[PartnerUser] = []
self.partner_users_found_by_regex: bool = False
@staticmethod
def search_aliases(query: str) -> EmailSearchResult:
"""Search for aliases by exact match or POSIX regex."""
output = EmailSearchResult()
output.query = query
output.search_type = EmailSearchResult.SEARCH_TYPE_ALIAS
# Try exact match first
alias = Alias.get_by(email=query)
if alias:
output.aliases = [alias]
output.aliases_found_by_regex = False
output.alias_audit_log = (
AliasAuditLog.filter_by(alias_id=alias.id)
.order_by(AliasAuditLog.created_at.desc())
.all()
)
output.no_match = False
else:
# Try regex search
aliases = (
Alias.filter(Alias.email.op("~")(query))
.order_by(Alias.id.desc())
.limit(10)
.all()
)
if aliases:
output.aliases = aliases
output.aliases_found_by_regex = True
output.no_match = False
# Search deleted aliases
deleted_alias = DeletedAlias.get_by(email=query)
if deleted_alias:
output.deleted_aliases = [deleted_alias]
output.deleted_aliases_found_by_regex = False
output.deleted_alias_audit_log = (
AliasAuditLog.filter_by(alias_email=deleted_alias.email)
.order_by(AliasAuditLog.created_at.desc())
.all()
)
output.no_match = False
else:
# Try regex search for deleted aliases
deleted_aliases = (
DeletedAlias.filter(DeletedAlias.email.op("~")(query))
.order_by(DeletedAlias.id.desc())
.limit(10)
.all()
)
if deleted_aliases:
output.deleted_aliases = deleted_aliases
output.deleted_aliases_found_by_regex = True
output.no_match = False
# Search domain deleted aliases
domain_deleted_alias = DomainDeletedAlias.get_by(email=query)
if domain_deleted_alias:
output.domain_deleted_aliases = [domain_deleted_alias]
output.domain_deleted_aliases_found_by_regex = False
output.domain_deleted_alias_audit_log = (
AliasAuditLog.filter_by(alias_email=domain_deleted_alias.email)
.order_by(AliasAuditLog.created_at.desc())
.all()
)
output.no_match = False
else:
# Try regex search for domain deleted aliases
domain_deleted_aliases = (
DomainDeletedAlias.filter(DomainDeletedAlias.email.op("~")(query))
.order_by(DomainDeletedAlias.id.desc())
.limit(10)
.all()
)
if domain_deleted_aliases:
output.domain_deleted_aliases = domain_deleted_aliases
output.domain_deleted_aliases_found_by_regex = True
output.no_match = False
return output
@staticmethod
def search_emails(query: str) -> EmailSearchResult:
"""Search for mailboxes, users, and partner users by exact match or POSIX regex."""
output = EmailSearchResult()
output.query = query
output.search_type = EmailSearchResult.SEARCH_TYPE_EMAIL
# Search mailboxes
mailbox = Mailbox.get_by(email=query)
if mailbox:
output.mailboxes = [mailbox]
output.mailboxes_found_by_regex = False
output.mailbox_count = 1
output.no_match = False
else:
# Try regex search for mailboxes
mailboxes = (
Mailbox.filter(Mailbox.email.op("~")(query))
.order_by(Mailbox.id.desc())
.limit(10)
.all()
)
if mailboxes:
output.mailboxes = mailboxes
output.mailboxes_found_by_regex = True
output.mailbox_count = len(mailboxes)
output.no_match = False
# Search users
user = None
try:
user_id = int(query)
user = User.get(user_id)
except ValueError:
user = User.get_by(email=query)
if user:
output.users = [user]
output.users_found_by_regex = False
output.user_audit_log = (
UserAuditLog.filter_by(user_id=user.id)
.order_by(UserAuditLog.created_at.desc())
.all()
)
output.no_match = False
else:
# Try regex search for users
users = (
User.filter(User.email.op("~")(query))
.order_by(User.id.desc())
.limit(10)
.all()
)
if users:
output.users = users
output.users_found_by_regex = True
output.no_match = False
# Also check user audit log by user_email
if not output.users:
user_audit_log = (
UserAuditLog.filter_by(user_email=query)
.order_by(UserAuditLog.created_at.desc())
.all()
)
if user_audit_log:
output.user_audit_log = user_audit_log
output.no_match = False
# Search partner users
try:
proton_partner = get_proton_partner()
partner_user = PartnerUser.filter_by(
partner_id=proton_partner.id, partner_email=query
).first()
if partner_user:
output.partner_users = [partner_user]
output.partner_users_found_by_regex = False
output.no_match = False
else:
# Try regex search for partner users
partner_users = (
PartnerUser.filter(
PartnerUser.partner_id == proton_partner.id,
PartnerUser.partner_email.op("~")(query),
)
.order_by(PartnerUser.id.desc())
.limit(10)
.all()
)
if partner_users:
output.partner_users = partner_users
output.partner_users_found_by_regex = True
output.no_match = False
except ProtonPartnerNotSetUp:
# Proton partner not configured, skip this search
pass
return output
@staticmethod
def from_request(query: str, search_type: str) -> EmailSearchResult:
"""Main entry point for searching based on search type."""
if search_type == EmailSearchResult.SEARCH_TYPE_ALIAS:
return EmailSearchResult.search_aliases(query)
else:
return EmailSearchResult.search_emails(query)
class EmailSearchHelpers:
@staticmethod
def mailbox_list(user: User) -> list[Mailbox]:
return (
Mailbox.filter_by(user_id=user.id)
.order_by(Mailbox.id.asc())
.limit(10)
.all()
)
@staticmethod
def mailbox_count(user: User) -> int:
return Mailbox.filter_by(user_id=user.id).order_by(Mailbox.id.desc()).count()
@staticmethod
def alias_mailboxes(alias: Alias) -> list[Mailbox]:
return (
Session.query(Mailbox)
.filter(Mailbox.id == Alias.mailbox_id, Alias.id == alias.id)
.union(
Session.query(Mailbox)
.join(AliasMailbox, Mailbox.id == AliasMailbox.mailbox_id)
.filter(AliasMailbox.alias_id == alias.id)
)
.order_by(Mailbox.id)
.limit(10)
.all()
)
@staticmethod
def alias_mailbox_count(alias: Alias) -> int:
return len(alias.mailboxes)
@staticmethod
def alias_list(user: User) -> list[Alias]:
return (
Alias.filter_by(user_id=user.id).order_by(Alias.id.desc()).limit(10).all()
)
@staticmethod
def alias_count(user: User) -> int:
return Alias.filter_by(user_id=user.id).count()
@staticmethod
def partner_user(user: User) -> Optional[PartnerUser]:
return PartnerUser.get_by(user_id=user.id)
class EmailSearchAdmin(BaseView):
def is_accessible(self):
return current_user.is_authenticated and current_user.is_admin
def inaccessible_callback(self, name, **kwargs):
# redirect to login page if user doesn't have access
flash("You don't have access to the admin page", "error")
return redirect(url_for("dashboard.index", next=request.url))
@expose("/", methods=["GET", "POST"])
def index(self):
search = EmailSearchResult()
query = request.args.get("query")
search_type = request.args.get(
"search_type", EmailSearchResult.SEARCH_TYPE_EMAIL
)
if query is not None and len(query) > 0:
query = query.strip()
search = EmailSearchResult.from_request(query, search_type)
return self.render(
"admin/email_search.html",
query=query,
search_type=search_type,
data=search,
helper=EmailSearchHelpers,
)
@expose("/partner_unlink", methods=["POST"])
def delete_partner_link(self):
user_id = request.form.get("user_id")
if not user_id:
flash("Missing user_id", "error")
return redirect(url_for("admin.email_search.index"))
try:
user_id = int(user_id)
except ValueError:
flash("Missing user_id", "error")
return redirect(url_for("admin.email_search.index", query=user_id))
user = User.get(user_id)
if user is None:
flash("User not found", "error")
return redirect(url_for("admin.email_search.index", query=user_id))
external_user_id = perform_proton_account_unlink(user, skip_check=True)
if not external_user_id:
flash("User unlinked", "success")
return redirect(url_for("admin.email_search.index", query=user_id))
AdminAuditLog.create(
admin_user_id=user.id,
model=User.__class__.__name__,
model_id=user.id,
action=AuditLogActionEnum.unlink_user.value,
data={"external_user_id": external_user_id},
)
Session.commit()
return redirect(url_for("admin.email_search.index", query=user_id))
@expose("/stop_user_deletion", methods=["POST"])
def stop_user_deletion(self):
user_id = request.form.get("user_id")
if not user_id:
flash("Missing user_id", "error")
return redirect(url_for("admin.email_search.index"))
try:
user_id = int(user_id)
except ValueError:
flash("Invalid user_id", "error")
return redirect(url_for("admin.email_search.index"))
user = User.get(user_id)
if user is None:
flash("User not found", "error")
return redirect(url_for("admin.email_search.index", query=user_id))
if user.delete_on is None:
flash("User is not scheduled for deletion", "warning")
return redirect(url_for("admin.email_search.index", query=user.email))
user.delete_on = None
AdminAuditLog.clear_delete_on(current_user.id, user.id)
Session.commit()
flash(f"Cancelled scheduled deletion for user {user.email}", "success")
return redirect(url_for("admin.email_search.index", query=user.email))
@expose("/update_subdomain_quota", methods=["POST"])
def update_subdomain_quota(self):
user_id = request.form.get("user_id")
new_quota = request.form.get("subdomain_quota")
if not user_id:
flash("Missing user_id", "error")
return redirect(url_for("admin.email_search.index"))
if not new_quota:
flash("Missing subdomain quota value", "error")
return redirect(url_for("admin.email_search.index"))
try:
user_id = int(user_id)
new_quota = int(new_quota)
except ValueError:
flash("Invalid user_id or quota value", "error")
return redirect(url_for("admin.email_search.index"))
if new_quota < 0:
flash("Subdomain quota cannot be negative", "error")
return redirect(url_for("admin.email_search.index"))
user = User.get(user_id)
if user is None:
flash("User not found", "error")
return redirect(url_for("admin.email_search.index", query=user_id))
old_quota = user._subdomain_quota
user._subdomain_quota = new_quota
AdminAuditLog.update_subdomain_quota(
current_user.id, user.id, old_quota, new_quota
)
Session.commit()
flash(
f"Updated subdomain quota for user {user.email} from {old_quota} to {new_quota}",
"success",
)
return redirect(url_for("admin.email_search.index", query=user.email))
@expose("/update_directory_quota", methods=["POST"])
def update_directory_quota(self):
user_id = request.form.get("user_id")
new_quota = request.form.get("directory_quota")
if not user_id:
flash("Missing user_id", "error")
return redirect(url_for("admin.email_search.index"))
if not new_quota:
flash("Missing directory quota value", "error")
return redirect(url_for("admin.email_search.index"))
try:
user_id = int(user_id)
new_quota = int(new_quota)
except ValueError:
flash("Invalid user_id or quota value", "error")
return redirect(url_for("admin.email_search.index"))
if new_quota < 0:
flash("Directory quota cannot be negative", "error")
return redirect(url_for("admin.email_search.index"))
user = User.get(user_id)
if user is None:
flash("User not found", "error")
return redirect(url_for("admin.email_search.index", query=user_id))
old_quota = user._directory_quota
user._directory_quota = new_quota
AdminAuditLog.update_directory_quota(
current_user.id, user.id, old_quota, new_quota
)
Session.commit()
flash(
f"Updated directory quota for user {user.email} from {old_quota} to {new_quota}",
"success",
)
return redirect(url_for("admin.email_search.index", query=user.email))
+9
View File
@@ -0,0 +1,9 @@
from flask_admin.form import SecureForm
from app.admin.base import SLModelView
class ForbiddenMxIpAdmin(SLModelView):
form_base_class = SecureForm
can_create = True
can_delete = True
+75
View File
@@ -0,0 +1,75 @@
from flask import Flask
from flask_admin import Admin
from app.db import Session
from app.models import (
User,
Alias,
Mailbox,
Coupon,
ManualSubscription,
CustomDomain,
AdminAuditLog,
ProviderComplaint,
Newsletter,
NewsletterUser,
DailyMetric,
Metric2,
InvalidMailboxDomain,
ForbiddenMxIp,
)
from app.admin.base import SLAdminIndexView
from app.admin.user import UserAdmin
from app.admin.alias import AliasAdmin
from app.admin.mailbox import MailboxAdmin
from app.admin.coupon import CouponAdmin
from app.admin.manual_subscription import ManualSubscriptionAdmin
from app.admin.custom_domain import CustomDomainAdmin
from app.admin.admin_audit_log import AdminAuditLogAdmin
from app.admin.provider_complaint import ProviderComplaintAdmin
from app.admin.newsletter import NewsletterAdmin, NewsletterUserAdmin
from app.admin.metrics import DailyMetricAdmin, MetricAdmin
from app.admin.invalid_mailbox_domain import InvalidMailboxDomainAdmin
from app.admin.forbidden_mx_ip import ForbiddenMxIpAdmin
from app.admin.email_search import EmailSearchAdmin
from app.admin.custom_domain_search import CustomDomainSearchAdmin
from app.admin.abuser_lookup import AbuserLookupAdmin
from app.admin.mailbox_search import MailboxSearchAdmin
from app.admin.email_domain_search import EmailDomainSearchAdmin
def init_admin(app: Flask):
admin = Admin(name="SimpleLogin", template_mode="bootstrap4")
admin.init_app(app, index_view=SLAdminIndexView())
admin.add_view(EmailSearchAdmin(name="Email Search", endpoint="admin.email_search"))
admin.add_view(
MailboxSearchAdmin(name="Mailbox search", endpoint="admin.mailbox_search")
)
admin.add_view(
CustomDomainSearchAdmin(
name="Custom domain search", endpoint="admin.custom_domain_search"
)
)
admin.add_view(
EmailDomainSearchAdmin(
name="Email domain search", endpoint="admin.email_domain_search"
)
)
admin.add_view(
AbuserLookupAdmin(name="Abuser Lookup", endpoint="admin.abuser_lookup")
)
admin.add_view(UserAdmin(User, Session))
admin.add_view(AliasAdmin(Alias, Session))
admin.add_view(MailboxAdmin(Mailbox, Session))
admin.add_view(CouponAdmin(Coupon, Session))
admin.add_view(ManualSubscriptionAdmin(ManualSubscription, Session))
admin.add_view(CustomDomainAdmin(CustomDomain, Session))
admin.add_view(AdminAuditLogAdmin(AdminAuditLog, Session))
admin.add_view(ProviderComplaintAdmin(ProviderComplaint, Session))
admin.add_view(NewsletterAdmin(Newsletter, Session))
admin.add_view(NewsletterUserAdmin(NewsletterUser, Session))
admin.add_view(DailyMetricAdmin(DailyMetric, Session))
admin.add_view(MetricAdmin(Metric2, Session))
admin.add_view(InvalidMailboxDomainAdmin(InvalidMailboxDomain, Session))
admin.add_view(ForbiddenMxIpAdmin(ForbiddenMxIp, Session))
+9
View File
@@ -0,0 +1,9 @@
from flask_admin.form import SecureForm
from app.admin.base import SLModelView
class InvalidMailboxDomainAdmin(SLModelView):
form_base_class = SecureForm
can_create = True
can_delete = True
+14
View File
@@ -0,0 +1,14 @@
from flask_admin.form import SecureForm
from app.admin.base import SLModelView, _admin_date_formatter
class MailboxAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["id", "user.email", "email"]
column_filters = ["id", "user.email", "email"]
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
+77
View File
@@ -0,0 +1,77 @@
from __future__ import annotations
from typing import Optional, List
from flask import redirect, url_for, request, flash
from flask_admin import BaseView, expose
from flask_login import current_user
from sqlalchemy import or_, and_
from app.db import Session
from app.models import Alias, Mailbox, AliasMailbox
from app.admin.email_search import EmailSearchHelpers
class MailboxSearchResult:
def __init__(self):
self.no_match: bool = False
self.mailbox: Optional[Mailbox] = None
self.aliases: List[Alias] = []
@staticmethod
def from_mailbox(mbox: Optional[Mailbox]) -> MailboxSearchResult:
out = MailboxSearchResult()
if mbox is None:
out.no_match = True
return out
out.mailbox = mbox
out.aliases = mbox.aliases[:10]
out.aliases = (
Session.query(Alias)
.join(
AliasMailbox,
and_(
AliasMailbox.alias_id == Alias.id,
AliasMailbox.mailbox_id == mbox.id,
),
isouter=True,
)
.filter(
or_(Alias.mailbox_id == mbox.id, AliasMailbox.mailbox_id == mbox.id)
)
.order_by(Alias.id)
.limit(10)
.all()
)
return out
class MailboxSearchAdmin(BaseView):
def is_accessible(self):
return current_user.is_authenticated and current_user.is_admin
def inaccessible_callback(self, name, **kwargs):
# redirect to login page if user doesn't have access
flash("You don't have access to the admin page", "error")
return redirect(url_for("dashboard.index", next=request.url))
@expose("/", methods=["GET", "POST"])
def index(self):
query = request.args.get("query")
if query is None:
search = MailboxSearchResult()
else:
try:
mailbox_id = int(query)
mailbox = Mailbox.get_by(id=mailbox_id)
except ValueError:
mailbox = Mailbox.get_by(email=query)
search = MailboxSearchResult.from_mailbox(mailbox)
return self.render(
"admin/mailbox_search.html",
data=search,
query=query,
helper=EmailSearchHelpers,
)
+62
View File
@@ -0,0 +1,62 @@
from typing import List
from flask import flash
from flask_admin.actions import action
from flask_admin.form import SecureForm
from flask_login import current_user
from app.admin.base import SLModelView, _admin_date_formatter
from app.db import Session
from app.events.event_dispatcher import EventDispatcher
from app.events.generated.event_pb2 import EventContent, UserPlanChanged
from app.models import ManualSubscription, AdminAuditLog
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
class ManualSubscriptionAdmin(SLModelView):
form_base_class = SecureForm
can_edit = True
column_searchable_list = ["id", "user.email"]
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
@action(
"extend_1y",
"Extend for 1 year",
"Extend 1 year more?",
)
def extend_1y(self, ids):
self.__extend_manual_subscription(ids, msg="1 year", years=1)
@action(
"extend_1m",
"Extend for 1 month",
"Extend 1 month more?",
)
def extend_1m(self, ids):
self.__extend_manual_subscription(ids, msg="1 month", months=1)
def __extend_manual_subscription(self, ids: List[int], msg: str, **kwargs):
for ms in ManualSubscription.filter(ManualSubscription.id.in_(ids)):
sub: ManualSubscription = ms
sub.end_at = sub.end_at.shift(**kwargs)
flash(f"Extend subscription for {msg} for {sub.user}", "success")
emit_user_audit_log(
user=sub.user,
action=UserAuditLogAction.Upgrade,
message=f"Admin {current_user.email} extended manual subscription for {msg} for {sub.user}",
)
AdminAuditLog.extend_subscription(
current_user.id, sub.user.id, sub.end_at, msg
)
EventDispatcher.send_event(
user=sub.user,
content=EventContent(
user_plan_change=UserPlanChanged(plan_end_time=sub.end_at.timestamp)
),
)
Session.commit()
+17
View File
@@ -0,0 +1,17 @@
from flask_admin.form import SecureForm
from app.admin.base import SLModelView
class DailyMetricAdmin(SLModelView):
form_base_class = SecureForm
column_exclude_list = ["created_at", "updated_at", "id"]
can_export = True
class MetricAdmin(SLModelView):
form_base_class = SecureForm
column_exclude_list = ["created_at", "updated_at", "id"]
can_export = True
+111
View File
@@ -0,0 +1,111 @@
from flask import flash, request
from flask_admin.actions import action
from flask_admin.form import SecureForm
from flask_login import current_user
from markupsafe import Markup
from app.admin.base import SLModelView
from app.models import User, Newsletter
from app.newsletter_utils import send_newsletter_to_user, send_newsletter_to_address
def _newsletter_plain_text_formatter(view, context, model: Newsletter, name):
# to display newsletter plain_text with linebreaks in the list view
return Markup(model.plain_text.replace("\n", "<br>"))
def _newsletter_html_formatter(view, context, model: Newsletter, name):
# to display newsletter html with linebreaks in the list view
return Markup(model.html.replace("\n", "<br>"))
class NewsletterAdmin(SLModelView):
form_base_class = SecureForm
list_template = "admin/model/newsletter-list.html"
edit_template = "admin/model/newsletter-edit.html"
edit_modal = False
can_edit = True
can_create = True
column_formatters = {
"plain_text": _newsletter_plain_text_formatter,
"html": _newsletter_html_formatter,
}
@action(
"send_newsletter_to_user",
"Send this newsletter to myself or the specified userID",
)
def send_newsletter_to_user(self, newsletter_ids):
user_id = request.form["user_id"]
if user_id:
user = User.get(user_id)
if not user:
flash(f"No such user with ID {user_id}", "error")
return
else:
flash("use the current user", "info")
user = current_user
for newsletter_id in newsletter_ids:
newsletter = Newsletter.get(newsletter_id)
sent, error_msg = send_newsletter_to_user(newsletter, user)
if sent:
flash(f"{newsletter} sent to {user}", "success")
else:
flash(error_msg, "error")
@action(
"send_newsletter_to_address",
"Send this newsletter to a specific address",
)
def send_newsletter_to_address(self, newsletter_ids):
to_address = request.form["to_address"]
if not to_address:
flash("to_address missing", "error")
return
for newsletter_id in newsletter_ids:
newsletter = Newsletter.get(newsletter_id)
# use the current_user for rendering email
sent, error_msg = send_newsletter_to_address(
newsletter, current_user, to_address
)
if sent:
flash(
f"{newsletter} sent to {to_address} with {current_user} context",
"success",
)
else:
flash(error_msg, "error")
@action(
"clone_newsletter",
"Clone this newsletter",
)
def clone_newsletter(self, newsletter_ids):
if len(newsletter_ids) != 1:
flash("you can only select 1 newsletter", "error")
return
newsletter_id = newsletter_ids[0]
newsletter: Newsletter = Newsletter.get(newsletter_id)
new_newsletter = Newsletter.create(
subject=newsletter.subject,
html=newsletter.html,
plain_text=newsletter.plain_text,
commit=True,
)
flash(f"Newsletter {new_newsletter.subject} has been cloned", "success")
class NewsletterUserAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["id"]
column_filters = ["id", "user.email", "newsletter.subject"]
column_exclude_list = ["created_at", "updated_at", "id"]
can_edit = False
can_create = False
+94
View File
@@ -0,0 +1,94 @@
from typing import Optional
from flask import redirect, url_for, request, flash, Response
from flask_admin import expose
from flask_admin.form import SecureForm
from flask_admin.model.template import EndpointLinkRowAction
from flask_login import current_user
from markupsafe import Markup
from app import s3
from app.admin.base import SLModelView, _admin_date_formatter
from app.db import Session
from app.models import (
AdminAuditLog,
ProviderComplaintState,
Phase,
ProviderComplaint,
)
def _transactionalcomplaint_state_formatter(view, context, model, name):
return "{} ({})".format(ProviderComplaintState(model.state).name, model.state)
def _transactionalcomplaint_phase_formatter(view, context, model, name):
return Phase(model.phase).name
def _transactionalcomplaint_refused_email_id_formatter(view, context, model, name):
markupstring = "<a href='{}'>{}</a>".format(
url_for(".download_eml", id=model.id), model.refused_email.full_report_path
)
return Markup(markupstring)
class ProviderComplaintAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["id", "user.id", "created_at"]
column_filters = ["user.id", "state"]
column_hide_backrefs = False
can_edit = False
can_create = False
can_delete = False
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
"state": _transactionalcomplaint_state_formatter,
"phase": _transactionalcomplaint_phase_formatter,
"refused_email": _transactionalcomplaint_refused_email_id_formatter,
}
column_extra_row_actions = [ # Add a new action button
EndpointLinkRowAction("fa fa-check-square", ".mark_ok"),
]
def _get_complaint(self) -> Optional[ProviderComplaint]:
complain_id = request.args.get("id")
if complain_id is None:
flash("Missing id", "error")
return None
complaint = ProviderComplaint.get_by(id=complain_id)
if not complaint:
flash("Could not find complaint", "error")
return None
return complaint
@expose("/mark_ok", methods=["GET"])
def mark_ok(self):
complaint = self._get_complaint()
if not complaint:
return redirect("/admin/transactionalcomplaint/")
complaint.state = ProviderComplaintState.reviewed.value
Session.commit()
return redirect("/admin/transactionalcomplaint/")
@expose("/download_eml", methods=["GET"])
def download_eml(self):
complaint = self._get_complaint()
if not complaint:
return redirect("/admin/transactionalcomplaint/")
eml_path = complaint.refused_email.full_report_path
eml_data = s3.download_email(eml_path)
AdminAuditLog.downloaded_provider_complaint(current_user.id, complaint.id)
Session.commit()
return Response(
eml_data,
mimetype="message/rfc822",
headers={
"Content-Disposition": "attachment;filename={}".format(
complaint.refused_email.path
)
},
)
+20
View File
@@ -0,0 +1,20 @@
from flask_admin.form import SecureForm
from app.admin.base import SLModelView, _admin_date_formatter
class ReferralAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["id", "user.email", "code", "name"]
column_filters = ["id", "user.email", "code", "name"]
column_formatters = {
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
def scaffold_list_columns(self):
ret = super().scaffold_list_columns()
ret.insert(0, "nb_user")
ret.insert(0, "nb_paid_user")
return ret
+287
View File
@@ -0,0 +1,287 @@
from __future__ import annotations
import arrow
from flask import flash
from flask_admin.actions import action
from flask_admin.form import SecureForm
from flask_login import current_user
from app.abuser import mark_user_as_abuser, unmark_as_abusive_user
from app.admin.base import (
SLModelView,
_admin_date_formatter,
_user_upgrade_channel_formatter,
)
from app.db import Session
from app.events.event_dispatcher import EventDispatcher
from app.events.generated.event_pb2 import EventContent, UserPlanChanged
from app.models import (
User,
ManualSubscription,
Fido,
Subscription,
AppleSubscription,
AdminAuditLog,
PADDLE_SUBSCRIPTION_GRACE_DAYS,
)
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
def manual_upgrade(way: str, ids: [int], is_giveaway: bool):
for user in User.filter(User.id.in_(ids)).all():
if user.lifetime:
flash(f"user {user} already has a lifetime license", "warning")
continue
sub: Subscription = user.get_paddle_subscription()
if sub and not sub.cancelled:
flash(
f"user {user} already has a Paddle license, they have to cancel it first",
"warning",
)
continue
apple_sub: AppleSubscription = AppleSubscription.get_by(user_id=user.id)
if apple_sub and apple_sub.is_valid():
flash(
f"user {user} already has a Apple subscription, they have to cancel it first",
"warning",
)
continue
AdminAuditLog.create_manual_upgrade(current_user.id, way, user.id, is_giveaway)
manual_sub: ManualSubscription = ManualSubscription.get_by(user_id=user.id)
if manual_sub:
# renew existing subscription
if manual_sub.end_at > arrow.now():
manual_sub.end_at = manual_sub.end_at.shift(years=1)
else:
manual_sub.end_at = arrow.now().shift(years=1, days=1)
emit_user_audit_log(
user=user,
action=UserAuditLogAction.Upgrade,
message=f"Admin {current_user.email} extended manual subscription to user {user.email}",
)
EventDispatcher.send_event(
user=user,
content=EventContent(
user_plan_change=UserPlanChanged(
plan_end_time=manual_sub.end_at.timestamp
)
),
)
flash(f"Subscription extended to {manual_sub.end_at.humanize()}", "success")
else:
emit_user_audit_log(
user=user,
action=UserAuditLogAction.Upgrade,
message=f"Admin {current_user.email} created manual subscription to user {user.email}",
)
manual_sub = ManualSubscription.create(
user_id=user.id,
end_at=arrow.now().shift(years=1, days=1),
comment=way,
is_giveaway=is_giveaway,
)
EventDispatcher.send_event(
user=user,
content=EventContent(
user_plan_change=UserPlanChanged(
plan_end_time=manual_sub.end_at.timestamp
)
),
)
flash(f"New {way} manual subscription for {user} is created", "success")
Session.commit()
class UserAdmin(SLModelView):
form_base_class = SecureForm
column_searchable_list = ["email", "id"]
column_exclude_list = [
"salt",
"password",
"otp_secret",
"last_otp",
"fido_uuid",
"profile_picture",
]
can_edit = False
def scaffold_list_columns(self):
ret = super().scaffold_list_columns()
ret.insert(0, "upgrade_channel")
return ret
column_formatters = {
"upgrade_channel": _user_upgrade_channel_formatter,
"created_at": _admin_date_formatter,
"updated_at": _admin_date_formatter,
}
@action(
"disable_user",
"Disable user",
"Are you sure you want to disable the selected users?",
)
def action_disable_user(self, ids):
for user in User.filter(User.id.in_(ids)):
mark_user_as_abuser(
user, f"An user {user.id} was marked as abuser.", current_user.id
)
flash(f"Disabled user {user.id}")
AdminAuditLog.disable_user(current_user.id, user.id)
Session.commit()
@action(
"enable_user",
"Enable user",
"Are you sure you want to enable the selected users?",
)
def action_enable_user(self, ids):
for user in User.filter(User.id.in_(ids)):
unmark_as_abusive_user(
user.id, f"An user {user.id} was unmarked as abuser.", current_user.id
)
flash(f"Enabled user {user.id}")
AdminAuditLog.enable_user(current_user.id, user.id)
Session.commit()
@action(
"education_upgrade",
"Education upgrade",
"Are you sure you want to edu-upgrade selected users?",
)
def action_edu_upgrade(self, ids):
manual_upgrade("Edu", ids, is_giveaway=True)
@action(
"charity_org_upgrade",
"Charity Organization upgrade",
"Are you sure you want to upgrade selected users using the Charity organization program?",
)
def action_charity_org_upgrade(self, ids):
manual_upgrade("Charity Organization", ids, is_giveaway=True)
@action(
"journalist_upgrade",
"Journalist upgrade",
"Are you sure you want to upgrade selected users using the Journalist program?",
)
def action_journalist_upgrade(self, ids):
manual_upgrade("Journalist", ids, is_giveaway=True)
@action(
"cash_upgrade",
"Cash upgrade",
"Are you sure you want to cash-upgrade selected users?",
)
def action_cash_upgrade(self, ids):
manual_upgrade("Cash", ids, is_giveaway=False)
@action(
"crypto_upgrade",
"Crypto upgrade",
"Are you sure you want to crypto-upgrade selected users?",
)
def action_monero_upgrade(self, ids):
manual_upgrade("Crypto", ids, is_giveaway=False)
@action(
"adhoc_upgrade",
"Adhoc upgrade - for exceptional case",
"Are you sure you want to crypto-upgrade selected users?",
)
def action_adhoc_upgrade(self, ids):
manual_upgrade("Adhoc", ids, is_giveaway=False)
@action(
"extend_trial_1w",
"Extend trial for 1 week more",
"Extend trial for 1 week more?",
)
def extend_trial_1w(self, ids):
for user in User.filter(User.id.in_(ids)):
if user.trial_end and user.trial_end > arrow.now():
user.trial_end = user.trial_end.shift(weeks=1)
else:
user.trial_end = arrow.now().shift(weeks=1)
flash(f"Extend trial for {user} to {user.trial_end}", "success")
AdminAuditLog.extend_trial(
current_user.id, user.id, user.trial_end, "1 week"
)
Session.commit()
@action(
"remove trial",
"Stop trial period",
"Remove trial for this user?",
)
def stop_trial(self, ids):
for user in User.filter(User.id.in_(ids)):
user.trial_end = None
flash(f"Stopped trial for {user}", "success")
AdminAuditLog.stop_trial(current_user.id, user.id)
Session.commit()
@action(
"disable_otp_fido",
"Disable OTP & FIDO",
"Disable OTP & FIDO?",
)
def disable_otp_fido(self, ids):
for user in User.filter(User.id.in_(ids)):
user_had_otp = user.enable_otp
if user.enable_otp:
user.enable_otp = False
flash(f"Disable OTP for {user}", "info")
user_had_fido = user.fido_uuid is not None
if user.fido_uuid:
Fido.filter_by(uuid=user.fido_uuid).delete()
user.fido_uuid = None
flash(f"Disable FIDO for {user}", "info")
AdminAuditLog.disable_otp_fido(
current_user.id, user.id, user_had_otp, user_had_fido
)
Session.commit()
@action(
"stop_paddle_sub",
"Stop user Paddle subscription",
"This will stop the current user Paddle subscription so if user doesn't have Proton sub, they will lose all SL benefits immediately",
)
def stop_paddle_sub(self, ids):
for user in User.filter(User.id.in_(ids)):
sub: Subscription = user.get_paddle_subscription()
if not sub:
flash(f"No Paddle sub for {user}", "warning")
continue
flash(f"{user} sub will end now, instead of {sub.next_bill_date}", "info")
sub.next_bill_date = (
arrow.now().shift(days=-PADDLE_SUBSCRIPTION_GRACE_DAYS).date()
)
Session.commit()
@action(
"clear_delete_on",
"Remove scheduled deletion of user",
"This will remove the scheduled deletion for this users",
)
def clean_delete_on(self, ids):
for user in User.filter(User.id.in_(ids)):
user.delete_on = None
Session.commit()
-1500
View File
File diff suppressed because it is too large Load Diff
+1 -71
View File
@@ -19,7 +19,6 @@ from flask import (
session,
g,
)
from flask_admin import Admin
from flask_cors import cross_origin, CORS
from flask_login import current_user
from sentry_sdk.integrations.flask import FlaskIntegration
@@ -27,28 +26,7 @@ from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from werkzeug.middleware.proxy_fix import ProxyFix
from app import config, constants
from app.admin_model import (
SLAdminIndexView,
UserAdmin,
AliasAdmin,
MailboxAdmin,
ManualSubscriptionAdmin,
CouponAdmin,
CustomDomainAdmin,
AdminAuditLogAdmin,
ProviderComplaintAdmin,
NewsletterAdmin,
NewsletterUserAdmin,
DailyMetricAdmin,
MetricAdmin,
InvalidMailboxDomainAdmin,
EmailSearchAdmin,
CustomDomainSearchAdmin,
AbuserLookupAdmin,
ForbiddenMxIpAdmin,
MailboxSearchAdmin,
EmailDomainSearchAdmin,
)
from app.admin import init_admin
from app.api.base import api_bp
from app.auth.base import auth_bp
from app.build_info import SHA1
@@ -88,21 +66,10 @@ from app.jose_utils import get_jwk_key
from app.log import LOG
from app.models import (
User,
Alias,
CustomDomain,
Mailbox,
EmailLog,
Contact,
ManualSubscription,
Coupon,
AdminAuditLog,
ProviderComplaint,
Newsletter,
NewsletterUser,
DailyMetric,
Metric2,
InvalidMailboxDomain,
ForbiddenMxIp,
)
from app.monitor.base import monitor_bp
from app.monitor_utils import send_version_event
@@ -453,43 +420,6 @@ def init_extensions(app: Flask):
login_manager.init_app(app)
def init_admin(app):
admin = Admin(name="SimpleLogin", template_mode="bootstrap4")
admin.init_app(app, index_view=SLAdminIndexView())
admin.add_view(EmailSearchAdmin(name="Email Search", endpoint="admin.email_search"))
admin.add_view(
MailboxSearchAdmin(name="Mailbox search", endpoint="admin.mailbox_search")
)
admin.add_view(
CustomDomainSearchAdmin(
name="Custom domain search", endpoint="admin.custom_domain_search"
)
)
admin.add_view(
EmailDomainSearchAdmin(
name="Email domain search", endpoint="admin.email_domain_search"
)
)
admin.add_view(
AbuserLookupAdmin(name="Abuser Lookup", endpoint="admin.abuser_lookup")
)
admin.add_view(UserAdmin(User, Session))
admin.add_view(AliasAdmin(Alias, Session))
admin.add_view(MailboxAdmin(Mailbox, Session))
admin.add_view(CouponAdmin(Coupon, Session))
admin.add_view(ManualSubscriptionAdmin(ManualSubscription, Session))
admin.add_view(CustomDomainAdmin(CustomDomain, Session))
admin.add_view(AdminAuditLogAdmin(AdminAuditLog, Session))
admin.add_view(ProviderComplaintAdmin(ProviderComplaint, Session))
admin.add_view(NewsletterAdmin(Newsletter, Session))
admin.add_view(NewsletterUserAdmin(NewsletterUser, Session))
admin.add_view(DailyMetricAdmin(DailyMetric, Session))
admin.add_view(MetricAdmin(Metric2, Session))
admin.add_view(InvalidMailboxDomainAdmin(InvalidMailboxDomain, Session))
admin.add_view(ForbiddenMxIpAdmin(ForbiddenMxIp, Session))
def register_custom_commands(app):
"""
Adhoc commands run during data migration.
+305 -72
View File
@@ -1,5 +1,13 @@
{% extends 'admin/master.html' %}
{% macro highlight_email(email, query, is_regex) -%}
{% if is_regex %}
<mark>{{ email }}</mark>
{% else %}
{{ email }}
{% endif %}
{%- endmacro %}
{% macro show_user(user) -%}
<h4>User {{ user.email }} with ID {{ user.id }}.</h4>
{% set pu = helper.partner_user(user) %}
@@ -26,7 +34,7 @@
<tr>
<td>{{ user.id }}</td>
<td>
<a href="?query={{ user.email }}">{{ user.email }}</a>
<a href="?query={{ user.email }}&search_type=email">{{ user.email }}</a>
</td>
{% if user.activated %}
@@ -54,7 +62,7 @@
{% if pu %}
<td class="flex">
<a href="?query={{ pu.partner_email }}">{{ pu.partner_email }}</a>
<a href="?query={{ pu.partner_email }}&search_type=email">{{ pu.partner_email }}</a>
<form class="d-inline"
action="{{ url_for("admin.email_search.delete_partner_link") }}"
method="POST">
@@ -135,10 +143,11 @@
</div>
</div>
{%- endmacro %}
{% macro list_mailboxes(message, mbox_count, mboxes) %}
{% macro list_mailboxes(message, mbox_count, mboxes, is_regex) %}
<h4>
{{ mbox_count }} {{ message }}.
{% if mbox_count>10 %}Showing only the last 10.{% endif %}
{% if is_regex %}<span class="badge bg-info">Found by regex</span>{% endif %}
</h4>
<table class="table">
<thead>
@@ -155,7 +164,9 @@
<tr>
<td>{{ mailbox.id }}</td>
<td>
<a href="?query={{ mailbox.email }}">{{ mailbox.email }}</a>
<a href="?query={{ mailbox.email }}&search_type=email">
{{ highlight_email(mailbox.email, query, is_regex) }}
</a>
</td>
<td>{{ "Yes" if mailbox.verified else "No" }}</td>
<td>{{ mailbox.created_at }}</td>
@@ -164,10 +175,11 @@
</tbody>
</table>
{% endmacro %}
{% macro list_alias(alias_count, aliases) %}
{% macro list_aliases(alias_count, aliases, is_regex) %}
<h4>
{{ alias_count }} Aliases found.
{% if alias_count>10 %}Showing only the last 10.{% endif %}
{% if is_regex %}<span class="badge bg-info">Found by regex</span>{% endif %}
</h4>
<table class="table">
<thead>
@@ -184,7 +196,9 @@
<tr>
<td>{{ alias.id }}</td>
<td>
<a href="?query={{ alias.email }}">{{ alias.email }}</a>
<a href="?query={{ alias.email }}&search_type=alias">
{{ highlight_email(alias.email, query, is_regex) }}
</a>
</td>
<td>{{ "Yes" if alias.enabled else "No" }}</td>
<td>{{ alias.created_at }}</td>
@@ -193,8 +207,11 @@
</tbody>
</table>
{% endmacro %}
{% macro show_deleted_alias(deleted_alias) -%}
<h4>Deleted Alias {{ deleted_alias.email }} with ID {{ deleted_alias.id }}.</h4>
{% macro show_deleted_alias(deleted_alias, is_regex) -%}
<h4>
Deleted Alias {{ highlight_email(deleted_alias.email, query, is_regex) }} with ID {{ deleted_alias.id }}.
{% if is_regex %}<span class="badge bg-info">Found by regex</span>{% endif %}
</h4>
<table class="table">
<thead>
<tr>
@@ -207,17 +224,45 @@
<tbody>
<tr>
<td>{{ deleted_alias.id }}</td>
<td>{{ deleted_alias.email }}</td>
<td>{{ highlight_email(deleted_alias.email, query, is_regex) }}</td>
<td>{{ deleted_alias.created_at }}</td>
<td>{{ deleted_alias.reason }}</td>
</tr>
</tbody>
</table>
{%- endmacro %}
{% macro show_domain_deleted_alias(dom_deleted_alias) -%}
{% macro list_deleted_aliases(deleted_aliases, is_regex) %}
<h4>
Domain Deleted Alias {{ dom_deleted_alias.email }} with ID {{ dom_deleted_alias.id }} for
{{ deleted_aliases|length }} Deleted Aliases found.
{% if deleted_aliases|length > 10 %}Showing only the last 10.{% endif %}
{% if is_regex %}<span class="badge bg-info">Found by regex</span>{% endif %}
</h4>
<table class="table">
<thead>
<tr>
<th scope="col">Deleted Alias ID</th>
<th scope="col">Email</th>
<th scope="col">Deleted At</th>
<th scope="col">Reason</th>
</tr>
</thead>
<tbody>
{% for deleted_alias in deleted_aliases %}
<tr>
<td>{{ deleted_alias.id }}</td>
<td>{{ highlight_email(deleted_alias.email, query, is_regex) }}</td>
<td>{{ deleted_alias.created_at }}</td>
<td>{{ deleted_alias.reason }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endmacro %}
{% macro show_domain_deleted_alias(dom_deleted_alias, is_regex) -%}
<h4>
Domain Deleted Alias {{ highlight_email(dom_deleted_alias.email, query, is_regex) }} with ID {{ dom_deleted_alias.id }} for
domain {{ dom_deleted_alias.domain.domain }}
{% if is_regex %}<span class="badge bg-info">Found by regex</span>{% endif %}
</h4>
<table class="table">
<thead>
@@ -234,7 +279,7 @@
<tbody>
<tr>
<td>{{ dom_deleted_alias.id }}</td>
<td>{{ dom_deleted_alias.email }}</td>
<td>{{ highlight_email(dom_deleted_alias.email, query, is_regex) }}</td>
<td>{{ dom_deleted_alias.domain.domain }}</td>
<td>{{ dom_deleted_alias.domain.id }}</td>
<td>{{ dom_deleted_alias.domain.user_id }}</td>
@@ -242,8 +287,39 @@
</tr>
</tbody>
</table>
{{ show_user(data.domain_deleted_alias.domain.user) }}
{{ show_user(dom_deleted_alias.domain.user) }}
{%- endmacro %}
{% macro list_domain_deleted_aliases(domain_deleted_aliases, is_regex) %}
<h4>
{{ domain_deleted_aliases|length }} Domain Deleted Aliases found.
{% if domain_deleted_aliases|length > 10 %}Showing only the last 10.{% endif %}
{% if is_regex %}<span class="badge bg-info">Found by regex</span>{% endif %}
</h4>
<table class="table">
<thead>
<tr>
<th scope="col">Deleted Alias ID</th>
<th scope="col">Email</th>
<th scope="col">Domain</th>
<th scope="col">Domain ID</th>
<th scope="col">Domain owner user ID</th>
<th scope="col">Deleted At</th>
</tr>
</thead>
<tbody>
{% for dom_deleted_alias in domain_deleted_aliases %}
<tr>
<td>{{ dom_deleted_alias.id }}</td>
<td>{{ highlight_email(dom_deleted_alias.email, query, is_regex) }}</td>
<td>{{ dom_deleted_alias.domain.domain }}</td>
<td>{{ dom_deleted_alias.domain.id }}</td>
<td>{{ dom_deleted_alias.domain.user_id }}</td>
<td>{{ dom_deleted_alias.created_at }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endmacro %}
{% macro list_alias_audit_log(alias_audit_log) %}
<h4>Alias Audit Log</h4>
<table class="table">
@@ -264,7 +340,7 @@
<td>{{ entry.user_id }}</td>
<td>{{ entry.alias_id }}</td>
<td>
<a href="?query={{ entry.alias_email }}">{{ entry.alias_email }}</a>
<a href="?query={{ entry.alias_email }}&search_type=alias">{{ entry.alias_email }}</a>
</td>
<td>{{ entry.action }}</td>
<td>{{ entry.message }}</td>
@@ -290,7 +366,7 @@
<tr>
<td>
<a href="?query={{ entry.user_email }}">{{ entry.user_email }}</a>
<a href="?query={{ entry.user_email }}&search_type=email">{{ entry.user_email }}</a>
</td>
<td>{{ entry.action }}</td>
<td>{{ entry.message }}</td>
@@ -300,79 +376,236 @@
</tbody>
</table>
{% endmacro %}
{% macro list_users(users, is_regex) %}
<h4>
{{ users|length }} Users found.
{% if users|length > 10 %}Showing only the last 10.{% endif %}
{% if is_regex %}<span class="badge bg-info">Found by regex</span>{% endif %}
</h4>
<table class="table">
<thead>
<tr>
<th>User ID</th>
<th>Email</th>
<th>Verified</th>
<th>Status</th>
<th>Created At</th>
</tr>
</thead>
<tbody>
{% for user in users %}
<tr>
<td>{{ user.id }}</td>
<td>
<a href="?query={{ user.email }}&search_type=email">
{{ highlight_email(user.email, query, is_regex) }}
</a>
</td>
<td>{{ "Yes" if user.activated else "No" }}</td>
<td>{{ "Disabled" if user.disabled else "Enabled" }}</td>
<td>{{ user.created_at }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endmacro %}
{% macro list_partner_users(partner_users, is_regex) %}
<h4>
{{ partner_users|length }} Partner Users found.
{% if partner_users|length > 10 %}Showing only the last 10.{% endif %}
{% if is_regex %}<span class="badge bg-info">Found by regex</span>{% endif %}
</h4>
<table class="table">
<thead>
<tr>
<th>Partner User ID</th>
<th>Partner Email</th>
<th>User ID</th>
<th>User Email</th>
<th>Created At</th>
</tr>
</thead>
<tbody>
{% for pu in partner_users %}
<tr>
<td>{{ pu.id }}</td>
<td>{{ highlight_email(pu.partner_email, query, is_regex) }}</td>
<td>{{ pu.user.id }}</td>
<td>
<a href="?query={{ pu.user.email }}&search_type=email">{{ pu.user.email }}</a>
</td>
<td>{{ pu.created_at }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endmacro %}
{% block body %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<form method="get">
<div class="form-group mb-3">
<label class="form-label"><strong>Search type:</strong></label>
<div class="form-check form-check-inline">
<input class="form-check-input" type="radio" name="search_type" id="search_type_email" value="email" {{ 'checked' if search_type == 'email' else '' }}>
<label class="form-check-label" for="search_type_email">Email (Mailboxes, Users, Partner Users)</label>
</div>
<div class="form-check form-check-inline">
<input class="form-check-input" type="radio" name="search_type" id="search_type_alias" value="alias" {{ 'checked' if search_type == 'alias' else '' }}>
<label class="form-check-label" for="search_type_alias">Alias (Aliases, Deleted Aliases)</label>
</div>
</div>
<div class="form-group">
<label for="email">UserID or Email to search:</label>
<label for="query">Query (exact email, user ID, or POSIX regex pattern):</label>
<input type="text"
class="form-control"
name="query"
value="{{ email or '' }}" />
id="query"
value="{{ query or '' }}"
placeholder="e.g., user@example.com or .*@example\.com" />
<small class="form-text text-muted">
First searches for exact match. If not found, uses POSIX regex (limited to 10 results).
</small>
</div>
<button type="submit" class="btn btn-primary">Submit</button>
<button type="submit" class="btn btn-primary mt-2">Submit</button>
</form>
</div>
{% if data.no_match and email %}
{% if data.no_match and query %}
<div class="border border-dark border-2 mt-1 mb-2 p-3 alert alert-warning"
role="alert">No user, alias or mailbox found for {{ email }}</div>
{% endif %}
{% if data.alias %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">Found Alias {{ data.alias.email }}</h3>
{{ list_alias(1,[data.alias]) }}
{{ list_alias_audit_log(data.alias_audit_log) }}
{{ list_mailboxes("Mailboxes for alias", helper.alias_mailbox_count(data.alias) , helper.alias_mailboxes(data.alias)) }}
{{ show_user(data.alias.user) }}
role="alert">
No results found for "{{ query }}" in {{ 'aliases' if search_type == 'alias' else 'emails' }}
</div>
{% endif %}
{% if data.user %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">Found User {{ data.user.email }}</h3>
{{ show_user(data.user) }}
{{ list_mailboxes("Mailboxes for user", helper.mailbox_count(data.user) , helper.mailbox_list(data.user) ) }}
{{ list_alias(helper.alias_count(data.user) ,helper.alias_list(data.user)) }}
</div>
{# Alias search results #}
{% if search_type == 'alias' %}
{% if data.aliases %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">
{% if data.aliases_found_by_regex %}
Found {{ data.aliases|length }} Aliases matching regex "{{ query }}"
{% else %}
Found Alias {{ data.aliases[0].email }}
{% endif %}
</h3>
{{ list_aliases(data.aliases|length, data.aliases, data.aliases_found_by_regex) }}
{% if data.alias_audit_log %}
{{ list_alias_audit_log(data.alias_audit_log) }}
{% endif %}
{% if not data.aliases_found_by_regex and data.aliases|length == 1 %}
{{ list_mailboxes("Mailboxes for alias", helper.alias_mailbox_count(data.aliases[0]), helper.alias_mailboxes(data.aliases[0]), false) }}
{{ show_user(data.aliases[0].user) }}
{% endif %}
</div>
{% endif %}
{% if data.deleted_aliases %}
<div class="border border-dark mt-1 mb-2 p-3">
<h3 class="mb-3">
{% if data.deleted_aliases_found_by_regex %}
Found {{ data.deleted_aliases|length }} Deleted Aliases matching regex "{{ query }}"
{% else %}
Found Deleted Alias {{ data.deleted_aliases[0].email }}
{% endif %}
</h3>
{{ list_deleted_aliases(data.deleted_aliases, data.deleted_aliases_found_by_regex) }}
{% if data.deleted_alias_audit_log %}
{{ list_alias_audit_log(data.deleted_alias_audit_log) }}
{% endif %}
</div>
{% endif %}
{% if data.domain_deleted_aliases %}
<div class="border border-dark mt-1 mb-2 p-3">
<h3 class="mb-3">
{% if data.domain_deleted_aliases_found_by_regex %}
Found {{ data.domain_deleted_aliases|length }} Domain Deleted Aliases matching regex "{{ query }}"
{% else %}
Found Domain Deleted Alias {{ data.domain_deleted_aliases[0].email }}
{% endif %}
</h3>
{% if data.domain_deleted_aliases_found_by_regex %}
{{ list_domain_deleted_aliases(data.domain_deleted_aliases, data.domain_deleted_aliases_found_by_regex) }}
{% else %}
{{ show_domain_deleted_alias(data.domain_deleted_aliases[0], data.domain_deleted_aliases_found_by_regex) }}
{% if data.domain_deleted_alias_audit_log %}
{{ list_alias_audit_log(data.domain_deleted_alias_audit_log) }}
{% endif %}
{% endif %}
</div>
{% endif %}
{% else %}
{# Email search results #}
{% if data.users %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">
{% if data.users_found_by_regex %}
Found {{ data.users|length }} Users matching regex "{{ query }}"
{% else %}
Found User {{ data.users[0].email }}
{% endif %}
</h3>
{% if data.users_found_by_regex %}
{{ list_users(data.users, data.users_found_by_regex) }}
{% else %}
{{ show_user(data.users[0]) }}
{{ list_mailboxes("Mailboxes for user", helper.mailbox_count(data.users[0]), helper.mailbox_list(data.users[0]), false) }}
{{ list_aliases(helper.alias_count(data.users[0]), helper.alias_list(data.users[0]), false) }}
{% endif %}
</div>
{% endif %}
{% if data.user_audit_log and not data.users %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">Audit log entries for user {{ data.query }}</h3>
{{ list_user_audit_log(data.user_audit_log) }}
</div>
{% endif %}
{% if data.mailboxes %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">
{% if data.mailboxes_found_by_regex %}
Found {{ data.mailbox_count }} Mailboxes matching regex "{{ query }}"
{% else %}
Found {{ data.mailbox_count }} Mailbox(es) for {{ query }}
{% endif %}
</h3>
{{ list_mailboxes("Mailboxes found", data.mailbox_count, data.mailboxes, data.mailboxes_found_by_regex) }}
{% if not data.mailboxes_found_by_regex %}
{% for mailbox in data.mailboxes %}
<div class="border border-secondary mt-2 mb-2 p-2">
<h5>Owner of mailbox {{ mailbox.email }}</h5>
{{ show_user(mailbox.user) }}
</div>
{% endfor %}
{% endif %}
</div>
{% endif %}
{% if data.partner_users %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">
{% if data.partner_users_found_by_regex %}
Found {{ data.partner_users|length }} Partner Users matching regex "{{ query }}"
{% else %}
Found Partner User with email {{ data.partner_users[0].partner_email }}
{% endif %}
</h3>
{{ list_partner_users(data.partner_users, data.partner_users_found_by_regex) }}
{% if not data.partner_users_found_by_regex and data.partner_users|length == 1 %}
<div class="border border-secondary mt-2 mb-2 p-2">
<h5>Associated User</h5>
{{ show_user(data.partner_users[0].user) }}
</div>
{% endif %}
</div>
{% endif %}
{% endif %}
{% if data.user_audit_log %}
<div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">Audit log entries for user {{ data.query }}</h3>
{{ list_user_audit_log(data.user_audit_log) }}
</div>
{% endif %}
{% if data.mailbox_count > 10 %}
<h3>Found more than 10 mailboxes for {{ email }}. Showing the last 10</h3>
{% elif data.mailbox_count > 0 %}
<h3>Found {{ data.mailbox_count }} mailbox(es) for {{ email }}</h3>
{% endif %}
{% for mailbox in data.mailbox %}
<div class="border border-dark mt-1 mb-2 p-3">
<h3 class="mb-3">Found Mailbox {{ mailbox.email }}</h3>
{{ list_mailboxes("Mailbox found", 1, [mailbox]) }}
{{ show_user(mailbox.user) }}
</div>
{% endfor %}
{% if data.deleted_alias %}
<div class="border border-dark mt-1 mb-2 p-3">
<h3 class="mb-3">Found DeletedAlias {{ data.deleted_alias.email }}</h3>
{{ show_deleted_alias(data.deleted_alias) }}
{{ list_alias_audit_log(data.deleted_alias_audit_log) }}
</div>
{% endif %}
{% if data.domain_deleted_alias %}
<div class="border border-dark mt-1 mb-2 p-3">
<h3 class="mb-3">Found DomainDeletedAlias {{ data.domain_deleted_alias.email }}</h3>
{{ show_domain_deleted_alias(data.domain_deleted_alias) }}
{{ list_alias_audit_log(data.domain_deleted_alias_audit_log) }}
</div>
{% endif %}
{% endblock %}
@@ -0,0 +1,60 @@
from flask import session as flask_session
from app import user_settings
from app.db import Session
from tests.utils import create_new_user
def test_regenerate_alternative_id_changes_id(flask_client):
user = create_new_user()
original_alternative_id = user.alternative_id
Session.flush()
user_settings.regenerate_user_alternative_id(user, update_session=False)
Session.flush()
assert user.alternative_id != original_alternative_id
assert user.alternative_id is not None
def test_regenerate_alternative_id_updates_session(flask_client, flask_app):
user = create_new_user()
Session.flush()
# Need to be within a request context to use flask session
with flask_app.test_request_context():
flask_session["_user_id"] = "old_session_id"
user_settings.regenerate_user_alternative_id(user, update_session=True)
Session.flush()
assert flask_session["_user_id"] == user.alternative_id
def test_regenerate_alternative_id_does_not_update_session_when_disabled(
flask_client, flask_app
):
user = create_new_user()
Session.flush()
with flask_app.test_request_context():
flask_session["_user_id"] = "old_session_id"
user_settings.regenerate_user_alternative_id(user, update_session=False)
Session.flush()
assert flask_session["_user_id"] == "old_session_id"
assert flask_session["_user_id"] != user.alternative_id
def test_regenerate_alternative_id_generates_valid_uuid(flask_client):
import uuid
user = create_new_user()
Session.flush()
user_settings.regenerate_user_alternative_id(user, update_session=False)
Session.flush()
# Verify it's a valid UUID format
uuid.UUID(user.alternative_id)