From 659d1aa63ad64c3f9bf0f3a83dc6cfc96eb1b5cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Tue, 14 Nov 2017 03:52:26 +0100 Subject: [PATCH] Shell: remove unrequired `net_id` from network messages --- src/node/shell/distributed_db.ml | 202 +++++++++++++--------- src/node/shell/distributed_db_message.ml | 82 +++++---- src/node/shell/distributed_db_message.mli | 12 +- 3 files changed, 164 insertions(+), 132 deletions(-) diff --git a/src/node/shell/distributed_db.ml b/src/node/shell/distributed_db.ml index 52dc75cfe..f4a39334b 100644 --- a/src/node/shell/distributed_db.ml +++ b/src/node/shell/distributed_db.ml @@ -82,8 +82,8 @@ module Raw_operation = (Fake_operation_storage) (Operation_hash.Table) (struct - type param = Net_id.t - let forge net_id keys = Message.Get_operations (net_id, keys) + type param = unit + let forge () keys = Message.Get_operations keys end) (struct type param = unit @@ -112,8 +112,8 @@ module Raw_block_header = (Block_header_storage) (Block_hash.Table) (struct - type param = Net_id.t - let forge net_id keys = Message.Get_block_headers (net_id, keys) + type param = unit + let forge () keys = Message.Get_block_headers keys end) (struct type param = unit @@ -164,9 +164,9 @@ module Raw_operation_hashes = struct (Operation_hashes_storage) (Operations_table) (struct - type param = Net_id.t - let forge net_id keys = - Message.Get_operation_hashes_for_blocks (net_id, keys) + type param = unit + let forge () keys = + Message.Get_operation_hashes_for_blocks keys end) (struct type param = Operation_list_list_hash.t @@ -232,9 +232,9 @@ module Raw_operations = struct (Operations_storage) (Operations_table) (struct - type param = Net_id.t - let forge net_id keys = - Message.Get_operations_for_blocks (net_id, keys) + type param = unit + let forge () keys = + Message.Get_operations_for_blocks keys end) (struct type param = Operation_list_list_hash.t @@ -340,7 +340,14 @@ let state { disk } = disk let net_state { net_state } = net_state let db { global_db } = global_db -let find_pending_block_header active_nets h = +let read_block_header { disk } h = + State.read_block disk h >>= function + | Some b -> + Lwt.return_some (State.Block.net_id b, State.Block.header b) + | None -> + Lwt.return_none + +let find_pending_block_header { peer_active_nets } h = Net_id.Table.fold (fun _net_id net_db acc -> match acc with @@ -349,10 +356,34 @@ let find_pending_block_header active_nets h = net_db.block_header_db.table h -> Some net_db | None -> None) - active_nets + peer_active_nets None -let find_pending_operation active_nets h = +let find_pending_operations { peer_active_nets } h i = + Net_id.Table.fold + (fun _net_id net_db acc -> + match acc with + | Some _ -> acc + | None when Raw_operations.Table.pending + net_db.operations_db.table (h, i) -> + Some net_db + | None -> None) + peer_active_nets + None + +let find_pending_operation_hashes { peer_active_nets } h i = + Net_id.Table.fold + (fun _net_id net_db acc -> + match acc with + | Some _ -> acc + | None when Raw_operation_hashes.Table.pending + net_db.operation_hashes_db.table (h, i) -> + Some net_db + | None -> None) + peer_active_nets + None + +let find_pending_operation { peer_active_nets } h = Net_id.Table.fold (fun _net_id net_db acc -> match acc with @@ -361,9 +392,22 @@ let find_pending_operation active_nets h = net_db.operation_db.table h -> Some net_db | None -> None) - active_nets + peer_active_nets None +let read_operation { active_nets } h = + Net_id.Table.fold + (fun net_id net_db acc -> + acc >>= function + | Some _ -> acc + | None -> + Raw_operation.Table.read_opt + net_db.operation_db.table h >>= function + | None -> Lwt.return_none + | Some bh -> Lwt.return_some (net_id, bh)) + active_nets + Lwt.return_none + module P2p_reader = struct let may_activate global_db state net_id f = @@ -464,22 +508,22 @@ module P2p_reader = struct (* TODO Kickban *) Lwt.return_unit - | Get_block_headers (net_id, hashes) -> - may_handle state net_id @@ fun net_db -> - (* TODO: Blame request of unadvertised blocks ? *) + | Get_block_headers hashes -> Lwt_list.iter_p (fun hash -> - State.Block.read_opt net_db.net_state hash >|= function - | None -> () - | Some b -> - let header = State.Block.header b in + read_block_header global_db hash >>= function + | None -> + (* TODO: Blame request of unadvertised blocks ? *) + Lwt.return_unit + | Some (_net_id, header) -> ignore @@ - P2p.try_send global_db.p2p state.conn (Block_header header)) + P2p.try_send global_db.p2p state.conn (Block_header header) ; + Lwt.return_unit) hashes | Block_header block -> begin let hash = Block_header.hash block in - match find_pending_block_header state.peer_active_nets hash with + match find_pending_block_header state hash with | None -> (* TODO some penalty. *) Lwt.return_unit @@ -489,22 +533,22 @@ module P2p_reader = struct Lwt.return_unit end - | Get_operations (net_id, hashes) -> - may_handle state net_id @@ fun net_db -> - (* TODO: only answers for prevalidated operations *) + | Get_operations hashes -> Lwt_list.iter_p (fun hash -> - Raw_operation.Table.read_opt - net_db.operation_db.table hash >|= function - | None -> () - | Some p -> + read_operation global_db hash >>= function + | None -> + (* TODO: Blame request of unadvertised operations ? *) + Lwt.return_unit + | Some (_net_id, op) -> ignore @@ - P2p.try_send global_db.p2p state.conn (Operation p)) + P2p.try_send global_db.p2p state.conn (Operation op) ; + Lwt.return_unit) hashes | Operation operation -> begin let hash = Operation.hash operation in - match find_pending_operation state.peer_active_nets hash with + match find_pending_operation state hash with | None -> (* TODO some penalty. *) Lwt.return_unit @@ -517,11 +561,14 @@ module P2p_reader = struct | Get_protocols hashes -> Lwt_list.iter_p (fun hash -> - State.Protocol.read_opt global_db.disk hash >|= function - | None -> () + State.Protocol.read_opt global_db.disk hash >>= function + | None -> + (* TODO: Blame request of unadvertised protocol ? *) + Lwt.return_unit | Some p -> ignore @@ - P2p.try_send global_db.p2p state.conn (Protocol p)) + P2p.try_send global_db.p2p state.conn (Protocol p) ; + Lwt.return_unit) hashes | Protocol protocol -> @@ -530,54 +577,57 @@ module P2p_reader = struct global_db.protocol_db.table state.gid hash protocol >>= fun () -> Lwt.return_unit - | Get_operation_hashes_for_blocks (net_id, blocks) -> - may_handle state net_id @@ fun net_db -> - (* TODO: Blame request of unadvertised blocks ? *) + | Get_operation_hashes_for_blocks blocks -> Lwt_list.iter_p (fun (hash, ofs) -> - State.Block.read_opt net_db.net_state hash >>= function + State.read_block global_db.disk hash >>= function | None -> Lwt.return_unit - | Some b -> - State.Block.operation_hashes b ofs >>= fun (hashes, path) -> + | Some block -> + State.Block.operation_hashes + block ofs >>= fun (hashes, path) -> ignore @@ - P2p.try_send global_db.p2p state.conn - (Operation_hashes_for_block - (net_id, hash, ofs, hashes, path)) ; + P2p.try_send global_db.p2p state.conn @@ + Operation_hashes_for_block (hash, ofs, hashes, path) ; Lwt.return_unit) blocks - | Operation_hashes_for_block (net_id, block, ofs, ops, path) -> begin - may_handle state net_id @@ fun net_db -> - (* TODO early detection of non-requested list. *) - Raw_operation_hashes.Table.notify - net_db.operation_hashes_db.table state.gid - (block, ofs) (ops, path) >>= fun () -> - Lwt.return_unit + | Operation_hashes_for_block (block, ofs, ops, path) -> begin + match find_pending_operation_hashes state block ofs with + | None -> + (* TODO some penalty. *) + Lwt.return_unit + | Some net_db -> + Raw_operation_hashes.Table.notify + net_db.operation_hashes_db.table state.gid + (block, ofs) (ops, path) >>= fun () -> + Lwt.return_unit end - | Get_operations_for_blocks (net_id, blocks) -> - may_handle state net_id @@ fun net_db -> - (* TODO: Blame request of unadvertised blocks ? *) + | Get_operations_for_blocks blocks -> Lwt_list.iter_p (fun (hash, ofs) -> - State.Block.read_opt net_db.net_state hash >>= function + State.read_block global_db.disk hash >>= function | None -> Lwt.return_unit - | Some b -> - State.Block.operations b ofs >>= fun (hashes, path) -> + | Some block -> + State.Block.operations + block ofs >>= fun (ops, path) -> ignore @@ - P2p.try_send global_db.p2p state.conn - (Operations_for_block - (net_id, hash, ofs, hashes, path)) ; + P2p.try_send global_db.p2p state.conn @@ + Operations_for_block (hash, ofs, ops, path) ; Lwt.return_unit) blocks - | Operations_for_block (net_id, block, ofs, ops, path) -> - may_handle state net_id @@ fun net_db -> - (* TODO early detection of non-requested operations. *) - Raw_operations.Table.notify - net_db.operations_db.table state.gid - (block, ofs) (ops, path) >>= fun () -> - Lwt.return_unit + | Operations_for_block (block, ofs, ops, path) -> begin + match find_pending_operations state block ofs with + | None -> + (* TODO some penalty. *) + Lwt.return_unit + | Some net_db -> + Raw_operations.Table.notify + net_db.operations_db.table state.gid + (block, ofs) (ops, path) >>= fun () -> + Lwt.return_unit + end let rec worker_loop global_db state = Lwt_utils.protect ~canceler:state.canceler begin fun () -> @@ -657,7 +707,7 @@ let activate ({ p2p ; active_nets } as global_db) net_state = | exception Not_found -> let active_peers = ref P2p.Peer_id.Set.empty in let p2p_request = - { data = net_id ; + { data = () ; active = (fun () -> !active_peers) ; send = raw_try_send p2p ; } in @@ -829,22 +879,6 @@ module Block_header = struct and type param := unit) end -let read_block_header { disk ; active_nets } h = - State.read_block disk h >>= function - | Some b -> - Lwt.return_some (State.Block.net_id b, State.Block.header b) - | None -> - Net_id.Table.fold - (fun net_id net_db acc -> - acc >>= function - | Some _ -> acc - | None -> - Block_header.read_opt net_db h >>= function - | None -> Lwt.return_none - | Some bh -> Lwt.return_some (net_id, bh)) - active_nets - Lwt.return_none - module Operation_hashes = Make (Raw_operation_hashes.Table) (struct type t = net_db diff --git a/src/node/shell/distributed_db_message.ml b/src/node/shell/distributed_db_message.ml index 9dc2d1ed0..992a6d06c 100644 --- a/src/node/shell/distributed_db_message.ml +++ b/src/node/shell/distributed_db_message.ml @@ -16,23 +16,23 @@ type t = | Get_current_head of Net_id.t | Current_head of Net_id.t * Block_header.t * Mempool.t - | Get_block_headers of Net_id.t * Block_hash.t list + | Get_block_headers of Block_hash.t list | Block_header of Block_header.t - | Get_operations of Net_id.t * Operation_hash.t list + | Get_operations of Operation_hash.t list | Operation of Operation.t | Get_protocols of Protocol_hash.t list | Protocol of Protocol.t - | Get_operation_hashes_for_blocks of Net_id.t * (Block_hash.t * int) list + | Get_operation_hashes_for_blocks of (Block_hash.t * int) list | Operation_hashes_for_block of - Net_id.t * Block_hash.t * int * + Block_hash.t * int * Operation_hash.t list * Operation_list_list_hash.path - | Get_operations_for_blocks of Net_id.t * (Block_hash.t * int) list + | Get_operations_for_blocks of (Block_hash.t * int) list | Operations_for_block of - Net_id.t * Block_hash.t * int * + Block_hash.t * int * Operation.t list * Operation_list_list_hash.path let encoding = @@ -84,13 +84,11 @@ let encoding = (fun (net_id, bh, mempool) -> Current_head (net_id, bh, mempool)) ; case ~tag:0x20 - (obj2 - (req "net_id" Net_id.encoding) - (req "get_block_headers" (list Block_hash.encoding))) + (obj1 (req "get_block_headers" (list Block_hash.encoding))) (function - | Get_block_headers (net_id, bhs) -> Some (net_id, bhs) + | Get_block_headers bhs -> Some bhs | _ -> None) - (fun (net_id, bhs) -> Get_block_headers (net_id, bhs)) ; + (fun bhs -> Get_block_headers bhs) ; case ~tag:0x21 (obj1 (req "block_header" Block_header.encoding)) @@ -100,13 +98,11 @@ let encoding = (fun bh -> Block_header bh) ; case ~tag:0x30 - (obj2 - (req "net_id" Net_id.encoding) - (req "get_operations" (list Operation_hash.encoding))) + (obj1 (req "get_operations" (list Operation_hash.encoding))) (function - | Get_operations (net_id, bhs) -> Some (net_id, bhs) + | Get_operations bhs -> Some bhs | _ -> None) - (fun (net_id, bhs) -> Get_operations (net_id, bhs)) ; + (fun bhs -> Get_operations bhs) ; case ~tag:0x31 (obj1 (req "operation" Operation.encoding)) @@ -127,46 +123,48 @@ let encoding = (fun proto -> Protocol proto); case ~tag:0x50 - (obj2 - (req "net_id" Net_id.encoding) - (req "get_operation_hashes_for_blocks" - (list (tup2 Block_hash.encoding int8)))) + (obj1 (req "get_operation_hashes_for_blocks" + (list (tup2 Block_hash.encoding int8)))) (function - | Get_operation_hashes_for_blocks (net_id, keys) -> Some (net_id, keys) + | Get_operation_hashes_for_blocks keys -> Some keys | _ -> None) - (fun (net_id, keys) -> Get_operation_hashes_for_blocks (net_id, keys)); + (fun keys -> Get_operation_hashes_for_blocks keys); case ~tag:0x51 - (obj4 - (req "net_id" Net_id.encoding) - (req "operation_hashes_for_block" (tup2 Block_hash.encoding int8)) + (obj3 + (req "operation_hashes_for_block" + (obj2 + (req "hash" Block_hash.encoding) + (req "validation_pass" int8))) (req "operation_hashes" (list Operation_hash.encoding)) (req "operation_hashes_path" Operation_list_list_hash.path_encoding)) - (function Operation_hashes_for_block (net_id, block, ofs, ops, path) -> - Some (net_id, (block, ofs), ops, path) | _ -> None) - (fun (net_id, (block, ofs), ops, path) -> - Operation_hashes_for_block (net_id, block, ofs, ops, path)) ; + (function Operation_hashes_for_block (block, ofs, ops, path) -> + Some ((block, ofs), ops, path) | _ -> None) + (fun ((block, ofs), ops, path) -> + Operation_hashes_for_block (block, ofs, ops, path)) ; case ~tag:0x60 - (obj2 - (req "net_id" Net_id.encoding) - (req "get_operations_for_blocks" - (list (tup2 Block_hash.encoding int8)))) + (obj1 (req "get_operations_for_blocks" + (list (obj2 + (req "hash" Block_hash.encoding) + (req "validation_pass" int8))))) (function - | Get_operations_for_blocks (net_id, keys) -> Some (net_id, keys) + | Get_operations_for_blocks keys -> Some keys | _ -> None) - (fun (net_id, keys) -> Get_operations_for_blocks (net_id, keys)); + (fun keys -> Get_operations_for_blocks keys); case ~tag:0x61 - (obj4 - (req "net_id" Net_id.encoding) - (req "operations_for_block" (tup2 Block_hash.encoding int8)) + (obj3 + (req "operations_for_block" + (obj2 + (req "hash" Block_hash.encoding) + (req "validation_pass" int8))) (req "operations" (list (dynamic_size Operation.encoding))) (req "operations_path" Operation_list_list_hash.path_encoding)) - (function Operations_for_block (net_id, block, ofs, ops, path) -> - Some (net_id, (block, ofs), ops, path) | _ -> None) - (fun (net_id, (block, ofs), ops, path) -> - Operations_for_block (net_id, block, ofs, ops, path)) ; + (function Operations_for_block (block, ofs, ops, path) -> + Some ((block, ofs), ops, path) | _ -> None) + (fun ((block, ofs), ops, path) -> + Operations_for_block (block, ofs, ops, path)) ; ] diff --git a/src/node/shell/distributed_db_message.mli b/src/node/shell/distributed_db_message.mli index 0ce8720db..b2f6ae934 100644 --- a/src/node/shell/distributed_db_message.mli +++ b/src/node/shell/distributed_db_message.mli @@ -18,23 +18,23 @@ type t = | Get_current_head of Net_id.t | Current_head of Net_id.t * Block_header.t * Mempool.t - | Get_block_headers of Net_id.t * Block_hash.t list + | Get_block_headers of Block_hash.t list | Block_header of Block_header.t - | Get_operations of Net_id.t * Operation_hash.t list + | Get_operations of Operation_hash.t list | Operation of Operation.t | Get_protocols of Protocol_hash.t list | Protocol of Protocol.t - | Get_operation_hashes_for_blocks of Net_id.t * (Block_hash.t * int) list + | Get_operation_hashes_for_blocks of (Block_hash.t * int) list | Operation_hashes_for_block of - Net_id.t * Block_hash.t * int * + Block_hash.t * int * Operation_hash.t list * Operation_list_list_hash.path - | Get_operations_for_blocks of Net_id.t * (Block_hash.t * int) list + | Get_operations_for_blocks of (Block_hash.t * int) list | Operations_for_block of - Net_id.t * Block_hash.t * int * + Block_hash.t * int * Operation.t list * Operation_list_list_hash.path val cfg : t P2p.message_config