From aeaf2a240ce111ea296913ea42ceb512b267e9f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Henry?= Date: Tue, 17 Apr 2018 10:35:01 +0200 Subject: [PATCH] Client: temporary hack reaggregate http chunks for streamed RPC... :( --- .../ocplib-resto/lib_resto-cohttp/client.ml | 45 +++++++++---------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/vendors/ocplib-resto/lib_resto-cohttp/client.ml b/vendors/ocplib-resto/lib_resto-cohttp/client.ml index ff89bf0c3..7532563c3 100644 --- a/vendors/ocplib-resto/lib_resto-cohttp/client.ml +++ b/vendors/ocplib-resto/lib_resto-cohttp/client.ml @@ -319,32 +319,27 @@ module Make (Encoding : Resto.ENCODING) = struct on_close () ; Lwt.return (`Ok None) | Some chunk -> + let buffer = Buffer.create 2048 in let output = Service.output_encoding service in - log.log ~media output - `OK (lazy (Lwt.return chunk)) >>= fun () -> - match media.destruct output chunk with - | Error msg -> - Lwt.return (`Unexpected_content ((chunk, media), msg)) - | Ok body -> - on_chunk body ; - let rec loop () = - Lwt_stream.get stream >>= function - | None -> on_close () ; Lwt.return_unit - | Some chunk -> - log.log ~media output - `OK (lazy (Lwt.return chunk)) >>= fun () -> - match media.destruct output chunk with - | Ok body -> on_chunk body ; loop () - | Error _msg -> - (* TODO log error. *) - ignore (Lwt_stream.junk_while (fun _ -> true) stream - : unit Lwt.t) ; - on_close () ; Lwt.return_unit in - ignore (loop () : unit Lwt.t) ; - Lwt.return (`Ok (Some (fun () -> - ignore (Lwt_stream.junk_while (fun _ -> true) stream - : unit Lwt.t) ; - ()))) + let rec loop = function + | None -> on_close () ; Lwt.return_unit + | Some chunk -> + Buffer.add_string buffer chunk ; + let data = Buffer.contents buffer in + log.log ~media output + `OK (lazy (Lwt.return chunk)) >>= fun () -> + match media.destruct output data with + | Ok body -> + Buffer.reset buffer ; + on_chunk body ; + Lwt_stream.get stream >>= loop + | Error _msg -> + Lwt_stream.get stream >>= loop in + ignore (loop (Some chunk) : unit Lwt.t) ; + Lwt.return (`Ok (Some (fun () -> + ignore (Lwt_stream.junk_while (fun _ -> true) stream + : unit Lwt.t) ; + ()))) end | `Conflict body -> handle_error log service body `Conflict (fun v -> `Conflict v)