Compare commits

..

2 commits

Author SHA1 Message Date
53fe3333f0
More complete Peer distribution
All checks were successful
CI / Formatting (push) Successful in 45s
Slightly better implementation of peers, still need to create a more generic system for deciding which components to distribute where and then use that for Peers.
2025-10-13 01:06:31 -04:00
e013fb427a
Switch back to nightly, update deps 2025-10-13 01:04:16 -04:00
21 changed files with 495 additions and 293 deletions

View file

@ -1,4 +1,4 @@
[target.x86_64-unknown-linux-gnu] [target.x86_64-unknown-linux-gnu]
linker = "clang" linker = "clang"
# TODO: Share generics when using nightly no longer causes an undefined symbol error # TODO: Share generics when using nightly no longer causes an undefined symbol error
rustflags = ["-C", "link-arg=-fuse-ld=/usr/bin/mold"] #, "-Zshare-generics=y"] rustflags = ["-C", "link-arg=-fuse-ld=/usr/bin/mold", "-Zshare-generics=y"]

18
.vscode/launch.json vendored
View file

@ -15,13 +15,13 @@
"cwd": "${workspaceFolder}", "cwd": "${workspaceFolder}",
"env": { "env": {
"CARGO_MANIFEST_DIR": "${workspaceFolder}", "CARGO_MANIFEST_DIR": "${workspaceFolder}",
"LD_LIBRARY_PATH": "${workspaceFolder}/target/debug/deps:${env:HOME}/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/x86_64-unknown-linux-gnu/lib" "LD_LIBRARY_PATH": "${workspaceFolder}/target/debug/deps:${env:HOME}/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/x86_64-unknown-linux-gnu/lib"
}, },
"presentation": { "presentation": {
"hidden": false, "hidden": false,
"group": "run", "group": "run",
"order": 1 "order": 1
} },
}, },
{ {
"type": "lldb", "type": "lldb",
@ -34,13 +34,14 @@
"cwd": "${workspaceFolder}", "cwd": "${workspaceFolder}",
"env": { "env": {
"CARGO_MANIFEST_DIR": "${workspaceFolder}", "CARGO_MANIFEST_DIR": "${workspaceFolder}",
"LD_LIBRARY_PATH": "${workspaceFolder}/target/debug/deps:${env:HOME}/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/x86_64-unknown-linux-gnu/lib" "LD_LIBRARY_PATH": "${workspaceFolder}/target/debug/deps:${env:HOME}/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/x86_64-unknown-linux-gnu/lib"
}, },
"presentation": { "presentation": {
"hidden": false, "hidden": false,
"group": "run", "group": "run",
"order": 2 "order": 2
} },
"suppressMultipleSessionWarning": true,
}, },
{ {
"type": "lldb", "type": "lldb",
@ -53,13 +54,14 @@
"cwd": "${workspaceFolder}", "cwd": "${workspaceFolder}",
"env": { "env": {
"CARGO_MANIFEST_DIR": "${workspaceFolder}", "CARGO_MANIFEST_DIR": "${workspaceFolder}",
"LD_LIBRARY_PATH": "${workspaceFolder}/target/debug/deps:${env:HOME}/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/x86_64-unknown-linux-gnu/lib" "LD_LIBRARY_PATH": "${workspaceFolder}/target/debug/deps:${env:HOME}/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/x86_64-unknown-linux-gnu/lib"
}, },
"presentation": { "presentation": {
"hidden": false, "hidden": false,
"group": "run", "group": "run",
"order": 2 "order": 2
} },
} "suppressMultipleSessionWarning": true,
] },
],
} }

View file

