Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
41dec39
Fixed locking deadlock and refactored to improve maintainability. Add…
Mar 5, 2026
73dc4ba
Fixed flake issue
Mar 5, 2026
7d9f26c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 5, 2026
b9d6822
Merge branch 'main' into main
auvipy Mar 5, 2026
7b4e017
Merge branch 'main' into main
auvipy Mar 8, 2026
63bfe1c
Update t/unit/transport/SQS/test_SQS_SNS.py
auvipy Mar 8, 2026
16002ed
Docstring fix
Mar 8, 2026
6407803
Merge branch 'main' of https://github.com/rlaunch/kombu
rlaunch Apr 11, 2026
f434cac
Added basic SQS & SNS integration tests
rlaunch Apr 11, 2026
6eab05d
Fixed incorrect handling of None values when handling async SQS reque…
rlaunch Apr 11, 2026
2432e38
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 11, 2026
a0b211e
Update tox.ini
auvipy Apr 11, 2026
67ede48
Update t/unit/asynchronous/aws/sqs/test_connection.py
auvipy Apr 11, 2026
ae3621b
Update kombu/asynchronous/aws/sqs/connection.py
auvipy Apr 11, 2026
ad8222b
Update t/unit/asynchronous/aws/sqs/test_connection.py
auvipy Apr 11, 2026
ee2ce63
Update t/integration/test_sqs.py
rlaunch Apr 12, 2026
ed5e721
Merge branch 'celery:main' into main
rlaunch Apr 12, 2026
1871f6b
Fixed Copilot auto-merged fix in devcontainer PR, which tried to inst…
rlaunch Apr 12, 2026
ab7ecf6
Fixed potential issue when running tests in parallel. Addressed issue…
rlaunch Apr 12, 2026
341b4a7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 12, 2026
0037a04
Fixed import naming issue with the SNS module, preventing mocking of …
rlaunch Apr 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion kombu/asynchronous/aws/sqs/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ def make_request(self, operation_name, params, queue_url, verb, callback=None, p

service_model = self.sqs_connection.meta.service_model
protocol = service_model.protocol
all_params = {**(params or {}), **protocol_params.get(protocol, {})}
all_params = self._build_make_request_params(
params=params,
protocol_params=protocol_params,
protocol=protocol
)

if protocol == 'query':
request = self._create_query_request(
Expand All @@ -129,6 +133,20 @@ def make_request(self, operation_name, params, queue_url, verb, callback=None, p

return self._mexe(prepared_request, callback=callback)

@staticmethod
def _build_make_request_params(params: dict | None, protocol_params: dict | None, protocol: str | None) -> dict:
Comment thread
auvipy marked this conversation as resolved.
Outdated
"""Build the parameters for the make_request call.

This method safely builds the request parameters, and correctly handles when parameters are not set.

:param params: The original request parameters.
:param protocol_params: The protocol-specific request parameters.
:param protocol: The protocol being used for the request.
:return: The combined request parameters.
"""
safe_proto_params = protocol_params or {}
return {**(params or {}), **safe_proto_params.get(protocol, {})}

def create_queue(self, queue_name,
visibility_timeout=None, callback=None):
params = {'QueueName': queue_name}
Expand Down
87 changes: 87 additions & 0 deletions t/integration/test_sqs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from __future__ import annotations

import os
from unittest.mock import patch

import pytest

import kombu
import kombu.asynchronous

from .common import BaseExchangeTypes, BaseMessage, BasicFunctionality


def get_connection(hostname, port):
return kombu.Connection(
f'sqs://{hostname}:{port}',
userid="TestUsername", # This can be anything
password="TestPassword", # This can be anything
transport_options={
"supports_fanout": True,
"client-config": {
"region_name": "us-east-1",
}
},
)
Comment thread
rlaunch marked this conversation as resolved.
Outdated


@pytest.fixture()
def hub():
"""Provide a Kombu hub (event loop) for async I/O and callbacks."""
h = kombu.asynchronous.Hub()
kombu.asynchronous.set_event_loop(h)
yield h
h.close()
kombu.asynchronous.set_event_loop(None)
Comment thread
rlaunch marked this conversation as resolved.
Outdated


@pytest.fixture()
def invalid_connection():
return kombu.Connection('sqs://localhost:12345')

Comment thread
rlaunch marked this conversation as resolved.

@pytest.fixture()
def connection(hub):
conn = get_connection(
hostname=os.environ.get('SQS_HOST', 'localhost'),
port=os.environ.get('SQS_PORT', '4100'),
)
Comment thread
rlaunch marked this conversation as resolved.
Outdated
conn.transport_options['hub'] = hub
return conn


@pytest.fixture(autouse=True)
def mock_set_policy():
"""Mock the _set_policy_on_sqs_queue method as this is not supported by GoAws."""
with patch("kombu.transport.SQS.SNS._SnsSubscription._set_policy_on_sqs_queue") as mock:
yield mock


@pytest.mark.env('sqs')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_SQSBasicFunctionality(BasicFunctionality):
pass


@pytest.mark.env('sqs')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_SQSBaseExchangeTypes(BaseExchangeTypes):
def test_fanout(self, connection):
ex = kombu.Exchange('test_fanout', type='fanout')
test_queue1 = kombu.Queue('fanout1', exchange=ex)
consumer1 = self._create_consumer(connection, test_queue1)
test_queue2 = kombu.Queue('fanout2', exchange=ex)
consumer2 = self._create_consumer(connection, test_queue2)

with connection as conn:
with conn.channel() as channel:
self._publish(channel, ex, [test_queue1, test_queue2])

self._consume_from(conn, consumer1)
self._consume_from(conn, consumer2)


@pytest.mark.env('sqs')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_SQSMessage(BaseMessage):
pass
39 changes: 39 additions & 0 deletions t/unit/asynchronous/aws/sqs/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,3 +708,42 @@ def test_query_protocol_encoding(self):
'Grandparent.1.Parent.Child': '1',
'Grandparent.1.Parent.Sibling': '2',
}

@pytest.mark.parametrize(
"params, protocol_params, protocol, expected_result",
[
(
{'foo': 'bar'},
{'json': {'baz': 'qux'}},
'json',
{
'foo': 'bar',
'baz': 'qux',
},
),
(
None,
{'json': {'baz': 'qux'}},
'json',
{
'baz': 'qux',
},
),
(
None,
{'json': {'baz': 'qux'}},
'xml',
{},
),
(
None,
None,
"json",
{},
),
],
)
def test_make_request_parameters_are_handled_correctly(self, params, protocol_params, protocol, expected_result):
Comment thread
auvipy marked this conversation as resolved.
Outdated
result = self.x._build_make_request_params(params, protocol_params, protocol)

assert result == expected_result
16 changes: 15 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ envlist =
{pypy3.11,3.10,3.11,3.12,3.13,3.14}-linux-integration-redis
{pypy3.11,3.10,3.11,3.12,3.13,3.14}-linux-integration-mongodb
{pypy3.11,3.10,3.11,3.12,3.13,3.14}-linux-integration-kafka
{pypy3.11,3.10,3.11,3.12,3.13,3.14}-linux-integration-sqs
flake8
apicheck
pydocstyle
Expand Down Expand Up @@ -47,6 +48,7 @@ commands =
integration-redis: pytest -xv -E redis t/integration -n auto --reruns 2 --reruns-delay 1 {posargs}
integration-mongodb: pytest -xv -E mongodb t/integration -n auto --reruns 2 --reruns-delay 1 {posargs}
integration-kafka: pytest -xv -E kafka t/integration -n auto --reruns 2 --reruns-delay 1 {posargs}
integration-sqs: pytest -xv -E sqs t/integration -n auto --reruns 2 --reruns-delay 1 {posargs}

basepython =
pypy3: pypy3
Expand All @@ -65,6 +67,7 @@ docker =
integration-mongodb: mongodb
integration-kafka: zookeeper
integration-kafka: kafka
integration-sqs: goaws

dockerenv =
PYAMQP_INTEGRATION_INSTANCE=1
Expand Down Expand Up @@ -106,6 +109,16 @@ healthcheck_retries = 30
healthcheck_start_period = 5
environment = ALLOW_ANONYMOUS_LOGIN=yes

[docker:goaws]
image = admiralpiett/goaws
ports =
4100:4100/tcp
healthcheck_cmd = /bin/sh -c 'nc -z localhost 4100'
healthcheck_interval = 10
healthcheck_timeout = 10
healthcheck_retries = 30
healthcheck_start_period = 5

[docker:kafka]
image = bitnami/kafka:3.8
ports =
Expand Down Expand Up @@ -167,5 +180,6 @@ commands =
3.13-linux-integration-py-amqp,\
3.13-linux-integration-redis,\
3.13-linux-integration-mongodb,\
3.13-linux-integration-kafka \
3.13-linux-integration-kafka,\
3.13-linux-integration-sqs \
-p -o -- --exitfirst {posargs}
Loading