From e6807449443fd7d4c6808b6cb715eefa850885ca Mon Sep 17 00:00:00 2001 From: victor Date: Tue, 26 Aug 2025 00:17:38 +0200 Subject: [PATCH] bless --- Cargo.lock | 13 ++ Cargo.toml | 1 + database/tx.db | 20 +- src/core/block.rs | 17 +- src/core/blockchain.rs | 161 ++++++++++++-- src/core/tx.rs | 9 +- src/main.rs | 5 +- src/native_node.rs | 6 + src/native_node/cli.rs | 81 +++++++ src/native_node/error.rs | 12 ++ src/native_node/message.rs | 113 ++++++++++ src/native_node/network.rs | 129 ++++++++++++ src/native_node/node.rs | 275 ++++++++++++++++++++++++ src/network.rs | 1 - src/network/native.rs | 375 --------------------------------- src/network/network_browser.rs | 3 - src/seeds_constants.rs | 3 + 17 files changed, 817 insertions(+), 407 deletions(-) create mode 100644 src/native_node.rs create mode 100644 src/native_node/cli.rs create mode 100644 src/native_node/error.rs create mode 100644 src/native_node/message.rs create mode 100644 src/native_node/network.rs create mode 100644 src/native_node/node.rs delete mode 100644 src/network.rs delete mode 100644 src/network/native.rs delete mode 100644 src/network/network_browser.rs create mode 100644 src/seeds_constants.rs diff --git a/Cargo.lock b/Cargo.lock index 09a2607..e8123db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -143,6 +143,7 @@ dependencies = [ "thiserror", "tokio", "tokio-tungstenite", + "uuid", "warp", "wasm-bindgen", "web-sys", @@ -1103,6 +1104,18 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +dependencies = [ + "getrandom", + "js-sys", + "serde", + "wasm-bindgen", +] + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index 8e3d69d..68b663c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ sha2 = "0.10.9" thiserror = "2.0.16" tokio = { version = "1.47.1", features = ["full"] } tokio-tungstenite = "0.27.0" +uuid = { version = "1.18.0", features = ["v4", "serde"] } warp = { version = "0.4.2", features = ["server", "websocket"] } wasm-bindgen = "0.2.100" web-sys = { version = "0.3.77", features = ["WebSocket"] } diff --git a/database/tx.db b/database/tx.db index fe51488..c943850 100644 --- a/database/tx.db +++ b/database/tx.db @@ -1 +1,19 @@ -[] +[ + { + "head": { + "previous_hash": "", + "timestamp": 1756157257, + "merkle_root": "3f99805580f3b4bbd1a903180d2057cfd7dace97d820d7170ffd83c95ce3852c", + "block_hash": "06af1aabb570a702dfd2ee7654200890c26e1fdc380589ae76825976524d5d09", + "nonce": 9 + }, + "tx": [ + { + "from": "victor", + "to": "pia", + "value": 500, + "data": "almosen" + } + ] + } +] diff --git a/src/core/block.rs b/src/core/block.rs index 3a8a47d..a760741 100644 --- a/src/core/block.rs +++ b/src/core/block.rs @@ -2,17 +2,17 @@ use crate::core; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct BlockHeader { - previous_hash: String, - timestamp: u64, - merkle_root: String, - block_hash: String, - nonce: u32 + pub previous_hash: String, + pub timestamp: u64, + pub merkle_root: String, + pub block_hash: String, + pub nonce: u32 } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Block { - head: BlockHeader, - tx: Vec + pub head: BlockHeader, + pub tx: Vec } impl BlockHeader { @@ -34,6 +34,9 @@ impl BlockHeader { } impl Block { + pub fn new(head: BlockHeader, tx: Vec) -> Self { + Self { head, tx } + } pub fn head(&self) -> &BlockHeader { &self.head } diff --git a/src/core/blockchain.rs b/src/core/blockchain.rs index 53c9f93..360c6a5 100644 --- a/src/core/blockchain.rs +++ b/src/core/blockchain.rs @@ -1,16 +1,21 @@ use sha2::Digest; +use sha2::Sha256; +use crate::core::block; use crate::log::*; use crate::core; use crate::error::{ BlockchainError, TxError }; use std::collections::HashMap; +use std::io::Write; +use std::time::UNIX_EPOCH; pub type Account = String; #[derive(Debug)] pub enum ValidationError { - InvalidBlockHash + InvalidBlockHash, + InvalidPreviousBlockHash } #[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] @@ -54,29 +59,126 @@ impl Blockchain { Ok(()) } + pub fn hash_transaction(tx: &core::Tx) -> String { + let mut hasher = Sha256::new(); + hasher.update(tx.to()); + hasher.update(tx.from()); + hasher.update(tx.value().to_be_bytes()); + hasher.update(tx.data()); + + let res = hasher.finalize(); + hex::encode(res) + } + + pub fn calculate_next_level(level: &[String]) -> Vec { + let mut next_level = Vec::new(); + + for chunk in level.chunks(2) { + let combined_hash = if chunk.len() == 2 { + Self::hash_pair(&chunk[0], &chunk[1]) + } else { + Self::hash_pair(&chunk[0], &chunk[0]) + }; + next_level.push(combined_hash); + } + next_level + } + + pub fn calculate_merkle_root(tx: &[core::Tx]) -> String { + let tx_hashes: Vec = tx + .iter() + .map(|tx| Blockchain::hash_transaction(tx)) + .collect(); + + if tx_hashes.is_empty() { + return Blockchain::hash_data(""); + } + + if tx_hashes.len() == 1 { + return tx_hashes[0].clone(); + } + + let mut current_level = tx_hashes.to_vec(); + + while current_level.len() > 1 { + current_level = Self::calculate_next_level(¤t_level); + } + + return current_level[0].clone(); + } + + fn hash_pair(left: &str, right: &str) -> String { + let combined = format!("{}{}", left, right); + Self::hash_data(&combined) + } + + fn hash_data(data: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(data.as_bytes()); + hex::encode(hasher.finalize()) + } + + pub fn dump_blocks(&self, db_file: &mut std::fs::File) { + let block_json = serde_json::to_string_pretty(&self.blocks).unwrap(); + db_file.write_all(&block_json.as_bytes()).unwrap(); + } + + pub fn create_block(&mut self) -> core::Block { + let previous_hash = if self.blocks().len() > 0 { + self.blocks().last().unwrap().head().block_hash() + } else { + "" + }; + let merkle_root = Self::calculate_merkle_root(&self.tx_mempool); + let timestamp = std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + let nonce = 0; + let mut new_head = core::BlockHeader { + previous_hash: previous_hash.to_string(), + merkle_root, + timestamp, + nonce, + block_hash: "".to_string() + }; + + let mut block_hash = String::new(); + while !block_hash.starts_with("0") { + new_head.nonce += 1; + block_hash = calculate_block_hash(&new_head) + } + + new_head.block_hash = block_hash; + + let new_block = core::Block::new(new_head, self.tx_mempool.clone()); + log!(DEBUG, "Created new Block {:#?}", new_block); + self.blocks.push(new_block); + self.blocks.last().unwrap().clone() + } + pub fn apply(&mut self, tx: &core::Tx) -> Result<(), BlockchainError> { + self.tx_mempool.push(tx.clone()); + return Ok(()); match tx.validate() { Ok(_) => {}, Err(e) => return Err(BlockchainError::Tx(e)) } - if let Some(from_balance) = self.balances.get_mut(tx.get_from()) { - if *from_balance > tx.get_value() { - *from_balance -= tx.get_value(); + if let Some(from_balance) = self.balances.get_mut(tx.from()) { + if *from_balance > tx.value() { + *from_balance -= tx.value(); } else { return Err(BlockchainError::Tx(TxError::FromInsuffitientFonds)) } } else { - return Err(BlockchainError::Tx(TxError::UnknownAccount(tx.get_from().to_string()))) + return Err(BlockchainError::Tx(TxError::UnknownAccount(tx.from().to_string()))) } - if let Some(to_balance) = self.balances.get_mut(&tx.get_to().to_string()) { - *to_balance += tx.get_value() + if let Some(to_balance) = self.balances.get_mut(&tx.to().to_string()) { + *to_balance += tx.value() } else { if tx.is_new_account() { - self.balances.insert(tx.get_to().to_string(), tx.get_value()); + self.balances.insert(tx.to().to_string(), tx.value()); } else { - return Err(BlockchainError::Tx(TxError::UnknownAccount(tx.get_to().to_string()))) + return Err(BlockchainError::Tx(TxError::UnknownAccount(tx.to().to_string()))) } } @@ -94,9 +196,8 @@ impl Blockchain { } -pub fn calculate_block_hash(block: &core::Block) -> String { +pub fn calculate_block_hash(head: &core::BlockHeader) -> String { let mut hasher = sha2::Sha256::new(); - let head = block.head(); hasher.update(head.nonce().to_be_bytes()); hasher.update(head.previous_hash()); @@ -108,6 +209,13 @@ pub fn calculate_block_hash(block: &core::Block) -> String { } impl Blockchain { + pub fn print_blocks(&self) { + println!("Blocks List\n--------------"); + for (i, b) in self.blocks.iter().enumerate() { + println!("Block #{i}\n{:#?}", b.head().block_hash); + } + } + pub fn get_balances(&self) -> &std::collections::HashMap { &self.balances } @@ -120,15 +228,38 @@ impl Blockchain { &self.genesis } + pub fn add_block(&mut self, block: core::Block) { + match self.validate_block(&block) { + Ok(()) => self.blocks.push(block), + Err(e) => match e { + ValidationError::InvalidBlockHash => log!(ERROR, "Invalid Block Hash"), + ValidationError::InvalidPreviousBlockHash => log!(ERROR, "Invalid Previos Block Hash") + } + } + } + + fn validate_block(&self, block: &core::Block) -> Result<(), ValidationError>{ + let head = block.head(); + let hash = calculate_block_hash(block.head()); + if hash != head.block_hash() { + return Err(ValidationError::InvalidBlockHash) + } + if let Some(prev_block) = self.blocks().last() { + if head.previous_hash() != prev_block.head().block_hash() { + return Err(ValidationError::InvalidPreviousBlockHash) + } + } + Ok(()) + } + fn validate_chain(&self) -> Result<(), ValidationError>{ - log!(DEBUG, "Validating Chain"); + log!(INFO, "Validating Chain"); let blocks = self.blocks(); for block in blocks { let head = block.head(); - let hash = calculate_block_hash(block); + let hash = calculate_block_hash(block.head()); if hash != head.block_hash() { - log!(ERROR, "Hash {} does not equal block_hash() {}", hash, head.block_hash()); return Err(ValidationError::InvalidBlockHash) } } @@ -136,7 +267,7 @@ impl Blockchain { } pub fn from_genesis(genesis: Genesis, blocks: Vec) -> Result { - log!(DEBUG, "Starting Chain Build from Genesis"); + log!(INFO, "Starting Chain Build from Genesis"); let chain = Blockchain { genesis, blocks, diff --git a/src/core/tx.rs b/src/core/tx.rs index 377343b..2af1eb2 100644 --- a/src/core/tx.rs +++ b/src/core/tx.rs @@ -30,13 +30,16 @@ impl Tx { pub fn is_reward(&self) -> bool { return self.data == "reward"; } - pub fn get_from(&self) -> &str { + pub fn from(&self) -> &str { &self.from } - pub fn get_to(&self) -> &str { + pub fn to(&self) -> &str { &self.to } - pub fn get_value(&self) -> u32 { + pub fn value(&self) -> u32 { self.value } + pub fn data(&self) -> &str { + &self.data + } } diff --git a/src/main.rs b/src/main.rs index e97e074..acfd7bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,10 +4,11 @@ use error::{ BlockchainError, handle_error }; pub mod log; pub mod error; pub mod args; -pub mod network; pub mod core; +pub mod native_node; +pub mod seeds_constants; -use crate::network::native::NativeNode; +use crate::native_node::node::NativeNode; use crate::args::{get_args, TxCmd, Commands}; const SEED_ADDR: &str = "127.0.0.1:8333"; diff --git a/src/native_node.rs b/src/native_node.rs new file mode 100644 index 0000000..2cd5294 --- /dev/null +++ b/src/native_node.rs @@ -0,0 +1,6 @@ +pub mod node; +pub mod network; +pub mod message; +pub mod error; +pub mod cli; + diff --git a/src/native_node/cli.rs b/src/native_node/cli.rs new file mode 100644 index 0000000..6e7a3bd --- /dev/null +++ b/src/native_node/cli.rs @@ -0,0 +1,81 @@ +use crate::native_node::{message, node}; +use crate::log::*; +use tokio::sync::mpsc; +use crate::core; +use std::io::{self, Write}; + +impl node::NativeNode { + pub async fn cli(command_sender: mpsc::Sender) { + loop { + print!("\n> "); + io::stdout().flush().unwrap(); + let mut input = String::new(); + match io::stdin().read_line(&mut input) { + Ok(_) => { + let input = input.trim(); + if input.is_empty() { + continue ; + } + + let parts: Vec<&str> = input.split_whitespace().collect(); + let command = parts[0]; + let args = &parts[1..]; + + match command { + "id" => { + command_sender.send(node::NodeCommand::DebugShowId).await.unwrap(); + }, + "tx" => { + if args.len() != 4 { + log!(ERROR, "Invalid arg count! Expected {}, got {}", 4, args.len()); + continue; + } + let from = args[0]; + let to = args[1]; + let value = args[2].parse::().unwrap(); + let data = args[3]; + + let tx = core::Tx::new( + from.to_string(), + to.to_string(), + value, + data.to_string() + ); + + let cmd = node::NodeCommand::Transaction { tx }; + + command_sender.send(cmd).await.unwrap(); + }, + "block" => { + let cmd = node::NodeCommand::CreateBlock; + command_sender.send(cmd).await.unwrap(); + }, + "list" => { + if args.len() != 1 { + log!(ERROR, "{command}: Invalid arg! (blocks, peers)"); + continue; + } + match args[0] { + "blocks" => command_sender.send(node::NodeCommand::DebugListBlocks).await.unwrap(), + "peers" => command_sender.send(node::NodeCommand::DebugListPeers).await.unwrap(), + _ => log!(ERROR, "Unkown arg: {}", args[0]), + } + }, + "dump_blocks" => { + command_sender.send(node::NodeCommand::DebugDumpBlocks).await.unwrap(); + }, + "connect" => { + command_sender.send(node::NodeCommand::ConnectToSeeds).await.unwrap(); + } + _ => { + log!(ERROR, "Unkown command {command}"); + continue; + } + } + }, + Err(_) => {} + } + } + } + +} diff --git a/src/native_node/error.rs b/src/native_node/error.rs new file mode 100644 index 0000000..da8f352 --- /dev/null +++ b/src/native_node/error.rs @@ -0,0 +1,12 @@ +#[derive(Debug, Clone)] +pub struct NetworkError { + pub message: String, +} + +impl std::fmt::Display for NetworkError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.message) + } +} + +impl std::error::Error for NetworkError {} diff --git a/src/native_node/message.rs b/src/native_node/message.rs new file mode 100644 index 0000000..df45079 --- /dev/null +++ b/src/native_node/message.rs @@ -0,0 +1,113 @@ +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net; +use crate::native_node::node; +use crate::native_node::error; +use crate::core; + +use crate::log::*; + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub enum ProtocolMessage { + BootstrapRequest { + node_id: uuid::Uuid, + version: String + }, + BootstrapResponse { + genesis: core::Genesis, + blocks: Vec + }, + GetPeersRequest { + peer_id: uuid::Uuid + }, + GetPeersResponse { + peer_addresses: Vec + }, + Handshake { node_id: uuid::Uuid, version: String }, + Block { peer_id: uuid::Uuid, height: u64, block: core::Block }, + Transaction{ peer_id: uuid::Uuid, tx: core::Tx }, + Ping { peer_id: uuid::Uuid }, + Pong { peer_id: uuid::Uuid }, + Disconnect { peer_id: uuid::Uuid }, +} + +impl node::NativeNode { + + pub async fn send_message(stream: &mut net::TcpStream, message: &ProtocolMessage) -> Result<(), error::NetworkError> { + // Serialize message to JSON + let json = serde_json::to_string(message) + .map_err(|e| error::NetworkError { message: format!("Failed to serialize: {}", e) })?; + let data = json.as_bytes(); + + // Send length prefix (4 bytes) + let len = data.len() as u32; + stream.write_all(&len.to_be_bytes()).await + .map_err(|e| error::NetworkError { message: format!("Failed to write data: {}", e) })?; + + // Send the actual data + stream.write_all(data).await + .map_err(|e| error::NetworkError { message: format!("Failed to write data: {}", e) })?; + stream.flush().await + .map_err(|e| error::NetworkError { message: format!("Failed to flush stream: {}", e) })?; + Ok(()) + } + + pub async fn receive_message(stream: &mut tokio::net::TcpStream) -> Result { + let mut len_bytes = [0u8; 4]; + stream.read_exact(&mut len_bytes).await + .map_err(|e| error::NetworkError { message: format!("Failed to read length: {}", e) })?; + + let len = u32::from_be_bytes(len_bytes) as usize; + + let mut data = vec![0u8; len]; + stream.read_exact(&mut data).await + .map_err(|e| error::NetworkError { message: format!("Failed to read data: {}", e) })?; + + let json = String::from_utf8(data) + .map_err(|e| error::NetworkError { message: format!("Invalid UTF-8: {}", e) })?; + + let message: ProtocolMessage = serde_json::from_str(&json) + .map_err(|e| error::NetworkError { message: format!("JSON parse error: {}", e) })?; + + Ok(message) + } + + pub async fn process_message(&mut self, peer_id: uuid::Uuid, message: &ProtocolMessage) { + + match message { + ProtocolMessage::BootstrapRequest { .. } => { + log!(INFO, "Received BootstrapRequest from {peer_id}"); + let peer = &self.tcp_peers[&peer_id]; + let resp = ProtocolMessage::BootstrapResponse { + genesis: self.chain.genesis().clone(), + blocks: self.chain.blocks().to_vec() + }; + peer.sender.send(resp).await.unwrap(); + log!(INFO, "Send BootstrapResponse to {peer_id}"); + }, + ProtocolMessage::Ping {peer_id} => { + log!(INFO, "Received Ping from {peer_id}"); + let resp = ProtocolMessage::Pong { peer_id: self.id.clone() }; + let peer = &self.tcp_peers[peer_id]; + peer.sender.send(resp).await.unwrap(); + }, + ProtocolMessage::GetPeersRequest { peer_id } => { + log!(INFO, "Received GetPeersRequest from {peer_id}"); + let peers = self.peer_addresses(); + let resp = ProtocolMessage::GetPeersResponse { peer_addresses: peers }; + let peer = &self.tcp_peers[peer_id]; + peer.sender.send(resp).await.unwrap(); + } + ProtocolMessage::Block { block, ..} => { + log!(INFO, "Received Block from {peer_id}"); + self.chain.add_block(block.clone()) + } + ProtocolMessage::Transaction { tx, ..} => { + log!(INFO, "Received Transaction from {peer_id}"); + self.chain.apply(tx).unwrap() + } + _ => { + log!(DEBUG, "TODO: implement this message type"); + } + } + } +} diff --git a/src/native_node/network.rs b/src/native_node/network.rs new file mode 100644 index 0000000..4bda228 --- /dev/null +++ b/src/native_node/network.rs @@ -0,0 +1,129 @@ +use crate::native_node::{message, node}; + +use crate::seeds_constants::SEED_NODES; + +use crate::log::*; +use tokio::select; +use tokio::sync::mpsc; + +impl node::NativeNode { + pub async fn connect_to_seeds(&mut self, sender: mpsc::Sender) { + for seed in SEED_NODES { + if seed != self.addr { + if let Ok(mut stream) = tokio::net::TcpStream::connect(seed).await { + if let Ok(peer_id) = node::NativeNode::send_handshake(self.id.clone(), &mut stream).await { + let sender = sender.clone(); + node::NativeNode::establish_connection(peer_id, seed.to_string(), stream, sender).await; + } + } + } + } + } + + pub async fn establish_connection( + peer_id: uuid::Uuid, + addr: String, + stream: tokio::net::TcpStream, + request_sender: tokio::sync::mpsc::Sender + ) { + let (response_sender, response_receiver) = mpsc::channel::(100); + + let add_peer = node::NodeCommand::AddPeer { + peer_id, + addr: addr.clone(), + sender: response_sender + }; + + if let Err(_) = request_sender.send(add_peer).await { + log!(ERROR, "Failed to send AddPeer to {}", addr); + } + + log!(INFO, "Established Connection with {}", addr); + node::NativeNode::start_peer_handler(stream, peer_id, request_sender.clone(), response_receiver).await; + } + + pub async fn accept_connections( + listner: tokio::net::TcpListener, + request_sender: tokio::sync::mpsc::Sender, + node_id: uuid::Uuid + ) { + log!(INFO, "Starting to accept connections"); + + while let Ok((mut stream, addr)) = listner.accept().await { + let handshake_response = message::ProtocolMessage::Handshake { + node_id: node_id.clone(), + version: "".to_string() + }; + + if let Ok(_) = node::NativeNode::send_message(&mut stream, &handshake_response).await { + if let Ok(message) = node::NativeNode::receive_message(&mut stream).await { + match message { + message::ProtocolMessage::Handshake { node_id, .. } => { + node::NativeNode::establish_connection(node_id, addr.to_string(), stream, request_sender.clone()).await; + }, + _ => { + log!(WARNING, "Invalid Response! expected Handshake, got {:?}", message); + } + } + } + } + } + + } + + async fn start_peer_handler( + mut stream: tokio::net::TcpStream, + peer_id: uuid::Uuid, + request_sender: tokio::sync::mpsc::Sender, + mut response_receiver: tokio::sync::mpsc::Receiver + ) { + let peer_id_clone = peer_id.clone(); + + tokio::spawn(async move { + log!(INFO, "Started Message Handler for {}", peer_id_clone); + + loop { + select! { + response_result = response_receiver.recv() => { + match response_result { + Some(response) => { + log!(INFO, "Sending response to {peer_id_clone}: {:#?}", response); + if let Err(e) = node::NativeNode::send_message(&mut stream, &response).await { + log!(ERROR, "Failed to send response to {peer_id_clone}: {}", e); + break; + } + }, + None => { + log!(INFO, "Response channel closed for {peer_id_clone}"); + break; + } + } + } + message_result = node::NativeNode::receive_message(&mut stream) => { + match message_result { + Ok(message) => { + log!(INFO, "Received Message from {peer_id_clone}"); + + let command = node::NodeCommand::ProcessMessage { + peer_id, + message: message.clone() + }; + + if request_sender.send(command).await.is_err() { + log!(ERROR, "Failed to send command to main thread from {peer_id}"); + break; + } + }, + Err(e) => { + log!(WARNING, "Connection to {peer_id_clone} closed: {}", e.message); + let cmd = node::NodeCommand::RemovePeer { peer_id: peer_id_clone.clone() }; + request_sender.send(cmd).await.unwrap(); + break; + } + } + } + } + } + }); + } +} diff --git a/src/native_node/node.rs b/src/native_node/node.rs new file mode 100644 index 0000000..205d752 --- /dev/null +++ b/src/native_node/node.rs @@ -0,0 +1,275 @@ +use crate::core::{self, ValidationError}; +use crate::native_node::message::{self, ProtocolMessage}; + +use crate::seeds_constants::SEED_NODES; + +use std::io::{Read, Write}; +use std::collections::HashMap; +use crate::log::*; +use tokio::sync::mpsc; +use uuid::Uuid; + +pub struct TcpPeer { + pub id: Uuid, + pub addr: String, + pub sender: tokio::sync::mpsc::Sender +} + +pub struct NativeNode { + pub id: Uuid, + pub addr: String, + pub tcp_peers: HashMap, + pub ws: Vec, + pub chain: core::Blockchain, + pub db_file: std::fs::File +} + +#[derive(Debug)] +pub enum NodeCommand { + AddPeer { peer_id: Uuid, addr: String, sender: tokio::sync::mpsc::Sender }, + RemovePeer { peer_id: Uuid }, + ProcessMessage { peer_id: Uuid, message: ProtocolMessage }, + Transaction { tx: core::Tx }, + CreateBlock, + DebugListBlocks, + DebugListPeers, + DebugShowId, + DebugDumpBlocks, + ConnectToSeeds +} + + +impl NativeNode { + + pub fn peer_addresses(&self) -> Vec { + let mut addr: Vec = self.tcp_peers.iter().map(|p| p.1.addr.to_string()).collect(); + addr.push(self.addr.clone()); + addr + } + + pub fn list_peers(&self) { + println!("Peer List\n-----------"); + for (i, p) in self.tcp_peers.iter().enumerate() { + println!("Peer #{i}: {}", p.1.id) + } + } + + pub fn show_id(&self) { + println!("Node Id: {}", self.id) + } + + fn remove_tcp_peer(&mut self, peer_id: Uuid) { + log!(INFO, "Removing Peer {peer_id}"); + self.tcp_peers.remove_entry(&peer_id); + } + + fn add_tcp_peer(&mut self, id: Uuid, addr: String, sender: tokio::sync::mpsc::Sender) { + let peer = TcpPeer { + id: id, + addr, + sender + }; + + log!(INFO, "Adding Peer {}", peer.id); + + self.tcp_peers.insert(id, peer); + } + + + fn persist(&mut self) { + for t in self.chain.blocks() { + let json = serde_json::to_string(&t).unwrap(); + self.db_file.write(json.as_bytes()).unwrap(); + } + } + + pub fn new_with_id(id: uuid::Uuid, chain: core::Blockchain, db_file: std::fs::File, addr: String) -> Self { + Self { + id, + tcp_peers: HashMap::new(), + ws: Vec::new(), + chain, + addr, + db_file + } + } + + pub fn new(chain: core::Blockchain, db_file: std::fs::File, addr: String) -> Self { + Self { + id: Uuid::new_v4(), + tcp_peers: HashMap::new(), + ws: Vec::new(), + chain, + addr, + db_file + } + } + + pub async fn send_handshake(id: uuid::Uuid, stream: &mut tokio::net::TcpStream) -> Result { + let handshake = ProtocolMessage::Handshake { node_id: id.clone(), version: "".to_string() }; + NativeNode::send_message(stream, &handshake).await.unwrap(); + if let Ok(response) = NativeNode::receive_message(stream).await { + match response { + message::ProtocolMessage::Handshake { node_id, version: _ } => { + Ok(node_id) + }, + _ => { + log!(ERROR, "Invalid response on Handshake"); + Err(ValidationError::InvalidBlockHash) + } + } + } else { + Err(ValidationError::InvalidBlockHash) + } + } + + pub async fn bootstrap(addr: &str) -> Result { + log!(INFO, "Running As Native Node"); + + let mut stream = tokio::net::TcpStream::connect(SEED_NODES[0]).await.unwrap(); + + let id = uuid::Uuid::new_v4(); + + if let Ok(_) = NativeNode::send_handshake(id, &mut stream).await { + let message = message::ProtocolMessage::BootstrapRequest { node_id: id.clone(), version: "".to_string() }; + NativeNode::send_message(&mut stream, &message).await.unwrap(); + log!(INFO, "Sent BootstrapRequest to seed"); + if let Ok(response) = NativeNode::receive_message(&mut stream).await { + match response { + ProtocolMessage::BootstrapResponse { genesis, blocks } => { + log!(INFO, "Received BootstrapResponse from seed"); + let chain = core::Blockchain::from_genesis(genesis, blocks)?; + let node = Self::new_with_id(id, chain, std::fs::File::open("./database/tx.db").unwrap(), addr.to_string()); + Ok(node) + }, + _ => { + log!(ERROR, "Invalid Response from BootstrapRequest: {:?}", &response); + Err(ValidationError::InvalidBlockHash) + } + } + } else { + Err(ValidationError::InvalidBlockHash) + } + } else { + Err(ValidationError::InvalidBlockHash) + } + } + + pub fn seed(addr: String) -> Self { + log!(INFO, "Running As Seed Node"); + let cwd = std::env::current_dir().unwrap(); + let mut genpath = std::path::PathBuf::from(&cwd); + genpath.push("database"); + genpath.push("genesis.json"); + let mut gen_file = std::fs::File::open(genpath).unwrap(); + + let mut buf = String::new(); + gen_file.read_to_string(&mut buf).unwrap(); + + let mut db_file: std::fs::File = { + let mut db_path = std::path::PathBuf::from(&cwd); + db_path.push("database"); + db_path.push("tx.db"); + + std::fs::OpenOptions::new().read(true).write(true).create(true).open(&db_path).unwrap() + }; + + let genesis = serde_json::from_str::(&buf).unwrap(); + + buf.clear(); + db_file.read_to_string(&mut buf).unwrap(); + + let buf = buf.trim(); + + log!(DEBUG, "Buf content: {:#?}", buf); + let blocks = if !buf.is_empty() { + serde_json::from_str::>(&buf).unwrap() + } else { + vec![] + }; + let chain = core::Blockchain::from_genesis(genesis, blocks).unwrap(); + + Self::new(chain, db_file, addr) + } + + pub async fn broadcast_transaction(&self, tx: &core::Tx) { + for (id, peer) in &self.tcp_peers { + let message = ProtocolMessage::Transaction{peer_id: self.id, tx: tx.clone()}; + peer.sender.send(message).await.unwrap(); + log!(DEBUG, "Send Transaction message to {id}"); + } + } + + pub async fn broadcast_block(&self, block: &core::Block) { + for (id, peer) in &self.tcp_peers { + let message = ProtocolMessage::Block{ + peer_id: self.id, + height: self.chain.blocks().len() as u64, + block: block.clone() + }; + peer.sender.send(message).await.unwrap(); + log!(DEBUG, "Send Block message to {id}"); + } + } + + pub async fn run_native(&mut self) { + let tcp_listner = tokio::net::TcpListener::bind(&self.addr).await.unwrap(); + + let (channel_write, mut channel_read) = mpsc::channel::(100); + + let id = self.id.clone(); + tokio::spawn({ + let c = channel_write.clone(); + async move { + NativeNode::accept_connections(tcp_listner, c, id).await; + }}); + + tokio::spawn({ + let c = channel_write.clone(); + async move { + NativeNode::cli(c).await; + } + }); + + while let Some(command) = channel_read.recv().await { + match command { + NodeCommand::ConnectToSeeds => { + self.connect_to_seeds(channel_write.clone()).await; + }, + NodeCommand::AddPeer { peer_id, addr, sender } => { + self.add_tcp_peer(peer_id, addr, sender); + }, + NodeCommand::RemovePeer { peer_id } => { + self.remove_tcp_peer(peer_id); + } + NodeCommand::ProcessMessage { peer_id, message } => { + self.process_message(peer_id, &message).await; + }, + NodeCommand::Transaction { tx } => { + self.chain.apply(&tx).unwrap(); + self.broadcast_transaction(&tx).await; + }, + NodeCommand::CreateBlock => { + log!(INFO, "Received CreateBlock Command"); + let block = self.chain.create_block(); + self.broadcast_block(&block).await; + }, + NodeCommand::DebugListBlocks => { + log!(INFO, "Received DebugListBlocks command"); + self.chain.print_blocks(); + }, + NodeCommand::DebugListPeers => { + log!(INFO, "Received DebugListPeers command"); + self.list_peers(); + }, + NodeCommand::DebugShowId => { + log!(INFO, "Received DebugListBlocks command"); + self.show_id(); + }, + NodeCommand::DebugDumpBlocks => { + self.chain.dump_blocks(&mut self.db_file); + } + } + } + } +} diff --git a/src/network.rs b/src/network.rs deleted file mode 100644 index ce12f2f..0000000 --- a/src/network.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod native; diff --git a/src/network/native.rs b/src/network/native.rs deleted file mode 100644 index 2e88134..0000000 --- a/src/network/native.rs +++ /dev/null @@ -1,375 +0,0 @@ -use crate::core::{self, ValidationError}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::{TcpStream}; - -use std::io::{self, Read, Write}; -use std::collections::HashMap; -use crate::log::*; -use tokio::sync::mpsc; - -struct TcpPeer { - id: String, - sender: tokio::sync::mpsc::Sender -} - -pub struct NativeNode { - addr: String, - tcp: HashMap, - ws: Vec, - chain: core::Blockchain, - db_file: std::fs::File -} - -async fn handle_browser_connection(websocket: warp::ws::WebSocket) { - println!("Browser connected via WebSocket on addr {:?}", websocket); -} - -#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] -pub enum ProtocolMessage { - BootstrapRequest { - node_id: String, - version: String - }, - BootstrapResponse { - genesis: core::Genesis, - blocks: Vec - }, - Handshake { node_id: String, version: String }, - Block { height: u64, data: String }, - Transaction(core::Tx), - Ping { peer_id: String }, - Pong { peer_id: String }, - Disconnect { peer_id: String }, -} - -#[derive(Debug)] -enum NodeCommand { - AddPeer { peer_id: String, sender: tokio::sync::mpsc::Sender }, - ProcessMessage { peer_id: String, message: ProtocolMessage }, - Transaction { tx: core::Tx } -} - -#[derive(Debug, Clone)] -pub struct NetworkError { - pub message: String, -} - -impl std::fmt::Display for NetworkError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.message) - } -} - -impl std::error::Error for NetworkError {} - - -impl NativeNode { - - fn add_tcp_peer(&mut self, id: String, sender: tokio::sync::mpsc::Sender) { - let peer = TcpPeer { - id: id.clone(), - sender - }; - - self.tcp.insert(id, peer); - } - - async fn send_message(stream: &mut TcpStream, message: &ProtocolMessage) -> Result<(), Box> { - // Serialize message to JSON - let json = serde_json::to_string(message)?; - let data = json.as_bytes(); - - // Send length prefix (4 bytes) - let len = data.len() as u32; - stream.write_all(&len.to_be_bytes()).await?; - - // Send the actual data - stream.write_all(data).await?; - stream.flush().await?; - - println!("Sent: {:?}", message); - Ok(()) - } - - async fn receive_message(stream: &mut tokio::net::TcpStream) -> Result { - let mut len_bytes = [0u8; 4]; - stream.read_exact(&mut len_bytes).await - .map_err(|e| NetworkError { message: format!("Failed to read length: {}", e) })?; - - let len = u32::from_be_bytes(len_bytes) as usize; - - let mut data = vec![0u8; len]; - stream.read_exact(&mut data).await - .map_err(|e| NetworkError { message: format!("Failed to read data: {}", e) })?; - - let json = String::from_utf8(data) - .map_err(|e| NetworkError { message: format!("Invalid UTF-8: {}", e) })?; - - let message: ProtocolMessage = serde_json::from_str(&json) - .map_err(|e| NetworkError { message: format!("JSON parse error: {}", e) })?; - - Ok(message) - } - - fn persist(&mut self) { - for t in self.chain.blocks() { - let json = serde_json::to_string(&t).unwrap(); - self.db_file.write(json.as_bytes()).unwrap(); - } - } - - pub fn new(chain: core::Blockchain, db_file: std::fs::File, addr: String) -> Self { - Self { - tcp: HashMap::new(), - ws: Vec::new(), - chain, - addr, - db_file - } - } - - pub async fn bootstrap(addr: &str) -> Result { - const SEED_NODES: [&str; 1] = [ - "127.0.0.1:8333" - ]; - - log!(INFO, "Running As Native Node"); - - let mut stream = tokio::net::TcpStream::connect(SEED_NODES[0]).await.unwrap(); - - let mes = ProtocolMessage::BootstrapRequest { node_id: "".to_string(), version: "".to_string() }; - - NativeNode::send_message(&mut stream, &mes).await.unwrap(); - let response = NativeNode::receive_message(&mut stream).await.unwrap(); - - match response { - ProtocolMessage::BootstrapResponse { genesis, blocks } => { - let chain = core::Blockchain::from_genesis(genesis, blocks)?; - let node = Self::new(chain, std::fs::File::open("./database/tx.db").unwrap(), addr.to_string()); - Ok(node) - }, - _ => { - log!(ERROR, "Invalid Response from BootstrapRequest: {:?}", &response); - Err(ValidationError::InvalidBlockHash) - } - } - } - - pub fn seed(addr: String) -> Self { - log!(INFO, "Running As Seed Node"); - let cwd = std::env::current_dir().unwrap(); - let mut genpath = std::path::PathBuf::from(&cwd); - genpath.push("database"); - genpath.push("genesis.json"); - let mut gen_file = std::fs::File::open(genpath).unwrap(); - - let mut buf = String::new(); - gen_file.read_to_string(&mut buf).unwrap(); - - let mut db_file: std::fs::File = { - let mut db_path = std::path::PathBuf::from(&cwd); - db_path.push("database"); - db_path.push("tx.db"); - - std::fs::OpenOptions::new().read(true).write(true).create(true).open(&db_path).unwrap() - }; - - let genesis = serde_json::from_str::(&buf).unwrap(); - - buf.clear(); - db_file.read_to_string(&mut buf).unwrap(); - - let blocks = serde_json::from_str::>(&buf).unwrap(); - let chain = core::Blockchain::from_genesis(genesis, blocks).unwrap(); - - Self::new(chain, db_file, addr) - } - - async fn accept_connections( - listner: tokio::net::TcpListener, - request_sender: tokio::sync::mpsc::Sender - ) { - log!(INFO, "Starting to accept connections"); - - let mut connection_count: i32 = 0; - - while let Ok((stream, addr)) = listner.accept().await { - connection_count += 1; - let peer_id = format!("peer_{}", connection_count); - - let (response_sender, mut response_receiver) = mpsc::channel::(100); - - log!(INFO, "New Connection from {}", addr); - let add_peer = NodeCommand::AddPeer { - peer_id: peer_id.clone(), - sender: response_sender - }; - - if let Err(_) = request_sender.send(add_peer).await { - log!(ERROR, "Failed to send AddPeer to {}", addr); - break; - } - - NativeNode::start_peer_handler(stream, peer_id, request_sender.clone(), response_receiver).await; - } - - } - - async fn start_peer_handler( - mut stream: tokio::net::TcpStream, - peer_id: String, - request_sender: tokio::sync::mpsc::Sender, - mut response_receiver: tokio::sync::mpsc::Receiver - ) { - let peer_id_clone = peer_id.clone(); - - tokio::spawn(async move { - log!(INFO, "Started Message Handler for {}", peer_id_clone); - - loop { - match NativeNode::receive_message(&mut stream).await { - Ok(message) => { - log!(INFO, "Received Message from {peer_id_clone}"); - - let command = NodeCommand::ProcessMessage { - peer_id: peer_id.clone(), - message: message.clone() - }; - - if request_sender.send(command).await.is_err() { - log!(ERROR, "Failed to send command to main thread from {peer_id}"); - break; - } - - match message { - ProtocolMessage::Ping { peer_id: _ } => { - let resp = response_receiver.recv().await.unwrap(); - if !matches!(&resp, ProtocolMessage::Pong { peer_id: _ }) { - log!{ERROR, "Invalid Response to Ping Request"}; - } else { - NativeNode::send_message(&mut stream, &resp).await; - } - }, - ProtocolMessage::BootstrapRequest { node_id, version } => { - let resp = response_receiver.recv().await.unwrap(); - if !matches!(&resp, ProtocolMessage::BootstrapResponse { genesis, blocks }) { - log!{ERROR, "Invalid Response to Ping Request"}; - } else { - NativeNode::send_message(&mut stream, &resp).await; - } - } - _ => {} - } - }, - Err(e) => { - log!(ERROR, "Failed to read message from {peer_id_clone}: {e}"); - break; - } - } - } - }); - } - - async fn process_message(&mut self, peer_id: String, message: &ProtocolMessage) { - - match message { - ProtocolMessage::BootstrapRequest { node_id, version } => { - log!(INFO, "Received BootstrapRequest from {peer_id}"); - let peer = &self.tcp[&peer_id]; - let resp = ProtocolMessage::BootstrapResponse { - genesis: self.chain.genesis().clone(), - blocks: self.chain.blocks().to_vec() - }; - peer.sender.send(resp).await.unwrap(); - log!(INFO, "Send BootstrapResponse to {peer_id}"); - }, - ProtocolMessage::Ping {peer_id} => { - log!(INFO, "Received Ping from {peer_id}"); - let peer = &self.tcp[peer_id]; - let resp = ProtocolMessage::Pong { peer_id: "seed".to_string() }; - peer.sender.send(resp).await.unwrap(); - }, - _ => { - log!(DEBUG, "TODO: implement this message type"); - } - } - } - - async fn cli(command_sender: mpsc::Sender) { - loop { - let mut input = String::new(); - match io::stdin().read_line(&mut input) { - Ok(_) => { - let input = input.trim(); - if input.is_empty() { - continue ; - } - - let parts: Vec<&str> = input.split_whitespace().collect(); - let command = parts[0]; - let args = &parts[1..]; - - match command { - "tx" => { - if args.len() != 4 { - log!(ERROR, "Invalid arg count! Expected {}", 4); - } - let from = args[0]; - let to = args[1]; - let value = args[2].parse::().unwrap(); - let data = args[3]; - - let tx = core::Tx::new( - from.to_string(), - to.to_string(), - value, - data.to_string() - ); - - - } - _ => { - log!(ERROR, "Unkown command {command}"); - continue; - } - } - - }, - Err(_) => {} - } - } - } - - pub async fn run_native(&mut self) { - let tcp_listner = tokio::net::TcpListener::bind(&self.addr).await.unwrap(); - - let (channel_write, mut channel_read) = mpsc::channel::(100); - - tokio::spawn({ - let c = channel_write.clone(); - async move { - NativeNode::accept_connections(tcp_listner, c).await; - }}); - - tokio::spawn({ - let c = channel_write.clone(); - async move { - NativeNode::cli(c).await; - } - }); - - while let Some(command) = channel_read.recv().await { - match command { - NodeCommand::AddPeer { peer_id, sender } => { - self.add_tcp_peer(peer_id, sender); - }, - NodeCommand::ProcessMessage { peer_id, message } => { - self.process_message(peer_id, &message).await; - }, - NodeCommand::Transaction { tx } => { - self.chain.apply(&tx).unwrap(); - } - } - } - } -} diff --git a/src/network/network_browser.rs b/src/network/network_browser.rs deleted file mode 100644 index 15943b2..0000000 --- a/src/network/network_browser.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub struct BrowserNetwork { - -} diff --git a/src/seeds_constants.rs b/src/seeds_constants.rs new file mode 100644 index 0000000..e4ce96c --- /dev/null +++ b/src/seeds_constants.rs @@ -0,0 +1,3 @@ +pub const SEED_NODES: [&str; 1] = [ + "127.0.0.1:8333" +];