8000
Skip to content

feat(engine): inflight queue#66

Merged
qzhuyan merged 5 commits intomainfrom
dev/inflight-queue
Jan 5, 2026
Merged

feat(engine): inflight queue#66
qzhuyan merged 5 commits intomainfrom
dev/inflight-queue

Conversation

@qzhuyan
Copy link
Copy Markdown
Collaborator
@qzhuyan qzhuyan commented Dec 29, 2025

Description

inflight queue mechanism for managing QoS 1 and QoS 2

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

Checklist:

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • Any dependent changes have been merged and published in downstream modules

Performance Impact

  • This change improves performance
  • This change has no performance impact
  • This change may have performance implications (please describe)

Security Considerations

  • This change has no security implications
  • This change has security implications (please describe)

@codecov-commenter
Copy link
Copy Markdown
codecov-commenter commented Dec 29, 2025

Codecov Report

❌ Patch coverage is 91.14754% with 27 lines in your changes missing coverage. Please review.
✅ Project coverage is 71.64%. Comparing base (e9442cf) to head (7e084e3).

Files with missing lines Patch % Lines
src/mqtt_client/inflight.rs 93.37% 12 Missing ⚠️
src/mqtt_client/engine.rs 90.09% 11 Missing ⚠️
src/mqtt_client/opts.rs 20.00% 4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #66      +/-   ##
==========================================
+ Coverage   70.89%   71.64%   +0.74%     
==========================================
  Files          54       54              
  Lines       12198    12286      +88     
==========================================
+ Hits         8648     8802     +154     
+ Misses       3550     3484      -66     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces an inflight queue mechanism for managing QoS 1 and QoS 2 MQTT messages, replacing the previous HashMap-based tracking with a more sophisticated queue that handles retransmission, flow control, and MQTT version-specific behavior.

Key Changes

  • Added InflightQueue struct to centralize tracking of unacknowledged messages with O(1) lookups and deadline-based timeout checking
  • Introduced receive_maximum configuration option (MQTT v5) to enforce flow control limits on inflight PUBLISH messages
  • Refactored MqttEngine to use InflightQueue instead of separate HashMaps for tracking pending subscribes, unsubscribes, and publishes

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 17 comments.

