Initial change propagation work
All checks were successful
CI / Formatting (push) Successful in 42s

This commit is contained in:
Michael Bradley 2025-10-26 00:44:30 -04:00
parent fe967d70b9
commit 3dfeae14f7
Signed by: MichaelBradley
SSH key fingerprint: SHA256:BKO2eI2LPsCbQS3n3i5SdwZTAIV3F1lHezR07qP+Ob0
13 changed files with 146 additions and 72 deletions

View file

@ -1,17 +1,27 @@
use bevy::prelude::*;
use bevy::{
ecs::{component::Mutable, query::QueryFilter},
prelude::*,
};
use uuid::Uuid;
use super::{
packet::{InboundPacket, OutboundPacket, Packet},
peer::PeerID,
};
#[derive(Component)]
/// Entities wishing to be networked must have this in their bundle
#[derive(Component, Debug)]
pub struct EntityNetworkID(Uuid);
impl Default for EntityNetworkID {
fn default() -> Self {
Self(Uuid::new_v4())
}
}
#[derive(Component, Debug)]
pub struct PeerOwned;
pub trait Networked: Component + NetworkEncodable + NetworkDecodable {}
impl<T> Networked for T where T: Component + NetworkEncodable + NetworkDecodable {}
pub trait NetworkEncodable {
fn encode(&self) -> Vec<u8>;
}
@ -40,45 +50,87 @@ where
}
}
fn incoming_network_entity<T: NetworkDecodable + Component>(
/// Components wishing to be networked must implement this type
pub trait Networked: Component<Mutability = Mutable> + NetworkEncodable + NetworkDecodable {
type LocalFilter: QueryFilter;
type RemoteFilter: QueryFilter;
}
impl<T> Networked for T
where
T: Component<Mutability = Mutable> + NetworkEncodable + NetworkDecodable,
{
type LocalFilter = Without<PeerOwned>;
type RemoteFilter = With<PeerOwned>;
}
fn incoming_network_entity<
T: NetworkDecodable + Component<Mutability = Mutable>,
F: QueryFilter,
>(
mut inbound: MessageReader<InboundPacket>,
mut components: Query<(&mut T, &EntityNetworkID), F>,
mut commands: Commands,
) {
for InboundPacket(packet) in inbound.read() {
'packets: for InboundPacket(packet) in inbound.read() {
if let Ok(component) = T::decode(packet.message.clone()) {
commands.spawn((component, PeerOwned));
for (mut existing_component, id) in &mut components {
if id.0 == packet.entity {
*existing_component = component;
continue 'packets;
}
}
commands.spawn((component, EntityNetworkID(packet.entity), PeerOwned));
}
}
}
fn new_peer<T: NetworkEncodable + Component>(
add: On<Add, PeerID>,
components: Query<&T, Without<PeerOwned>>,
components: Query<(&T, &EntityNetworkID), Without<PeerOwned>>,
peers: Query<&PeerID>,
mut outbound: MessageWriter<OutboundPacket>,
) -> Result {
let peer = peers.get(add.entity)?;
for component in components {
outbound.write(Packet::create(peer.id, component.encode()));
for (component, id) in components {
outbound.write(Packet::create(peer.id, id.0, component.encode()));
}
Ok(())
}
fn new_entity<T: NetworkEncodable + Component>(
fn new_local_entity<T: NetworkEncodable + Component>(
add: On<Add, T>,
components: Query<&T, Without<PeerOwned>>,
components: Query<(&T, &EntityNetworkID), Without<PeerOwned>>,
peers: Query<&PeerID>,
mut outbound: MessageWriter<OutboundPacket>,
) {
if let Ok(component) = components.get(add.entity) {
if let Ok((component, id)) = components.get(add.entity) {
for peer in peers {
outbound.write(Packet::create(peer.id, component.encode()));
outbound.write(Packet::create(peer.id, id.0, component.encode()));
}
}
}
fn changed_local_entity<T: NetworkEncodable + Component, F: QueryFilter>(
components: Query<(&T, &EntityNetworkID), (F, Changed<T>)>,
peers: Query<&PeerID>,
mut outbound: MessageWriter<OutboundPacket>,
) {
for (component, id) in components {
for peer in peers {
outbound.write(Packet::create(peer.id, id.0, component.encode()));
}
}
}
pub fn distribution_plugin<T: Networked>(app: &mut App) {
app.add_systems(FixedUpdate, incoming_network_entity::<T>)
.add_observer(new_peer::<T>)
.add_observer(new_entity::<T>);
app.add_systems(
FixedUpdate,
(
changed_local_entity::<T, T::LocalFilter>,
incoming_network_entity::<T, T::RemoteFilter>,
),
)
.add_observer(new_peer::<T>)
.add_observer(new_local_entity::<T>);
}

View file

@ -1,13 +1,12 @@
use std::time::Duration;
use bevy::prelude::*;
use crate::net::peer::PotentialPeers;
use uuid::Uuid;
use super::{
io::{Config, format_message},
packet::{OutboundPacket, Packet},
peer::{Peer, PeerChangeMessage, PeerID, PeerReceiveTiming, PeerSendTiming},
peer::{Peer, PeerChangeMessage, PeerID, PeerReceiveTiming, PeerSendTiming, PotentialPeers},
queues::NetworkSend,
};
@ -24,7 +23,7 @@ pub fn heartbeat(
if last.time() + PING_FREQUENCY > time.elapsed() {
continue;
}
outbound.write(Packet::create(peer.id, Vec::new()));
outbound.write(Packet::create(peer.id, Uuid::nil(), Vec::new()));
}
Ok(())
}
@ -48,7 +47,7 @@ pub fn ping_potential_peers(
config: Res<Config>,
) -> Result {
for peer in &peers.addresses {
to_socket.send(format_message(config.id, &Vec::new()), *peer)?;
to_socket.send(format_message(config.id, Uuid::nil(), &Vec::new()), *peer)?;
}
Ok(())
}

View file

@ -18,8 +18,8 @@ impl Default for Config {
}
}
pub fn format_message(id: Uuid, data: &Vec<u8>) -> Vec<u8> {
[id.as_bytes(), data.as_slice()].concat()
pub fn format_message(peer: Uuid, entity: Uuid, data: &Vec<u8>) -> Vec<u8> {
[peer.as_bytes(), entity.as_bytes(), data.as_slice()].concat()
}
pub fn handle_network_input(
@ -33,7 +33,6 @@ pub fn handle_network_input(
for (message, address) in from_socket.iter() {
match Packet::try_from(message) {
Ok(packet) => {
// TODO: Handle packet variant
if !packet.message.is_empty() {
to_app.write(packet.clone().into());
}
@ -63,7 +62,7 @@ pub fn handle_network_output(
for OutboundPacket(packet) in from_app.read() {
let peer_id = peer_map.try_get(&packet.peer)?;
let (peer, mut last) = peers.get_mut(*peer_id)?;
let message = format_message(config.id, &packet.message);
let message = format_message(config.id, packet.entity, &packet.message);
to_socket.send(message, peer.addr.into())?;
last.update(&time);
}

View file

@ -11,7 +11,7 @@ mod thread;
#[allow(unused_imports)]
pub mod prelude {
pub use super::distribution::{
NetworkDecodable, NetworkEncodable, Networked, distribution_plugin,
EntityNetworkID, NetworkDecodable, NetworkEncodable, Networked, distribution_plugin,
};
pub use super::packet::{InboundPacket, OutboundPacket, Packet};
pub use super::peer::{Peer, PeerID, PeerReceiveTiming, PeerSendTiming, PotentialPeers};

View file

@ -7,30 +7,45 @@ pub enum TryFromBytesError {
NotUUID,
}
pub const UUID_SIZE: usize = 16;
#[derive(Clone, Debug)]
pub struct Packet {
pub peer: Uuid,
pub entity: Uuid,
pub message: Vec<u8>,
}
impl Packet {
pub fn create<T: From<Packet>>(peer: Uuid, message: Vec<u8>) -> T {
Self { peer, message }.into()
pub fn create<T: From<Packet>>(peer: Uuid, entity: Uuid, message: Vec<u8>) -> T {
Self {
peer,
entity,
message,
}
.into()
}
}
pub const UUID_SIZE: usize = 16;
fn extract_uuid(buffer: &mut Vec<u8>) -> std::result::Result<Uuid, TryFromBytesError> {
if buffer.len() < UUID_SIZE {
return Err(TryFromBytesError::InsufficientLength);
}
let og_not_uuid = &mut buffer.split_off(UUID_SIZE);
// Return the rest of the vector through the argument
// TODO: Check if this has a performance penalty
std::mem::swap(og_not_uuid, buffer);
// TODO: The Uuid crate has support for zerocopy (although I'm copying in a ton of other place regardless)
Uuid::from_slice(og_not_uuid.as_slice()).map_err(|_| TryFromBytesError::NotUUID)
}
impl TryFrom<Vec<u8>> for Packet {
type Error = TryFromBytesError;
fn try_from(mut value: Vec<u8>) -> std::result::Result<Self, Self::Error> {
if value.len() < UUID_SIZE {
return Err(TryFromBytesError::InsufficientLength);
}
let message = value.split_off(UUID_SIZE);
let uuid = Uuid::from_slice(value.as_slice()).map_err(|_| TryFromBytesError::NotUUID)?;
Ok(Packet::create(uuid, message))
let peer = extract_uuid(&mut value)?;
let entity = extract_uuid(&mut value)?;
Ok(Packet::create(peer, entity, value))
}
}

View file

@ -280,7 +280,7 @@ pub fn handle_new_peer(
if change.is_added() {
for (_, other, data) in peers {
if peer.id != other.id {
outbound.write(Packet::create(peer.id, data.addr.into()));
outbound.write(Packet::create(peer.id, other.id, data.addr.into()));
}
}
}