This commit is contained in:
Victor Vobis 2025-08-27 23:47:23 +02:00
parent d6f7c758c0
commit cb43facd9e
6 changed files with 69 additions and 64 deletions

4
out Normal file
View File

@ -0,0 +1,4 @@
[?1049h┏InputPane━━━━━━━━━━━━━━━━━┓┏OutputPane━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓┃>┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┃┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛[?25l[INFO]21:14:03|StartedRenderer[?25l[INFO]21:14:03|StartedParser[?25l[INFO]21:14:03|StartedExecutor[?25l[INFO]21:14:03|StartedNode[?25l[INFO]21:14:03|BuildingChain[?25l[?25l[?25l[?25l[?25l[?25l[?25lc[?25l[?25l[?25l[?25lc[?25l[?25l[?25l [?25l [?25l[?25l[?25l>[?25l[INFO]21:14:08|Exec:Recievedechocommand[?25l[INFO]21:14:08|Parser:Receivedmessagefromwatcher[?25lUsage:<COMMAND>Commands:nodeAblockchainnodeCLItoolclearhelpPrintthismessageorthehelpofthegivensubcommand(s)Options:-h,--helpPrinthelp[?25lc[?25l[?25l[?25la[?25l[?25l[?25l[?1049lHello, world!
[DEBUG] [src/watcher/renderer.rs:125] 21:14:11 | Renderer Exit
[?25h

View File

@ -4,6 +4,7 @@ use clap::{Parser, Subcommand};
use crate::core; use crate::core;
use crate::watcher::RenderPane; use crate::watcher::RenderPane;
use clap::*; use clap::*;
#[derive(Parser)] #[derive(Parser)]

View File

@ -22,6 +22,7 @@ async fn main() {
let mut watcher = Watcher::build() let mut watcher = Watcher::build()
.file(args.seed_file) .file(args.seed_file)
.addr(args.addr) .addr(args.addr)
.seed(args.seed)
.bootstrap(args.bootstrap) .bootstrap(args.bootstrap)
.start().await; .start().await;

View File

@ -76,7 +76,7 @@ impl node::NativeNode {
match message { match message {
ProtocolMessage::BootstrapRequest { .. } => { ProtocolMessage::BootstrapRequest { .. } => {
log!(INFO, "Received BootstrapRequest from {peer_id}"); self.log(msg!(INFO, "Received BootstrapRequest from {peer_id}")).await;
let peer = &self.tcp_peers[&peer_id]; let peer = &self.tcp_peers[&peer_id];
let resp = ProtocolMessage::BootstrapResponse { let resp = ProtocolMessage::BootstrapResponse {
blocks: self.chain.blocks().to_vec() blocks: self.chain.blocks().to_vec()
@ -84,6 +84,10 @@ impl node::NativeNode {
peer.sender.send(resp).await.unwrap(); peer.sender.send(resp).await.unwrap();
log!(INFO, "Send BootstrapResponse to {peer_id}"); log!(INFO, "Send BootstrapResponse to {peer_id}");
}, },
ProtocolMessage::BootstrapResponse { blocks } => {
self.log(msg!(INFO, "Received BootstrapResponse from seed")).await;
self.chain = core::Blockchain::build(blocks.to_vec()).unwrap();
},
ProtocolMessage::Ping {peer_id} => { ProtocolMessage::Ping {peer_id} => {
log!(INFO, "Received Ping from {peer_id}"); log!(INFO, "Received Ping from {peer_id}");
let resp = ProtocolMessage::Pong { peer_id: self.id.clone() }; let resp = ProtocolMessage::Pong { peer_id: self.id.clone() };

View File

@ -55,31 +55,31 @@ impl NativeNode {
addr addr
} }
pub fn list_peers(&self) { pub fn list_peers(&self) -> String {
println!("Peer List\n-----------"); let mut ret = String::from("Peer List\n-----------");
for (i, p) in self.tcp_peers.iter().enumerate() { for (i, p) in self.tcp_peers.iter().enumerate() {
println!("Peer #{i}: {}", p.1.id) ret.push_str(format!("Peer #{i}: {}", p.1.id).as_str())
} }
ret
} }
pub fn show_id(&self) { pub async fn show_id(&self) {
println!("Node Id: {}", self.id) self.log(msg!(INFO, "Node Id: {}", self.id)).await
} }
fn remove_tcp_peer(&mut self, peer_id: Uuid) { async fn remove_tcp_peer(&mut self, peer_id: Uuid) {
log!(INFO, "Removing Peer {peer_id}"); self.log(msg!(INFO, "Removing Peer {peer_id}")).await;
self.tcp_peers.remove_entry(&peer_id); self.tcp_peers.remove_entry(&peer_id);
} }
fn add_tcp_peer(&mut self, id: Uuid, addr: SocketAddr, sender: tokio::sync::mpsc::Sender<ProtocolMessage>) { async fn add_tcp_peer(&mut self, id: Uuid, addr: SocketAddr, sender: tokio::sync::mpsc::Sender<ProtocolMessage>) {
let peer = TcpPeer { let peer = TcpPeer {
id, id,
addr, addr,
sender sender
}; };
log!(INFO, "Adding Peer {}", peer.id); self.log(msg!(INFO, "Added Peer from address: {addr}")).await;
self.tcp_peers.insert(id, peer); self.tcp_peers.insert(id, peer);
} }
@ -134,53 +134,32 @@ impl NativeNode {
} }
} }
pub async fn send_handshake(id: uuid::Uuid, stream: &mut tokio::net::TcpStream) -> Result<uuid::Uuid, ValidationError> { pub async fn send_message_to_peer_addr(&self, addr: SocketAddr, msg: ProtocolMessage) {
let handshake = ProtocolMessage::Handshake { peer_id: id.clone(), version: "".to_string() }; if let Some((_, peer)) = self.tcp_peers.iter().find(|(_, v)| v.addr == addr) {
NativeNode::send_message(stream, &handshake).await.unwrap(); if let Err(e) = peer.sender.send(msg).await {
if let Ok(response) = NativeNode::receive_message(stream).await { self.log(msg!(ERROR, "Error Sending message to peer: {e}")).await;
match response {
message::ProtocolMessage::Handshake { peer_id, version: _ } => {
Ok(peer_id)
},
_ => {
log!(ERROR, "Invalid response on Handshake");
Err(ValidationError::InvalidBlockHash)
} }
} }
} else { }
Err(ValidationError::InvalidBlockHash)
pub async fn send_message_to_peer_id(&self, id: Uuid, msg: ProtocolMessage) {
if let Some(peer) = self.tcp_peers.get(&id) {
if let Err(e) = peer.sender.send(msg).await {
self.log(msg!(ERROR, "Error Sending message to peer: {e}")).await;
}
} }
} }
pub async fn bootstrap(&mut self) -> Result<(), ValidationError> { pub async fn bootstrap(&mut self) -> Result<(), ValidationError> {
self.log(msg!(INFO, "Running As Native Node")).await; self.log(msg!(INFO, "Running As Native Node")).await;
let mut stream = tokio::net::TcpStream::connect(SEED_NODES[0]).await.unwrap(); self.connect_to_peer(SEED_NODES[0]).await;
let id = uuid::Uuid::new_v4(); let message = ProtocolMessage::BootstrapRequest{peer_id: self.id, version: "".to_string()};
self.send_message_to_peer_addr(SEED_NODES[0], message).await;
if let Ok(_) = NativeNode::send_handshake(id, &mut stream).await {
let message = message::ProtocolMessage::BootstrapRequest { peer_id: id.clone(), version: "".to_string() };
NativeNode::send_message(&mut stream, &message).await.unwrap();
self.log(msg!(INFO, "Sent BootstrapRequest to seed")).await; self.log(msg!(INFO, "Sent BootstrapRequest to seed")).await;
if let Ok(response) = NativeNode::receive_message(&mut stream).await {
match response {
ProtocolMessage::BootstrapResponse { blocks } => {
self.log(msg!(INFO, "Received BootstrapResponse from seed")).await;
self.chain = core::Blockchain::build(blocks).unwrap();
Ok(()) Ok(())
},
_ => {
self.log(msg!(ERROR, "Invalid Response from BootstrapRequest: {:?}", &response)).await;
Err(ValidationError::InvalidBlockHash)
}
}
} else {
Err(ValidationError::InvalidBlockHash)
}
} else {
Err(ValidationError::InvalidBlockHash)
}
} }
pub async fn broadcast_transaction(&self, tx: &core::Tx) { pub async fn broadcast_transaction(&self, tx: &core::Tx) {
@ -212,6 +191,7 @@ impl NativeNode {
self.listner_handle.as_ref().unwrap().abort(); self.listner_handle.as_ref().unwrap().abort();
self.listner_handle = None; self.listner_handle = None;
} }
self.log(msg!(INFO, "Listening on address: {}", addr)).await;
if let Ok(tcp_listner) = tokio::net::TcpListener::bind(addr).await { if let Ok(tcp_listner) = tokio::net::TcpListener::bind(addr).await {
let id = self.id.clone(); let id = self.id.clone();
@ -223,7 +203,7 @@ impl NativeNode {
}; };
} }
async fn log(&self, msg: String) { pub async fn log(&self, msg: String) {
let _ = self.exec_tx.send(ExecutorCommand::Print(msg)).await; let _ = self.exec_tx.send(ExecutorCommand::Print(msg)).await;
} }
@ -254,10 +234,10 @@ impl NativeNode {
} }
} }
NodeCommand::AddPeer { peer_id, addr, sender } => { NodeCommand::AddPeer { peer_id, addr, sender } => {
self.add_tcp_peer(peer_id, addr, sender); self.add_tcp_peer(peer_id, addr, sender).await;
}, },
NodeCommand::RemovePeer { peer_id } => { NodeCommand::RemovePeer { peer_id } => {
self.remove_tcp_peer(peer_id); self.remove_tcp_peer(peer_id).await;
} }
NodeCommand::ProcessMessage { peer_id, message } => { NodeCommand::ProcessMessage { peer_id, message } => {
self.process_message(peer_id, &message).await; self.process_message(peer_id, &message).await;
@ -281,7 +261,7 @@ impl NativeNode {
}, },
NodeCommand::ShowId => { NodeCommand::ShowId => {
self.log(msg!(INFO, "Received DebugListBlocks command")).await; self.log(msg!(INFO, "Received DebugListBlocks command")).await;
self.show_id(); self.show_id().await;
}, },
NodeCommand::DumpBlocks(s) => { NodeCommand::DumpBlocks(s) => {
self.chain.dump_blocks(s); self.chain.dump_blocks(s);

View File

@ -17,13 +17,6 @@ pub struct Watcher {
handles: Vec<tokio::task::JoinHandle<()>> handles: Vec<tokio::task::JoinHandle<()>>
} }
#[derive(Default)]
pub struct WatcherBuilder {
addr: Option<SocketAddr>,
seed_file: Option<String>,
bootstrap: bool
}
impl Watcher { impl Watcher {
pub fn build() -> WatcherBuilder { pub fn build() -> WatcherBuilder {
WatcherBuilder::new() WatcherBuilder::new()
@ -97,6 +90,14 @@ impl Watcher {
} }
} }
#[derive(Default)]
pub struct WatcherBuilder {
addr: Option<SocketAddr>,
seed_file: Option<String>,
bootstrap: bool,
seed: bool,
}
impl WatcherBuilder { impl WatcherBuilder {
fn new() -> Self { fn new() -> Self {
Self::default() Self::default()
@ -109,6 +110,7 @@ impl WatcherBuilder {
pub fn file(mut self, seed_file: Option<String>) -> Self { pub fn file(mut self, seed_file: Option<String>) -> Self {
self.seed_file = seed_file; self.seed_file = seed_file;
self.seed = true;
self self
} }
@ -117,13 +119,18 @@ impl WatcherBuilder {
self self
} }
pub fn seed(mut self, seed: bool) -> Self {
self.seed = seed;
self
}
pub async fn log(render_tx: &mpsc::Sender<RenderCommand>, msg: String) { pub async fn log(render_tx: &mpsc::Sender<RenderCommand>, msg: String) {
let rendermsg = RenderCommand::RenderStringToPane { str: msg, pane: RenderPane::CliOutput }; let rendermsg = RenderCommand::RenderStringToPane { str: msg, pane: RenderPane::CliOutput };
let _ = render_tx.send(rendermsg).await; let _ = render_tx.send(rendermsg).await;
} }
pub async fn start(self) -> Watcher { pub async fn start(mut self) -> Watcher {
let (render_tx, render_rx) = mpsc::channel::<RenderCommand>(100); let (render_tx, render_rx) = mpsc::channel::<RenderCommand>(100);
let (parser_tx, parser_rx) = mpsc::channel::<ParserCommand>(100); let (parser_tx, parser_rx) = mpsc::channel::<ParserCommand>(100);
let (exec_tx, exec_rx) = mpsc::channel::<ExecutorCommand>(100); let (exec_tx, exec_rx) = mpsc::channel::<ExecutorCommand>(100);
@ -141,10 +148,18 @@ impl WatcherBuilder {
.and_then(|path| std::fs::read_to_string(path).ok()) .and_then(|path| std::fs::read_to_string(path).ok())
.and_then(|content| serde_json::from_str(&content).ok()); .and_then(|content| serde_json::from_str(&content).ok());
if self.seed {
self.addr = Some(crate::seeds_constants::SEED_NODES[0]);
}
let _ = Self::log(&render_tx, msg!(INFO, "Addr: {:?}", self.addr)).await;
let mut node = NativeNode::new(self.addr.clone(), blocks, exec_tx.clone()).await; let mut node = NativeNode::new(self.addr.clone(), blocks, exec_tx.clone()).await;
if self.bootstrap { if self.bootstrap {
let _ = node.bootstrap().await; let _ = node.bootstrap().await;
} }
let _ = Self::log(&render_tx, msg!(INFO, "Build Node")); let _ = Self::log(&render_tx, msg!(INFO, "Build Node"));
let parser_handle = tokio::spawn({ let parser_handle = tokio::spawn({