distributed_physics_test/src/net.rs

161 lines
4.5 KiB
Rust

use std::{
net::{Ipv6Addr, SocketAddr, UdpSocket},
thread::{JoinHandle, spawn},
};
use bevy::{prelude::*, tasks::futures_lite::io};
use crossbeam_channel::{Receiver, Sender, TrySendError, unbounded};
use crate::game::seed::Seed;
fn configure_socket(socket: &UdpSocket) -> io::Result<()> {
socket.set_read_timeout(None)?;
socket.set_write_timeout(None)?;
Ok(())
}
type NetworkMessage = (Vec<u8>, SocketAddr);
fn start_network_thread<M: Send + 'static>(
network_loop: fn(M, UdpSocket) -> Result,
messages: M,
socket: UdpSocket,
) -> JoinHandle<Result> {
spawn(move || {
let result = network_loop(messages, socket);
match result {
Ok(()) => error!("Network thread: Loop returned without error?"),
Err(ref err) => error!("Network thread: {err}"),
};
result
})
}
fn network_send_loop(messages: Receiver<NetworkMessage>, socket: UdpSocket) -> Result {
loop {
let (message, address) = messages.recv()?;
debug!("Sending {} bytes to {}", message.len(), address);
let sent = socket.send_to(message.as_slice(), address)?;
if message.len() != sent {
error!(
"Network thread: Tried to send {} bytes but only sent {}",
message.len(),
sent
);
}
}
}
fn network_receive_loop(messages: Sender<NetworkMessage>, socket: UdpSocket) -> Result {
loop {
let mut message = [0u8; 1024]; // 1 KiB seems like it would be enough, TBD though
let (len, address) = socket.recv_from(&mut message)?;
debug!("Network thread: Received {len} bytes from {address}");
messages.try_send((message[..len].into(), address))?;
}
}
fn setup_socket(port: u16) -> Result<(Sender<NetworkMessage>, Receiver<NetworkMessage>)> {
let socket = UdpSocket::bind((Ipv6Addr::UNSPECIFIED, port))?;
configure_socket(&socket)?;
let (send_inbound, receive_inbound) = unbounded();
start_network_thread(network_receive_loop, send_inbound, socket.try_clone()?);
let (send_outbound, receive_outbound) = unbounded();
start_network_thread(network_send_loop, receive_outbound, socket);
Ok((send_outbound, receive_inbound))
}
#[derive(Resource)]
pub struct NetworkSend(Sender<NetworkMessage>);
impl NetworkSend {
/// Send the message, erroring if the queue is full or disconnected
pub fn send(
&self,
value: Vec<u8>,
address: SocketAddr,
) -> Result<(), TrySendError<NetworkMessage>> {
self.0.try_send((value, address))
}
}
#[derive(Resource, Clone)]
struct NetworkReceive(Receiver<NetworkMessage>);
impl NetworkReceive {
fn iter(&self) -> Iter {
Iter(self.0.clone())
}
}
struct Iter(Receiver<NetworkMessage>);
impl Iterator for Iter {
type Item = NetworkMessage;
fn next(&mut self) -> Option<Self::Item> {
self.0.try_recv().ok()
}
}
fn handle_network_io(
receive: Res<NetworkReceive>,
send: Res<NetworkSend>,
seed: Option<Res<Seed>>,
mut commands: Commands,
) -> Result {
for (message, address) in receive.iter() {
if let Some(ref value) = seed {
send.send((**value).into(), address)?;
} else {
commands.insert_resource::<Seed>(message.try_into()?);
}
}
Ok(())
}
#[derive(States, Default, Debug, Clone, PartialEq, Eq, Hash)]
enum NetworkState {
#[default]
SinglePlayer,
MultiPlayer,
}
pub struct NetIOPlugin {
listen: u16,
peer: Option<SocketAddr>,
}
impl NetIOPlugin {
pub fn new(listen: u16, peer: Option<SocketAddr>) -> Self {
Self { listen, peer }
}
}
impl Plugin for NetIOPlugin {
fn build(&self, app: &mut App) {
app.init_state::<NetworkState>().add_systems(
FixedUpdate,
handle_network_io.run_if(in_state(NetworkState::MultiPlayer)),
);
match setup_socket(self.listen) {
Ok((send, receive)) => {
if let Some(socket) = self.peer {
if let Err(err) = send.try_send((Vec::new(), socket)) {
warn!("Failed to send to peer: {err}");
return;
}
}
app.insert_state(NetworkState::MultiPlayer)
.insert_resource(NetworkSend(send))
.insert_resource(NetworkReceive(receive));
}
Err(err) => {
warn!("Failed to set up networking: {err}");
}
};
}
}