Slightly better implementation of peers, still need to create a more generic system for deciding which components to distribute where and then use that for Peers.
This commit is contained in:
parent
e013fb427a
commit
53fe3333f0
14 changed files with 438 additions and 265 deletions
|
|
@ -1,16 +1,29 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use bevy::prelude::*;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::net::peer::PotentialPeer;
|
||||
use crate::net::packet::PacketType;
|
||||
|
||||
use super::{
|
||||
packet::{InboundPacket, OutboundPacket, Packet},
|
||||
peer::{Peer, PeerChangeEvent, PeerData, PeerMap, PeerReceiveTiming, PeerSendTiming},
|
||||
peer::{PeerChangeEvent, PeerData, PeerMap, PeerReceiveTiming, PeerSendTiming},
|
||||
queues::{NetworkReceive, NetworkSend},
|
||||
};
|
||||
|
||||
#[derive(Debug, Resource)]
|
||||
pub struct Config {
|
||||
pub id: Uuid,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self { id: Uuid::new_v4() }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn format_message(data: &Vec<u8>, variant: PacketType, id: Uuid) -> Vec<u8> {
|
||||
[data.as_slice(), &[variant as u8], id.as_bytes()].concat()
|
||||
}
|
||||
|
||||
pub fn handle_network_input(
|
||||
from_socket: Res<NetworkReceive>,
|
||||
peer_map: Res<PeerMap>,
|
||||
|
|
@ -19,26 +32,24 @@ pub fn handle_network_input(
|
|||
time: Res<Time>,
|
||||
mut change_peer: EventWriter<PeerChangeEvent>,
|
||||
) -> Result {
|
||||
for (mut message, address) in from_socket.iter() {
|
||||
if message.len() < 16 {
|
||||
return Err(format!(
|
||||
"Message of length {} is not large enough to contain UUID",
|
||||
message.len()
|
||||
)
|
||||
.into());
|
||||
}
|
||||
let uuid = Uuid::from_slice(message.split_off(message.len() - 16).as_slice())?;
|
||||
if !message.is_empty() {
|
||||
to_app.write(InboundPacket(Packet::new(message, uuid)));
|
||||
}
|
||||
if let Some(peer_id) = peer_map.get(&uuid) {
|
||||
let (peer, mut last) = peers.get_mut(*peer_id)?;
|
||||
last.update(&time);
|
||||
if address == peer.addr {
|
||||
continue;
|
||||
for (message, address) in from_socket.iter() {
|
||||
match Packet::try_from(message) {
|
||||
Ok(packet) => {
|
||||
// TODO: Handle packet variant
|
||||
if !packet.message.is_empty() {
|
||||
to_app.write(packet.clone().into());
|
||||
}
|
||||
if let Some(peer_id) = peer_map.get(&packet.peer) {
|
||||
let (peer, mut last) = peers.get_mut(*peer_id)?;
|
||||
last.update(&time);
|
||||
if address == peer.addr {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
change_peer.write(PeerChangeEvent::new(packet.peer, Some(address)));
|
||||
}
|
||||
Err(err) => warn!("Error reading packet: {:?}", err),
|
||||
}
|
||||
change_peer.write(PeerChangeEvent::new(uuid, Some(address)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -47,52 +58,16 @@ pub fn handle_network_output(
|
|||
mut from_app: EventReader<OutboundPacket>,
|
||||
peer_map: Res<PeerMap>,
|
||||
mut peers: Query<(&PeerData, &mut PeerSendTiming)>,
|
||||
config: Res<Config>,
|
||||
to_socket: Res<NetworkSend>,
|
||||
time: Res<Time>,
|
||||
) -> Result {
|
||||
for packet in from_app.read() {
|
||||
let peer_id = peer_map.try_get(&packet.0.peer)?;
|
||||
for OutboundPacket(packet) in from_app.read() {
|
||||
let peer_id = peer_map.try_get(&packet.peer)?;
|
||||
let (peer, mut last) = peers.get_mut(*peer_id)?;
|
||||
// Append our UUID for client identification
|
||||
let message = [packet.0.message.as_slice(), peer.me.as_bytes()].concat();
|
||||
let message = format_message(&packet.message, packet.variant, config.id);
|
||||
to_socket.send(message, peer.addr.into())?;
|
||||
last.update(&time);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
const TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
pub fn heartbeat(
|
||||
peers: Query<(AnyOf<(&Peer, &PotentialPeer)>, &PeerSendTiming)>,
|
||||
time: Res<Time>,
|
||||
mut outbound: EventWriter<OutboundPacket>,
|
||||
) -> Result {
|
||||
for (peer, last) in peers {
|
||||
// Allow for 2 consecutive missed heartbeats without timing out
|
||||
if last.time() + TIMEOUT / 3 > time.elapsed() {
|
||||
continue;
|
||||
}
|
||||
let id = match peer {
|
||||
(None, None) => return Err("No peer identification".into()),
|
||||
(None, Some(potential)) => potential.id,
|
||||
(Some(actual), None) => actual.id,
|
||||
(Some(_), Some(_)) => return Err("Both a potential and actual peer".into()),
|
||||
};
|
||||
outbound.write(OutboundPacket(Packet::new(Vec::new(), id)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn timeout(
|
||||
peers: Query<(&Peer, &PeerReceiveTiming)>,
|
||||
time: Res<Time>,
|
||||
mut delete: EventWriter<PeerChangeEvent>,
|
||||
) {
|
||||
for (peer, last) in peers {
|
||||
if last.time() + TIMEOUT < time.elapsed() {
|
||||
warn!("Peer {} timed out", peer.id);
|
||||
delete.write(PeerChangeEvent::new(peer.id, None));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue