match dom with
| None -> () (* treat socket connections as always free to conflict *)
| Some d -> if not (Domain.is_free_to_conflict d) then history := x :: !history
+
+(* Find the connections from records since commit-count [since] for which [f record] returns [true] *)
+let filter_connections ~since ~f =
+ (* The "mem" call is an optimisation, to avoid calling f if we have picked con already. *)
+ (* Using a hash table rather than a list is to optimise the "mem" call. *)
+ List.fold_left (fun acc hist_rec ->
+ if hist_rec.finish_count > since
+ && not (Hashtbl.mem acc hist_rec.con)
+ && f hist_rec
+ then Hashtbl.replace acc hist_rec.con ();
+ acc
+ ) (Hashtbl.create 1023) !history
let error fmt = Logging.error "process" fmt
let info fmt = Logging.info "process" fmt
+let debug fmt = Logging.debug "process" fmt
open Printf
open Stdext
exception Domain_not_match
exception Invalid_Cmd_Args
+(* This controls the do_debug fn in this module, not the debug logging-function. *)
let allow_debug = ref false
let c_int_of_string s =
false
| Transaction.Full(id, oldstore, cstore) ->
let tid = Connection.start_transaction c cstore in
- let new_t = Transaction.make ~internal:true tid cstore in
+ let replay_t = Transaction.make ~internal:true tid cstore in
let con = sprintf "r(%d):%s" id (Connection.get_domstr c) in
- let perform_exn (request, response) =
- write_access_log ~ty:request.Packet.ty ~tid ~con ~data:request.Packet.data;
+
+ let perform_exn ~wlog txn (request, response) =
+ if wlog then write_access_log ~ty:request.Packet.ty ~tid ~con ~data:request.Packet.data;
let fct = function_of_type_simple_op request.Packet.ty in
- let response' = input_handle_error ~cons ~doms ~fct ~con:c ~t:new_t ~req:request in
- write_response_log ~ty:request.Packet.ty ~tid ~con ~response:response';
- if not(Packet.response_equal response response') then raise Transaction_again in
+ let response' = input_handle_error ~cons ~doms ~fct ~con:c ~t:txn ~req:request in
+ if wlog then write_response_log ~ty:request.Packet.ty ~tid ~con ~response:response';
+ if not(Packet.response_equal response response') then raise Transaction_again
+ in
finally
(fun () ->
try
Logging.start_transaction ~con ~tid;
- List.iter perform_exn (Transaction.get_operations t);
- Logging.end_transaction ~con ~tid;
+ List.iter (perform_exn ~wlog:true replay_t) (Transaction.get_operations t); (* May throw EAGAIN *)
- Transaction.commit ~con new_t
- with e ->
+ Logging.end_transaction ~con ~tid;
+ Transaction.commit ~con replay_t
+ with
+ | Transaction_again -> (
+ let victim_domstr = Connection.get_domstr c in
+ debug "Apportioning blame for EAGAIN in txn %d, domain=%s" id victim_domstr;
+ let punish guilty_con =
+ debug "Blaming domain %s for conflict with domain %s txn %d"
+ (Connection.get_domstr guilty_con) victim_domstr id;
+ Connection.decr_conflict_credit doms guilty_con
+ in
+ let judge_and_sentence hist_rec = (
+ let can_apply_on store = (
+ let store = Store.copy store in
+ let trial_t = Transaction.make ~internal:true Transaction.none store in
+ try List.iter (perform_exn ~wlog:false trial_t) (Transaction.get_operations t);
+ true
+ with Transaction_again -> false
+ ) in
+ if can_apply_on hist_rec.History.before
+ && not (can_apply_on hist_rec.History.after)
+ then (punish hist_rec.History.con; true)
+ else false
+ ) in
+ let guilty_cons = History.filter_connections ~since:t.Transaction.start_count ~f:judge_and_sentence in
+ if Hashtbl.length guilty_cons = 0 then debug "Found no culprit for conflict in %s: must be self or not in history." con;
+ false
+ )
+ | e ->
info "transaction_replay %d caught: %s" tid (Printexc.to_string e);
false
)