2222 doc_count ,
2323 w ,
2424 grouped_docs ,
25- reply
25+ reply ,
26+ update_options ,
27+ started = []
2628}).
2729
2830go (_ , [], _ ) ->
@@ -33,25 +35,25 @@ go(DbName, AllDocs0, Opts) ->
3335 validate_atomic_update (DbName , AllDocs , lists :member (all_or_nothing , Opts )),
3436 Options = lists :delete (all_or_nothing , Opts ),
3537 GroupedDocs = lists :map (
36- fun ({# shard {name = Name , node = Node } = Shard , Docs }) ->
37- Docs1 = untag_docs (Docs ),
38- Ref = rexi :cast (Node , {fabric_rpc , update_docs , [Name , Docs1 , Options ]}),
39- {Shard # shard {ref = Ref }, Docs }
38+ fun ({# shard {} = Shard , Docs }) ->
39+ {Shard # shard {ref = make_ref ()}, Docs }
4040 end ,
4141 group_docs_by_shard (DbName , AllDocs )
4242 ),
4343 {Workers , _ } = lists :unzip (GroupedDocs ),
4444 RexiMon = fabric_util :create_monitors (Workers ),
4545 W = couch_util :get_value (w , Options , integer_to_list (mem3 :quorum (DbName ))),
4646 Acc0 = # acc {
47+ update_options = Options ,
4748 waiting_count = length (Workers ),
4849 doc_count = length (AllDocs ),
4950 w = list_to_integer (W ),
5051 grouped_docs = GroupedDocs ,
5152 reply = dict :new ()
5253 },
5354 Timeout = fabric_util :request_timeout (),
54- try rexi_utils :recv (Workers , # shard .ref , fun handle_message /3 , Acc0 , infinity , Timeout ) of
55+ Acc1 = start_leaders (Acc0 ),
56+ try rexi_utils :recv (Workers , # shard .ref , fun handle_message /3 , Acc1 , infinity , Timeout ) of
5557 {ok , {Health , Results }} when
5658 Health =:= ok ; Health =:= accepted ; Health =:= error
5759 ->
@@ -72,24 +74,32 @@ go(DbName, AllDocs0, Opts) ->
7274 rexi_monitor :stop (RexiMon )
7375 end .
7476
75- handle_message ({rexi_DOWN , _ , {_ , NodeRef }, _ }, _Worker , # acc {} = Acc0 ) ->
77+ handle_message ({rexi_DOWN , _ , {_ , NodeRef }, _ }, Worker , # acc {} = Acc0 ) ->
7678 # acc {grouped_docs = GroupedDocs } = Acc0 ,
7779 NewGrpDocs = [X || {# shard {node = N }, _ } = X <- GroupedDocs , N =/= NodeRef ],
78- skip_message (Acc0 # acc {waiting_count = length (NewGrpDocs ), grouped_docs = NewGrpDocs });
80+ Acc1 = Acc0 # acc {waiting_count = length (NewGrpDocs ), grouped_docs = NewGrpDocs },
81+ Acc2 = start_followers (Worker , Acc1 ),
82+ skip_message (Acc2 );
7983handle_message ({rexi_EXIT , _ }, Worker , # acc {} = Acc0 ) ->
8084 # acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
8185 NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
82- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
86+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
87+ Acc2 = start_followers (Worker , Acc1 ),
88+ skip_message (Acc2 );
8389handle_message ({error , all_dbs_active }, Worker , # acc {} = Acc0 ) ->
8490 % treat it like rexi_EXIT, the hope at least one copy will return successfully
8591 # acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
8692 NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
87- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
93+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
94+ Acc2 = start_followers (Worker , Acc1 ),
95+ skip_message (Acc2 );
8896handle_message (internal_server_error , Worker , # acc {} = Acc0 ) ->
8997 % happens when we fail to load validation functions in an RPC worker
9098 # acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
9199 NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
92- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
100+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
101+ Acc2 = start_followers (Worker , Acc1 ),
102+ skip_message (Acc2 );
93103handle_message (attachment_chunk_received , _Worker , # acc {} = Acc0 ) ->
94104 {ok , Acc0 };
95105handle_message ({ok , Replies }, Worker , # acc {} = Acc0 ) ->
@@ -100,33 +110,42 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
100110 grouped_docs = GroupedDocs ,
101111 reply = DocReplyDict0
102112 } = Acc0 ,
103- {value , {_ , Docs }, NewGrpDocs } = lists :keytake (Worker , 1 , GroupedDocs ),
104- DocReplyDict = append_update_replies (Docs , Replies , DocReplyDict0 ),
105- case {WaitingCount , dict :size (DocReplyDict )} of
106- {1 , _ } ->
107- % last message has arrived, we need to conclude things
108- {Health , W , Reply } = dict :fold (
109- fun force_reply /3 ,
110- {ok , W , []},
111- DocReplyDict
112- ),
113- {stop , {Health , Reply }};
114- {_ , DocCount } ->
115- % we've got at least one reply for each document, let's take a look
116- case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
117- continue ->
118- {ok , Acc0 # acc {
119- waiting_count = WaitingCount - 1 ,
120- grouped_docs = NewGrpDocs ,
121- reply = DocReplyDict
122- }};
123- {stop , W , FinalReplies } ->
124- {stop , {ok , FinalReplies }}
125- end ;
126- _ ->
127- {ok , Acc0 # acc {
128- waiting_count = WaitingCount - 1 , grouped_docs = NewGrpDocs , reply = DocReplyDict
129- }}
113+ {value , {_ , Docs }, NewGrpDocs0 } = lists :keytake (Worker , 1 , GroupedDocs ),
114+ DocReplyDict = append_update_replies (Docs , Replies , W , DocReplyDict0 ),
115+ Acc1 = Acc0 # acc {grouped_docs = NewGrpDocs0 , reply = DocReplyDict },
116+ Acc2 = remove_conflicts (Docs , Replies , Acc1 ),
117+ NewGrpDocs = Acc2 # acc .grouped_docs ,
118+ case skip_message (Acc2 ) of
119+ {stop , Msg } ->
120+ {stop , Msg };
121+ {ok , Acc3 } ->
122+ Acc4 = start_followers (Worker , Acc3 ),
123+ case {Acc4 # acc .waiting_count , dict :size (DocReplyDict )} of
124+ {1 , _ } ->
125+ % last message has arrived, we need to conclude things
126+ {Health , W , Reply } = dict :fold (
127+ fun force_reply /3 ,
128+ {ok , W , []},
129+ DocReplyDict
130+ ),
131+ {stop , {Health , Reply }};
132+ {_ , DocCount } ->
133+ % we've got at least one reply for each document, let's take a look
134+ case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
135+ continue ->
136+ {ok , Acc4 # acc {
137+ waiting_count = Acc4 # acc .waiting_count - 1 ,
138+ grouped_docs = NewGrpDocs
139+ }};
140+ {stop , W , FinalReplies } ->
141+ {stop , {ok , FinalReplies }}
142+ end ;
143+ _ ->
144+ {ok , Acc4 # acc {
145+ waiting_count = Acc4 # acc .waiting_count - 1 ,
146+ grouped_docs = NewGrpDocs
147+ }}
148+ end
130149 end ;
131150handle_message ({missing_stub , Stub }, _ , _ ) ->
132151 throw ({missing_stub , Stub });
@@ -318,13 +337,90 @@ group_docs_by_shard(DbName, Docs) ->
318337 )
319338 ).
320339
321- append_update_replies ([], [], DocReplyDict ) ->
%% Leader election for a shard range: among all workers that cover the same
%% range as Worker, the smallest one (per lists:min/1) is the leader.
is_leader(Worker, Workers) ->
    SameRange = [Peer || Peer <- Workers, Peer#shard.range == Worker#shard.range],
    lists:min(SameRange) == Worker.
343+
%% Start the update on every worker that is_leader/2 elects for its shard
%% range, and record the refs of those workers in #acc.started. Workers that
%% are not leaders are not started here.
start_leaders(#acc{grouped_docs = GroupedDocs} = Acc0) ->
    {Workers, _} = lists:unzip(GroupedDocs),
    LeaderRefs = lists:filtermap(
        fun({Shard, Docs}) ->
            case is_leader(Shard, Workers) of
                true ->
                    start_worker(Shard, Docs, Acc0),
                    {true, Shard#shard.ref};
                false ->
                    false
            end
        end,
        GroupedDocs
    ),
    %% most recently started ref goes first, ahead of any earlier entries
    Acc0#acc{started = lists:reverse(LeaderRefs) ++ Acc0#acc.started}.
361+
%% Start every worker that shares Leader's shard range, is still present in
%% grouped_docs, and whose ref is not yet listed in #acc.started. The refs of
%% the workers started here are prepended to #acc.started.
start_followers(#shard{} = Leader, #acc{} = Acc0) ->
    #acc{grouped_docs = GroupedDocs, started = AlreadyStarted} = Acc0,
    Pending = [
        {Shard, Docs}
     || {Shard, Docs} <- GroupedDocs,
        Shard#shard.range == Leader#shard.range,
        not lists:member(Shard#shard.ref, AlreadyStarted)
    ],
    Launch = fun({Shard, Docs}) -> start_worker(Shard, Docs, Acc0) end,
    lists:foreach(Launch, Pending),
    NewRefs = [Ref || {#shard{ref = Ref}, _Docs} <- Pending],
    Acc0#acc{started = NewRefs ++ AlreadyStarted}.
377+
%% Cast the update_docs RPC for one worker, reusing the ref already stored in
%% the shard record. Always returns ok.
start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
    % no ref assigned: nothing is sent (used by the unit tests below).
    ok;
start_worker(
    #shard{name = Name, node = Node, ref = Ref},
    Docs,
    #acc{update_options = UpdateOptions}
) when is_reference(Ref) ->
    Request = {fabric_rpc, update_docs, [Name, untag_docs(Docs), UpdateOptions]},
    rexi:cast_ref(Ref, Node, Request),
    ok.
386+
%% Pair each doc with its reply and append the result to the doc's entry in
%% the reply dict:
%%   * docs left over once the replies run out are recorded as noreply
%%     (icky, if replicated_changes only errors show up in result);
%%   * a conflict reply is recorded W times, faking the conflict replies of
%%     the followers, as we won't ask them;
%%   * any other reply is recorded once.
append_update_replies([], [], _W, DocReplyDict) ->
    DocReplyDict;
append_update_replies([Doc | DocsRest], Replies, W, Dict0) ->
    {Entries, RepliesRest} = update_reply_entries(Replies, W),
    Dict1 = dict:append_list(Doc, Entries, Dict0),
    append_update_replies(DocsRest, RepliesRest, W, Dict1).

%% Entries to record for the next doc, plus the replies left for later docs.
update_reply_entries([], _W) ->
    {[noreply], []};
update_reply_entries([conflict | Rest], W) ->
    {lists:duplicate(W, conflict), Rest};
update_reply_entries([Reply | Rest], _W) ->
    {[Reply], Rest}.
399+
%% Walk the docs and replies of the worker that just answered. For every doc
%% answered with conflict, remove that doc from the workers still listed in
%% grouped_docs (the followers), dropping any worker left with no docs.
%% Docs with any other reply, or with no reply at all, change nothing.
remove_conflicts([], [], #acc{} = Acc) ->
    Acc;
remove_conflicts([_Doc | Docs], [], #acc{} = Acc) ->
    remove_conflicts(Docs, [], Acc);
remove_conflicts([Doc | Docs], [conflict | Replies], #acc{} = Acc) ->
    remove_conflicts(Docs, Replies, drop_conflicted_doc(Doc, Acc));
remove_conflicts([_Doc | Docs], [_Reply | Replies], #acc{} = Acc) ->
    remove_conflicts(Docs, Replies, Acc).

%% Delete Doc from each worker's doc list; workers that end up with an empty
%% list are removed, and waiting_count becomes the number of workers left.
drop_conflicted_doc(Doc, #acc{grouped_docs = GroupedDocs} = Acc) ->
    Kept = lists:filtermap(
        fun({Worker, WorkerDocs}) ->
            case lists:delete(Doc, WorkerDocs) of
                [] -> false;
                Remaining -> {true, {Worker, Remaining}}
            end
        end,
        GroupedDocs
    ),
    %% the surviving workers are stored in reverse of their previous order
    Survivors = lists:reverse(Kept),
    Acc#acc{waiting_count = length(Survivors), grouped_docs = Survivors}.
328424
329425skip_message (# acc {waiting_count = 0 , w = W , reply = DocReplyDict }) ->
330426 {Health , W , Reply } = dict :fold (fun force_reply /3 , {ok , W , []}, DocReplyDict ),
0 commit comments