Rework networking interface
All checks were successful
CI / Formatting (push) Successful in 1m6s

Add Bevy event queues for inbound and outbound packets, and use Bevy change detection for consumer new Peer handling.
This commit is contained in:
Michael Bradley 2025-07-05 15:01:33 -04:00
parent cceca83dac
commit c10f6cfb82
Signed by: MichaelBradley
SSH key fingerprint: SHA256:o/aaeYtRubILK7OYYjYP12DmU7BsPUhKji1AgaQ+ge4
9 changed files with 348 additions and 72 deletions

118
src/net/io.rs Normal file
View file

@ -0,0 +1,118 @@
use std::time::Duration;
use bevy::prelude::*;
use uuid::Uuid;
use super::{
peer::{Peer, PeerChangeEvent, PeerMap, PeerReceiveTiming, PeerSendTiming},
queues::{NetworkReceive, NetworkSend},
};
pub fn handle_network_input(
from_socket: Res<NetworkReceive>,
peer_map: Res<PeerMap>,
mut peers: Query<(&Peer, &mut PeerReceiveTiming)>,
mut to_app: EventWriter<InboundPacket>,
time: Res<Time>,
mut change_peer: EventWriter<PeerChangeEvent>,
) -> Result {
for (mut message, address) in from_socket.iter() {
if message.len() < 16 {
return Err(format!("Message of length {} cannot contain UUID", message.len()).into());
}
let uuid = Uuid::from_slice(message.split_off(message.len() - 16).as_slice())?;
info!("Receiving: {:?} from {}", message, uuid);
to_app.write(InboundPacket(Packet::new(message, uuid)));
if let Some(peer_id) = peer_map.get(&uuid) {
let (peer, mut last) = peers.get_mut(*peer_id)?;
last.update(&time);
if address == peer.addr {
continue;
}
}
change_peer.write(PeerChangeEvent::new(uuid, Some(address)));
}
Ok(())
}
pub fn handle_network_output(
mut from_app: EventReader<OutboundPacket>,
peer_map: Res<PeerMap>,
mut peers: Query<(&Peer, &mut PeerSendTiming)>,
config: Res<Config>,
to_socket: Res<NetworkSend>,
time: Res<Time>,
) -> Result {
for packet in from_app.read() {
let peer_id = peer_map.try_get(&packet.0.peer)?;
let (peer, mut last) = peers.get_mut(*peer_id)?;
warn!("Sending: {:?} to {}", packet.0.message, peer.uuid);
// Append our UUID for client identification
let message = [packet.0.message.as_slice(), config.id.as_bytes()].concat();
to_socket.send(message, peer.addr)?;
last.update(&time);
}
Ok(())
}
const TIMEOUT: Duration = Duration::from_secs(10);
pub fn heartbeat(
peers: Query<(&Peer, &PeerSendTiming)>,
time: Res<Time>,
mut outbound: EventWriter<OutboundPacket>,
) {
for (peer, last) in peers {
if let Some(previous) = last.timestamp() {
// Allow for 2 consecutive missed heartbeats without timing out
if previous + TIMEOUT / 3 > time.elapsed() {
continue;
}
}
outbound.write(OutboundPacket(Packet::new(Vec::new(), peer.uuid)));
}
}
pub fn timeouts(
peers: Query<(&Peer, &PeerReceiveTiming)>,
time: Res<Time>,
mut delete: EventWriter<PeerChangeEvent>,
) {
for (peer, last) in peers {
if let Some(previous) = last.timestamp() {
if previous + TIMEOUT < time.elapsed() {
warn!("Peer {} timed out", peer.uuid);
delete.write(PeerChangeEvent::new(peer.uuid, None));
}
}
}
}
#[derive(Debug, Resource)]
pub struct Config {
pub id: Uuid,
}
impl Config {
pub fn new() -> Self {
Self { id: Uuid::new_v4() }
}
}
#[derive(Debug)]
pub struct Packet {
pub message: Vec<u8>,
pub peer: Uuid,
}
impl Packet {
pub fn new(message: Vec<u8>, peer: Uuid) -> Self {
Self { peer, message }
}
}
#[derive(Debug, Event)]
pub struct OutboundPacket(pub Packet);
#[derive(Debug, Event)]
pub struct InboundPacket(pub Packet);