Compare commits

...

4 commits

Author SHA1 Message Date
0f2248a2cb
Saved timed out peers as potential peers
All checks were successful
CI / Formatting (push) Successful in 30s
2025-10-18 18:21:33 -04:00
46f00e2047
Convert game loggers to observers
These functions don't really do anything, I just want to have the code in place for reference
2025-10-18 17:49:56 -04:00
af11fa97fb
Convert distribution to observers
And only distribute your own components
2025-10-18 17:49:07 -04:00
1ba4b96863
Convert potential peer ping to a timer run condition 2025-10-18 17:46:13 -04:00
7 changed files with 72 additions and 72 deletions

View file

@ -2,28 +2,20 @@ use bevy::prelude::*;
use crate::net::prelude::*; use crate::net::prelude::*;
pub fn handle_new_peer(new_peers: Query<&PeerID, Added<Peer>>) { pub fn handle_new_peer(add: On<Add, PeerID>, peers: Query<&PeerID>) -> Result {
for peer in new_peers { let peer = peers.get(add.entity)?;
info!("Peer {} was added", peer.id); info!("Game: Peer {} was added", peer.id);
} Ok(())
} }
pub fn handle_deleted_peer( pub fn handle_deleted_peer(remove: On<Remove, PeerID>, peers: Query<&PeerID>) -> Result {
mut old_peers: RemovedComponents<Peer>, let peer = peers.get(remove.entity)?;
peers: Query<&PeerID>, info!("Game: Peer {} was removed", peer.id);
) -> Result {
for entity in old_peers.read() {
if let Ok(peer) = peers.get(entity) {
info!("Peer {} was removed", peer.id);
} else {
info!("Peer {} was removed", entity);
}
}
Ok(()) Ok(())
} }
pub fn handle_incoming_packets(mut packets: MessageReader<InboundPacket>) { pub fn handle_incoming_packets(mut packets: MessageReader<InboundPacket>) {
for InboundPacket(packet) in packets.read() { for InboundPacket(packet) in packets.read() {
info!("Packet received: {:?}", packet.message); info!("Game: Packet received: {:?}", packet.message);
} }
} }

View file

