(* NB We don't do incremental flushing *)
let flush_dirty dbconn =
- let db = get_database () in
+ let db = Db_ref.get_database (Db_backend.make ()) in
let g = Manifest.generation (Database.manifest db) in
if g > dbconn.Parse_db_conf.last_generation_count then begin
flush dbconn db;
Printf.printf "Database path: %s\n%!" db_filename;
let db = Parse_db_conf.make db_filename in
Db_conn_store.initialise_db_connections [ db ];
- Db_cache.set_master true;
-
- Db_cache_impl.make [ db ] (Schema.of_datamodel ());
- Db_cache_impl.sync [ db ] (Db_backend.get_database ());
+ let t = Db_backend.make () in
+ Db_cache_impl.make t [ db ] (Schema.of_datamodel ());
+ Db_cache_impl.sync [ db ] (Db_ref.get_database t);
Unixext.unlink_safe !listen_path;
let sockaddr = Unix.ADDR_UNIX !listen_path in
)
(* Verify the ref_index contents are correct for a given [tblname] and [key] (uuid/ref) *)
- let check_ref_index tblname key = match Ref_index.lookup key with
+ let check_ref_index t tblname key = match Ref_index.lookup key with
| None ->
(* We should fail to find the row *)
expect_missing_row tblname key
- (fun () -> let (_: string) = Client.read_field tblname "uuid" key in ());
+ (fun () -> let (_: string) = Client.read_field t tblname "uuid" key in ());
expect_missing_uuid tblname key
- (fun () -> let (_: string) = Client.db_get_by_uuid tblname key in ())
+ (fun () -> let (_: string) = Client.db_get_by_uuid t tblname key in ())
| Some { Ref_index.name_label = name_label; uuid = uuid; _ref = _ref } ->
(* key should be either uuid or _ref *)
if key <> uuid && (key <> _ref)
then failwith (Printf.sprintf "check_ref_index %s key %s: got ref %s uuid %s" tblname key _ref uuid);
- let real_ref = if Client.is_valid_ref key then key else Client.db_get_by_uuid tblname key in
+ let real_ref = if Client.is_valid_ref t key then key else Client.db_get_by_uuid t tblname key in
let real_name_label =
- try Some (Client.read_field tblname "name__label" real_ref)
+ try Some (Client.read_field t tblname "name__label" real_ref)
with _ -> None in
if name_label <> real_name_label
then failwith (Printf.sprintf "check_ref_index %s key %s: ref_index name_label = %s; db has %s" tblname key (Opt.default "None" name_label) (Opt.default "None" real_name_label))
let invalid_ref = "foo" in
let invalid_uuid = "bar" in
+ let t = if in_process then Db_backend.make () else Db_ref.Remote in
+
let vbd_ref = "waz" in
let vbd_uuid = "whatever" in
(* Before we begin, clear out any old state: *)
expect_missing_row "VM" valid_ref
(fun () ->
- Client.delete_row "VM" valid_ref;
+ Client.delete_row t "VM" valid_ref;
);
- if in_process then check_ref_index "VM" valid_ref;
+ if in_process then check_ref_index t "VM" valid_ref;
expect_missing_row "VBD" vbd_ref
(fun () ->
- Client.delete_row "VBD" vbd_ref;
+ Client.delete_row t "VBD" vbd_ref;
);
- if in_process then check_ref_index "VBD" vbd_ref;
+ if in_process then check_ref_index t "VBD" vbd_ref;
Printf.printf "Deleted stale state from previous test\n";
Printf.printf "get_table_from_ref <invalid ref>\n";
begin
- match Client.get_table_from_ref invalid_ref with
+ match Client.get_table_from_ref t invalid_ref with
| None -> Printf.printf "Reference '%s' has no associated table\n" invalid_ref
| Some t -> failwith (Printf.sprintf "Reference '%s' exists in table '%s'" invalid_ref t)
end;
Printf.printf "is_valid_ref <invalid_ref>\n";
- if Client.is_valid_ref invalid_ref then failwith "is_valid_ref <invalid_ref> = true";
+ if Client.is_valid_ref t invalid_ref then failwith "is_valid_ref <invalid_ref> = true";
Printf.printf "read_refs <valid tbl>\n";
- let existing_refs = Client.read_refs "VM" in
+ let existing_refs = Client.read_refs t "VM" in
Printf.printf "VM refs: [ %s ]\n" (String.concat "; " existing_refs);
Printf.printf "read_refs <invalid tbl>\n";
expect_missing_tbl "Vm"
(fun () ->
- let (_: string list) = Client.read_refs "Vm" in
+ let (_: string list) = Client.read_refs t "Vm" in
()
);
Printf.printf "delete_row <invalid ref>\n";
expect_missing_row "VM" invalid_ref
(fun () ->
- Client.delete_row "VM" invalid_ref;
+ Client.delete_row t "VM" invalid_ref;
failwith "delete_row of a non-existent row silently succeeded"
);
Printf.printf "create_row <unique ref> <unique uuid> <missing required field>\n";
expect_missing_field "name__label"
(fun () ->
let broken_vm = List.filter (fun (k, _) -> k <> "name__label") (make_vm valid_ref valid_uuid) in
- Client.create_row "VM" broken_vm valid_ref;
+ Client.create_row t "VM" broken_vm valid_ref;
failwith "create_row <unique ref> <unique uuid> <missing required field>"
);
Printf.printf "create_row <unique ref> <unique uuid>\n";
- Client.create_row "VM" (make_vm valid_ref valid_uuid) valid_ref;
- if in_process then check_ref_index "VM" valid_ref;
+ Client.create_row t "VM" (make_vm valid_ref valid_uuid) valid_ref;
+ if in_process then check_ref_index t "VM" valid_ref;
Printf.printf "is_valid_ref <valid ref>\n";
- if not (Client.is_valid_ref valid_ref)
+ if not (Client.is_valid_ref t valid_ref)
then failwith "is_valid_ref <valid_ref> = false, after create_row";
Printf.printf "get_table_from_ref <valid ref>\n";
- begin match Client.get_table_from_ref valid_ref with
+ begin match Client.get_table_from_ref t valid_ref with
| Some "VM" -> ()
| Some t -> failwith "get_table_from_ref <valid ref> : invalid table"
| None -> failwith "get_table_from_ref <valid ref> : None"
end;
Printf.printf "read_refs includes <valid ref>\n";
- if not (List.mem valid_ref (Client.read_refs "VM"))
+ if not (List.mem valid_ref (Client.read_refs t "VM"))
then failwith "read_refs did not include <valid ref>";
Printf.printf "create_row <duplicate ref> <unique uuid>\n";
expect_uniqueness_violation "VM" "_ref" valid_ref
(fun () ->
- Client.create_row "VM" (make_vm valid_ref (valid_uuid ^ "unique")) valid_ref;
+ Client.create_row t "VM" (make_vm valid_ref (valid_uuid ^ "unique")) valid_ref;
failwith "create_row <duplicate ref> <unique uuid>"
);
Printf.printf "create_row <unique ref> <duplicate uuid>\n";
expect_uniqueness_violation "VM" "uuid" valid_uuid
(fun () ->
- Client.create_row "VM" (make_vm (valid_ref ^ "unique") valid_uuid) (valid_ref ^ "unique");
+ Client.create_row t "VM" (make_vm (valid_ref ^ "unique") valid_uuid) (valid_ref ^ "unique");
failwith "create_row <unique ref> <duplicate uuid>"
);
Printf.printf "db_get_by_uuid <valid uuid>\n";
- let r = Client.db_get_by_uuid "VM" valid_uuid in
+ let r = Client.db_get_by_uuid t "VM" valid_uuid in
if r <> valid_ref
then failwith (Printf.sprintf "db_get_by_uuid <valid uuid>: got %s; expected %s" r valid_ref);
Printf.printf "db_get_by_uuid <invalid uuid>\n";
expect_missing_uuid "VM" invalid_uuid
(fun () ->
- let (_: string) = Client.db_get_by_uuid "VM" invalid_uuid in
+ let (_: string) = Client.db_get_by_uuid t "VM" invalid_uuid in
failwith "db_get_by_uuid <invalid uuid>"
);
Printf.printf "get_by_name_label <invalid name label>\n";
- if Client.db_get_by_name_label "VM" invalid_name <> []
+ if Client.db_get_by_name_label t "VM" invalid_name <> []
then failwith "db_get_by_name_label <invalid name label>";
Printf.printf "get_by_name_label <valid name label>\n";
- if Client.db_get_by_name_label "VM" name <> [ valid_ref ]
+ if Client.db_get_by_name_label t "VM" name <> [ valid_ref ]
then failwith "db_get_by_name_label <valid name label>";
Printf.printf "read_field <valid field> <valid objref>\n";
- if Client.read_field "VM" "name__label" valid_ref <> name
+ if Client.read_field t "VM" "name__label" valid_ref <> name
then failwith "read_field <valid field> <valid objref> : invalid name";
Printf.printf "read_field <valid defaulted field> <valid objref>\n";
- if Client.read_field "VM" "protection_policy" valid_ref <> "OpaqueRef:NULL"
+ if Client.read_field t "VM" "protection_policy" valid_ref <> "OpaqueRef:NULL"
then failwith "read_field <valid defaulted field> <valid objref> : invalid protection_policy";
Printf.printf "read_field <valid field> <invalid objref>\n";
expect_missing_row "VM" invalid_ref
(fun () ->
- let (_: string) = Client.read_field "VM" "name__label" invalid_ref in
+ let (_: string) = Client.read_field t "VM" "name__label" invalid_ref in
failwith "read_field <valid field> <invalid objref>"
);
Printf.printf "read_field <invalid field> <valid objref>\n";
expect_missing_field "name_label"
(fun () ->
- let (_: string) = Client.read_field "VM" "name_label" valid_ref in
+ let (_: string) = Client.read_field t "VM" "name_label" valid_ref in
failwith "read_field <invalid field> <valid objref>"
);
Printf.printf "read_field <invalid field> <invalid objref>\n";
expect_missing_row "VM" invalid_ref
(fun () ->
- let (_: string) = Client.read_field "VM" "name_label" invalid_ref in
+ let (_: string) = Client.read_field t "VM" "name_label" invalid_ref in
failwith "read_field <invalid field> <invalid objref>"
);
Printf.printf "read_field_where <valid table> <valid return> <valid field> <valid value>\n";
let where_name_label =
{ Db_cache_types.table = "VM"; return = Escaping.escape_id(["name"; "label"]); where_field="uuid"; where_value = valid_uuid } in
- let xs = Client.read_field_where where_name_label in
+ let xs = Client.read_field_where t where_name_label in
if not (List.mem name xs)
then failwith "read_field_where <valid table> <valid return> <valid field> <valid value>";
- test_invalid_where_record "read_field_where" Client.read_field_where;
+ test_invalid_where_record "read_field_where" (Client.read_field_where t);
- let xs = Client.read_set_ref where_name_label in
+ let xs = Client.read_set_ref t where_name_label in
if not (List.mem name xs)
then failwith "read_set_ref <valid table> <valid return> <valid field> <valid value>";
- test_invalid_where_record "read_set_ref" Client.read_set_ref;
+ test_invalid_where_record "read_set_ref" (Client.read_set_ref t);
Printf.printf "write_field <invalid table>\n";
expect_missing_tbl "Vm"
(fun () ->
- let (_: unit) = Client.write_field "Vm" "" "" "" in
+ let (_: unit) = Client.write_field t "Vm" "" "" "" in
failwith "write_field <invalid table>"
);
Printf.printf "write_field <valid table> <invalid ref>\n";
expect_missing_row "VM" invalid_ref
(fun () ->
- let (_: unit) = Client.write_field "VM" invalid_ref "" "" in
+ let (_: unit) = Client.write_field t "VM" invalid_ref "" "" in
failwith "write_field <valid table> <invalid ref>"
);
Printf.printf "write_field <valid table> <valid ref> <invalid field>\n";
expect_missing_field "wibble"
(fun () ->
- let (_: unit) = Client.write_field "VM" valid_ref "wibble" "" in
+ let (_: unit) = Client.write_field t "VM" valid_ref "wibble" "" in
failwith "write_field <valid table> <valid ref> <invalid field>"
);
Printf.printf "write_field <valid table> <valid ref> <valid field>\n";
- let (_: unit) = Client.write_field "VM" valid_ref (Escaping.escape_id ["name"; "description"]) "description" in
- if in_process then check_ref_index "VM" valid_ref;
+ let (_: unit) = Client.write_field t "VM" valid_ref (Escaping.escape_id ["name"; "description"]) "description" in
+ if in_process then check_ref_index t "VM" valid_ref;
Printf.printf "write_field <valid table> <valid ref> <valid field> - invalidating ref_index\n";
- let (_: unit) = Client.write_field "VM" valid_ref (Escaping.escape_id ["name"; "label"]) "newlabel" in
- if in_process then check_ref_index "VM" valid_ref;
+ let (_: unit) = Client.write_field t "VM" valid_ref (Escaping.escape_id ["name"; "label"]) "newlabel" in
+ if in_process then check_ref_index t "VM" valid_ref;
Printf.printf "read_record <invalid table> <invalid ref>\n";
expect_missing_tbl "Vm"
(fun () ->
- let _ = Client.read_record "Vm" invalid_ref in
+ let _ = Client.read_record t "Vm" invalid_ref in
failwith "read_record <invalid table> <invalid ref>"
);
Printf.printf "read_record <valid table> <valid ref>\n";
expect_missing_row "VM" invalid_ref
(fun () ->
- let _ = Client.read_record "VM" invalid_ref in
+ let _ = Client.read_record t "VM" invalid_ref in
failwith "read_record <valid table> <invalid ref>"
);
Printf.printf "read_record <valid table> <valid ref>\n";
- let fv_list, fvs_list = Client.read_record "VM" valid_ref in
+ let fv_list, fvs_list = Client.read_record t "VM" valid_ref in
if not(List.mem_assoc (Escaping.escape_id [ "name"; "label" ]) fv_list)
then failwith "read_record <valid table> <valid ref> 1";
if List.assoc "VBDs" fvs_list <> []
then failwith "read_record <valid table> <valid ref> 2";
Printf.printf "read_record <valid table> <valid ref> foreign key\n";
- Client.create_row "VBD" (make_vbd valid_ref vbd_ref vbd_uuid) vbd_ref;
- let fv_list, fvs_list = Client.read_record "VM" valid_ref in
+ Client.create_row t "VBD" (make_vbd valid_ref vbd_ref vbd_uuid) vbd_ref;
+ let fv_list, fvs_list = Client.read_record t "VM" valid_ref in
if List.assoc "VBDs" fvs_list <> [ vbd_ref ] then begin
Printf.printf "fv_list = [ %s ] fvs_list = [ %s ]\n%!" (String.concat "; " (List.map (fun (k, v) -> k ^":" ^ v) fv_list)) (String.concat "; " (List.map (fun (k, v) -> k ^ ":" ^ (String.concat ", " v)) fvs_list));
failwith "read_record <valid table> <valid ref> 3"
end;
Printf.printf "read_record <valid table> <valid ref> deleted foreign key\n";
- Client.delete_row "VBD" vbd_ref;
- let fv_list, fvs_list = Client.read_record "VM" valid_ref in
+ Client.delete_row t "VBD" vbd_ref;
+ let fv_list, fvs_list = Client.read_record t "VM" valid_ref in
if List.assoc "VBDs" fvs_list <> []
then failwith "read_record <valid table> <valid ref> 4";
Printf.printf "read_record <valid table> <valid ref> overwritten foreign key\n";
- Client.create_row "VBD" (make_vbd valid_ref vbd_ref vbd_uuid) vbd_ref;
- let fv_list, fvs_list = Client.read_record "VM" valid_ref in
+ Client.create_row t "VBD" (make_vbd valid_ref vbd_ref vbd_uuid) vbd_ref;
+ let fv_list, fvs_list = Client.read_record t "VM" valid_ref in
if List.assoc "VBDs" fvs_list = []
then failwith "read_record <valid table> <valid ref> 5";
- Client.write_field "VBD" vbd_ref (Escaping.escape_id [ "VM" ]) "overwritten";
- let fv_list, fvs_list = Client.read_record "VM" valid_ref in
+ Client.write_field t "VBD" vbd_ref (Escaping.escape_id [ "VM" ]) "overwritten";
+ let fv_list, fvs_list = Client.read_record t "VM" valid_ref in
if List.assoc "VBDs" fvs_list <> []
then failwith "read_record <valid table> <valid ref> 6";
expect_missing_tbl "Vm"
(fun () ->
- let _ = Client.read_records_where "Vm" Db_filter_types.True in
+ let _ = Client.read_records_where t "Vm" Db_filter_types.True in
()
);
- let xs = Client.read_records_where "VM" Db_filter_types.True in
+ let xs = Client.read_records_where t "VM" Db_filter_types.True in
if List.length xs <> 1
then failwith "read_records_where <valid table> 2";
- let xs = Client.read_records_where "VM" Db_filter_types.False in
+ let xs = Client.read_records_where t "VM" Db_filter_types.False in
if xs <> []
then failwith "read_records_where <valid table> 3";
expect_missing_tbl "Vm"
(fun () ->
- let xs = Client.find_refs_with_filter "Vm" Db_filter_types.True in
+ let xs = Client.find_refs_with_filter t "Vm" Db_filter_types.True in
failwith "find_refs_with_filter <invalid table>";
);
- let xs = Client.find_refs_with_filter "VM" Db_filter_types.True in
+ let xs = Client.find_refs_with_filter t "VM" Db_filter_types.True in
if List.length xs <> 1
then failwith "find_refs_with_filter <valid table> 1";
- let xs = Client.find_refs_with_filter "VM" Db_filter_types.False in
+ let xs = Client.find_refs_with_filter t "VM" Db_filter_types.False in
if xs <> []
then failwith "find_refs_with_filter <valid table> 2";
expect_missing_tbl "Vm"
(fun () ->
- Client.process_structured_field ("","") "Vm" "wibble" invalid_ref Db_cache_types.AddSet;
+ Client.process_structured_field t ("","") "Vm" "wibble" invalid_ref Db_cache_types.AddSet;
failwith "process_structure_field <invalid table> <invalid fld> <invalid ref>"
);
expect_missing_field "wibble"
(fun () ->
- Client.process_structured_field ("","") "VM" "wibble" valid_ref Db_cache_types.AddSet;
+ Client.process_structured_field t ("","") "VM" "wibble" valid_ref Db_cache_types.AddSet;
failwith "process_structure_field <valid table> <invalid fld> <valid ref>"
);
expect_missing_row "VM" invalid_ref
(fun () ->
- Client.process_structured_field ("","") "VM" (Escaping.escape_id ["name"; "label"]) invalid_ref Db_cache_types.AddSet;
+ Client.process_structured_field t ("","") "VM" (Escaping.escape_id ["name"; "label"]) invalid_ref Db_cache_types.AddSet;
failwith "process_structure_field <valid table> <valid fld> <invalid ref>"
);
- Client.process_structured_field ("foo", "") "VM" "tags" valid_ref Db_cache_types.AddSet;
- if Client.read_field "VM" "tags" valid_ref <> "('foo')"
+ Client.process_structured_field t ("foo", "") "VM" "tags" valid_ref Db_cache_types.AddSet;
+ if Client.read_field t "VM" "tags" valid_ref <> "('foo')"
then failwith "process_structure_field expected ('foo')";
- Client.process_structured_field ("foo", "") "VM" "tags" valid_ref Db_cache_types.AddSet;
- if Client.read_field "VM" "tags" valid_ref <> "('foo')"
+ Client.process_structured_field t ("foo", "") "VM" "tags" valid_ref Db_cache_types.AddSet;
+ if Client.read_field t "VM" "tags" valid_ref <> "('foo')"
then failwith "process_structure_field expected ('foo') 2";
- Client.process_structured_field ("foo", "bar") "VM" "other_config" valid_ref Db_cache_types.AddMap;
+ Client.process_structured_field t ("foo", "bar") "VM" "other_config" valid_ref Db_cache_types.AddMap;
- if Client.read_field "VM" "other_config" valid_ref <> "(('foo' 'bar'))"
+ if Client.read_field t "VM" "other_config" valid_ref <> "(('foo' 'bar'))"
then failwith "process_structure_field expected (('foo' 'bar')) 3";
begin
try
- Client.process_structured_field ("foo", "bar") "VM" "other_config" valid_ref Db_cache_types.AddMap;
+ Client.process_structured_field t ("foo", "bar") "VM" "other_config" valid_ref Db_cache_types.AddMap;
with Db_exn.Duplicate_key("VM", "other_config", r', "foo") when r' = valid_ref -> ()
end;
- if Client.read_field "VM" "other_config" valid_ref <> "(('foo' 'bar'))"
+ if Client.read_field t "VM" "other_config" valid_ref <> "(('foo' 'bar'))"
then failwith "process_structure_field expected (('foo' 'bar')) 4";
(* Check that non-persistent fields are filled with an empty value *)
let n = 5000 in
let rpc_time = time n (fun _ ->
- let (_: bool) = Client.is_valid_ref valid_ref in ()) in
+ let (_: bool) = Client.is_valid_ref t valid_ref in ()) in
Printf.printf "%.2f primitive RPC calls/sec\n" rpc_time;
(fun i ->
let rf = Printf.sprintf "%s:%d" vbd_ref i in
try
- Client.delete_row "VBD" rf
+ Client.delete_row t "VBD" rf
with _ -> ()
) in
Printf.printf "Deleted %d VBD records, %.2f calls/sec\n%!" n delete_time;
expect_missing_row "VBD" vbd_ref
(fun () ->
- Client.delete_row "VBD" vbd_ref;
+ Client.delete_row t "VBD" vbd_ref;
);
(* Create lots of VBDs referening no VM *)
(fun i ->
let rf = Printf.sprintf "%s:%d" vbd_ref i in
let uuid = Printf.sprintf "%s:%d" vbd_uuid i in
- Client.create_row "VBD" (make_vbd invalid_ref rf uuid) rf;
+ Client.create_row t "VBD" (make_vbd invalid_ref rf uuid) rf;
) in
Printf.printf "Created %d VBD records, %.2f calls/sec\n%!" n create_time;
(fun i ->
if i < (m / 3 * 2) then begin
if i mod 2 = 0
- then Client.create_row "VBD" (make_vbd valid_ref vbd_ref vbd_uuid) vbd_ref
- else Client.delete_row "VBD" vbd_ref
+ then Client.create_row t "VBD" (make_vbd valid_ref vbd_ref vbd_uuid) vbd_ref
+ else Client.delete_row t "VBD" vbd_ref
end else
- let fv_list, fvs_list = Client.read_record "VM" valid_ref in
+ let fv_list, fvs_list = Client.read_record t "VM" valid_ref in
()
) in
Printf.printf "good sequence: %.2f calls/sec\n%!" benign_time;
let malign_time = time m
(fun i ->
match i mod 3 with
- | 0 -> Client.create_row "VBD" (make_vbd valid_ref vbd_ref vbd_uuid) vbd_ref
- | 1 -> Client.delete_row "VBD" vbd_ref
- | 2 -> let fv_list, fvs_list = Client.read_record "VM" valid_ref in
+ | 0 -> Client.create_row t "VBD" (make_vbd valid_ref vbd_ref vbd_uuid) vbd_ref
+ | 1 -> Client.delete_row t "VBD" vbd_ref
+ | 2 -> let fv_list, fvs_list = Client.read_record t "VM" valid_ref in
()
) in
Printf.printf "bad sequence: %.2f calls/sec\n%!" malign_time;
let db_FLUSH_TIMER=2.0 (* flush db write buffer every db_FLUSH_TIMER seconds *)
let display_sql_writelog_val = ref true (* compute/write sql-writelog debug string *)
-(* The cache itself: *)
-let database : Db_cache_types.Database.t ref = ref (Db_cache_types.Database.make (Schema.of_datamodel ()))
-
(* --------------------- Util functions on db datastructures *)
-let update_database f =
- database := f (!database)
+let master_database = ref (Db_cache_types.Database.make Schema.empty)
-let get_database () = !database
+let make () = Db_ref.in_memory (ref master_database)
(* !!! Right now this is called at cache population time. It would probably be preferable to call it on flush time instead, so we
let rpc request = Master_connection.execute_remote_fn request Constants.remote_db_access_uri
end)
-exception Must_initialise_database_mode
-let implementation = ref None
-let set_master = function
- | false ->
- implementation := Some (module Remote_db : DB_ACCESS)
- | true ->
- implementation := Some (module Local_db : DB_ACCESS)
-let get () = match !implementation with
- | None -> raise Must_initialise_database_mode
- | Some m -> m
+let get = function
+ | Db_ref.In_memory _ -> (module Local_db : DB_ACCESS)
+ | Db_ref.Remote -> (module Remote_db : DB_ACCESS)
let apply_delta_to_cache entry =
- let module DB = (val (get ()) : DB_ACCESS) in
+ let module DB = (Local_db : DB_ACCESS) in
+ let t = Db_backend.make () in
let context = Context.make "redo_log" in
match entry with
| Redo_log.CreateRow(tblname, objref, kvs) ->
debug "Redoing create_row %s (%s)" tblname objref;
- DB.create_row tblname kvs objref
+ DB.create_row t tblname kvs objref
| Redo_log.DeleteRow(tblname, objref) ->
debug "Redoing delete_row %s (%s)" tblname objref;
- DB.delete_row tblname objref
+ DB.delete_row t tblname objref
| Redo_log.WriteField(tblname, objref, fldname, newval) ->
debug "Redoing write_field %s (%s) [%s -> %s]" tblname objref fldname newval;
- DB.write_field tblname objref fldname newval
+ DB.write_field t tblname objref fldname newval
(** An in-memory cache, used by pool master *)
+(* Locking strategy:
+ 1. functions which read/modify/write must acquire the db lock. Such
+ functions have the suffix "_locked" to clearly identify them.
+ 2. functions which only read must only call "get_database" once,
+ to ensure they see a consistent snapshot.
+*)
open Db_exn
open Db_lock
open Pervasiveext
module W = Debug.Debugger(struct let name = "db_write" end)
open Db_cache_types
-open Db_backend
+open Db_ref
+
+(* Only needed by the DB_ACCESS signature *)
+let initialise () = ()
(* This fn is part of external interface, so need to take lock *)
-let get_table_from_ref objref =
- with_lock
- (fun () ->
- let db = get_database () in
- try
- Some (Database.table_of_ref objref db)
- with Not_found ->
- None)
+let get_table_from_ref t objref =
+ try
+ Some (Database.table_of_ref objref (get_database t))
+ with Not_found ->
+ None
-let is_valid_ref objref =
- match (get_table_from_ref objref) with
+let is_valid_ref t objref =
+ match (get_table_from_ref t objref) with
| Some _ -> true
| None -> false
+let read_field_internal t tblname fldname objref db =
+ Row.find fldname (Table.find_exn tblname objref (TableSet.find tblname (Database.tableset db)))
+
(* Read field from cache *)
-let read_field tblname fldname objref =
- with_lock
- (fun () ->
- let db = get_database () in
- Row.find fldname (Table.find_exn tblname objref (TableSet.find tblname (Database.tableset db)))
- )
+let read_field t tblname fldname objref =
+ read_field_internal t tblname fldname objref (get_database t)
+
+
(** Finds the longest XML-compatible UTF-8 prefix of the given *)
(* Write field in cache *)
-let write_field tblname objref fldname newval =
- with_lock
- (fun () ->
- let db = get_database () in
-
- let row = Table.find_exn tblname objref (TableSet.find tblname (Database.tableset db)) in
- let current_val = Row.find fldname row in
-
- let newval = ensure_utf8_xml newval in
-
- if current_val<>newval then
- begin
- W.debug "write_field %s,%s: %s |-> %s" tblname objref fldname newval;
-
- (* Update the field in the cache whether it's persistent or not *)
- update_database (set_field_in_row tblname objref fldname newval);
-
- Database.notify (WriteField(tblname, objref, fldname, current_val, newval)) db;
-
- (* then only persist the change if the schema says so *)
- if Schema.is_field_persistent (Database.schema db) tblname fldname
- then update_database Database.increment;
- end)
+let write_field_locked t tblname objref fldname newval =
+ let row = Table.find_exn tblname objref (TableSet.find tblname (Database.tableset (get_database t))) in
+ let current_val = Row.find fldname row in
+
+ let newval = ensure_utf8_xml newval in
+
+ if current_val<>newval then begin
+ W.debug "write_field %s,%s: %s |-> %s" tblname objref fldname newval;
+
+ (* Update the field in the cache whether it's persistent or not *)
+ update_database t (set_field_in_row tblname objref fldname newval);
+ Database.notify (WriteField(tblname, objref, fldname, current_val, newval)) (get_database t);
+
+ (* then only persist the change if the schema says so *)
+ if Schema.is_field_persistent (Database.schema (get_database t)) tblname fldname
+ then update_database t Database.increment
+ end
+
+let write_field t tblname objref fldname newval =
+ with_lock (fun () ->
+ write_field_locked t tblname objref fldname newval)
+
(* This function *should* only be used by db_actions code looking up Set(Ref _) fields:
if we detect another (illegal) use we log the problem and fall back to a slow scan *)
-let read_set_ref rcd =
- let db = get_database () in
-
+let read_set_ref t rcd =
+ let db = get_database t in
(* The where_record should correspond to the 'one' end of a 'one to many' *)
let one_tbl = rcd.table in
let one_fld = rcd.where_field in
let _, many_tbl, many_fld = List.find (fun (a, _, _) -> a = one_fld) rels in
let objref = rcd.where_value in
- let str = read_field many_tbl many_fld objref in
+ let str = read_field_internal t many_tbl many_fld objref db in
String_unmarshall_helper.set (fun x -> x) str
end else begin
error "Illegal read_set_ref query { table = %s; where_field = %s; where_value = %s; return = %s }; falling back to linear scan" rcd.table rcd.where_field rcd.where_value rcd.return;
Printf.printf "Illegal read_set_ref query { table = %s; where_field = %s; where_value = %s; return = %s }; falling back to linear scan\n%!" rcd.table rcd.where_field rcd.where_value rcd.return;
- with_lock
- (fun () ->
- let db = get_database () in
- let tbl = TableSet.find rcd.table (Database.tableset db) in
- Table.fold
- (fun rf row acc ->
- if Row.find rcd.where_field row = rcd.where_value
- then Row.find rcd.return row :: acc else acc)
- tbl []
- )
+ let tbl = TableSet.find rcd.table (Database.tableset db) in
+ Table.fold
+ (fun rf row acc ->
+ if Row.find rcd.where_field row = rcd.where_value
+ then Row.find rcd.return row :: acc else acc)
+ tbl []
end
and iterates through set-refs [returning (fieldname, ref list) list; where fieldname is the
name of the Set Ref field in tbl; and ref list is the list of foreign keys from related
table with remote-fieldname=objref] *)
-let read_record tblname objref =
- with_lock
- (fun ()->
- let db = get_database () in
- let tbl = TableSet.find tblname (Database.tableset db) in
- let row = Table.find_exn tblname objref tbl in
- let fvlist = Row.fold (fun k d env -> (k,d)::env) row [] in
- (* Unfortunately the interface distinguishes between Set(Ref _) types and
- ordinary fields *)
- let schema = Schema.table tblname (Database.schema db) in
- let set_ref = List.filter (fun (k, _) ->
- try
- let column = Schema.Table.find k schema in
- column.Schema.Column.issetref
- with Not_found as e ->
- Printf.printf "Failed to find table %s in schema\n%!" k;
- raise e
- ) fvlist in
- (* the set_ref fields must be converted back into lists *)
- let set_ref = List.map (fun (k, v) ->
- k, String_unmarshall_helper.set (fun x -> x) v) set_ref in
- (fvlist, set_ref))
+let read_record t tblname objref =
+ let db = get_database t in
+ let tbl = TableSet.find tblname (Database.tableset db) in
+ let row = Table.find_exn tblname objref tbl in
+ let fvlist = Row.fold (fun k d env -> (k,d)::env) row [] in
+ (* Unfortunately the interface distinguishes between Set(Ref _) types and
+ ordinary fields *)
+ let schema = Schema.table tblname (Database.schema db) in
+ let set_ref = List.filter (fun (k, _) ->
+ try
+ let column = Schema.Table.find k schema in
+ column.Schema.Column.issetref
+ with Not_found as e ->
+ Printf.printf "Failed to find table %s in schema\n%!" k;
+ raise e
+ ) fvlist in
+ (* the set_ref fields must be converted back into lists *)
+ let set_ref = List.map (fun (k, v) ->
+ k, String_unmarshall_helper.set (fun x -> x) v) set_ref in
+ (fvlist, set_ref)
(* Delete row from tbl *)
-let delete_row tblname objref =
- with_lock
- (fun () ->
- W.debug "delete_row %s (%s)" tblname objref;
-
- let db = get_database () in
- let tbl = TableSet.find tblname (Database.tableset db) in
- let row = Table.find_exn tblname objref tbl in
-
- Database.notify (PreDelete(tblname, objref)) db;
- update_database (remove_row_from_table tblname objref);
- Database.notify (Delete(tblname, objref, Row.fold (fun k v acc -> (k, v) :: acc) row [])) db;
- if Schema.is_table_persistent (Database.schema db) tblname
- then update_database Database.increment;
- )
+let delete_row_locked t tblname objref =
+ W.debug "delete_row %s (%s)" tblname objref;
+
+ let tbl = TableSet.find tblname (Database.tableset (get_database t)) in
+ let row = Table.find_exn tblname objref tbl in
+
+ let db = get_database t in
+ Database.notify (PreDelete(tblname, objref)) db;
+ update_database t (remove_row_from_table tblname objref);
+ Database.notify (Delete(tblname, objref, Row.fold (fun k v acc -> (k, v) :: acc) row [])) db;
+ if Schema.is_table_persistent (Database.schema db) tblname
+ then update_database t Database.increment
+let delete_row t tblname objref =
+ with_lock (fun () -> delete_row_locked t tblname objref)
+
(* Create new row in tbl containing specified k-v pairs *)
-let create_row tblname kvs' new_objref =
+let create_row_locked t tblname kvs' new_objref =
(* Ensure values are valid for UTF-8-encoded XML. *)
let kvs' = List.map (fun (key, value) -> (key, ensure_utf8_xml value)) kvs' in
let kvs' = (Db_names.ref, new_objref) :: kvs' in
let row = List.fold_left (fun row (k, v) -> Row.add k v row) Row.empty kvs' in
- let schema = Schema.table tblname (Database.schema (get_database ())) in
+ let schema = Schema.table tblname (Database.schema (get_database t)) in
(* fill in default values if kv pairs for these are not supplied already *)
let row = Row.add_defaults schema row in
- with_lock
- (fun () ->
- W.debug "create_row %s (%s) [%s]" tblname new_objref (String.concat "," (List.map (fun (k,v)->"("^k^","^"v"^")") kvs'));
- let db = get_database () in
- let tbl = TableSet.find tblname (Database.tableset db) in
- update_database (set_row_in_table tblname new_objref row);
-
- Database.notify (Create(tblname, new_objref, Row.fold (fun k v acc -> (k, v) :: acc) row [])) db;
-
- if Schema.is_table_persistent (Database.schema db) tblname
- then update_database Database.increment;
- )
+ W.debug "create_row %s (%s) [%s]" tblname new_objref (String.concat "," (List.map (fun (k,v)->"("^k^","^"v"^")") kvs'));
+ let tbl = TableSet.find tblname (Database.tableset (get_database t)) in
+ update_database t (set_row_in_table tblname new_objref row);
+
+ Database.notify (Create(tblname, new_objref, Row.fold (fun k v acc -> (k, v) :: acc) row [])) (get_database t);
+ if Schema.is_table_persistent (Database.schema (get_database t)) tblname
+ then update_database t Database.increment
+let create_row t tblname kvs' new_objref =
+ with_lock (fun () -> create_row_locked t tblname kvs' new_objref)
+
(* Do linear scan to find field values which match where clause *)
-let read_field_where rcd =
- with_lock
- (fun () ->
- let db = get_database () in
- let tbl = TableSet.find rcd.table (Database.tableset db) in
- Table.fold
- (fun r row acc ->
- let field = Row.find rcd.where_field row in
- if field = rcd.where_value then Row.find rcd.return row :: acc else acc
- ) tbl []
- )
+let read_field_where t rcd =
+ let db = get_database t in
+ let tbl = TableSet.find rcd.table (Database.tableset db) in
+ Table.fold
+ (fun r row acc ->
+ let field = Row.find rcd.where_field row in
+ if field = rcd.where_value then Row.find rcd.return row :: acc else acc
+ ) tbl []
-let db_get_by_uuid tbl uuid_val =
- match (read_field_where
+let db_get_by_uuid t tbl uuid_val =
+ match (read_field_where t
{table=tbl; return=Db_names.ref;
where_field=Db_names.uuid; where_value=uuid_val}) with
| [] -> raise (Read_missing_uuid (tbl, "", uuid_val))
| _ -> raise (Too_many_values (tbl, "", uuid_val))
(** Return reference fields from tbl that matches specified name_label field *)
-let db_get_by_name_label tbl label =
- read_field_where
+let db_get_by_name_label t tbl label =
+ read_field_where t
{table=tbl; return=Db_names.ref;
where_field=(Escaping.escape_id ["name"; "label"]);
where_value=label}
(* Read references from tbl *)
-let read_refs tblname =
- with_lock
- (fun () ->
- let db = get_database () in
- let tbl = TableSet.find tblname (Database.tableset db) in
- Table.fold (fun r _ acc -> r :: acc) tbl [])
+let read_refs t tblname =
+ let tbl = TableSet.find tblname (Database.tableset (get_database t)) in
+ Table.fold (fun r _ acc -> r :: acc) tbl []
(* Return a list of all the refs for which the expression returns true. *)
-let find_refs_with_filter (tblname: string) (expr: Db_filter_types.expr) =
- with_lock
- (fun ()->
- let db = get_database () in
- let tbl = TableSet.find tblname (Database.tableset db) in
- let eval_val row = function
- | Db_filter_types.Literal x -> x
- | Db_filter_types.Field x -> Row.find x row in
- Table.fold
- (fun r row acc ->
- if Db_filter.eval_expr (eval_val row) expr
- then Row.find Db_names.ref row :: acc else acc
- ) tbl []
- )
+let find_refs_with_filter t (tblname: string) (expr: Db_filter_types.expr) =
+ let db = get_database t in
+ let tbl = TableSet.find tblname (Database.tableset db) in
+ let eval_val row = function
+ | Db_filter_types.Literal x -> x
+ | Db_filter_types.Field x -> Row.find x row in
+ Table.fold
+ (fun r row acc ->
+ if Db_filter.eval_expr (eval_val row) expr
+ then Row.find Db_names.ref row :: acc else acc
+ ) tbl []
-let read_records_where tbl expr =
- with_lock
- (fun ()->
- let reqd_refs = find_refs_with_filter tbl expr in
- List.map (fun ref->ref, read_record tbl ref) reqd_refs
- )
+let read_records_where t tbl expr =
+ let reqd_refs = find_refs_with_filter t tbl expr in
+ List.map (fun ref->ref, read_record t tbl ref) reqd_refs
-let process_structured_field (key,value) tblname fld objref proc_fn_selector =
+let process_structured_field_locked t (key,value) tblname fld objref proc_fn_selector =
(* Ensure that both keys and values are valid for UTF-8-encoded XML. *)
let key = ensure_utf8_xml key in
let value = ensure_utf8_xml value in
- with_lock
- (fun () ->
- let db = get_database () in
- let tbl = TableSet.find tblname (Database.tableset db) in
- let row = Table.find_exn tblname objref tbl in
- let existing_str = Row.find fld row in
- let new_str = match proc_fn_selector with
- | AddSet -> add_to_set key existing_str
- | RemoveSet -> remove_from_set key existing_str
- | AddMap ->
- begin
- try
- add_to_map key value existing_str
- with Duplicate ->
- error "Duplicate key in set or map: table %s; field %s; ref %s; key %s" tblname fld objref key;
- raise (Duplicate_key (tblname,fld,objref,key));
- end
- | RemoveMap -> remove_from_map key existing_str in
- write_field tblname objref fld new_str)
-
+ let tbl = TableSet.find tblname (Database.tableset (get_database t)) in
+ let row = Table.find_exn tblname objref tbl in
+ let existing_str = Row.find fld row in
+ let new_str = match proc_fn_selector with
+ | AddSet -> add_to_set key existing_str
+ | RemoveSet -> remove_from_set key existing_str
+ | AddMap ->
+ begin
+ try
+ add_to_map key value existing_str
+ with Duplicate ->
+ error "Duplicate key in set or map: table %s; field %s; ref %s; key %s" tblname fld objref key;
+ raise (Duplicate_key (tblname,fld,objref,key));
+ end
+ | RemoveMap -> remove_from_map key existing_str in
+ write_field t tblname objref fld new_str
+
+let process_structured_field t (key,value) tblname fld objref proc_fn_selector =
+ with_lock (fun () ->
+ process_structured_field_locked t (key,value) tblname fld objref proc_fn_selector)
+
(* -------------------------------------------------------------------- *)
let load connections default_schema =
(* Called by server at start-of-day to initialiase cache. Populates cache and starts flushing threads *)
-let make connections default_schema =
+let make t connections default_schema =
let db = load connections default_schema in
let db = Database.reindex db in
- update_database (fun _ -> db);
+ update_database t (fun _ -> db);
spawn_db_flush_threads()
(** Return an association list of table name * record count *)
-let stats () =
- with_lock
- (fun () ->
- TableSet.fold (fun name tbl acc ->
- let size = Table.fold (fun _ _ acc -> acc + 1) tbl 0 in
- (name, size) :: acc)
- (Database.tableset (Db_backend.get_database ()))
- []
- )
+let stats t =
+ TableSet.fold (fun name tbl acc ->
+ let size = Table.fold (fun _ _ acc -> acc + 1) tbl 0 in
+ (name, size) :: acc)
+ (Database.tableset (get_database t))
+ []
+
-(* Only needed by the DB_ACCESS signature *)
-let initialise () = ()
include Db_interface.DB_ACCESS
-(** [make connections default_schema] initialises the in-memory cache *)
-val make : Parse_db_conf.db_connection list -> Schema.t -> unit
+(** [make t connections default_schema] initialises the in-memory cache *)
+val make : Db_ref.t -> Parse_db_conf.db_connection list -> Schema.t -> unit
(** [flush_and_exit db code] flushes the specific backend [db] and exits
xapi with [code] *)
(** [sync db] forcibly flushes the database to disk *)
val sync : Parse_db_conf.db_connection list -> Db_cache_types.Database.t -> unit
-(** [stats ()] returns some stats data for logging *)
-val stats : unit -> (string * int) list
+(** [stats t] returns some stats data for logging *)
+val stats : Db_ref.t -> (string * int) list
(** [get_table_from_ref ref] returns [Some tbl] if [ref] is a
valid reference; None otherwise *)
- val get_table_from_ref : string -> string option
+ val get_table_from_ref : Db_ref.t -> string -> string option
(** [is_valid_ref ref] returns true if [ref] is valid; false otherwise *)
- val is_valid_ref : string -> bool
+ val is_valid_ref : Db_ref.t -> string -> bool
(** [read_refs tbl] returns a list of all references in table [tbl] *)
- val read_refs : string -> string list
+ val read_refs : Db_ref.t -> string -> string list
(** [find_refs_with_filter tbl expr] returns a list of all references
to rows which match [expr] *)
val find_refs_with_filter :
- string -> Db_filter_types.expr -> string list
+ Db_ref.t -> string -> Db_filter_types.expr -> string list
(** [read_field_where {tbl,return,where_field,where_value}] returns a
list of the [return] fields in table [tbl] where the [where_field]
equals [where_value] *)
- val read_field_where : Db_cache_types.where_record -> string list
+ val read_field_where : Db_ref.t -> Db_cache_types.where_record -> string list
(** [db_get_by_uuid tbl uuid] returns the single object reference
associated with [uuid] *)
- val db_get_by_uuid : string -> string -> string
+ val db_get_by_uuid : Db_ref.t -> string -> string -> string
(** [db_get_by_name_label tbl label] returns the list of object references
associated with [label] *)
- val db_get_by_name_label : string -> string -> string list
+ val db_get_by_name_label : Db_ref.t -> string -> string -> string list
(** [read_set_ref {tbl,return,where_field,where_value}] is identical
to [read_field_where ...]. *)
- val read_set_ref : Db_cache_types.where_record -> string list
+ val read_set_ref : Db_ref.t -> Db_cache_types.where_record -> string list
(** [create_row tbl kvpairs ref] create a new row in [tbl] with
key [ref] and contents [kvpairs] *)
val create_row :
- string -> (string * string) list -> string -> unit
+ Db_ref.t -> string -> (string * string) list -> string -> unit
(** [delete_row context tbl ref] deletes row [ref] from table [tbl] *)
- val delete_row : string -> string -> unit
+ val delete_row : Db_ref.t -> string -> string -> unit
(** [write_field context tbl ref fld val] changes field [fld] to [val] in
row [ref] in table [tbl] *)
- val write_field : string -> string -> string -> string -> unit
-
+ val write_field : Db_ref.t -> string -> string -> string -> string -> unit
+
(** [read_field context tbl ref fld] returns the value of field [fld]
in row [ref] in table [tbl] *)
- val read_field : string -> string -> string -> string
+ val read_field : Db_ref.t -> string -> string -> string -> string
(** [read_record tbl ref] returns
[ (field, value) ] * [ (set_ref fieldname * [ ref ]) ] *)
- val read_record : string -> string -> db_record
+ val read_record : Db_ref.t -> string -> string -> db_record
(** [read_records_where tbl expr] returns a list of the values returned
by read_record that match the expression *)
- val read_records_where : string -> Db_filter_types.expr ->
+ val read_records_where : Db_ref.t -> string -> Db_filter_types.expr ->
(string * db_record) list
(** [process_structured_field context kv tbl fld ref op] modifies the
which may be one of AddSet RemoveSet AddMap RemoveMap with
arguments [kv] *)
val process_structured_field :
- string * string ->
+ Db_ref.t -> string * string ->
string -> string -> string -> Db_cache_types.structured_op_t -> unit
end
--- /dev/null
+type t =
+ | In_memory of Db_cache_types.Database.t ref ref
+ | Remote
+
+exception Database_not_in_memory
+
+let in_memory (rf: Db_cache_types.Database.t ref ref) = In_memory rf
+
+let get_database = function
+ | In_memory x -> !(!(x))
+ | Remote -> raise Database_not_in_memory
+
+let update_database t f = match t with
+ | In_memory x ->
+ let d : Db_cache_types.Database.t = f (get_database t) in
+ (!(x)) := d
+ | Remote -> raise Database_not_in_memory
+
+
+open Threadext
module DBCacheRemoteListener = struct
open Db_rpc_common_v1
Note that, although the messages still contain the pool_secret for historical reasons,
access has already been applied by the RBAC code in Xapi_http.add_handler. *)
let process_xmlrpc xml =
- Mutex.lock ctr_mutex;
- calls_processed := !calls_processed + 1;
- Mutex.unlock ctr_mutex;
+ Mutex.execute ctr_mutex
+ (fun () -> calls_processed := !calls_processed + 1);
+
let fn_name, args =
match (XMLRPC.From.array (fun x->x) xml) with
[fn_name; _; args] ->
XMLRPC.From.string fn_name, args
| _ -> raise DBCacheListenerInvalidMessageReceived in
+ let t = Db_backend.make () in
try
debug "Received [total=%d rx=%d tx=%d] %s" !calls_processed !total_recv_len !total_transmit_len fn_name;
match fn_name with
"get_table_from_ref" ->
let s = unmarshall_get_table_from_ref_args args in
- success (marshall_get_table_from_ref_response (DBCache.get_table_from_ref s))
+ success (marshall_get_table_from_ref_response (DBCache.get_table_from_ref t s))
| "is_valid_ref" ->
let s = unmarshall_is_valid_ref_args args in
- success (marshall_is_valid_ref_response (DBCache.is_valid_ref s))
+ success (marshall_is_valid_ref_response (DBCache.is_valid_ref t s))
| "read_refs" ->
let s = unmarshall_read_refs_args args in
- success (marshall_read_refs_response (DBCache.read_refs s))
+ success (marshall_read_refs_response (DBCache.read_refs t s))
| "read_field_where" ->
let w = unmarshall_read_field_where_args args in
- success (marshall_read_field_where_response (DBCache.read_field_where w))
+ success (marshall_read_field_where_response (DBCache.read_field_where t w))
| "read_set_ref" ->
let w = unmarshall_read_set_ref_args args in
- success (marshall_read_set_ref_response (DBCache.read_field_where w))
+ success (marshall_read_set_ref_response (DBCache.read_field_where t w))
| "create_row" ->
let (s1,ssl,s2) = unmarshall_create_row_args args in
- success (marshall_create_row_response (DBCache.create_row s1 ssl s2))
+ success (marshall_create_row_response (DBCache.create_row t s1 ssl s2))
| "delete_row" ->
let (s1,s2) = unmarshall_delete_row_args args in
- success (marshall_delete_row_response (DBCache.delete_row s1 s2))
+ success (marshall_delete_row_response (DBCache.delete_row t s1 s2))
| "write_field" ->
let (s1,s2,s3,s4) = unmarshall_write_field_args args in
- success (marshall_write_field_response (DBCache.write_field s1 s2 s3 s4))
+ success (marshall_write_field_response (DBCache.write_field t s1 s2 s3 s4))
| "read_field" ->
let (s1,s2,s3) = unmarshall_read_field_args args in
- success (marshall_read_field_response (DBCache.read_field s1 s2 s3))
+ success (marshall_read_field_response (DBCache.read_field t s1 s2 s3))
| "find_refs_with_filter" ->
let (s,e) = unmarshall_find_refs_with_filter_args args in
- success (marshall_find_refs_with_filter_response (DBCache.find_refs_with_filter s e))
+ success (marshall_find_refs_with_filter_response (DBCache.find_refs_with_filter t s e))
| "process_structured_field" ->
let (ss,s1,s2,s3,op) = unmarshall_process_structured_field_args args in
- success (marshall_process_structured_field_response (DBCache.process_structured_field ss s1 s2 s3 op))
+ success (marshall_process_structured_field_response (DBCache.process_structured_field t ss s1 s2 s3 op))
| "read_record" ->
let (s1,s2) = unmarshall_read_record_args args in
- success (marshall_read_record_response (DBCache.read_record s1 s2))
+ success (marshall_read_record_response (DBCache.read_record t s1 s2))
| "read_records_where" ->
let (s,e) = unmarshall_read_records_where_args args in
- success (marshall_read_records_where_response (DBCache.read_records_where s e))
+ success (marshall_read_records_where_response (DBCache.read_records_where t s e))
| "db_get_by_uuid" ->
let (s,e) = unmarshall_db_get_by_uuid_args args in
- success (marshall_db_get_by_uuid_response (DBCache.db_get_by_uuid s e))
+ success (marshall_db_get_by_uuid_response (DBCache.db_get_by_uuid t s e))
| "db_get_by_name_label" ->
let (s,e) = unmarshall_db_get_by_name_label_args args in
- success (marshall_db_get_by_name_label_response (DBCache.db_get_by_name_label s e))
+ success (marshall_db_get_by_name_label_response (DBCache.db_get_by_name_label t s e))
| _ -> raise (DBCacheListenerUnknownMessageName fn_name)
with
Duplicate_key (c,f,u,k) ->
let fd = Buf_io.fd_of bio in (* fd only used for writing *)
let body = Http_svr.read_body ~limit:Xapi_globs.http_limit_max_rpc_size req bio in
let body_xml = Xml.parse_string body in
- let response = Xml.to_bigbuffer (DBCacheRemoteListener.process_xmlrpc body_xml) in
+ let reply_xml = DBCacheRemoteListener.process_xmlrpc body_xml in
+ let response = Xml.to_bigbuffer reply_xml in
Http_svr.response_fct req fd (Bigbuffer.length response)
(fun fd -> Bigbuffer.to_fct response (fun s -> ignore(Unix.write fd s 0 (String.length s))))
(** Convert a marshalled Request Rpc.t into a marshalled Response Rpc.t *)
let process_rpc (req: Rpc.t) =
let module DB = (Db_cache_impl : Db_interface.DB_ACCESS) in
+ let t = Db_backend.make () in
Response.rpc_of_t
(try
match Request.t_of_rpc req with
| Request.Get_table_from_ref x ->
- Response.Get_table_from_ref (DB.get_table_from_ref x)
+ Response.Get_table_from_ref (DB.get_table_from_ref t x)
| Request.Is_valid_ref x ->
- Response.Is_valid_ref (DB.is_valid_ref x)
+ Response.Is_valid_ref (DB.is_valid_ref t x)
| Request.Read_refs x ->
- Response.Read_refs (DB.read_refs x)
+ Response.Read_refs (DB.read_refs t x)
| Request.Find_refs_with_filter (x, e) ->
- Response.Find_refs_with_filter (DB.find_refs_with_filter x e)
+ Response.Find_refs_with_filter (DB.find_refs_with_filter t x e)
| Request.Read_field_where w ->
- Response.Read_field_where (DB.read_field_where w)
+ Response.Read_field_where (DB.read_field_where t w)
| Request.Db_get_by_uuid (a, b) ->
- Response.Db_get_by_uuid (DB.db_get_by_uuid a b)
+ Response.Db_get_by_uuid (DB.db_get_by_uuid t a b)
| Request.Db_get_by_name_label (a, b) ->
- Response.Db_get_by_name_label (DB.db_get_by_name_label a b)
+ Response.Db_get_by_name_label (DB.db_get_by_name_label t a b)
| Request.Read_set_ref w ->
- Response.Read_set_ref (DB.read_set_ref w)
+ Response.Read_set_ref (DB.read_set_ref t w)
| Request.Create_row (a, b, c) ->
- Response.Create_row (DB.create_row a b c)
+ Response.Create_row (DB.create_row t a b c)
| Request.Delete_row (a, b) ->
- Response.Delete_row (DB.delete_row a b)
+ Response.Delete_row (DB.delete_row t a b)
| Request.Write_field (a, b, c, d) ->
- Response.Write_field (DB.write_field a b c d)
+ Response.Write_field (DB.write_field t a b c d)
| Request.Read_field (a, b, c) ->
- Response.Read_field (DB.read_field a b c)
+ Response.Read_field (DB.read_field t a b c)
| Request.Read_record (a, b) ->
- let a', b' = DB.read_record a b in
+ let a', b' = DB.read_record t a b in
Response.Read_record (a', b')
| Request.Read_records_where (a, b) ->
- Response.Read_records_where (DB.read_records_where a b)
+ Response.Read_records_where (DB.read_records_where t a b)
| Request.Process_structured_field (a, b, c, d, e) ->
- Response.Process_structured_field (DB.process_structured_field a b c d e)
+ Response.Process_structured_field (DB.process_structured_field t a b c d e)
with
| DBCache_NotFound (x,y,z) ->
Response.Dbcache_notfound (x, y, z)
let fd = Buf_io.fd_of bio in (* fd only used for writing *)
let body = Http_svr.read_body ~limit:Xapi_globs.http_limit_max_rpc_size req bio in
let request_rpc = Jsonrpc.of_string body in
+ let reply_rpc = process_rpc request_rpc in
(* XXX: need to cope with > 16MiB responses *)
- let response = Jsonrpc.to_string (process_rpc request_rpc) in
+ let response = Jsonrpc.to_string reply_rpc in
Http_svr.response_str req fd response
else process_exception_xml resp_xml
| _ -> raise Remote_db_server_returned_bad_message
- let get_table_from_ref x =
+ let get_table_from_ref _ x =
do_remote_call
marshall_get_table_from_ref_args
unmarshall_get_table_from_ref_response
"get_table_from_ref"
x
- let is_valid_ref x =
+ let is_valid_ref _ x =
do_remote_call
marshall_is_valid_ref_args
unmarshall_is_valid_ref_response
"is_valid_ref"
x
- let read_refs x =
+ let read_refs _ x =
do_remote_call
marshall_read_refs_args
unmarshall_read_refs_response
"read_refs"
x
- let read_field_where x =
+ let read_field_where _ x =
do_remote_call
marshall_read_field_where_args
unmarshall_read_field_where_response
x
- let db_get_by_uuid t u =
+ let db_get_by_uuid _ t u =
do_remote_call
marshall_db_get_by_uuid_args
unmarshall_db_get_by_uuid_response
"db_get_by_uuid"
(t,u)
- let db_get_by_name_label t l =
+ let db_get_by_name_label _ t l =
do_remote_call
marshall_db_get_by_name_label_args
unmarshall_db_get_by_name_label_response
"db_get_by_name_label"
(t,l)
- let read_set_ref x =
+ let read_set_ref _ x =
do_remote_call
marshall_read_set_ref_args
unmarshall_read_set_ref_response
x
- let create_row x y z =
+ let create_row _ x y z =
do_remote_call
marshall_create_row_args
unmarshall_create_row_response
"create_row"
(x,y,z)
- let delete_row x y =
+ let delete_row _ x y =
do_remote_call
marshall_delete_row_args
unmarshall_delete_row_response
"delete_row"
(x,y)
- let write_field a b c d =
+ let write_field _ a b c d =
do_remote_call
marshall_write_field_args
unmarshall_write_field_response
"write_field"
(a,b,c,d)
- let read_field x y z =
+ let read_field _ x y z =
do_remote_call
marshall_read_field_args
unmarshall_read_field_response
"read_field"
(x,y,z)
- let find_refs_with_filter s e =
+ let find_refs_with_filter _ s e =
do_remote_call
marshall_find_refs_with_filter_args
unmarshall_find_refs_with_filter_response
"find_refs_with_filter"
(s,e)
- let read_record x y =
+ let read_record _ x y =
do_remote_call
marshall_read_record_args
unmarshall_read_record_response
"read_record"
(x,y)
- let read_records_where x e =
+ let read_records_where _ x e =
do_remote_call
marshall_read_records_where_args
unmarshall_read_records_where_response
"read_records_where"
(x,e)
- let process_structured_field a b c d e =
+ let process_structured_field _ a b c d e =
do_remote_call
marshall_process_structured_field_args
unmarshall_process_structured_field_response
open Db_exn
module Make = functor(RPC: Db_interface.RPC) -> struct
-
let initialise = RPC.initialise
let rpc x = Jsonrpc.of_string (RPC.rpc (Jsonrpc.to_string x))
raise (Too_many_values (x,y,z))
| y -> y
- let get_table_from_ref x =
+ let get_table_from_ref _ x =
match process (Request.Get_table_from_ref x) with
| Response.Get_table_from_ref y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let is_valid_ref x =
+ let is_valid_ref _ x =
match process (Request.Is_valid_ref x) with
| Response.Is_valid_ref y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let read_refs x =
+ let read_refs _ x =
match process (Request.Read_refs x) with
| Response.Read_refs y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let read_field_where x =
+ let read_field_where _ x =
match process (Request.Read_field_where x) with
| Response.Read_field_where y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let db_get_by_uuid t u =
+ let db_get_by_uuid _ t u =
match process (Request.Db_get_by_uuid (t, u)) with
| Response.Db_get_by_uuid y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let db_get_by_name_label t l =
+ let db_get_by_name_label _ t l =
match process (Request.Db_get_by_name_label (t, l)) with
| Response.Db_get_by_name_label y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let read_set_ref x =
+ let read_set_ref _ x =
match process (Request.Read_set_ref x) with
| Response.Read_set_ref y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let create_row x y z =
+ let create_row _ x y z =
match process (Request.Create_row (x, y, z)) with
| Response.Create_row y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let delete_row x y =
+ let delete_row _ x y =
match process (Request.Delete_row (x, y)) with
| Response.Delete_row y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let write_field a b c d =
+ let write_field _ a b c d =
match process (Request.Write_field (a, b, c, d)) with
| Response.Write_field y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let read_field x y z =
+ let read_field _ x y z =
match process (Request.Read_field (x, y, z)) with
| Response.Read_field y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let find_refs_with_filter s e =
+ let find_refs_with_filter _ s e =
match process (Request.Find_refs_with_filter (s, e)) with
| Response.Find_refs_with_filter y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let read_record x y =
+ let read_record _ x y =
match process (Request.Read_record (x, y)) with
| Response.Read_record (x, y) -> x, y
| _ -> raise Remote_db_server_returned_bad_message
- let read_records_where x e =
+ let read_records_where _ x e =
match process (Request.Read_records_where (x, e)) with
| Response.Read_records_where y -> y
| _ -> raise Remote_db_server_returned_bad_message
- let process_structured_field a b c d e =
+ let process_structured_field _ a b c d e =
match process (Request.Process_structured_field(a, b, c, d, e)) with
| Response.Process_structured_field y -> y
| _ -> raise Remote_db_server_returned_bad_message
let other_tbl_refs_for_this_field tblname fldname =
List.filter (fun (_,fld) -> fld=fldname) (other_tbl_refs tblname) in
+ let is_valid_ref r =
+ try
+ ignore(Database.table_of_ref r db);
+ true
+ with _ -> false in
+
match event with
| WriteField (tblname, objref, fldname, oldval, newval) ->
let events_old_val =
- if Db_cache_impl.is_valid_ref oldval then
+ if is_valid_ref oldval then
events_of_other_tbl_refs
(List.map (fun (tbl,fld) ->
(tbl, oldval, find_get_record tbl ~__context:context ~self:oldval)) (other_tbl_refs_for_this_field tblname fldname))
else [] in
let events_new_val =
- if Db_cache_impl.is_valid_ref newval then
+ if is_valid_ref newval then
events_of_other_tbl_refs
(List.map (fun (tbl,fld) ->
(tbl, newval, find_get_record tbl ~__context:context ~self:newval)) (other_tbl_refs_for_this_field tblname fldname))
let other_tbl_refs =
List.fold_left (fun accu (remote_tbl,fld) ->
let fld_value = List.assoc fld kv in
- if Db_cache_impl.is_valid_ref fld_value
+ if is_valid_ref fld_value
then (remote_tbl, fld_value, find_get_record remote_tbl ~__context:context ~self:fld_value) :: accu
else accu)
[] other_tbl_refs in
let other_tbl_refs =
List.fold_left (fun accu (tbl,fld) ->
let fld_value = List.assoc fld kv in
- if Db_cache_impl.is_valid_ref fld_value
+ if is_valid_ref fld_value
then (tbl, fld_value, find_get_record tbl ~__context:context ~self:fld_value) :: accu
else accu)
[] other_tbl_refs in
Printf.sprintf "%s%s" x.uuid (Opt.default "" (Opt.map (fun name -> Printf.sprintf " (%s)" name) x.name_label))
let lookup key =
- let db = Db_backend.get_database () in
+ let t = Db_backend.make () in
+ let db = Db_ref.get_database t in
let r (tblname, objref) =
let row = Table.find objref (TableSet.find tblname (Database.tableset db)) in {
name_label = (try Some (Row.find Db_names.name_label row) with _ -> None);
dbs
let read_in_database() =
- (* Make sure we're running in master mode: we cannot be a slave
- and then access the dbcache *)
- Db_cache.set_master true;
let connections = initialise_db_connections() in
(* Initialiase in-memory database cache *)
- Db_cache_impl.make connections Schema.empty
+ Db_cache_impl.make (Db_backend.make ()) connections Schema.empty
let write_out_databases() =
- Db_cache_impl.sync (Db_conn_store.read_db_connections ()) (Db_backend.get_database ())
+ Db_cache_impl.sync (Db_conn_store.read_db_connections ()) (Db_ref.get_database (Db_backend.make ()))
(* should never be thrown due to checking argument at start *)
exception UnknownFormat
Parse_db_conf.path=filename;
Parse_db_conf.mode=Parse_db_conf.No_limit;
Parse_db_conf.compress=(!compress)
- } ] (Db_backend.get_database ())
+ } ] (Db_ref.get_database (Db_backend.make ()))
let help_pad = " "
let operation_list =
begin
read_in_database();
if !xmltostdout then
- Db_xml.To.fd (Unix.descr_of_out_channel stdout) (Db_backend.get_database ())
+ Db_xml.To.fd (Unix.descr_of_out_channel stdout) (Db_ref.get_database (Db_backend.make()))
else
write_out_database !filename
end
let find_my_host_row() =
Xapi_inventory.read_inventory ();
let localhost_uuid = Xapi_inventory.lookup Xapi_inventory._installation_uuid in
- let db = Db_backend.get_database () in
+ let db = Db_ref.get_database (Db_backend.make ()) in
let tbl = TableSet.find Db_names.host (Database.tableset db) in
Table.fold (fun r row acc -> if Row.find Db_names.uuid row = localhost_uuid then (Some (r, row)) else acc) tbl None
(* ... otherwise add new key/value pair *)
(_iscsi_iqn,new_iqn)::other_config in
let other_config = String_marshall_helper.map (fun x->x) (fun x->x) other_config in
- Db_backend.update_database (set_field_in_row Db_names.host r Db_names.other_config other_config);
+ Db_ref.update_database (Db_backend.make ()) (set_field_in_row Db_names.host r Db_names.other_config other_config);
write_out_databases()
let do_am_i_in_the_database () =
$(AUTOGEN_HELPER_DIR)/db_exn \
$(AUTOGEN_HELPER_DIR)/ref_index \
$(AUTOGEN_HELPER_DIR)/db_backend \
+ $(AUTOGEN_HELPER_DIR)/db_ref \
$(AUTOGEN_HELPER_DIR)/backend_xml \
$(AUTOGEN_HELPER_DIR)/generation \
$(AUTOGEN_HELPER_DIR)/db_connections \
forwarded_task : bool;
origin: origin;
task_name: string; (* Name for dummy task FIXME: used only for dummy task, as real task as their name in the database *)
+ database: Db_ref.t;
}
let get_session_id x =
(string_of_origin x.origin)
x.task_name
+let database_of x = x.database
+
(** Calls coming in from the unix socket are pre-authenticated *)
let is_unix_socket s =
match Unix.getpeername s with
| Unix.ADDR_INET (addr, _) when addr = Unix.inet_addr_loopback -> false
| Unix.ADDR_INET _ -> true
+let default_database () =
+ if Pool_role.is_master ()
+ then Db_backend.make ()
+ else Db_ref.Remote
let preauth ~__context =
match __context.origin with
forwarded_task = false;
origin = Internal;
task_name = "initial_task";
+ database = default_database ();
}
(* ref fn used to break the cyclic dependency between context, db_actions and taskhelper *)
forwarded_task = true;
task_in_database = not (Ref.is_dummy task_id);
origin = origin;
- task_name = task_name }
+ task_name = task_name;
+ database = default_database ();
+ }
-let make ?(__context=initial) ?(quiet=false) ?subtask_of ?session_id ?(task_in_database=false) ?task_description ?(origin=Internal) task_name =
+let make ?(__context=initial) ?(quiet=false) ?subtask_of ?session_id ?(database=default_database ()) ?(task_in_database=false) ?task_description ?(origin=Internal) task_name =
let task_id, task_uuid =
if task_in_database
then !__make_task ~__context ?description:task_description ?session_id ?subtask_of task_name
| Some subtask_of -> " by task " ^ !__string_of_task "" subtask_of)
;
{ session_id = session_id;
+ database = database;
task_id = task_id;
task_in_database = task_in_database;
origin = origin;
(** [initial] is the initial context. *)
val initial : t
-(** [make ~__context ~subtask_of ~session_id ~task_in_database ~task_description ~origin name] creates a new context.
+(** [make ~__context ~subtask_of ~database ~session_id ~task_in_database ~task_description ~origin name] creates a new context.
[__context] is the calling context,
[quiet] silences "task created" log messages,
[subtask_of] is a reference to the parent task,
[session_id] is the current session id,
+ [database] is the database to use in future Db.* operations
[task_in_database] indicates if the task needs to be stored the task in the database,
[task_descrpition] is the description of the task,
[task_name] is the task name of the created context. *)
?quiet:bool ->
?subtask_of:API.ref_task ->
?session_id:API.ref_session ->
+ ?database:Db_ref.t ->
?task_in_database:bool ->
?task_description:string -> ?origin:origin -> string -> t
(** [string_of __context] returns a string representing the context. *)
val string_of : t -> string
+(** [database_of __context] returns a database handle, which can be used by Db.* *)
+val database_of : t -> Db_ref.t
+
(** {6 Destructors} *)
val destroy : t -> unit
(* Set(Ref t) is actually stored in the table t *)
let obj', fld' = look_up_related_table_and_field obj other full_name in
String.concat "\n" [
- Printf.sprintf "if not(DB.is_valid_ref %s)" Client._self;
+ Printf.sprintf "if not(DB.is_valid_ref __t %s)" Client._self;
Printf.sprintf "then raise (Api_errors.Server_error(Api_errors.handle_invalid, [ %s ]))" Client._self;
- Printf.sprintf "else List.map %s.%s (DB.read_set_ref " _string_to_dm (OU.alias_of_ty (DT.Ref other));
+ Printf.sprintf "else List.map %s.%s (DB.read_set_ref __t " _string_to_dm (OU.alias_of_ty (DT.Ref other));
Printf.sprintf " { table = \"%s\"; return=Db_names.ref; " (Escaping.escape_obj obj');
Printf.sprintf " where_field = \"%s\"; where_value = %s })" fld' Client._self
]
let get_record (obj: obj) aux_fn_name =
let body =
[
- Printf.sprintf "let (__regular_fields, __set_refs) = DB.read_record \"%s\" %s in"
+ Printf.sprintf "let (__regular_fields, __set_refs) = DB.read_record __t \"%s\" %s in"
(Escaping.escape_obj obj.DT.name) Client._self;
aux_fn_name^" ~__regular_fields ~__set_refs";
] in
(String.concat "; " (List.map (fun f -> "\"" ^ f ^ "\"") sql_fields))
*)
-let open_db_module = "let module DB = (val (Db_cache.get ()) : Db_interface.DB_ACCESS) in\n"
+let open_db_module =
+ "let __t = Context.database_of __context in\n" ^
+ "let module DB = (val (Db_cache.get __t) : Db_interface.DB_ACCESS) in\n"
let db_action api : O.Module.t =
let api = make_db_api api in
~name: "get_refs_where"
~params: [ Gen_common.context_arg; expr_arg ]
~ty: ( OU.alias_of_ty (Ref obj.DT.name) ^ " list")
- ~body: [ open_db_module; "let refs = (DB.find_refs_with_filter \"" ^ tbl ^ "\" " ^ expr ^ ") in ";
+ ~body: [ open_db_module; "let refs = (DB.find_refs_with_filter __t \"" ^ tbl ^ "\" " ^ expr ^ ") in ";
"List.map Ref.of_string refs " ] () in
let get_record_aux_fn_body ?(m="API.") (obj: obj) (all_fields: field list) =
~params: [ Gen_common.context_arg; expr_arg ]
~ty: ("'a")
~body: [ open_db_module;
- Printf.sprintf "let records = DB.read_records_where \"%s\" %s in"
+ Printf.sprintf "let records = DB.read_records_where __t \"%s\" %s in"
(Escaping.escape_obj obj.DT.name) expr;
Printf.sprintf "List.map (fun (ref,(__regular_fields,__set_refs)) -> Ref.of_string ref, %s __regular_fields __set_refs) records" conversion_fn] () in
let body = match tag with
| FromField(Setter, fld) ->
- Printf.sprintf "DB.write_field (*__context*) \"%s\" %s \"%s\" value"
+ Printf.sprintf "DB.write_field __t \"%s\" %s \"%s\" value"
(Escaping.escape_obj obj.DT.name)
Client._self
(Escaping.escape_id fld.DT.full_name)
(Escaping.escape_obj obj') fld' Client._self
*)
| FromField(Getter, { DT.ty = ty; full_name = full_name }) ->
- Printf.sprintf "%s.%s (DB.read_field (*__context*) \"%s\" \"%s\" %s)"
+ Printf.sprintf "%s.%s (DB.read_field __t \"%s\" \"%s\" %s)"
_string_to_dm (OU.alias_of_ty ty)
(Escaping.escape_obj obj.DT.name)
(Escaping.escape_id full_name)
Client._self
| FromField(Add, { DT.ty = DT.Map(_, _); full_name = full_name }) ->
- Printf.sprintf "DB.process_structured_field (*__context*) (%s,%s) \"%s\" \"%s\" %s AddMap"
+ Printf.sprintf "DB.process_structured_field __t (%s,%s) \"%s\" \"%s\" %s AddMap"
Client._key Client._value
(Escaping.escape_obj obj.DT.name)
(Escaping.escape_id full_name)
Client._self
| FromField(Add, { DT.ty = DT.Set(_); full_name = full_name }) ->
- Printf.sprintf "DB.process_structured_field (*__context*) (%s,\"\") \"%s\" \"%s\" %s AddSet"
+ Printf.sprintf "DB.process_structured_field __t (%s,\"\") \"%s\" \"%s\" %s AddSet"
Client._value
(Escaping.escape_obj obj.DT.name)
(Escaping.escape_id full_name)
Client._self
| FromField(Remove, { DT.ty = DT.Map(_, _); full_name = full_name }) ->
- Printf.sprintf "DB.process_structured_field (*__context*) (%s,\"\") \"%s\" \"%s\" %s RemoveMap"
+ Printf.sprintf "DB.process_structured_field __t (%s,\"\") \"%s\" \"%s\" %s RemoveMap"
Client._key
(Escaping.escape_obj obj.DT.name)
(Escaping.escape_id full_name)
Client._self
| FromField(Remove, { DT.ty = DT.Set(_); full_name = full_name }) ->
- Printf.sprintf "DB.process_structured_field (*__context*) (%s,\"\") \"%s\" \"%s\" %s RemoveSet"
+ Printf.sprintf "DB.process_structured_field __t (%s,\"\") \"%s\" \"%s\" %s RemoveSet"
Client._value
(Escaping.escape_obj obj.DT.name)
(Escaping.escape_id full_name)
| FromField((Add | Remove), _) -> failwith "Cannot generate db add/remove for non sets and maps"
| FromObject(Delete) ->
- (Printf.sprintf "DB.delete_row (*__context*) \"%s\" %s"
+ (Printf.sprintf "DB.delete_row __t \"%s\" %s"
(Escaping.escape_obj obj.DT.name) Client._self)
| FromObject(Make) ->
let fields = List.filter field_in_this_table (DU.fields_of_obj obj) in
OU.escape (OU.ocaml_of_id fld.full_name)) fields in
let kvs' = List.map (fun (sql, o) ->
Printf.sprintf "(\"%s\", %s)" sql o) kvs in
- Printf.sprintf "DB.create_row (*__context*) \"%s\" [ %s ] ref"
+ Printf.sprintf "DB.create_row __t \"%s\" [ %s ] ref"
(Escaping.escape_obj obj.DT.name)
(String.concat "; " kvs')
| FromObject(GetByUuid) ->
begin match x.msg_params, x.msg_result with
| [ {param_type=ty; param_name=name} ], Some (result_ty, _) ->
- let query = Printf.sprintf "DB.db_get_by_uuid \"%s\" %s"
+ let query = Printf.sprintf "DB.db_get_by_uuid __t \"%s\" %s"
(Escaping.escape_obj obj.DT.name)
(OU.escape name) in
_string_to_dm ^ "." ^ (OU.alias_of_ty result_ty) ^ " (" ^ query ^ ")"
| FromObject(GetByLabel) ->
begin match x.msg_params, x.msg_result with
| [ {param_type=ty; param_name=name} ], Some (Set result_ty, _) ->
- let query = Printf.sprintf "DB.db_get_by_name_label \"%s\" %s"
+ let query = Printf.sprintf "DB.db_get_by_name_label __t \"%s\" %s"
(Escaping.escape_obj obj.DT.name)
(OU.escape name) in
if DU.obj_has_get_by_name_label obj
Eventually we'll need to provide user filtering for the public version *)
begin match x.msg_result with
| Some (Set result_ty, _) ->
- let query = Printf.sprintf "DB.read_refs \"%s\""
+ let query = Printf.sprintf "DB.read_refs __t \"%s\""
(Escaping.escape_obj obj.DT.name) in
"List.map " ^ _string_to_dm ^ "." ^ (OU.alias_of_ty result_ty) ^ "(" ^ query ^ ")"
| _ -> failwith "GetAll call needs a result type"
let host =
if sr<>Ref.null
then Importexport.find_host_for_sr ~__context sr
- else Helpers.get_localhost ()
+ else Helpers.get_localhost __context
in
let address = Client.Host.get_address rpc session_id host in
(* Although it's inefficient use a loopback HTTP connection *)
(* The _ref may be either a VM ref in which case we look for a
default VNC console or it may be a console ref in which case we
go for that. *)
+ let db = Context.database_of __context in
let is_vm, is_console =
- let module DB = (val (Db_cache.get ()) : Db_interface.DB_ACCESS) in
- match DB.get_table_from_ref _ref with
+ let module DB = (val (Db_cache.get db) : Db_interface.DB_ACCESS) in
+ match DB.get_table_from_ref db _ref with
| Some c when c = Db_names.vm -> true, false
| Some c when c = Db_names.console -> false, true
| _ ->
create_domain_zero_console_record ~__context ~domain_zero_ref
| [console_ref] ->
(* if there's a single reference but it's invalid, make a new one: *)
- if not (Db.is_valid_ref console_ref) then
+ if not (Db.is_valid_ref __context console_ref) then
create_domain_zero_console_record ~__context ~domain_zero_ref
| _ ->
(* if there's more than one console then something strange is *)
create_domain_zero_console_record ~__context ~domain_zero_ref
and ensure_domain_zero_guest_metrics_record ~__context ~domain_zero_ref =
- if not (Db.is_valid_ref (Db.VM.get_metrics ~__context ~self:domain_zero_ref)) then
+ if not (Db.is_valid_ref __context (Db.VM.get_metrics ~__context ~self:domain_zero_ref)) then
begin
debug "Domain 0 record does not have associated guest metrics record. Creating now";
let metrics_ref = Ref.make() in
*)
include Db_actions.DB_Action
-let is_valid_ref r =
- let module DB = (val (Db_cache.get ()) : Db_interface.DB_ACCESS) in
- DB.is_valid_ref (Ref.string_of r)
+let is_valid_ref __context r =
+ let t = Context.database_of __context in
+ let module DB = (val (Db_cache.get t) : Db_interface.DB_ACCESS) in
+ DB.is_valid_ref t (Ref.string_of r)
let valid_ref x = Db.is_valid_ref x
let gc_connector ~__context get_all get_record valid_ref1 valid_ref2 delete_record =
- let module DB = (val (Db_cache.get ()) : Db_interface.DB_ACCESS) in
+ let db = Context.database_of __context in
+ let module DB = (val (Db_cache.get db) : Db_interface.DB_ACCESS) in
let all_refs = get_all ~__context in
let do_gc ref =
let print_valid b = if b then "valid" else "INVALID" in
if not (ref_1_valid && ref_2_valid) then
begin
let table,reference,valid1,valid2 =
- (match DB.get_table_from_ref (Ref.string_of ref) with
+ (match DB.get_table_from_ref db (Ref.string_of ref) with
None -> "UNKNOWN CLASS"
| Some c -> c),
(Ref.string_of ref),
List.iter do_gc all_refs
let gc_PIFs ~__context =
- gc_connector ~__context Db.PIF.get_all Db.PIF.get_record (fun x->valid_ref x.pIF_host) (fun x->valid_ref x.pIF_network)
+ gc_connector ~__context Db.PIF.get_all Db.PIF.get_record (fun x->valid_ref __context x.pIF_host) (fun x->valid_ref __context x.pIF_network)
(fun ~__context ~self ->
(* We need to destroy the PIF, it's metrics and any VLAN/bond records that this PIF was a master of. *)
(* bonds_to_gc is actually a list which is either empty (not part of a bond) or containing exactly one reference.. *)
List.iter (fun bond -> (try Db.Bond.destroy ~__context ~self:bond with _ -> ())) bonds_to_gc;
Db.PIF.destroy ~__context ~self)
let gc_VBDs ~__context =
- gc_connector ~__context Db.VBD.get_all Db.VBD.get_record (fun x->valid_ref x.vBD_VM) (fun x->valid_ref x.vBD_VDI || x.vBD_empty)
+ gc_connector ~__context Db.VBD.get_all Db.VBD.get_record (fun x->valid_ref __context x.vBD_VM) (fun x->valid_ref __context x.vBD_VDI || x.vBD_empty)
(fun ~__context ~self ->
(* When GCing VBDs that are CDs, set them to empty rather than destroy them entirely *)
- if (valid_ref (Db.VBD.get_VM ~__context ~self)) && (Db.VBD.get_type ~__context ~self = `CD) then
+ if (valid_ref __context (Db.VBD.get_VM ~__context ~self)) && (Db.VBD.get_type ~__context ~self = `CD) then
begin
Db.VBD.set_VDI ~__context ~self ~value:Ref.null;
Db.VBD.set_empty ~__context ~self ~value:true;
let gc_crashdumps ~__context =
gc_connector ~__context Db.Crashdump.get_all Db.Crashdump.get_record
- (fun x->valid_ref x.crashdump_VM) (fun x->valid_ref x.crashdump_VDI) Db.Crashdump.destroy
+ (fun x->valid_ref __context x.crashdump_VM) (fun x->valid_ref __context x.crashdump_VDI) Db.Crashdump.destroy
let gc_VIFs ~__context =
- gc_connector ~__context Db.VIF.get_all Db.VIF.get_record (fun x->valid_ref x.vIF_VM) (fun x->valid_ref x.vIF_network)
+ gc_connector ~__context Db.VIF.get_all Db.VIF.get_record (fun x->valid_ref __context x.vIF_VM) (fun x->valid_ref __context x.vIF_network)
(fun ~__context ~self ->
let metrics = Db.VIF.get_metrics ~__context ~self in
(try Db.VIF_metrics.destroy ~__context ~self:metrics with _ -> ());
Db.VIF.destroy ~__context ~self)
let gc_PBDs ~__context =
- gc_connector ~__context Db.PBD.get_all Db.PBD.get_record (fun x->valid_ref x.pBD_host) (fun x->valid_ref x.pBD_SR) Db.PBD.destroy
+ gc_connector ~__context Db.PBD.get_all Db.PBD.get_record (fun x->valid_ref __context x.pBD_host) (fun x->valid_ref __context x.pBD_SR) Db.PBD.destroy
let gc_Host_patches ~__context =
- gc_connector ~__context Db.Host_patch.get_all Db.Host_patch.get_record (fun x->valid_ref x.host_patch_host) (fun x->valid_ref x.host_patch_pool_patch) Db.Host_patch.destroy
+ gc_connector ~__context Db.Host_patch.get_all Db.Host_patch.get_record (fun x->valid_ref __context x.host_patch_host) (fun x->valid_ref __context x.host_patch_pool_patch) Db.Host_patch.destroy
let gc_host_cpus ~__context =
let host_cpus = Db.Host_cpu.get_all ~__context in
List.iter
(fun hcpu ->
- if not (valid_ref (Db.Host_cpu.get_host ~__context ~self:hcpu)) then
+ if not (valid_ref __context (Db.Host_cpu.get_host ~__context ~self:hcpu)) then
Db.Host_cpu.destroy ~__context ~self:hcpu) host_cpus
(* If the SR record is missing, delete the VDI record *)
List.iter
(fun self ->
let m = Db.Host.get_metrics ~__context ~self in
- if not(Db.is_valid_ref m) then begin
+ if not(Db.is_valid_ref __context m) then begin
debug "Creating missing Host_metrics object for Host: %s" (Db.Host.get_uuid ~__context ~self);
let r = Ref.make () in
Db.Host_metrics.create ~__context ~ref:r
let reset_vms_running_on_missing_hosts ~__context =
List.iter (fun vm ->
let vm_r = Db.VM.get_record ~__context ~self:vm in
- let valid_resident_on = Db.is_valid_ref vm_r.API.vM_resident_on in
+ let valid_resident_on = Db.is_valid_ref __context vm_r.API.vM_resident_on in
if not valid_resident_on then begin
if vm_r.API.vM_is_control_domain then begin
info "Deleting control domain VM uuid '%s' ecause VM.resident_on refers to a Host which is nolonger in the Pool" vm_r.API.vM_uuid;
let ensure_vm_metrics_records_exist __context =
List.iter (fun vm ->
let m = Db.VM.get_metrics ~__context ~self:vm in
- if not(Db.is_valid_ref m) then begin
+ if not(Db.is_valid_ref __context m) then begin
info "Regenerating missing VM_metrics record for VM %s" (Ref.string_of vm);
let m = Ref.make () in
let uuid = Uuid.to_string (Uuid.make_uuid ()) in
(* Remove all the scheduled_to_be_resident_on VMs which are resident_on somewhere since that host 'owns' them.
NB if resident_on this host the VM will still be counted in the all_resident_on_vms set *)
let really_my_scheduled_to_be_resident_on_vms =
- List.filter (fun (_, vm_r) -> not (Db.is_valid_ref vm_r.API.vM_resident_on)) all_scheduled_to_be_resident_on_vms in
+ List.filter (fun (_, vm_r) -> not (Db.is_valid_ref __context vm_r.API.vM_resident_on)) all_scheduled_to_be_resident_on_vms in
let all_vms_assigned_to_me = Listext.List.setify (all_resident_on_vms @ really_my_scheduled_to_be_resident_on_vms) in
let all_vbds = Db.VBD.get_records_where ~__context ~expr:Db_filter_types.True in
List.iter
(fun vbd ->
try
- if Db.is_valid_ref vbd && not (Db.VBD.get_empty ~__context ~self:vbd)
+ if Db.is_valid_ref __context vbd && not (Db.VBD.get_empty ~__context ~self:vbd)
then Events.Resync.vbd ~__context token vmref vbd
with e ->
warn "Caught error resynchronising VBD: %s" (ExnHelper.string_of_exn e)) vm_vbds;
List.iter
(fun vif ->
try
- if Db.is_valid_ref vif
+ if Db.is_valid_ref __context vif
then Events.Resync.vif ~__context token vmref vif
with e ->
warn "Caught error resynchronising VIF: %s" (ExnHelper.string_of_exn e)) vm_vifs
* For example, this will prevent needless glitches in storage interfaces.
*)
let resynchronise_pif_params ~__context =
- let localhost = Helpers.get_localhost () in
+ let localhost = Helpers.get_localhost ~__context in
(* 1. Acquire data. We minimise round-trips not bandwidth *)
let networks = Db.Network.get_all_records ~__context in
let expr = Db_filter_types.Eq(Db_filter_types.Field "host", Db_filter_types.Literal (Ref.string_of localhost)) in
end
in
- if Db.is_valid_ref vm && not (Hashtbl.mem table (Ref.string_of vm)) then begin
+ if Db.is_valid_ref __context vm && not (Hashtbl.mem table (Ref.string_of vm)) then begin
add vm;
let vm = Db.VM.get_record ~__context ~self:vm in
List.iter
- (fun vif -> if Db.is_valid_ref vif then begin
+ (fun vif -> if Db.is_valid_ref __context vif then begin
add vif;
let vif = Db.VIF.get_record ~__context ~self:vif in
add vif.API.vIF_network end)
vm.API.vM_VIFs;
List.iter
- (fun vbd -> if Db.is_valid_ref vbd then begin
+ (fun vbd -> if Db.is_valid_ref __context vbd then begin
add vbd;
let vbd = Db.VBD.get_record ~__context ~self:vbd in
if not(vbd.API.vBD_empty)
vm.API.vM_snapshots;
(* If VM is suspended then add the suspend_VDI *)
let vdi = vm.API.vM_suspend_VDI in
- if preserve_power_state && vm.API.vM_power_state = `Suspended && Db.is_valid_ref vdi then begin
+ if preserve_power_state && vm.API.vM_power_state = `Suspended && Db.is_valid_ref __context vdi then begin
add_vdi vdi
end;
(* Add also the guest metrics *)
with
| e -> (warn "Unable to touch ready file '%s': %s" fname (Printexc.to_string e))
-let vm_to_string vm =
+let vm_to_string __context vm =
let str = Ref.string_of vm in
- if not (Db.is_valid_ref vm)
+ if not (Db.is_valid_ref __context vm)
then raise (Api_errors.Server_error(Api_errors.invalid_value ,[str]));
-
- let module DB = (val (Db_cache.get ()) : Db_interface.DB_ACCESS) in
- let fields = fst (DB.read_record Db_names.vm str) in
+ let t = Context.database_of __context in
+ let module DB = (val (Db_cache.get t) : Db_interface.DB_ACCESS) in
+ let fields = fst (DB.read_record t Db_names.vm str) in
let sexpr = SExpr.Node (List.map (fun (key,value) -> SExpr.Node [SExpr.String key; SExpr.String value]) fields) in
SExpr.string_of sexpr
if List.mem_assoc "vdi" all
then List.assoc "vdi" all
else raise (Failure "Missing vdi query parameter") in
- if Db.is_valid_ref (Ref.of_string vdi)
+ if Db.is_valid_ref __context (Ref.of_string vdi)
then Ref.of_string vdi
else Db.VDI.get_by_uuid ~__context ~uuid:vdi
let task_id = Ref.string_of (Context.get_task_id __context) in
iter_with_drop ~doc:("unmarking VBDs after " ^ doc)
(fun self ->
- if Db.is_valid_ref self then begin
+ if Db.is_valid_ref __context self then begin
Db.VBD.remove_from_current_operations ~__context ~self ~key:task_id;
Xapi_vbd_helpers.update_allowed_operations ~__context ~self;
Early_wakeup.broadcast (Datamodel._vbd, Ref.string_of self);
let task_id = Ref.string_of (Context.get_task_id __context) in
iter_with_drop ~doc:("unmarking VIFs after " ^ doc)
(fun self ->
- if Db.is_valid_ref self then begin
+ if Db.is_valid_ref __context self then begin
Db.VIF.remove_from_current_operations ~__context ~self ~key:task_id;
Xapi_vif_helpers.update_allowed_operations ~__context ~self;
Early_wakeup.broadcast (Datamodel._vif, Ref.string_of self);
let vm = Db.VM.get_snapshot_of ~__context ~self:snapshot in
let vm =
- if Db.is_valid_ref vm
+ if Db.is_valid_ref __context vm
then vm
else Xapi_vm_snapshot.create_vm_from_snapshot ~__context ~snapshot in
let pbd = choose_pbd_for_sr ~__context ~self:sr () in
let host = Db.PBD.get_host ~__context ~self:pbd in
let metrics = Db.Host.get_metrics ~__context ~self:host in
- let live = Db.is_valid_ref metrics && (Db.Host_metrics.get_live ~__context ~self:metrics) in
+ let live = Db.is_valid_ref __context metrics && (Db.Host_metrics.get_live ~__context ~self:metrics) in
if not live
then raise (Api_errors.Server_error(Api_errors.host_not_live, [ Ref.string_of host ]))
end;
let task_id = Ref.string_of (Context.get_task_id __context) in
log_exn ~doc:("unmarking VIF after " ^ doc)
(fun self ->
- if Db.is_valid_ref self then begin
+ if Db.is_valid_ref __context self then begin
Db.VIF.remove_from_current_operations ~__context ~self ~key:task_id;
Xapi_vif_helpers.update_allowed_operations ~__context ~self;
Early_wakeup.broadcast (Datamodel._vif, Ref.string_of self);
debug "Unmarking SR after %s (task=%s)" doc task_id;
log_exn_ignore ~doc:("unmarking SR after " ^ doc)
(fun self ->
- if Db.is_valid_ref self then begin
+ if Db.is_valid_ref __context self then begin
Db.SR.remove_from_current_operations ~__context ~self ~key:task_id;
Xapi_sr.update_allowed_operations ~__context ~self;
Early_wakeup.broadcast (Datamodel._sr, Ref.string_of self);
let task_id = Ref.string_of (Context.get_task_id __context) in
log_exn_ignore ~doc:("unmarking VDI after " ^ doc)
(fun self ->
- if Db.is_valid_ref self then begin
+ if Db.is_valid_ref __context self then begin
Db.VDI.remove_from_current_operations ~__context ~self ~key:task_id;
Xapi_vdi.update_allowed_operations ~__context ~self;
Early_wakeup.broadcast (Datamodel._vdi, Ref.string_of self);
let task_id = Ref.string_of (Context.get_task_id __context) in
log_exn ~doc:("unmarking VBD after " ^ doc)
(fun self ->
- if Db.is_valid_ref self then begin
+ if Db.is_valid_ref __context self then begin
Db.VBD.remove_from_current_operations ~__context ~self ~key:task_id;
Xapi_vbd_helpers.update_allowed_operations ~__context ~self;
Early_wakeup.broadcast (Datamodel._vbd, Ref.string_of vbd)
let set_vm_metrics ~__context ~vm ~memory ~cpus =
(* if vm metrics don't exist then make one *)
let metrics = Db.VM.get_metrics ~__context ~self:vm in
- if not (Db.is_valid_ref metrics) then
+ if not (Db.is_valid_ref __context metrics) then
begin
let ref = Ref.make() in
Db.VM_metrics.create ~__context ~ref ~uuid:(Uuid.to_string (Uuid.make_uuid ()))
(* if vif metrics don't exist then make one *)
let metrics = Db.VIF.get_metrics ~__context ~self in
- if not (Db.is_valid_ref metrics) then
+ if not (Db.is_valid_ref __context metrics) then
begin
let ref = Ref.make() in
Db.VIF_metrics.create ~__context ~ref ~uuid:(Uuid.to_string (Uuid.make_uuid ()))
(* if vbd metrics don't exist then make one *)
let metrics = Db.VBD.get_metrics ~__context ~self in
- if not (Db.is_valid_ref metrics) then
+ if not (Db.is_valid_ref __context metrics) then
begin
let ref = Ref.make() in
Db.VBD_metrics.create ~__context ~ref ~uuid:(Uuid.to_string (Uuid.make_uuid ()))
let pif_stats=List.find (fun p -> p.pif_name = real_device_name) pifs in
let metrics = Db.PIF.get_metrics ~__context ~self:pifdev in
(* if PIF metrics don't exist then create one: *)
- if not (Db.is_valid_ref metrics) then
+ if not (Db.is_valid_ref __context metrics) then
begin
let ref = Ref.make() in
Db.PIF_metrics.create ~__context ~ref ~uuid:(Uuid.to_string (Uuid.make_uuid ())) ~carrier:false
(* If the resident_on field is valid, or the request isn't
from dbsync, then redirect *)
- if Db.is_valid_ref host &&
+ if Db.is_valid_ref __context host &&
(not (List.mem_assoc "dbsync" query)) then
let address = Db.Host.get_address ~__context ~self:host in
let url = Printf.sprintf "https://%s%s?%s" address req.Http.uri (String.concat "&" (List.map (fun (a,b) -> a^"="^b) query)) in
Printf.sprintf "size: %d KiB; rss: %d KiB; data: %d KiB; stack: %d KiB"
x.size x.rss x.data x.stack
-let summarise_db_size () = match Db_cache_impl.stats () with
- | [] -> "(running as slave; no in-memory db cache)"
- | xs -> Printf.sprintf "(%s)" (String.concat "; " (List.map (fun (tbl, x) -> Printf.sprintf "%s[%d records]" tbl x) xs))
-
let one () =
let pid = Unix.getpid () in
let pmi = process_memory_info_of_pid pid in
- let db = summarise_db_size () in
let mi = string_of_meminfo (meminfo ()) in
- debug "Process: %s; Database: %s" (string_of_process_memory_info pmi) db;
+ debug "Process: %s" (string_of_process_memory_info pmi);
debug "System: %s" mi
let last_log = ref 0.
(* Check that the PIF is not in-use *)
let uuid = Db.PIF.get_uuid ~__context ~self:pif in
let network = Db.PIF.get_network ~__context ~self:pif in
- Xapi_network_attach_helpers.assert_network_has_no_vifs_in_use_on_me ~__context ~host:(Helpers.get_localhost()) ~network;
+ Xapi_network_attach_helpers.assert_network_has_no_vifs_in_use_on_me ~__context ~host:(Helpers.get_localhost ~__context) ~network;
Xapi_network_attach_helpers.assert_pif_disallow_unplug_not_set ~__context pif;
if Db.PIF.get_currently_attached ~__context ~self:pif = true then begin
debug "PIF %s has currently_attached set to true; bringing down now" uuid;
let len = String.length minimally_compliant_miami_database in
ignore (Unix.write s minimally_compliant_miami_database 0 len)
else
- Db_xml.To.fd s (Db_backend.get_database ())
+ Db_xml.To.fd s (Db_ref.get_database (Context.database_of __context))
(** Make sure the backup database version is compatible *)
let version_check db =
begin
let hosts = Db.Host.get_all ~__context in
let hosts = List.filter (fun hostref -> hostref <> !Xapi_globs.localhost_ref) hosts in
- let generation = Db_lock.with_lock (fun () -> Manifest.generation (Database.manifest (Db_backend.get_database ()))) in
+ let generation = Db_lock.with_lock (fun () -> Manifest.generation (Database.manifest (Db_ref.get_database (Context.database_of __context)))) in
let dohost host =
try
Thread.delay pool_db_sync_timer;
let conn = Parse_db_conf.make temp_file in
(* ideally, the reading from the file would also respect the latest_response_time *)
let db = Backend_xml.populate (Schema.of_datamodel ()) conn in
- Db_backend.update_database (fun _ -> db);
+ let t = Db_backend.make () in
+ Db_ref.update_database t (fun _ -> db);
R.debug "Finished reading database from %s into cache (generation = %Ld)" temp_file gen_count;
R.debug "Database from redo log has generation %Ld" generation;
(* Write the in-memory cache to the file *)
(* Make sure the generation count is right -- is this necessary? *)
- Db_backend.update_database (Db_cache_types.Database.set_generation generation);
- let db = Db_backend.get_database () in
+ let t = Db_backend.make () in
+ Db_ref.update_database t (Db_cache_types.Database.set_generation generation);
+ let db = Db_ref.get_database t in
Db_xml.To.file staging_path db;
Unixext.write_string_to_file (staging_path ^ ".generation") (Generation.to_string generation)
end
let startup_check () =
Sanitycheck.check_for_bad_link ()
-(* Tell the dbcache whether we're a master or a slave *)
-let set_db_mode() =
- Db_cache.set_master (Pool_role.is_master ())
-
(* Parse db conf file from disk and use this to initialise database connections. This is done on
both master and slave. On masters the parsed data is used to flush databases to and to populate
cache; on the slave the parsed data is used to determine where to put backups.
let schema = Schema.of_datamodel () in
let connections = Db_conn_store.read_db_connections () in
- Db_cache_impl.make connections schema;
- Db_cache_impl.sync connections (Db_backend.get_database ());
+ let t = Db_backend.make () in
+ Db_cache_impl.make t connections schema;
+ Db_cache_impl.sync connections (Db_ref.get_database t);
- Db_backend.update_database (Database.register_callback "redo_log" Redo_log.database_callback);
- Db_backend.update_database (Database.register_callback "events" Eventgen.database_callback);
+ Db_ref.update_database t (Database.register_callback "redo_log" Redo_log.database_callback);
+ Db_ref.update_database t (Database.register_callback "events" Eventgen.database_callback);
debug "Performing initial DB GC";
Db_gc.single_pass ();
try
Server_helpers.exec_with_new_task "resynchronise_ha_state"
(fun __context ->
- let pool = Helpers.get_pool () in
+ let pool = Helpers.get_pool ~__context in
let pool_ha_enabled = Db.Pool.get_ha_enabled ~__context ~self:pool in
let local_ha_enabled = bool_of_string (Localdb.get Constants.ha_armed) in
match local_ha_enabled, pool_ha_enabled with
"Registering master-only http handlers", [ Startup.OnlyMaster ], (fun () -> List.iter Xapi_http.add_handler master_only_http_handlers);
"Listening unix socket", [], listen_unix_socket;
"Listening localhost", [], listen_localhost;
- (* Pre-requisite for starting HA since it may temporarily use the DB cache *)
- "Set DB mode", [], set_db_mode;
"Checking HA configuration", [], start_ha;
"Checking for non-HA redo-log", [], start_redo_log;
(* It is a pre-requisite for starting db engine *)
(* Make sure our cached idea of whether the domain is live or not is correct *)
let vm_guest_metrics = Db.VM.get_guest_metrics ~__context ~self in
let live = true
- && Db.is_valid_ref vm_guest_metrics
+ && Db.is_valid_ref __context vm_guest_metrics
&& Db.VM_guest_metrics.get_live ~__context ~self:vm_guest_metrics in
if live then
dead_domains := IntSet.remove domid !dead_domains
ignore(attach_metadata_vdi ~__context metadata_vdi);
end;
- write_uuid_to_ip_mapping ();
+ write_uuid_to_ip_mapping ~__context;
let base_t = Timeouts.get_base_t ~__context in
Localdb.put Constants.ha_base_t (string_of_int base_t)
nodes to self-fence if the statefile disappears. *)
Helpers.log_exn_continue
"stopping HA daemon on the master after setting pool state to invalid"
- (fun () -> ha_stop_daemon __context (Helpers.get_localhost ())) ();
+ (fun () -> ha_stop_daemon __context (Helpers.get_localhost ~__context)) ();
(* No node may become the master automatically without the statefile so we can safely change
the Pool state to disabled *)
(* Check also that any PIFs with IP information set are currently attached - it's a non-fatal
error if they are, but we'll warn with a message *)
let pifs_with_ip_config = List.filter (fun (_,pifr) -> pifr.API.pIF_ip_configuration_mode <> `None) pifs in
- let not_bond_slaves = List.filter (fun (_,pifr) -> not (Db.is_valid_ref pifr.API.pIF_bond_slave_of)) pifs_with_ip_config in
+ let not_bond_slaves = List.filter (fun (_,pifr) -> not (Db.is_valid_ref __context pifr.API.pIF_bond_slave_of)) pifs_with_ip_config in
let without_disallow_unplug = List.filter (fun (_,pifr) -> not (pifr.API.pIF_disallow_unplug || pifr.API.pIF_management)) not_bond_slaves in
if List.length without_disallow_unplug > 0 then begin
let pifinfo = List.map (fun (pif,pifr) -> (Db.Host.get_name_label ~__context ~self:pifr.API.pIF_host, pif, pifr)) without_disallow_unplug in
in
warn "Warning: A possible network anomaly was found. The following hosts possibly have storage PIFs that can be unplugged: %s"
(String.concat ", " bodylines);
- ignore(Xapi_message.create ~__context ~name:Api_messages.ip_configured_pif_can_unplug ~priority:5L ~cls:`Pool ~obj_uuid:(Db.Pool.get_uuid ~__context ~self:(Helpers.get_pool ()))
+ ignore(Xapi_message.create ~__context ~name:Api_messages.ip_configured_pif_can_unplug ~priority:5L ~cls:`Pool ~obj_uuid:(Db.Pool.get_uuid ~__context ~self:(Helpers.get_pool ~__context))
~body:(String.concat "\n" bodylines))
end;
(* ... *)
(* Make sure everyone's got a fresh database *)
- let generation = Db_lock.with_lock (fun () -> Manifest.generation (Database.manifest (Db_backend.get_database ()))) in
+ let generation = Db_lock.with_lock (fun () -> Manifest.generation (Database.manifest (Db_ref.get_database (Db_backend.make ())))) in
let errors = thread_iter_all_exns
(fun host ->
debug "Synchronising database with host '%s' ('%s')" (Db.Host.get_name_label ~__context ~self:host) (Ref.string_of host);
it really should take a backup *)
let request_backup ~__context ~host ~generation ~force =
- if Helpers.get_localhost () <> host
+ if Helpers.get_localhost ~__context <> host
then failwith "Forwarded to the wrong host";
let master_address = Helpers.get_main_ip_address __context in
Pool_db_backup.fetch_database_backup ~master_address:master_address ~pool_secret:!Xapi_globs.pool_secret
Xapi_network.attach_internal ~management_interface:true ~__context ~self:net ();
change_management_interface ~__context bridge;
- Xapi_pif.update_management_flags ~__context ~host:(Helpers.get_localhost ())
+ Xapi_pif.update_management_flags ~__context ~host:(Helpers.get_localhost ~__context)
end
let management_disable ~__context =
Xapi_mgmt_iface.stop ();
(* Make sure all my PIFs are marked appropriately *)
- Xapi_pif.update_management_flags ~__context ~host:(Helpers.get_localhost ())
+ Xapi_pif.update_management_flags ~__context ~host:(Helpers.get_localhost ~__context)
let get_system_status_capabilities ~__context ~host =
- if Helpers.get_localhost () <> host
+ if Helpers.get_localhost ~__context <> host
then failwith "Forwarded to the wrong host";
System_status.get_capabilities()
val update_master : __context:'a -> host:'b -> master_address:'c -> 'd
val emergency_ha_disable : __context:'a -> unit
val request_backup :
- __context:'a -> host:API.ref_host -> generation:int64 -> force:bool -> unit
+ __context:Context.t -> host:API.ref_host -> generation:int64 -> force:bool -> unit
val request_config_file_sync : __context:'a -> host:'b -> hash:string -> unit
val syslog_config_write : string -> bool -> bool -> unit
val syslog_reconfigure : __context:Context.t -> host:'a -> unit
(** {2 (Fill in title!)} *)
val get_system_status_capabilities :
- __context:'a -> host:API.ref_host -> string
+ __context:Context.t -> host:API.ref_host -> string
val get_diagnostic_timing_stats :
- __context:'a -> host:'b -> (string * string) list
+ __context:Context.t -> host:'b -> (string * string) list
val set_hostname_live :
__context:Context.t -> host:[ `host ] Ref.t -> hostname:string -> unit
val is_in_emergency_mode : __context:'a -> bool
let last_updated = Date.of_float (Unix.gettimeofday ()) in
let m = Db.Host.get_metrics ~__context ~self:host in
(* Every host should always have a Host_metrics object *)
- if Db.is_valid_ref m then begin
+ if Db.is_valid_ref __context m then begin
Db.Host_metrics.set_memory_total ~__context ~self:m ~value:memory_total;
Db.Host_metrics.set_memory_free ~__context ~self:m ~value:memory_free;
Db.Host_metrics.set_last_updated ~__context ~self:m ~value:last_updated;
if List.mem_assoc "subtask_of" all
then Some (Ref.of_string (List.assoc "subtask_of" all))
else None in
+ let localhost = Server_helpers.exec_with_new_task "with_context" (fun __context -> Helpers.get_localhost ~__context) in
try
let session_id,must_logout =
if List.mem_assoc "session_id" all
then Ref.of_string (List.assoc "session_id" all), false
else
if List.mem_assoc "pool_secret" all
- then Client.Session.slave_login inet_rpc (Helpers.get_localhost ()) (List.assoc "pool_secret" all), true
+ then Client.Session.slave_login inet_rpc localhost (List.assoc "pool_secret" all), true
else begin
match req.Http.auth with
| Some (Http.Basic(username, password)) ->
2 Host.address
3. Console URIs *)
let new_hostname = Helpers.reget_hostname () in
- let localhost = Helpers.get_localhost () in
+ let localhost = Helpers.get_localhost ~__context in
if Db.Host.get_hostname ~__context ~self:localhost <> new_hostname then begin
debug "Changing Host.hostname in database to: %s" new_hostname;
Db.Host.set_hostname ~__context ~self:localhost ~value:new_hostname
if not(Netdev.Link.is_up bridge) then Netdev.Link.up bridge
let attach_internal ?(management_interface=false) ~__context ~self () =
- let host = Helpers.get_localhost () in
+ let host = Helpers.get_localhost ~__context in
let shafted_pifs, local_pifs =
Xapi_network_attach_helpers.assert_can_attach_network_on_host ~__context ~self ~host ~overide_management_if_check:management_interface in
true &&
pifr.API.pIF_host = localhost && (* this host only *)
Nm.is_dom0_interface pifr &&
- not (Db.is_valid_ref pifr.API.pIF_bond_slave_of) (* not enslaved by a bond *)
+ not (Db.is_valid_ref __context pifr.API.pIF_bond_slave_of) (* not enslaved by a bond *)
)
(Db.PIF.get_all_records ~__context)
interfaces required by storage NICs etc. (these interface are not filtered out at the moment).
*)
val calculate_pifs_required_at_start_of_day :
- __context:'a -> ('b Ref.t * API.pIF_t) list
+ __context:Context.t -> ('b Ref.t * API.pIF_t) list
(** Attempt to bring up (plug) the required PIFs when the host starts up.
* Uses {!calculate_pifs_required_at_start_of_day}. *)
then debug "flushed database to metadata VDI: assuming this is sufficient."
else begin
debug "flushing database to all online nodes";
- let generation = Db_lock.with_lock (fun () -> Manifest.generation (Database.manifest (Db_backend.get_database ()))) in
+ let generation = Db_lock.with_lock (fun () -> Manifest.generation (Database.manifest (Db_ref.get_database (Context.database_of __context)))) in
Threadext.thread_iter
(fun host ->
Helpers.call_api_functions ~__context
let all_hosts = Db.Host.get_all ~__context in
(* We make no attempt to demand a quorum or anything. *)
let addresses = List.map (fun self -> Db.Host.get_address ~__context ~self) all_hosts in
- let my_address = Db.Host.get_address ~__context ~self:(Helpers.get_localhost ()) in
+ let my_address = Db.Host.get_address ~__context ~self:(Helpers.get_localhost ~__context) in
let peers = List.filter (fun x -> x <> my_address) addresses in
Xapi_pool_transition.attempt_two_phase_commit_of_new_master ~__context true peers my_address
end
let is_slave = not (Pool_role.is_master ()) in
info "Pool.is_slave call received (I'm a %s)" (if is_slave then "slave" else "master");
debug "About to kick the database connection to make sure it's still working...";
- Db.is_valid_ref (Ref.of_string "Pool.is_slave checking to see if the database connection is up");
+ Db.is_valid_ref __context (Ref.of_string "Pool.is_slave checking to see if the database connection is up");
is_slave
let hello ~__context ~host_uuid ~host_address =
val sync_database : __context:Context.t -> unit
val designate_new_master : __context:Context.t -> host:'a -> unit
val initial_auth : __context:'a -> string
-val is_slave : __context:'a -> host:'b -> bool
+val is_slave : __context:Context.t -> host:'b -> bool
val hello :
__context:Context.t ->
host_uuid:string ->
let metrics = Db.VIF.get_metrics ~__context ~self in
(* Don't let a failure to destroy the metrics stop us *)
Helpers.log_exn_continue "VIF_metrics.destroy"
- (fun self -> if Db.is_valid_ref self then Db.VIF_metrics.destroy ~__context ~self) metrics;
+ (fun self -> if Db.is_valid_ref __context self then Db.VIF_metrics.destroy ~__context ~self) metrics;
Db.VIF.destroy ~__context ~self
let power_state = Db.VM.get_power_state ~__context ~self:vm in
if power_state = `Running || power_state = `Paused then begin
debug "VM.power_state_reset vm=%s power state is either running or paused: performing sanity checks" (Ref.string_of vm);
- let localhost = Helpers.get_localhost () in
+ let localhost = Helpers.get_localhost ~__context in
(* We only query domid, resident_on and Xc.domain_getinfo with the VM lock held to make
sure the VM isn't in the middle of a migrate/reboot/shutdown. Note we don't hold it for
the whole of this function which might perform off-box RPCs. *)
let revert ~__context ~snapshot =
let vm = Db.VM.get_snapshot_of ~__context ~self:snapshot in
let vm =
- if Db.is_valid_ref vm
+ if Db.is_valid_ref __context vm
then vm
else Xapi_vm_snapshot.create_vm_from_snapshot ~__context ~snapshot in
Xapi_vm_snapshot.revert ~__context ~snapshot ~vm
val start :
__context:Context.t ->
vm:API.ref_VM -> start_paused:bool -> force:'a -> unit
-val assert_host_is_localhost : __context:'a -> host:API.ref_host -> unit
+val assert_host_is_localhost : __context:Context.t -> host:API.ref_host -> unit
val start_on :
__context:Context.t ->
vm:API.ref_VM -> host:API.ref_host -> start_paused:bool -> force:'a -> unit
val set_memory_target_live :
__context:'a -> self:API.ref_VM -> target:'b -> unit
val wait_memory_target_live : __context:Context.t -> self:API.ref_VM -> unit
-val get_cooperative : __context:'a -> self:[ `VM ] Ref.t -> bool
+val get_cooperative : __context:Context.t -> self:[ `VM ] Ref.t -> bool
val set_HVM_shadow_multiplier :
__context:Context.t -> self:[ `VM ] Ref.t -> value:float -> unit
val set_shadow_multiplier_live :
__context:Context.t -> self:API.ref_VM -> multiplier:float -> unit
val send_sysrq : __context:'a -> vm:API.ref_VM -> key:'b -> 'c
val send_trigger : __context:'a -> vm:API.ref_VM -> trigger:'b -> 'c
-val get_boot_record : __context:'a -> self:API.ref_VM -> API.vM_t
+val get_boot_record : __context:Context.t -> self:API.ref_VM -> API.vM_t
val get_data_sources :
__context:Context.t -> self:[ `VM ] Ref.t -> API.data_source_t list
val record_data_source :
else
[]
-let snapshot_metadata ~vm ~is_a_snapshot =
+let snapshot_metadata ~__context ~vm ~is_a_snapshot =
if is_a_snapshot then
- Helpers.vm_to_string vm
+ Helpers.vm_to_string __context vm
else
""
in
(* Copy the old metrics if available, otherwise generate a fresh one *)
let m =
- if Db.is_valid_ref all.Db_actions.vM_metrics
+ if Db.is_valid_ref __context all.Db_actions.vM_metrics
then Some (Db.VM_metrics.get_record_internal ~__context ~self:all.Db_actions.vM_metrics)
else None
in
~snapshot_of:(if is_a_snapshot then vm else Ref.null)
~snapshot_time:(if is_a_snapshot then Date.of_float (Unix.gettimeofday ()) else Date.never)
~snapshot_info:(snapshot_info ~power_state ~is_a_snapshot)
- ~snapshot_metadata:(snapshot_metadata ~vm ~is_a_snapshot)
+ ~snapshot_metadata:(snapshot_metadata ~__context ~vm ~is_a_snapshot)
~transportable_snapshot_id:""
~parent
~resident_on:Ref.null
current_error
let maybe_get_guest_metrics ~__context ~ref =
- if Db.is_valid_ref ref
+ if Db.is_valid_ref __context ref
then Some (Db.VM_guest_metrics.get_record_internal ~__context ~self:ref)
else None
(** Returns a list of affinity host identifiers for the given [guest]. *)
let affinity_host_ids_of_guest __context guest =
let affinity_host = Db.VM.get_affinity ~__context ~self:guest in
- let affinity_host_is_valid = Db.is_valid_ref affinity_host in
+ let affinity_host_is_valid = Db.is_valid_ref __context affinity_host in
if affinity_host_is_valid
then [Db.Host.get_uuid __context affinity_host]
else []
try
let suspend_VDI = Db.VM.get_suspend_VDI ~__context ~self:vm in
Vmops.resume ~__context ~xc ~xs ~vm;
- if Db.is_valid_ref suspend_VDI then begin
+ if Db.is_valid_ref __context suspend_VDI then begin
Db.VM.set_suspend_VDI ~__context ~self:vm ~value:Ref.null;
Helpers.call_api_functions ~__context (fun rpc session_id -> Client.VDI.destroy rpc session_id suspend_VDI);
end;
let copy_vm_fields ~__context ~metadata ~dst ~do_not_copy ~default_values =
assert (Pool_role.is_master ());
debug "copying metadata into %s" (Ref.string_of dst);
- let module DB = (val (Db_cache.get ()) : Db_interface.DB_ACCESS) in
+ let db = Context.database_of __context in
+ let module DB = (val (Db_cache.get db) : Db_interface.DB_ACCESS) in
List.iter
(fun (key,value) ->
let value =
then List.assoc key default_values
else value in
if not (List.mem key do_not_copy)
- then DB.write_field Db_names.vm (Ref.string_of dst) key value)
+ then DB.write_field db Db_names.vm (Ref.string_of dst) key value)
metadata
let safe_destroy_vbd ~__context ~rpc ~session_id vbd =
- if Db.is_valid_ref vbd then begin
+ if Db.is_valid_ref __context vbd then begin
Client.VBD.destroy rpc session_id vbd
end
let safe_destroy_vif ~__context ~rpc ~session_id vif =
- if Db.is_valid_ref vif then begin
+ if Db.is_valid_ref __context vif then begin
Client.VIF.destroy rpc session_id vif
end
let safe_destroy_vdi ~__context ~rpc ~session_id vdi =
- if Db.is_valid_ref vdi then begin
+ if Db.is_valid_ref __context vdi then begin
let sr = Db.VDI.get_SR ~__context ~self:vdi in
if not (Db.SR.get_content_type ~__context ~self:sr = "iso") then
Client.VDI.destroy rpc session_id vdi
let vm_gm = Db.VM.get_guest_metrics ~__context ~self:vm in
debug "Reverting the guest metrics";
- if Db.is_valid_ref vm_gm then Db.VM_guest_metrics.destroy ~__context ~self:vm_gm;
- if Db.is_valid_ref snap_gm then begin
+ if Db.is_valid_ref __context vm_gm then Db.VM_guest_metrics.destroy ~__context ~self:vm_gm;
+ if Db.is_valid_ref __context snap_gm then begin
let new_gm = Xapi_vm_helpers.copy_guest_metrics ~__context ~vm:snapshot in
Db.VM.set_guest_metrics ~__context ~self:vm ~value:new_gm
end
let snap_metadata =
if post_MNR
then Helpers.vm_string_to_assoc snap_metadata
- else Helpers.vm_string_to_assoc (Helpers.vm_to_string snapshot) in
+ else Helpers.vm_string_to_assoc (Helpers.vm_to_string __context snapshot) in
let do_not_copy =
if post_MNR
then do_not_copy
(** Attempt to flush the database to the metadata VDI *)
let flush_database ~__context =
try
- Redo_log.flush_db_to_redo_log (Db_backend.get_database ());
+ Redo_log.flush_db_to_redo_log (Db_ref.get_database (Db_backend.make ()));
true
with _ -> false