Refactoring the whole channel based structure to a imperative approach

This commit is contained in:
Victor Vobis 2026-03-10 10:17:30 +01:00
parent da9daa9359
commit 407085c78b
19 changed files with 220 additions and 3608 deletions

3141
node/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -20,18 +20,12 @@ once_cell = "1.21.3"
async-trait = "0.1.89"
anyhow = "1.0.99"
memory-stats = "1.2.0"
# jemalloc = "0.3.0"
# jemallocator = "0.5.4"
textwrap = "0.16.2"
sled = "0.34.7"
bincode = { version = "2.0.1", features = ["derive", "serde"] }
futures = "0.3.31"
secp256k1 = { version = "0.31.1", features = ["hashes", "rand", "recovery", "serde"] }
ring = "0.17.14"
shared = { path = "../shared", features = ["node"] }
watchlet = { path = "../watchlet" }
cli-renderer = { path = "../cli-renderer" }
tiny_http = "0.12.0"
serde-big-array = "0.5.1"
rust-ipfs = "0.14.1"
libp2p = "0.56.0"

View File

@ -3,19 +3,19 @@ use std::sync::Arc;
use tokio::sync::broadcast;
use super::event_bus::EventBus;
use crate::executor::ExecutorCommand;
use crate::watcher::WatcherCommand;
pub enum ErrorEvent {
}
static ERROR_BUS: Lazy<Arc<EventBus<ExecutorCommand>>> =
static ERROR_BUS: Lazy<Arc<EventBus<WatcherCommand>>> =
Lazy::new(|| Arc::new(EventBus::new()));
pub fn publish_error(event: ExecutorCommand) {
pub fn publish_error(event: WatcherCommand) {
ERROR_BUS.publish(event);
}
pub fn subscribe_error_bus() -> broadcast::Receiver<ExecutorCommand> {
pub fn subscribe_error_bus() -> broadcast::Receiver<WatcherCommand> {
ERROR_BUS.subscribe()
}

View File

@ -1,17 +0,0 @@
use once_cell::sync::Lazy;
use std::sync::Arc;
use tokio::sync::broadcast;
use super::event_bus::EventBus;
use crate::executor::ExecutorCommand;
static EXECUTOR_EVENT_BUS: Lazy<Arc<EventBus<ExecutorCommand>>> =
Lazy::new(|| Arc::new(EventBus::new()));
pub fn publish_executor_event(event: ExecutorCommand) {
EXECUTOR_EVENT_BUS.publish(event);
}
pub fn subscribe_executor_event() -> broadcast::Receiver<ExecutorCommand> {
EXECUTOR_EVENT_BUS.subscribe()
}

View File

@ -2,7 +2,7 @@ use crate::args::*;
use crate::network::NodeId;
use shared::blockchain_core::ChainData;
use vlogger::*;
use crate::executor::ExecutorCommand;
use crate::watcher::WatcherCommand;
use crate::node::*;
use crate::log;
use cli_renderer::RenderCommand;
@ -46,18 +46,18 @@ fn handle_ping(cmd: CliPingCommand) -> NodeCommand {
}
}
pub fn cli(input: &str) -> ExecutorCommand {
pub fn cli(input: &str) -> WatcherCommand {
let argv: Vec<&str> = std::iter::once(" ")
.chain(input.split_whitespace())
.collect();
match Cli::try_parse_from(argv) {
Ok(cmd) => match cmd.command {
CliCommand::Layout { mode } => ExecutorCommand::Render(RenderCommand::ChangeLayout(mode)),
CliCommand::Clear => ExecutorCommand::Render(RenderCommand::ClearPane),
CliCommand::Peer { peer_cmd } => ExecutorCommand::Node(handle_peer_command(peer_cmd)),
CliCommand::Block { block_cmd } => ExecutorCommand::Node(handle_block_command(block_cmd)),
CliCommand::Layout { mode } => WatcherCommand::Render(RenderCommand::ChangeLayout(mode)),
CliCommand::Clear => WatcherCommand::Render(RenderCommand::ClearPane),
CliCommand::Peer { peer_cmd } => WatcherCommand::Node(handle_peer_command(peer_cmd)),
CliCommand::Block { block_cmd } => WatcherCommand::Node(handle_block_command(block_cmd)),
CliCommand::Transaction(tx) => {
ExecutorCommand::Node(NodeCommand::ProcessChainData(ChainData::NodeTransaction(tx)))
WatcherCommand::Node(NodeCommand::ProcessChainData(ChainData::NodeTransaction(tx)))
}
CliCommand::Award { address, amount } => {
let mut bytes = [0u8; 20];
@ -68,15 +68,15 @@ pub fn cli(input: &str) -> ExecutorCommand {
}
bytes.copy_from_slice(address.as_bytes());
ExecutorCommand::Node(NodeCommand::AwardCurrency{ address: bytes, amount }
WatcherCommand::Node(NodeCommand::AwardCurrency{ address: bytes, amount }
)}
CliCommand::DebugShowId => ExecutorCommand::Node(NodeCommand::ShowId),
CliCommand::DebugShowId => WatcherCommand::Node(NodeCommand::ShowId),
CliCommand::StartListner { addr } => {
ExecutorCommand::Node(NodeCommand::StartListner(addr.parse().unwrap()))
WatcherCommand::Node(NodeCommand::StartListner(addr.parse().unwrap()))
}
CliCommand::Seeds { seed_cmd } => ExecutorCommand::Node(handle_seed_command(seed_cmd)),
CliCommand::Ping { ping_cmd } => ExecutorCommand::Node(handle_ping(ping_cmd)),
CliCommand::Seeds { seed_cmd } => WatcherCommand::Node(handle_seed_command(seed_cmd)),
CliCommand::Ping { ping_cmd } => WatcherCommand::Node(handle_ping(ping_cmd)),
},
Err(e) => ExecutorCommand::InvalidCommand(format!("{e}")),
Err(e) => WatcherCommand::InvalidCommand(format!("{e}")),
}
}

View File

@ -1,15 +0,0 @@
use crate::node::NodeCommand;
use cli_renderer::RenderCommand;
use crate::watcher::WatcherCommand;
#[derive(Clone, Debug)]
pub enum ExecutorCommand {
NodeResponse(String),
Echo(Vec<String>),
Print(String),
InvalidCommand(String),
Node(NodeCommand),
Render(RenderCommand),
Watcher(WatcherCommand),
Exit,
}

View File

@ -1,106 +0,0 @@
use crate::{
bus::{publish_system_event, publish_watcher_event, subscribe_system_event, SystemEvent},
log,
node::NodeCommand,
watcher::WatcherCommand,
};
use cli_renderer::pane::RenderTarget;
use thiserror::Error;
use tokio::{select, sync::mpsc};
use vlogger::*;
use super::ExecutorCommand;
use crate::RenderCommand;
#[derive(Debug, Error)]
pub enum InProcessError {
#[error("TODO: {0}")]
TODO(String),
}
pub struct Executor {
node_tx: mpsc::Sender<NodeCommand>,
rx: mpsc::Receiver<ExecutorCommand>,
exit: bool,
}
impl Executor {
pub fn new(node_tx: mpsc::Sender<NodeCommand>, rx: mpsc::Receiver<ExecutorCommand>) -> Self {
Self {
node_tx,
rx,
exit: false,
}
}
pub async fn run(&mut self) {
publish_system_event(SystemEvent::ExecutorStarted);
let mut sys_rx = subscribe_system_event();
while !self.exit {
select! {
_ = self.listen() => {}
event_res = sys_rx.recv() => {
if let Ok(event) = event_res {
match event {
SystemEvent::Shutdown => {
self.exit().await;
}
_ => {}
}
}
}
}
}
}
async fn exit(&mut self) {
log(msg!(DEBUG, "Executor Exit"));
self.exit = true
}
async fn listen(&mut self) {
if let Some(cmd) = self.rx.recv().await {
let _ = self.execute(cmd).await;
}
}
async fn send_node_cmd(&self, cmd: NodeCommand) {
self.node_tx.send(cmd).await.unwrap()
}
async fn handle_node_cmd(&self, cmd: NodeCommand) {
self.send_node_cmd(cmd).await;
}
async fn echo(&self, s: Vec<String>) {
let mut str = s.join(" ");
str.push_str("\n");
let rd_cmd = WatcherCommand::Render(RenderCommand::StringToPaneId {
str,
pane: RenderTarget::CliOutput,
});
publish_watcher_event(rd_cmd);
}
async fn invalid_command(&self, str: String) {
let rd_cmd = WatcherCommand::Render(RenderCommand::StringToPaneId {
str,
pane: RenderTarget::CliOutput,
});
publish_watcher_event(rd_cmd);
}
async fn execute(&mut self, cmd: ExecutorCommand) {
match cmd {
ExecutorCommand::NodeResponse(resp) => log(resp),
ExecutorCommand::Node(n) => self.handle_node_cmd(n).await,
ExecutorCommand::Render(p) => publish_watcher_event(WatcherCommand::Render(p)),
ExecutorCommand::Watcher(w) => publish_watcher_event(w),
ExecutorCommand::Echo(s) => self.echo(s).await,
ExecutorCommand::Print(s) => log(s),
ExecutorCommand::InvalidCommand(str) => self.invalid_command(str).await,
ExecutorCommand::Exit => self.exit().await,
}
}
}

View File

@ -41,20 +41,11 @@ pub mod bus {
pub mod system;
pub use error::*;
pub mod executor;
pub use network::*;
pub use watcher::*;
pub use system::*;
}
pub mod executor {
pub mod executor;
pub use executor::*;
pub mod command;
pub use command::*;
}
pub mod watcher {
pub mod builder;
pub mod watcher;

View File

@ -1,5 +1,4 @@
use crate::executor::ExecutorCommand;
use crate::log;
use crate::{log, watcher};
use crate::network::NodeId;
use crate::node::node;
use super::ProtocolMessage;
@ -16,7 +15,6 @@ pub struct Connection {
node_id: NodeId,
peer_id: NodeId,
stream: net::TcpStream,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<ProtocolMessage>,
}
@ -25,7 +23,6 @@ impl Connection {
node_id: NodeId,
peer_id: NodeId,
stream: net::TcpStream,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<ProtocolMessage>,
) -> Self {
Self {
@ -33,12 +30,10 @@ impl Connection {
peer_id,
stream,
rx,
exec_tx,
}
}
pub async fn start(mut self) {
tokio::spawn(async move {
pub async fn poll(mut self) {
log(msg!(DEBUG, "Started Message Handler for {}", self.peer_id));
loop {
@ -61,30 +56,18 @@ impl Connection {
message_result = Connector::receive_message(&mut self.stream) => {
match message_result {
Ok(message) => {
log(msg!(DEBUG, "Received Message from {}", self.peer_id));
let command = ExecutorCommand::Node(node::NodeCommand::ProcessMessage {
peer_id: self.peer_id.clone(),
message: message.clone()
});
if self.exec_tx.send(command).await.is_err() {
log(msg!(ERROR, "Failed to send command to main thread from {}", self.peer_id));
break;
}
todo!("[TODO] Return parsed command to propagate");
},
Err(e) => {
log(msg!(WARNING, "Connection to {} closed: {}", self.peer_id, e));
let cmd = ExecutorCommand::Node(node::NodeCommand::RemovePeer {
let cmd = watcher::WatcherCommand::Node(node::NodeCommand::RemovePeer {
peer_id: self.peer_id
});
self.exec_tx.send(cmd).await.unwrap();
break;
todo!("[TODO] Return Error Node Command to propagate");
}
}
}
}
}
});
}
}

View File

@ -12,7 +12,6 @@ use crate::log;
use crate::network::NodeId;
use super::Connection;
use crate::bus::*;
use crate::executor::ExecutorCommand;
use crate::node::node;
use crate::node::{NetworkError, error};
use super::ProtocolMessage;
@ -26,8 +25,8 @@ pub enum ConnectorCommand {
pub struct Connector {
node_id: NodeId,
addr: SocketAddr,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<ConnectorCommand>,
connections: Vec<Connection>,
listener: Option<tokio::net::TcpListener>,
exit: bool,
}
@ -43,57 +42,59 @@ impl Connector {
pub fn new(
node_id: NodeId,
addr: SocketAddr,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<ConnectorCommand>,
) -> Self {
Self {
node_id,
addr,
exec_tx,
rx,
connections: Vec::new(),
listener: None,
exit: false,
}
}
pub async fn start(&mut self) {
let mut listner: Option<tokio::net::TcpListener> = None;
let mut listner_err = None;
pub async fn init(&mut self) {
for _ in 0..MAX_LISTNER_TRIES {
match tokio::net::TcpListener::bind(self.addr).await {
Ok(l) => {
log(msg!(DEBUG, "Listening on address: {}", self.addr));
listner = Some(l);
self.listener = Some(l);
break;
}
Err(e) => {
self.addr.set_port(self.addr.port() + 1);
listner_err = Some(e);
let listner_err = Some(e);
println!("{:#?}", listner_err);
}
};
}
if let Some(listener) = listner {
while !self.exit {
}
pub async fn poll(&mut self) -> Result<Option<node::NodeCommand>, ConnectorError> {
if let Some(listener) = &mut self.listener {
todo!("Implement Vec Poll for connections");
tokio::select! {
cmd_result = self.rx.recv() => {
match cmd_result {
Some(cmd) => {
self.execute_cmd(cmd).await;
}
None => {
log(msg!(DEBUG, "Command channel closed"));
break;
}
}
}
// cmd_result = self.rx.recv() => {
// todo!("Implement Vec Poll for connections");
// match cmd_result {
// Some(cmd) => {
// self.execute_cmd(cmd).await
// }
// None => {
// log(msg!(DEBUG, "Command channel closed"));
// todo!("Handle Connector Error");
// }
// }
// }
accept_result = listener.accept() => {
match accept_result {
Ok((stream, addr)) => {
log(msg!(DEBUG, "Accepted connection from {}", addr));
self.establish_connection_inbound(stream, addr).await;
let peer = self.establish_connection_inbound(stream, addr).await?;
Ok(Some(node::NodeCommand::AddPeer(peer)))
}
Err(e) => {
log(msg!(ERROR, "Failed to accept connection: {}", e));
}
todo!("Implement Connector TcpListner connection fail");
}
}
}
@ -101,89 +102,74 @@ impl Connector {
} else {
log(msg!(
FATAL,
"Failed to start TCP Listener: {}",
listner_err.unwrap()
"Failed to start TCP Listener",
));
todo!("Implement Connector TcpListner fail");
}
}
async fn execute_cmd(&mut self, cmd: ConnectorCommand) {
pub async fn execute_cmd(&mut self, cmd: ConnectorCommand) -> Result<Option<node::NodeCommand>, ConnectorError> {
match cmd {
ConnectorCommand::ConnectToTcpPeer(addr) => self.connect_to_peer(addr).await,
ConnectorCommand::ConnectToTcpPeer(addr) => {
let peer = self.connect_to_peer(addr).await?;
Ok(Some(node::NodeCommand::AddPeer(peer)))
},
ConnectorCommand::ConnectToTcpSeed(addr) => {
self.connect_to_seed(addr).await;
let peer = self.connect_to_seed(addr).await?;
Ok(Some(node::NodeCommand::AddPeer(peer)))
}
ConnectorCommand::Shutdown => {
self.exit = true;
Ok(None)
}
}
}
pub async fn connect_to_seed(&self, addr: SocketAddr) {
pub async fn connect_to_seed(&mut self, addr: SocketAddr) -> Result<node::TcpPeer, ConnectorError> {
match net::TcpStream::connect(addr)
.await
.with_context(|| format!("Connecting to {}", addr))
{
Ok(stream) => self.establish_connection_to_seed(stream, addr).await,
Ok(stream) => {
let peer = self.establish_connection_outbound(stream, addr).await;
publish_network_event(NetworkEvent::SeedConnected(addr.to_string()));
peer
},
Err(e) => {
// let err = ConnectorError::ConnectionError(e.into());
print_error_chain(&e.into());
todo!("Handle connector error propagation");
}
}
}
pub async fn connect_to_peer(&self, addr: SocketAddr) {
pub async fn connect_to_peer_inbound(&mut self, addr: SocketAddr) -> Result<node::TcpPeer, ConnectorError> {
match net::TcpStream::connect(addr).await {
Ok(stream) => self.establish_connection_inbound(stream, addr).await,
Err(e) => {
let err = ConnectorError::ConnectionError(e.into());
print_error_chain(&err.into());
todo!("Handle connector error propagation");
}
}
}
pub async fn connect_to_peer(&mut self, addr: SocketAddr) -> Result<node::TcpPeer, ConnectorError> {
match net::TcpStream::connect(addr).await {
Ok(stream) => self.establish_connection_outbound(stream, addr).await,
Err(e) => {
let err = ConnectorError::ConnectionError(e.into());
print_error_chain(&err.into());
todo!("Handle connector error propagation");
}
}
}
pub async fn establish_connection_to_seed(
&self,
mut stream: tokio::net::TcpStream,
addr: SocketAddr,
) {
let handshake = ProtocolMessage::Handshake {
peer_id: self.node_id.clone(),
version: "".to_string(),
};
match Connector::send_message(&mut stream, &handshake).await {
Ok(()) => {
if let Ok(mes) = Connector::receive_message(&mut stream).await {
let (ch_tx, ch_rx) = mpsc::channel::<ProtocolMessage>(100);
let peer = match mes {
ProtocolMessage::HandshakeAck { peer_id, .. } => {
node::TcpPeer::new(peer_id, addr, ch_tx)
}
_ => {
log(msg!(
ERROR,
"Invalid Message On Connetion Establishment: {mes}"
));
return;
}
};
let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone()));
publish_network_event(NetworkEvent::SeedConnected(addr.to_string()));
let _ = self.exec_tx.send(cmd).await;
Connection::new(self.node_id.clone(), peer.id, stream, self.exec_tx.clone(), ch_rx)
.start()
.await;
}
}
Err(e) => print_error_chain(&e.into()),
}
}
async fn establish_connection_outbound(
&self,
&mut self,
mut stream: tokio::net::TcpStream,
addr: SocketAddr,
) {
) -> Result<node::TcpPeer, ConnectorError> {
let handshake = ProtocolMessage::Handshake {
peer_id: self.node_id.clone(),
version: "".to_string(),
@ -201,25 +187,28 @@ impl Connector {
ERROR,
"Invalid Message On Connetion Establishment: {mes}"
));
return;
todo!("Handle connector receive message fail");
}
};
let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone()));
let _ = self.exec_tx.send(cmd).await;
Connection::new(self.node_id.clone(), peer.id, stream, self.exec_tx.clone(), ch_rx)
.start()
.await;
let connection = Connection::new(self.node_id.clone(), peer.id.clone(), stream, ch_rx);
self.connections.push(connection);
Ok(peer)
} else {
todo!("Handle connector receive message fail");
}
}
Err(e) => print_error_chain(&e.into()),
Err(e) => {
print_error_chain(&e.into());
todo!("Handle Connector Error");
},
}
}
async fn establish_connection_inbound(
&self,
&mut self,
mut stream: tokio::net::TcpStream,
addr: SocketAddr,
) {
) -> Result<node::TcpPeer, ConnectorError> {
if let Ok(mes) = Connector::receive_message(&mut stream).await {
let (ch_tx, ch_rx) = mpsc::channel::<ProtocolMessage>(100);
let peer = match mes {
@ -230,7 +219,11 @@ impl Connector {
};
match Connector::send_message(&mut stream, &ack).await {
Ok(()) => node::TcpPeer::new(peer_id, addr, ch_tx),
Err(e) => return print_error_chain(&e.into()),
Err(e) => {
print_error_chain(&e.into());
todo!("Implement inbound conneciton message send fail")
},
}
}
_ => {
@ -238,14 +231,14 @@ impl Connector {
ERROR,
"Invalid Message On Connetion Establishment: {mes}"
));
return;
todo!("Implement inbound conneciton message invalid");
}
};
let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone()));
let _ = self.exec_tx.send(cmd).await;
Connection::new(self.node_id.clone(), peer.id, stream, self.exec_tx.clone(), ch_rx)
.start()
.await;
let connection = Connection::new(self.node_id.clone(), peer.id.clone(), stream, ch_rx);
self.connections.push(connection);
Ok(peer)
} else {
todo!("Implement inbount connection message read fail");
}
}

View File

@ -1,4 +0,0 @@
segment_size: 1048576
use_compression: false
version: 0.34
lø

Binary file not shown.

View File

@ -1,4 +0,0 @@
segment_size: 1048576
use_compression: false
version: 0.34
lø

Binary file not shown.

View File

@ -13,7 +13,7 @@ use shared::ws_protocol::{ WsClientRequest, WsClientResponse };
use watchlet::WalletError;
use crate::db::BINCODE_CONFIG;
use crate::executor::ExecutorCommand;
use crate::watcher::WatcherCommand;
use crate::log;
use crate::node::NodeCommand;
use crate::seeds_constants::WS_LISTEN_ADDRESS;
@ -46,13 +46,13 @@ pub enum WsServerError {
pub struct WsServer {
rx: Receiver<WsCommand>,
tx: Sender<ExecutorCommand>,
tx: Sender<WatcherCommand>,
clients: HashMap<SocketAddr, Sender<WsClientResponse>>,
}
async fn handle_ws_client_request(
req: WsClientRequest,
_tx: Sender<ExecutorCommand>,
_tx: Sender<WatcherCommand>,
) -> Result<(), WsServerError> {
match req {
WsClientRequest::Ping => {
@ -61,7 +61,7 @@ async fn handle_ws_client_request(
}
WsClientRequest::BroadcastTransaction(sign_tx) => {
Validator::verify_signature(&sign_tx)?;
let _cmd = ExecutorCommand::Node(NodeCommand::BroadcastTransaction(sign_tx));
let _cmd = WatcherCommand::Node(NodeCommand::BroadcastTransaction(sign_tx));
}
}
Ok(())
@ -70,7 +70,7 @@ async fn handle_ws_client_request(
async fn ws_connection(
stream: TcpStream,
mut rx: Receiver<WsClientResponse>,
_tx: Sender<ExecutorCommand>,
_tx: Sender<WatcherCommand>,
) -> Result<(), WsServerError> {
let ws_server = tokio_tungstenite::accept_async(stream).await.unwrap();
let (mut write, mut read) = ws_server.split();
@ -101,7 +101,7 @@ async fn ws_connection(
}
impl WsServer {
pub fn new(rx: Receiver<WsCommand>, tx: Sender<ExecutorCommand>) -> Self {
pub fn new(rx: Receiver<WsCommand>, tx: Sender<WatcherCommand>) -> Self {
Self {
rx,
tx,

View File

@ -1,13 +1,11 @@
use crate::bus::{publish_system_event, publish_watcher_event, subscribe_system_event, SystemEvent};
use shared::blockchain_core::{self, ChainData, SignedTransaction, validator::ValidationError};
use crate::print_error_chain;
use crate::executor::ExecutorCommand;
use crate::log;
use crate::network::{NodeId, ProtocolMessage};
use crate::network::{Connector, ConnectorCommand};
use crate::seeds_constants::SEED_NODES;
use crate::watcher::{WatcherCommand, WatcherMode};
use crate::network::ws_server::{WsCommand, WsServer};
use super::{ Blockchain, BlockchainError };
use std::collections::HashMap;
@ -15,8 +13,6 @@ use std::net::SocketAddr;
use std::sync::Arc;
use thiserror::*;
use tokio::select;
use tokio::sync::mpsc;
use uuid::Uuid;
use vlogger::*;
@ -39,15 +35,12 @@ impl TcpPeer {
#[allow(dead_code)]
pub struct Node {
pub tcp_connector: Option<mpsc::Sender<ConnectorCommand>>,
pub tcp_connector: Option<Connector>,
pub id: NodeId,
pub addr: Option<SocketAddr>,
pub tcp_peers: HashMap<NodeId, TcpPeer>,
chain: Blockchain,
listner_handle: Option<tokio::task::JoinHandle<()>>,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<NodeCommand>,
tx: mpsc::Sender<NodeCommand>,
}
#[derive(Debug, Error)]
@ -123,49 +116,35 @@ impl Node {
pub async fn new_with_id(
id: NodeId,
exec_tx: mpsc::Sender<ExecutorCommand>,
addr: Option<SocketAddr>,
chain: Blockchain,
) -> Self {
let (tx, rx) = mpsc::channel::<NodeCommand>(100);
Self {
id,
tcp_peers: HashMap::new(),
addr,
exec_tx,
chain,
listner_handle: None,
tcp_connector: None,
tx,
rx,
}
}
pub fn new(
addr: Option<SocketAddr>,
exec_tx: mpsc::Sender<ExecutorCommand>,
chain: Blockchain,
) -> Self {
let (tx, rx) = mpsc::channel::<NodeCommand>(100);
Self {
id: NodeId(*Uuid::new_v4().as_bytes()),
tcp_peers: HashMap::new(),
addr,
exec_tx,
listner_handle: None,
tcp_connector: None,
chain,
tx,
rx,
}
}
async fn shutdown(&mut self) {
if let Some(conn) = &self.tcp_connector {
let res = conn.send(ConnectorCommand::Shutdown).await;
if res.is_err() {
log(msg!(ERROR, "Failed to send shutdown signal to connector"));
}
}
let _ = self.chain.shutdown().await;
}
@ -300,35 +279,22 @@ impl Node {
}
}
pub fn tx(&self) -> mpsc::Sender<NodeCommand> {
return self.tx.clone();
}
pub fn exec_tx(&self) -> mpsc::Sender<ExecutorCommand> {
return self.exec_tx.clone();
}
async fn connector_cmd(&self, cmd: ConnectorCommand) {
match &self.tcp_connector {
Some(t) => match t.send(cmd).await {
Ok(()) => {}
Err(e) => log(msg!(ERROR, "Failed to Send Command to connector: {}", e)),
async fn connector_cmd(&mut self, cmd: ConnectorCommand) -> Result<Option<NodeCommand>, crate::network::ConnectorError> {
match &mut self.tcp_connector {
Some(t) => { t.execute_cmd(cmd).await },
None => {
log(msg!(ERROR, "No Connector Availiable"));
todo!("Implement node level connection cmd fail");
},
None => log(msg!(ERROR, "No Connector Availiable")),
}
}
async fn start_connection_listner(&mut self, addr: SocketAddr) {
log(msg!(DEBUG, "Starting Connection Listener"));
let (con_tx, con_rx) = mpsc::channel::<ConnectorCommand>(100);
self.tcp_connector = Some(con_tx);
self.listner_handle = Some(tokio::spawn({
let mut connector = Connector::new(self.id.clone(), addr, self.exec_tx(), con_rx);
let connector = Connector::new(self.id.clone(), addr);
log(msg!(DEBUG, "Connector Build"));
async move { connector.start().await }
}));
self.tcp_connector = Some(connector);
}
async fn connect_to_seed(&mut self) {
@ -337,8 +303,7 @@ impl Node {
.await;
}
async fn accept_command(&mut self) {
while let Some(command) = self.rx.recv().await {
pub async fn command(&mut self, command: NodeCommand) {
match command {
NodeCommand::BootStrap => {
log(msg!(DEBUG, "Received NodeCommand::BootStrap"));
@ -417,7 +382,7 @@ impl Node {
let wat_cmd = WatcherCommand::SetMode(WatcherMode::Select {
content: blocks.iter().map(|h| hex::encode(h)).collect::<Vec<String>>().into(),
title: "Select Block to display".to_string(),
callback: Box::new(ExecutorCommand::Node(NodeCommand::DisplayBlockByKey("".to_string()))),
callback: Box::new(WatcherCommand::Node(NodeCommand::DisplayBlockByKey("".to_string()))),
index: 0
});
publish_watcher_event(wat_cmd);
@ -445,13 +410,11 @@ impl Node {
}
NodeCommand::Exit => {
log(msg!(DEBUG, "Node Exit"));
break;
}
}
}
}
pub async fn run(&mut self) {
pub async fn init(&mut self) {
if let Some(addr) = self.addr {
self.start_connection_listner(addr).await;
} else {
@ -463,46 +426,31 @@ impl Node {
.await;
};
let http_handle = tokio::spawn(async move {
let _http_handle = tokio::spawn(async move {
let _ = crate::api::server::start_server().await;
});
let (_tx, rx) = mpsc::channel::<WsCommand>(100);
// let (_tx, rx) = mpsc::channel::<WsCommand>(100);
let mut ws_server = WsServer::new(rx, self.exec_tx());
// let mut ws_server = WsServer::new(rx, self.exec_tx());
let _ws_handle = tokio::spawn(async move {
if let Err(e) = ws_server.run().await {
print_error_chain(&e.into());
}
});
// let _ws_handle = tokio::spawn(async move {
// if let Err(e) = ws_server.run().await {
// print_error_chain(&e.into());
// }
// });
let mut system_rx = subscribe_system_event();
let mut _system_rx = subscribe_system_event();
publish_system_event(SystemEvent::NodeStarted);
self.chain.recover_mempool();
}
loop {
select! {
_ = self.accept_command() => {
}
event_result = system_rx.recv() => {
match event_result {
Ok(e) => {
match e {
SystemEvent::Shutdown => {
break;
}
_ => {}
}
}
_ => {}
pub async fn poll(&mut self) -> Option<()> {
tokio::select! {
_con_cmd = self.tcp_connector.as_mut()?.poll() => {
None
}
}
}
}
http_handle.abort_handle().abort();
self.shutdown().await;
}
}

View File

@ -1,10 +1,7 @@
use std::net::SocketAddr;
use tokio::sync::mpsc;
use vlogger::*;
use crate::bus::{NetworkEvent, SystemEvent, subscribe_system_event};
use crate::executor::{Executor, ExecutorCommand};
use crate::{log, node};
use crate::node::{Node, NodeCommand};
use cli_renderer::{RenderLayoutKind, Renderer};
@ -62,8 +59,6 @@ impl WatcherBuilder {
}
pub async fn start(mut self) -> Watcher {
let (exec_tx, exec_rx) = mpsc::channel::<ExecutorCommand>(100);
let mut sys_event = subscribe_system_event();
if self.debug {
Watcher::log_memory().await;
@ -77,79 +72,25 @@ impl WatcherBuilder {
}
let chain = node::Blockchain::new(self.database, self.temporary).unwrap();
let mut node = Node::new(self.addr.clone(), exec_tx.clone(), chain);
let mut node = Node::new(self.addr.clone(), chain);
node.init().await;
log(msg!(INFO, "Built Node"));
let executor_handle = tokio::spawn({
let node_tx = node.tx();
async move {
let _ = Executor::new(node_tx, exec_rx).run().await;
}
});
for i in 0..3 {
if let Ok(ev) = sys_event.recv().await {
match ev {
SystemEvent::ExecutorStarted => {
log(msg!(INFO, "Executor Started"));
break;
}
_ => log(msg!(WARNING, "Wrong Event: {ev:?}! Retrying... (try {i})")),
}
}
}
let node_tx = node.tx();
let node_handle = tokio::spawn({
async move {
node.run().await;
}
});
for i in 0..3 {
if let Ok(ev) = sys_event.recv().await {
match ev {
SystemEvent::NodeStarted => {
log(msg!(INFO, "Executor Started"));
break;
}
_ => log(msg!(WARNING, "Wrong Event: {ev:?}! Retrying... (try {i})")),
}
}
}
if self.bootstrap {
let exec_tx = exec_tx.clone();
tokio::spawn(async move {
let seed_cmd = ExecutorCommand::Node(NodeCommand::ConnectToSeeds);
let mut ev_rx = crate::bus::subscribe_network_event();
let _ = exec_tx.send(seed_cmd).await;
while let Ok(e) = ev_rx.recv().await {
match e {
NetworkEvent::SeedConnected(_) => {
let bootstrap_cmd = ExecutorCommand::Node(NodeCommand::BootStrap);
let _ = exec_tx.send(bootstrap_cmd).await;
}
_ => {}
}
}
});
node.command(NodeCommand::BootStrap).await;
}
let cmd_history = Vec::new();
let history_index = 0;
let cmd_buffer = String::new();
let handles = vec![executor_handle, node_handle];
Watcher::new(
node_tx,
exec_tx,
cmd_buffer,
cmd_history,
history_index,
handles,
renderer,
node,
)
}
}

View File

@ -1,12 +1,19 @@
use std::sync::Arc;
use cli_renderer::RenderCommand;
use crate::executor::ExecutorCommand;
use crate::node::NodeCommand;
#[derive(Debug, Clone)]
pub enum WatcherCommand {
NodeResponse(String),
Node(NodeCommand),
Echo(Vec<String>),
Print(String),
InvalidCommand(String),
Render(RenderCommand),
SetMode(WatcherMode),
Exit,
}
#[derive(Debug, Clone)]
@ -15,7 +22,7 @@ pub enum WatcherMode {
Select{
content: Arc<Vec<String>>,
title: String,
callback: Box<ExecutorCommand>,
callback: Box<WatcherCommand>,
index: usize,
},
}

View File

@ -1,8 +1,5 @@
use crate::{
bus::{publish_system_event, subscribe_system_event, SystemEvent},
cli::cli,
node::node::NodeCommand,
watcher::WatcherMode
bus::{SystemEvent, publish_system_event, subscribe_system_event}, cli::cli, node::{Node, node::NodeCommand}, watcher::WatcherMode
};
use shared::print_error_chain;
@ -12,7 +9,6 @@ use memory_stats::memory_stats;
use std::io::{self, Write};
use tokio::{
select,
sync::mpsc,
time::{Duration, interval},
};
use vlogger::*;
@ -20,7 +16,6 @@ use vlogger::*;
use super::{ WatcherBuilder, WatcherCommand };
use crate::bus::subscribe_watcher_event;
use crate::executor::*;
use crate::log;
use cli_renderer::{
@ -31,35 +26,29 @@ use cli_renderer::{
#[allow(dead_code)]
pub struct Watcher {
node_tx: mpsc::Sender<NodeCommand>,
exec_tx: mpsc::Sender<ExecutorCommand>,
cmd_buffer: String,
cmd_history: Vec<String>,
history_index: usize,
handles: Vec<tokio::task::JoinHandle<()>>,
event_stream: crossterm::event::EventStream,
mode: WatcherMode,
pub renderer: Renderer,
node: Node,
}
impl Watcher {
pub fn new(
node_tx: mpsc::Sender<NodeCommand>,
exec_tx: mpsc::Sender<ExecutorCommand>,
cmd_buffer: String,
cmd_history: Vec<String>,
history_index: usize,
handles: Vec<tokio::task::JoinHandle<()>>,
renderer: Renderer,
node: Node,
) -> Self {
Self {
node_tx,
exec_tx,
cmd_buffer,
cmd_history,
history_index,
handles,
renderer,
node,
mode: WatcherMode::Input,
event_stream: EventStream::new(),
}
@ -76,10 +65,6 @@ impl Watcher {
async fn shutdown(&mut self) -> io::Result<()> {
ratatui::restore();
let handles = std::mem::take(&mut self.handles);
for handle in handles {
handle.await.unwrap()
}
crossterm::execute!(
std::io::stdout(),
crossterm::event::DisableBracketedPaste,
@ -88,24 +73,6 @@ impl Watcher {
)
}
pub fn handle_cmd(&mut self, cmd: WatcherCommand) {
match cmd {
WatcherCommand::Render(rend_cmd) => {
self.renderer.apply(rend_cmd);
}
WatcherCommand::SetMode(mode) => {
match &mode {
WatcherMode::Input => {}
WatcherMode::Select{content, title, ..} => {
let rd_cmd = RenderCommand::SetMode(InputMode::PopUp(content.clone(), title.clone(), 0));
self.renderer.apply(rd_cmd);
}
}
self.mode = mode;
}
}
}
pub async fn run(&mut self) -> std::io::Result<()> {
let mut ui_rx = subscribe_watcher_event();
let mut render_interval = interval(Duration::from_millis(32));
@ -132,7 +99,7 @@ impl Watcher {
match ui_event {
Ok(cmd) => {
self.renderer.set_area(terminal.get_frame().area());
self.handle_cmd(cmd);
self.command(cmd);
},
Err(e) => {
log(msg!(ERROR, "{}", e))
@ -162,31 +129,68 @@ impl Watcher {
WatcherBuilder::new()
}
pub fn exec_tx(&self) -> mpsc::Sender<ExecutorCommand> {
self.exec_tx.clone()
fn exit(&mut self) {
log(msg!(DEBUG, "Watcher Exit"));
}
pub fn exit(&self) {}
fn echo(&mut self, s: Vec<String>) {
let mut str = s.join(" ");
str.push_str("\n");
self.renderer.apply(RenderCommand::StringToPaneId {
str,
pane: cli_renderer::RenderTarget::CliOutput,
});
}
async fn invalid_command(&mut self, str: String) {
self.renderer.apply(RenderCommand::StringToPaneId {
str,
pane: cli_renderer::RenderTarget::CliOutput,
});
}
fn set_mode(&mut self, mode: WatcherMode) {
match &mode {
WatcherMode::Input => {}
WatcherMode::Select{content, title, ..} => {
let rd_cmd = RenderCommand::SetMode(InputMode::PopUp(content.clone(), title.clone(), 0));
self.renderer.apply(rd_cmd);
}
}
self.mode = mode;
}
pub async fn command(&mut self, cmd: WatcherCommand) {
match cmd {
WatcherCommand::NodeResponse(resp) => log(resp),
WatcherCommand::Node(n) => self.node.command(n).await,
WatcherCommand::Render(p) => self.renderer.apply(p),
WatcherCommand::Echo(s) => self.echo(s),
WatcherCommand::Print(s) => log(s),
WatcherCommand::InvalidCommand(str) => self.invalid_command(str).await,
WatcherCommand::Exit => self.exit(),
WatcherCommand::SetMode(mode) => self.set_mode(mode),
}
}
async fn handle_enter(&mut self) {
match &self.mode {
WatcherMode::Input => {
if !self.cmd_buffer.is_empty() {
let exec_event = cli(&self.cmd_buffer);
let _ = self.exec_tx.send(exec_event).await;
self.command(exec_event).await;
self.cmd_buffer.clear();
self.renderer.handle_enter()
}
}
WatcherMode::Select { content, callback, index, .. } => {
match &&**callback {
&ExecutorCommand::Node(nd_cmd) => {
&WatcherCommand::Node(nd_cmd) => {
match nd_cmd {
NodeCommand::DisplayBlockByKey(_) => {
let key = (*content)[*index].clone().to_string();
log(msg!(DEBUG, "KEY IN ENTER: {key}"));
let resp = ExecutorCommand::Node(NodeCommand::DisplayBlockByKey(key));
let _ = self.exec_tx.send(resp).await;
self.node.command(NodeCommand::DisplayBlockByKey(key));
}
_ => {log(msg!(DEBUG, "TODO: Implement callback for {:?}", nd_cmd))}
}