@ -74,20 +74,13 @@ impl Plugin for GamePlugin {
) )
.add_systems( .add_systems(
OnEnter(AppState::InGame), OnEnter(AppState::InGame),
( (setup_from_seed, (setup_player, setup_balls, setup_walls)).chain(),
setup_from_seed,
(setup_player, setup_balls, setup_walls).after(setup_from_seed),
),
) )
.add_systems( .add_systems(
FixedUpdate, FixedUpdate,
( (
check_for_seed.run_if(in_state(AppState::Loading)), check_for_seed.run_if(in_state(AppState::Loading)),
( handle_incoming_packets,
handle_new_peer,
handle_deleted_peer,
handle_incoming_packets,
),
), ),
) )
.add_systems( .add_systems(
@ -103,7 +96,9 @@ impl Plugin for GamePlugin {
), ),
quit.run_if(input_pressed(KeyCode::KeyQ)), quit.run_if(input_pressed(KeyCode::KeyQ)),
), ),
); )
.add_observer(handle_new_peer)
.add_observer(handle_deleted_peer);
match self.source { match self.source {
DataSource::Address(peer) => { DataSource::Address(peer) => {

View file

@ -104,7 +104,7 @@ pub struct PotentialPeerUI;
pub fn setup_potential_peer_ui(mut commands: Commands) { pub fn setup_potential_peer_ui(mut commands: Commands) {
commands commands
.spawn(( .spawn((
Text::new("Potential peers:"), Text::new("Potential peers:\n"),
Node { Node {
position_type: PositionType::Absolute, position_type: PositionType::Absolute,
bottom: Val::Px(5.0), bottom: Val::Px(5.0),

View file

@ -2,32 +2,51 @@ use bevy::prelude::*;
use super::{ use super::{
packet::{InboundPacket, OutboundPacket, Packet}, packet::{InboundPacket, OutboundPacket, Packet},
peer::{Peer, PeerID}, peer::PeerID,
}; };
#[derive(Component)]
pub struct PeerOwned;
fn spawner<T: TryFrom<Vec<u8>> + Component>( fn spawner<T: TryFrom<Vec<u8>> + Component>(
mut inbound: MessageReader<InboundPacket>, mut inbound: MessageReader<InboundPacket>,
mut commands: Commands, mut commands: Commands,
) { ) {
for InboundPacket(packet) in inbound.read() { for InboundPacket(packet) in inbound.read() {
if let Ok(entity) = T::try_from(packet.message.clone()) { if let Ok(component) = T::try_from(packet.message.clone()) {
commands.spawn(entity); commands.spawn((component, PeerOwned));
} }
} }
} }
fn sender<T: Into<Vec<u8>> + Component + Clone>( fn new_peer<T: Into<Vec<u8>> + Component + Clone>(
peers: Query<&PeerID, Added<Peer>>, add: On<Add, PeerID>,
entities: Query<&T>, peers: Query<&PeerID>,
components: Query<&T, Without<PeerOwned>>,
mut outbound: MessageWriter<OutboundPacket>,
) -> Result {
let peer = peers.get(add.entity)?;
for component in components {
outbound.write(Packet::create(component.clone().into(), peer.id));
}
Ok(())
}
fn new_entity<T: Into<Vec<u8>> + Component + Clone>(
add: On<Add, T>,
peers: Query<&PeerID>,
components: Query<&T, Without<PeerOwned>>,
mut outbound: MessageWriter<OutboundPacket>, mut outbound: MessageWriter<OutboundPacket>,
) { ) {
for peer in peers { if let Ok(component) = components.get(add.entity) {
for entity in entities { for peer in peers {
outbound.write(Packet::create((*entity).clone().into(), peer.id)); outbound.write(Packet::create(component.clone().into(), peer.id));
} }
} }
} }
pub fn distribution_plugin<T: Into<Vec<u8>> + TryFrom<Vec<u8>> + Component + Clone>(app: &mut App) { pub fn distribution_plugin<T: Into<Vec<u8>> + TryFrom<Vec<u8>> + Component + Clone>(app: &mut App) {
app.add_systems(FixedUpdate, (sender::<T>, spawner::<T>)); app.add_systems(FixedUpdate, spawner::<T>)
.add_observer(new_peer::<T>)
.add_observer(new_entity::<T>);
} }

View file

@ -11,11 +11,9 @@ use super::{
queues::NetworkSend, queues::NetworkSend,
}; };
const PING_FREQUENCY: Duration = Duration::from_secs(3); pub const PING_FREQUENCY: Duration = Duration::from_secs(1);
const MISSED_PINGS: u32 = 3; const MISSED_PINGS: u32 = 5;
// TODO: Perhaps this needs a state rethink, is Single/Multiplayer actually useful vs Disconnected, Connecting, Connected?
// Would also help to state-scope some of these things, like InitialAddresses vs PeerMap
pub fn heartbeat( pub fn heartbeat(
peers: Query<(&PeerID, &PeerSendTiming)>, peers: Query<(&PeerID, &PeerSendTiming)>,
time: Res<Time>, time: Res<Time>,
@ -44,34 +42,16 @@ pub fn timeout(
} }
} }
#[derive(Debug, Resource)]
pub struct PotentialPeerTimer {
timer: Timer,
}
impl Default for PotentialPeerTimer {
fn default() -> Self {
Self {
timer: Timer::new(PING_FREQUENCY, TimerMode::Repeating),
}
}
}
pub fn ping_potential_peers( pub fn ping_potential_peers(
mut timer: ResMut<PotentialPeerTimer>,
time: Res<Time>,
peers: Res<PotentialPeers>, peers: Res<PotentialPeers>,
to_socket: Res<NetworkSend>, to_socket: Res<NetworkSend>,
config: Res<Config>, config: Res<Config>,
) -> Result { ) -> Result {
timer.timer.tick(time.delta()); for peer in &peers.addresses {
if timer.timer.is_finished() { to_socket.send(
for peer in &peers.addresses { format_message(&Vec::new(), PacketType::Peer, config.id),
to_socket.send( *peer,
format_message(&Vec::new(), PacketType::Peer, config.id), )?;
*peer,
)?;
}
} }
Ok(()) Ok(())
} }

View file

@ -210,11 +210,23 @@ pub fn handle_peer_change(
} else { } else {
warn!("Peer {} doesn't exist (just added?)", change.peer); warn!("Peer {} doesn't exist (just added?)", change.peer);
} }
potential_peers.addresses.remove(&addr);
} }
// Peer deletion // Peer deletion
(Some(entity), None) => { (Some(entity), None) => {
commands.get_entity(*entity)?.despawn(); commands.get_entity(*entity)?.despawn();
peer_map.remove(&change.peer); if let Ok(peer) = peers.get(*entity) {
potential_peers.addresses.insert(peer.addr.into());
} else {
warn!(
"Peer {} could not be saved as a potential peer",
change.peer
)
}
peer_map.remove(&change.peer).ok_or(format!(
"Peer {} could not be removed from the peer map",
change.peer
))?;
} }
// Peer addition // Peer addition
(None, Some(addr)) => { (None, Some(addr)) => {
@ -230,13 +242,11 @@ pub fn handle_peer_change(
)) ))
.id(), .id(),
); );
potential_peers.addresses.remove(&addr);
} }
// Double peer deletion // Double peer deletion
(None, None) => warn!("Peer {} already deleted", change.peer), (None, None) => warn!("Peer {} already deleted", change.peer),
} }
if let Some(addr) = change.addr {
potential_peers.addresses.remove(&addr);
}
} }
Ok(()) Ok(())
} }

View file

@ -1,9 +1,9 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use bevy::prelude::*; use bevy::{prelude::*, time::common_conditions::on_timer};
use super::{ use super::{
heartbeat::{PotentialPeerTimer, heartbeat, ping_potential_peers, timeout}, heartbeat::{PING_FREQUENCY, heartbeat, ping_potential_peers, timeout},
io::{Config, handle_network_input, handle_network_output}, io::{Config, handle_network_input, handle_network_output},
packet::{InboundPacket, OutboundPacket}, packet::{InboundPacket, OutboundPacket},
peer::{ peer::{
@ -39,12 +39,16 @@ impl Plugin for NetIOPlugin {
) )
.add_systems( .add_systems(
FixedUpdate, FixedUpdate,
(heartbeat, timeout, handle_new_peer, ping_potential_peers), (
heartbeat,
timeout,
handle_new_peer,
ping_potential_peers.run_if(on_timer(PING_FREQUENCY)),
),
) )
.add_systems(FixedPostUpdate, handle_network_output) .add_systems(FixedPostUpdate, handle_network_output)
.init_resource::<Config>() .init_resource::<Config>()
.init_resource::<PeerMap>() .init_resource::<PeerMap>()
.init_resource::<PotentialPeerTimer>()
.insert_resource(PotentialPeers::new(self.initial_peers.clone())) .insert_resource(PotentialPeers::new(self.initial_peers.clone()))
.add_message::<PeerChangeMessage>() .add_message::<PeerChangeMessage>()
.add_message::<InboundPacket>() .add_message::<InboundPacket>()