openzeppelin_relayer/queues/sqs/
worker.rs

1//! SQS worker implementation for polling and processing messages.
2//!
3//! This module provides worker tasks that poll SQS queues and process jobs
4//! using the existing handler functions.
5
6use std::future::Future;
7use std::panic::AssertUnwindSafe;
8use std::sync::Arc;
9use std::time::Duration;
10
11use actix_web::web::ThinData;
12use aws_sdk_sqs::error::{ProvideErrorMetadata, SdkError};
13use aws_sdk_sqs::types::{
14    DeleteMessageBatchRequestEntry, Message, MessageAttributeValue, MessageSystemAttributeName,
15};
16use futures::FutureExt;
17use serde::de::DeserializeOwned;
18use tokio::sync::watch;
19use tokio::task::{JoinHandle, JoinSet};
20use tracing::{debug, error, info, warn};
21
22use crate::queues::{backoff_config_for_queue, retry_delay_secs};
23use crate::{
24    config::ServerConfig,
25    jobs::{
26        notification_handler, relayer_health_check_handler, token_swap_request_handler,
27        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
28        Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
29        TransactionSend, TransactionStatusCheck,
30    },
31};
32
33use super::{HandlerError, WorkerContext};
34use super::{QueueBackendError, QueueType, WorkerHandle};
35
/// Classification of a message-handler failure, used to decide whether the
/// message is retried (left in / re-enqueued to the queue) or dropped.
#[derive(Debug)]
enum ProcessingError {
    /// Transient failure — the message should be retried with backoff.
    Retryable(String),
    /// Non-recoverable failure (e.g. malformed payload) — retrying the same
    /// message cannot succeed, so it is deleted instead.
    Permanent(String),
}
41
/// Outcome of processing a single SQS message, used to decide whether the
/// message should be batch-deleted or left in the queue.
#[derive(Debug)]
enum MessageOutcome {
    /// Message processed successfully — should be deleted from queue.
    /// Carries the SQS receipt handle required by `DeleteMessageBatch`.
    Delete { receipt_handle: String },
    /// Message should remain in queue (e.g. status-check retry via visibility
    /// change, or retryable error awaiting visibility timeout).
    Retain,
}
52
/// Configuration for a single SQS poll loop, bundling parameters that
/// would otherwise require too many function arguments.
#[derive(Clone)]
struct PollLoopConfig {
    /// Queue this poller serves; selects the handler and retry behavior.
    queue_type: QueueType,
    /// SQS `WaitTimeSeconds` used for each `ReceiveMessage` call.
    polling_interval: u64,
    /// SQS visibility timeout (seconds) applied to received messages.
    visibility_timeout: u32,
    /// Hard cap on a single handler invocation's runtime.
    handler_timeout: Duration,
    /// Maximum retry deliveries before deferring to the DLQ redrive policy
    /// (`usize::MAX` means retry indefinitely).
    max_retries: usize,
    /// Index of this poller within the worker (0-based); used for fair
    /// permit sharing and log correlation.
    poller_id: usize,
    /// Total number of pollers sharing the concurrency semaphore.
    poller_count: usize,
}
65
66/// Spawns a worker task for a specific SQS queue.
67///
68/// The worker continuously polls the queue, processes messages, and handles
69/// retries via SQS visibility timeout.
70///
71/// # Arguments
72/// * `sqs_client` - AWS SQS client for all operations (poll, send, delete, change visibility)
73/// * `queue_type` - Type of queue (determines handler and concurrency)
74/// * `queue_url` - SQS queue URL
75/// * `app_state` - Application state with repositories and services
76///
77/// # Returns
78/// JoinHandle to the spawned worker task
79pub async fn spawn_worker_for_queue(
80    sqs_client: aws_sdk_sqs::Client,
81    queue_type: QueueType,
82    queue_url: String,
83    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
84    shutdown_rx: watch::Receiver<bool>,
85) -> Result<WorkerHandle, QueueBackendError> {
86    let concurrency = get_concurrency_for_queue(queue_type);
87    let max_retries = queue_type.max_retries();
88    let polling_interval = get_wait_time_for_queue(queue_type);
89    let poller_count = get_poller_count_for_queue(queue_type);
90    let visibility_timeout = queue_type.visibility_timeout_secs();
91    let handler_timeout_secs = handler_timeout_secs(queue_type);
92    let handler_timeout = Duration::from_secs(handler_timeout_secs);
93
94    info!(
95        queue_type = ?queue_type,
96        queue_url = %queue_url,
97        concurrency = concurrency,
98        max_retries = max_retries,
99        polling_interval_secs = polling_interval,
100        poller_count = poller_count,
101        visibility_timeout_secs = visibility_timeout,
102        handler_timeout_secs = handler_timeout_secs,
103        "Spawning SQS worker"
104    );
105
106    // All pollers share the same semaphore so total concurrency is bounded.
107    let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));
108
109    let handle: JoinHandle<()> = tokio::spawn(async move {
110        let mut poller_handles: JoinSet<()> = JoinSet::new();
111
112        for poller_id in 0..poller_count {
113            let client = sqs_client.clone();
114            let url = queue_url.clone();
115            let state = app_state.clone();
116            let sem = semaphore.clone();
117            let mut rx = shutdown_rx.clone();
118            let config = PollLoopConfig {
119                queue_type,
120                polling_interval,
121                visibility_timeout,
122                handler_timeout,
123                max_retries,
124                poller_id,
125                poller_count,
126            };
127
128            poller_handles.spawn(async move {
129                run_poll_loop(client, url, state, sem, &mut rx, config).await;
130            });
131        }
132
133        // Wait for all pollers to finish (they exit on shutdown signal)
134        while let Some(join_result) = poller_handles.join_next().await {
135            if let Err(err) = join_result {
136                error!(
137                    queue_type = ?queue_type,
138                    error = %err,
139                    "SQS poller task terminated unexpectedly"
140                );
141            }
142        }
143        info!(queue_type = ?queue_type, "SQS worker stopped");
144    });
145
146    Ok(WorkerHandle::Tokio(handle))
147}
148
/// Runs a single SQS poll loop. Multiple instances may share the same semaphore
/// to increase pickup smoothness without exceeding handler concurrency limits.
///
/// Per iteration: (1) reap finished handler tasks and batch-delete their
/// messages, (2) stop if shutdown has been signalled, (3) compute this poller's
/// fair share of free permits (sleeping briefly when it is zero), and
/// (4) poll SQS — racing against shutdown — and spawn one permit-bounded
/// handler task per received message. On exit, in-flight tasks are drained
/// with a 30s cap and any remaining deletes are flushed.
async fn run_poll_loop(
    sqs_client: aws_sdk_sqs::Client,
    queue_url: String,
    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
    semaphore: Arc<tokio::sync::Semaphore>,
    shutdown_rx: &mut watch::Receiver<bool>,
    config: PollLoopConfig,
) {
    let PollLoopConfig {
        queue_type,
        polling_interval,
        visibility_timeout,
        handler_timeout,
        max_retries,
        poller_id,
        poller_count,
    } = config;
    // Task results: Some(receipt_handle) => delete the message; None => retain it.
    let mut inflight: JoinSet<Option<String>> = JoinSet::new();
    let mut consecutive_poll_errors: u32 = 0;
    let mut pending_deletes: Vec<String> = Vec::new();

    loop {
        // Reap completed tasks and collect receipt handles for batch delete
        while let Some(result) = inflight.try_join_next() {
            match result {
                Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                Ok(None) => {} // Retained message, no delete needed
                Err(e) => {
                    // Join error = task panic/cancellation; the message is not
                    // deleted and will reappear after its visibility timeout.
                    warn!(
                        queue_type = ?queue_type,
                        poller_id = poller_id,
                        error = %e,
                        "In-flight task failed"
                    );
                }
            }
        }

        // Flush any accumulated deletes as a batch
        if !pending_deletes.is_empty() {
            flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
            pending_deletes.clear();
        }

        // Check shutdown before each iteration
        if *shutdown_rx.borrow() {
            info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received, stopping SQS poller");
            break;
        }

        // Distribute available permits fairly across pollers to prevent
        // collective overfetch. Each poller gets floor(available / N)
        // messages, and the first (available % N) pollers (by poller_id)
        // each get one extra from the remainder. This ensures:
        // - No stall: at least one poller polls when any permits exist
        // - Bounded overfetch: at most poller_count extra from racing
        let available_permits = semaphore.available_permits();
        let base_share = available_permits / poller_count;
        let remainder = available_permits % poller_count;
        let my_share = base_share + usize::from(poller_id < remainder);
        if my_share == 0 {
            // No capacity for this poller right now: back off briefly while
            // staying responsive to shutdown.
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_millis(50)) => continue,
                _ = shutdown_rx.changed() => {
                    info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received, stopping SQS poller");
                    break;
                }
            }
        }

        // SQS MaxNumberOfMessages must be 1-10.
        let batch_size = my_share.min(10) as i32;

        // Poll SQS for messages, racing with shutdown signal.
        // wait_time_seconds > 0 enables SQS long polling for the request.
        let messages_result = tokio::select! {
            result = sqs_client
                .receive_message()
                .queue_url(&queue_url)
                .max_number_of_messages(batch_size) // SQS max is 10
                .wait_time_seconds(polling_interval as i32)
                .visibility_timeout(visibility_timeout as i32)
                .message_system_attribute_names(MessageSystemAttributeName::ApproximateReceiveCount)
                .message_system_attribute_names(MessageSystemAttributeName::MessageGroupId)
                .message_attribute_names("target_scheduled_on")
                .message_attribute_names("retry_attempt")
                .send() => result,
            _ = shutdown_rx.changed() => {
                info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received during SQS poll, stopping poller");
                break;
            }
        };

        match messages_result {
            Ok(output) => {
                if consecutive_poll_errors > 0 {
                    info!(
                        queue_type = ?queue_type,
                        poller_id = poller_id,
                        previous_errors = consecutive_poll_errors,
                        "SQS polling recovered after consecutive errors"
                    );
                }
                consecutive_poll_errors = 0;

                if let Some(messages) = output.messages {
                    if !messages.is_empty() {
                        debug!(
                            queue_type = ?queue_type,
                            poller_id = poller_id,
                            message_count = messages.len(),
                            "Received messages from SQS"
                        );

                        // Process messages concurrently (up to semaphore limit)
                        for message in messages {
                            let permit = match semaphore.clone().acquire_owned().await {
                                Ok(permit) => permit,
                                Err(err) => {
                                    // acquire_owned only fails when the semaphore
                                    // is closed — unrecoverable, stop this poller.
                                    error!(
                                        queue_type = ?queue_type,
                                        poller_id = poller_id,
                                        error = %err,
                                        "Semaphore closed, stopping SQS poller loop"
                                    );
                                    return;
                                }
                            };
                            let client = sqs_client.clone();
                            let url = queue_url.clone();
                            let state = app_state.clone();

                            inflight.spawn(async move {
                                let _permit = permit; // always dropped, even on panic

                                // Bound handler runtime and catch panics so a single
                                // bad message cannot wedge or kill the poller.
                                let result = tokio::time::timeout(
                                    handler_timeout,
                                    AssertUnwindSafe(process_message(
                                        client.clone(),
                                        message,
                                        queue_type,
                                        &url,
                                        state,
                                        max_retries,
                                    ))
                                    .catch_unwind(),
                                )
                                .await;

                                // Nesting: timeout -> catch_unwind -> processing result.
                                match result {
                                    Ok(Ok(Ok(MessageOutcome::Delete { receipt_handle }))) => {
                                        Some(receipt_handle)
                                    }
                                    Ok(Ok(Ok(MessageOutcome::Retain))) => None,
                                    Ok(Ok(Err(e))) => {
                                        error!(
                                            queue_type = ?queue_type,
                                            error = %e,
                                            "Failed to process message"
                                        );
                                        None
                                    }
                                    Ok(Err(panic_info)) => {
                                        // Panic payloads are usually String or &str;
                                        // anything else is reported as unknown.
                                        let msg = panic_info
                                            .downcast_ref::<String>()
                                            .map(|s| s.as_str())
                                            .or_else(|| {
                                                panic_info.downcast_ref::<&str>().copied()
                                            })
                                            .unwrap_or("unknown panic");
                                        error!(
                                            queue_type = ?queue_type,
                                            panic = %msg,
                                            "Message handler panicked"
                                        );
                                        None
                                    }
                                    Err(_) => {
                                        error!(
                                            queue_type = ?queue_type,
                                            timeout_secs = handler_timeout.as_secs(),
                                            "Message handler timed out; message will be retried after visibility timeout"
                                        );
                                        None
                                    }
                                }
                            });
                        }
                    }
                }
            }
            Err(e) => {
                consecutive_poll_errors = consecutive_poll_errors.saturating_add(1);
                let backoff_secs = poll_error_backoff_secs(consecutive_poll_errors);
                // Unpack SDK error details for structured logging.
                let (error_kind, error_code, error_message) = match &e {
                    SdkError::ServiceError(ctx) => {
                        ("service", ctx.err().code(), ctx.err().message())
                    }
                    SdkError::DispatchFailure(_) => ("dispatch", None, None),
                    SdkError::ResponseError(_) => ("response", None, None),
                    SdkError::TimeoutError(_) => ("timeout", None, None),
                    _ => ("other", None, None),
                };
                error!(
                    queue_type = ?queue_type,
                    poller_id = poller_id,
                    error_kind = error_kind,
                    error_code = error_code.unwrap_or("unknown"),
                    error_message = error_message.unwrap_or("n/a"),
                    error = %e,
                    error_debug = ?e,
                    consecutive_errors = consecutive_poll_errors,
                    backoff_secs = backoff_secs,
                    "Failed to receive messages from SQS, backing off"
                );
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(backoff_secs)) => {}
                    _ = shutdown_rx.changed() => {
                        info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received during backoff, stopping poller");
                        break;
                    }
                }
            }
        }
    }

    // Drain in-flight tasks before shutdown, collecting final deletes
    if !inflight.is_empty() {
        info!(
            queue_type = ?queue_type,
            poller_id = poller_id,
            count = inflight.len(),
            "Draining in-flight tasks before shutdown"
        );
        match tokio::time::timeout(Duration::from_secs(30), async {
            while let Some(result) = inflight.join_next().await {
                match result {
                    Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                    Ok(None) => {}
                    Err(e) => {
                        warn!(
                            queue_type = ?queue_type,
                            poller_id = poller_id,
                            error = %e,
                            "In-flight task failed during drain"
                        );
                    }
                }
            }
        })
        .await
        {
            Ok(()) => {
                info!(queue_type = ?queue_type, poller_id = poller_id, "All in-flight tasks drained")
            }
            Err(_) => {
                warn!(
                    queue_type = ?queue_type,
                    poller_id = poller_id,
                    remaining = inflight.len(),
                    "Drain timeout, abandoning remaining tasks"
                );
                inflight.abort_all();
            }
        }
    }

    // Flush any remaining deletes accumulated during drain
    if !pending_deletes.is_empty() {
        flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
    }
}
422
423/// Processes a single SQS message.
424///
425/// Routes the message to the appropriate handler based on queue type,
426/// handles success/failure, and manages message deletion/retry.
427async fn process_message(
428    sqs_client: aws_sdk_sqs::Client,
429    message: Message,
430    queue_type: QueueType,
431    queue_url: &str,
432    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
433    max_retries: usize,
434) -> Result<MessageOutcome, QueueBackendError> {
435    let body = message
436        .body()
437        .ok_or_else(|| QueueBackendError::QueueError("Empty message body".to_string()))?;
438
439    let receipt_handle = message
440        .receipt_handle()
441        .ok_or_else(|| QueueBackendError::QueueError("Missing receipt handle".to_string()))?;
442
443    // For jobs with scheduling beyond SQS 15-minute max delay, keep deferring in hops.
444    if let Some(target_scheduled_on) = parse_target_scheduled_on(&message) {
445        let now = std::time::SystemTime::now()
446            .duration_since(std::time::SystemTime::UNIX_EPOCH)
447            .map_err(|e| QueueBackendError::QueueError(format!("System clock error: {e}")))?
448            .as_secs() as i64;
449        let remaining = target_scheduled_on - now;
450        if remaining > 0 {
451            let should_delete_original = defer_message(
452                &sqs_client,
453                queue_url,
454                body.to_string(),
455                &message,
456                target_scheduled_on,
457                remaining.min(900) as i32,
458            )
459            .await?;
460
461            debug!(
462                queue_type = ?queue_type,
463                remaining_seconds = remaining,
464                "Deferred scheduled SQS message for next delay hop"
465            );
466            return if should_delete_original {
467                Ok(MessageOutcome::Delete {
468                    receipt_handle: receipt_handle.to_string(),
469                })
470            } else {
471                Ok(MessageOutcome::Retain)
472            };
473        }
474    }
475
476    // Get retry attempt count from message attributes
477    let receive_count = message
478        .attributes()
479        .and_then(|attrs| attrs.get(&MessageSystemAttributeName::ApproximateReceiveCount))
480        .and_then(|count| count.parse::<usize>().ok())
481        .unwrap_or(1);
482    // SQS receive count starts at 1; Apalis Attempt starts at 0.
483    let attempt_number = receive_count.saturating_sub(1);
484    // Persisted retry attempt for self-reenqueued status checks. Falls back to receive_count-based
485    // attempt when attribute is missing.
486    let logical_retry_attempt = parse_retry_attempt(&message).unwrap_or(attempt_number);
487
488    // Use SQS MessageId as the worker task_id for log correlation.
489    let sqs_message_id = message.message_id().unwrap_or("unknown").to_string();
490
491    debug!(
492        queue_type = ?queue_type,
493        message_id = %sqs_message_id,
494        attempt = attempt_number,
495        receive_count = receive_count,
496        max_retries = max_retries,
497        "Processing message"
498    );
499
500    // Route to appropriate handler
501    let result = match queue_type {
502        QueueType::TransactionRequest => {
503            process_job::<TransactionRequest, _, _>(
504                body,
505                app_state,
506                attempt_number,
507                sqs_message_id,
508                "TransactionRequest",
509                transaction_request_handler,
510            )
511            .await
512        }
513        QueueType::TransactionSubmission => {
514            process_job::<TransactionSend, _, _>(
515                body,
516                app_state,
517                attempt_number,
518                sqs_message_id,
519                "TransactionSend",
520                transaction_submission_handler,
521            )
522            .await
523        }
524        QueueType::StatusCheck | QueueType::StatusCheckEvm | QueueType::StatusCheckStellar => {
525            process_job::<TransactionStatusCheck, _, _>(
526                body,
527                app_state,
528                attempt_number,
529                sqs_message_id,
530                "TransactionStatusCheck",
531                transaction_status_handler,
532            )
533            .await
534        }
535        QueueType::Notification => {
536            process_job::<NotificationSend, _, _>(
537                body,
538                app_state,
539                attempt_number,
540                sqs_message_id,
541                "NotificationSend",
542                notification_handler,
543            )
544            .await
545        }
546        QueueType::TokenSwapRequest => {
547            process_job::<TokenSwapRequest, _, _>(
548                body,
549                app_state,
550                attempt_number,
551                sqs_message_id,
552                "TokenSwapRequest",
553                token_swap_request_handler,
554            )
555            .await
556        }
557        QueueType::RelayerHealthCheck => {
558            process_job::<RelayerHealthCheck, _, _>(
559                body,
560                app_state,
561                attempt_number,
562                sqs_message_id,
563                "RelayerHealthCheck",
564                relayer_health_check_handler,
565            )
566            .await
567        }
568    };
569
570    match result {
571        Ok(()) => {
572            debug!(
573                queue_type = ?queue_type,
574                attempt = attempt_number,
575                "Message processed successfully"
576            );
577
578            Ok(MessageOutcome::Delete {
579                receipt_handle: receipt_handle.to_string(),
580            })
581        }
582        Err(ProcessingError::Permanent(e)) => {
583            error!(
584                queue_type = ?queue_type,
585                attempt = attempt_number,
586                error = %e,
587                "Permanent handler failure, message will be deleted"
588            );
589
590            Ok(MessageOutcome::Delete {
591                receipt_handle: receipt_handle.to_string(),
592            })
593        }
594        Err(ProcessingError::Retryable(e)) => {
595            // Check max retries for non-infinite queues (status checks use usize::MAX)
596            if max_retries != usize::MAX && receive_count > max_retries {
597                error!(
598                    queue_type = ?queue_type,
599                    attempt = attempt_number,
600                    receive_count = receive_count,
601                    max_retries = max_retries,
602                    error = %e,
603                    "Max retries exceeded; message will be automatically moved to DLQ by SQS redrive policy"
604                );
605                return Ok(MessageOutcome::Retain);
606            }
607
608            // Compute retry delay based on queue type:
609            // - Status checks use network-type-aware backoff from the message body
610            // - All other queues use their configured backoff profile from retry_config
611            let delay = if queue_type.is_status_check() {
612                compute_status_retry_delay(body, logical_retry_attempt)
613            } else {
614                retry_delay_secs(backoff_config_for_queue(queue_type), logical_retry_attempt)
615            };
616
617            // FIFO queues do not support per-message DelaySeconds. Use visibility
618            // timeout on the in-flight message to schedule the retry.
619            if is_fifo_queue_url(queue_url) {
620                if let Err(err) = sqs_client
621                    .change_message_visibility()
622                    .queue_url(queue_url)
623                    .receipt_handle(receipt_handle)
624                    .visibility_timeout(delay.clamp(1, 900))
625                    .send()
626                    .await
627                {
628                    error!(
629                        queue_type = ?queue_type,
630                        error = %err,
631                        "Failed to set visibility timeout for retry; falling back to existing visibility timeout"
632                    );
633                    return Ok(MessageOutcome::Retain);
634                }
635
636                debug!(
637                    queue_type = ?queue_type,
638                    attempt = logical_retry_attempt,
639                    delay_seconds = delay,
640                    error = %e,
641                    "Retry scheduled via visibility timeout"
642                );
643
644                return Ok(MessageOutcome::Retain);
645            }
646
647            let next_retry_attempt = logical_retry_attempt.saturating_add(1);
648
649            // Standard queues: re-enqueue with native DelaySeconds,
650            // no group_id or dedup_id needed. Duplicate deliveries are
651            // harmless because handlers are idempotent.
652            if let Err(send_err) = sqs_client
653                .send_message()
654                .queue_url(queue_url)
655                .message_body(body.to_string())
656                .delay_seconds(delay)
657                .message_attributes(
658                    "retry_attempt",
659                    MessageAttributeValue::builder()
660                        .data_type("Number")
661                        .string_value(next_retry_attempt.to_string())
662                        .build()
663                        .map_err(|err| {
664                            QueueBackendError::SqsError(format!(
665                                "Failed to build retry_attempt attribute: {err}"
666                            ))
667                        })?,
668                )
669                .send()
670                .await
671            {
672                error!(
673                    queue_type = ?queue_type,
674                    error = %send_err,
675                    "Failed to re-enqueue message; leaving original for visibility timeout retry"
676                );
677                // Fall through — original message will retry after visibility timeout
678                return Ok(MessageOutcome::Retain);
679            }
680
681            debug!(
682                queue_type = ?queue_type,
683                attempt = logical_retry_attempt,
684                delay_seconds = delay,
685                error = %e,
686                "Message re-enqueued with backoff delay"
687            );
688
689            // Delete the original message now that the re-enqueue succeeded
690            Ok(MessageOutcome::Delete {
691                receipt_handle: receipt_handle.to_string(),
692            })
693        }
694    }
695}
696
697/// Generic job processor — deserializes `Job<T>`, creates a `WorkerContext`,
698/// and delegates to the provided handler function.
699async fn process_job<T, F, Fut>(
700    body: &str,
701    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
702    attempt: usize,
703    task_id: String,
704    type_name: &str,
705    handler: F,
706) -> Result<(), ProcessingError>
707where
708    T: DeserializeOwned,
709    F: FnOnce(Job<T>, ThinData<crate::models::DefaultAppState>, WorkerContext) -> Fut,
710    Fut: Future<Output = Result<(), HandlerError>>,
711{
712    let job: Job<T> = serde_json::from_str(body).map_err(|e| {
713        error!(error = %e, "Failed to deserialize {} job", type_name);
714        // Malformed payload is not recoverable by retrying the same message body.
715        ProcessingError::Permanent(format!("Failed to deserialize {type_name} job: {e}"))
716    })?;
717
718    let ctx = WorkerContext::new(attempt, task_id);
719    handler(job, (*app_state).clone(), ctx)
720        .await
721        .map_err(map_handler_error)
722}
723
724fn map_handler_error(error: HandlerError) -> ProcessingError {
725    match error {
726        HandlerError::Abort(msg) => ProcessingError::Permanent(msg),
727        HandlerError::Retry(msg) => ProcessingError::Retryable(msg),
728    }
729}
730
731fn parse_target_scheduled_on(message: &Message) -> Option<i64> {
732    message
733        .message_attributes()
734        .and_then(|attrs| attrs.get("target_scheduled_on"))
735        .and_then(|value| value.string_value())
736        .and_then(|value| value.parse::<i64>().ok())
737}
738
739fn parse_retry_attempt(message: &Message) -> Option<usize> {
740    message
741        .message_attributes()
742        .and_then(|attrs| attrs.get("retry_attempt"))
743        .and_then(|value| value.string_value())
744        .and_then(|value| value.parse::<usize>().ok())
745}
746
/// Returns `true` when the queue URL names a FIFO queue (SQS FIFO queue names
/// carry a mandatory `.fifo` suffix).
fn is_fifo_queue_url(queue_url: &str) -> bool {
    queue_url.strip_suffix(".fifo").is_some()
}
750
751async fn defer_message(
752    sqs_client: &aws_sdk_sqs::Client,
753    queue_url: &str,
754    body: String,
755    message: &Message,
756    target_scheduled_on: i64,
757    delay_seconds: i32,
758) -> Result<bool, QueueBackendError> {
759    if is_fifo_queue_url(queue_url) {
760        let receipt_handle = message.receipt_handle().ok_or_else(|| {
761            QueueBackendError::QueueError(
762                "Cannot defer FIFO message: missing receipt handle".to_string(),
763            )
764        })?;
765
766        sqs_client
767            .change_message_visibility()
768            .queue_url(queue_url)
769            .receipt_handle(receipt_handle)
770            .visibility_timeout(delay_seconds.clamp(1, 900))
771            .send()
772            .await
773            .map_err(|e| {
774                QueueBackendError::SqsError(format!(
775                    "Failed to defer FIFO message via visibility timeout: {e}"
776                ))
777            })?;
778
779        return Ok(false);
780    }
781
782    // Standard queues support native per-message DelaySeconds — no need for
783    // group_id or dedup_id. Just re-send with the delay and scheduling attribute.
784    let request = sqs_client
785        .send_message()
786        .queue_url(queue_url)
787        .message_body(body)
788        .delay_seconds(delay_seconds.clamp(1, 900))
789        .message_attributes(
790            "target_scheduled_on",
791            MessageAttributeValue::builder()
792                .data_type("Number")
793                .string_value(target_scheduled_on.to_string())
794                .build()
795                .map_err(|e| {
796                    QueueBackendError::SqsError(format!(
797                        "Failed to build deferred scheduled attribute: {e}"
798                    ))
799                })?,
800        );
801
802    request.send().await.map_err(|e| {
803        QueueBackendError::SqsError(format!("Failed to defer scheduled message: {e}"))
804    })?;
805
806    Ok(true)
807}
808
/// Partial struct for deserializing only the `network_type` field from a status check job.
///
/// Used to avoid deserializing the entire `Job<TransactionStatusCheck>` when we only
/// need the network type to determine retry delay.
#[derive(serde::Deserialize)]
struct StatusCheckData {
    // `Option` so payloads that omit the field still deserialize; `None`
    // selects the default (Solana/generic) backoff in
    // `compute_status_retry_delay`.
    network_type: Option<crate::models::NetworkType>,
}
817
/// Partial struct matching `Job<TransactionStatusCheck>` structure.
///
/// Used for efficient partial deserialization to extract only the `network_type`
/// field without parsing the entire job payload.
#[derive(serde::Deserialize)]
struct PartialStatusCheckJob {
    // Only the `data` envelope is parsed; serde ignores the sibling fields of
    // the job JSON (message_id, version, timestamp, job_type, ...).
    data: StatusCheckData,
}
826
827/// Extracts `network_type` from a status check payload and computes retry delay.
828///
829/// This uses hardcoded network-specific backoff windows aligned with Redis/Apalis:
830/// - EVM: 8s -> 12s cap
831/// - Stellar: 2s -> 3s cap
832/// - Solana/default: 5s -> 8s cap
833fn compute_status_retry_delay(body: &str, attempt: usize) -> i32 {
834    let network_type = serde_json::from_str::<PartialStatusCheckJob>(body)
835        .ok()
836        .and_then(|j| j.data.network_type);
837
838    crate::queues::retry_config::status_check_retry_delay_secs(network_type, attempt)
839}
840
841/// Gets the SQS long-poll wait time for a queue type from environment or default.
842fn get_wait_time_for_queue(queue_type: QueueType) -> u64 {
843    ServerConfig::get_sqs_wait_time(
844        queue_type.sqs_env_key(),
845        queue_type.default_wait_time_secs(),
846    )
847}
848
849/// Gets the number of poll loops to run for a queue type from environment or default.
850fn get_poller_count_for_queue(queue_type: QueueType) -> usize {
851    let configured = ServerConfig::get_sqs_poller_count(
852        queue_type.sqs_env_key(),
853        queue_type.default_poller_count(),
854    );
855    if configured == 0 {
856        warn!(
857            queue_type = ?queue_type,
858            "Configured poller count is 0; clamping to 1"
859        );
860        1
861    } else {
862        configured
863    }
864}
865
866/// Gets the concurrency limit for a queue type from environment.
867fn get_concurrency_for_queue(queue_type: QueueType) -> usize {
868    let configured = ServerConfig::get_worker_concurrency(
869        queue_type.concurrency_env_key(),
870        queue_type.default_concurrency(),
871    );
872    if configured == 0 {
873        warn!(
874            queue_type = ?queue_type,
875            "Configured concurrency is 0; clamping to 1"
876        );
877        1
878    } else {
879        configured
880    }
881}
882
883/// Maximum allowed wall-clock processing time per message before the handler task is canceled.
884///
885/// Keep this bounded so permits cannot be held forever by hung handlers.
886fn handler_timeout_secs(queue_type: QueueType) -> u64 {
887    u64::from(queue_type.visibility_timeout_secs().max(1))
888}
889
/// Maximum backoff duration for poll errors (1 minute).
const MAX_POLL_BACKOFF_SECS: u64 = 60;

