Codecov Report
❌ Patch coverage is
Additional details and impacted files

| Coverage Diff | main | #66 | +/- |
|---|---|---|---|
| Coverage | 70.89% | 71.64% | +0.74% |
| Files | 54 | 54 | |
| Lines | 12198 | 12286 | +88 |
| Hits | 8648 | 8802 | +154 |
| Misses | 3550 | 3484 | -66 |
Pull request overview
This PR introduces an inflight queue mechanism for managing QoS 1 and QoS 2 MQTT messages, replacing the previous HashMap-based tracking with a more sophisticated queue that handles retransmission, flow control, and MQTT version-specific behavior.
Key Changes
- Added `InflightQueue` struct to centralize tracking of unacknowledged messages with O(1) lookups and deadline-based timeout checking
- Introduced `receive_maximum` configuration option (MQTT v5) to enforce flow control limits on inflight PUBLISH messages
- Refactored `MqttEngine` to use `InflightQueue` instead of separate HashMaps for tracking pending subscribes, unsubscribes, and publishes
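The queue design described in the key changes above (O(1) lookups plus deadline-based timeout checking) can be pictured as a map keyed by packet ID paired with a FIFO of send times. The following is a minimal sketch under stated assumptions, not the PR's implementation: `SketchQueue`, `Entry`, and `expired` are hypothetical names, the stored packet is omitted, and callers are assumed to pass non-decreasing timestamps.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

/// Hypothetical, simplified entry (the PR's `InflightEntry` also stores the packet and QoS).
#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    pub packet_id: u16,
    pub sent_at: Instant,
    pub retry_count: u32,
}

/// Hypothetical queue: a map for O(1) lookup plus a FIFO of (id, send time) records.
pub struct SketchQueue {
    entries: HashMap<u16, Entry>,
    deadlines: VecDeque<(u16, Instant)>,
    timeout: Duration,
}

impl SketchQueue {
    pub fn new(timeout: Duration) -> Self {
        Self { entries: HashMap::new(), deadlines: VecDeque::new(), timeout }
    }

    /// Track a packet sent at `now`.
    pub fn push(&mut self, packet_id: u16, now: Instant) {
        self.entries.insert(packet_id, Entry { packet_id, sent_at: now, retry_count: 0 });
        self.deadlines.push_back((packet_id, now));
    }

    /// O(1) removal; the stale deadline record is discarded lazily in `expired`.
    pub fn acknowledge(&mut self, packet_id: u16) -> Option<Entry> {
        self.entries.remove(&packet_id)
    }

    /// Return the IDs whose timeout has elapsed and re-arm them for another attempt.
    pub fn expired(&mut self, now: Instant) -> Vec<u16> {
        let mut due = Vec::new();
        // Visit each existing record at most once, so re-armed records are not rescanned.
        for _ in 0..self.deadlines.len() {
            let (pid, sent_at) = match self.deadlines.front() {
                Some(&record) => record,
                None => break,
            };
            if now.saturating_duration_since(sent_at) < self.timeout {
                break; // records behind the front are newer
            }
            self.deadlines.pop_front();
            // Skip records whose entry was acknowledged or re-sent in the meantime.
            if let Some(entry) = self.entries.get_mut(&pid) {
                if entry.sent_at == sent_at {
                    entry.sent_at = now;
                    entry.retry_count += 1;
                    self.deadlines.push_back((pid, now));
                    due.push(pid);
                }
            }
        }
        due
    }
}
```

Because acknowledgement only touches the map, the deadline FIFO may hold stale records for a while; the `sent_at` comparison is what filters them out when the front of the queue is inspected.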
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 17 comments.
| File | Description |
|---|---|
| src/mqtt_client/inflight.rs | New module implementing the inflight queue with support for retransmission, flow control, and MQTT v3/v5 differences |
| src/mqtt_client/opts.rs | Added receive_maximum field with default value of 65535 and builder method |
| src/mqtt_client/engine.rs | Integrated InflightQueue, refactored packet handling to track all inflight operations, updated session resumption logic |
| src/mqtt_client/mod.rs | Added inflight module to public API |
| src/mqtt_client/tokio_quic_client.rs | Added TODO comment about binding to specific address |
    match packet.to_bytes() {
        Ok(bytes) => {
            // All checks pass, dequeue and track
            if let Some((_, packet)) = self.priority_queue.dequeue() {
                match &packet {
                    MqttPacket::Publish5(p) if p.qos > 0 => {
                        let pid = p.packet_id.unwrap();
                        let _ = self.inflight_queue.push(pid, packet.clone(), p.qos);
                    }
                    MqttPacket::Publish3(p) if p.qos > 0 => {
                        let pid = p.message_id.unwrap();
                        let _ = self.inflight_queue.push(pid, packet.clone(), p.qos);
                    }
                    _ => {}
                }
                self.outgoing_buffer.push_back(bytes);
                self.last_packet_sent = Instant::now();
            }
The code calls packet.to_bytes() twice for the same packet: once in peek at line 808 and again after dequeue. This is inefficient and could lead to subtle bugs if to_bytes() has side effects or is non-deterministic. The bytes from the first call (line 809) should be reused after dequeuing.
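One way to act on this suggestion is to encode during the peek and carry the same bytes through the dequeue. The sketch below is illustrative only: `Packet`, `Sender`, `flush_one`, and the byte-capacity check are hypothetical stand-ins for the engine's types and its backpressure logic, which this thread does not show in full.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for `MqttPacket`.
#[derive(Debug, Clone, PartialEq)]
pub struct Packet {
    pub packet_id: Option<u16>,
    pub payload: Vec<u8>,
}

impl Packet {
    /// Stand-in encoder; assumed fallible like the `to_bytes()` call in the excerpt.
    pub fn to_bytes(&self) -> Result<Vec<u8>, String> {
        Ok(self.payload.clone())
    }
}

/// Hypothetical sender with a pending queue and a bounded outgoing buffer.
pub struct Sender {
    pub pending: VecDeque<Packet>,
    pub outgoing: VecDeque<Vec<u8>>,
    pub buffered: usize,
    pub capacity: usize,
}

impl Sender {
    /// Move one packet to the outgoing buffer; `Ok(false)` means nothing was sent.
    pub fn flush_one(&mut self) -> Result<bool, String> {
        // Peek and encode exactly once.
        let bytes = match self.pending.front() {
            Some(packet) => packet.to_bytes()?,
            None => return Ok(false),
        };
        if self.buffered + bytes.len() > self.capacity {
            return Ok(false); // backpressure: leave the packet queued
        }
        // All checks passed: dequeue and reuse the bytes produced during the peek.
        let _packet = self.pending.pop_front();
        self.buffered += bytes.len();
        self.outgoing.push_back(bytes);
        Ok(true)
    }
}
```

The owned `Vec<u8>` outlives the borrow taken by the peek, so no second encoding is needed after the dequeue.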
    let rel = MqttPacket::PubRel5(MqttPubRel::new(rec.packet_id, 0, Vec::new()));
    // Update entry to store PUBREL for retransmission
    entry.packet = rel.clone();
    entry.sent_at = Instant::now();
    let _ = self.inflight_queue.push(entry.packet_id, entry.packet, 2);
    let _ = self.enqueue_packet(rel);
The rel packet is cloned twice: once when updating entry.packet (line 712) and once when calling enqueue_packet (line 715). Since entry.packet is immediately pushed back into inflight_queue and not used elsewhere, consider removing the clone on line 712 or restructuring to avoid the double clone.
    if let Some(mut entry) = self.inflight_queue.acknowledge(rec.packet_id) {
        let rel = MqttPacket::PubRel5(MqttPubRel::new(rec.packet_id, 0, Vec::new()));
        // Update entry to store PUBREL for retransmission
        entry.packet = rel.clone();
        entry.sent_at = Instant::now();
        let _ = self.inflight_queue.push(entry.packet_id, entry.packet, 2);
        let _ = self.enqueue_packet(rel);
    }
The code mutates entry after removing it from the inflight_queue via acknowledge, then tries to push it back. However, the QoS value passed to push is hardcoded to 2, but entry.qos may still be 1 from the original PUBLISH. This creates an inconsistency where the stored qos field doesn't match what was used for flow control checks. Consider using entry.qos instead of hardcoded values.
PUBREL is only sent for QoS 2.
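Both points in this exchange can be encoded directly: since PUBREL belongs only to the QoS 2 flow, a PUBREC handler could check the stored QoS and swap the packet in place instead of removing and re-pushing the entry. This is a rough sketch with hypothetical names (`Stored`, `Entry`, `on_pubrec`), not the engine's code.

```rust
use std::collections::HashMap;
use std::time::Instant;

/// Hypothetical stand-in for the packet stored for retransmission.
#[derive(Debug, Clone, PartialEq)]
pub enum Stored {
    Publish,
    PubRel,
}

/// Hypothetical, simplified inflight entry.
pub struct Entry {
    pub packet: Stored,
    pub sent_at: Instant,
    pub qos: u8,
}

/// On PUBREC, replace the stored PUBLISH with a PUBREL in place.
/// Returns false for unknown packet IDs and for entries that are not QoS 2.
pub fn on_pubrec(entries: &mut HashMap<u16, Entry>, packet_id: u16, now: Instant) -> bool {
    match entries.get_mut(&packet_id) {
        Some(entry) if entry.qos == 2 => {
            entry.packet = Stored::PubRel; // retransmit PUBREL from now on
            entry.sent_at = now;           // restart the timeout window
            true
        }
        _ => false,
    }
}
```

Updating in place leaves the entry's `qos` field untouched, so the stored value and the value used for flow control cannot drift apart.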
    let now = Instant::now();
    if matches!(packet, MqttPacket::Publish5(_) | MqttPacket::Publish3(_)) {
        self.publish_count += 1;
    }

    let entry = InflightEntry {
        packet_id,
        packet,
        sent_at: now,
        retry_count: 0,
        qos,
    };

    self.entries.insert(packet_id, entry);
    self.deadline_queue.push_back((packet_id, now));
    Ok(())
When pushing a packet into the inflight queue after a previous entry exists with the same packet_id, the publish_count may become inconsistent. If the old entry was a PUBLISH packet and the new entry (PUBREL) is not, publish_count will be incremented in line 92 when it shouldn't be. The push method should check if an entry already exists and handle the replacement properly to maintain accurate publish_count.
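A replacement-aware `push` could look roughly like the following. It is a sketch under assumptions: `Kind` and `Counted` are hypothetical simplifications that track only the packet kind, and the real queue would apply the same bookkeeping to full `InflightEntry` values.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the kind of packet held in an entry.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Kind {
    Publish,
    PubRel,
}

/// Hypothetical queue that keeps `publish_count` consistent across replacements.
pub struct Counted {
    entries: HashMap<u16, Kind>,
    publish_count: usize,
}

impl Counted {
    pub fn new() -> Self {
        Self { entries: HashMap::new(), publish_count: 0 }
    }

    pub fn push(&mut self, packet_id: u16, kind: Kind) {
        // `HashMap::insert` returns the value it displaced, if any.
        if let Some(Kind::Publish) = self.entries.insert(packet_id, kind) {
            self.publish_count -= 1; // the displaced PUBLISH no longer counts
        }
        if kind == Kind::Publish {
            self.publish_count += 1;
        }
    }

    pub fn acknowledge(&mut self, packet_id: u16) -> Option<Kind> {
        let kind = self.entries.remove(&packet_id)?;
        if kind == Kind::Publish {
            self.publish_count -= 1;
        }
        Some(kind)
    }
}
```

With this shape, pushing the same `packet_id` twice leaves the counter at one rather than two. Whether a PUBREL entry should still count toward Receive Maximum is a separate design question: MQTT v5 section 4.9 restores send quota on PUBACK, PUBCOMP, or a PUBREC carrying an error code, not on a successful PUBREC.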
    self.inflight_queue.push(pid, packet.clone(), 1)?;
    self.enqueue_packet(packet)?;
    Ok(pid)
The packet is cloned to push into inflight_queue and then immediately passed to enqueue_packet. This double encoding (packet.clone() materializes the packet, then enqueue_packet calls to_bytes()) is inefficient. Consider restructuring to encode once and reuse the bytes, or pass ownership and reconstruct if needed.
    #[cfg(test)]
    mod tests {
        use super::*;
        use crate::mqtt_serde::mqttv5::publish::MqttPublish;

        fn create_packet(pid: u16) -> MqttPacket {
            MqttPacket::Publish5(MqttPublish {
                dup: false,
                qos: 1,
                retain: false,
                topic_name: "test".to_string(),
                packet_id: Some(pid),
                payload: vec![],
                properties: vec![],
            })
        }

        #[test]
        fn test_inflight_push_ack() {
            let mut q = InflightQueue::new(10, 5, Duration::from_secs(5));
            assert!(q.can_push_publish());
            q.push(1, create_packet(1), 1).unwrap();
            assert_eq!(q.len(), 1);
            q.acknowledge(1).unwrap();
            assert_eq!(q.len(), 0);
        }

        #[test]
        fn test_inflight_receive_maximum() {
            let mut q = InflightQueue::new(2, 5, Duration::from_secs(5));
            q.push(1, create_packet(1), 1).unwrap();
            q.push(2, create_packet(2), 1).unwrap();
            assert!(!q.can_push_publish());
            assert!(q.push(3, create_packet(3), 1).is_err());

            // Update limit
            q.update_receive_maximum(5);
            assert!(q.can_push_publish());
            q.push(3, create_packet(3), 1).unwrap();
            assert_eq!(q.len(), 3);
        }

        #[test]
        fn test_inflight_v3_retransmission() {
            let mut q = InflightQueue::new(10, 3, Duration::from_secs(1));
            let start = Instant::now();
            q.push(1, create_packet(1), 1).unwrap();

            let expired = q.get_expired(start + Duration::from_secs(2));
            assert_eq!(expired.len(), 1);
            assert_eq!(q.len(), 1);

            // Should be queued again
            let expired2 = q.get_expired(start + Duration::from_secs(4));
            assert_eq!(expired2.len(), 1);
        }

        #[test]
        fn test_inflight_v5_no_retransmission() {
            let mut q = InflightQueue::new(10, 5, Duration::from_secs(1));
            let start = Instant::now();
            q.push(1, create_packet(1), 1).unwrap();

            let expired = q.get_expired(start + Duration::from_secs(2));
            assert!(expired.is_empty());
        }
    }
The test coverage is missing important edge cases: duplicate packet_id push (which would cause publish_count corruption), clear() verification, get_all_for_reconnect() functionality, update_receive_maximum with value 0, and non-PUBLISH packet handling. Consider adding tests for these scenarios to ensure robustness.
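Of the edge cases listed, the zero Receive Maximum is the one the MQTT v5 specification speaks to directly: a value of 0 is a Protocol Error, and an absent property defaults to 65,535. A small validation helper, sketched here with hypothetical names (`effective_receive_maximum`, `FlowError`), could guard `update_receive_maximum` against that input.

```rust
/// MQTT v5 default when the Receive Maximum property is absent.
pub const DEFAULT_RECEIVE_MAXIMUM: u16 = 65_535;

/// Hypothetical error type for flow-control validation.
#[derive(Debug, PartialEq)]
pub enum FlowError {
    /// MQTT v5 treats a Receive Maximum of 0 as a Protocol Error.
    ZeroReceiveMaximum,
}

/// Resolve the limit to apply from an optional Receive Maximum property.
pub fn effective_receive_maximum(prop: Option<u16>) -> Result<u16, FlowError> {
    match prop {
        None => Ok(DEFAULT_RECEIVE_MAXIMUM),
        Some(0) => Err(FlowError::ZeroReceiveMaximum),
        Some(n) => Ok(n),
    }
}
```

A test for the zero case can then assert on the returned error instead of relying on whatever `can_push_publish()` happens to report when the limit is 0.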
c996ae8 to 051277d
…nsmission, and buffer backpressure logic for MQTT v3 and v5.
051277d to afb55b2
a37ef51 to 7e084e3
Description
Adds an inflight queue mechanism for managing QoS 1 and QoS 2 messages.