This commit is contained in:
victor 2025-08-27 11:16:55 +02:00
parent a882d6684e
commit 4b6801644b
10 changed files with 162 additions and 78 deletions

View File

@ -1,3 +1,5 @@
use std::net::SocketAddr;
use clap::{Parser, Subcommand};
use crate::core;
@ -6,7 +8,7 @@ use crate::core;
pub struct Args {
/// Provide address on which node will listen
#[arg(short = 'a', long)]
pub addr: Option<String>,
pub addr: Option<SocketAddr>,
/// Provide File with current chain
#[arg(short = 'f', long)]

View File

@ -1,5 +1,3 @@
use error::{ BlockchainError, handle_error };
pub mod error;
pub mod args;
pub mod core;
@ -9,28 +7,19 @@ pub mod watcher;
use crate::{args::get_args, watcher::watcher::Watcher};
const SEED_ADDR: &str = "127.0.0.1:8333";
fn add_transaction(tx: core::Tx) -> Result<(), BlockchainError> {
Ok(())
}
fn list_accounts() {
println!("Account Balances:\n-----------------");
}
#[tokio::main]
async fn main() {
let args = get_args();
let mut watcher = Watcher::build().file(args.seed_file).addr(args.addr).start();
let mut watcher = Watcher::build().file(args.seed_file).addr(args.addr).start().await;
loop {
if !watcher.poll().await.is_ok() {
if watcher.poll().await.is_ok_and(|b| b) {
break ;
}
}
ratatui::restore();
println!("Hello, world!");
}

View File

@ -1,3 +1,5 @@
use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net;
use crate::native_node::node;
@ -19,7 +21,7 @@ pub enum ProtocolMessage {
peer_id: uuid::Uuid
},
GetPeersResponse {
peer_addresses: Vec<String>
peer_addresses: Vec<SocketAddr>
},
Handshake { node_id: uuid::Uuid, version: String },
Block { peer_id: uuid::Uuid, height: u64, block: core::Block },

View File

@ -1,3 +1,5 @@
use std::net::SocketAddr;
use crate::native_node::{message, node};
use crate::seeds_constants::SEED_NODES;
@ -8,21 +10,23 @@ use tokio::sync::mpsc;
impl node::NativeNode {
pub async fn connect_to_seeds(&mut self, sender: mpsc::Sender<node::NodeCommand>) {
for seed in SEED_NODES {
if seed != self.addr {
for seed in SEED_NODES.iter() {
if let Some(a)= self.addr {
if *seed != a {
if let Ok(mut stream) = tokio::net::TcpStream::connect(seed).await {
if let Ok(peer_id) = node::NativeNode::send_handshake(self.id.clone(), &mut stream).await {
let sender = sender.clone();
node::NativeNode::establish_connection(peer_id, seed.to_string(), stream, sender).await;
node::NativeNode::establish_connection(peer_id, *seed, stream, sender).await;
}
}
}
}
}
}
pub async fn establish_connection(
peer_id: uuid::Uuid,
addr: String,
addr: SocketAddr,
stream: tokio::net::TcpStream,
request_sender: tokio::sync::mpsc::Sender<node::NodeCommand>
) {
@ -59,7 +63,7 @@ impl node::NativeNode {
if let Ok(message) = node::NativeNode::receive_message(&mut stream).await {
match message {
message::ProtocolMessage::Handshake { node_id, .. } => {
node::NativeNode::establish_connection(node_id, addr.to_string(), stream, request_sender.clone()).await;
node::NativeNode::establish_connection(node_id, addr, stream, request_sender.clone()).await;
},
_ => {
log!(WARNING, "Invalid Response! expected Handshake, got {:?}", message);

View File

@ -5,45 +5,52 @@ use crate::seeds_constants::SEED_NODES;
use crate::watcher::executor::ExecutorCommand;
use std::collections::HashMap;
use std::net::SocketAddr;
use vlogger::*;
use tokio::sync::mpsc;
use uuid::Uuid;
pub struct TcpPeer {
pub id: Uuid,
pub addr: String,
pub addr: SocketAddr,
pub sender: tokio::sync::mpsc::Sender<ProtocolMessage>
}
pub struct NativeNode {
pub id: Uuid,
pub addr: String,
pub addr: Option<SocketAddr>,
pub tcp_peers: HashMap<Uuid, TcpPeer>,
pub chain: core::Blockchain,
listner_handle: Option<tokio::task::JoinHandle<()>>,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<NodeCommand>,
tx: mpsc::Sender<NodeCommand>,
}
#[derive(Debug)]
pub enum NodeCommand {
AddPeer { peer_id: Uuid, addr: String, sender: tokio::sync::mpsc::Sender<ProtocolMessage> },
AddPeer { peer_id: Uuid, addr: SocketAddr, sender: tokio::sync::mpsc::Sender<ProtocolMessage> },
RemovePeer { peer_id: Uuid },
ProcessMessage { peer_id: Uuid, message: ProtocolMessage },
Transaction { tx: core::Tx },
StartListner(SocketAddr),
CreateBlock,
DebugListBlocks,
DebugListPeers,
DebugShowId,
DebugDumpBlocks,
ConnectToSeeds
ConnectToSeeds,
Exit,
}
impl NativeNode {
pub fn peer_addresses(&self) -> Vec<String> {
let mut addr: Vec<String> = self.tcp_peers.iter().map(|p| p.1.addr.to_string()).collect();
addr.push(self.addr.clone());
pub fn peer_addresses(&self) -> Vec<SocketAddr> {
let mut addr: Vec<SocketAddr> = self.tcp_peers.iter().map(|p| p.1.addr.to_string().parse::<SocketAddr>().unwrap()).collect();
if let Some(a) = self.addr {
addr.push(a.clone());
}
addr
}
@ -63,7 +70,7 @@ impl NativeNode {
self.tcp_peers.remove_entry(&peer_id);
}
fn add_tcp_peer(&mut self, id: Uuid, addr: String, sender: tokio::sync::mpsc::Sender<ProtocolMessage>) {
fn add_tcp_peer(&mut self, id: Uuid, addr: SocketAddr, sender: tokio::sync::mpsc::Sender<ProtocolMessage>) {
let peer = TcpPeer {
id: id,
addr,
@ -78,24 +85,34 @@ impl NativeNode {
pub fn new_with_id(
id: uuid::Uuid,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<NodeCommand>
addr: Option<SocketAddr>,
blocks: Option<Vec<core::Block>>,
) -> Self {
let (tx, rx) = mpsc::channel::<NodeCommand>(100);
Self {
id,
tcp_peers: HashMap::new(),
chain: Default::default(),
addr: String::new(),
chain: {
if blocks.is_some() {
Blockchain::build(blocks.unwrap()).unwrap_or(Default::default())
} else {
Default::default()
}
},
addr,
exec_tx,
listner_handle: None,
tx,
rx,
}
}
pub fn new(
addr: Option<String>,
addr: Option<SocketAddr>,
blocks: Option<Vec<core::Block>>,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<NodeCommand>,
) -> Self {
let (tx, rx) = mpsc::channel::<NodeCommand>(100);
Self {
id: Uuid::new_v4(),
tcp_peers: HashMap::new(),
@ -106,9 +123,11 @@ impl NativeNode {
Default::default()
}
},
addr: if addr.is_some() { addr.unwrap() } else { String::new() },
addr,
exec_tx,
rx
listner_handle: None,
tx,
rx,
}
}
@ -171,7 +190,7 @@ impl NativeNode {
pub async fn broadcast_block(&self, block: &core::Block) {
for (id, peer) in &self.tcp_peers {
let message = ProtocolMessage::Block{
let message = ProtocolMessage::Block {
peer_id: self.id,
height: self.chain.blocks().len() as u64,
block: block.clone()
@ -181,22 +200,39 @@ impl NativeNode {
}
}
pub fn tx(&self) -> mpsc::Sender<NodeCommand> {
return self.tx.clone()
}
async fn start_connection_listner(&mut self, addr: SocketAddr) {
if self.listner_handle.is_some() {
self.listner_handle.as_ref().unwrap().abort();
self.listner_handle = None;
}
if let Ok(tcp_listner) = tokio::net::TcpListener::bind(addr).await {
let id = self.id.clone();
let node_tx = self.tx();
self.listner_handle = Some(tokio::spawn({
async move {
NativeNode::accept_connections(tcp_listner, node_tx, id).await;
}}))
};
}
pub async fn run_native(&mut self) {
let tcp_listner = tokio::net::TcpListener::bind(&self.addr).await.unwrap();
let (channel_write, mut channel_read) = mpsc::channel::<NodeCommand>(100);
if let Some(a) = self.addr {
self.start_connection_listner(a.clone()).await;
};
let id = self.id.clone();
tokio::spawn({
let c = channel_write.clone();
async move {
NativeNode::accept_connections(tcp_listner, c, id).await;
}});
while let Some(command) = channel_read.recv().await {
while let Some(command) = self.rx.recv().await {
match command {
NodeCommand::StartListner(addr) => {
self.start_connection_listner(addr).await;
}
NodeCommand::ConnectToSeeds => {
self.connect_to_seeds(channel_write.clone()).await;
self.connect_to_seeds(self.tx()).await;
},
NodeCommand::AddPeer { peer_id, addr, sender } => {
self.add_tcp_peer(peer_id, addr, sender);
@ -231,6 +267,10 @@ impl NativeNode {
NodeCommand::DebugDumpBlocks => {
// self.chain.dump_blocks(&mut self.db_file);
}
NodeCommand::Exit => {
log!(DEBUG, "Node Exit");
break ;
}
}
}
}

View File

@ -1,3 +1,8 @@
pub const SEED_NODES: [&str; 1] = [
"127.0.0.1:8333"
];
use once_cell::sync::Lazy;
use std::net::{SocketAddr, IpAddr, Ipv4Addr};
pub static SEED_NODES: Lazy<[SocketAddr; 3]> = Lazy::new(|| [
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8333),
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 3000),
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 5432),
]);

View File

@ -1,5 +1,6 @@
use crate::{native_node::node::NodeCommand, watcher::renderer::*};
use tokio::sync::mpsc;
use vlogger::*;
pub enum ExecutorCommand {
NodeResponse(String),
@ -28,6 +29,7 @@ impl Executor {
}
fn exit(&mut self) {
log!(DEBUG, "Executor Exit");
self.exit = true
}

View File

@ -1,3 +1,5 @@
use std::net::SocketAddr;
use crate::native_node::node::NodeCommand;
use crate::watcher::executor::{ExecutorCommand};
use vlogger::*;
@ -50,6 +52,7 @@ impl Parser {
async fn listen(&mut self) {
if let Ok(Some(mes)) = timeout(Duration::from_millis(400), self.rx.recv()).await {
log!(DEBUG, "Parser: Received message from watcher");
match mes {
ParserCommand::ParseCmdString(s) => {
let s_split: Vec<&str> = s.split(" ").collect();
@ -87,12 +90,13 @@ impl Parser {
},
"list" => {
if args.len() != 1 {
log!(ERROR, "{cmd}: Invalid arg! (blocks, peers)");
}
match args[0] {
"blocks" => ExecutorCommand::Node(NodeCommand::DebugListBlocks),
"peers" => ExecutorCommand::Node(NodeCommand::DebugListPeers),
_ => ExecutorCommand::InvalidCommand(msg!(ERROR, "Unkown arg: {}", args[0])),
ExecutorCommand::InvalidCommand(msg!(ERROR, "node list: expects 1 argument"))
} else {
match args[0] {
"blocks" => ExecutorCommand::Node(NodeCommand::DebugListBlocks),
"peers" => ExecutorCommand::Node(NodeCommand::DebugListPeers),
_ => ExecutorCommand::InvalidCommand(msg!(ERROR, "Unkown arg: {}", args[0])),
}
}
},
"dump_blocks" => {
@ -100,6 +104,17 @@ impl Parser {
},
"connect" => {
ExecutorCommand::Node(NodeCommand::ConnectToSeeds)
},
"listen" => {
if args.len() != 1 {
ExecutorCommand::InvalidCommand(msg!(ERROR, "node listen: expects 1 argument"))
} else {
if let Ok(addr) = args[0].parse::<SocketAddr>() {
ExecutorCommand::Node(NodeCommand::StartListner(addr))
} else {
ExecutorCommand::InvalidCommand(msg!(ERROR, "node listen: invalid address {}", args[0]))
}
}
}
_ => {
ExecutorCommand::InvalidCommand(msg!(ERROR, "node: unknown argument {}", args[0]))
@ -131,10 +146,11 @@ impl Parser {
ExecutorCommand::InvalidCommand(msg!(ERROR, "Unknown Command {cmd}"))
}
};
let _ = self.exec_tx.send(exec_cmd);
let _ = self.exec_tx.send(exec_cmd).await;
}
}
ParserCommand::Exit => {
log!(DEBUG, "Parser Exit");
self.exit();
}
}

View File

@ -7,9 +7,10 @@ use ratatui::{
symbols::border,
widgets::{Block, Paragraph, Widget},
DefaultTerminal, Frame,
};
use vlogger::*;
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};
use std::io;
@ -121,6 +122,7 @@ impl Renderer {
}
fn exit(&mut self) {
log!(DEBUG, "Renderer Exit");
self.exit = true;
}

View File

@ -2,9 +2,10 @@ use crate::watcher::*;
use crossterm::{event::{self, Event, KeyCode, KeyEventKind}};
use tokio::sync::mpsc;
use std::io::{self};
use std::{io::{self}, net::SocketAddr};
use crate::{native_node::node::{NativeNode, NodeCommand}};
use vlogger::*;
pub struct Watcher {
render_tx: mpsc::Sender<RenderCommand>,
@ -17,7 +18,7 @@ pub struct Watcher {
#[derive(Default)]
pub struct WatcherBuilder {
addr: Option<String>,
addr: Option<SocketAddr>,
seed_file: Option<String>
}
@ -26,6 +27,11 @@ impl Watcher {
WatcherBuilder::new()
}
pub async fn log(&self, msg: String) -> Result<(), mpsc::error::SendError<RenderCommand>> {
let rendermsg = RenderCommand::RenderStringToPane { str: msg, pane: RenderPane::CliOutput };
self.render_tx.send(rendermsg).await
}
pub async fn exit(self) {
ratatui::restore();
// for (i, handle) in self.handles.into_iter().enumerate() {
@ -41,27 +47,29 @@ impl Watcher {
KeyCode::Char(c) => {
self.cmd_buffer.push(c);
let message = RenderCommand::RenderInput(k.code);
let _ = self.render_tx.send(message);
let _ = self.render_tx.send(message).await;
}
KeyCode::Backspace => {
self.cmd_buffer.pop();
let message = RenderCommand::RenderInput(k.code);
let _ = self.render_tx.send(message);
let _ = self.render_tx.send(message).await;
},
KeyCode::Enter => {
let rd_mes = RenderCommand::RenderInput(k.code);
let pr_mes = ParserCommand::ParseCmdString(self.cmd_buffer.clone());
let _ = self.render_tx.send(rd_mes);
let _ = self.parser_tx.send(pr_mes);
let _ = self.render_tx.send(rd_mes).await;
let _ = self.parser_tx.send(pr_mes).await;
self.cmd_buffer.clear();
}
KeyCode::Esc => {
let rd_mes = RenderCommand::Exit;
let pr_mes = ParserCommand::Exit;
let exec_mes = ExecutorCommand::Exit;
let _ = self.render_tx.send(rd_mes);
let _ = self.parser_tx.send(pr_mes);
let _ = self.exec_tx.send(exec_mes);
let node_mes = NodeCommand::Exit;
let _ = self.render_tx.send(rd_mes).await;
let _ = self.parser_tx.send(pr_mes).await;
let _ = self.exec_tx.send(exec_mes).await;
let _ = self.node_tx.send(node_mes).await;
return Ok(false);
}
_ => {}
@ -78,7 +86,7 @@ impl WatcherBuilder {
Self::default()
}
pub fn addr(mut self, addr: Option<String>) -> Self {
pub fn addr(mut self, addr: Option<SocketAddr>) -> Self {
self.addr = addr;
self
}
@ -88,11 +96,16 @@ impl WatcherBuilder {
self
}
pub fn start(self) -> Watcher {
pub async fn log(render_tx: &mpsc::Sender<RenderCommand>, msg: String) -> Result<(), mpsc::error::SendError<RenderCommand>> {
let rendermsg = RenderCommand::RenderStringToPane { str: msg, pane: RenderPane::CliOutput };
render_tx.send(rendermsg).await
}
pub async fn start(self) -> Watcher {
let (render_tx, render_rx) = mpsc::channel::<RenderCommand>(100);
let (parser_tx, parser_rx) = mpsc::channel::<ParserCommand>(100);
let (exec_tx, exec_rx) = mpsc::channel::<ExecutorCommand>(100);
let (node_tx, node_rx) = mpsc::channel::<NodeCommand>(100);
let mut terminal = ratatui::init();
let render_handle = tokio::spawn({
@ -100,6 +113,14 @@ impl WatcherBuilder {
let _ = Renderer::new(render_rx, RenderLayoutKind::Cli).run(&mut terminal).await;
}
});
let _ = Self::log(&render_tx, msg!(INFO, "Started Renderer")).await;
let blocks = self.seed_file
.as_ref()
.and_then(|path| std::fs::read_to_string(path).ok())
.and_then(|content| serde_json::from_str(&content).ok());
let mut node = NativeNode::new(self.addr.clone(), blocks, exec_tx.clone());
let _ = Self::log(&render_tx, msg!(INFO, "Build Node"));
let parser_handle = tokio::spawn({
let exec_tx = exec_tx.clone();
@ -108,27 +129,28 @@ impl WatcherBuilder {
}
});
let _ = Self::log(&render_tx, msg!(INFO, "Started Parser")).await;
let executor_handle = tokio::spawn({
let rend_tx = render_tx.clone();
let node_tx = node_tx.clone();
let node_tx = node.tx();
async move {
let _ = Executor::new(rend_tx, node_tx, exec_rx).run().await;
}
});
let blocks = self.seed_file
.as_ref()
.and_then(|path| std::fs::read_to_string(path).ok())
.and_then(|content| serde_json::from_str(&content).ok());
let _ = Self::log(&render_tx, msg!(INFO, "Started Executor")).await;
let node_tx = node.tx();
let node_handle = tokio::spawn({
let exec_tx = exec_tx.clone();
async move {
let _ = NativeNode::new(self.addr.clone(), blocks, exec_tx, node_rx);
node.run_native().await;
}
});
let _ = Self::log(&render_tx, msg!(INFO, "Started Node")).await;
Watcher {
render_tx,
node_tx,