This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch wip-suggest-fsync-header
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/wip-suggest-fsync-header by 
this push:
     new d41d58687 wip suggestion for fsync pr
d41d58687 is described below

commit d41d58687771f06e80fceaff2878ff9868f37765
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Jun 23 15:44:15 2025 -0400

    wip suggestion for fsync pr
---
 src/couch/src/couch_file.erl | 61 ++++++++++++++++++++++++++++++++------------
 1 file changed, 45 insertions(+), 16 deletions(-)

diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index 24c910302..4b377d33c 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -47,7 +47,7 @@
 -export([append_term/2, append_term/3]).
 -export([pread_terms/2]).
 -export([append_terms/2, append_terms/3]).
--export([write_header/2, read_header/1]).
+-export([write_header/2, write_header/3, read_header/1]).
 -export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
 
 % gen_server callbacks
@@ -433,11 +433,16 @@ read_header(Fd) ->
     end.
 
 write_header(Fd, Data) ->
+    write_header(Fd, Data, []).
+
+% Opts is an option list; only {sync, true} is currently supported. When
+% set, the fd is fsync-ed both before and after the header is written.
+write_header(Fd, Data, Opts) when is_list(Opts) ->
     Bin = ?term_to_bin(Data),
     Checksum = generate_checksum(Bin),
     % now we assemble the final header binary and write to disk
     FinalBin = <<Checksum/binary, Bin/binary>>,
-    ioq:call(Fd, {write_header, FinalBin}, erlang:get(io_priority)).
+    ioq:call(Fd, {write_header, FinalBin, Opts}, erlang:get(io_priority)).
 
 init_status_error(ReturnPid, Ref, Error) ->
     ReturnPid ! {Ref, self(), Error},
@@ -580,20 +585,21 @@ handle_call({append_bins, Bins}, _From, #file{} = File) ->
         {{ok, Resps}, File1} -> {reply, {ok, Resps}, File1};
         {Error, File1} -> {reply, Error, File1}
     end;
-handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
-    BinSize = byte_size(Bin),
-    case Pos rem ?SIZE_BLOCK of
-        0 ->
-            Padding = <<>>;
-        BlockOffset ->
-            Padding = <<0:(8 * (?SIZE_BLOCK - BlockOffset))>>
-    end,
-    FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(5, [Bin])],
-    case file:write(Fd, FinalBin) of
-        ok ->
-            {reply, ok, File#file{eof = Pos + iolist_size(FinalBin)}};
-        Error ->
-            {reply, Error, reset_eof(File)}
+handle_call({write_header, Bin, Opts}, _From, #file{} = File) ->
+    try
+        ok = header_fsync(File, Opts),
+        case handle_write_header(Bin, File) of
+            {ok, NewFile} ->
+                ok = header_fsync(NewFile, Opts),
+                {reply, ok, NewFile};
+            {{error, Err}, NewFile} ->
+                {reply, {error, Err}, NewFile}
+        end
+    catch
+        error:{fsync_error, Error} ->
+            % If an fsync error happens we stop. See the comment in
+            % handle_call(sync, ...) for why we're dropping the fd
+            {stop, {error, Error}, {error, Error}, #file{fd = nil}}
     end;
 handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
     {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
@@ -659,6 +665,17 @@ pread(#file{} = File, PosL) ->
     Extracted = lists:zipwith(ZipFun, DataSizes, Resps),
     {ok, Extracted}.
 
+header_fsync(#file{fd = Fd}, Opts) when is_list(Opts) ->
+    case couch_util:get_value(sync, Opts, false) of
+        true ->
+            case fsync(Fd) of
+                ok -> ok;
+                {error, Err} -> error({fsync_error, Err})
+            end;
+        false ->
+            ok
+    end.
+
 fsync(Fd) ->
     T0 = erlang:monotonic_time(),
     % We do not rely on mtime/atime for our safety/consitency so we can use
@@ -757,6 +774,18 @@ find_newest_header(Fd, [{Location, Size} | LocationSizes]) ->
             find_newest_header(Fd, LocationSizes)
     end.
 
+handle_write_header(Bin, #file{fd = Fd, eof = Pos} = File) ->
+    BinSize = byte_size(Bin),
+    case Pos rem ?SIZE_BLOCK of
+        0 -> Padding = <<>>;
+        BlockOffset -> Padding = <<0:(8 * (?SIZE_BLOCK - BlockOffset))>>
+    end,
+    FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(5, [Bin])],
+    case file:write(Fd, FinalBin) of
+        ok -> {ok, File#file{eof = Pos + iolist_size(FinalBin)}};
+        {error, Error} -> {{error, Error}, reset_eof(File)}
+    end.
+
 read_multi_raw_iolists_int(#file{fd = Fd, eof = Eof} = File, PosLens) ->
     MapFun = fun({Pos, Len}) -> get_pread_locnum(File, Pos, Len) end,
     LocNums = lists:map(MapFun, PosLens),

Reply via email to