Show a summary per file
File Description
src/mqtt_client/inflight.rs New module implementing the inflight queue with support for retransmission, flow control, and MQTT v3/v5 differences
src/mqtt_client/opts.rs Added receive_maximum field with default value of 65535 and builder method
src/mqtt_client/engine.rs Integrated InflightQueue, refactored packet handling to track all inflight operations, updated session resumption logic
src/mqtt_client/mod.rs Added inflight module to public API
src/mqtt_client/tokio_quic_client.rs Added TODO comment about binding to specific address

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +808 to +825
match packet.to_bytes() {
Ok(bytes) => {
// All checks pass, dequeue and track
if let Some((_, packet)) = self.priority_queue.dequeue() {
match &packet {
MqttPacket::Publish5(p) if p.qos > 0 => {
let pid = p.packet_id.unwrap();
let _ = self.inflight_queue.push(pid, packet.clone(), p.qos);
}
MqttPacket::Publish3(p) if p.qos > 0 => {
let pid = p.message_id.unwrap();
let _ = self.inflight_queue.push(pid, packet.clone(), p.qos);
}
_ => {}
}
self.outgoing_buffer.push_back(bytes);
self.last_packet_sent = Instant::now();
}
Copy link
Copilot AI Dec 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code calls packet.to_bytes() twice for the same packet: once in peek at line 808 and again after dequeue. This is inefficient and could lead to subtle bugs if to_bytes() has side effects or is non-deterministic. The bytes from the first call (line 809) should be reused after dequeuing.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo

Comment on lines +710 to +715
let rel = MqttPacket::PubRel5(MqttPubRel::new(rec.packet_id, 0, Vec::new()));
// Update entry to store PUBREL for retransmission
entry.packet = rel.clone();
entry.sent_at = Instant::now();
let _ = self.inflight_queue.push(entry.packet_id, entry.packet, 2);
let _ = self.enqueue_packet(rel);
Copy link
Copilot AI Dec 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rel packet is cloned twice: once when updating entry.packet (line 712) and once when calling enqueue_packet (line 715). Since entry.packet is immediately pushed back into inflight_queue and not used elsewhere, consider removing the clone on line 712 or restructuring to avoid the double clone.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo

Comment on lines +709 to +716
if let Some(mut entry) = self.inflight_queue.acknowledge(rec.packet_id) {
let rel = MqttPacket::PubRel5(MqttPubRel::new(rec.packet_id, 0, Vec::new()));
// Update entry to store PUBREL for retransmission
entry.packet = rel.clone();
entry.sent_at = Instant::now();
let _ = self.inflight_queue.push(entry.packet_id, entry.packet, 2);
let _ = self.enqueue_packet(rel);
}
Copy link
Copilot AI Dec 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code mutates entry after removing it from the inflight_queue via acknowledge, then tries to push it back. However, the QoS value passed to push is hardcoded to 2, but entry.qos may still be 1 from the original PUBLISH. This creates an inconsistency where the stored qos field doesn't match what was used for flow control checks. Consider using entry.qos instead of hardcoded values.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pubrel is only for QoS 2.

Comment on lines +90 to +105
let now = Instant::now();
if matches!(packet, MqttPacket::Publish5(_) | MqttPacket::Publish3(_)) {
self.publish_count += 1;
}

let entry = InflightEntry {
packet_id,
packet,
sent_at: now,
retry_count: 0,
qos,
};

self.entries.insert(packet_id, entry);
self.deadline_queue.push_back((packet_id, now));
Ok(())
Copy link
Copilot AI Dec 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When pushing a packet into the inflight queue after a previous entry exists with the same packet_id, the publish_count may become inconsistent. If the old entry was a PUBLISH packet and the new entry (PUBREL) is not, publish_count will be incremented in line 92 when it shouldn't be. The push method should check if an entry already exists and handle the replacement properly to maintain accurate publish_count.

Copilot uses AI. Check for mistakes.
Comment on lines +520 to 522
self.inflight_queue.push(pid, packet.clone(), 1)?;
self.enqueue_packet(packet)?;
Ok(pid)
Copy link
Copilot AI Dec 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The packet is cloned to push into inflight_queue and then immediately passed to enqueue_packet. This double encoding (packet.clone() materializes the packet, then enqueue_packet calls to_bytes()) is inefficient. Consider restructuring to encode once and reuse the bytes, or pass ownership and reconstruct if needed.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo , nit

Comment on lines +190 to +256
< 9E79 td data-line-number="222" class="blob-num blob-num-addition">
#[cfg(test)]
mod tests {
use super::*;
use crate::mqtt_serde::mqttv5::publish::MqttPublish;

fn create_packet(pid: u16) -> MqttPacket {
MqttPacket::Publish5(MqttPublish {
dup: false,
qos: 1,
retain: false,
topic_name: "test".to_string(),
packet_id: Some(pid),
payload: vec![],
properties: vec![],
})
}

#[test]
fn test_inflight_push_ack() {
let mut q = InflightQueue::new(10, 5, Duration::from_secs(5));
assert!(q.can_push_publish());
q.push(1, create_packet(1), 1).unwrap();
assert_eq!(q.len(), 1);
q.acknowledge(1).unwrap();
assert_eq!(q.len(), 0);
}

#[test]
fn test_inflight_receive_maximum() {
let mut q = InflightQueue::new(2, 5, Duration::from_secs(5));
q.push(1, create_packet(1), 1).unwrap();
q.push(2, create_packet(2), 1).unwrap();
assert!(!q.can_push_publish());
assert!(q.push(3, create_packet(3), 1).is_err());

// Update limit
q.update_receive_maximum(5);
assert!(q.can_push_publish());
q.push(3, create_packet(3), 1).unwrap();
assert_eq!(q.len(), 3);
}

#[test]
fn test_inflight_v3_retransmission() {
let mut q = InflightQueue::new(10, 3, Duration::from_secs(1));
let start = Instant::now();
q.push(1, create_packet(1), 1).unwrap();

let expired = q.get_expired(start + Duration::from_secs(2));
assert_eq!(expired.len(), 1);
assert_eq!(q.len(), 1);

// Should be queued again
let expired2 = q.get_expired(start + Duration::from_secs(4));
assert_eq!(expired2.len(), 1);
}

#[test]
fn test_inflight_v5_no_retransmission() {
let mut q = InflightQueue::new(10, 5, Duration::from_secs(1));
let start = Instant::now();
q.push(1, create_packet(1), 1).unwrap();

let expired = q.get_expired(start + Duration::from_secs(2));
assert!(expired.is_empty());
}
}
Copy link
Copilot AI Dec 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test coverage is missing important edge cases: duplicate packet_id push (which would cause publish_count corruption), clear() verification, get_all_for_reconnect() functionality, update_receive_maximum with value 0, and non-PUBLISH packet handling. Consider adding tests for these scenarios to ensure robustness.

Copilot uses AI. Check for mistakes.
@qzhuyan qzhuyan force-pushed the dev/inflight-queue branch from c996ae8 to 051277d Compare January 5, 2026 09:42
…nsmission, and buffer backpressure logic for MQTT v3 and v5.
@qzhuyan qzhuyan force-pushed the dev/inflight-queue branch from 051277d to afb55b2 Compare January 5, 2026 10:12
@qzhuyan qzhuyan force-pushed the dev/inflight-queue branch from a37ef51 to 7e084e3 Compare January 5, 2026 10:59
@qzhuyan qzhuyan merged commit 1787402 into main Jan 5, 2026
35 checks passed
@qzhuyan qzhuyan deleted the dev/inflight-queue branch January 7, 2026 18:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants

0