Finishing channel refactor, todo: fix chain rebuilding

This commit is contained in:
Victor Vobis 2026-03-10 22:37:48 +01:00
parent 407085c78b
commit edbebe64fe
11 changed files with 322 additions and 337 deletions

6
node/package-lock.json generated Normal file
View File

@ -0,0 +1,6 @@
{
"name": "node",
"lockfileVersion": 3,
"requires": true,
"packages": {}
}

View File

@ -1,11 +1,9 @@
use std::net::SocketAddr;
use shared::blockchain_core;
use shared::blockchain_core::{self, Address, AddressParser};
use cli_renderer::RenderLayoutKind;
use clap::{Parser, Subcommand};
use clap::*;
#[derive(Parser)]
pub struct Cli {
#[command(subcommand)]
@ -14,6 +12,7 @@ pub struct Cli {
#[derive(Subcommand)]
pub enum CliCommand {
/// Ping a node
#[command(name = "ping")]
Ping {
#[command(subcommand)]
@ -34,12 +33,12 @@ pub enum CliCommand {
block_cmd: CliBlockCommand,
},
/// Award currency to wallet
#[command(name = "award")]
Award {
#[arg(short, long)]
#[clap(value_parser = AddressParser {})]
address: Address,
amount: u64,
#[arg(short, long)]
address: String,
},
/// Make a Transaction
@ -120,14 +119,14 @@ pub enum CliPingCommand {
/// Ping Peer by Id
#[command(name = "id", aliases = ["i"])]
Id {
#[arg(short, long)]
#[arg()]
id: String,
},
/// Ping Peer by Address
#[command(name = "addr", aliases = ["a", "ad"])]
Addr {
#[arg(short, long)]
#[arg()]
addr: String,
},
}

View File

@ -60,15 +60,12 @@ pub fn cli(input: &str) -> WatcherCommand {
WatcherCommand::Node(NodeCommand::ProcessChainData(ChainData::NodeTransaction(tx)))
}
CliCommand::Award { address, amount } => {
let mut bytes = [0u8; 20];
log(msg!(DEBUG, "Received address: {:?}", address));
if address.len() != 20 {
log(msg!(ERROR, "Invalid address length"))
} else if !address.is_ascii() {
log(msg!(ERROR, "Invalid address content"))
}
bytes.copy_from_slice(address.as_bytes());
WatcherCommand::Node(NodeCommand::AwardCurrency{ address: bytes, amount }
WatcherCommand::Node(NodeCommand::AwardCurrency{ address, amount }
)}
CliCommand::DebugShowId => WatcherCommand::Node(NodeCommand::ShowId),
CliCommand::StartListner { addr } => {

View File

@ -3,7 +3,7 @@ use crate::network::NodeId;
use crate::node::node;
use super::ProtocolMessage;
use tokio::net;
use tokio::sync::mpsc;
use futures::stream::Stream;
use super::Connector;
@ -14,22 +14,27 @@ use vlogger::*;
pub struct Connection {
node_id: NodeId,
peer_id: NodeId,
stream: net::TcpStream,
rx: mpsc::Receiver<ProtocolMessage>,
r_stream: net::tcp::OwnedReadHalf,
}
impl Stream for Connection {
type Item = (NodeId, Option<ProtocolMessage>);
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
todo!("Impl poll next")
}
}
impl Connection {
pub fn new(
node_id: NodeId,
peer_id: NodeId,
stream: net::TcpStream,
rx: mpsc::Receiver<ProtocolMessage>,
r_stream: net::tcp::OwnedReadHalf,
) -> Self {
Self {
node_id,
peer_id,
stream,
rx,
r_stream,
}
}
@ -38,22 +43,7 @@ impl Connection {
loop {
tokio::select! {
response_result = self.rx.recv() => {
match response_result {
Some(response) => {
if let Err(e) = Connector::send_message(&mut self.stream, &response).await {
log(msg!(ERROR, "Failed to send response to {}: {}", self.peer_id.clone(), e));
break;
}
},
None => {
log(msg!(DEBUG, "Response channel closed for {}", self.peer_id));
break;
}
}
}
message_result = Connector::receive_message(&mut self.stream) => {
message_result = Connector::receive_message(&mut self.r_stream) => {
match message_result {
Ok(message) => {
todo!("[TODO] Return parsed command to propagate");

View File

@ -1,19 +1,18 @@
use anyhow::Context;
use futures::stream::{ StreamExt, SelectAll };
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net;
use tokio::sync::mpsc;
use vlogger::*;
use shared::print_error_chain;
use thiserror::*;
use crate::db::BINCODE_CONFIG;
use crate::log;
use crate::network::NodeId;
use crate::network::{NodeId, ProtocolError};
use super::Connection;
use crate::bus::*;
use crate::node::node;
use crate::node::{NetworkError, error};
use super::ProtocolMessage;
pub enum ConnectorCommand {
@ -26,6 +25,8 @@ pub struct Connector {
node_id: NodeId,
addr: SocketAddr,
connections: Vec<Connection>,
peer_streams: Vec<(NodeId, OwnedWriteHalf)>,
streams: SelectAll<Connection>,
listener: Option<tokio::net::TcpListener>,
exit: bool,
}
@ -33,7 +34,17 @@ pub struct Connector {
#[derive(Error, Debug)]
pub enum ConnectorError {
#[error("Connection failed")]
ConnectionError(#[from] anyhow::Error),
IO(#[from] std::io::Error),
#[error("Decode Error: {0}")]
Decode(#[from] bincode::error::DecodeError),
#[error("Encode Error: {0}")]
Encode(#[from] bincode::error::EncodeError),
#[error("Protocol Error: {0}")]
Protocol(#[from] crate::network::message::ProtocolError),
#[error("Unkown Peer Id: {0}")]
UnknownPeerId(NodeId),
#[error("Unkown Peer Address: {0}")]
UnknownPeerAddr(SocketAddr),
}
const MAX_LISTNER_TRIES: usize = 5;
@ -47,6 +58,8 @@ impl Connector {
node_id,
addr,
connections: Vec::new(),
peer_streams: Vec::new(),
streams: SelectAll::new(),
listener: None,
exit: false,
}
@ -71,8 +84,11 @@ impl Connector {
pub async fn poll(&mut self) -> Result<Option<node::NodeCommand>, ConnectorError> {
if let Some(listener) = &mut self.listener {
todo!("Implement Vec Poll for connections");
tokio::select! {
protocol_message = self.streams.next() => {
todo!("Implement protocol message");
}
// cmd_result = self.rx.recv() => {
// todo!("Implement Vec Poll for connections");
// match cmd_result {
@ -108,7 +124,7 @@ impl Connector {
}
}
pub async fn execute_cmd(&mut self, cmd: ConnectorCommand) -> Result<Option<node::NodeCommand>, ConnectorError> {
pub async fn command(&mut self, cmd: ConnectorCommand) -> Result<Option<node::NodeCommand>, ConnectorError> {
match cmd {
ConnectorCommand::ConnectToTcpPeer(addr) => {
let peer = self.connect_to_peer(addr).await?;
@ -128,7 +144,6 @@ impl Connector {
pub async fn connect_to_seed(&mut self, addr: SocketAddr) -> Result<node::TcpPeer, ConnectorError> {
match net::TcpStream::connect(addr)
.await
.with_context(|| format!("Connecting to {}", addr))
{
Ok(stream) => {
let peer = self.establish_connection_outbound(stream, addr).await;
@ -136,9 +151,8 @@ impl Connector {
peer
},
Err(e) => {
// let err = ConnectorError::ConnectionError(e.into());
print_error_chain(&e.into());
todo!("Handle connector error propagation");
let err = ConnectorError::IO(e);
Err(err)
}
}
}
@ -147,9 +161,8 @@ impl Connector {
match net::TcpStream::connect(addr).await {
Ok(stream) => self.establish_connection_inbound(stream, addr).await,
Err(e) => {
let err = ConnectorError::ConnectionError(e.into());
print_error_chain(&err.into());
todo!("Handle connector error propagation");
let err = ConnectorError::IO(e);
Err(err)
}
}
}
@ -158,29 +171,28 @@ impl Connector {
match net::TcpStream::connect(addr).await {
Ok(stream) => self.establish_connection_outbound(stream, addr).await,
Err(e) => {
let err = ConnectorError::ConnectionError(e.into());
print_error_chain(&err.into());
todo!("Handle connector error propagation");
let err = ConnectorError::IO(e);
Err(err)
}
}
}
async fn establish_connection_outbound(
&mut self,
mut stream: tokio::net::TcpStream,
stream: tokio::net::TcpStream,
addr: SocketAddr,
) -> Result<node::TcpPeer, ConnectorError> {
let (mut r_stream, mut w_stream) = stream.into_split();
let handshake = ProtocolMessage::Handshake {
peer_id: self.node_id.clone(),
version: "".to_string(),
};
match Connector::send_message(&mut stream, &handshake).await {
match Connector::send_message(&mut w_stream, &handshake).await {
Ok(()) => {
if let Ok(mes) = Connector::receive_message(&mut stream).await {
let (ch_tx, ch_rx) = mpsc::channel::<ProtocolMessage>(100);
if let Ok(mes) = Connector::receive_message(&mut r_stream).await {
let peer = match mes {
ProtocolMessage::HandshakeAck { peer_id, .. } => {
node::TcpPeer::new(peer_id, addr, ch_tx)
node::TcpPeer::new(peer_id, addr)
}
_ => {
log(msg!(
@ -190,7 +202,7 @@ impl Connector {
todo!("Handle connector receive message fail");
}
};
let connection = Connection::new(self.node_id.clone(), peer.id.clone(), stream, ch_rx);
let connection = Connection::new(self.node_id.clone(), peer.id.clone(), r_stream);
self.connections.push(connection);
Ok(peer)
} else {
@ -206,82 +218,78 @@ impl Connector {
async fn establish_connection_inbound(
&mut self,
mut stream: tokio::net::TcpStream,
stream: tokio::net::TcpStream,
addr: SocketAddr,
) -> Result<node::TcpPeer, ConnectorError> {
if let Ok(mes) = Connector::receive_message(&mut stream).await {
let (ch_tx, ch_rx) = mpsc::channel::<ProtocolMessage>(100);
let peer = match mes {
ProtocolMessage::Handshake { peer_id, .. } => {
let ack = ProtocolMessage::HandshakeAck {
peer_id: self.node_id.clone(),
version: "".to_string(),
};
match Connector::send_message(&mut stream, &ack).await {
Ok(()) => node::TcpPeer::new(peer_id, addr, ch_tx),
Err(e) => {
print_error_chain(&e.into());
todo!("Implement inbound conneciton message send fail")
let (mut r_stream, mut w_stream) = stream.into_split();
let mes = Connector::receive_message(&mut r_stream).await?;
let peer = match mes {
ProtocolMessage::Handshake { peer_id, .. } => {
let ack = ProtocolMessage::HandshakeAck {
peer_id: self.node_id.clone(),
version: "".to_string(),
};
Connector::send_message(&mut w_stream, &ack).await?;
node::TcpPeer::new(peer_id, addr)
}
e => {
return Err(ConnectorError::Protocol(ProtocolError::Unexpected(e)));
}
};
let connection = Connection::new(self.node_id.clone(), peer.id.clone(), r_stream);
self.connections.push(connection);
Ok(peer)
}
},
}
}
_ => {
log(msg!(
ERROR,
"Invalid Message On Connetion Establishment: {mes}"
));
todo!("Implement inbound conneciton message invalid");
}
};
let connection = Connection::new(self.node_id.clone(), peer.id.clone(), stream, ch_rx);
self.connections.push(connection);
Ok(peer)
pub async fn send_message_to_peer(&mut self, peer_id: NodeId, message: &ProtocolMessage) -> Result<(), ConnectorError> {
if let Some((_, w_stream)) = &mut self.peer_streams.iter_mut().find(|p| p.0 == peer_id) {
Connector::send_message(w_stream, message).await
} else {
todo!("Implement inbount connection message read fail");
Err(ConnectorError::UnknownPeerId(peer_id))
}
}
pub async fn send_message(
stream: &mut net::TcpStream,
stream: &mut net::tcp::OwnedWriteHalf,
message: &ProtocolMessage,
) -> Result<(), NetworkError> {
) -> Result<(), ConnectorError> {
let data = bincode::encode_to_vec(message, BINCODE_CONFIG)?;
let len = data.len() as u32;
stream
.write_all(&len.to_be_bytes())
.await
.map_err(|_e| NetworkError::TODO)?;
.map_err(|e| ConnectorError::IO(e))?;
stream
.write_all(&data)
.await
.map_err(|_e| NetworkError::TODO)?;
stream.flush().await.map_err(|_e| NetworkError::TODO)?;
.map_err(|e| ConnectorError::IO(e))?;
stream.flush().await.map_err(|e| ConnectorError::IO(e))?;
Ok(())
}
pub async fn receive_message(
stream: &mut tokio::net::TcpStream,
) -> Result<ProtocolMessage, error::NetworkError> {
stream: &mut OwnedReadHalf,
) -> Result<ProtocolMessage, ConnectorError> {
let mut len_bytes = [0u8; 4];
stream
.read_exact(&mut len_bytes)
.await
.map_err(|_e| NetworkError::TODO)?;
.map_err(|e| ConnectorError::IO(e))?;
let len = u32::from_be_bytes(len_bytes) as usize;
if len >= super::message::MAX_MESSAGE_SIZE {
return Err(NetworkError::TODO);
return Err(ConnectorError::Protocol(ProtocolError::MessageTooLong(len)));
}
let mut data = vec![0u8; len];
stream
.read_exact(&mut data)
.await
.map_err(|_e| NetworkError::TODO)?;
.map_err(|e| ConnectorError::IO(e))?;
let (message, _): (ProtocolMessage, usize) = bincode::decode_from_slice(&data, BINCODE_CONFIG)?;

View File

@ -4,9 +4,17 @@ use std::net::SocketAddr;
pub const MAX_MESSAGE_SIZE: usize = 1_000_000;
#[derive(Debug, Clone, bincode::Encode, bincode::Decode, Hash, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, bincode::Encode, bincode::Decode, Hash, PartialEq, Eq)]
pub struct NodeId(pub [u8; 16]);
#[derive(thiserror::Error, Debug)]
pub enum ProtocolError {
#[error("Invalid message len: {0}")]
MessageTooLong(usize),
#[error("Unexpected message: {0}")]
Unexpected(ProtocolMessage)
}
#[derive(Debug, Clone, bincode::Encode, bincode::Decode)]
pub enum ProtocolMessage {
BootstrapRequest {
@ -51,10 +59,9 @@ pub enum ProtocolMessage {
}
impl fmt::Display for NodeId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let msg = self.to_string();
write!(f, "{}", msg)
}
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", hex::encode(self.0))
}
}
impl fmt::Display for ProtocolMessage {

View File

@ -33,9 +33,9 @@ pub struct ChainBootStrap {
#[allow(dead_code)]
#[derive(Error, Debug)]
pub enum BlockchainError {
#[error("Failed to serialize data: {0}")]
#[error("Failed to serialize data")]
Encode(#[from] bincode::error::EncodeError),
#[error("Failed to deserialize data: {0}")]
#[error("Failed to deserialize data")]
Decode(#[from] bincode::error::DecodeError),
#[error("Database operation failed")]
Database(#[from] DatabaseError),

View File

@ -4,8 +4,4 @@ use thiserror::Error;
pub enum NetworkError {
#[error("Implement NetworkError Enum: ({})", file!())]
TODO,
#[error("Decode Error: {0}")]
Decode(#[from] bincode::error::DecodeError),
#[error("Encode Error: {0}")]
Encode(#[from] bincode::error::EncodeError),
}

View File

@ -1,6 +1,5 @@
use crate::bus::{publish_system_event, publish_watcher_event, subscribe_system_event, SystemEvent};
use shared::blockchain_core::{self, ChainData, SignedTransaction, validator::ValidationError};
use crate::print_error_chain;
use crate::network::ConnectorError;
use shared::blockchain_core::{self, ChainData, SignedTransaction};
use crate::log;
use crate::network::{NodeId, ProtocolMessage};
use crate::network::{Connector, ConnectorCommand};
@ -20,24 +19,22 @@ use vlogger::*;
pub struct TcpPeer {
pub id: NodeId,
pub addr: SocketAddr,
pub sender: tokio::sync::mpsc::Sender<ProtocolMessage>,
}
impl TcpPeer {
pub fn new(
id: NodeId,
addr: SocketAddr,
sender: tokio::sync::mpsc::Sender<ProtocolMessage>,
) -> Self {
Self { id, addr, sender }
Self { id, addr }
}
}
#[allow(dead_code)]
pub struct Node {
pub tcp_connector: Option<Connector>,
pub tcp_connector: Connector,
pub id: NodeId,
pub addr: Option<SocketAddr>,
pub addr: SocketAddr,
pub tcp_peers: HashMap<NodeId, TcpPeer>,
chain: Blockchain,
listner_handle: Option<tokio::task::JoinHandle<()>>,
@ -46,7 +43,15 @@ pub struct Node {
#[derive(Debug, Error)]
pub enum NodeError {
#[error("Block chain error")]
ChainError(#[from] BlockchainError),
Blockchain(#[from] BlockchainError),
#[error("Connector Error")]
Connector(#[from] ConnectorError),
#[error("Unknown peer address")]
UnknownPeerAddr(SocketAddr),
#[error("Invalid Address:\npeer: {0}\naddr: {1}")]
InvalidAddress(String, SocketAddr),
#[error("Hex Error")]
Hex(#[from] hex::FromHexError),
}
#[derive(Debug, Clone)]
@ -86,9 +91,7 @@ impl Node {
.iter()
.map(|p| p.1.addr.to_string().parse::<SocketAddr>().unwrap())
.collect();
if let Some(a) = self.addr {
addr.push(a.clone());
}
addr.push(self.addr.clone());
addr
}
@ -109,43 +112,42 @@ impl Node {
self.tcp_peers.remove_entry(&peer_id);
}
async fn add_tcp_peer(&mut self, peer: TcpPeer) {
fn add_tcp_peer(&mut self, peer: TcpPeer) {
log(msg!(DEBUG, "Added Peer from address: {}", peer.addr));
self.tcp_peers.insert(peer.id.clone(), peer);
}
pub async fn new_with_id(
id: NodeId,
addr: Option<SocketAddr>,
addr: SocketAddr,
chain: Blockchain,
) -> Self {
Self {
id,
id: id.clone(),
tcp_peers: HashMap::new(),
addr,
chain,
listner_handle: None,
tcp_connector: None,
tcp_connector: Connector::new(id, addr),
}
}
pub fn new(
addr: Option<SocketAddr>,
addr: SocketAddr,
chain: Blockchain,
) -> Self {
let id = NodeId(*Uuid::new_v4().as_bytes());
Self {
id: NodeId(*Uuid::new_v4().as_bytes()),
id: id.clone(),
tcp_peers: HashMap::new(),
addr,
listner_handle: None,
tcp_connector: None,
tcp_connector: Connector::new(id, addr),
chain,
}
}
async fn shutdown(&mut self) {
if let Some(conn) = &self.tcp_connector {
}
let _ = self.chain.shutdown().await;
}
@ -153,6 +155,25 @@ impl Node {
Ok(self.chain.blocks()?)
}
async fn send_message(&mut self, peer_id: NodeId, message: &ProtocolMessage) -> Result<(), NodeError> {
self.tcp_connector.send_message_to_peer(peer_id, &message).await?;
Ok(())
}
async fn broadcast_message(&mut self, msg: &ProtocolMessage) -> Result<(), NodeError> {
let keys: Vec<_> = self.tcp_peers.keys().cloned().collect();
let mut results = Vec::new();
for id in keys {
let res = self.send_message(id, msg).await;
results.push(res);
}
if let Some(error) = results.into_iter().find(|r| r.is_err()) {
error
} else {
Ok(())
}
}
pub async fn process_message(&mut self, peer_id: NodeId, message: ProtocolMessage) -> Result<(), NodeError> {
match message {
ProtocolMessage::BootstrapRequest { .. } => {
@ -160,7 +181,7 @@ impl Node {
let peer = &self.tcp_peers[&peer_id];
let blocks = self.chain.bootstrap()?;
let resp = ProtocolMessage::BootstrapResponse { blocks };
peer.sender.send(resp).await.unwrap();
self.send_message(peer.id.clone(), &resp).await?;
log(msg!(DEBUG, "Send BootstrapResponse to {peer_id}"));
}
ProtocolMessage::BootstrapResponse { blocks } => {
@ -172,11 +193,11 @@ impl Node {
}
ProtocolMessage::Ping { peer_id } => {
log(msg!(DEBUG, "Received Ping from {peer_id}"));
let peer = &self.tcp_peers[&peer_id];
let resp = ProtocolMessage::Pong {
peer_id: self.id.clone(),
};
let peer = &self.tcp_peers[&peer_id];
peer.sender.send(resp).await.unwrap();
self.send_message(peer.id.clone(), &resp).await?;
}
ProtocolMessage::GetPeersRequest { peer_id } => {
log(msg!(DEBUG, "Received GetPeersRequest from {peer_id}"));
@ -185,7 +206,7 @@ impl Node {
peer_addresses: peers,
};
let peer = &self.tcp_peers[&peer_id];
peer.sender.send(resp).await.unwrap();
self.send_message(peer.id.clone(), &resp).await?;
}
ProtocolMessage::Block { block, .. } => {
log(msg!(DEBUG, "Received Block from {peer_id}"));
@ -209,223 +230,163 @@ impl Node {
Ok(())
}
pub async fn send_message_to_peer_addr(&self, addr: SocketAddr, msg: ProtocolMessage) {
pub async fn send_message_to_peer_addr(&mut self, addr: SocketAddr, msg: &ProtocolMessage) -> Result<(), NodeError> {
if let Some((_, peer)) = self.tcp_peers.iter().find(|(_, v)| v.addr == addr) {
if let Err(e) = peer.sender.send(msg).await {
log(msg!(ERROR, "Error Sending message to peer: {e}"));
}
log(msg!(DEBUG, "Sent BootstrapRequest to seed"));
self.send_message(peer.id.clone(), &msg).await?;
Ok(())
} else {
log(msg!(
ERROR,
"Error Sending message to peer: peer not in list"
));
Err(NodeError::UnknownPeerAddr(addr))
}
}
pub async fn send_message_to_peer_id(&self, id: NodeId, msg: ProtocolMessage) {
if let Some(peer) = self.tcp_peers.get(&id) {
if let Err(e) = peer.sender.send(msg).await {
log(msg!(ERROR, "Error Sending message to peer: {e}"));
}
}
pub async fn send_message_to_peer_id(&mut self, id: NodeId, msg: &ProtocolMessage) -> Result<(), NodeError> {
self.send_message(id.clone(), &msg).await?;
Ok(())
}
async fn send_message_to_seed(&self, msg: ProtocolMessage) {
async fn send_message_to_seed(&mut self, msg: &ProtocolMessage) -> Result<(), NodeError> {
for seed in SEED_NODES.iter() {
if let Some(_) = self.tcp_peers.iter().find(|(_, v)| v.addr == *seed) {
self.send_message_to_peer_addr(*seed, msg).await;
return;
} else {
self.send_message_to_peer_addr(*seed, msg).await;
return;
if let Some(s) = self.tcp_peers.iter().find(|(_, v)| v.addr == *seed) {
self.send_message_to_peer_addr(s.1.addr, &msg).await?;
}
}
log(msg!(ERROR, "No Seed Nodes Avaliable"));
Ok(())
}
async fn bootstrap(&mut self) -> Result<(), ValidationError> {
log(msg!(DEBUG, "Bootstrapping"));
async fn bootstrap(&mut self) -> Result<(), NodeError> {
let message = ProtocolMessage::BootstrapRequest {
peer_id: self.id.clone(),
version: "".to_string(),
};
self.send_message_to_seed(message).await;
self.send_message_to_seed(&message).await?;
Ok(())
}
async fn broadcast_network_data(&self, data: ChainData) {
for (id, peer) in &self.tcp_peers {
let message = ProtocolMessage::ChainData {
peer_id: self.id.clone(),
data: data.clone(),
};
peer.sender.send(message).await.unwrap();
log(msg!(DEBUG, "Send Transaction message to {id}"));
}
async fn broadcast_network_data(&mut self, data: ChainData) -> Result<(), NodeError> {
let message = ProtocolMessage::ChainData {
peer_id: self.id.clone(),
data,
};
self.broadcast_message(&message).await
}
async fn broadcast_block(&self, block: &blockchain_core::Block) {
for (id, peer) in &self.tcp_peers {
let message = ProtocolMessage::Block {
peer_id: self.id.clone(),
height: block.head().height as u64,
block: block.clone(),
};
peer.sender.send(message).await.unwrap();
log(msg!(DEBUG, "Send Block message to {id}"));
}
async fn broadcast_block(&mut self, block: &blockchain_core::Block) -> Result<(), NodeError> {
let message = ProtocolMessage::Block {
peer_id: self.id.clone(),
height: block.head().height as u64,
block: block.clone(),
};
self.broadcast_message(&message).await
}
async fn connector_cmd(&mut self, cmd: ConnectorCommand) -> Result<Option<NodeCommand>, crate::network::ConnectorError> {
match &mut self.tcp_connector {
Some(t) => { t.execute_cmd(cmd).await },
None => {
log(msg!(ERROR, "No Connector Availiable"));
todo!("Implement node level connection cmd fail");
},
}
async fn connector_cmd(&mut self, cmd: ConnectorCommand) -> Result<Option<NodeCommand>, NodeError> {
let res = self.tcp_connector.command(cmd).await?;
Ok(res)
}
async fn start_connection_listner(&mut self, addr: SocketAddr) {
fn start_connection_listner(&mut self, addr: SocketAddr) {
log(msg!(DEBUG, "Starting Connection Listener"));
let connector = Connector::new(self.id.clone(), addr);
log(msg!(DEBUG, "Connector Build"));
self.tcp_connector = Some(connector);
let connector = Connector::new(self.id.clone(), addr);
log(msg!(DEBUG, "Connector Build"));
self.tcp_connector = connector;
}
async fn connect_to_seed(&mut self) {
self
.connector_cmd(ConnectorCommand::ConnectToTcpSeed(SEED_NODES[0]))
.await;
async fn connect_to_seed(&mut self) -> Result<Option<NodeCommand>, NodeError> {
let res = self.tcp_connector.command(ConnectorCommand::ConnectToTcpSeed(SEED_NODES[0])).await?;
Ok(res)
}
pub async fn command(&mut self, command: NodeCommand) {
match command {
NodeCommand::BootStrap => {
log(msg!(DEBUG, "Received NodeCommand::BootStrap"));
let _ = self.bootstrap().await;
pub async fn command(&mut self, command: NodeCommand) -> Result<Option<WatcherCommand>, NodeError> {
match command {
NodeCommand::BootStrap => {
self.bootstrap().await?;
}
NodeCommand::BroadcastTransaction(sign_tx) => {
self.broadcast_network_data(ChainData::Transaction(sign_tx)).await?;
}
NodeCommand::StartListner(addr) => {
self.start_connection_listner(addr);
}
NodeCommand::ConnectToSeeds => {
self.connect_to_seed().await?;
}
NodeCommand::ConnectTcpPeer(addr) => {
let addr_sock = addr.parse::<SocketAddr>()
.map_err(|_| NodeError::InvalidAddress("Self".to_string(), self.addr))?;
let res = self.connector_cmd(ConnectorCommand::ConnectToTcpPeer(addr_sock)).await?;
if let Some(node_cmd) = res {
return Ok(Some(WatcherCommand::Node(node_cmd)))
}
NodeCommand::BroadcastTransaction(sign_tx) => {
self.broadcast_network_data(ChainData::Transaction(sign_tx)).await;
}
NodeCommand::StartListner(addr) => {
self.start_connection_listner(addr).await;
}
NodeCommand::ConnectToSeeds => {
self.connect_to_seed().await;
}
NodeCommand::ConnectTcpPeer(addr) => {
log(msg!(DEBUG, "Received ConnectToPeer: {addr}"));
if let Ok(addr_sock) = addr.parse::<SocketAddr>() {
let mes = ConnectorCommand::ConnectToTcpPeer(addr_sock);
self.connector_cmd(mes).await;
} else {
log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}"));
}
}
NodeCommand::PingAddr(addr) => {
if let Ok(addr_sock) = addr.parse::<SocketAddr>() {
let mes = ProtocolMessage::Ping { peer_id: self.id.clone() };
self.send_message_to_peer_addr(addr_sock, mes).await;
} else {
log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}"));
}
}
NodeCommand::PingId(id) => {
let mes = ProtocolMessage::Ping { peer_id: self.id.clone() };
self.send_message_to_peer_id(id, mes).await;
}
NodeCommand::AddPeer(peer) => {
self.add_tcp_peer(peer).await;
}
NodeCommand::RemovePeer { peer_id } => {
self.remove_tcp_peer(peer_id).await;
}
NodeCommand::ProcessMessage { peer_id, message } => {
self.process_message(peer_id, message).await.unwrap();
}
NodeCommand::AwardCurrency { address, amount } => {
if let Err(e) = self.chain.award_currency(address, amount) {
print_error_chain(&e.into());
}
}
NodeCommand::ProcessChainData(data) => {
if let Err(e) = self.chain.apply_chain_data(data.clone()) {
print_error_chain(&e.into());
}
self.broadcast_network_data(data).await;
}
NodeCommand::CreateBlock => {
log(msg!(DEBUG, "Received CreateBlock Command"));
match self.chain.create_block() {
Ok(block) => {
log(msg!(
INFO,
"Created Block with hash {}",
hex::encode(block.head().block_hash())
));
self.broadcast_block(&block).await;
}
Err(e) => print_error_chain(&e.into()),
}
}
NodeCommand::DisplayBlockInteractive => {
let blocks = match self.chain.list_blocks() {
Ok(b) => b,
Err(e) => return print_error_chain(&e.into()),
};
let wat_cmd = WatcherCommand::SetMode(WatcherMode::Select {
}
NodeCommand::PingAddr(addr) => {
let addr_sock = addr.parse::<SocketAddr>()
.map_err(|_| NodeError::InvalidAddress("Self".to_string(), self.addr))?;
let mes = ProtocolMessage::Ping { peer_id: self.id.clone() };
self.send_message_to_peer_addr(addr_sock, &mes).await?;
}
NodeCommand::PingId(id) => {
let mes = ProtocolMessage::Ping { peer_id: self.id.clone() };
self.send_message_to_peer_id(id, &mes).await?;
}
NodeCommand::AddPeer(peer) => {
self.add_tcp_peer(peer);
}
NodeCommand::RemovePeer { peer_id } => {
self.remove_tcp_peer(peer_id).await;
}
NodeCommand::ProcessMessage { peer_id, message } => {
self.process_message(peer_id, message).await?;
}
NodeCommand::AwardCurrency { address, amount } => {
self.chain.award_currency(address, amount)?;
}
NodeCommand::ProcessChainData(data) => {
self.chain.apply_chain_data(data.clone())?;
self.broadcast_network_data(data).await?;
}
NodeCommand::CreateBlock => {
let block = self.chain.create_block()?;
self.broadcast_block(&block).await?;
}
NodeCommand::DisplayBlockInteractive => {
let blocks = self.chain.list_blocks()?;
return Ok(Some(
WatcherCommand::SetMode(WatcherMode::Select {
content: blocks.iter().map(|h| hex::encode(h)).collect::<Vec<String>>().into(),
title: "Select Block to display".to_string(),
callback: Box::new(WatcherCommand::Node(NodeCommand::DisplayBlockByKey("".to_string()))),
index: 0
});
publish_watcher_event(wat_cmd);
}
NodeCommand::DisplayBlockByKey(key) => {
if let Ok(block_hash) = hex::decode(key) {
self.chain.display_block_by_key(&block_hash)
}
},
NodeCommand::DisplayBlockByHeight(height) => self.chain.display_block_by_height(height),
NodeCommand::ListBlocks => {
log(msg!(DEBUG, "Received DebugListBlocks command"));
match self.chain.list_blocks() {
Ok(s) => log(s.iter().map(|h| format!("{}\n", hex::encode(h))).collect::<Vec<String>>().join("\n")),
Err(e) => print_error_chain(&e.into()),
}
}
NodeCommand::ListPeers => {
log(msg!(DEBUG, "Received DebugListPeers command"));
log(self.list_peers());
}
NodeCommand::ShowId => {
log(msg!(DEBUG, "Received DebugListBlocks command"));
self.show_id().await;
}
NodeCommand::Exit => {
log(msg!(DEBUG, "Node Exit"));
index: 0,
})
));
}
NodeCommand::DisplayBlockByKey(key) => {
self.chain.display_block_by_key(&hex::decode(key)?);
}
NodeCommand::DisplayBlockByHeight(height) => {
self.chain.display_block_by_height(height);
}
NodeCommand::ListBlocks => {
let s = self.chain.list_blocks()?;
let block_list = s.iter().map(|h| format!("{}\n", hex::encode(h))).collect::<Vec<String>>().join("\n");
return Ok(Some(WatcherCommand::Print(block_list)));
}
NodeCommand::ListPeers => {
return Ok(Some(WatcherCommand::Print(self.list_peers())));
}
NodeCommand::ShowId => {
self.show_id().await;
}
NodeCommand::Exit => {}
}
Ok(None)
}
fn handle_error(&self, error: NodeError) {
log(msg!(ERROR, "{error}"));
}
pub async fn init(&mut self) {
if let Some(addr) = self.addr {
self.start_connection_listner(addr).await;
} else {
self
.start_connection_listner(SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
8080,
))
.await;
};
let _http_handle = tokio::spawn(async move {
let _ = crate::api::server::start_server().await;
});
@ -440,15 +401,12 @@ impl Node {
// }
// });
let mut _system_rx = subscribe_system_event();
publish_system_event(SystemEvent::NodeStarted);
self.chain.recover_mempool();
}
pub async fn poll(&mut self) -> Option<()> {
tokio::select! {
_con_cmd = self.tcp_connector.as_mut()?.poll() => {
_con_cmd = self.tcp_connector.poll() => {
None
}
}

View File

@ -71,14 +71,21 @@ impl WatcherBuilder {
self.addr = Some(crate::seeds_constants::SEED_NODES[0]);
}
if self.addr.is_none() {
self.addr = Some(SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
8080,
))
}
let chain = node::Blockchain::new(self.database, self.temporary).unwrap();
let mut node = Node::new(self.addr.clone(), chain);
let mut node = Node::new(self.addr.unwrap(), chain);
node.init().await;
log(msg!(INFO, "Built Node"));
if self.bootstrap {
node.command(NodeCommand::BootStrap).await;
node.command(NodeCommand::BootStrap).await.unwrap();
}
let cmd_history = Vec::new();

View File

@ -1,12 +1,13 @@
use crate::{
bus::{SystemEvent, publish_system_event, subscribe_system_event}, cli::cli, node::{Node, node::NodeCommand}, watcher::WatcherMode
bus::{SystemEvent, publish_system_event, subscribe_system_event}, cli::cli, node::{Node, NodeError, node::NodeCommand}, watcher::WatcherMode
};
use shared::print_error_chain;
use ratatui::prelude::CrosstermBackend;
use crate::print_error_chain;
use crossterm::event::{Event, EventStream, KeyCode, KeyEventKind, MouseButton, MouseEventKind};
use futures::StreamExt;
use memory_stats::memory_stats;
use std::io::{self, Write};
use std::io::{self, Stdout, Write};
use tokio::{
select,
time::{Duration, interval},
@ -24,6 +25,12 @@ use cli_renderer::{
RenderCommand
};
#[derive(thiserror::Error, Debug)]
pub enum WatcherError {
#[error("Node Error")]
Node(#[from] NodeError),
}
#[allow(dead_code)]
pub struct Watcher {
cmd_buffer: String,
@ -86,10 +93,9 @@ impl Watcher {
poll_res = self.poll() => {
match poll_res {
Ok(event) => {
self.renderer.set_area(terminal.get_frame().area());
match self.handle_event(event).await {
Ok(ret) => if !ret { self.exit(); break }
Err(e) => log(msg!(ERROR, "{}", e)),
match self.handle_event(event, &mut terminal).await {
Ok(_) => {}
Err(e) => print_error_chain(&e.into()),
}
}
Err(()) => { log(msg!(ERROR, "Failed to read from Stream")) }
@ -99,10 +105,13 @@ impl Watcher {
match ui_event {
Ok(cmd) => {
self.renderer.set_area(terminal.get_frame().area());
self.command(cmd);
match self.command(cmd).await {
Ok(_) => {}
Err(e) => print_error_chain(&e.into()),
}
},
Err(e) => {
log(msg!(ERROR, "{}", e))
print_error_chain(&e.into())
}
}
}
@ -160,28 +169,32 @@ impl Watcher {
self.mode = mode;
}
pub async fn command(&mut self, cmd: WatcherCommand) {
pub async fn command(&mut self, cmd: WatcherCommand) -> Result<Option<WatcherCommand>, WatcherError> {
match cmd {
WatcherCommand::NodeResponse(resp) => log(resp),
WatcherCommand::Node(n) => self.node.command(n).await,
WatcherCommand::Node(n) => return Ok(self.node.command(n).await?),
WatcherCommand::Render(p) => self.renderer.apply(p),
WatcherCommand::Echo(s) => self.echo(s),
WatcherCommand::Print(s) => log(s),
WatcherCommand::InvalidCommand(str) => self.invalid_command(str).await,
WatcherCommand::Exit => self.exit(),
WatcherCommand::SetMode(mode) => self.set_mode(mode),
}
};
Ok(None)
}
async fn handle_enter(&mut self) {
async fn handle_enter(&mut self) -> Result<(), WatcherError> {
match &self.mode {
WatcherMode::Input => {
if !self.cmd_buffer.is_empty() {
let exec_event = cli(&self.cmd_buffer);
self.command(exec_event).await;
let mut cmd = cli(&self.cmd_buffer);
self.cmd_buffer.clear();
self.renderer.handle_enter()
self.renderer.handle_enter();
while let Some(new_cmd) = self.command(cmd).await? {
cmd = new_cmd;
}
}
Ok(())
}
WatcherMode::Select { content, callback, index, .. } => {
match &&**callback {
@ -190,7 +203,7 @@ impl Watcher {
NodeCommand::DisplayBlockByKey(_) => {
let key = (*content)[*index].clone().to_string();
log(msg!(DEBUG, "KEY IN ENTER: {key}"));
self.node.command(NodeCommand::DisplayBlockByKey(key));
self.node.command(NodeCommand::DisplayBlockByKey(key)).await?;
}
_ => {log(msg!(DEBUG, "TODO: Implement callback for {:?}", nd_cmd))}
}
@ -200,6 +213,7 @@ impl Watcher {
self.mode = WatcherMode::Input;
let rd_cmd = RenderCommand::SetMode(InputMode::Input);
self.renderer.apply(rd_cmd);
Ok(())
}
}
}
@ -268,7 +282,7 @@ impl Watcher {
});
}
pub async fn handle_event(&mut self, event: Event) -> io::Result<bool> {
pub async fn handle_event(&mut self, event: Event, terminal: &mut ratatui::Terminal<CrosstermBackend<Stdout>>) -> Result<(), WatcherError> {
match event {
Event::Mouse(event) => match event.kind {
MouseEventKind::ScrollUp => {
@ -288,6 +302,9 @@ impl Watcher {
},
_ => {}
},
Event::Resize(_, _) => {
self.renderer.set_area(terminal.get_frame().area());
}
Event::Key(k) if k.kind == KeyEventKind::Press => match k.code {
KeyCode::Esc => publish_system_event(SystemEvent::Shutdown),
KeyCode::Char(c) => {
@ -299,7 +316,7 @@ impl Watcher {
self.renderer.handle_backspace()
}
KeyCode::Enter => {
self.handle_enter().await;
self.handle_enter().await?;
}
KeyCode::Up | KeyCode::Down | KeyCode::Left | KeyCode::Right => {
self.handle_arrow_key(k.code);
@ -313,7 +330,7 @@ impl Watcher {
}
_ => {}
}
Ok(true)
Ok(())
}
pub async fn poll(&mut self) -> Result<Event, ()> {