remove coinbase hook

This commit is contained in:
Son Nguyen
2026-03-20 10:50:54 +00:00
committed by Adrià Casajús
parent de85b766ae
commit 67de89e781
3 changed files with 0 additions and 169 deletions
-121
View File
@@ -1,121 +0,0 @@
from typing import Optional
import arrow
from coinbase_commerce.error import WebhookInvalidPayload, SignatureVerificationError
from coinbase_commerce.webhook import Webhook
from flask import Flask, request
from app.config import COINBASE_WEBHOOK_SECRET
from app.db import Session
from app.email_utils import send_email, render
from app.log import LOG
from app.models import CoinbaseSubscription, User
from app.subscription_webhook import execute_subscription_webhook
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
def setup_coinbase_commerce(app: Flask):
@app.route("/coinbase", methods=["POST"])
def coinbase_webhook():
# event payload
request_data = request.data.decode("utf-8")
# webhook signature
request_sig = request.headers.get("X-CC-Webhook-Signature", None)
try:
# signature verification and event object construction
event = Webhook.construct_event(
request_data, request_sig, COINBASE_WEBHOOK_SECRET
)
except (WebhookInvalidPayload, SignatureVerificationError) as e:
LOG.e("Invalid Coinbase webhook")
return str(e), 400
LOG.d("Coinbase event %s", event)
if event["type"] == "charge:confirmed":
if handle_coinbase_event(event):
return "success", 200
else:
return "error", 400
return "success", 200
def handle_coinbase_event(event) -> bool:
server_user_id = event["data"]["metadata"]["user_id"]
try:
user_id = int(server_user_id)
except ValueError:
user_id = int(float(server_user_id))
code = event["data"]["code"]
user: Optional[User] = User.get(user_id)
if not user:
LOG.e("User not found %s", user_id)
return False
coinbase_subscription: CoinbaseSubscription = CoinbaseSubscription.get_by(
user_id=user_id
)
if not coinbase_subscription:
LOG.d("Create a coinbase subscription for %s", user)
coinbase_subscription = CoinbaseSubscription.create(
user_id=user_id, end_at=arrow.now().shift(years=1), code=code, commit=True
)
emit_user_audit_log(
user=user,
action=UserAuditLogAction.Upgrade,
message="Upgraded though Coinbase",
commit=True,
)
send_email(
user.email,
"Your SimpleLogin account has been upgraded",
render(
"transactional/coinbase/new-subscription.txt",
user=user,
coinbase_subscription=coinbase_subscription,
),
render(
"transactional/coinbase/new-subscription.html",
user=user,
coinbase_subscription=coinbase_subscription,
),
)
else:
if coinbase_subscription.code != code:
LOG.d("Update code from %s to %s", coinbase_subscription.code, code)
coinbase_subscription.code = code
if coinbase_subscription.is_active():
coinbase_subscription.end_at = coinbase_subscription.end_at.shift(years=1)
else: # already expired subscription
coinbase_subscription.end_at = arrow.now().shift(years=1)
emit_user_audit_log(
user=user,
action=UserAuditLogAction.SubscriptionExtended,
message="Extended coinbase subscription",
)
Session.commit()
send_email(
user.email,
"Your SimpleLogin account has been extended",
render(
"transactional/coinbase/extend-subscription.txt",
user=user,
coinbase_subscription=coinbase_subscription,
),
render(
"transactional/coinbase/extend-subscription.html",
user=user,
coinbase_subscription=coinbase_subscription,
),
)
execute_subscription_webhook(user)
return True
-2
View File
@@ -76,7 +76,6 @@ from app.monitor_utils import send_version_event
from app.newsletter_utils import send_newsletter_to_user
from app.oauth.base import oauth_bp
from app.onboarding.base import onboarding_bp
from app.payments.coinbase import setup_coinbase_commerce
from app.payments.paddle import setup_paddle_callback
from app.phone.base import phone_bp
from app.redis_services import initialize_redis_services
@@ -154,7 +153,6 @@ def create_app() -> Flask:
init_admin(app)
setup_paddle_callback(app)
setup_coinbase_commerce(app)
setup_do_not_track(app)
register_custom_commands(app)
-46
View File
@@ -1,10 +1,4 @@
import arrow
from app.config import EMAIL_DOMAIN
from app.db import Session
from app.models import CoinbaseSubscription
from app.payments.coinbase import handle_coinbase_event
from tests.utils import create_new_user
def test_redirect_login_page(flask_client):
@@ -13,43 +7,3 @@ def test_redirect_login_page(flask_client):
rv = flask_client.get("/")
assert rv.status_code == 302
assert rv.location == f"http://{EMAIL_DOMAIN}/auth/login"
def test_coinbase_webhook(flask_client):
r = flask_client.post("/coinbase")
assert r.status_code == 400
def test_handle_coinbase_event_new_subscription(flask_client):
user = create_new_user()
handle_coinbase_event(
{"data": {"code": "AAAAAA", "metadata": {"user_id": str(user.id)}}}
)
assert user.is_paid()
assert user.is_premium()
assert CoinbaseSubscription.get_by(user_id=user.id) is not None
def test_handle_coinbase_event_extend_subscription(flask_client):
user = create_new_user()
user.trial_end = None
Session.commit()
cb = CoinbaseSubscription.create(
user_id=user.id, end_at=arrow.now().shift(days=-400), commit=True
)
assert not cb.is_active()
assert not user.is_paid()
assert not user.is_premium()
handle_coinbase_event(
{"data": {"code": "AAAAAA", "metadata": {"user_id": str(user.id)}}}
)
assert user.is_paid()
assert user.is_premium()
assert CoinbaseSubscription.get_by(user_id=user.id) is not None