Conversation
auvipy
left a comment
There was a problem hiding this comment.
can you please also fix the failing test
=================================== FAILURES ===================================
____________________________ test_Channel.test_get _____________________________
self = <t.unit.transport.test_rocketmq.test_Channel object at 0x7f75eb63bbf0>
def test_get(self):
queue = 'new-queue'
self.channel.basic_consume(queue, True, None, 'cg1')
self.consumer.receive.return_value = []
with pytest.raises(Empty):
self.channel._get(queue)
rocketmq_message = _message_to_rocketmq_ack_message(_mock_message(topic=queue, queue=queue))
rocketmq_message.body = str_to_bytes(dumps(self.channel.prepare_message({})))
t/unit/transport/test_rocketmq.py:150:
self = <rocketmq.v5.model.message.Message object at 0x7f75d9132c00>
body = b'{"body": {}, "content-encoding": null, "content-type": null, "headers": {}, "properties": {"delivery_info": {}, "priority": 0}}'
Restoring 2 unacknowledged message(s)
@body.setter
def body(self, body):
if body is None or body.strip() == "":
E BytesWarning: Comparison between bytes and string
.tox/3.12-unit/lib/python3.12/site-packages/rocketmq/v5/model/message.py:175: BytesWarning
for more information, see https://pre-commit.ci
hi, I've suppressed the BytesWarning for those test cases, and I'll submit an issue to the SDK community to get it fixed. |
|
Ok, thanks |
|
@auvipy hi, I've fixed the issue which causes test failure in py313, could you help me trigger the workflow? |
|
Sure |
|
@auvipy hi, the conflict was fixed and I notice that python-version has changed, so could you help me trigger the workflow? |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2306 +/- ##
==========================================
- Coverage 82.37% 82.32% -0.06%
==========================================
Files 79 80 +1
Lines 10116 10447 +331
Branches 1159 1197 +38
==========================================
+ Hits 8333 8600 +267
- Misses 1580 1628 +48
- Partials 203 219 +16 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Pull Request Overview
This PR adds RocketMQ transport support to Kombu, enabling message queuing through Apache RocketMQ as a backend. The implementation follows the virtual transport pattern and provides a comprehensive mapping between AMQP semantics and RocketMQ concepts.
- Implements RocketMQ transport module with Channel, QoS, and Transport classes
- Adds comprehensive test coverage for the new RocketMQ transport
- Includes documentation and dependency configuration for the RocketMQ integration
Reviewed Changes
Copilot reviewed 7 out of 8 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| kombu/transport/rocketmq.py | Main implementation of RocketMQ transport with Channel, QoS, and Transport classes, including message handling, producer/consumer management, and AMQP-to-RocketMQ mapping |
| t/unit/transport/test_rocketmq.py | Comprehensive unit tests for RocketMQ transport covering QoS operations, channel operations, producer/consumer management, and message handling |
| requirements/extras/rocketmq.txt | Defines RocketMQ-specific dependencies (rocketmq-python-client and grpcio-tools) |
| requirements/test-ci.txt | Adds RocketMQ requirements to CI test dependencies |
| requirements/extras/gcpubsub.txt | Updates protobuf version specification |
| kombu/transport/init.py | Registers the RocketMQ transport in the transport registry |
| docs/reference/kombu.transport.rocketmq.rst | Adds Sphinx documentation for the RocketMQ transport module |
| docs/reference/index.rst | Adds RocketMQ transport to the documentation index |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Change-Id: Ib0fcf35f367c3292ab415bcf46cfd4105c57248a
|
@auvipy I've fixed the typo, indentation, deprecated method in resolve-conflict branch based on Copilot's suggestions. |
|
@auvipy Hi, is there anything else I should improve or clarify? I’m happy to make any further adjustments needed. |
|
hi @auvipy |
auvipy
left a comment
There was a problem hiding this comment.
please fix the merge conflicts
The gcpubsub requirement conflict has been resolved. |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 8 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 9 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 9 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if message.topic in self._auto_ack_topics: | ||
| _ack_rocketmq_message(consumer, message, False) | ||
| amqp_queue = self.connection._topic_to_queue[message.topic] | ||
| payload = _rocketmq_message_to_payload(message, consumer.consumer_group, amqp_queue) | ||
| if callback is not None: |
There was a problem hiding this comment.
_get_bulk assumes every received message topic exists in self.connection._topic_to_queue and indexes it with [...]. If a message arrives for a topic that is no longer registered (e.g., races with basic_cancel, or subscriptions shared across topics/groups), this will raise KeyError and break the drain loop. Consider using .get() and handling unknown topics explicitly (e.g., log + ack/requeue/skip) to keep the consumer robust.
| if message.topic in self._auto_ack_topics: | ||
| _ack_rocketmq_message(consumer, message, False) | ||
| amqp_queue = self.connection._topic_to_queue[message.topic] | ||
| return _rocketmq_message_to_payload(message, consumer.consumer_group, amqp_queue) |
There was a problem hiding this comment.
_get() has the same assumption as _get_bulk() when resolving amqp_queue via self.connection._topic_to_queue[message.topic]. A missing mapping will raise KeyError and surface as an unexpected exception instead of being treated as an unconsumed/undeliverable message. Consider guarding this lookup (e.g., .get()) and handling the unknown topic similarly to _get_bulk().
| if topic in self._bound_topics: | ||
| return consumer | ||
|
|
||
| def _subscribe(): | ||
| consumer.subscribe( | ||
| topic, | ||
| FilterExpression( | ||
| expression=group_config.filter_exp, | ||
| filter_type=group_config.filter_type, | ||
| ), | ||
| ) | ||
| self._bound_topics.add(topic) | ||
|
|
||
| retry_over_time( | ||
| _subscribe, Exception, max_retries=4, interval_max=4, timeout=20 | ||
| ) | ||
| return consumer |
There was a problem hiding this comment.
_get_consumer(..., passive=True) currently prevents creating a new consumer, but it can still perform a (retrying) subscribe() call if the group exists and the topic isn’t in _bound_topics. That makes “passive” calls (e.g., from ack/reject/cancel paths) unexpectedly do network I/O and can also race between threads because _bound_topics is not protected by self.mutex. Consider either (1) skipping subscribe when passive=True, and (2) guarding the check/subscribe/add-to-_bound_topics section with the same lock to avoid duplicate subscriptions.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
#2305
Add virtual transport for RocketMQ