@Irostovsky
Created February 5, 2026 08:35
Celery ack_late idempotency audit - all tasks analysis

Celery ack_late Idempotency Audit

Context

Celery 5.x has a known bug: without late acknowledgement (the task_acks_late setting, written ack_late in this audit), the prefetch count is not enforced correctly. Workers grab all messages from the queue into an internal buffer, acking them immediately on receipt. RabbitMQ then shows 0 "Ready" messages, and autoscalers have nothing to trigger on.

With ack_late enabled, the prefetch limit works correctly: each worker holds at most worker_prefetch_multiplier * concurrency (= 2 in our case) unacked messages, and the rest stay as "Ready" in the queue.
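To make the arithmetic above concrete, here is a minimal sketch of how a backlog splits into unacked and "Ready" messages once every worker honours its prefetch limit. The function name and the sample numbers (two workers, ten queued messages, multiplier 1, concurrency 2) are illustrative assumptions; the audit only states that the product is 2.

```python
def queue_split(total_messages, workers, prefetch_multiplier, concurrency):
    """Return (unacked, ready) counts when each worker respects its prefetch limit."""
    per_worker_limit = prefetch_multiplier * concurrency
    unacked = min(total_messages, workers * per_worker_limit)
    return unacked, total_messages - unacked


# Two workers, each capped at 1 * 2 = 2 unacked messages, ten messages queued.
unacked, ready = queue_split(
    total_messages=10, workers=2, prefetch_multiplier=1, concurrency=2
)
print(f"unacked={unacked} ready={ready}")
```

The "ready" figure is what a queue-depth autoscaler would see; with early ack it collapses to 0 regardless of backlog.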

Trade-off

| Scenario | Current (early ack) | With ack_late |
| --- | --- | --- |
| Worker crashes mid-task | Task lost forever | Task redelivered (runs twice) |
| Task raises exception | Task acked, exception logged | Task acked, exception logged (same) |
| Task hits time limit | Task acked | Task acked (with acks_on_failure_or_timeout=True) |
| Autoscaler visibility | Broken (0 Ready) | Works correctly |

Recommended configuration

app.conf.task_acks_late = True
app.conf.task_acks_on_failure_or_timeout = True   # already Celery's default; set explicitly for clarity
app.conf.task_reject_on_worker_lost = True

The risk

If a worker dies mid-task (OOM, SIGKILL, deploy), the message is redelivered and the task runs twice. Tasks that are not idempotent can cause duplicate side effects: double emails, duplicate financial records, duplicate external API calls.


Audit scope

Audited ~179 Celery tasks across 54 tasks.py files in the project. Each task was analyzed for:

  • External API calls (Stripe, OVH, Keynest, Seam, Keyless, Revolut, etc.)
  • Email/SMS/push notification sends
  • Database writes without existence guards
  • Financial record creation (payouts, invoices, splits)

CRITICAL RISK

Duplicate execution causes real damage (financial loss, broken auth, incorrect external state).

1. send_host_welcome_email

File: authentification/tasks.py:47

@shared_task
def send_host_welcome_email(host_id):
    host = Proprietaire.objects.get(id=host_id)
    user = host.user
    password = "".join(random.choices(string.ascii_letters + string.digits, k=8))
    user.set_password(password)
    user.save()

    if not EmailLock.is_locked(host.user.email):
        mail.send(...)

Problem: Generates a new random password on every execution, resets it, then sends the email. If the task runs twice:

  • The host's password is reset twice to two different values
  • Two emails are sent with different passwords -- the first one becomes invalid
  • The host cannot log in using the credentials from the first email

Proposed fix: Add a welcome_email_sent flag on the host model (or a NotificationAudit check). Only generate password + send email if the flag is not set. Wrap password set + flag update in a transaction.
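A minimal sketch of the flag guard, assuming a welcome_email_sent field as proposed above. The Host dataclass and the outbox list are hypothetical stand-ins for the Django model and mail.send(...); the sketch also swaps random.choices for the secrets module, which is a substitution and not what the audited code does.

```python
import secrets
import string
from dataclasses import dataclass


@dataclass
class Host:
    """Hypothetical stand-in for the host/user model."""
    email: str
    password: str = ""
    welcome_email_sent: bool = False


def send_host_welcome_email(host, outbox):
    """Set a password and send the welcome email at most once per host."""
    if host.welcome_email_sent:
        return False  # redelivered task: leave the existing password alone

    alphabet = string.ascii_letters + string.digits
    password = "".join(secrets.choice(alphabet) for _ in range(12))

    # In the real task these two writes would share one database transaction.
    host.password = password
    host.welcome_email_sent = True

    outbox.append((host.email, password))  # stands in for mail.send(...)
    return True


host, outbox = Host(email="host@example.com"), []
first = send_host_welcome_email(host, outbox)
second = send_host_welcome_email(host, outbox)  # simulated redelivery
print(first, second, len(outbox))
```

Setting the flag before the send makes the email at-most-once: a crash between the commit and the send leaves the host without an email, which is usually easier to recover from than two conflicting passwords.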


2. create_virtual_payouts -> CreateVirtualPayout

File: payment_system/tasks.py:47 / payment_system/services/payout_services.py

# Inside CreateVirtualPayout service
def _get_or_create_virtual_payout(self):
    self.virtual_payout = VirtualPayout.objects.create(  # NO EXISTENCE CHECK
        virtual_account=self.virtual_account,
        amount=self.virtual_account.get_amount_for_payout(),
        currency=self.virtual_account.get_currency(),
        status="PENDING",
    )

Problem: The method is named _get_or_create_virtual_payout but only calls .create(). If ProcessPayoutsBulk runs twice for the same virtual account, duplicate payouts are created.

Proposed fix: Change .create() to .get_or_create() keyed on (virtual_account, status="PENDING"), or add a query guard in ProcessPayoutsBulk to exclude accounts that already have a pending payout.
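get_or_create() is only race-safe when the database enforces uniqueness on the lookup fields, so the fix likely needs a constraint as well. The sketch below illustrates the idea with the standard-library sqlite3 module: a partial unique index allows one PENDING payout per account, and INSERT OR IGNORE turns a repeat into a no-op. Table, column, and index names are hypothetical; in Django the equivalent would be a UniqueConstraint with condition=Q(status="PENDING").

```python
import sqlite3


def make_db():
    """Create an in-memory table that allows one PENDING payout per account."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE virtual_payout ("
        " id INTEGER PRIMARY KEY,"
        " virtual_account_id INTEGER NOT NULL,"
        " amount INTEGER NOT NULL,"
        " status TEXT NOT NULL)"
    )
    conn.execute(
        "CREATE UNIQUE INDEX one_pending_payout_per_account"
        " ON virtual_payout (virtual_account_id) WHERE status = 'PENDING'"
    )
    return conn


def get_or_create_pending_payout(conn, account_id, amount):
    """Return (payout_id, created); a second call reuses the pending row."""
    with conn:  # commit on success, roll back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO virtual_payout"
            " (virtual_account_id, amount, status) VALUES (?, ?, 'PENDING')",
            (account_id, amount),
        )
        created = cur.rowcount == 1
        row = conn.execute(
            "SELECT id FROM virtual_payout"
            " WHERE virtual_account_id = ? AND status = 'PENDING'",
            (account_id,),
        ).fetchone()
    return row[0], created


conn = make_db()
print(get_or_create_pending_payout(conn, 7, 1000))
print(get_or_create_pending_payout(conn, 7, 1000))  # simulated redelivery
```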


3. resolve_pt_small_resolutions

File: payment_system/tasks.py:188

@shared_task
def resolve_pt_small_resolutions():
    p_r_s = PaymentResolution.objects.filter(status="OPENED", amount__lte=200, ...)
    for p_r in p_r_s:
        CloseResolutionService.execute(resolution=p_r, user=user, host_amount=host_amount)

Problem: Filters status="OPENED" resolutions and closes them. The status filter acts as a guard if CloseResolutionService changes the status atomically. But if the service involves Stripe refunds/transfers internally, those may not be idempotent.

Proposed fix: Verify that CloseResolutionService checks resolution status at the start and is a no-op if already closed. If it isn't, add that guard.
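One way to express the status guard is a conditional update that only succeeds for the caller that actually moves the row out of OPENED. The sketch below uses the standard-library sqlite3 module; the table layout and the issue_refund callback are hypothetical stand-ins for PaymentResolution and whatever CloseResolutionService does with Stripe.

```python
import sqlite3


def close_resolution(conn, resolution_id, issue_refund):
    """Close an OPENED resolution once; repeat calls return False and do nothing."""
    with conn:  # one transaction: commit on success, roll back on error
        cur = conn.execute(
            "UPDATE payment_resolution SET status = 'CLOSED' "
            "WHERE id = ? AND status = 'OPENED'",
            (resolution_id,),
        )
        if cur.rowcount == 0:
            return False  # already closed (or missing): skip the side effects
        issue_refund(resolution_id)  # stands in for the Stripe refund/transfer
    return True


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE payment_resolution (id INTEGER PRIMARY KEY, status TEXT NOT NULL)"
)
conn.execute("INSERT INTO payment_resolution (id, status) VALUES (1, 'OPENED')")
conn.commit()

refunds = []
print(close_resolution(conn, 1, refunds.append))
print(close_resolution(conn, 1, refunds.append))  # simulated redelivery
```

In Django the same check is filter(pk=..., status="OPENED").update(status="CLOSED"), which returns the number of rows changed. The guard does not cover a crash after the external call but before the commit; for that window, passing an idempotency key to Stripe (which its API accepts on POST requests) is the usual complement.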


HIGH RISK

Duplicate execution creates incorrect state in external systems.

4. AutoGenerateAccessCodes (Seam smart locks)

File: seam/tasks.py -> seam/services/access_code.py

# Inside CreateAccessCodeService
access_code = AccessCode.objects.create(**payload, mission=mission, reservation=reservation)

Problem: Calls .create() without checking if an access code already exists for the device+mission combination. Duplicate execution creates duplicate access codes on the physical lock system.

Proposed fix: Use AccessCode.objects.get_or_create(mission=mission, device=device, ...) or add an existence check before creation.


5. CreateKeylessBookingForReservation (Dubai Keyless)

File: keyless_dubai/tasks.py -> keyless_dubai/services.py

response = keyless_client_wrapper.create_booking(data=data, token=...)
if response and response.get("success"):
    reservation.dubai_keyless_booking_id = response.get("bookingRefNumber")
    reservation.save()

Problem: No check for reservation.dubai_keyless_booking_id before calling the API. If the task succeeds at the API but crashes before .save(), the retry creates a duplicate booking in the Keyless system.

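Proposed fix: Add an if reservation.dubai_keyless_booking_id: return guard at the start of the service. This covers redelivery after a successful .save(); the crash-before-.save() window described above additionally needs a lookup of existing bookings for the reservation, or an idempotency key, if the Keyless API offers either.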
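A minimal sketch of the early-return guard. Reservation and FakeKeylessClient are hypothetical test doubles; the real keyless_client_wrapper.create_booking signature and payload may differ.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Reservation:
    """Hypothetical stand-in for the reservation model."""
    id: int
    dubai_keyless_booking_id: Optional[str] = None


class FakeKeylessClient:
    """Test double that counts how often the booking API is hit."""

    def __init__(self):
        self.calls = 0

    def create_booking(self, data):
        self.calls += 1
        return {"success": True, "bookingRefNumber": f"KL-{data['reservation_id']}"}


def create_keyless_booking(reservation, client):
    """Create a booking unless the reservation already carries a booking id."""
    if reservation.dubai_keyless_booking_id:
        return reservation.dubai_keyless_booking_id  # already booked: no API call

    response = client.create_booking(data={"reservation_id": reservation.id})
    if response and response.get("success"):
        reservation.dubai_keyless_booking_id = response.get("bookingRefNumber")
        # reservation.save() would go here in the real service
    return reservation.dubai_keyless_booking_id


reservation, client = Reservation(id=5), FakeKeylessClient()
create_keyless_booking(reservation, client)
create_keyless_booking(reservation, client)  # simulated redelivery
print(client.calls, reservation.dubai_keyless_booking_id)
```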


6. create_ovh_redirections

File: authentification/tasks.py:73

@shared_task
def create_ovh_redirections(airbnb_account_id):
    ovh_client = get_ovh_client()
    airbnb_account = CompteAirBnb.objects.get(pk=airbnb_account_id)
    create_ovh_redirection(logger, airbnb_account, ovh_client)

Problem: Creates OVH email redirections via external API. No check if the redirection already exists. Duplicate execution may create duplicate redirections or produce an API error.

Proposed fix: Check if the redirection already exists via the OVH API before creating, or handle the "already exists" error gracefully.
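The "handle already exists gracefully" option can be sketched as below. RedirectionAlreadyExists and FakeOvhClient are hypothetical: which exception or status code the real OVH client returns for a duplicate redirection is an assumption to confirm before relying on this pattern.

```python
class RedirectionAlreadyExists(Exception):
    """Hypothetical error; confirm what the real OVH client raises."""


class FakeOvhClient:
    """Test double that rejects a redirection it has already stored."""

    def __init__(self):
        self.redirections = set()

    def create_redirection(self, source, target):
        if (source, target) in self.redirections:
            raise RedirectionAlreadyExists(source)
        self.redirections.add((source, target))


def ensure_redirection(client, source, target):
    """Create the redirection, treating 'already exists' as success."""
    try:
        client.create_redirection(source, target)
    except RedirectionAlreadyExists:
        return "already_exists"
    return "created"


client = FakeOvhClient()
print(ensure_redirection(client, "alias@example.com", "inbox@example.com"))
print(ensure_redirection(client, "alias@example.com", "inbox@example.com"))
```

Catching only the specific duplicate error keeps genuine failures (auth, network, validation) visible to Celery's retry and logging.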


MODERATE RISK

Duplicate execution causes noise, duplicate notifications, or minor data inconsistencies.

7. Notification tasks dispatching via notify_reservation

The notify_reservation function checks NotificationAudit for existing records before sending:

def audit_exists_for_reservation(reservation_id, trigger_name, force, bypass_disabled, ...):
    audit = NotificationAudit.objects.filter(
        Q(reservation_id=reservation_id) & Q(trigger__name=trigger_name) & ~Q(status="REMOVED")
    )
    if not audit:
        return False  # -> proceed to send
    if not force:
        return True   # -> skip

This is a good first line of defense, but there's a race condition window between the audit check and the audit record creation. Two concurrent executions of the same task could both pass the check.

Affected tasks:

  • send_confirm_notifications (staffing/tasks.py:123)
  • send_get_ready_notifications (staffing/tasks.py:499)
  • email_activity (notify/tasks.py:81)
  • send_midterm_notifications (reservation_payments/tasks.py)
  • send_down_payment_notifications (reservation_payments/tasks.py)
  • review_reminder, review_reminder_rentalready_30h, review_reminder_rentalready_6h (reservation_reviews/tasks.py)

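Proposed fix: Add a unique constraint on (reservation_id, trigger_name, trigger_repeated_index) where status != REMOVED, or serialize the check and the insert with select_for_update() on the parent reservation row. Locking the audit query alone is not enough, because there is no audit row to lock on the first send.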


8. Onboarding email tasks

File: onboarding/tasks.py

Tasks: send_email_rental_onboarding_form_created, send_email_rental_onboarding_form_validated, send_email_host_onboarding_form_created

if not EmailLock.is_locked(host.user.email):
    mail.send([host.user.email], ...)

Problem: Only guarded by EmailLock.is_locked(), which is an email blacklist (blocks specific addresses), not a deduplication mechanism. No "already sent" tracking. Duplicate execution sends duplicate emails to hosts.

Proposed fix: Add a notification_sent flag on the RentalOnboardingForm / HostOnboardingForm model, or route through NotificationAudit.


9. retry_publish (Channel Manager)

File: channel_manager_integration/tasks.py:12

@shared_task(bind=True, max_retries=50, retry_backoff=False)
def retry_publish(self, data):
    publish(data)

Problem: Publishes events to a message queue with no source-side deduplication. Whether this is safe depends entirely on whether the downstream consumer is idempotent.

Proposed fix: Verify consumer idempotency. If the consumer is not idempotent, add a message ID / deduplication key to the event payload.
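If a deduplication key turns out to be needed, one option is to derive it from the event content so that every redelivery of the same payload carries the same key. The sketch below is an assumption about how that could look; the payload fields and the in-memory Consumer are hypothetical, and a real consumer would keep its seen-set in a shared store.

```python
import hashlib
import json


def with_dedup_key(event):
    """Return a copy of the event carrying a key derived from its content."""
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    key = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return {**event, "dedup_key": key}


class Consumer:
    """Hypothetical downstream consumer that drops keys it has already seen."""

    def __init__(self):
        self.seen = set()
        self.handled = []

    def handle(self, message):
        if message["dedup_key"] in self.seen:
            return False  # duplicate delivery: ignore
        self.seen.add(message["dedup_key"])
        self.handled.append(message)
        return True


consumer = Consumer()
message = with_dedup_key({"type": "rental_updated", "rental_id": 1})
print(consumer.handle(message), consumer.handle(message))
```

A content-derived key also collapses two legitimately identical events into one. Where that matters, a UUID generated once before the first publish attempt and carried in data across retries is the safer choice.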


10. reply_to_reviews (AI Ready)

File: ai_ready/tasks.py

# Queries reviews with replies__isnull=True, then:
AIReviewReply.objects.create(guest_review=review, prompt_version="original", ...)
AIReviewReply.objects.create(guest_review=review, prompt_version="improved", ...)

Problem: If the task crashes after creating the first reply but before the second, a retry creates partial duplicates.

Proposed fix: Wrap both creations in a transaction, or use bulk_create with ignore_conflicts=True backed by a unique constraint on (guest_review, prompt_version).
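The transaction option can be illustrated with the standard-library sqlite3 module: both rows are committed together or not at all, so a crash between the two inserts leaves nothing behind for the retry to trip over. Table and column names are hypothetical; in Django the equivalent wrapper is transaction.atomic().

```python
import sqlite3


def make_db():
    """Create an in-memory reply table with one row per (review, prompt version)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE ai_review_reply ("
        " guest_review_id INTEGER NOT NULL,"
        " prompt_version TEXT NOT NULL,"
        " UNIQUE (guest_review_id, prompt_version))"
    )
    return conn


def create_replies(conn, review_id, fail_after_first=False):
    """Insert both replies atomically; an exception rolls back the first insert."""
    with conn:  # commits on success, rolls back if an exception escapes
        conn.execute(
            "INSERT INTO ai_review_reply VALUES (?, 'original')", (review_id,)
        )
        if fail_after_first:
            raise RuntimeError("simulated crash between the two inserts")
        conn.execute(
            "INSERT INTO ai_review_reply VALUES (?, 'improved')", (review_id,)
        )


def reply_count(conn, review_id):
    """Count stored replies for one review."""
    return conn.execute(
        "SELECT COUNT(*) FROM ai_review_reply WHERE guest_review_id = ?",
        (review_id,),
    ).fetchone()[0]


conn = make_db()
try:
    create_replies(conn, 1, fail_after_first=True)
except RuntimeError:
    pass
print(reply_count(conn, 1))  # nothing persisted after the failed attempt
create_replies(conn, 1)      # retry succeeds cleanly
print(reply_count(conn, 1))
```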


11. Slack notification tasks (various)

Affected tasks:

  • send_slack_mission_cancelled_when_overlapping (staffing/tasks.py:152)
  • send_slack_rental_onboarding_form_submitted (onboarding/tasks.py)
  • send_slack_rental_onboarding_form_validated (onboarding/tasks.py)
  • last_cleaning_mission_alert (staffing/tasks.py:267)
  • revolut_token_90_days_rolling (payment_system/tasks.py:61)
  • notify_slack_daily_unpaid_deposits (security_deposit/tasks.py)
  • create_deleted_financial_snapshots (snapshots/tasks.py)

Problem: No dedup on Slack messages. Duplicate execution = duplicate Slack noise.

Impact: Low -- annoying but not damaging.

Proposed fix: Generally acceptable as-is. If needed, add a simple cache key check (cache.add(f"slack:{task}:{id}", True, timeout=3600)).
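A minimal sketch of the cache-based dedup. TtlCache is a hypothetical in-memory stand-in that mimics the add() semantics of Django's cache API (store only if the key is absent, return whether it was stored); notify_slack_once and the message format are illustrative.

```python
import time


class TtlCache:
    """Tiny in-memory stand-in for a cache backend with add() semantics."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._entries = {}  # key -> (value, expiry timestamp)

    def add(self, key, value, timeout):
        """Store value only if key is absent or expired; return True if stored."""
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[1] > now:
            return False
        self._entries[key] = (value, now + timeout)
        return True


def notify_slack_once(cache, task_name, object_id, send):
    """Send a Slack message unless the same task/object pair fired recently."""
    if not cache.add(f"slack:{task_name}:{object_id}", True, timeout=3600):
        return False  # duplicate within the window: stay quiet
    send(f"{task_name} fired for {object_id}")
    return True


sent = []
cache = TtlCache()
print(notify_slack_once(cache, "last_cleaning_mission_alert", 12, sent.append))
print(notify_slack_once(cache, "last_cleaning_mission_alert", 12, sent.append))
```

This only deduplicates across workers when the cache backend is shared (for example Redis or Memcached); a per-process local-memory cache would not help.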


LOW RISK / SAFE

Naturally idempotent -- safe to run multiple times with no side effects.

Cleanup / deletion tasks (~12 tasks)

All remove_unused_* tasks delete old records by date. Running twice is a no-op.

  • remove_unused_chekin_events, remove_unused_chekin_webhook_events
  • remove_unused_talkjs_events
  • remove_unused_callcenter_recording_events
  • remove_unused_whatsapp_webhook_events, remove_unused_send_whatsapp_message_events
  • remove_unused_keynest_api_events, remove_unused_keynest_webhook_events
  • remove_unused_keyless_events
  • remove_unused_igloohome_notifications
  • remove_unused_remotelock_api_events
  • remove_unused_yacan_api_events
  • clean_old_model_history, clean_old_data, cleanup_mail

Sync / refresh tasks

Overwrite state to match a source of truth -- running twice produces the same result.

  • synchronize_accounts, synchronize_transactions (Revolut)
  • sync_keys, clean_keys (Keynest -- uses autoretry_for)
  • sync_stores (Keynest)
  • refresh_host_transaction_view, refresh_reservation_split_status_view
  • sync_support_antennes, sync_permissions
  • update_exchange_rates, update_currencies
  • synchronize_rental_platform_settings
  • sync_rentals_with_channel_manager, verify_rental_synchronization
  • synchronize_rentals_after_contact_update
  • schedule_extra_fees_sync
  • sync_all_ical_calendars, sync_single_ical_calendar

Invoice generation (~10 tasks)

All invoice generators extend GenerateInvoicesBase which filters has_invoices=False:

def get_reservations_to_invoice(self):
    category_invoices = AccountingInvoice.objects.filter(
        category=self.category, reservation=OuterRef("pk")
    )
    return (
        get_reservations_in_creation_window()
        .annotate(has_invoices=Exists(category_invoices))
        .filter(has_invoices=False)
    )

  • generate_commission_invoices, generate_split_invoices, generate_cleaning_invoices
  • generate_adjustment_invoices, generate_payment_link_invoices
  • generate_payment_resolution_invoices, generate_platform_fee_pm_invoices
  • generate_platform_fee_pms_invoices, generate_extra_fee_invoices
  • generate_guest_invoices, regenerate_missing_hostkit_invoices

Security deposits

Filters has_security_deposit_payment_link=False and swikly_deposit__isnull=True:

  • generate_security_deposits

Stripe account/customer creation

Guards via existence checks:

  • create_host_customer -- checks host.stripe_customer before creation
  • create_or_update_stripe_account_for_host -- checks host.stripe_account
  • BatchCreateStripeAccounts -- filters stripe_account_id__isnull=True
  • BatchCreateHostCustomers -- filters stripe_customer_id__isnull=True
  • create_stripe_bank_account_for_hosts -- batch filters existing bank accounts

Revolut provisioning

Status-based guard + Stripe idempotency keys:

  • provision_revolut_accounts -- checks status="CREATED", uses idempotency_key

Lock code creation (Keynest, Igloohome, RemoteLock, Yacan)

All use update_or_create() keyed on mission:

  • create_collection_codes, delete_cancelled_missions_collection_codes
  • create_lock_code_for_check_in_mission
  • create_remotelock_code_for_missions
  • create_yacan_code_for_missions

Pricing / rate propagation

All use get_or_create() / update_or_create():

  • propagate_promotions_rental, propagate_rate_rule_rental, propagate_rate_plans_rental
  • assign_daily_prices, run_pricing_algorithms

Reservation split

Uses get_or_create() + django_lock:

  • split_short_term_reservations

Metrics / reporting

Uses update_or_create() on reporting rows:

  • update_rental_metrics, update_rental_status
  • compute_all_rentals_quality_score

Vouchers

Filters extended=False / checks review existence:

  • extend_good_review_vouchers, create_voucher_for_reservation_without_review

NextPax onboarding

Filters nextpax_rentals__isnull=True:

  • automatic_onboarding

Other safe tasks

  • log_out_inactive_users -- logging out an already logged-out user is a no-op
  • generate_default_pricing_algorithm -- uses objects.exclude(antenne_pricing_algorithms__default=True)
  • toggle_traveller_status -- status toggle based on current state
  • cancel_test_bookings -- .cancel() on already-cancelled is a no-op
  • void_old_pending_topups, void_topups_for_hosts_with_positive_balance -- void operations are idempotent
  • void_expired_payment_links -- voiding is idempotent
  • delete_recordings -- deletion is idempotent
  • recalculate_rental_public_average_ratings -- recalculation is idempotent
  • check_unverified_accounts -- read + check, no mutations
  • xero_ready tasks -- refresh_xero_tokens (idempotent), post_invoices_to_xero (needs Xero-side verification)

Summary

| Risk | Count | Action required |
| --- | --- | --- |
| Critical | 3 tasks | Must fix before enabling ack_late |
| High | 3 tasks | Should fix before enabling ack_late |
| Moderate | 5 categories (~15 tasks) | Fix incrementally; acceptable short-term risk |
| Low / Safe | ~158 tasks | No action needed |

Priority action list

| Priority | Task | Fix |
| --- | --- | --- |
| P0 | send_host_welcome_email | Add "sent" flag, skip if already sent |
| P0 | CreateVirtualPayout | Change .create() to .get_or_create() |
| P0 | resolve_pt_small_resolutions | Verify CloseResolutionService checks status |
| P1 | Seam AutoGenerateAccessCodes | Add existence check before .create() |
| P1 | Keyless CreateKeylessBookingForReservation | Add booking_id check |
| P1 | create_ovh_redirections | Check existence or handle "already exists" |
| P2 | Onboarding emails | Add sent tracking |
| P2 | notify_reservation race condition | Add unique constraint on audit table |
| P2 | retry_publish | Verify downstream consumer idempotency |
| P3 | Slack notifications | Optional: add cache-based dedup |