Shell: remove unrequired net_id from network messages

This commit is contained in:
Grégoire Henry 2017-11-14 03:52:26 +01:00 committed by Benjamin Canou
parent 178d839ee1
commit 659d1aa63a
3 changed files with 164 additions and 132 deletions

View File

@ -82,8 +82,8 @@ module Raw_operation =
(Fake_operation_storage) (Fake_operation_storage)
(Operation_hash.Table) (Operation_hash.Table)
(struct (struct
type param = Net_id.t type param = unit
let forge net_id keys = Message.Get_operations (net_id, keys) let forge () keys = Message.Get_operations keys
end) end)
(struct (struct
type param = unit type param = unit
@ -112,8 +112,8 @@ module Raw_block_header =
(Block_header_storage) (Block_header_storage)
(Block_hash.Table) (Block_hash.Table)
(struct (struct
type param = Net_id.t type param = unit
let forge net_id keys = Message.Get_block_headers (net_id, keys) let forge () keys = Message.Get_block_headers keys
end) end)
(struct (struct
type param = unit type param = unit
@ -164,9 +164,9 @@ module Raw_operation_hashes = struct
(Operation_hashes_storage) (Operation_hashes_storage)
(Operations_table) (Operations_table)
(struct (struct
type param = Net_id.t type param = unit
let forge net_id keys = let forge () keys =
Message.Get_operation_hashes_for_blocks (net_id, keys) Message.Get_operation_hashes_for_blocks keys
end) end)
(struct (struct
type param = Operation_list_list_hash.t type param = Operation_list_list_hash.t
@ -232,9 +232,9 @@ module Raw_operations = struct
(Operations_storage) (Operations_storage)
(Operations_table) (Operations_table)
(struct (struct
type param = Net_id.t type param = unit
let forge net_id keys = let forge () keys =
Message.Get_operations_for_blocks (net_id, keys) Message.Get_operations_for_blocks keys
end) end)
(struct (struct
type param = Operation_list_list_hash.t type param = Operation_list_list_hash.t
@ -340,7 +340,14 @@ let state { disk } = disk
let net_state { net_state } = net_state let net_state { net_state } = net_state
let db { global_db } = global_db let db { global_db } = global_db
let find_pending_block_header active_nets h = let read_block_header { disk } h =
State.read_block disk h >>= function
| Some b ->
Lwt.return_some (State.Block.net_id b, State.Block.header b)
| None ->
Lwt.return_none
let find_pending_block_header { peer_active_nets } h =
Net_id.Table.fold Net_id.Table.fold
(fun _net_id net_db acc -> (fun _net_id net_db acc ->
match acc with match acc with
@ -349,10 +356,34 @@ let find_pending_block_header active_nets h =
net_db.block_header_db.table h -> net_db.block_header_db.table h ->
Some net_db Some net_db
| None -> None) | None -> None)
active_nets peer_active_nets
None None
let find_pending_operation active_nets h = let find_pending_operations { peer_active_nets } h i =
Net_id.Table.fold
(fun _net_id net_db acc ->
match acc with
| Some _ -> acc
| None when Raw_operations.Table.pending
net_db.operations_db.table (h, i) ->
Some net_db
| None -> None)
peer_active_nets
None
let find_pending_operation_hashes { peer_active_nets } h i =
Net_id.Table.fold
(fun _net_id net_db acc ->
match acc with
| Some _ -> acc
| None when Raw_operation_hashes.Table.pending
net_db.operation_hashes_db.table (h, i) ->
Some net_db
| None -> None)
peer_active_nets
None
let find_pending_operation { peer_active_nets } h =
Net_id.Table.fold Net_id.Table.fold
(fun _net_id net_db acc -> (fun _net_id net_db acc ->
match acc with match acc with
@ -361,9 +392,22 @@ let find_pending_operation active_nets h =
net_db.operation_db.table h -> net_db.operation_db.table h ->
Some net_db Some net_db
| None -> None) | None -> None)
active_nets peer_active_nets
None None
let read_operation { active_nets } h =
Net_id.Table.fold
(fun net_id net_db acc ->
acc >>= function
| Some _ -> acc
| None ->
Raw_operation.Table.read_opt
net_db.operation_db.table h >>= function
| None -> Lwt.return_none
| Some bh -> Lwt.return_some (net_id, bh))
active_nets
Lwt.return_none
module P2p_reader = struct module P2p_reader = struct
let may_activate global_db state net_id f = let may_activate global_db state net_id f =
@ -464,22 +508,22 @@ module P2p_reader = struct
(* TODO Kickban *) (* TODO Kickban *)
Lwt.return_unit Lwt.return_unit
| Get_block_headers (net_id, hashes) -> | Get_block_headers hashes ->
may_handle state net_id @@ fun net_db ->
(* TODO: Blame request of unadvertised blocks ? *)
Lwt_list.iter_p Lwt_list.iter_p
(fun hash -> (fun hash ->
State.Block.read_opt net_db.net_state hash >|= function read_block_header global_db hash >>= function
| None -> () | None ->
| Some b -> (* TODO: Blame request of unadvertised blocks ? *)
let header = State.Block.header b in Lwt.return_unit
| Some (_net_id, header) ->
ignore @@ ignore @@
P2p.try_send global_db.p2p state.conn (Block_header header)) P2p.try_send global_db.p2p state.conn (Block_header header) ;
Lwt.return_unit)
hashes hashes
| Block_header block -> begin | Block_header block -> begin
let hash = Block_header.hash block in let hash = Block_header.hash block in
match find_pending_block_header state.peer_active_nets hash with match find_pending_block_header state hash with
| None -> | None ->
(* TODO some penalty. *) (* TODO some penalty. *)
Lwt.return_unit Lwt.return_unit
@ -489,22 +533,22 @@ module P2p_reader = struct
Lwt.return_unit Lwt.return_unit
end end
| Get_operations (net_id, hashes) -> | Get_operations hashes ->
may_handle state net_id @@ fun net_db ->
(* TODO: only answers for prevalidated operations *)
Lwt_list.iter_p Lwt_list.iter_p
(fun hash -> (fun hash ->
Raw_operation.Table.read_opt read_operation global_db hash >>= function
net_db.operation_db.table hash >|= function | None ->
| None -> () (* TODO: Blame request of unadvertised operations ? *)
| Some p -> Lwt.return_unit
| Some (_net_id, op) ->
ignore @@ ignore @@
P2p.try_send global_db.p2p state.conn (Operation p)) P2p.try_send global_db.p2p state.conn (Operation op) ;
Lwt.return_unit)
hashes hashes
| Operation operation -> begin | Operation operation -> begin
let hash = Operation.hash operation in let hash = Operation.hash operation in
match find_pending_operation state.peer_active_nets hash with match find_pending_operation state hash with
| None -> | None ->
(* TODO some penalty. *) (* TODO some penalty. *)
Lwt.return_unit Lwt.return_unit
@ -517,11 +561,14 @@ module P2p_reader = struct
| Get_protocols hashes -> | Get_protocols hashes ->
Lwt_list.iter_p Lwt_list.iter_p
(fun hash -> (fun hash ->
State.Protocol.read_opt global_db.disk hash >|= function State.Protocol.read_opt global_db.disk hash >>= function
| None -> () | None ->
(* TODO: Blame request of unadvertised protocol ? *)
Lwt.return_unit
| Some p -> | Some p ->
ignore @@ ignore @@
P2p.try_send global_db.p2p state.conn (Protocol p)) P2p.try_send global_db.p2p state.conn (Protocol p) ;
Lwt.return_unit)
hashes hashes
| Protocol protocol -> | Protocol protocol ->
@ -530,54 +577,57 @@ module P2p_reader = struct
global_db.protocol_db.table state.gid hash protocol >>= fun () -> global_db.protocol_db.table state.gid hash protocol >>= fun () ->
Lwt.return_unit Lwt.return_unit
| Get_operation_hashes_for_blocks (net_id, blocks) -> | Get_operation_hashes_for_blocks blocks ->
may_handle state net_id @@ fun net_db ->
(* TODO: Blame request of unadvertised blocks ? *)
Lwt_list.iter_p Lwt_list.iter_p
(fun (hash, ofs) -> (fun (hash, ofs) ->
State.Block.read_opt net_db.net_state hash >>= function State.read_block global_db.disk hash >>= function
| None -> Lwt.return_unit | None -> Lwt.return_unit
| Some b -> | Some block ->
State.Block.operation_hashes b ofs >>= fun (hashes, path) -> State.Block.operation_hashes
block ofs >>= fun (hashes, path) ->
ignore @@ ignore @@
P2p.try_send global_db.p2p state.conn P2p.try_send global_db.p2p state.conn @@
(Operation_hashes_for_block Operation_hashes_for_block (hash, ofs, hashes, path) ;
(net_id, hash, ofs, hashes, path)) ;
Lwt.return_unit) Lwt.return_unit)
blocks blocks
| Operation_hashes_for_block (net_id, block, ofs, ops, path) -> begin | Operation_hashes_for_block (block, ofs, ops, path) -> begin
may_handle state net_id @@ fun net_db -> match find_pending_operation_hashes state block ofs with
(* TODO early detection of non-requested list. *) | None ->
Raw_operation_hashes.Table.notify (* TODO some penalty. *)
net_db.operation_hashes_db.table state.gid Lwt.return_unit
(block, ofs) (ops, path) >>= fun () -> | Some net_db ->
Lwt.return_unit Raw_operation_hashes.Table.notify
net_db.operation_hashes_db.table state.gid
(block, ofs) (ops, path) >>= fun () ->
Lwt.return_unit
end end
| Get_operations_for_blocks (net_id, blocks) -> | Get_operations_for_blocks blocks ->
may_handle state net_id @@ fun net_db ->
(* TODO: Blame request of unadvertised blocks ? *)
Lwt_list.iter_p Lwt_list.iter_p
(fun (hash, ofs) -> (fun (hash, ofs) ->
State.Block.read_opt net_db.net_state hash >>= function State.read_block global_db.disk hash >>= function
| None -> Lwt.return_unit | None -> Lwt.return_unit
| Some b -> | Some block ->
State.Block.operations b ofs >>= fun (hashes, path) -> State.Block.operations
block ofs >>= fun (ops, path) ->
ignore @@ ignore @@
P2p.try_send global_db.p2p state.conn P2p.try_send global_db.p2p state.conn @@
(Operations_for_block Operations_for_block (hash, ofs, ops, path) ;
(net_id, hash, ofs, hashes, path)) ;
Lwt.return_unit) Lwt.return_unit)
blocks blocks
| Operations_for_block (net_id, block, ofs, ops, path) -> | Operations_for_block (block, ofs, ops, path) -> begin
may_handle state net_id @@ fun net_db -> match find_pending_operations state block ofs with
(* TODO early detection of non-requested operations. *) | None ->
Raw_operations.Table.notify (* TODO some penalty. *)
net_db.operations_db.table state.gid Lwt.return_unit
(block, ofs) (ops, path) >>= fun () -> | Some net_db ->
Lwt.return_unit Raw_operations.Table.notify
net_db.operations_db.table state.gid
(block, ofs) (ops, path) >>= fun () ->
Lwt.return_unit
end
let rec worker_loop global_db state = let rec worker_loop global_db state =
Lwt_utils.protect ~canceler:state.canceler begin fun () -> Lwt_utils.protect ~canceler:state.canceler begin fun () ->
@ -657,7 +707,7 @@ let activate ({ p2p ; active_nets } as global_db) net_state =
| exception Not_found -> | exception Not_found ->
let active_peers = ref P2p.Peer_id.Set.empty in let active_peers = ref P2p.Peer_id.Set.empty in
let p2p_request = let p2p_request =
{ data = net_id ; { data = () ;
active = (fun () -> !active_peers) ; active = (fun () -> !active_peers) ;
send = raw_try_send p2p ; send = raw_try_send p2p ;
} in } in
@ -829,22 +879,6 @@ module Block_header = struct
and type param := unit) and type param := unit)
end end
let read_block_header { disk ; active_nets } h =
State.read_block disk h >>= function
| Some b ->
Lwt.return_some (State.Block.net_id b, State.Block.header b)
| None ->
Net_id.Table.fold
(fun net_id net_db acc ->
acc >>= function
| Some _ -> acc
| None ->
Block_header.read_opt net_db h >>= function
| None -> Lwt.return_none
| Some bh -> Lwt.return_some (net_id, bh))
active_nets
Lwt.return_none
module Operation_hashes = module Operation_hashes =
Make (Raw_operation_hashes.Table) (struct Make (Raw_operation_hashes.Table) (struct
type t = net_db type t = net_db

View File

@ -16,23 +16,23 @@ type t =
| Get_current_head of Net_id.t | Get_current_head of Net_id.t
| Current_head of Net_id.t * Block_header.t * Mempool.t | Current_head of Net_id.t * Block_header.t * Mempool.t
| Get_block_headers of Net_id.t * Block_hash.t list | Get_block_headers of Block_hash.t list
| Block_header of Block_header.t | Block_header of Block_header.t
| Get_operations of Net_id.t * Operation_hash.t list | Get_operations of Operation_hash.t list
| Operation of Operation.t | Operation of Operation.t
| Get_protocols of Protocol_hash.t list | Get_protocols of Protocol_hash.t list
| Protocol of Protocol.t | Protocol of Protocol.t
| Get_operation_hashes_for_blocks of Net_id.t * (Block_hash.t * int) list | Get_operation_hashes_for_blocks of (Block_hash.t * int) list
| Operation_hashes_for_block of | Operation_hashes_for_block of
Net_id.t * Block_hash.t * int * Block_hash.t * int *
Operation_hash.t list * Operation_list_list_hash.path Operation_hash.t list * Operation_list_list_hash.path
| Get_operations_for_blocks of Net_id.t * (Block_hash.t * int) list | Get_operations_for_blocks of (Block_hash.t * int) list
| Operations_for_block of | Operations_for_block of
Net_id.t * Block_hash.t * int * Block_hash.t * int *
Operation.t list * Operation_list_list_hash.path Operation.t list * Operation_list_list_hash.path
let encoding = let encoding =
@ -84,13 +84,11 @@ let encoding =
(fun (net_id, bh, mempool) -> Current_head (net_id, bh, mempool)) ; (fun (net_id, bh, mempool) -> Current_head (net_id, bh, mempool)) ;
case ~tag:0x20 case ~tag:0x20
(obj2 (obj1 (req "get_block_headers" (list Block_hash.encoding)))
(req "net_id" Net_id.encoding)
(req "get_block_headers" (list Block_hash.encoding)))
(function (function
| Get_block_headers (net_id, bhs) -> Some (net_id, bhs) | Get_block_headers bhs -> Some bhs
| _ -> None) | _ -> None)
(fun (net_id, bhs) -> Get_block_headers (net_id, bhs)) ; (fun bhs -> Get_block_headers bhs) ;
case ~tag:0x21 case ~tag:0x21
(obj1 (req "block_header" Block_header.encoding)) (obj1 (req "block_header" Block_header.encoding))
@ -100,13 +98,11 @@ let encoding =
(fun bh -> Block_header bh) ; (fun bh -> Block_header bh) ;
case ~tag:0x30 case ~tag:0x30
(obj2 (obj1 (req "get_operations" (list Operation_hash.encoding)))
(req "net_id" Net_id.encoding)
(req "get_operations" (list Operation_hash.encoding)))
(function (function
| Get_operations (net_id, bhs) -> Some (net_id, bhs) | Get_operations bhs -> Some bhs
| _ -> None) | _ -> None)
(fun (net_id, bhs) -> Get_operations (net_id, bhs)) ; (fun bhs -> Get_operations bhs) ;
case ~tag:0x31 case ~tag:0x31
(obj1 (req "operation" Operation.encoding)) (obj1 (req "operation" Operation.encoding))
@ -127,46 +123,48 @@ let encoding =
(fun proto -> Protocol proto); (fun proto -> Protocol proto);
case ~tag:0x50 case ~tag:0x50
(obj2 (obj1 (req "get_operation_hashes_for_blocks"
(req "net_id" Net_id.encoding) (list (tup2 Block_hash.encoding int8))))
(req "get_operation_hashes_for_blocks"
(list (tup2 Block_hash.encoding int8))))
(function (function
| Get_operation_hashes_for_blocks (net_id, keys) -> Some (net_id, keys) | Get_operation_hashes_for_blocks keys -> Some keys
| _ -> None) | _ -> None)
(fun (net_id, keys) -> Get_operation_hashes_for_blocks (net_id, keys)); (fun keys -> Get_operation_hashes_for_blocks keys);
case ~tag:0x51 case ~tag:0x51
(obj4 (obj3
(req "net_id" Net_id.encoding) (req "operation_hashes_for_block"
(req "operation_hashes_for_block" (tup2 Block_hash.encoding int8)) (obj2
(req "hash" Block_hash.encoding)
(req "validation_pass" int8)))
(req "operation_hashes" (list Operation_hash.encoding)) (req "operation_hashes" (list Operation_hash.encoding))
(req "operation_hashes_path" Operation_list_list_hash.path_encoding)) (req "operation_hashes_path" Operation_list_list_hash.path_encoding))
(function Operation_hashes_for_block (net_id, block, ofs, ops, path) -> (function Operation_hashes_for_block (block, ofs, ops, path) ->
Some (net_id, (block, ofs), ops, path) | _ -> None) Some ((block, ofs), ops, path) | _ -> None)
(fun (net_id, (block, ofs), ops, path) -> (fun ((block, ofs), ops, path) ->
Operation_hashes_for_block (net_id, block, ofs, ops, path)) ; Operation_hashes_for_block (block, ofs, ops, path)) ;
case ~tag:0x60 case ~tag:0x60
(obj2 (obj1 (req "get_operations_for_blocks"
(req "net_id" Net_id.encoding) (list (obj2
(req "get_operations_for_blocks" (req "hash" Block_hash.encoding)
(list (tup2 Block_hash.encoding int8)))) (req "validation_pass" int8)))))
(function (function
| Get_operations_for_blocks (net_id, keys) -> Some (net_id, keys) | Get_operations_for_blocks keys -> Some keys
| _ -> None) | _ -> None)
(fun (net_id, keys) -> Get_operations_for_blocks (net_id, keys)); (fun keys -> Get_operations_for_blocks keys);
case ~tag:0x61 case ~tag:0x61
(obj4 (obj3
(req "net_id" Net_id.encoding) (req "operations_for_block"
(req "operations_for_block" (tup2 Block_hash.encoding int8)) (obj2
(req "hash" Block_hash.encoding)
(req "validation_pass" int8)))
(req "operations" (list (dynamic_size Operation.encoding))) (req "operations" (list (dynamic_size Operation.encoding)))
(req "operations_path" Operation_list_list_hash.path_encoding)) (req "operations_path" Operation_list_list_hash.path_encoding))
(function Operations_for_block (net_id, block, ofs, ops, path) -> (function Operations_for_block (block, ofs, ops, path) ->
Some (net_id, (block, ofs), ops, path) | _ -> None) Some ((block, ofs), ops, path) | _ -> None)
(fun (net_id, (block, ofs), ops, path) -> (fun ((block, ofs), ops, path) ->
Operations_for_block (net_id, block, ofs, ops, path)) ; Operations_for_block (block, ofs, ops, path)) ;
] ]

View File

@ -18,23 +18,23 @@ type t =
| Get_current_head of Net_id.t | Get_current_head of Net_id.t
| Current_head of Net_id.t * Block_header.t * Mempool.t | Current_head of Net_id.t * Block_header.t * Mempool.t
| Get_block_headers of Net_id.t * Block_hash.t list | Get_block_headers of Block_hash.t list
| Block_header of Block_header.t | Block_header of Block_header.t
| Get_operations of Net_id.t * Operation_hash.t list | Get_operations of Operation_hash.t list
| Operation of Operation.t | Operation of Operation.t
| Get_protocols of Protocol_hash.t list | Get_protocols of Protocol_hash.t list
| Protocol of Protocol.t | Protocol of Protocol.t
| Get_operation_hashes_for_blocks of Net_id.t * (Block_hash.t * int) list | Get_operation_hashes_for_blocks of (Block_hash.t * int) list
| Operation_hashes_for_block of | Operation_hashes_for_block of
Net_id.t * Block_hash.t * int * Block_hash.t * int *
Operation_hash.t list * Operation_list_list_hash.path Operation_hash.t list * Operation_list_list_hash.path
| Get_operations_for_blocks of Net_id.t * (Block_hash.t * int) list | Get_operations_for_blocks of (Block_hash.t * int) list
| Operations_for_block of | Operations_for_block of
Net_id.t * Block_hash.t * int * Block_hash.t * int *
Operation.t list * Operation_list_list_hash.path Operation.t list * Operation_list_list_hash.path
val cfg : t P2p.message_config val cfg : t P2p.message_config