Celery 5.x has a known bug: without ack_late, the prefetch count is not enforced correctly. Workers grab all messages from the queue into an internal buffer, acking them immediately on receipt. RabbitMQ then shows 0 "Ready" messages, and autoscalers have nothing to trigger on.
With ack_late enabled, the prefetch limit works correctly: each worker holds at most prefetch_multiplier * concurrency (= 2 in our case) unacked messages, and the rest stay as "Ready" in the queue.
| Scenario | Current (early ack) | With ack_late |
|---|---|---|
| Worker crashes mid-task | Task lost forever | Task redelivered (runs twice) |
| Task raises exception | Task acked, exception logged | Task acked, exception logged (same) |
| Task hits time limit | Task acked | Task acked (with acks_on_failure_or_timeout=True) |
| Autoscaler visibility | Broken (0 Ready) | Works correctly |
```python
app.conf.task_acks_late = True
app.conf.task_acks_on_failure_or_timeout = True  # already the default; set explicitly for clarity
app.conf.task_reject_on_worker_lost = True
```
If a worker dies mid-task (OOM kill, SIGKILL, or a deploy that stops workers before their tasks finish), the message is redelivered and the task runs twice. Tasks that are not idempotent can then cause duplicate side effects: double emails, duplicate financial records, duplicate external API calls.
Audited ~179 Celery tasks across 54 tasks.py files in the project. Each task was analyzed for:
- External API calls (Stripe, OVH, Keynest, Seam, Keyless, Revolut, etc.)
- Email/SMS/push notification sends
- Database writes without existence guards
- Financial record creation (payouts, invoices, splits)
Critical: duplicate execution causes real damage (financial loss, broken auth, incorrect external state).
File: authentification/tasks.py:47
```python
@shared_task
def send_host_welcome_email(host_id):
    host = Proprietaire.objects.get(id=host_id)
    user = host.user
    password = "".join(random.choices(string.ascii_letters + string.digits, k=8))
    user.set_password(password)
    user.save()
    if not EmailLock.is_locked(host.user.email):
        mail.send(...)
```
Problem: Generates a new random password on every execution, resets it, then sends the email. If the task runs twice:
- The host's password is reset twice to two different values
- Two emails are sent with different passwords -- the first one becomes invalid
- The host cannot log in using the credentials from the first email
Proposed fix: Add a welcome_email_sent flag on the host model (or a NotificationAudit check). Only generate password + send email if the flag is not set. Wrap password set + flag update in a transaction.
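A minimal sketch of the flag-based fix, assuming a welcome_email_sent column on the host row. It uses the standard-library sqlite3 module so it runs standalone; the host table, its password_hash column and the send_email callback are hypothetical stand-ins for Proprietaire, user.set_password() and mail.send(). The conditional UPDATE stores the new password and sets the flag in one transaction, and only the execution whose UPDATE matched a row sends the email. In Django the same shape would be select_for_update() on the host row inside transaction.atomic(), checking the flag before changing anything. The sketch also draws the password with secrets rather than random, since secrets is the standard-library module intended for credentials.

```python
import hashlib
import secrets
import sqlite3
import string

SCHEMA = """
CREATE TABLE host (
    id INTEGER PRIMARY KEY,
    password_hash TEXT,
    welcome_email_sent INTEGER NOT NULL DEFAULT 0
);
"""


def make_db():
    """In-memory database with the hypothetical host table."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


def send_host_welcome_email(conn, host_id, send_email):
    """Send the welcome email at most once per host.

    Returns True if this call sent the email, False if an earlier run did.
    """
    alphabet = string.ascii_letters + string.digits
    password = "".join(secrets.choice(alphabet) for _ in range(8))
    # Stand-in for user.set_password(); real code must use Django's hashers.
    password_hash = hashlib.sha256(password.encode()).hexdigest()

    with conn:  # one transaction: password and flag change together
        cur = conn.execute(
            "UPDATE host SET password_hash = ?, welcome_email_sent = 1 "
            "WHERE id = ? AND welcome_email_sent = 0",
            (password_hash, host_id),
        )
    if cur.rowcount == 0:
        return False  # flag already set: a previous execution sent the email

    # Sent only after the commit, so a rolled-back password is never mailed.
    send_email(host_id, password)
    return True
```

Because the flag is committed before the email goes out, a crash between the commit and the send yields no email rather than two, and the host would need a password reset; for a credentials email that seems the lesser failure.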
File: payment_system/tasks.py:47 / payment_system/services/payout_services.py
```python
# Inside CreateVirtualPayout service
def _get_or_create_virtual_payout(self):
    self.virtual_payout = VirtualPayout.objects.create(  # NO EXISTENCE CHECK
        virtual_account=self.virtual_account,
        amount=self.virtual_account.get_amount_for_payout(),
        currency=self.virtual_account.get_currency(),
        status="PENDING",
    )
```
Problem: The method is named _get_or_create_virtual_payout but only calls .create(). If ProcessPayoutsBulk runs twice for the same virtual account, duplicate payouts are created.
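Proposed fix: Change .create() to .get_or_create() keyed on (virtual_account, status="PENDING"), passing amount and currency in defaults, or add a query guard in ProcessPayoutsBulk to exclude accounts that already have a pending payout. Either option is only race-free if the database enforces uniqueness, so back it with a conditional unique constraint (at most one PENDING payout per virtual account).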
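Django's get_or_create() is only atomic when the database enforces uniqueness of the lookup, so the sketch below pairs the lookup with a partial unique index allowing one PENDING payout per virtual account. It uses sqlite3 so it runs standalone; the table layout and integer amounts are assumptions. In Django the equivalent would presumably be a UniqueConstraint(fields=["virtual_account"], condition=Q(status="PENDING"), name=...) on VirtualPayout plus get_or_create(virtual_account=..., status="PENDING", defaults={"amount": ..., "currency": ...}), with amount and currency in defaults so they are not part of the lookup.

```python
import sqlite3

SCHEMA = """
CREATE TABLE virtual_payout (
    id INTEGER PRIMARY KEY,
    virtual_account_id INTEGER NOT NULL,
    amount INTEGER NOT NULL,  -- minor units (an assumption)
    currency TEXT NOT NULL,
    status TEXT NOT NULL
);
-- At most one PENDING payout per virtual account.
CREATE UNIQUE INDEX one_pending_payout_per_account
    ON virtual_payout (virtual_account_id) WHERE status = 'PENDING';
"""


def make_db():
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


def get_or_create_pending_payout(conn, account_id, amount, currency):
    """Return (payout_id, created); a repeated call reuses the pending payout."""
    with conn:
        cur = conn.execute(
            # OR IGNORE: if the partial unique index already holds a PENDING
            # row for this account, skip the insert instead of raising.
            "INSERT OR IGNORE INTO virtual_payout "
            "(virtual_account_id, amount, currency, status) "
            "VALUES (?, ?, ?, 'PENDING')",
            (account_id, amount, currency),
        )
        created = cur.rowcount == 1
        payout_id = conn.execute(
            "SELECT id FROM virtual_payout "
            "WHERE virtual_account_id = ? AND status = 'PENDING'",
            (account_id,),
        ).fetchone()[0]
    return payout_id, created
```

Once a payout leaves PENDING (paid, failed), the index no longer covers it, so the next cycle can create a fresh pending payout.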
File: payment_system/tasks.py:188
```python
@shared_task
def resolve_pt_small_resolutions():
    p_r_s = PaymentResolution.objects.filter(status="OPENED", amount__lte=200, ...)
    for p_r in p_r_s:
        # user and host_amount are set in code elided from this excerpt
        CloseResolutionService.execute(resolution=p_r, user=user, host_amount=host_amount)
```
Problem: Filters status="OPENED" resolutions and closes them. The status filter acts as a guard only if CloseResolutionService changes the status atomically. Even then, any Stripe refunds or transfers the service issues internally may not be idempotent: a crash after the Stripe call but before the status update leaves the resolution OPENED, so the redelivered task processes it again.
Proposed fix: Verify that CloseResolutionService checks resolution status at the start and is a no-op if already closed. If it isn't, add that guard.
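A minimal sketch of the guard the fix asks for, combined with a deterministic idempotency key for the money movement. It is modelled with sqlite3 and a refund callback; the payment_resolution table and the refund signature are assumptions, not the real CloseResolutionService API. The status check makes a re-run after a completed close a no-op, and the key derived from the resolution ID covers the remaining window (refund sent, crash before the status update): a provider that honours idempotency keys, as Stripe does through its Idempotency-Key header, returns the original result instead of refunding twice.

```python
import sqlite3

SCHEMA = """
CREATE TABLE payment_resolution (
    id INTEGER PRIMARY KEY,
    status TEXT NOT NULL,
    amount INTEGER NOT NULL
);
"""


def make_db():
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


def close_resolution(conn, resolution_id, refund):
    """Close one OPENED resolution; a no-op if it is already closed or missing."""
    row = conn.execute(
        "SELECT status, amount FROM payment_resolution WHERE id = ?",
        (resolution_id,),
    ).fetchone()
    if row is None or row[0] != "OPENED":
        return False  # an earlier execution already closed it

    # Same key on every attempt for this resolution: if a previous run sent
    # the refund and then crashed, the provider can return the original
    # result instead of refunding twice.
    refund(amount=row[1], idempotency_key=f"close-resolution-{resolution_id}")

    with conn:
        # Conditional update: only one execution flips OPENED -> CLOSED.
        cur = conn.execute(
            "UPDATE payment_resolution SET status = 'CLOSED' "
            "WHERE id = ? AND status = 'OPENED'",
            (resolution_id,),
        )
    return cur.rowcount == 1
```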
High: duplicate execution creates incorrect state in external systems.
File: seam/tasks.py -> seam/services/access_code.py
```python
# Inside CreateAccessCodeService
access_code = AccessCode.objects.create(**payload, mission=mission, reservation=reservation)
```
Problem: Calls .create() without checking if an access code already exists for the device+mission combination. Duplicate execution creates duplicate access codes on the physical lock system.
Proposed fix: Use AccessCode.objects.get_or_create(mission=mission, device=device, ...) or add an existence check before creation.
File: keyless_dubai/tasks.py -> keyless_dubai/services.py
```python
response = keyless_client_wrapper.create_booking(data=data, token=...)
if response and response.get("success"):
    reservation.dubai_keyless_booking_id = response.get("bookingRefNumber")
    reservation.save()
```
Problem: No check for reservation.dubai_keyless_booking_id before calling the API. If the task succeeds at the API but crashes before .save(), the retry creates a duplicate booking in the Keyless system.
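Proposed fix: Add a guard at the start of the service that returns early when reservation.dubai_keyless_booking_id is already set. This stops re-runs after a successful save, but not the crash window described above (booking created, .save() never reached); closing that needs a way to find the existing booking on the Keyless side, such as a lookup by our own reservation reference, if the API offers one.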
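A sketch of the guarded service, using plain Python stand-ins so it runs without Django (token handling omitted). The first guard is the proposed fix. The second guard is hypothetical: it assumes the Keyless API can look up a booking by a reference we send, and find_booking_by_reference and the "reference" payload field are invented for illustration. That lookup is what it would take to survive a crash between create_booking() and save(); if the API has no such lookup, the window remains.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Reservation:
    """Stand-in for the Django model; save() would persist the row."""
    id: int
    dubai_keyless_booking_id: Optional[str] = None

    def save(self):
        pass


def create_keyless_booking(reservation, client):
    # Guard 1 (proposed fix): a stored booking id means a previous run finished.
    if reservation.dubai_keyless_booking_id:
        return reservation.dubai_keyless_booking_id

    reference = f"reservation-{reservation.id}"
    # Guard 2 (HYPOTHETICAL API): recover a booking created by a run that
    # crashed after the API call but before save().
    booking_id = client.find_booking_by_reference(reference)
    if booking_id is None:
        response = client.create_booking(data={"reference": reference})
        if response and response.get("success"):
            booking_id = response.get("bookingRefNumber")

    if booking_id:
        reservation.dubai_keyless_booking_id = booking_id
        reservation.save()
    return booking_id
```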
File: authentification/tasks.py:73
```python
@shared_task
def create_ovh_redirections(airbnb_account_id):
    ovh_client = get_ovh_client()
    airbnb_account = CompteAirBnb.objects.get(pk=airbnb_account_id)
    create_ovh_redirection(logger, airbnb_account, ovh_client)
```
Problem: Creates OVH email redirections via external API. No check if the redirection already exists. Duplicate execution may create duplicate redirections or produce an API error.
Proposed fix: Check if the redirection already exists via the OVH API before creating, or handle the "already exists" error gracefully.
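A sketch of the check-then-create fix with the "already exists" fallback. The client methods (find_redirections, create_redirection) and AlreadyExistsError are hypothetical placeholders for whatever the project's OVH wrapper exposes; the real error type should be confirmed against the OVH API's response for a duplicate redirection. The check alone is racy (two executions can both see nothing), which is why the create call also treats a conflict as success.

```python
class AlreadyExistsError(Exception):
    """Hypothetical: the error the OVH wrapper raises for a duplicate."""


def ensure_redirection(client, source, target):
    """Create source -> target once. Returns True only if this call created it."""
    # Cheap pre-check: the common re-run case never calls create.
    if client.find_redirections(source=source, target=target):
        return False
    try:
        client.create_redirection(source=source, target=target)
    except AlreadyExistsError:
        # A concurrent execution created it between the check and the create.
        return False
    return True
```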
Moderate: duplicate execution causes noise, duplicate notifications, or minor data inconsistencies.
The notify_reservation function checks NotificationAudit for existing records before sending:
```python
def audit_exists_for_reservation(reservation_id, trigger_name, force, bypass_disabled, ...):
    audit = NotificationAudit.objects.filter(
        Q(reservation_id=reservation_id) & Q(trigger__name=trigger_name) & ~Q(status="REMOVED")
    )
    if not audit:
        return False  # -> proceed to send
    if not force:
        return True  # -> skip
```
This is a good first line of defense, but there's a race condition window between the audit check and the audit record creation. Two concurrent executions of the same task could both pass the check.
Affected tasks:
- send_confirm_notifications (staffing/tasks.py:123)
- send_get_ready_notifications (staffing/tasks.py:499)
- email_activity (notify/tasks.py:81)
- send_midterm_notifications (reservation_payments/tasks.py)
- send_down_payment_notifications (reservation_payments/tasks.py)
- review_reminder, review_reminder_rentalready_30h, review_reminder_rentalready_6h (reservation_reviews/tasks.py)
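Proposed fix: Add a unique constraint on (reservation_id, trigger_name, trigger_repeated_index) where status != REMOVED, and create the audit row before sending, so that the constraint, not the preceding check, decides which execution sends. Using select_for_update() in the audit check is weaker: on PostgreSQL it locks only rows that already exist, so two executions that both find no audit are not serialized unless they lock a parent row such as the reservation.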
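A minimal sketch of the unique-constraint approach: insert the audit row first and send only if that insert succeeded, so the database, not the earlier SELECT, decides which execution wins. It is modelled with a sqlite3 partial unique index; the table layout and the PENDING/SENT statuses are assumptions. In Django this would presumably be a UniqueConstraint(fields=[...], condition=~Q(status="REMOVED"), name=...) on NotificationAudit, with the insert catching IntegrityError; it is worth confirming that the project's database backend enforces partial unique constraints (PostgreSQL and SQLite do).

```python
import sqlite3

SCHEMA = """
CREATE TABLE notification_audit (
    id INTEGER PRIMARY KEY,
    reservation_id INTEGER NOT NULL,
    trigger_name TEXT NOT NULL,
    trigger_repeated_index INTEGER NOT NULL DEFAULT 0,
    status TEXT NOT NULL
);
-- One live audit row per notification; REMOVED rows do not count.
CREATE UNIQUE INDEX one_live_audit
    ON notification_audit (reservation_id, trigger_name, trigger_repeated_index)
    WHERE status != 'REMOVED';
"""


def make_db():
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


def notify_once(conn, reservation_id, trigger_name, send, repeated_index=0):
    """Send a notification only if this call manages to insert its audit row."""
    try:
        with conn:
            cur = conn.execute(
                "INSERT INTO notification_audit "
                "(reservation_id, trigger_name, trigger_repeated_index, status) "
                "VALUES (?, ?, ?, 'PENDING')",
                (reservation_id, trigger_name, repeated_index),
            )
    except sqlite3.IntegrityError:
        return False  # another execution already claimed this notification
    send(reservation_id, trigger_name)
    with conn:
        conn.execute(
            "UPDATE notification_audit SET status = 'SENT' WHERE id = ?",
            (cur.lastrowid,),
        )
    return True
```

A crash between the insert and the send leaves a PENDING row and no message; whether to re-send those later is a separate policy choice.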
File: onboarding/tasks.py
Tasks: send_email_rental_onboarding_form_created, send_email_rental_onboarding_form_validated, send_email_host_onboarding_form_created
```python
if not EmailLock.is_locked(host.user.email):
    mail.send([host.user.email], ...)
```
Problem: Only guarded by EmailLock.is_locked(), which is an email blacklist (blocks specific addresses), not a deduplication mechanism. No "already sent" tracking. Duplicate execution sends duplicate emails to hosts.
Proposed fix: Add a notification_sent flag on the RentalOnboardingForm / HostOnboardingForm model, or route through NotificationAudit.
File: channel_manager_integration/tasks.py:12
```python
@shared_task(bind=True, max_retries=50, retry_backoff=False)
def retry_publish(self, data):
    publish(data)
```
Problem: Publishes events to a message queue with no source-side deduplication. Whether this is safe depends entirely on whether the downstream consumer is idempotent.
Proposed fix: Verify consumer idempotency. If the consumer is not idempotent, add a message ID / deduplication key to the event payload.
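A sketch of source-side deduplication keys. The important detail is where the ID is minted: it must be attached once, before the first publish attempt, so every retry of retry_publish carries the same ID. event_with_id, DedupConsumer and the message_id field are hypothetical names; the in-memory set stands in for a consumer-side table with a unique constraint, ideally written in the same transaction as the consumer's own side effects.

```python
import uuid


def event_with_id(payload):
    """Attach a message ID once, when the event is created (not per retry)."""
    event = dict(payload)
    event.setdefault("message_id", str(uuid.uuid4()))  # keep an existing ID
    return event


class DedupConsumer:
    """Processes each message_id at most once (in-memory for illustration)."""

    def __init__(self):
        self._seen = set()
        self.processed = []

    def handle(self, event):
        message_id = event["message_id"]
        if message_id in self._seen:
            return False  # redelivered or republished duplicate: drop it
        self._seen.add(message_id)
        self.processed.append(event)
        return True
```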
File: ai_ready/tasks.py
```python
# Queries reviews with replies__isnull=True, then:
AIReviewReply.objects.create(guest_review=review, prompt_version="original", ...)
AIReviewReply.objects.create(guest_review=review, prompt_version="improved", ...)
```
Problem: If the task crashes after creating the first reply but before the second, a retry creates partial duplicates.
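Proposed fix: Wrap both creations in a transaction, or use bulk_create with ignore_conflicts=True. Note that ignore_conflicts only skips rows that violate a unique constraint, so it needs a unique constraint on (guest_review, prompt_version) to have any effect.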
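A sketch of the transactional fix with sqlite3. The replies are assumed to be generated before the write, so a slow model call does not hold a transaction open; both rows are then written in one transaction, and a unique constraint on (guest_review_id, prompt_version) makes a repeated run a no-op. The table layout and save_ai_replies are assumptions for illustration; in Django, a UniqueConstraint plus bulk_create(..., ignore_conflicts=True) inside transaction.atomic() would play the same roles.

```python
import sqlite3

SCHEMA = """
CREATE TABLE ai_review_reply (
    id INTEGER PRIMARY KEY,
    guest_review_id INTEGER NOT NULL,
    prompt_version TEXT NOT NULL,
    body TEXT NOT NULL,
    UNIQUE (guest_review_id, prompt_version)
);
"""


def make_db():
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


def save_ai_replies(conn, review_id, replies):
    """Write all replies for one review atomically.

    replies maps prompt_version -> generated text, produced before this call.
    """
    with conn:  # both rows commit together, or neither does
        for version, body in replies.items():
            conn.execute(
                # Skip only a (review, version) pair that already exists,
                # so re-running after a successful commit adds nothing.
                "INSERT INTO ai_review_reply "
                "(guest_review_id, prompt_version, body) VALUES (?, ?, ?) "
                "ON CONFLICT (guest_review_id, prompt_version) DO NOTHING",
                (review_id, version, body),
            )
```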
Affected tasks:
- send_slack_mission_cancelled_when_overlapping (staffing/tasks.py:152)
- send_slack_rental_onboarding_form_submitted (onboarding/tasks.py)
- send_slack_rental_onboarding_form_validated (onboarding/tasks.py)
- last_cleaning_mission_alert (staffing/tasks.py:267)
- revolut_token_90_days_rolling (payment_system/tasks.py:61)
- notify_slack_daily_unpaid_deposits (security_deposit/tasks.py)
- create_deleted_financial_snapshots (snapshots/tasks.py)
Problem: No deduplication on Slack messages, so duplicate execution means duplicate Slack noise.
Impact: Low -- annoying but not damaging.
Proposed fix: Generally acceptable as-is. If needed, add a simple cache key check (cache.add(f"slack:{task}:{id}", True, timeout=3600)); cache.add() only stores a key that is not already present, so only the first execution posts.
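A sketch of the add-if-absent pattern the fix relies on. Django's cache.add() only stores the key if it is not already present and returns True when it did, so only the first execution posts; AddOnceCache below is a hypothetical in-memory stand-in with the same semantics so the example runs on its own. The dedup is only as shared as the cache backend: a per-process cache such as LocMemCache would not stop two workers from both posting, whereas a shared backend such as Redis or Memcached would.

```python
import time


class AddOnceCache:
    """In-memory stand-in for a cache whose add() never overwrites a live key."""

    def __init__(self, clock=time.monotonic):
        self._entries = {}  # key -> (value, expires_at)
        self._clock = clock

    def add(self, key, value, timeout):
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[1] > now:
            return False  # key still live: an earlier execution claimed it
        self._entries[key] = (value, now + timeout)
        return True


def post_slack_once(cache, task_name, object_id, post, timeout=3600):
    """Post at most once per (task, object) within the timeout window."""
    if not cache.add(f"slack:{task_name}:{object_id}", True, timeout=timeout):
        return False
    post()
    return True
```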
Low / Safe: naturally idempotent -- safe to run multiple times with no side effects.
All remove_unused_* tasks delete old records by date. Running twice is a no-op.
- remove_unused_chekin_events, remove_unused_chekin_webhook_events
- remove_unused_talkjs_events
- remove_unused_callcenter_recording_events
- remove_unused_whatsapp_webhook_events, remove_unused_send_whatsapp_message_events
- remove_unused_keynest_api_events, remove_unused_keynest_webhook_events
- remove_unused_keyless_events
- remove_unused_igloohome_notifications
- remove_unused_remotelock_api_events
- remove_unused_yacan_api_events
- clean_old_model_history, clean_old_data, cleanup_mail
Overwrite state to match a source of truth -- running twice produces the same result.
- synchronize_accounts, synchronize_transactions (Revolut)
- sync_keys, clean_keys (Keynest; uses autoretry_for)
- sync_stores (Keynest)
- refresh_host_transaction_view, refresh_reservation_split_status_view
- sync_support_antennes, sync_permissions
- update_exchange_rates, update_currencies
- synchronize_rental_platform_settings
- sync_rentals_with_channel_manager, verify_rental_synchronization
- synchronize_rentals_after_contact_update
- schedule_extra_fees_sync
- sync_all_ical_calendars, sync_single_ical_calendar
All invoice generators extend GenerateInvoicesBase, which filters on has_invoices=False:
```python
from django.db.models import Exists, OuterRef

# GenerateInvoicesBase method (excerpt)
def get_reservations_to_invoice(self):
    category_invoices = AccountingInvoice.objects.filter(
        category=self.category, reservation=OuterRef("pk")
    )
    return (
        get_reservations_in_creation_window()
        .annotate(has_invoices=Exists(category_invoices))
        .filter(has_invoices=False)
    )
```
- generate_commission_invoices, generate_split_invoices, generate_cleaning_invoices
- generate_adjustment_invoices, generate_payment_link_invoices
- generate_payment_resolution_invoices, generate_platform_fee_pm_invoices
- generate_platform_fee_pms_invoices, generate_extra_fee_invoices
- generate_guest_invoices, regenerate_missing_hostkit_invoices
Filters has_security_deposit_payment_link=False and swikly_deposit__isnull=True:
generate_security_deposits
Guards via existence checks:
- create_host_customer -- checks host.stripe_customer before creation
- create_or_update_stripe_account_for_host -- checks host.stripe_account
- BatchCreateStripeAccounts -- filters stripe_account_id__isnull=True
- BatchCreateHostCustomers -- filters stripe_customer_id__isnull=True
- create_stripe_bank_account_for_hosts -- batch-filters out existing bank accounts
Status-based guard + Stripe idempotency keys:
provision_revolut_accounts -- checks status="CREATED", uses idempotency_key
All use update_or_create() keyed on mission:
- create_collection_codes, delete_cancelled_missions_collection_codes
- create_lock_code_for_check_in_mission
- create_remotelock_code_for_missions
- create_yacan_code_for_missions
All use get_or_create() / update_or_create():
- propagate_promotions_rental, propagate_rate_rule_rental, propagate_rate_plans_rental
- assign_daily_prices, run_pricing_algorithms
Uses get_or_create() + django_lock:
split_short_term_reservations
Uses update_or_create() on reporting rows:
- update_rental_metrics, update_rental_status
- compute_all_rentals_quality_score
Filters extended=False / checks review existence:
extend_good_review_vouchers, create_voucher_for_reservation_without_review
Filters nextpax_rentals__isnull=True:
automatic_onboarding
- log_out_inactive_users -- logging out is idempotent
- generate_default_pricing_algorithm -- uses objects.exclude(antenne_pricing_algorithms__default=True)
- toggle_traveller_status -- status toggle based on current state; safe only if it computes the target status from current data rather than flipping the stored value
- cancel_test_bookings -- .cancel() on an already-cancelled booking is a no-op
- void_old_pending_topups, void_topups_for_hosts_with_positive_balance -- void operations are idempotent
- void_expired_payment_links -- voiding is idempotent
- delete_recordings -- deletion is idempotent
- recalculate_rental_public_average_ratings -- recalculation is idempotent
- check_unverified_accounts -- read + check, no mutations
- xero_ready tasks -- refresh_xero_tokens (idempotent), post_invoices_to_xero (needs Xero-side verification)
| Risk | Count | Action required |
|---|---|---|
| Critical | 3 tasks | Must fix before enabling ack_late |
| High | 3 tasks | Should fix before enabling ack_late |
| Moderate | 5 categories (~15 tasks) | Fix incrementally; acceptable short-term risk |
| Low / Safe | ~158 tasks | No action needed |
| Priority | Task | Fix |
|---|---|---|
| P0 | send_host_welcome_email | Add "sent" flag, skip if already sent |
| P0 | CreateVirtualPayout | Change .create() to .get_or_create() |
| P0 | resolve_pt_small_resolutions | Verify CloseResolutionService checks status |
| P1 | Seam AutoGenerateAccessCodes | Add existence check before .create() |
| P1 | Keyless CreateKeylessBookingForReservation | Add booking_id check |
| P1 | create_ovh_redirections | Check existence or handle "already exists" |
| P2 | Onboarding emails | Add sent tracking |
| P2 | notify_reservation race condition | Add unique constraint on audit table |
| P2 | retry_publish | Verify downstream consumer idempotency |
| P3 | Slack notifications | Optional: add cache-based dedup |