@ -15,6 +15,12 @@
"RUSTFLAGS": "-Clinker=clang -Clink-arg=-fuse-ld=lld" "RUSTFLAGS": "-Clinker=clang -Clink-arg=-fuse-ld=lld"
}, },
"rust-analyzer.check.command": "clippy", // "check", // "rust-analyzer.check.command": "clippy", // "check", //
// "rust-analyzer.check.overrideCommand": [
// "bevy_lint",
// "--workspace",
// "--all-targets",
// "--message-format=json-diagnostic-rendered-ansi",
// ],
"rust-analyzer.cargo.targetDir": true, "rust-analyzer.cargo.targetDir": true,
"cSpell.words": [ "cSpell.words": [
"Backquote", "Backquote",

26
Cargo.lock generated
View file

@ -1623,9 +1623,9 @@ dependencies = [
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.5.39" version = "4.5.48"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd60e63e9be68e5fb56422e397cf9baddded06dae1d2e523401542383bc72a9f" checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae"
dependencies = [ dependencies = [
"clap_builder", "clap_builder",
"clap_derive", "clap_derive",
@ -1633,9 +1633,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_builder" name = "clap_builder"
version = "4.5.39" version = "4.5.48"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89cc6392a1f72bbeb820d71f32108f61fdaf18bc526e1d23954168a67759ef51" checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9"
dependencies = [ dependencies = [
"anstream", "anstream",
"anstyle", "anstyle",
@ -1645,9 +1645,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_derive" name = "clap_derive"
version = "4.5.32" version = "4.5.47"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7" checksum = "bbfd7eae0b0f1a6e63d4b13c9c478de77c2eb546fba158ad50b4203dc24b9f9c"
dependencies = [ dependencies = [
"heck", "heck",
"proc-macro2", "proc-macro2",
@ -1923,7 +1923,7 @@ dependencies = [
"clap", "clap",
"crossbeam-channel", "crossbeam-channel",
"log", "log",
"rand 0.9.1", "rand 0.9.2",
"uuid", "uuid",
"wyrand", "wyrand",
] ]
@ -2680,9 +2680,9 @@ dependencies = [
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.27" version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
[[package]] [[package]]
name = "malloc_buf" name = "malloc_buf"
@ -3549,9 +3549,9 @@ dependencies = [
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.9.1" version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
dependencies = [ dependencies = [
"rand_chacha 0.9.0", "rand_chacha 0.9.0",
"rand_core 0.9.3", "rand_core 0.9.3",
@ -4480,9 +4480,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.17.0" version = "1.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d" checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2"
dependencies = [ dependencies = [
"getrandom 0.3.3", "getrandom 0.3.3",
"js-sys", "js-sys",

View file

@ -5,12 +5,32 @@ edition = "2024"
description = "Experimental distributed physics system" description = "Experimental distributed physics system"
license = "AGPL-3.0-only" license = "AGPL-3.0-only"
[lints.rust]
# Mark `bevy_lint` as a valid `cfg`, as it is set when the Bevy linter runs.
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(bevy_lint)"] }
[lints.clippy]
# Bevy supplies arguments to systems via dependency injection, so it's natural for systems to
# request more than 7 arguments, which would undesirably trigger this lint.
too_many_arguments = "allow"
# Queries may access many components, which would undesirably trigger this lint.
type_complexity = "allow"
# Make sure macros use their standard braces, such as `[]` for `bevy_ecs::children!`.
nonstandard_macro_braces = "warn"
[package.metadata.bevy_lint]
panicking_methods = "deny"
pedantic = "warn"
[profile.dev] [profile.dev]
opt-level = 1 opt-level = 1
[profile.dev.package."*"] [profile.dev.package."*"]
opt-level = 3 opt-level = 3
[profile.dev.package.wgpu-types]
debug-assertions = false
[profile.release] [profile.release]
lto = true lto = true
opt-level = 3 opt-level = 3
@ -35,14 +55,14 @@ bevy = { version = "0.16.1", default-features = false, features = [
"wayland", "wayland",
] } ] }
bevy_rand = { version = "0.11.0", features = ["wyrand", "std"] } bevy_rand = { version = "0.11.0", features = ["wyrand", "std"] }
clap = { version = "4.5.39", features = ["derive"] } clap = { version = "4.5.48", features = ["derive"] }
crossbeam-channel = "0.5.15" crossbeam-channel = "0.5.15"
log = { version = "0.4.27", features = ["release_max_level_warn"] } log = { version = "0.4.28", features = ["release_max_level_warn"] }
rand = { version = "0.9.1", default-features = false, features = [ rand = { version = "0.9.2", default-features = false, features = [
"std", "std",
"thread_rng", "thread_rng",
] } ] }
uuid = { version = "1.17.0", features = ["v4"] } uuid = { version = "1.18.1", features = ["v4"] }
wyrand = "0.3.2" wyrand = "0.3.2"
[features] [features]

2
clippy.toml Normal file
View file

@ -0,0 +1,2 @@
# Require `bevy_ecs::children!` to use `[]` braces, instead of `()` or `{}`.
standard-macro-braces = [{ name = "children", brace = "[" }]

View file

@ -1,3 +1,2 @@
[toolchain] [toolchain]
# TODO: Switch back to nightly when it no longer causes an undefined symbol error channel = "nightly"
channel = "stable" # "nightly"

View file

@ -2,13 +2,16 @@ use bevy::prelude::*;
use crate::net::prelude::*; use crate::net::prelude::*;
pub fn handle_new_peer(new_peers: Query<&Peer, Added<Peer>>) { pub fn handle_new_peer(new_peers: Query<&PeerID, Added<Peer>>) {
for peer in new_peers { for peer in new_peers {
info!("Peer {} was added", peer.id); info!("Peer {} was added", peer.id);
} }
} }
pub fn handle_deleted_peer(mut old_peers: RemovedComponents<Peer>, peers: Query<&Peer>) -> Result { pub fn handle_deleted_peer(
mut old_peers: RemovedComponents<Peer>,
peers: Query<&PeerID>,
) -> Result {
for entity in old_peers.read() { for entity in old_peers.read() {
if let Ok(peer) = peers.get(entity) { if let Ok(peer) = peers.get(entity) {
info!("Peer {} was removed", peer.id); info!("Peer {} was removed", peer.id);
@ -20,7 +23,7 @@ pub fn handle_deleted_peer(mut old_peers: RemovedComponents<Peer>, peers: Query<
} }
pub fn handle_incoming_packets(mut packets: EventReader<InboundPacket>) { pub fn handle_incoming_packets(mut packets: EventReader<InboundPacket>) {
for packet in packets.read() { for InboundPacket(packet) in packets.read() {
info!("Packet received: {:?}", packet.0.message); info!("Packet received: {:?}", packet.message);
} }
} }

View file

@ -13,7 +13,10 @@ use super::{
check_for_seed, setup_balls, setup_camera, setup_from_seed, setup_player, setup_walls, check_for_seed, setup_balls, setup_camera, setup_from_seed, setup_player, setup_walls,
}, },
state::AppState, state::AppState,
ui::{setup_peer_ui, setup_seed_ui, update_peer_ui, update_peer_ui_timings, update_seed_ui}, ui::{
setup_peer_ui, setup_potential_peer_ui, setup_seed_ui, update_peer_ui,
update_peer_ui_timings, update_potential_peer_ui, update_seed_ui,
},
}; };
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
@ -55,11 +58,20 @@ impl GamePlugin {
impl Plugin for GamePlugin { impl Plugin for GamePlugin {
fn build(&self, app: &mut App) { fn build(&self, app: &mut App) {
app.add_plugins(( app.add_plugins((
NetIOPlugin::new(self.port, self.source.try_to_address()), NetIOPlugin::maybe_peer(self.port, self.source.try_to_address()),
distribution_plugin::<Seed>,
PhysicsPlugins::default().with_length_unit(50.0), PhysicsPlugins::default().with_length_unit(50.0),
)) ))
.init_state::<AppState>() .init_state::<AppState>()
.add_systems(Startup, (setup_camera, setup_seed_ui, setup_peer_ui)) .add_systems(
Startup,
(
setup_camera,
setup_seed_ui,
setup_peer_ui,
setup_potential_peer_ui,
),
)
.add_systems( .add_systems(
OnEnter(AppState::InGame), OnEnter(AppState::InGame),
( (
@ -75,8 +87,7 @@ impl Plugin for GamePlugin {
handle_new_peer, handle_new_peer,
handle_deleted_peer, handle_deleted_peer,
handle_incoming_packets, handle_incoming_packets,
) ),
.run_if(in_state(NetworkState::MultiPlayer)),
), ),
) )
.add_systems( .add_systems(
@ -85,14 +96,15 @@ impl Plugin for GamePlugin {
((move_player, move_camera).chain(), zoom_camera) ((move_player, move_camera).chain(), zoom_camera)
.run_if(in_state(AppState::InGame)), .run_if(in_state(AppState::InGame)),
update_seed_ui, update_seed_ui,
(update_peer_ui, update_peer_ui_timings) (
.run_if(in_state(NetworkState::MultiPlayer)), update_peer_ui,
update_peer_ui_timings,
update_potential_peer_ui,
),
quit.run_if(input_pressed(KeyCode::KeyQ)), quit.run_if(input_pressed(KeyCode::KeyQ)),
), ),
); );
Seed::register(app);
match self.source { match self.source {
DataSource::Address(peer) => { DataSource::Address(peer) => {
info!("Will retrieve seed from peer => {peer}"); info!("Will retrieve seed from peer => {peer}");

View file

@ -61,7 +61,7 @@ impl From<Seed> for Vec<u8> {
impl TryFrom<Vec<u8>> for Seed { impl TryFrom<Vec<u8>> for Seed {
type Error = TryFromSliceError; type Error = TryFromSliceError;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> { fn try_from(value: Vec<u8>) -> std::result::Result<Self, Self::Error> {
Ok(TryInto::<[u8; 8]>::try_into(value.as_slice())?.into()) Ok(TryInto::<[u8; 8]>::try_into(value.as_slice())?.into())
} }
} }

View file

@ -1,6 +1,6 @@
use bevy::prelude::*; use bevy::prelude::*;
use crate::net::prelude::{Peer, PeerReceiveTiming, PeerSendTiming}; use crate::net::prelude::{Peer, PeerID, PeerReceiveTiming, PeerSendTiming, PotentialPeers};
use super::seed::Seed; use super::seed::Seed;
@ -54,11 +54,11 @@ pub fn setup_peer_ui(mut commands: Commands) {
} }
#[derive(Component)] #[derive(Component)]
pub struct PeerID(Entity); pub struct UIPeerID(Entity);
pub fn update_peer_ui( pub fn update_peer_ui(
ui: Query<(Entity, &Children), With<PeerUI>>, ui: Query<(Entity, &Children), With<PeerUI>>,
rows: Query<&PeerID>, rows: Query<&UIPeerID>,
added: Query<Entity, Added<Peer>>, added: Query<Entity, Added<Peer>>,
mut removed: RemovedComponents<Peer>, mut removed: RemovedComponents<Peer>,
mut commands: Commands, mut commands: Commands,
@ -67,14 +67,14 @@ pub fn update_peer_ui(
for addition in added { for addition in added {
commands commands
.entity(table) .entity(table)
.with_child((Text::new("---- ---- ----"), PeerID(addition))); .with_child((Text::new("---- ---- ----"), UIPeerID(addition)));
} }
for removal in removed.read() { for removal in removed.read() {
for child in children { for child in children {
if let Ok(id) = rows.get(*child) { if let Ok(&UIPeerID(id)) = rows.get(*child)
if id.0 == removal { && id == removal
commands.entity(*child).despawn(); {
} commands.entity(*child).despawn();
} }
} }
} }
@ -82,12 +82,12 @@ pub fn update_peer_ui(
} }
pub fn update_peer_ui_timings( pub fn update_peer_ui_timings(
rows: Query<(&mut Text, &PeerID)>, rows: Query<(&mut Text, &UIPeerID)>,
peers: Query<(&Peer, &PeerReceiveTiming, &PeerSendTiming)>, peers: Query<(&PeerID, &PeerReceiveTiming, &PeerSendTiming)>,
time: Res<Time>, time: Res<Time>,
) { ) {
for (mut row, id) in rows { for (mut row, &UIPeerID(id)) in rows {
if let Ok((peer, recv, send)) = peers.get(id.0) { if let Ok((peer, recv, send)) = peers.get(id) {
**row = format!( **row = format!(
"{} {:.2} {:.2}", "{} {:.2} {:.2}",
peer.id, peer.id,
@ -97,3 +97,34 @@ pub fn update_peer_ui_timings(
} }
} }
} }
#[derive(Component, Debug)]
pub struct PotentialPeerUI;
pub fn setup_potential_peer_ui(mut commands: Commands) {
commands
.spawn((
Text::new("Potential peers:"),
Node {
position_type: PositionType::Absolute,
bottom: Val::Px(5.0),
right: Val::Px(5.0),
..default()
},
))
.with_child((TextSpan::new(""), PotentialPeerUI));
}
pub fn update_potential_peer_ui(
potential_peers: Res<PotentialPeers>,
text: Query<&mut TextSpan, With<PotentialPeerUI>>,
) {
for mut span in text {
**span = potential_peers
.addresses
.iter()
.map(|addr| addr.to_string())
.collect::<Vec<String>>()
.join("\n");
}
}

View file

@ -15,7 +15,7 @@ use game::prelude::*;
/// Also functions as a Bevy plugin to pass the configuration into the app. /// Also functions as a Bevy plugin to pass the configuration into the app.
#[derive(Parser)] #[derive(Parser)]
#[command(version, about)] #[command(version, about)]
pub struct AppSettings { pub struct AppSettingsPlugin {
#[command(flatten)] #[command(flatten)]
source: Source, source: Source,
@ -36,7 +36,7 @@ struct Source {
connect: Option<SocketAddr>, connect: Option<SocketAddr>,
} }
impl Plugin for AppSettings { impl Plugin for AppSettingsPlugin {
fn build(&self, app: &mut App) { fn build(&self, app: &mut App) {
app.insert_resource(Gravity(Vector::ZERO)).add_plugins(( app.insert_resource(Gravity(Vector::ZERO)).add_plugins((
DefaultPlugins.set(WindowPlugin { DefaultPlugins.set(WindowPlugin {

View file

@ -1,8 +1,8 @@
use bevy::prelude::{App, AppExit}; use bevy::prelude::{App, AppExit};
use clap::Parser; use clap::Parser;
use distributed_physics_test::AppSettings; use distributed_physics_test::AppSettingsPlugin;
fn main() -> AppExit { fn main() -> AppExit {
App::new().add_plugins(AppSettings::parse()).run() App::new().add_plugins(AppSettingsPlugin::parse()).run()
} }

View file

@ -2,48 +2,32 @@ use bevy::prelude::*;
use super::{ use super::{
packet::{InboundPacket, OutboundPacket, Packet}, packet::{InboundPacket, OutboundPacket, Packet},
peer::Peer, peer::{Peer, PeerID},
state::NetworkState,
}; };
fn spawner<T: TryFrom<Vec<u8>> + Component>( fn spawner<T: TryFrom<Vec<u8>> + Component>(
mut inbound: EventReader<InboundPacket>, mut inbound: EventReader<InboundPacket>,
mut commands: Commands, mut commands: Commands,
) { ) {
for packet in inbound.read() { for InboundPacket(packet) in inbound.read() {
if let Ok(entity) = T::try_from(packet.0.message.clone()) { if let Ok(entity) = T::try_from(packet.message.clone()) {
commands.spawn(entity); commands.spawn(entity);
} }
} }
} }
fn sender<T: Into<Vec<u8>> + Component + Clone>( fn sender<T: Into<Vec<u8>> + Component + Clone>(
peers: Query<&Peer, Added<Peer>>, peers: Query<&PeerID, Added<Peer>>,
entities: Query<&T>, entities: Query<&T>,
mut outbound: EventWriter<OutboundPacket>, mut outbound: EventWriter<OutboundPacket>,
) { ) {
for peer in peers { for peer in peers {
for entity in entities { for entity in entities {
outbound.write(OutboundPacket(Packet::new( outbound.write(Packet::create((*entity).clone().into(), peer.id));
(*entity).clone().into(),
peer.id,
)));
} }
} }
} }
pub trait Networked: Into<Vec<u8>> + TryFrom<Vec<u8>> + Component + Clone { pub fn distribution_plugin<T: Into<Vec<u8>> + TryFrom<Vec<u8>> + Component + Clone>(app: &mut App) {
fn register(app: &mut App); app.add_systems(FixedUpdate, (sender::<T>, spawner::<T>));
}
impl<T> Networked for T
where
T: Into<Vec<u8>> + TryFrom<Vec<u8>> + Component + Clone,
{
fn register(app: &mut App) {
app.add_systems(
FixedUpdate,
(sender::<T>, spawner::<T>).run_if(in_state(NetworkState::MultiPlayer)),
);
}
} }

77
src/net/heartbeat.rs Normal file
View file

@ -0,0 +1,77 @@
use std::time::Duration;
use bevy::prelude::*;
use crate::net::{packet::PacketType, peer::PotentialPeers};
use super::{
io::{Config, format_message},
packet::{OutboundPacket, Packet},
peer::{Peer, PeerChangeEvent, PeerID, PeerReceiveTiming, PeerSendTiming},
queues::NetworkSend,
};
const PING_FREQUENCY: Duration = Duration::from_secs(3);
const MISSED_PINGS: u32 = 3;
// TODO: Perhaps this needs a state rethink, is Single/Multiplayer actually useful vs Disconnected, Connecting, Connected?
// Would also help to state-scope some of these things, like InitialAddresses vs PeerMap
pub fn heartbeat(
peers: Query<(&PeerID, &PeerSendTiming)>,
time: Res<Time>,
mut outbound: EventWriter<OutboundPacket>,
) -> Result {
for (peer, last) in peers {
// Allow for 2 consecutive missed heartbeats without timing out
if last.time() + PING_FREQUENCY > time.elapsed() {
continue;
}
outbound.write(Packet::create(Vec::new(), peer.id));
}
Ok(())
}
pub fn timeout(
peers: Query<(&PeerID, &PeerReceiveTiming), With<Peer>>, // I mean... With<Peer> is inherent, but I guess I'll keep it as that might change
time: Res<Time>,
mut delete: EventWriter<PeerChangeEvent>,
) {
for (peer, last) in peers {
if last.time() + PING_FREQUENCY * MISSED_PINGS < time.elapsed() {
warn!("Peer {} timed out", peer.id);
delete.write(PeerChangeEvent::new(peer.id, None));
}
}
}
#[derive(Debug, Resource)]
pub struct PotentialPeerTimer {
timer: Timer,
}
impl Default for PotentialPeerTimer {
fn default() -> Self {
Self {
timer: Timer::new(PING_FREQUENCY, TimerMode::Repeating),
}
}
}
pub fn ping_potential_peers(
mut timer: ResMut<PotentialPeerTimer>,
time: Res<Time>,
peers: Res<PotentialPeers>,
to_socket: Res<NetworkSend>,
config: Res<Config>,
) -> Result {
timer.timer.tick(time.delta());
if timer.timer.finished() {
for peer in &peers.addresses {
to_socket.send(
format_message(&Vec::new(), PacketType::Peer, config.id),
*peer,
)?;
}
}
Ok(())
}

View file

@ -1,16 +1,29 @@
use std::time::Duration;
use bevy::prelude::*; use bevy::prelude::*;
use uuid::Uuid; use uuid::Uuid;
use crate::net::peer::PotentialPeer; use crate::net::packet::PacketType;
use super::{ use super::{
packet::{InboundPacket, OutboundPacket, Packet}, packet::{InboundPacket, OutboundPacket, Packet},
peer::{Peer, PeerChangeEvent, PeerData, PeerMap, PeerReceiveTiming, PeerSendTiming}, peer::{PeerChangeEvent, PeerData, PeerMap, PeerReceiveTiming, PeerSendTiming},
queues::{NetworkReceive, NetworkSend}, queues::{NetworkReceive, NetworkSend},
}; };
#[derive(Debug, Resource)]
pub struct Config {
pub id: Uuid,
}
impl Default for Config {
fn default() -> Self {
Self { id: Uuid::new_v4() }
}
}
pub fn format_message(data: &Vec<u8>, variant: PacketType, id: Uuid) -> Vec<u8> {
[data.as_slice(), &[variant as u8], id.as_bytes()].concat()
}
pub fn handle_network_input( pub fn handle_network_input(
from_socket: Res<NetworkReceive>, from_socket: Res<NetworkReceive>,
peer_map: Res<PeerMap>, peer_map: Res<PeerMap>,
@ -19,26 +32,24 @@ pub fn handle_network_input(
time: Res<Time>, time: Res<Time>,
mut change_peer: EventWriter<PeerChangeEvent>, mut change_peer: EventWriter<PeerChangeEvent>,
) -> Result { ) -> Result {
for (mut message, address) in from_socket.iter() { for (message, address) in from_socket.iter() {
if message.len() < 16 { match Packet::try_from(message) {
return Err(format!( Ok(packet) => {
"Message of length {} is not large enough to contain UUID", // TODO: Handle packet variant
message.len() if !packet.message.is_empty() {
) to_app.write(packet.clone().into());
.into()); }
} if let Some(peer_id) = peer_map.get(&packet.peer) {
let uuid = Uuid::from_slice(message.split_off(message.len() - 16).as_slice())?; let (peer, mut last) = peers.get_mut(*peer_id)?;
if !message.is_empty() { last.update(&time);
to_app.write(InboundPacket(Packet::new(message, uuid))); if address == peer.addr {
} continue;
if let Some(peer_id) = peer_map.get(&uuid) { }
let (peer, mut last) = peers.get_mut(*peer_id)?; }
last.update(&time); change_peer.write(PeerChangeEvent::new(packet.peer, Some(address)));
if address == peer.addr {
continue;
} }
Err(err) => warn!("Error reading packet: {:?}", err),
} }
change_peer.write(PeerChangeEvent::new(uuid, Some(address)));
} }
Ok(()) Ok(())
} }
@ -47,52 +58,16 @@ pub fn handle_network_output(
mut from_app: EventReader<OutboundPacket>, mut from_app: EventReader<OutboundPacket>,
peer_map: Res<PeerMap>, peer_map: Res<PeerMap>,
mut peers: Query<(&PeerData, &mut PeerSendTiming)>, mut peers: Query<(&PeerData, &mut PeerSendTiming)>,
config: Res<Config>,
to_socket: Res<NetworkSend>, to_socket: Res<NetworkSend>,
time: Res<Time>, time: Res<Time>,
) -> Result { ) -> Result {
for packet in from_app.read() { for OutboundPacket(packet) in from_app.read() {
let peer_id = peer_map.try_get(&packet.0.peer)?; let peer_id = peer_map.try_get(&packet.peer)?;
let (peer, mut last) = peers.get_mut(*peer_id)?; let (peer, mut last) = peers.get_mut(*peer_id)?;
// Append our UUID for client identification let message = format_message(&packet.message, packet.variant, config.id);
let message = [packet.0.message.as_slice(), peer.me.as_bytes()].concat();
to_socket.send(message, peer.addr.into())?; to_socket.send(message, peer.addr.into())?;
last.update(&time); last.update(&time);
} }
Ok(()) Ok(())
} }
const TIMEOUT: Duration = Duration::from_secs(10);
pub fn heartbeat(
peers: Query<(AnyOf<(&Peer, &PotentialPeer)>, &PeerSendTiming)>,
time: Res<Time>,
mut outbound: EventWriter<OutboundPacket>,
) -> Result {
for (peer, last) in peers {
// Allow for 2 consecutive missed heartbeats without timing out
if last.time() + TIMEOUT / 3 > time.elapsed() {
continue;
}
let id = match peer {
(None, None) => return Err("No peer identification".into()),
(None, Some(potential)) => potential.id,
(Some(actual), None) => actual.id,
(Some(_), Some(_)) => return Err("Both a potential and actual peer".into()),
};
outbound.write(OutboundPacket(Packet::new(Vec::new(), id)));
}
Ok(())
}
pub fn timeout(
peers: Query<(&Peer, &PeerReceiveTiming)>,
time: Res<Time>,
mut delete: EventWriter<PeerChangeEvent>,
) {
for (peer, last) in peers {
if last.time() + TIMEOUT < time.elapsed() {
warn!("Peer {} timed out", peer.id);
delete.write(PeerChangeEvent::new(peer.id, None));
}
}
}

View file

@ -1,18 +1,17 @@
mod distribution; mod distribution;
mod heartbeat;
mod io; mod io;
mod packet; mod packet;
mod peer; mod peer;
mod plugin; mod plugin;
mod queues; mod queues;
mod socket; mod socket;
mod state;
mod thread; mod thread;
#[allow(unused_imports)] #[allow(unused_imports)]
pub mod prelude { pub mod prelude {
pub use super::distribution::Networked; pub use super::distribution::distribution_plugin;
pub use super::packet::{InboundPacket, OutboundPacket, Packet}; pub use super::packet::{InboundPacket, OutboundPacket, Packet};
pub use super::peer::{Peer, PeerReceiveTiming, PeerSendTiming}; pub use super::peer::{Peer, PeerID, PeerReceiveTiming, PeerSendTiming, PotentialPeers};
pub use super::plugin::NetIOPlugin; pub use super::plugin::NetIOPlugin;
pub use super::state::NetworkState;
} }

View file

@ -2,19 +2,95 @@ use bevy::prelude::*;
use uuid::Uuid; use uuid::Uuid;
#[derive(Debug)] #[derive(Debug)]
pub enum TryFromBytesError {
InsufficientLength,
NotUUID,
NotVariant,
}
#[derive(Clone, Copy, Debug)]
pub enum PacketType {
Standard = 0x00,
Peer = 0x01,
}
impl From<PacketType> for u8 {
fn from(value: PacketType) -> Self {
value as u8
}
}
impl TryFrom<u8> for PacketType {
type Error = TryFromBytesError;
fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
match value {
value if value == PacketType::Standard as u8 => Ok(PacketType::Standard),
value if value == PacketType::Peer as u8 => Ok(PacketType::Peer),
_ => Err(TryFromBytesError::NotVariant),
}
}
}
pub const UUID_SIZE: usize = 16;
#[derive(Clone, Debug)]
pub struct Packet { pub struct Packet {
pub message: Vec<u8>, pub message: Vec<u8>,
pub variant: PacketType,
pub peer: Uuid, pub peer: Uuid,
} }
impl Packet { impl Packet {
pub fn new(message: Vec<u8>, peer: Uuid) -> Self { pub fn new(message: Vec<u8>, variant: PacketType, peer: Uuid) -> Self {
Self { peer, message } Self {
message,
variant,
peer,
}
}
pub fn create<T: From<Packet>>(message: Vec<u8>, peer: Uuid) -> T {
Self {
message,
variant: PacketType::Standard,
peer,
}
.into()
}
}
impl TryFrom<Vec<u8>> for Packet {
type Error = TryFromBytesError;
fn try_from(mut value: Vec<u8>) -> std::result::Result<Self, Self::Error> {
if value.len() < UUID_SIZE {
return Err(TryFromBytesError::InsufficientLength);
}
let uuid = Uuid::from_slice(value.split_off(value.len() - UUID_SIZE).as_slice())
.map_err(|_| TryFromBytesError::NotUUID)?;
let variant = value
.pop()
.ok_or(TryFromBytesError::InsufficientLength)?
.try_into()?;
Ok(Packet::new(value, variant, uuid))
} }
} }
#[derive(Debug, Event)] #[derive(Debug, Event)]
pub struct OutboundPacket(pub Packet); pub struct OutboundPacket(pub Packet);
impl From<Packet> for OutboundPacket {
fn from(value: Packet) -> Self {
Self(value)
}
}
#[derive(Debug, Event)] #[derive(Debug, Event)]
pub struct InboundPacket(pub Packet); pub struct InboundPacket(pub Packet);
impl From<Packet> for InboundPacket {
fn from(value: Packet) -> Self {
Self(value)
}
}

View file

@ -1,14 +1,14 @@
use std::{ use std::{
array::TryFromSliceError, array::TryFromSliceError,
collections::HashMap, collections::{HashMap, HashSet},
net::{IpAddr, Ipv6Addr, SocketAddr}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
time::Duration, time::Duration,
}; };
use bevy::prelude::*; use bevy::prelude::*;
use uuid::Uuid; use uuid::Uuid;
use super::packet::{OutboundPacket, Packet}; use super::packet::{InboundPacket, OutboundPacket, Packet, PacketType};
#[derive(Component, Debug, Default)] #[derive(Component, Debug, Default)]
pub struct PeerSendTiming(Duration); pub struct PeerSendTiming(Duration);
@ -40,55 +40,12 @@ impl PeerReceiveTiming {
} }
} }
#[derive(Component, Debug)]
#[require(PeerData, PeerReceiveTiming)]
pub struct Peer {
pub id: Uuid,
}
#[derive(Component, Debug)]
#[require(PeerData)]
pub struct PotentialPeer {
pub id: Uuid,
}
impl Default for PotentialPeer {
fn default() -> Self {
Self { id: Uuid::new_v4() }
}
}
#[derive(Clone, Component, Copy, Debug)]
#[require(PeerSendTiming)]
pub struct PeerData {
pub addr: Address,
pub me: Uuid,
}
impl PeerData {
pub fn new(addr: SocketAddr, me: Uuid) -> Self {
Self {
addr: addr.into(),
me,
}
}
}
impl Default for PeerData {
fn default() -> Self {
Self {
addr: Default::default(),
me: Uuid::new_v4(),
}
}
}
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub struct Address(SocketAddr); pub struct Address(SocketAddr);
impl From<Address> for SocketAddr { impl From<Address> for SocketAddr {
fn from(value: Address) -> Self { fn from(Address(value): Address) -> Self {
value.0 value
} }
} }
@ -111,12 +68,12 @@ impl PartialEq<Address> for SocketAddr {
} }
impl From<Address> for Vec<u8> { impl From<Address> for Vec<u8> {
fn from(value: Address) -> Self { fn from(Address(value): Address) -> Self {
let mut bytes: Vec<u8> = match value.0.ip() { let mut bytes: Vec<u8> = match value.ip() {
IpAddr::V4(ipv4_addr) => ipv4_addr.octets().into(), IpAddr::V4(ipv4_addr) => ipv4_addr.octets().into(),
IpAddr::V6(ipv6_addr) => ipv6_addr.octets().into(), IpAddr::V6(ipv6_addr) => ipv6_addr.octets().into(),
}; };
bytes.extend(value.0.port().to_le_bytes()); bytes.extend(value.port().to_le_bytes());
bytes bytes
} }
} }
@ -124,14 +81,25 @@ impl From<Address> for Vec<u8> {
impl TryFrom<Vec<u8>> for Address { impl TryFrom<Vec<u8>> for Address {
type Error = TryFromSliceError; type Error = TryFromSliceError;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> { fn try_from(value: Vec<u8>) -> std::result::Result<Self, Self::Error> {
let port = u16::from_le_bytes(TryInto::<[u8; 2]>::try_into( const PORT_SIZE: usize = 2;
value.clone().split_off(value.len() - 2).as_slice(),
if value.len() < PORT_SIZE {
todo!();
}
let mut bytes = value.clone();
let port = u16::from_le_bytes(TryInto::<[u8; PORT_SIZE]>::try_into(
bytes.split_off(bytes.len() - PORT_SIZE).as_slice(),
)?); )?);
let addr = if let Ok(bytes) = TryInto::<[u8; 4]>::try_into(value.as_slice()) { let addr = if let Ok(bytes) =
TryInto::<[u8; Ipv4Addr::BITS as usize / 8]>::try_into(bytes.as_slice())
{
SocketAddr::from((bytes, port)) SocketAddr::from((bytes, port))
} else { } else {
SocketAddr::from((TryInto::<[u8; 16]>::try_into(value.as_slice())?, port)) SocketAddr::from((
TryInto::<[u8; Ipv6Addr::BITS as usize / 8]>::try_into(bytes.as_slice())?,
port,
))
}; };
Ok(Address(addr)) Ok(Address(addr))
} }
@ -143,6 +111,53 @@ impl Default for Address {
} }
} }
#[derive(Clone, Component, Copy, Debug, Default)]
#[require(PeerSendTiming)]
pub struct PeerData {
pub addr: Address,
}
impl PeerData {
pub fn new(addr: SocketAddr) -> Self {
Self { addr: addr.into() }
}
}
#[derive(Component, Debug)]
#[require(PeerData)]
pub struct PeerID {
pub id: Uuid,
}
impl PeerID {
pub fn new(id: Uuid) -> Self {
Self { id }
}
}
impl Default for PeerID {
fn default() -> Self {
Self { id: Uuid::new_v4() }
}
}
#[derive(Component, Debug)]
#[require(PeerID, PeerReceiveTiming)]
pub struct Peer;
#[derive(Debug, Default, Resource)]
pub struct PotentialPeers {
pub addresses: HashSet<SocketAddr>,
}
impl PotentialPeers {
pub fn new(addresses: Vec<SocketAddr>) -> Self {
Self {
addresses: addresses.into_iter().collect(),
}
}
}
#[derive(Debug, Default, Resource)] #[derive(Debug, Default, Resource)]
pub struct PeerMap { pub struct PeerMap {
map: HashMap<Uuid, Entity>, map: HashMap<Uuid, Entity>,
@ -157,17 +172,13 @@ impl PeerMap {
self.map.get(uuid) self.map.get(uuid)
} }
pub fn try_get(&self, uuid: &Uuid) -> Result<&Entity, String> { pub fn try_get(&self, uuid: &Uuid) -> std::result::Result<&Entity, String> {
self.get(uuid).ok_or(format!("No entity with uuid: {uuid}")) self.get(uuid).ok_or(format!("No peer with uuid: {uuid}"))
} }
pub fn remove(&mut self, uuid: &Uuid) -> Option<Entity> { pub fn remove(&mut self, uuid: &Uuid) -> Option<Entity> {
self.map.remove(uuid) self.map.remove(uuid)
} }
pub fn len(&self) -> usize {
self.map.len()
}
} }
#[derive(Debug, Event)] #[derive(Debug, Event)]
@ -186,63 +197,82 @@ pub fn handle_peer_change(
mut changes: EventReader<PeerChangeEvent>, mut changes: EventReader<PeerChangeEvent>,
mut peer_map: ResMut<PeerMap>, mut peer_map: ResMut<PeerMap>,
mut peers: Query<&mut PeerData>, mut peers: Query<&mut PeerData>,
mut potential_peers: ResMut<PotentialPeers>,
mut commands: Commands, mut commands: Commands,
time: Res<Time>, time: Res<Time>,
) -> Result { ) -> Result {
for change in changes.read() { for change in changes.read() {
if let Some(entity) = peer_map.get(&change.peer) { match (peer_map.get(&change.peer), change.addr) {
if let Some(addr) = change.addr { // Peer modification
(Some(entity), Some(addr)) => {
if let Ok(mut peer) = peers.get_mut(*entity) { if let Ok(mut peer) = peers.get_mut(*entity) {
peer.addr = Address(addr); peer.addr = addr.into();
} else { } else {
warn!("Peer {} doesn't exist (just added?)", change.peer); warn!("Peer {} doesn't exist (just added?)", change.peer);
} }
} else { }
// Peer deletion
(Some(entity), None) => {
commands.get_entity(*entity)?.despawn(); commands.get_entity(*entity)?.despawn();
peer_map.remove(&change.peer); peer_map.remove(&change.peer);
} }
} else if let Some(addr) = change.addr { // Peer addition
info!("Adding peer {} ({})", change.peer, addr); (None, Some(addr)) => {
peer_map.insert( info!("Adding peer {} ({})", change.peer, addr);
change.peer, peer_map.insert(
commands change.peer,
.spawn(( commands
PeerData::new(addr, Uuid::new_v4()), .spawn((
PeerReceiveTiming::new(&time), Peer,
)) PeerID::new(change.peer),
.id(), PeerData::new(addr),
); PeerReceiveTiming::new(&time),
} else { ))
warn!("Peer {} already deleted", change.peer); .id(),
);
}
// Double peer deletion
(None, None) => warn!("Peer {} already deleted", change.peer),
} }
} if let Some(addr) = change.addr {
if peer_map.len() > 1 { potential_peers.addresses.remove(&addr);
if let Some(entity) = peer_map.remove(&Uuid::nil()) {
commands.get_entity(entity)?.despawn();
} }
} }
Ok(()) Ok(())
} }
#[allow(clippy::type_complexity)] pub fn new_peer_message(
mut from_network: EventReader<InboundPacket>,
peers: Query<&PeerData>,
mut potential_peers: ResMut<PotentialPeers>,
) {
'packet: for packet in from_network.read() {
if let Ok(addr) = TryInto::<Address>::try_into(packet.0.message.clone()) {
if potential_peers.addresses.contains(&addr.into()) {
continue;
}
for peer in peers {
if peer.addr == addr.into() {
continue 'packet;
}
}
potential_peers.addresses.insert(addr.into());
}
}
}
// TODO: Make this a more generic system
pub fn handle_new_peer( pub fn handle_new_peer(
new_peers: Query<(Option<Ref<Peer>>, Option<&Peer>, &PeerData)>, peers: Query<(Ref<Peer>, &PeerID, &PeerData)>,
mut outbound: EventWriter<OutboundPacket>, mut outbound: EventWriter<OutboundPacket>,
) -> Result { ) {
for (change, this, data) in new_peers { for (change, peer, _) in peers {
if let Some(change) = change { if change.is_added() {
if change.is_added() { for (_, other, data) in peers {
if let Some(this) = this { if peer.id != other.id {
for (_, _, other) in new_peers { outbound.write(Packet::new(data.addr.into(), PacketType::Peer, peer.id).into());
if data.me != other.me {
outbound.write(OutboundPacket(Packet::new(other.addr.into(), this.id)));
}
}
} else {
return Err("Ref<Peer> without Peer".into());
} }
} }
} }
} }
Ok(())
} }

View file

@ -1,65 +1,59 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use bevy::prelude::*; use bevy::prelude::*;
use uuid::Uuid;
use super::{ use super::{
io::{handle_network_input, handle_network_output, heartbeat, timeout}, heartbeat::{PotentialPeerTimer, heartbeat, ping_potential_peers, timeout},
io::{Config, handle_network_input, handle_network_output},
packet::{InboundPacket, OutboundPacket}, packet::{InboundPacket, OutboundPacket},
peer::{PeerChangeEvent, PeerData, PeerMap, handle_new_peer, handle_peer_change}, peer::{
PeerChangeEvent, PeerMap, PotentialPeers, handle_new_peer, handle_peer_change,
new_peer_message,
},
queues::{NetworkReceive, NetworkSend}, queues::{NetworkReceive, NetworkSend},
socket::bind_socket, socket::bind_socket,
state::NetworkState,
}; };
pub struct NetIOPlugin { pub struct NetIOPlugin {
listen: u16, listen: u16,
peer: Option<SocketAddr>, initial_peers: Vec<SocketAddr>,
} }
impl NetIOPlugin { impl NetIOPlugin {
pub fn new(listen: u16, peer: Option<SocketAddr>) -> Self { pub fn maybe_peer(listen: u16, peer: Option<SocketAddr>) -> Self {
Self { listen, peer } Self {
listen,
initial_peers: match peer {
Some(addr) => vec![addr],
None => Vec::new(),
},
}
} }
} }
impl Plugin for NetIOPlugin { impl Plugin for NetIOPlugin {
fn build(&self, app: &mut App) { fn build(&self, app: &mut App) {
app.init_state::<NetworkState>() app.add_systems(
.add_systems( FixedPreUpdate,
FixedPreUpdate, (handle_network_input, (handle_peer_change, new_peer_message)).chain(),
(handle_network_input, handle_peer_change) )
.chain() .add_systems(
.run_if(in_state(NetworkState::MultiPlayer)), FixedUpdate,
) (heartbeat, timeout, handle_new_peer, ping_potential_peers),
.add_systems( )
FixedUpdate, .add_systems(FixedPostUpdate, handle_network_output)
(heartbeat, timeout, handle_new_peer).run_if(in_state(NetworkState::MultiPlayer)), .init_resource::<Config>()
) .init_resource::<PeerMap>()
.add_systems( .init_resource::<PotentialPeerTimer>()
FixedPostUpdate, .insert_resource(PotentialPeers::new(self.initial_peers.clone()))
handle_network_output.run_if(in_state(NetworkState::MultiPlayer)), .add_event::<PeerChangeEvent>()
) .add_event::<InboundPacket>()
.add_event::<PeerChangeEvent>() .add_event::<OutboundPacket>();
.add_event::<InboundPacket>()
.add_event::<OutboundPacket>();
match bind_socket(self.listen) { match bind_socket(self.listen) {
Ok((send, receive)) => { Ok((send, receive)) => {
app.insert_state(NetworkState::MultiPlayer) app.insert_resource(NetworkSend::new(send))
.insert_resource(NetworkSend::new(send))
.insert_resource(NetworkReceive::new(receive)); .insert_resource(NetworkReceive::new(receive));
let mut peer_map = PeerMap::default();
if let Some(socket) = self.peer {
let entity = app.world_mut().spawn(PeerData {
addr: socket.into(),
me: Uuid::nil(),
});
peer_map.insert(Uuid::nil(), entity.id());
}
app.insert_resource(peer_map);
} }
Err(err) => { Err(err) => {
warn!("Failed to set up networking: {err}"); warn!("Failed to set up networking: {err}");

View file

@ -1,8 +0,0 @@
use bevy::prelude::*;
#[derive(States, Default, Debug, Clone, PartialEq, Eq, Hash)]
pub enum NetworkState {
#[default]
SinglePlayer,
MultiPlayer,
}