/// Number of consecutive errors between recovery probes at the backoff ceiling.
/// Once the backoff reaches `MAX_POLL_BACKOFF_SECS`, every Nth error cycle uses
/// the base interval (5s) to quickly detect when the SQS endpoint recovers.
const RECOVERY_PROBE_EVERY: u32 = 4;

/// Computes exponential backoff for consecutive poll errors with recovery probes.
///
/// Returns: 5, 10, 20, 40, 60, 60, 60, **5** (probe), 60, 60, 60, **5**, ...
fn poll_error_backoff_secs(consecutive_errors: u32) -> u64 {
    const BASE_SECS: u64 = 5;

    // Past the ceiling, periodically drop back to the base interval so a
    // recovered SQS endpoint is noticed quickly instead of after a full cycle.
    if consecutive_errors >= 7 && consecutive_errors % RECOVERY_PROBE_EVERY == 0 {
        return BASE_SECS;
    }

    // Doubling is capped at 16 steps so the shift below cannot overflow u64;
    // the result is then capped at the ceiling.
    let doublings = consecutive_errors.saturating_sub(1).min(16);
    (BASE_SECS << doublings).min(MAX_POLL_BACKOFF_SECS)
}
914
915/// Deletes messages from SQS in batches of up to 10 (the SQS maximum per call).
916///
917/// Returns the total number of successfully deleted messages. Any per-entry
918/// failures are logged as warnings — SQS will redeliver those messages after
919/// the visibility timeout expires.
920async fn flush_delete_batch(
921    sqs_client: &aws_sdk_sqs::Client,
922    queue_url: &str,
923    batch: &[String],
924    queue_type: QueueType,
925) -> usize {
926    if batch.is_empty() {
927        return 0;
928    }
929
930    let mut deleted = 0;
931
932    for chunk in batch.chunks(10) {
933        let entries: Vec<DeleteMessageBatchRequestEntry> = chunk
934            .iter()
935            .enumerate()
936            .map(|(i, handle)| {
937                DeleteMessageBatchRequestEntry::builder()
938                    .id(i.to_string())
939                    .receipt_handle(handle)
940                    .build()
941                    .expect("id and receipt_handle are always set")
942            })
943            .collect();
944
945        match sqs_client
946            .delete_message_batch()
947            .queue_url(queue_url)
948            .set_entries(Some(entries))
949            .send()
950            .await
951        {
952            Ok(output) => {
953                deleted += output.successful().len();
954
955                for f in output.failed() {
956                    warn!(
957                        queue_type = ?queue_type,
958                        id = %f.id(),
959                        code = %f.code(),
960                        message = f.message().unwrap_or("unknown"),
961                        "Batch delete entry failed (message will be redelivered)"
962                    );
963                }
964            }
965            Err(e) => {
966                error!(
967                    queue_type = ?queue_type,
968                    error = %e,
969                    batch_size = chunk.len(),
970                    "Batch delete API call failed (messages will be redelivered)"
971                );
972            }
973        }
974    }
975
976    deleted
977}
978
979#[cfg(test)]
980mod tests {
981    use super::*;
982
983    #[test]
984    fn test_get_concurrency_for_queue() {
985        // Test that concurrency is retrieved (exact value depends on env)
986        let concurrency = get_concurrency_for_queue(QueueType::TransactionRequest);
987        assert!(concurrency > 0);
988
989        let concurrency = get_concurrency_for_queue(QueueType::StatusCheck);
990        assert!(concurrency > 0);
991    }
992
993    #[test]
994    fn test_handler_timeout_secs_is_positive() {
995        let all = [
996            QueueType::TransactionRequest,
997            QueueType::TransactionSubmission,
998            QueueType::StatusCheck,
999            QueueType::StatusCheckEvm,
1000            QueueType::StatusCheckStellar,
1001            QueueType::Notification,
1002            QueueType::TokenSwapRequest,
1003            QueueType::RelayerHealthCheck,
1004        ];
1005        for queue_type in all {
1006            assert!(handler_timeout_secs(queue_type) > 0);
1007        }
1008    }
1009
1010    #[test]
1011    fn test_handler_timeout_secs_uses_visibility_timeout() {
1012        assert_eq!(
1013            handler_timeout_secs(QueueType::StatusCheckEvm),
1014            QueueType::StatusCheckEvm.visibility_timeout_secs() as u64
1015        );
1016        assert_eq!(
1017            handler_timeout_secs(QueueType::Notification),
1018            QueueType::Notification.visibility_timeout_secs() as u64
1019        );
1020    }
1021
1022    #[test]
1023    fn test_parse_target_scheduled_on() {
1024        // Test parsing target_scheduled_on from message attributes
1025        let message = Message::builder().build();
1026
1027        // Message without attribute should return None
1028        assert_eq!(parse_target_scheduled_on(&message), None);
1029
1030        // Message with valid attribute
1031        let message = Message::builder()
1032            .message_attributes(
1033                "target_scheduled_on",
1034                MessageAttributeValue::builder()
1035                    .data_type("Number")
1036                    .string_value("1234567890")
1037                    .build()
1038                    .unwrap(),
1039            )
1040            .build();
1041
1042        assert_eq!(parse_target_scheduled_on(&message), Some(1234567890));
1043    }
1044
1045    #[test]
1046    fn test_parse_retry_attempt() {
1047        let message = Message::builder().build();
1048        assert_eq!(parse_retry_attempt(&message), None);
1049
1050        let message = Message::builder()
1051            .message_attributes(
1052                "retry_attempt",
1053                MessageAttributeValue::builder()
1054                    .data_type("Number")
1055                    .string_value("7")
1056                    .build()
1057                    .unwrap(),
1058            )
1059            .build();
1060        assert_eq!(parse_retry_attempt(&message), Some(7));
1061    }
1062
1063    #[test]
1064    fn test_map_handler_error() {
1065        // Test Abort maps to Permanent
1066        let error = HandlerError::Abort("Validation failed".to_string());
1067        let result = map_handler_error(error);
1068        assert!(matches!(result, ProcessingError::Permanent(_)));
1069
1070        // Test Retry maps to Retryable
1071        let error = HandlerError::Retry("Network timeout".to_string());
1072        let result = map_handler_error(error);
1073        assert!(matches!(result, ProcessingError::Retryable(_)));
1074    }
1075
1076    #[test]
1077    fn test_is_fifo_queue_url() {
1078        assert!(is_fifo_queue_url(
1079            "https://sqs.us-east-1.amazonaws.com/123/queue.fifo"
1080        ));
1081        assert!(!is_fifo_queue_url(
1082            "https://sqs.us-east-1.amazonaws.com/123/queue"
1083        ));
1084    }
1085
1086    #[test]
1087    fn test_compute_status_retry_delay_evm() {
1088        // NetworkType uses #[serde(rename_all = "lowercase")]
1089        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
1090        assert_eq!(compute_status_retry_delay(body, 0), 8);
1091        assert_eq!(compute_status_retry_delay(body, 1), 12);
1092        assert_eq!(compute_status_retry_delay(body, 8), 12);
1093    }
1094
1095    #[test]
1096    fn test_compute_status_retry_delay_stellar() {
1097        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"stellar"}}"#;
1098        assert_eq!(compute_status_retry_delay(body, 0), 2);
1099        assert_eq!(compute_status_retry_delay(body, 1), 3);
1100        assert_eq!(compute_status_retry_delay(body, 8), 3);
1101    }
1102
1103    #[test]
1104    fn test_compute_status_retry_delay_solana() {
1105        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"solana"}}"#;
1106        assert_eq!(compute_status_retry_delay(body, 0), 5);
1107        assert_eq!(compute_status_retry_delay(body, 1), 8);
1108        assert_eq!(compute_status_retry_delay(body, 8), 8);
1109    }
1110
1111    #[test]
1112    fn test_compute_status_retry_delay_missing_network() {
1113        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1"}}"#;
1114        assert_eq!(compute_status_retry_delay(body, 0), 5);
1115        assert_eq!(compute_status_retry_delay(body, 1), 8);
1116        assert_eq!(compute_status_retry_delay(body, 8), 8);
1117    }
1118
1119    #[test]
1120    fn test_compute_status_retry_delay_invalid_body() {
1121        assert_eq!(compute_status_retry_delay("not json", 0), 5);
1122        assert_eq!(compute_status_retry_delay("not json", 1), 8);
1123        assert_eq!(compute_status_retry_delay("not json", 8), 8);
1124    }
1125
1126    #[tokio::test]
1127    async fn test_semaphore_released_on_panic() {
1128        let sem = Arc::new(tokio::sync::Semaphore::new(1));
1129        let permit = sem.clone().acquire_owned().await.unwrap();
1130
1131        let handle = tokio::spawn(async move {
1132            let _permit = permit; // dropped on scope exit, even after panic
1133            let _ = AssertUnwindSafe(async { panic!("test panic") })
1134                .catch_unwind()
1135                .await;
1136        });
1137
1138        handle.await.unwrap();
1139        // Would hang forever if permit leaked
1140        let _p = tokio::time::timeout(Duration::from_millis(100), sem.acquire())
1141            .await
1142            .expect("permit should be available after panic");
1143    }
1144
1145    #[test]
1146    fn test_poll_error_backoff_secs() {
1147        // First error: 5s
1148        assert_eq!(poll_error_backoff_secs(1), 5);
1149        // Second: 10s
1150        assert_eq!(poll_error_backoff_secs(2), 10);
1151        // Third: 20s
1152        assert_eq!(poll_error_backoff_secs(3), 20);
1153        // Fourth: 40s
1154        assert_eq!(poll_error_backoff_secs(4), 40);
1155        // Capped at MAX_POLL_BACKOFF_SECS (60)
1156        assert_eq!(poll_error_backoff_secs(5), 60);
1157        assert_eq!(poll_error_backoff_secs(6), 60);
1158        assert_eq!(poll_error_backoff_secs(7), 60);
1159        // Recovery probe: base interval at multiples of RECOVERY_PROBE_EVERY (>= 7)
1160        assert_eq!(poll_error_backoff_secs(8), 5);
1161        assert_eq!(poll_error_backoff_secs(9), 60);
1162        assert_eq!(poll_error_backoff_secs(12), 5); // next probe
1163    }
1164
1165    #[test]
1166    fn test_poll_error_backoff_zero_errors() {
1167        // Zero consecutive errors should still produce a reasonable value
1168        assert_eq!(poll_error_backoff_secs(0), 5);
1169    }
1170
1171    #[test]
1172    fn test_poll_error_backoff_recovery_probes() {
1173        // Verify probes repeat at regular intervals once past threshold
1174        for i in (8..=100).step_by(RECOVERY_PROBE_EVERY as usize) {
1175            assert_eq!(
1176                poll_error_backoff_secs(i as u32),
1177                5,
1178                "Expected recovery probe at error {i}"
1179            );
1180        }
1181    }
1182
1183    #[test]
1184    fn test_message_outcome_delete_carries_receipt_handle() {
1185        let handle = "test-receipt-handle-123".to_string();
1186        let outcome = MessageOutcome::Delete {
1187            receipt_handle: handle.clone(),
1188        };
1189        match outcome {
1190            MessageOutcome::Delete { receipt_handle } => {
1191                assert_eq!(receipt_handle, handle);
1192            }
1193            MessageOutcome::Retain => panic!("Expected Delete variant"),
1194        }
1195    }
1196
1197    #[test]
1198    fn test_message_outcome_retain() {
1199        let outcome = MessageOutcome::Retain;
1200        assert!(matches!(outcome, MessageOutcome::Retain));
1201    }
1202
1203    #[test]
1204    fn test_batch_delete_entry_builder() {
1205        // Verify DeleteMessageBatchRequestEntry builds correctly with sequential IDs,
1206        // matching the pattern used in flush_delete_batch.
1207        let handles = vec![
1208            "receipt-0".to_string(),
1209            "receipt-1".to_string(),
1210            "receipt-2".to_string(),
1211        ];
1212        let entries: Vec<DeleteMessageBatchRequestEntry> = handles
1213            .iter()
1214            .enumerate()
1215            .map(|(i, handle)| {
1216                DeleteMessageBatchRequestEntry::builder()
1217                    .id(i.to_string())
1218                    .receipt_handle(handle)
1219                    .build()
1220                    .expect("id and receipt_handle are set")
1221            })
1222            .collect();
1223
1224        assert_eq!(entries.len(), 3);
1225        assert_eq!(entries[0].id(), "0");
1226        assert_eq!(entries[0].receipt_handle(), "receipt-0");
1227        assert_eq!(entries[2].id(), "2");
1228        assert_eq!(entries[2].receipt_handle(), "receipt-2");
1229    }
1230
1231    #[test]
1232    fn test_batch_chunking_logic() {
1233        // Verify that chunks(10) correctly splits receipt handles,
1234        // matching the pattern used in flush_delete_batch.
1235        let handles: Vec<String> = (0..25).map(|i| format!("receipt-{i}")).collect();
1236        let chunks: Vec<&[String]> = handles.chunks(10).collect();
1237
1238        assert_eq!(chunks.len(), 3);
1239        assert_eq!(chunks[0].len(), 10);
1240        assert_eq!(chunks[1].len(), 10);
1241        assert_eq!(chunks[2].len(), 5);
1242    }
1243
1244    #[test]
1245    fn test_outcome_collection_pattern() {
1246        // Verify the pattern used in the main loop to collect receipt handles
1247        // from a mix of Delete and Retain outcomes.
1248        let outcomes = vec![
1249            Some("receipt-1".to_string()), // Delete
1250            None,                          // Retain
1251            Some("receipt-2".to_string()), // Delete
1252            None,                          // Retain
1253            Some("receipt-3".to_string()), // Delete
1254        ];
1255
1256        let pending_deletes: Vec<String> = outcomes.into_iter().flatten().collect();
1257
1258        assert_eq!(pending_deletes.len(), 3);
1259        assert_eq!(pending_deletes[0], "receipt-1");
1260        assert_eq!(pending_deletes[1], "receipt-2");
1261        assert_eq!(pending_deletes[2], "receipt-3");
1262    }
1263
1264    // ── parse_target_scheduled_on: edge cases ─────────────────────────
1265
1266    #[test]
1267    fn test_parse_target_scheduled_on_non_numeric_string() {
1268        let message = Message::builder()
1269            .message_attributes(
1270                "target_scheduled_on",
1271                MessageAttributeValue::builder()
1272                    .data_type("String")
1273                    .string_value("not-a-number")
1274                    .build()
1275                    .unwrap(),
1276            )
1277            .build();
1278        assert_eq!(parse_target_scheduled_on(&message), None);
1279    }
1280
1281    #[test]
1282    fn test_parse_target_scheduled_on_empty_string() {
1283        let message = Message::builder()
1284            .message_attributes(
1285                "target_scheduled_on",
1286                MessageAttributeValue::builder()
1287                    .data_type("Number")
1288                    .string_value("")
1289                    .build()
1290                    .unwrap(),
1291            )
1292            .build();
1293        assert_eq!(parse_target_scheduled_on(&message), None);
1294    }
1295
1296    #[test]
1297    fn test_parse_target_scheduled_on_negative_value() {
1298        let message = Message::builder()
1299            .message_attributes(
1300                "target_scheduled_on",
1301                MessageAttributeValue::builder()
1302                    .data_type("Number")
1303                    .string_value("-1000")
1304                    .build()
1305                    .unwrap(),
1306            )
1307            .build();
1308        // Negative values parse fine as i64
1309        assert_eq!(parse_target_scheduled_on(&message), Some(-1000));
1310    }
1311
1312    #[test]
1313    fn test_parse_target_scheduled_on_float_string() {
1314        let message = Message::builder()
1315            .message_attributes(
1316                "target_scheduled_on",
1317                MessageAttributeValue::builder()
1318                    .data_type("Number")
1319                    .string_value("1234567890.5")
1320                    .build()
1321                    .unwrap(),
1322            )
1323            .build();
1324        // Floats can't parse as i64
1325        assert_eq!(parse_target_scheduled_on(&message), None);
1326    }
1327
1328    #[test]
1329    fn test_parse_target_scheduled_on_zero() {
1330        let message = Message::builder()
1331            .message_attributes(
1332                "target_scheduled_on",
1333                MessageAttributeValue::builder()
1334                    .data_type("Number")
1335                    .string_value("0")
1336                    .build()
1337                    .unwrap(),
1338            )
1339            .build();
1340        assert_eq!(parse_target_scheduled_on(&message), Some(0));
1341    }
1342
1343    #[test]
1344    fn test_parse_target_scheduled_on_wrong_attribute_name() {
1345        // Attribute exists but under a different key
1346        let message = Message::builder()
1347            .message_attributes(
1348                "wrong_key",
1349                MessageAttributeValue::builder()
1350                    .data_type("Number")
1351                    .string_value("1234567890")
1352                    .build()
1353                    .unwrap(),
1354            )
1355            .build();
1356        assert_eq!(parse_target_scheduled_on(&message), None);
1357    }
1358
1359    // ── parse_retry_attempt: edge cases ───────────────────────────────
1360
1361    #[test]
1362    fn test_parse_retry_attempt_non_numeric_string() {
1363        let message = Message::builder()
1364            .message_attributes(
1365                "retry_attempt",
1366                MessageAttributeValue::builder()
1367                    .data_type("String")
1368                    .string_value("abc")
1369                    .build()
1370                    .unwrap(),
1371            )
1372            .build();
1373        assert_eq!(parse_retry_attempt(&message), None);
1374    }
1375
1376    #[test]
1377    fn test_parse_retry_attempt_negative_value() {
1378        let message = Message::builder()
1379            .message_attributes(
1380                "retry_attempt",
1381                MessageAttributeValue::builder()
1382                    .data_type("Number")
1383                    .string_value("-1")
1384                    .build()
1385                    .unwrap(),
1386            )
1387            .build();
1388        // Negative values can't parse as usize
1389        assert_eq!(parse_retry_attempt(&message), None);
1390    }
1391
1392    #[test]
1393    fn test_parse_retry_attempt_zero() {
1394        let message = Message::builder()
1395            .message_attributes(
1396                "retry_attempt",
1397                MessageAttributeValue::builder()
1398                    .data_type("Number")
1399                    .string_value("0")
1400                    .build()
1401                    .unwrap(),
1402            )
1403            .build();
1404        assert_eq!(parse_retry_attempt(&message), Some(0));
1405    }
1406
1407    #[test]
1408    fn test_parse_retry_attempt_large_value() {
1409        let message = Message::builder()
1410            .message_attributes(
1411                "retry_attempt",
1412                MessageAttributeValue::builder()
1413                    .data_type("Number")
1414                    .string_value("999999")
1415                    .build()
1416                    .unwrap(),
1417            )
1418            .build();
1419        assert_eq!(parse_retry_attempt(&message), Some(999999));
1420    }
1421
1422    // ── is_fifo_queue_url: comprehensive cases ────────────────────────
1423
1424    #[test]
1425    fn test_is_fifo_queue_url_empty_string() {
1426        assert!(!is_fifo_queue_url(""));
1427    }
1428
1429    #[test]
1430    fn test_is_fifo_queue_url_just_fifo_suffix() {
1431        assert!(is_fifo_queue_url("my-queue.fifo"));
1432    }
1433
1434    #[test]
1435    fn test_is_fifo_queue_url_fifo_in_middle() {
1436        // .fifo appearing in the path but not as suffix
1437        assert!(!is_fifo_queue_url(
1438            "https://sqs.us-east-1.amazonaws.com/123/.fifo/queue"
1439        ));
1440    }
1441
1442    #[test]
1443    fn test_is_fifo_queue_url_case_sensitive() {
1444        assert!(!is_fifo_queue_url(
1445            "https://sqs.us-east-1.amazonaws.com/123/queue.FIFO"
1446        ));
1447        assert!(!is_fifo_queue_url(
1448            "https://sqs.us-east-1.amazonaws.com/123/queue.Fifo"
1449        ));
1450    }
1451
1452    #[test]
1453    fn test_is_fifo_queue_url_standard_queue_variations() {
1454        assert!(!is_fifo_queue_url(
1455            "https://sqs.us-east-1.amazonaws.com/123456789/my-queue"
1456        ));
1457        assert!(!is_fifo_queue_url(
1458            "https://sqs.eu-west-1.amazonaws.com/123456789/relayer-tx-request"
1459        ));
1460        assert!(!is_fifo_queue_url(
1461            "http://localhost:4566/000000000000/test-queue"
1462        ));
1463    }
1464
1465    #[test]
1466    fn test_is_fifo_queue_url_localstack() {
1467        // LocalStack FIFO queue URL format
1468        assert!(is_fifo_queue_url(
1469            "http://localhost:4566/000000000000/test-queue.fifo"
1470        ));
1471    }
1472
1473    // ── map_handler_error: message preservation ───────────────────────
1474
1475    #[test]
1476    fn test_map_handler_error_preserves_abort_message() {
1477        let msg = "Validation failed: invalid nonce";
1478        let error = HandlerError::Abort(msg.to_string());
1479        match map_handler_error(error) {
1480            ProcessingError::Permanent(s) => assert_eq!(s, msg),
1481            ProcessingError::Retryable(_) => panic!("Expected Permanent"),
1482        }
1483    }
1484
1485    #[test]
1486    fn test_map_handler_error_preserves_retry_message() {
1487        let msg = "RPC timeout after 30s";
1488        let error = HandlerError::Retry(msg.to_string());
1489        match map_handler_error(error) {
1490            ProcessingError::Retryable(s) => assert_eq!(s, msg),
1491            ProcessingError::Permanent(_) => panic!("Expected Retryable"),
1492        }
1493    }
1494
1495    #[test]
1496    fn test_map_handler_error_empty_message() {
1497        let error = HandlerError::Abort(String::new());
1498        match map_handler_error(error) {
1499            ProcessingError::Permanent(s) => assert!(s.is_empty()),
1500            ProcessingError::Retryable(_) => panic!("Expected Permanent"),
1501        }
1502    }
1503
1504    // ── handler_timeout_secs: all queue types ─────────────────────────
1505
1506    #[test]
1507    fn test_handler_timeout_secs_matches_visibility_timeout_for_all_queues() {
1508        let all = [
1509            QueueType::TransactionRequest,
1510            QueueType::TransactionSubmission,
1511            QueueType::StatusCheck,
1512            QueueType::StatusCheckEvm,
1513            QueueType::StatusCheckStellar,
1514            QueueType::Notification,
1515            QueueType::TokenSwapRequest,
1516            QueueType::RelayerHealthCheck,
1517        ];
1518        for qt in all {
1519            assert_eq!(
1520                handler_timeout_secs(qt),
1521                qt.visibility_timeout_secs().max(1) as u64,
1522                "{qt:?}: handler timeout should equal max(visibility_timeout, 1)"
1523            );
1524        }
1525    }
1526
1527    // ── get_concurrency_for_queue: all queue types ────────────────────
1528
1529    #[test]
1530    fn test_get_concurrency_for_queue_all_types_positive() {
1531        let all = [
1532            QueueType::TransactionRequest,
1533            QueueType::TransactionSubmission,
1534            QueueType::StatusCheck,
1535            QueueType::StatusCheckEvm,
1536            QueueType::StatusCheckStellar,
1537            QueueType::Notification,
1538            QueueType::TokenSwapRequest,
1539            QueueType::RelayerHealthCheck,
1540        ];
1541        for qt in all {
1542            assert!(
1543                get_concurrency_for_queue(qt) > 0,
1544                "{qt:?}: concurrency must be positive (clamped to at least 1)"
1545            );
1546        }
1547    }
1548
1549    // ── poll_error_backoff_secs: overflow and invariants ───────────────
1550
1551    #[test]
1552    fn test_poll_error_backoff_never_exceeds_max() {
1553        for i in 0..200 {
1554            let backoff = poll_error_backoff_secs(i);
1555            assert!(
1556                backoff <= MAX_POLL_BACKOFF_SECS,
1557                "Error count {i}: backoff {backoff}s exceeds MAX {MAX_POLL_BACKOFF_SECS}s"
1558            );
1559        }
1560    }
1561
1562    #[test]
1563    fn test_poll_error_backoff_u32_max_does_not_overflow() {
1564        let backoff = poll_error_backoff_secs(u32::MAX);
1565        assert!(backoff <= MAX_POLL_BACKOFF_SECS);
1566        assert!(backoff > 0);
1567    }
1568
1569    #[test]
1570    fn test_poll_error_backoff_always_positive() {
1571        for i in 0..200 {
1572            assert!(
1573                poll_error_backoff_secs(i) > 0,
1574                "Error count {i}: backoff must be positive"
1575            );
1576        }
1577    }
1578
1579    #[test]
1580    fn test_poll_error_backoff_monotonic_before_cap() {
1581        // Before hitting the cap, backoff should be non-decreasing
1582        let mut prev = poll_error_backoff_secs(0);
1583        for i in 1..=4 {
1584            let curr = poll_error_backoff_secs(i);
1585            assert!(
1586                curr >= prev,
1587                "Backoff should be non-decreasing before cap: {prev} -> {curr} at error {i}"
1588            );
1589            prev = curr;
1590        }
1591    }
1592
1593    // ── Constants validation ──────────────────────────────────────────
1594
1595    #[test]
1596    fn test_max_poll_backoff_is_reasonable() {
1597        assert!(
1598            MAX_POLL_BACKOFF_SECS >= 10,
1599            "Max backoff should be at least 10s to avoid tight error loops"
1600        );
1601        assert!(
1602            MAX_POLL_BACKOFF_SECS <= 300,
1603            "Max backoff should be at most 5 minutes to detect recovery promptly"
1604        );
1605    }
1606
1607    #[test]
1608    fn test_recovery_probe_every_is_valid() {
1609        assert!(
1610            RECOVERY_PROBE_EVERY >= 2,
1611            "Recovery probe interval must be at least 2 to avoid probing every attempt"
1612        );
1613        assert!(
1614            RECOVERY_PROBE_EVERY <= 10,
1615            "Recovery probe interval should not be too large or recovery detection is slow"
1616        );
1617    }
1618
1619    // ── compute_status_retry_delay: edge cases ────────────────────────
1620
1621    #[test]
1622    fn test_compute_status_retry_delay_very_high_attempt() {
1623        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
1624        // Very high attempts should stay capped at the max (12s for EVM)
1625        assert_eq!(compute_status_retry_delay(body, 1000), 12);
1626        assert_eq!(compute_status_retry_delay(body, usize::MAX), 12);
1627    }
1628
1629    #[test]
1630    fn test_compute_status_retry_delay_empty_body() {
1631        // Empty JSON body should fall back to generic/Solana defaults
1632        assert_eq!(compute_status_retry_delay("", 0), 5);
1633        assert_eq!(compute_status_retry_delay("{}", 0), 5);
1634    }
1635
1636    #[test]
1637    fn test_compute_status_retry_delay_partial_json() {
1638        // JSON with missing inner structure
1639        assert_eq!(compute_status_retry_delay(r#"{"data":{}}"#, 0), 5);
1640        assert_eq!(
1641            compute_status_retry_delay(r#"{"data":{"network_type":"evm"}}"#, 0),
1642            8
1643        );
1644    }
1645
1646    // ── PartialStatusCheckJob deserialization ──────────────────────────
1647
1648    #[test]
1649    fn test_partial_status_check_job_deserializes_network_type() {
1650        let body = r#"{"data":{"network_type":"evm","extra_field":"ignored"}}"#;
1651        let parsed: PartialStatusCheckJob = serde_json::from_str(body).unwrap();
1652        assert_eq!(
1653            parsed.data.network_type,
1654            Some(crate::models::NetworkType::Evm)
1655        );
1656    }
1657
1658    #[test]
1659    fn test_partial_status_check_job_handles_missing_network_type() {
1660        let body = r#"{"data":{"transaction_id":"tx1"}}"#;
1661        let parsed: PartialStatusCheckJob = serde_json::from_str(body).unwrap();
1662        assert_eq!(parsed.data.network_type, None);
1663    }
1664
1665    #[test]
1666    fn test_partial_status_check_job_rejects_missing_data() {
1667        let body = r#"{"not_data":{}}"#;
1668        let result = serde_json::from_str::<PartialStatusCheckJob>(body);
1669        assert!(result.is_err());
1670    }
1671
1672    // ── is_fifo_queue_url used consistently ───────────────────────────
1673
1674    #[test]
1675    fn test_fifo_detection_consistent_with_defer_and_retry_logic() {
1676        // Both defer_message and the retry path in process_message use
1677        // is_fifo_queue_url to decide between visibility-timeout vs re-enqueue.
1678        // Verify our standard and FIFO URLs are classified identically by both
1679        // call sites (they both call the same function).
1680        let standard = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check";
1681        let fifo = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check.fifo";
1682
1683        assert!(!is_fifo_queue_url(standard));
1684        assert!(is_fifo_queue_url(fifo));
1685    }
1686
1687    // ── get_wait_time_for_queue ──────────────────────────────────────────
1688
1689    #[test]
1690    fn test_get_wait_time_for_queue_returns_positive() {
1691        let all = [
1692            QueueType::TransactionRequest,
1693            QueueType::TransactionSubmission,
1694            QueueType::StatusCheck,
1695            QueueType::StatusCheckEvm,
1696            QueueType::StatusCheckStellar,
1697            QueueType::Notification,
1698            QueueType::TokenSwapRequest,
1699            QueueType::RelayerHealthCheck,
1700        ];
1701        for qt in all {
1702            let wt = get_wait_time_for_queue(qt);
1703            assert!(
1704                wt <= 20,
1705                "{qt:?}: wait time {wt} exceeds SQS maximum of 20s"
1706            );
1707        }
1708    }
1709
1710    #[test]
1711    fn test_get_wait_time_for_queue_matches_defaults() {
1712        // Without env overrides the helper should return the queue's default
1713        assert_eq!(
1714            get_wait_time_for_queue(QueueType::TransactionRequest),
1715            QueueType::TransactionRequest.default_wait_time_secs()
1716        );
1717        assert_eq!(
1718            get_wait_time_for_queue(QueueType::StatusCheck),
1719            QueueType::StatusCheck.default_wait_time_secs()
1720        );
1721    }
1722
1723    #[test]
1724    #[serial_test::serial]
1725    fn test_get_wait_time_for_queue_respects_env_override() {
1726        // StatusCheck default is 5; override to 12 via the real env var path
1727        let env_var = format!(
1728            "SQS_{}_WAIT_TIME_SECONDS",
1729            QueueType::StatusCheck.sqs_env_key()
1730        );
1731        std::env::set_var(&env_var, "12");
1732        assert_eq!(get_wait_time_for_queue(QueueType::StatusCheck), 12);
1733        std::env::remove_var(&env_var);
1734    }
1735
1736    #[test]
1737    #[serial_test::serial]
1738    fn test_get_wait_time_for_queue_env_override_clamped_to_20() {
1739        let env_var = format!(
1740            "SQS_{}_WAIT_TIME_SECONDS",
1741            QueueType::Notification.sqs_env_key()
1742        );
1743        std::env::set_var(&env_var, "99");
1744        assert_eq!(
1745            get_wait_time_for_queue(QueueType::Notification),
1746            20,
1747            "Should clamp to SQS maximum of 20"
1748        );
1749        std::env::remove_var(&env_var);
1750    }
1751
1752    // ── get_poller_count_for_queue ───────────────────────────────────────
1753
1754    #[test]
1755    fn test_get_poller_count_for_queue_all_types_positive() {
1756        let all = [
1757            QueueType::TransactionRequest,
1758            QueueType::TransactionSubmission,
1759            QueueType::StatusCheck,
1760            QueueType::StatusCheckEvm,
1761            QueueType::StatusCheckStellar,
1762            QueueType::Notification,
1763            QueueType::TokenSwapRequest,
1764            QueueType::RelayerHealthCheck,
1765        ];
1766        for qt in all {
1767            assert!(
1768                get_poller_count_for_queue(qt) >= 1,
1769                "{qt:?}: poller count must be at least 1"
1770            );
1771        }
1772    }
1773
1774    #[test]
1775    fn test_get_poller_count_for_queue_matches_defaults() {
1776        // Without env overrides the helper should return the queue's default (clamped to >= 1)
1777        assert_eq!(
1778            get_poller_count_for_queue(QueueType::TransactionRequest),
1779            QueueType::TransactionRequest.default_poller_count().max(1)
1780        );
1781        assert_eq!(
1782            get_poller_count_for_queue(QueueType::Notification),
1783            QueueType::Notification.default_poller_count().max(1)
1784        );
1785    }
1786
1787    #[test]
1788    #[serial_test::serial]
1789    fn test_get_poller_count_for_queue_respects_env_override() {
1790        let env_var = format!("SQS_{}_POLLER_COUNT", QueueType::Notification.sqs_env_key());
1791        std::env::set_var(&env_var, "5");
1792        assert_eq!(get_poller_count_for_queue(QueueType::Notification), 5);
1793        std::env::remove_var(&env_var);
1794    }
1795
1796    #[test]
1797    #[serial_test::serial]
1798    fn test_get_poller_count_for_queue_env_zero_clamped_to_1() {
1799        let env_var = format!("SQS_{}_POLLER_COUNT", QueueType::StatusCheck.sqs_env_key());
1800        std::env::set_var(&env_var, "0");
1801        assert_eq!(
1802            get_poller_count_for_queue(QueueType::StatusCheck),
1803            1,
1804            "Zero poller count from env should be clamped to 1"
1805        );
1806        std::env::remove_var(&env_var);
1807    }
1808
1809    // ── PollLoopConfig ──────────────────────────────────────────────────
1810
1811    #[test]
1812    fn test_poll_loop_config_clone() {
1813        let config = PollLoopConfig {
1814            queue_type: QueueType::TransactionRequest,
1815            polling_interval: 15,
1816            visibility_timeout: 120,
1817            handler_timeout: Duration::from_secs(120),
1818            max_retries: 3,
1819            poller_id: 0,
1820            poller_count: 2,
1821        };
1822        let cloned = config.clone();
1823        assert_eq!(cloned.polling_interval, 15);
1824        assert_eq!(cloned.poller_id, 0);
1825        assert_eq!(cloned.poller_count, 2);
1826        assert_eq!(cloned.max_retries, 3);
1827    }
1828}