1use std::future::Future;
7use std::panic::AssertUnwindSafe;
8use std::sync::Arc;
9use std::time::Duration;
10
11use actix_web::web::ThinData;
12use aws_sdk_sqs::error::{ProvideErrorMetadata, SdkError};
13use aws_sdk_sqs::types::{
14 DeleteMessageBatchRequestEntry, Message, MessageAttributeValue, MessageSystemAttributeName,
15};
16use futures::FutureExt;
17use serde::de::DeserializeOwned;
18use tokio::sync::watch;
19use tokio::task::{JoinHandle, JoinSet};
20use tracing::{debug, error, info, warn};
21
22use crate::queues::{backoff_config_for_queue, retry_delay_secs};
23use crate::{
24 config::ServerConfig,
25 jobs::{
26 notification_handler, relayer_health_check_handler, token_swap_request_handler,
27 transaction_request_handler, transaction_status_handler, transaction_submission_handler,
28 Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
29 TransactionSend, TransactionStatusCheck,
30 },
31};
32
33use super::{HandlerError, WorkerContext};
34use super::{QueueBackendError, QueueType, WorkerHandle};
35
36#[derive(Debug)]
37enum ProcessingError {
38 Retryable(String),
39 Permanent(String),
40}
41
42#[derive(Debug)]
45enum MessageOutcome {
46 Delete { receipt_handle: String },
48 Retain,
51}
52
53#[derive(Clone)]
56struct PollLoopConfig {
57 queue_type: QueueType,
58 polling_interval: u64,
59 visibility_timeout: u32,
60 handler_timeout: Duration,
61 max_retries: usize,
62 poller_id: usize,
63 poller_count: usize,
64}
65
66pub async fn spawn_worker_for_queue(
80 sqs_client: aws_sdk_sqs::Client,
81 queue_type: QueueType,
82 queue_url: String,
83 app_state: Arc<ThinData<crate::models::DefaultAppState>>,
84 shutdown_rx: watch::Receiver<bool>,
85) -> Result<WorkerHandle, QueueBackendError> {
86 let concurrency = get_concurrency_for_queue(queue_type);
87 let max_retries = queue_type.max_retries();
88 let polling_interval = get_wait_time_for_queue(queue_type);
89 let poller_count = get_poller_count_for_queue(queue_type);
90 let visibility_timeout = queue_type.visibility_timeout_secs();
91 let handler_timeout_secs = handler_timeout_secs(queue_type);
92 let handler_timeout = Duration::from_secs(handler_timeout_secs);
93
94 info!(
95 queue_type = ?queue_type,
96 queue_url = %queue_url,
97 concurrency = concurrency,
98 max_retries = max_retries,
99 polling_interval_secs = polling_interval,
100 poller_count = poller_count,
101 visibility_timeout_secs = visibility_timeout,
102 handler_timeout_secs = handler_timeout_secs,
103 "Spawning SQS worker"
104 );
105
106 let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));
108
109 let handle: JoinHandle<()> = tokio::spawn(async move {
110 let mut poller_handles: JoinSet<()> = JoinSet::new();
111
112 for poller_id in 0..poller_count {
113 let client = sqs_client.clone();
114 let url = queue_url.clone();
115 let state = app_state.clone();
116 let sem = semaphore.clone();
117 let mut rx = shutdown_rx.clone();
118 let config = PollLoopConfig {
119 queue_type,
120 polling_interval,
121 visibility_timeout,
122 handler_timeout,
123 max_retries,
124 poller_id,
125 poller_count,
126 };
127
128 poller_handles.spawn(async move {
129 run_poll_loop(client, url, state, sem, &mut rx, config).await;
130 });
131 }
132
133 while let Some(join_result) = poller_handles.join_next().await {
135 if let Err(err) = join_result {
136 error!(
137 queue_type = ?queue_type,
138 error = %err,
139 "SQS poller task terminated unexpectedly"
140 );
141 }
142 }
143 info!(queue_type = ?queue_type, "SQS worker stopped");
144 });
145
146 Ok(WorkerHandle::Tokio(handle))
147}
148
149async fn run_poll_loop(
152 sqs_client: aws_sdk_sqs::Client,
153 queue_url: String,
154 app_state: Arc<ThinData<crate::models::DefaultAppState>>,
155 semaphore: Arc<tokio::sync::Semaphore>,
156 shutdown_rx: &mut watch::Receiver<bool>,
157 config: PollLoopConfig,
158) {
159 let PollLoopConfig {
160 queue_type,
161 polling_interval,
162 visibility_timeout,
163 handler_timeout,
164 max_retries,
165 poller_id,
166 poller_count,
167 } = config;
168 let mut inflight: JoinSet<Option<String>> = JoinSet::new();
169 let mut consecutive_poll_errors: u32 = 0;
170 let mut pending_deletes: Vec<String> = Vec::new();
171
172 loop {
173 while let Some(result) = inflight.try_join_next() {
175 match result {
176 Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
177 Ok(None) => {} Err(e) => {
179 warn!(
180 queue_type = ?queue_type,
181 poller_id = poller_id,
182 error = %e,
183 "In-flight task failed"
184 );
185 }
186 }
187 }
188
189 if !pending_deletes.is_empty() {
191 flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
192 pending_deletes.clear();
193 }
194
195 if *shutdown_rx.borrow() {
197 info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received, stopping SQS poller");
198 break;
199 }
200
201 let available_permits = semaphore.available_permits();
208 let base_share = available_permits / poller_count;
209 let remainder = available_permits % poller_count;
210 let my_share = base_share + usize::from(poller_id < remainder);
211 if my_share == 0 {
212 tokio::select! {
213 _ = tokio::time::sleep(Duration::from_millis(50)) => continue,
214 _ = shutdown_rx.changed() => {
215 info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received, stopping SQS poller");
216 break;
217 }
218 }
219 }
220
221 let batch_size = my_share.min(10) as i32;
223
224 let messages_result = tokio::select! {
226 result = sqs_client
227 .receive_message()
228 .queue_url(&queue_url)
229 .max_number_of_messages(batch_size) .wait_time_seconds(polling_interval as i32)
231 .visibility_timeout(visibility_timeout as i32)
232 .message_system_attribute_names(MessageSystemAttributeName::ApproximateReceiveCount)
233 .message_system_attribute_names(MessageSystemAttributeName::MessageGroupId)
234 .message_attribute_names("target_scheduled_on")
235 .message_attribute_names("retry_attempt")
236 .send() => result,
237 _ = shutdown_rx.changed() => {
238 info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received during SQS poll, stopping poller");
239 break;
240 }
241 };
242
243 match messages_result {
244 Ok(output) => {
245 if consecutive_poll_errors > 0 {
246 info!(
247 queue_type = ?queue_type,
248 poller_id = poller_id,
249 previous_errors = consecutive_poll_errors,
250 "SQS polling recovered after consecutive errors"
251 );
252 }
253 consecutive_poll_errors = 0;
254
255 if let Some(messages) = output.messages {
256 if !messages.is_empty() {
257 debug!(
258 queue_type = ?queue_type,
259 poller_id = poller_id,
260 message_count = messages.len(),
261 "Received messages from SQS"
262 );
263
264 for message in messages {
266 let permit = match semaphore.clone().acquire_owned().await {
267 Ok(permit) => permit,
268 Err(err) => {
269 error!(
270 queue_type = ?queue_type,
271 poller_id = poller_id,
272 error = %err,
273 "Semaphore closed, stopping SQS poller loop"
274 );
275 return;
276 }
277 };
278 let client = sqs_client.clone();
279 let url = queue_url.clone();
280 let state = app_state.clone();
281
282 inflight.spawn(async move {
283 let _permit = permit; let result = tokio::time::timeout(
286 handler_timeout,
287 AssertUnwindSafe(process_message(
288 client.clone(),
289 message,
290 queue_type,
291 &url,
292 state,
293 max_retries,
294 ))
295 .catch_unwind(),
296 )
297 .await;
298
299 match result {
300 Ok(Ok(Ok(MessageOutcome::Delete { receipt_handle }))) => {
301 Some(receipt_handle)
302 }
303 Ok(Ok(Ok(MessageOutcome::Retain))) => None,
304 Ok(Ok(Err(e))) => {
305 error!(
306 queue_type = ?queue_type,
307 error = %e,
308 "Failed to process message"
309 );
310 None
311 }
312 Ok(Err(panic_info)) => {
313 let msg = panic_info
314 .downcast_ref::<String>()
315 .map(|s| s.as_str())
316 .or_else(|| {
317 panic_info.downcast_ref::<&str>().copied()
318 })
319 .unwrap_or("unknown panic");
320 error!(
321 queue_type = ?queue_type,
322 panic = %msg,
323 "Message handler panicked"
324 );
325 None
326 }
327 Err(_) => {
328 error!(
329 queue_type = ?queue_type,
330 timeout_secs = handler_timeout.as_secs(),
331 "Message handler timed out; message will be retried after visibility timeout"
332 );
333 None
334 }
335 }
336 });
337 }
338 }
339 }
340 }
341 Err(e) => {
342 consecutive_poll_errors = consecutive_poll_errors.saturating_add(1);
343 let backoff_secs = poll_error_backoff_secs(consecutive_poll_errors);
344 let (error_kind, error_code, error_message) = match &e {
345 SdkError::ServiceError(ctx) => {
346 ("service", ctx.err().code(), ctx.err().message())
347 }
348 SdkError::DispatchFailure(_) => ("dispatch", None, None),
349 SdkError::ResponseError(_) => ("response", None, None),
350 SdkError::TimeoutError(_) => ("timeout", None, None),
351 _ => ("other", None, None),
352 };
353 error!(
354 queue_type = ?queue_type,
355 poller_id = poller_id,
356 error_kind = error_kind,
357 error_code = error_code.unwrap_or("unknown"),
358 error_message = error_message.unwrap_or("n/a"),
359 error = %e,
360 error_debug = ?e,
361 consecutive_errors = consecutive_poll_errors,
362 backoff_secs = backoff_secs,
363 "Failed to receive messages from SQS, backing off"
364 );
365 tokio::select! {
366 _ = tokio::time::sleep(Duration::from_secs(backoff_secs)) => {}
367 _ = shutdown_rx.changed() => {
368 info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received during backoff, stopping poller");
369 break;
370 }
371 }
372 }
373 }
374 }
375
376 if !inflight.is_empty() {
378 info!(
379 queue_type = ?queue_type,
380 poller_id = poller_id,
381 count = inflight.len(),
382 "Draining in-flight tasks before shutdown"
383 );
384 match tokio::time::timeout(Duration::from_secs(30), async {
385 while let Some(result) = inflight.join_next().await {
386 match result {
387 Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
388 Ok(None) => {}
389 Err(e) => {
390 warn!(
391 queue_type = ?queue_type,
392 poller_id = poller_id,
393 error = %e,
394 "In-flight task failed during drain"
395 );
396 }
397 }
398 }
399 })
400 .await
401 {
402 Ok(()) => {
403 info!(queue_type = ?queue_type, poller_id = poller_id, "All in-flight tasks drained")
404 }
405 Err(_) => {
406 warn!(
407 queue_type = ?queue_type,
408 poller_id = poller_id,
409 remaining = inflight.len(),
410 "Drain timeout, abandoning remaining tasks"
411 );
412 inflight.abort_all();
413 }
414 }
415 }
416
417 if !pending_deletes.is_empty() {
419 flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
420 }
421}
422
423async fn process_message(
428 sqs_client: aws_sdk_sqs::Client,
429 message: Message,
430 queue_type: QueueType,
431 queue_url: &str,
432 app_state: Arc<ThinData<crate::models::DefaultAppState>>,
433 max_retries: usize,
434) -> Result<MessageOutcome, QueueBackendError> {
435 let body = message
436 .body()
437 .ok_or_else(|| QueueBackendError::QueueError("Empty message body".to_string()))?;
438
439 let receipt_handle = message
440 .receipt_handle()
441 .ok_or_else(|| QueueBackendError::QueueError("Missing receipt handle".to_string()))?;
442
443 if let Some(target_scheduled_on) = parse_target_scheduled_on(&message) {
445 let now = std::time::SystemTime::now()
446 .duration_since(std::time::SystemTime::UNIX_EPOCH)
447 .map_err(|e| QueueBackendError::QueueError(format!("System clock error: {e}")))?
448 .as_secs() as i64;
449 let remaining = target_scheduled_on - now;
450 if remaining > 0 {
451 let should_delete_original = defer_message(
452 &sqs_client,
453 queue_url,
454 body.to_string(),
455 &message,
456 target_scheduled_on,
457 remaining.min(900) as i32,
458 )
459 .await?;
460
461 debug!(
462 queue_type = ?queue_type,
463 remaining_seconds = remaining,
464 "Deferred scheduled SQS message for next delay hop"
465 );
466 return if should_delete_original {
467 Ok(MessageOutcome::Delete {
468 receipt_handle: receipt_handle.to_string(),
469 })
470 } else {
471 Ok(MessageOutcome::Retain)
472 };
473 }
474 }
475
476 let receive_count = message
478 .attributes()
479 .and_then(|attrs| attrs.get(&MessageSystemAttributeName::ApproximateReceiveCount))
480 .and_then(|count| count.parse::<usize>().ok())
481 .unwrap_or(1);
482 let attempt_number = receive_count.saturating_sub(1);
484 let logical_retry_attempt = parse_retry_attempt(&message).unwrap_or(attempt_number);
487
488 let sqs_message_id = message.message_id().unwrap_or("unknown").to_string();
490
491 debug!(
492 queue_type = ?queue_type,
493 message_id = %sqs_message_id,
494 attempt = attempt_number,
495 receive_count = receive_count,
496 max_retries = max_retries,
497 "Processing message"
498 );
499
500 let result = match queue_type {
502 QueueType::TransactionRequest => {
503 process_job::<TransactionRequest, _, _>(
504 body,
505 app_state,
506 attempt_number,
507 sqs_message_id,
508 "TransactionRequest",
509 transaction_request_handler,
510 )
511 .await
512 }
513 QueueType::TransactionSubmission => {
514 process_job::<TransactionSend, _, _>(
515 body,
516 app_state,
517 attempt_number,
518 sqs_message_id,
519 "TransactionSend",
520 transaction_submission_handler,
521 )
522 .await
523 }
524 QueueType::StatusCheck | QueueType::StatusCheckEvm | QueueType::StatusCheckStellar => {
525 process_job::<TransactionStatusCheck, _, _>(
526 body,
527 app_state,
528 attempt_number,
529 sqs_message_id,
530 "TransactionStatusCheck",
531 transaction_status_handler,
532 )
533 .await
534 }
535 QueueType::Notification => {
536 process_job::<NotificationSend, _, _>(
537 body,
538 app_state,
539 attempt_number,
540 sqs_message_id,
541 "NotificationSend",
542 notification_handler,
543 )
544 .await
545 }
546 QueueType::TokenSwapRequest => {
547 process_job::<TokenSwapRequest, _, _>(
548 body,
549 app_state,
550 attempt_number,
551 sqs_message_id,
552 "TokenSwapRequest",
553 token_swap_request_handler,
554 )
555 .await
556 }
557 QueueType::RelayerHealthCheck => {
558 process_job::<RelayerHealthCheck, _, _>(
559 body,
560 app_state,
561 attempt_number,
562 sqs_message_id,
563 "RelayerHealthCheck",
564 relayer_health_check_handler,
565 )
566 .await
567 }
568 };
569
570 match result {
571 Ok(()) => {
572 debug!(
573 queue_type = ?queue_type,
574 attempt = attempt_number,
575 "Message processed successfully"
576 );
577
578 Ok(MessageOutcome::Delete {
579 receipt_handle: receipt_handle.to_string(),
580 })
581 }
582 Err(ProcessingError::Permanent(e)) => {
583 error!(
584 queue_type = ?queue_type,
585 attempt = attempt_number,
586 error = %e,
587 "Permanent handler failure, message will be deleted"
588 );
589
590 Ok(MessageOutcome::Delete {
591 receipt_handle: receipt_handle.to_string(),
592 })
593 }
594 Err(ProcessingError::Retryable(e)) => {
595 if max_retries != usize::MAX && receive_count > max_retries {
597 error!(
598 queue_type = ?queue_type,
599 attempt = attempt_number,
600 receive_count = receive_count,
601 max_retries = max_retries,
602 error = %e,
603 "Max retries exceeded; message will be automatically moved to DLQ by SQS redrive policy"
604 );
605 return Ok(MessageOutcome::Retain);
606 }
607
608 let delay = if queue_type.is_status_check() {
612 compute_status_retry_delay(body, logical_retry_attempt)
613 } else {
614 retry_delay_secs(backoff_config_for_queue(queue_type), logical_retry_attempt)
615 };
616
617 if is_fifo_queue_url(queue_url) {
620 if let Err(err) = sqs_client
621 .change_message_visibility()
622 .queue_url(queue_url)
623 .receipt_handle(receipt_handle)
624 .visibility_timeout(delay.clamp(1, 900))
625 .send()
626 .await
627 {
628 error!(
629 queue_type = ?queue_type,
630 error = %err,
631 "Failed to set visibility timeout for retry; falling back to existing visibility timeout"
632 );
633 return Ok(MessageOutcome::Retain);
634 }
635
636 debug!(
637 queue_type = ?queue_type,
638 attempt = logical_retry_attempt,
639 delay_seconds = delay,
640 error = %e,
641 "Retry scheduled via visibility timeout"
642 );
643
644 return Ok(MessageOutcome::Retain);
645 }
646
647 let next_retry_attempt = logical_retry_attempt.saturating_add(1);
648
649 if let Err(send_err) = sqs_client
653 .send_message()
654 .queue_url(queue_url)
655 .message_body(body.to_string())
656 .delay_seconds(delay)
657 .message_attributes(
658 "retry_attempt",
659 MessageAttributeValue::builder()
660 .data_type("Number")
661 .string_value(next_retry_attempt.to_string())
662 .build()
663 .map_err(|err| {
664 QueueBackendError::SqsError(format!(
665 "Failed to build retry_attempt attribute: {err}"
666 ))
667 })?,
668 )
669 .send()
670 .await
671 {
672 error!(
673 queue_type = ?queue_type,
674 error = %send_err,
675 "Failed to re-enqueue message; leaving original for visibility timeout retry"
676 );
677 return Ok(MessageOutcome::Retain);
679 }
680
681 debug!(
682 queue_type = ?queue_type,
683 attempt = logical_retry_attempt,
684 delay_seconds = delay,
685 error = %e,
686 "Message re-enqueued with backoff delay"
687 );
688
689 Ok(MessageOutcome::Delete {
691 receipt_handle: receipt_handle.to_string(),
692 })
693 }
694 }
695}
696
697async fn process_job<T, F, Fut>(
700 body: &str,
701 app_state: Arc<ThinData<crate::models::DefaultAppState>>,
702 attempt: usize,
703 task_id: String,
704 type_name: &str,
705 handler: F,
706) -> Result<(), ProcessingError>
707where
708 T: DeserializeOwned,
709 F: FnOnce(Job<T>, ThinData<crate::models::DefaultAppState>, WorkerContext) -> Fut,
710 Fut: Future<Output = Result<(), HandlerError>>,
711{
712 let job: Job<T> = serde_json::from_str(body).map_err(|e| {
713 error!(error = %e, "Failed to deserialize {} job", type_name);
714 ProcessingError::Permanent(format!("Failed to deserialize {type_name} job: {e}"))
716 })?;
717
718 let ctx = WorkerContext::new(attempt, task_id);
719 handler(job, (*app_state).clone(), ctx)
720 .await
721 .map_err(map_handler_error)
722}
723
724fn map_handler_error(error: HandlerError) -> ProcessingError {
725 match error {
726 HandlerError::Abort(msg) => ProcessingError::Permanent(msg),
727 HandlerError::Retry(msg) => ProcessingError::Retryable(msg),
728 }
729}
730
731fn parse_target_scheduled_on(message: &Message) -> Option<i64> {
732 message
733 .message_attributes()
734 .and_then(|attrs| attrs.get("target_scheduled_on"))
735 .and_then(|value| value.string_value())
736 .and_then(|value| value.parse::<i64>().ok())
737}
738
739fn parse_retry_attempt(message: &Message) -> Option<usize> {
740 message
741 .message_attributes()
742 .and_then(|attrs| attrs.get("retry_attempt"))
743 .and_then(|value| value.string_value())
744 .and_then(|value| value.parse::<usize>().ok())
745}
746
747fn is_fifo_queue_url(queue_url: &str) -> bool {
748 queue_url.ends_with(".fifo")
749}
750
751async fn defer_message(
752 sqs_client: &aws_sdk_sqs::Client,
753 queue_url: &str,
754 body: String,
755 message: &Message,
756 target_scheduled_on: i64,
757 delay_seconds: i32,
758) -> Result<bool, QueueBackendError> {
759 if is_fifo_queue_url(queue_url) {
760 let receipt_handle = message.receipt_handle().ok_or_else(|| {
761 QueueBackendError::QueueError(
762 "Cannot defer FIFO message: missing receipt handle".to_string(),
763 )
764 })?;
765
766 sqs_client
767 .change_message_visibility()
768 .queue_url(queue_url)
769 .receipt_handle(receipt_handle)
770 .visibility_timeout(delay_seconds.clamp(1, 900))
771 .send()
772 .await
773 .map_err(|e| {
774 QueueBackendError::SqsError(format!(
775 "Failed to defer FIFO message via visibility timeout: {e}"
776 ))
777 })?;
778
779 return Ok(false);
780 }
781
782 let request = sqs_client
785 .send_message()
786 .queue_url(queue_url)
787 .message_body(body)
788 .delay_seconds(delay_seconds.clamp(1, 900))
789 .message_attributes(
790 "target_scheduled_on",
791 MessageAttributeValue::builder()
792 .data_type("Number")
793 .string_value(target_scheduled_on.to_string())
794 .build()
795 .map_err(|e| {
796 QueueBackendError::SqsError(format!(
797 "Failed to build deferred scheduled attribute: {e}"
798 ))
799 })?,
800 );
801
802 request.send().await.map_err(|e| {
803 QueueBackendError::SqsError(format!("Failed to defer scheduled message: {e}"))
804 })?;
805
806 Ok(true)
807}
808
809#[derive(serde::Deserialize)]
814struct StatusCheckData {
815 network_type: Option<crate::models::NetworkType>,
816}
817
818#[derive(serde::Deserialize)]
823struct PartialStatusCheckJob {
824 data: StatusCheckData,
825}
826
827fn compute_status_retry_delay(body: &str, attempt: usize) -> i32 {
834 let network_type = serde_json::from_str::<PartialStatusCheckJob>(body)
835 .ok()
836 .and_then(|j| j.data.network_type);
837
838 crate::queues::retry_config::status_check_retry_delay_secs(network_type, attempt)
839}
840
841fn get_wait_time_for_queue(queue_type: QueueType) -> u64 {
843 ServerConfig::get_sqs_wait_time(
844 queue_type.sqs_env_key(),
845 queue_type.default_wait_time_secs(),
846 )
847}
848
849fn get_poller_count_for_queue(queue_type: QueueType) -> usize {
851 let configured = ServerConfig::get_sqs_poller_count(
852 queue_type.sqs_env_key(),
853 queue_type.default_poller_count(),
854 );
855 if configured == 0 {
856 warn!(
857 queue_type = ?queue_type,
858 "Configured poller count is 0; clamping to 1"
859 );
860 1
861 } else {
862 configured
863 }
864}
865
866fn get_concurrency_for_queue(queue_type: QueueType) -> usize {
868 let configured = ServerConfig::get_worker_concurrency(
869 queue_type.concurrency_env_key(),
870 queue_type.default_concurrency(),
871 );
872 if configured == 0 {
873 warn!(
874 queue_type = ?queue_type,
875 "Configured concurrency is 0; clamping to 1"
876 );
877 1
878 } else {
879 configured
880 }
881}
882
883fn handler_timeout_secs(queue_type: QueueType) -> u64 {
887 u64::from(queue_type.visibility_timeout_secs().max(1))
888}
889
890const MAX_POLL_BACKOFF_SECS: u64 = 60;
892
893const RECOVERY_PROBE_EVERY: u32 = 4;
897
898fn poll_error_backoff_secs(consecutive_errors: u32) -> u64 {
902 let base: u64 = 5;
903
904 if consecutive_errors >= 7 && consecutive_errors % RECOVERY_PROBE_EVERY == 0 {
907 return base;
908 }
909
910 let exponent = consecutive_errors.saturating_sub(1).min(16);
911 base.saturating_mul(2_u64.saturating_pow(exponent))
912 .min(MAX_POLL_BACKOFF_SECS)
913}
914
915async fn flush_delete_batch(
921 sqs_client: &aws_sdk_sqs::Client,
922 queue_url: &str,
923 batch: &[String],
924 queue_type: QueueType,
925) -> usize {
926 if batch.is_empty() {
927 return 0;
928 }
929
930 let mut deleted = 0;
931
932 for chunk in batch.chunks(10) {
933 let entries: Vec<DeleteMessageBatchRequestEntry> = chunk
934 .iter()
935 .enumerate()
936 .map(|(i, handle)| {
937 DeleteMessageBatchRequestEntry::builder()
938 .id(i.to_string())
939 .receipt_handle(handle)
940 .build()
941 .expect("id and receipt_handle are always set")
942 })
943 .collect();
944
945 match sqs_client
946 .delete_message_batch()
947 .queue_url(queue_url)
948 .set_entries(Some(entries))
949 .send()
950 .await
951 {
952 Ok(output) => {
953 deleted += output.successful().len();
954
955 for f in output.failed() {
956 warn!(
957 queue_type = ?queue_type,
958 id = %f.id(),
959 code = %f.code(),
960 message = f.message().unwrap_or("unknown"),
961 "Batch delete entry failed (message will be redelivered)"
962 );
963 }
964 }
965 Err(e) => {
966 error!(
967 queue_type = ?queue_type,
968 error = %e,
969 batch_size = chunk.len(),
970 "Batch delete API call failed (messages will be redelivered)"
971 );
972 }
973 }
974 }
975
976 deleted
977}
978
979#[cfg(test)]
980mod tests {
981 use super::*;
982
983 #[test]
984 fn test_get_concurrency_for_queue() {
985 let concurrency = get_concurrency_for_queue(QueueType::TransactionRequest);
987 assert!(concurrency > 0);
988
989 let concurrency = get_concurrency_for_queue(QueueType::StatusCheck);
990 assert!(concurrency > 0);
991 }
992
993 #[test]
994 fn test_handler_timeout_secs_is_positive() {
995 let all = [
996 QueueType::TransactionRequest,
997 QueueType::TransactionSubmission,
998 QueueType::StatusCheck,
999 QueueType::StatusCheckEvm,
1000 QueueType::StatusCheckStellar,
1001 QueueType::Notification,
1002 QueueType::TokenSwapRequest,
1003 QueueType::RelayerHealthCheck,
1004 ];
1005 for queue_type in all {
1006 assert!(handler_timeout_secs(queue_type) > 0);
1007 }
1008 }
1009
1010 #[test]
1011 fn test_handler_timeout_secs_uses_visibility_timeout() {
1012 assert_eq!(
1013 handler_timeout_secs(QueueType::StatusCheckEvm),
1014 QueueType::StatusCheckEvm.visibility_timeout_secs() as u64
1015 );
1016 assert_eq!(
1017 handler_timeout_secs(QueueType::Notification),
1018 QueueType::Notification.visibility_timeout_secs() as u64
1019 );
1020 }
1021
1022 #[test]
1023 fn test_parse_target_scheduled_on() {
1024 let message = Message::builder().build();
1026
1027 assert_eq!(parse_target_scheduled_on(&message), None);
1029
1030 let message = Message::builder()
1032 .message_attributes(
1033 "target_scheduled_on",
1034 MessageAttributeValue::builder()
1035 .data_type("Number")
1036 .string_value("1234567890")
1037 .build()
1038 .unwrap(),
1039 )
1040 .build();
1041
1042 assert_eq!(parse_target_scheduled_on(&message), Some(1234567890));
1043 }
1044
1045 #[test]
1046 fn test_parse_retry_attempt() {
1047 let message = Message::builder().build();
1048 assert_eq!(parse_retry_attempt(&message), None);
1049
1050 let message = Message::builder()
1051 .message_attributes(
1052 "retry_attempt",
1053 MessageAttributeValue::builder()
1054 .data_type("Number")
1055 .string_value("7")
1056 .build()
1057 .unwrap(),
1058 )
1059 .build();
1060 assert_eq!(parse_retry_attempt(&message), Some(7));
1061 }
1062
1063 #[test]
1064 fn test_map_handler_error() {
1065 let error = HandlerError::Abort("Validation failed".to_string());
1067 let result = map_handler_error(error);
1068 assert!(matches!(result, ProcessingError::Permanent(_)));
1069
1070 let error = HandlerError::Retry("Network timeout".to_string());
1072 let result = map_handler_error(error);
1073 assert!(matches!(result, ProcessingError::Retryable(_)));
1074 }
1075
1076 #[test]
1077 fn test_is_fifo_queue_url() {
1078 assert!(is_fifo_queue_url(
1079 "https://sqs.us-east-1.amazonaws.com/123/queue.fifo"
1080 ));
1081 assert!(!is_fifo_queue_url(
1082 "https://sqs.us-east-1.amazonaws.com/123/queue"
1083 ));
1084 }
1085
1086 #[test]
1087 fn test_compute_status_retry_delay_evm() {
1088 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
1090 assert_eq!(compute_status_retry_delay(body, 0), 8);
1091 assert_eq!(compute_status_retry_delay(body, 1), 12);
1092 assert_eq!(compute_status_retry_delay(body, 8), 12);
1093 }
1094
1095 #[test]
1096 fn test_compute_status_retry_delay_stellar() {
1097 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"stellar"}}"#;
1098 assert_eq!(compute_status_retry_delay(body, 0), 2);
1099 assert_eq!(compute_status_retry_delay(body, 1), 3);
1100 assert_eq!(compute_status_retry_delay(body, 8), 3);
1101 }
1102
1103 #[test]
1104 fn test_compute_status_retry_delay_solana() {
1105 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"solana"}}"#;
1106 assert_eq!(compute_status_retry_delay(body, 0), 5);
1107 assert_eq!(compute_status_retry_delay(body, 1), 8);
1108 assert_eq!(compute_status_retry_delay(body, 8), 8);
1109 }
1110
1111 #[test]
1112 fn test_compute_status_retry_delay_missing_network() {
1113 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1"}}"#;
1114 assert_eq!(compute_status_retry_delay(body, 0), 5);
1115 assert_eq!(compute_status_retry_delay(body, 1), 8);
1116 assert_eq!(compute_status_retry_delay(body, 8), 8);
1117 }
1118
1119 #[test]
1120 fn test_compute_status_retry_delay_invalid_body() {
1121 assert_eq!(compute_status_retry_delay("not json", 0), 5);
1122 assert_eq!(compute_status_retry_delay("not json", 1), 8);
1123 assert_eq!(compute_status_retry_delay("not json", 8), 8);
1124 }
1125
1126 #[tokio::test]
1127 async fn test_semaphore_released_on_panic() {
1128 let sem = Arc::new(tokio::sync::Semaphore::new(1));
1129 let permit = sem.clone().acquire_owned().await.unwrap();
1130
1131 let handle = tokio::spawn(async move {
1132 let _permit = permit; let _ = AssertUnwindSafe(async { panic!("test panic") })
1134 .catch_unwind()
1135 .await;
1136 });
1137
1138 handle.await.unwrap();
1139 let _p = tokio::time::timeout(Duration::from_millis(100), sem.acquire())
1141 .await
1142 .expect("permit should be available after panic");
1143 }
1144
1145 #[test]
1146 fn test_poll_error_backoff_secs() {
1147 assert_eq!(poll_error_backoff_secs(1), 5);
1149 assert_eq!(poll_error_backoff_secs(2), 10);
1151 assert_eq!(poll_error_backoff_secs(3), 20);
1153 assert_eq!(poll_error_backoff_secs(4), 40);
1155 assert_eq!(poll_error_backoff_secs(5), 60);
1157 assert_eq!(poll_error_backoff_secs(6), 60);
1158 assert_eq!(poll_error_backoff_secs(7), 60);
1159 assert_eq!(poll_error_backoff_secs(8), 5);
1161 assert_eq!(poll_error_backoff_secs(9), 60);
1162 assert_eq!(poll_error_backoff_secs(12), 5); }
1164
1165 #[test]
1166 fn test_poll_error_backoff_zero_errors() {
1167 assert_eq!(poll_error_backoff_secs(0), 5);
1169 }
1170
1171 #[test]
1172 fn test_poll_error_backoff_recovery_probes() {
1173 for i in (8..=100).step_by(RECOVERY_PROBE_EVERY as usize) {
1175 assert_eq!(
1176 poll_error_backoff_secs(i as u32),
1177 5,
1178 "Expected recovery probe at error {i}"
1179 );
1180 }
1181 }
1182
1183 #[test]
1184 fn test_message_outcome_delete_carries_receipt_handle() {
1185 let handle = "test-receipt-handle-123".to_string();
1186 let outcome = MessageOutcome::Delete {
1187 receipt_handle: handle.clone(),
1188 };
1189 match outcome {
1190 MessageOutcome::Delete { receipt_handle } => {
1191 assert_eq!(receipt_handle, handle);
1192 }
1193 MessageOutcome::Retain => panic!("Expected Delete variant"),
1194 }
1195 }
1196
1197 #[test]
1198 fn test_message_outcome_retain() {
1199 let outcome = MessageOutcome::Retain;
1200 assert!(matches!(outcome, MessageOutcome::Retain));
1201 }
1202
1203 #[test]
1204 fn test_batch_delete_entry_builder() {
1205 let handles = vec![
1208 "receipt-0".to_string(),
1209 "receipt-1".to_string(),
1210 "receipt-2".to_string(),
1211 ];
1212 let entries: Vec<DeleteMessageBatchRequestEntry> = handles
1213 .iter()
1214 .enumerate()
1215 .map(|(i, handle)| {
1216 DeleteMessageBatchRequestEntry::builder()
1217 .id(i.to_string())
1218 .receipt_handle(handle)
1219 .build()
1220 .expect("id and receipt_handle are set")
1221 })
1222 .collect();
1223
1224 assert_eq!(entries.len(), 3);
1225 assert_eq!(entries[0].id(), "0");
1226 assert_eq!(entries[0].receipt_handle(), "receipt-0");
1227 assert_eq!(entries[2].id(), "2");
1228 assert_eq!(entries[2].receipt_handle(), "receipt-2");
1229 }
1230
1231 #[test]
1232 fn test_batch_chunking_logic() {
1233 let handles: Vec<String> = (0..25).map(|i| format!("receipt-{i}")).collect();
1236 let chunks: Vec<&[String]> = handles.chunks(10).collect();
1237
1238 assert_eq!(chunks.len(), 3);
1239 assert_eq!(chunks[0].len(), 10);
1240 assert_eq!(chunks[1].len(), 10);
1241 assert_eq!(chunks[2].len(), 5);
1242 }
1243
1244 #[test]
1245 fn test_outcome_collection_pattern() {
1246 let outcomes = vec![
1249 Some("receipt-1".to_string()), None, Some("receipt-2".to_string()), None, Some("receipt-3".to_string()), ];
1255
1256 let pending_deletes: Vec<String> = outcomes.into_iter().flatten().collect();
1257
1258 assert_eq!(pending_deletes.len(), 3);
1259 assert_eq!(pending_deletes[0], "receipt-1");
1260 assert_eq!(pending_deletes[1], "receipt-2");
1261 assert_eq!(pending_deletes[2], "receipt-3");
1262 }
1263
1264 #[test]
1267 fn test_parse_target_scheduled_on_non_numeric_string() {
1268 let message = Message::builder()
1269 .message_attributes(
1270 "target_scheduled_on",
1271 MessageAttributeValue::builder()
1272 .data_type("String")
1273 .string_value("not-a-number")
1274 .build()
1275 .unwrap(),
1276 )
1277 .build();
1278 assert_eq!(parse_target_scheduled_on(&message), None);
1279 }
1280
1281 #[test]
1282 fn test_parse_target_scheduled_on_empty_string() {
1283 let message = Message::builder()
1284 .message_attributes(
1285 "target_scheduled_on",
1286 MessageAttributeValue::builder()
1287 .data_type("Number")
1288 .string_value("")
1289 .build()
1290 .unwrap(),
1291 )
1292 .build();
1293 assert_eq!(parse_target_scheduled_on(&message), None);
1294 }
1295
1296 #[test]
1297 fn test_parse_target_scheduled_on_negative_value() {
1298 let message = Message::builder()
1299 .message_attributes(
1300 "target_scheduled_on",
1301 MessageAttributeValue::builder()
1302 .data_type("Number")
1303 .string_value("-1000")
1304 .build()
1305 .unwrap(),
1306 )
1307 .build();
1308 assert_eq!(parse_target_scheduled_on(&message), Some(-1000));
1310 }
1311
1312 #[test]
1313 fn test_parse_target_scheduled_on_float_string() {
1314 let message = Message::builder()
1315 .message_attributes(
1316 "target_scheduled_on",
1317 MessageAttributeValue::builder()
1318 .data_type("Number")
1319 .string_value("1234567890.5")
1320 .build()
1321 .unwrap(),
1322 )
1323 .build();
1324 assert_eq!(parse_target_scheduled_on(&message), None);
1326 }
1327
1328 #[test]
1329 fn test_parse_target_scheduled_on_zero() {
1330 let message = Message::builder()
1331 .message_attributes(
1332 "target_scheduled_on",
1333 MessageAttributeValue::builder()
1334 .data_type("Number")
1335 .string_value("0")
1336 .build()
1337 .unwrap(),
1338 )
1339 .build();
1340 assert_eq!(parse_target_scheduled_on(&message), Some(0));
1341 }
1342
1343 #[test]
1344 fn test_parse_target_scheduled_on_wrong_attribute_name() {
1345 let message = Message::builder()
1347 .message_attributes(
1348 "wrong_key",
1349 MessageAttributeValue::builder()
1350 .data_type("Number")
1351 .string_value("1234567890")
1352 .build()
1353 .unwrap(),
1354 )
1355 .build();
1356 assert_eq!(parse_target_scheduled_on(&message), None);
1357 }
1358
1359 #[test]
1362 fn test_parse_retry_attempt_non_numeric_string() {
1363 let message = Message::builder()
1364 .message_attributes(
1365 "retry_attempt",
1366 MessageAttributeValue::builder()
1367 .data_type("String")
1368 .string_value("abc")
1369 .build()
1370 .unwrap(),
1371 )
1372 .build();
1373 assert_eq!(parse_retry_attempt(&message), None);
1374 }
1375
1376 #[test]
1377 fn test_parse_retry_attempt_negative_value() {
1378 let message = Message::builder()
1379 .message_attributes(
1380 "retry_attempt",
1381 MessageAttributeValue::builder()
1382 .data_type("Number")
1383 .string_value("-1")
1384 .build()
1385 .unwrap(),
1386 )
1387 .build();
1388 assert_eq!(parse_retry_attempt(&message), None);
1390 }
1391
1392 #[test]
1393 fn test_parse_retry_attempt_zero() {
1394 let message = Message::builder()
1395 .message_attributes(
1396 "retry_attempt",
1397 MessageAttributeValue::builder()
1398 .data_type("Number")
1399 .string_value("0")
1400 .build()
1401 .unwrap(),
1402 )
1403 .build();
1404 assert_eq!(parse_retry_attempt(&message), Some(0));
1405 }
1406
1407 #[test]
1408 fn test_parse_retry_attempt_large_value() {
1409 let message = Message::builder()
1410 .message_attributes(
1411 "retry_attempt",
1412 MessageAttributeValue::builder()
1413 .data_type("Number")
1414 .string_value("999999")
1415 .build()
1416 .unwrap(),
1417 )
1418 .build();
1419 assert_eq!(parse_retry_attempt(&message), Some(999999));
1420 }
1421
1422 #[test]
1425 fn test_is_fifo_queue_url_empty_string() {
1426 assert!(!is_fifo_queue_url(""));
1427 }
1428
1429 #[test]
1430 fn test_is_fifo_queue_url_just_fifo_suffix() {
1431 assert!(is_fifo_queue_url("my-queue.fifo"));
1432 }
1433
1434 #[test]
1435 fn test_is_fifo_queue_url_fifo_in_middle() {
1436 assert!(!is_fifo_queue_url(
1438 "https://sqs.us-east-1.amazonaws.com/123/.fifo/queue"
1439 ));
1440 }
1441
1442 #[test]
1443 fn test_is_fifo_queue_url_case_sensitive() {
1444 assert!(!is_fifo_queue_url(
1445 "https://sqs.us-east-1.amazonaws.com/123/queue.FIFO"
1446 ));
1447 assert!(!is_fifo_queue_url(
1448 "https://sqs.us-east-1.amazonaws.com/123/queue.Fifo"
1449 ));
1450 }
1451
1452 #[test]
1453 fn test_is_fifo_queue_url_standard_queue_variations() {
1454 assert!(!is_fifo_queue_url(
1455 "https://sqs.us-east-1.amazonaws.com/123456789/my-queue"
1456 ));
1457 assert!(!is_fifo_queue_url(
1458 "https://sqs.eu-west-1.amazonaws.com/123456789/relayer-tx-request"
1459 ));
1460 assert!(!is_fifo_queue_url(
1461 "http://localhost:4566/000000000000/test-queue"
1462 ));
1463 }
1464
1465 #[test]
1466 fn test_is_fifo_queue_url_localstack() {
1467 assert!(is_fifo_queue_url(
1469 "http://localhost:4566/000000000000/test-queue.fifo"
1470 ));
1471 }
1472
1473 #[test]
1476 fn test_map_handler_error_preserves_abort_message() {
1477 let msg = "Validation failed: invalid nonce";
1478 let error = HandlerError::Abort(msg.to_string());
1479 match map_handler_error(error) {
1480 ProcessingError::Permanent(s) => assert_eq!(s, msg),
1481 ProcessingError::Retryable(_) => panic!("Expected Permanent"),
1482 }
1483 }
1484
1485 #[test]
1486 fn test_map_handler_error_preserves_retry_message() {
1487 let msg = "RPC timeout after 30s";
1488 let error = HandlerError::Retry(msg.to_string());
1489 match map_handler_error(error) {
1490 ProcessingError::Retryable(s) => assert_eq!(s, msg),
1491 ProcessingError::Permanent(_) => panic!("Expected Retryable"),
1492 }
1493 }
1494
1495 #[test]
1496 fn test_map_handler_error_empty_message() {
1497 let error = HandlerError::Abort(String::new());
1498 match map_handler_error(error) {
1499 ProcessingError::Permanent(s) => assert!(s.is_empty()),
1500 ProcessingError::Retryable(_) => panic!("Expected Permanent"),
1501 }
1502 }
1503
1504 #[test]
1507 fn test_handler_timeout_secs_matches_visibility_timeout_for_all_queues() {
1508 let all = [
1509 QueueType::TransactionRequest,
1510 QueueType::TransactionSubmission,
1511 QueueType::StatusCheck,
1512 QueueType::StatusCheckEvm,
1513 QueueType::StatusCheckStellar,
1514 QueueType::Notification,
1515 QueueType::TokenSwapRequest,
1516 QueueType::RelayerHealthCheck,
1517 ];
1518 for qt in all {
1519 assert_eq!(
1520 handler_timeout_secs(qt),
1521 qt.visibility_timeout_secs().max(1) as u64,
1522 "{qt:?}: handler timeout should equal max(visibility_timeout, 1)"
1523 );
1524 }
1525 }
1526
1527 #[test]
1530 fn test_get_concurrency_for_queue_all_types_positive() {
1531 let all = [
1532 QueueType::TransactionRequest,
1533 QueueType::TransactionSubmission,
1534 QueueType::StatusCheck,
1535 QueueType::StatusCheckEvm,
1536 QueueType::StatusCheckStellar,
1537 QueueType::Notification,
1538 QueueType::TokenSwapRequest,
1539 QueueType::RelayerHealthCheck,
1540 ];
1541 for qt in all {
1542 assert!(
1543 get_concurrency_for_queue(qt) > 0,
1544 "{qt:?}: concurrency must be positive (clamped to at least 1)"
1545 );
1546 }
1547 }
1548
1549 #[test]
1552 fn test_poll_error_backoff_never_exceeds_max() {
1553 for i in 0..200 {
1554 let backoff = poll_error_backoff_secs(i);
1555 assert!(
1556 backoff <= MAX_POLL_BACKOFF_SECS,
1557 "Error count {i}: backoff {backoff}s exceeds MAX {MAX_POLL_BACKOFF_SECS}s"
1558 );
1559 }
1560 }
1561
1562 #[test]
1563 fn test_poll_error_backoff_u32_max_does_not_overflow() {
1564 let backoff = poll_error_backoff_secs(u32::MAX);
1565 assert!(backoff <= MAX_POLL_BACKOFF_SECS);
1566 assert!(backoff > 0);
1567 }
1568
1569 #[test]
1570 fn test_poll_error_backoff_always_positive() {
1571 for i in 0..200 {
1572 assert!(
1573 poll_error_backoff_secs(i) > 0,
1574 "Error count {i}: backoff must be positive"
1575 );
1576 }
1577 }
1578
1579 #[test]
1580 fn test_poll_error_backoff_monotonic_before_cap() {
1581 let mut prev = poll_error_backoff_secs(0);
1583 for i in 1..=4 {
1584 let curr = poll_error_backoff_secs(i);
1585 assert!(
1586 curr >= prev,
1587 "Backoff should be non-decreasing before cap: {prev} -> {curr} at error {i}"
1588 );
1589 prev = curr;
1590 }
1591 }
1592
1593 #[test]
1596 fn test_max_poll_backoff_is_reasonable() {
1597 assert!(
1598 MAX_POLL_BACKOFF_SECS >= 10,
1599 "Max backoff should be at least 10s to avoid tight error loops"
1600 );
1601 assert!(
1602 MAX_POLL_BACKOFF_SECS <= 300,
1603 "Max backoff should be at most 5 minutes to detect recovery promptly"
1604 );
1605 }
1606
1607 #[test]
1608 fn test_recovery_probe_every_is_valid() {
1609 assert!(
1610 RECOVERY_PROBE_EVERY >= 2,
1611 "Recovery probe interval must be at least 2 to avoid probing every attempt"
1612 );
1613 assert!(
1614 RECOVERY_PROBE_EVERY <= 10,
1615 "Recovery probe interval should not be too large or recovery detection is slow"
1616 );
1617 }
1618
1619 #[test]
1622 fn test_compute_status_retry_delay_very_high_attempt() {
1623 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
1624 assert_eq!(compute_status_retry_delay(body, 1000), 12);
1626 assert_eq!(compute_status_retry_delay(body, usize::MAX), 12);
1627 }
1628
1629 #[test]
1630 fn test_compute_status_retry_delay_empty_body() {
1631 assert_eq!(compute_status_retry_delay("", 0), 5);
1633 assert_eq!(compute_status_retry_delay("{}", 0), 5);
1634 }
1635
1636 #[test]
1637 fn test_compute_status_retry_delay_partial_json() {
1638 assert_eq!(compute_status_retry_delay(r#"{"data":{}}"#, 0), 5);
1640 assert_eq!(
1641 compute_status_retry_delay(r#"{"data":{"network_type":"evm"}}"#, 0),
1642 8
1643 );
1644 }
1645
1646 #[test]
1649 fn test_partial_status_check_job_deserializes_network_type() {
1650 let body = r#"{"data":{"network_type":"evm","extra_field":"ignored"}}"#;
1651 let parsed: PartialStatusCheckJob = serde_json::from_str(body).unwrap();
1652 assert_eq!(
1653 parsed.data.network_type,
1654 Some(crate::models::NetworkType::Evm)
1655 );
1656 }
1657
1658 #[test]
1659 fn test_partial_status_check_job_handles_missing_network_type() {
1660 let body = r#"{"data":{"transaction_id":"tx1"}}"#;
1661 let parsed: PartialStatusCheckJob = serde_json::from_str(body).unwrap();
1662 assert_eq!(parsed.data.network_type, None);
1663 }
1664
1665 #[test]
1666 fn test_partial_status_check_job_rejects_missing_data() {
1667 let body = r#"{"not_data":{}}"#;
1668 let result = serde_json::from_str::<PartialStatusCheckJob>(body);
1669 assert!(result.is_err());
1670 }
1671
1672 #[test]
1675 fn test_fifo_detection_consistent_with_defer_and_retry_logic() {
1676 let standard = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check";
1681 let fifo = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check.fifo";
1682
1683 assert!(!is_fifo_queue_url(standard));
1684 assert!(is_fifo_queue_url(fifo));
1685 }
1686
1687 #[test]
1690 fn test_get_wait_time_for_queue_returns_positive() {
1691 let all = [
1692 QueueType::TransactionRequest,
1693 QueueType::TransactionSubmission,
1694 QueueType::StatusCheck,
1695 QueueType::StatusCheckEvm,
1696 QueueType::StatusCheckStellar,
1697 QueueType::Notification,
1698 QueueType::TokenSwapRequest,
1699 QueueType::RelayerHealthCheck,
1700 ];
1701 for qt in all {
1702 let wt = get_wait_time_for_queue(qt);
1703 assert!(
1704 wt <= 20,
1705 "{qt:?}: wait time {wt} exceeds SQS maximum of 20s"
1706 );
1707 }
1708 }
1709
1710 #[test]
1711 fn test_get_wait_time_for_queue_matches_defaults() {
1712 assert_eq!(
1714 get_wait_time_for_queue(QueueType::TransactionRequest),
1715 QueueType::TransactionRequest.default_wait_time_secs()
1716 );
1717 assert_eq!(
1718 get_wait_time_for_queue(QueueType::StatusCheck),
1719 QueueType::StatusCheck.default_wait_time_secs()
1720 );
1721 }
1722
1723 #[test]
1724 #[serial_test::serial]
1725 fn test_get_wait_time_for_queue_respects_env_override() {
1726 let env_var = format!(
1728 "SQS_{}_WAIT_TIME_SECONDS",
1729 QueueType::StatusCheck.sqs_env_key()
1730 );
1731 std::env::set_var(&env_var, "12");
1732 assert_eq!(get_wait_time_for_queue(QueueType::StatusCheck), 12);
1733 std::env::remove_var(&env_var);
1734 }
1735
1736 #[test]
1737 #[serial_test::serial]
1738 fn test_get_wait_time_for_queue_env_override_clamped_to_20() {
1739 let env_var = format!(
1740 "SQS_{}_WAIT_TIME_SECONDS",
1741 QueueType::Notification.sqs_env_key()
1742 );
1743 std::env::set_var(&env_var, "99");
1744 assert_eq!(
1745 get_wait_time_for_queue(QueueType::Notification),
1746 20,
1747 "Should clamp to SQS maximum of 20"
1748 );
1749 std::env::remove_var(&env_var);
1750 }
1751
1752 #[test]
1755 fn test_get_poller_count_for_queue_all_types_positive() {
1756 let all = [
1757 QueueType::TransactionRequest,
1758 QueueType::TransactionSubmission,
1759 QueueType::StatusCheck,
1760 QueueType::StatusCheckEvm,
1761 QueueType::StatusCheckStellar,
1762 QueueType::Notification,
1763 QueueType::TokenSwapRequest,
1764 QueueType::RelayerHealthCheck,
1765 ];
1766 for qt in all {
1767 assert!(
1768 get_poller_count_for_queue(qt) >= 1,
1769 "{qt:?}: poller count must be at least 1"
1770 );
1771 }
1772 }
1773
1774 #[test]
1775 fn test_get_poller_count_for_queue_matches_defaults() {
1776 assert_eq!(
1778 get_poller_count_for_queue(QueueType::TransactionRequest),
1779 QueueType::TransactionRequest.default_poller_count().max(1)
1780 );
1781 assert_eq!(
1782 get_poller_count_for_queue(QueueType::Notification),
1783 QueueType::Notification.default_poller_count().max(1)
1784 );
1785 }
1786
1787 #[test]
1788 #[serial_test::serial]
1789 fn test_get_poller_count_for_queue_respects_env_override() {
1790 let env_var = format!("SQS_{}_POLLER_COUNT", QueueType::Notification.sqs_env_key());
1791 std::env::set_var(&env_var, "5");
1792 assert_eq!(get_poller_count_for_queue(QueueType::Notification), 5);
1793 std::env::remove_var(&env_var);
1794 }
1795
1796 #[test]
1797 #[serial_test::serial]
1798 fn test_get_poller_count_for_queue_env_zero_clamped_to_1() {
1799 let env_var = format!("SQS_{}_POLLER_COUNT", QueueType::StatusCheck.sqs_env_key());
1800 std::env::set_var(&env_var, "0");
1801 assert_eq!(
1802 get_poller_count_for_queue(QueueType::StatusCheck),
1803 1,
1804 "Zero poller count from env should be clamped to 1"
1805 );
1806 std::env::remove_var(&env_var);
1807 }
1808
1809 #[test]
1812 fn test_poll_loop_config_clone() {
1813 let config = PollLoopConfig {
1814 queue_type: QueueType::TransactionRequest,
1815 polling_interval: 15,
1816 visibility_timeout: 120,
1817 handler_timeout: Duration::from_secs(120),
1818 max_retries: 3,
1819 poller_id: 0,
1820 poller_count: 2,
1821 };
1822 let cloned = config.clone();
1823 assert_eq!(cloned.polling_interval, 15);
1824 assert_eq!(cloned.poller_id, 0);
1825 assert_eq!(cloned.poller_count, 2);
1826 assert_eq!(cloned.max_retries, 3);
1827 }
1828}