let flush_db_to_redo_log cache_to_flush =
if Redo_log.is_enabled () then begin
(* Atomically read the generation count and take a deep copy of the cache *)
- let (gen_count, cache_copy) = Db_lock.with_lock (fun () -> Generation.read_generation(), Db_cache_types.snapshot cache_to_flush) in
+ let cache_copy = Db_lock.with_lock (fun () -> Db_cache_types.snapshot cache_to_flush) in
debug "Flushing cache to redo-log";
- let db_cache_manifest = Db_cache_types.manifest_of_cache cache_to_flush gen_count in
+ let db_cache_manifest = Db_cache_types.manifest_of_cache cache_to_flush in
let write_db_to_fd = (fun out_fd -> Db_xml.To.fd out_fd (db_cache_manifest, cache_copy)) in
- Redo_log.write_db gen_count write_db_to_fd
+ Redo_log.write_db (Db_cache_types.generation_of_cache cache_copy) write_db_to_fd
end
(* atomically flush entire db cache to disk. If we are given a cache then flush that, otherwise flush the
let do_flush cache_to_flush filename =
flush_db_to_redo_log cache_to_flush;
- let db_cache_manifest = Db_cache_types.manifest_of_cache cache (Generation.read_generation()) in
+ let db_cache_manifest = Db_cache_types.manifest_of_cache cache in
if not dbconn.Parse_db_conf.compress
then Db_xml.To.file filename (db_cache_manifest, cache_to_flush)
else
(* if we were given a specific cache to flush then just do it; no locks required *)
| Some c -> do_flush c filename in
let filename = dbconn.Parse_db_conf.path in
+ let generation_filename = Generation.filename dbconn in
atomically_write_to_db_file filename marshall;
+ Unixext.write_string_to_file generation_filename (Int64.to_string (Db_cache_types.generation_of_cache cache));
debug "XML backend [%s] -- Write buffer flushed. Time: %f" filename (Unix.gettimeofday() -. time)
Db_cache_types.set_schema_vsn empty_cache (major, minor);
List.iter (fun tbl -> set_table_in_cache empty_cache tbl (create_empty_table ())) db_table_names;
- let manifest = Db_cache_types.make_manifest major minor 0L (* new gen count *) in
- let marshall filename = Db_lock.with_lock (fun () -> Db_xml.To.file filename (manifest, empty_cache)) in
- atomically_write_to_db_file dbconn.Parse_db_conf.path marshall;
+ flush_common dbconn (Some empty_cache)
+
module type DB_CACHE =
sig
- val dump_db_cache : Int64.t -> Unix.file_descr -> unit
+ val dump_db_cache : Unix.file_descr -> unit
val stats : unit -> (string * int) list
let save_in_redo_log context entry =
if Redo_log.is_enabled() then begin
- Redo_log.write_delta (Generation.read_generation()) entry
+ Redo_log.write_delta (Db_cache_types.generation_of_cache Db_backend.cache) entry
(fun () -> (* the function which will be invoked if a database write is required instead of a delta *)
Backend_xml.flush_db_to_redo_log Db_backend.cache
)
(* Only flush to disk if persistent *)
Db_dirty.set_all_row_dirty_status objref Db_dirty.Modified;
Db_dirty.set_all_dirty_table_status tblname;
- Generation.increment();
+ Db_cache_types.increment Db_backend.cache;
save_in_redo_log context (Redo_log.WriteField(tblname, objref, fldname, newval))
end;
(* Update cache dirty status *)
Db_dirty.clear_all_row_dirty_status objref;
Db_dirty.set_all_dirty_table_status tblname;
- Generation.increment();
+ Db_cache_types.increment Db_backend.cache;
save_in_redo_log context (Redo_log.DeleteRow(tblname, objref))
end;
Ref_index.remove objref;
begin
Db_dirty.set_all_row_dirty_status new_objref Db_dirty.New;
Db_dirty.set_all_dirty_table_status tblname;
- Generation.increment();
+ Db_cache_types.increment Db_backend.cache;
save_in_redo_log context (Redo_log.CreateRow(tblname, new_objref, kvs))
end;
add_ref_to_table_map new_objref tblname (* track ref against this table *);
if Sys.file_exists Xapi_globs.gen_metadata_db
then fake_gen_dbconn :: connections else connections in
- Db_connections.init_gen_count connections;
(* If we have a temporary_restore_path (backup uploaded in previous run of xapi process) then restore from that *)
let db =
if Sys.file_exists Xapi_globs.db_temporary_restore_path then begin
let processed_str = SExpr.string_of (SExpr.Node processed) in
write_field context tbl objref fld processed_str)
- let dump_db_cache generation fd =
- let db_cache_manifest = Db_cache_types.manifest_of_cache Db_backend.cache generation in
+ let dump_db_cache fd =
+ let db_cache_manifest = Db_cache_types.manifest_of_cache Db_backend.cache in
let time = Unix.gettimeofday() in
(* Snapshot the cache (uses the lock) and then slowly serialise the copy *)
Db_xml.To.fd fd (db_cache_manifest, snapshot cache);
"process_structured_field"
(a,b,c,d,e)
- let dump_db_cache _ _ = () (* this is master-only *)
+ let dump_db_cache _ = () (* this is master-only *)
let apply_delta_to_cache _ = () (* this is master-only *)
let read_records_where s e =
Stats.log_db_call None ("record(where):"^s) Stats.Read;
(sw Master.read_records_where Slave.read_records_where) s e
- let dump_db_cache manifest fd =
- (sw Master.dump_db_cache Slave.dump_db_cache) manifest fd
+ let dump_db_cache fd =
+ (sw Master.dump_db_cache Slave.dump_db_cache) fd
let apply_delta_to_cache entry =
(sw Master.apply_delta_to_cache Slave.apply_delta_to_cache) entry
type cache = {
cache: (string, table) Hashtbl.t;
schema: (int * int) option ref;
+ generation: Generation.t ref;
}
type where_record = {table:string; return:string; where_field:string; where_value:string}
| None -> (0, 0)
| Some (major, minor) -> major, minor
-let manifest_of_cache cache gen_count =
+let manifest_of_cache cache =
let major, minor = schema_of_cache cache in
- make_manifest major minor gen_count
+ make_manifest major minor !(cache.generation)
let set_schema_vsn cache (major, minor) = cache.schema := Some (major, minor)
+let increment cache = cache.generation := Int64.add !(cache.generation) 1L
+
+let generation_of_cache cache = !(cache.generation)
+
+let set_generation cache generation = cache.generation := generation
+
(* Our versions of hashtbl.find *)
let lookup_field_in_row row fld =
try
let create_empty_table () = Hashtbl.create 20
-let create_empty_cache () = { cache = Hashtbl.create 20; schema = ref None }
+let create_empty_cache () = { cache = Hashtbl.create 20; schema = ref None; generation = ref Generation.null_generation }
let fold_over_fields func row acc = Hashtbl.fold func row acc
let newcache = create_empty_cache () in
iter_over_tables (table newcache) cache;
+
+ set_generation newcache (generation_of_cache cache);
newcache)
}
val make_manifest : int -> int -> Int64.t -> db_dump_manifest
-val manifest_of_cache: cache -> Int64.t -> db_dump_manifest
+val manifest_of_cache: cache -> db_dump_manifest
val schema_of_cache: cache -> int * int
val set_schema_vsn: cache -> int * int -> unit
+val generation_of_cache: cache -> Generation.t
+val set_generation: cache -> Generation.t -> unit
+val increment: cache -> unit
+
val lookup_field_in_row : row -> string -> string
val lookup_table_in_cache : cache -> string -> table
val lookup_row_in_table : table -> string -> string -> row
let get_dbs_and_gen_counts() =
List.map (fun conn->(Generation.read conn, conn)) (Db_conn_store.read_db_connections())
-let init_gen_count connections =
- let gens = List.map Generation.read connections in
- let max = List.fold_left (fun max x -> if x>max then x else max) 0L gens in
- Generation.set_count max
-
(* This returns the most recent of the db connections to populate from. It also initialises the in-memory
generation count to the largest of the db connections' generation counts *)
let pick_most_recent_db connections =
let flush_dirty_and_maybe_exit dbconn exit_spec =
Db_conn_store.with_db_conn_lock dbconn
(fun () ->
- let flush_conn dbconn =
- let was_anything_flushed = Backend_xml.flush_dirty dbconn in
- if was_anything_flushed then
- Generation.write_out dbconn;
- was_anything_flushed in
-
(* if we're being told to shutdown by signal handler then flush every connection
- the rationale is that we're not sure which db connections will be available on next restart *)
if !exit_on_next_flush then
begin
- flush_conn dbconn;
+ Backend_xml.flush_dirty dbconn;
let refcount = dec_and_read_db_flush_thread_refcount() in
(* last flushing thread close the door on the way out.. *)
if refcount = 0 then
debug "refcount is %d; not exiting" refcount
end;
- let was_anything_flushed = flush_conn dbconn in
+ let was_anything_flushed = Backend_xml.flush_dirty dbconn in
(* exit if we've been told to by caller *)
begin
None -> ()
| (Some ret_code) -> pre_exit_hook(); exit ret_code
end;
- was_anything_flushed
+ was_anything_flushed
)
(*
let create_empty_db (major, minor) dbconn =
Backend_xml.create_empty_db (major, minor) dbconn
*)
let maybe_create_new_db (major,minor) dbconn =
- if not (Sys.file_exists dbconn.Parse_db_conf.path) then
- begin
- Generation.create_fresh dbconn;
- Backend_xml.create_empty_db (major,minor) dbconn
- end
+ if not (Sys.file_exists dbconn.Parse_db_conf.path)
+ then Backend_xml.create_empty_db (major,minor) dbconn
let force_flush_all dbconn =
- debug "About to flush database: %s" dbconn.Parse_db_conf.path;
- Db_conn_store.with_db_conn_lock dbconn
- (fun () ->
- begin
- Backend_xml.force_flush_all dbconn None
- end;
- Generation.write_out dbconn
- )
+ debug "About to flush database: %s" dbconn.Parse_db_conf.path;
+ Db_conn_store.with_db_conn_lock dbconn
+ (fun () ->
+ Backend_xml.force_flush_all dbconn None
+ )
+
+let force_flush_specified_cache dbconn cache =
+ Db_conn_store.with_db_conn_lock dbconn
+ (fun () ->
+ Backend_xml.force_flush_all dbconn (Some cache)
+ )
-let force_flush_specified_cache dbconn generation_count cache =
- Db_conn_store.with_db_conn_lock dbconn
- (fun () ->
- begin
- Backend_xml.force_flush_all dbconn (Some cache)
- end;
- Generation.write_out_specified_count dbconn generation_count
- )
| _ -> f acc
in
let (cache, _, manifest) = f (create_empty_cache (), create_empty_table (), []) in
+ let generation_count = Int64.of_string (List.assoc _generation_count manifest) in
+ Db_cache_types.set_generation cache generation_count;
(* Manifest is actually a record *)
let manifest = {
schema_major_vsn = int_of_string (List.assoc _schema_major_vsn manifest);
schema_minor_vsn = int_of_string (List.assoc _schema_minor_vsn manifest);
- generation_count = Int64.of_string (List.assoc _generation_count manifest)
+ generation_count = generation_count
} in
+
manifest, cache
let file xml_filename =
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)
+
(* Simple generation count implementation *)
module D = Debug.Debugger(struct let name = "sql" end)
open D
let add_int a b = Int64.add a (Int64.of_int b)
let null_generation = -1L
-(* the current generation (initialised on population) is stored here *)
-let current_generation : t ref = ref 0L
-let set_count g =
- Db_lock.with_lock
- (fun ()->
- debug "Set generation count to %Ld" g;
- current_generation := g)
-
-let gen_count_file dbconn = dbconn.Parse_db_conf.path^".generation"
-
-let read_generation() = Db_lock.with_lock (fun () -> !current_generation)
-
-let create_with_contents dbconn contents =
- Db_lock.with_lock
- (fun () ->
- Unixext.write_string_to_file (gen_count_file dbconn) (Int64.to_string contents)
- )
-
-(* Create a fresh generation count file *)
-let create_fresh dbconn = create_with_contents dbconn 0L
-let create_current dbconn = Db_lock.with_lock (fun ()->create_with_contents dbconn !current_generation)
-
-(* Increment generation count *)
-let increment() =
- Db_lock.with_lock (fun ()->current_generation := Int64.add !current_generation 1L)
-
-(* Write out the cache's current generation count to the specified db connection *)
-let write_out dbconn =
- Db_lock.with_lock
- (fun () ->
- try
- let gencount_fname = gen_count_file dbconn in
- debug "Writing generation count %Ld to file %s" !current_generation gencount_fname;
- (* write new generation count atomically *)
- Unixext.write_string_to_file gencount_fname (Int64.to_string !current_generation)
- with e ->
- debug "Exception incrementing generation count: %s" (Printexc.to_string e);
- log_backtrace();
- raise e
- )
-
-let write_out_specified_count dbconn count =
- Db_lock.with_lock
- (fun () ->
- try
- let gencount_fname = gen_count_file dbconn in
- debug "Writing specified generation count %Ld to file %s" count gencount_fname;
- (* write new generation count atomically *)
- Unixext.write_string_to_file gencount_fname (Int64.to_string count)
- with e ->
- debug "Exception incrementing generation count: %s" (Printexc.to_string e);
- log_backtrace();
- raise e
- )
+let filename dbconn = dbconn.Parse_db_conf.path^".generation"
let read dbconn =
- let gencount_fname = gen_count_file dbconn in
+ let gencount_fname = filename dbconn in
try Int64.of_string (Unixext.string_of_file gencount_fname) with _ -> 0L
begin
read_in_database();
if !xmltostdout then
- Db_cache.DBCache.dump_db_cache (Generation.read_generation()) (Unix.descr_of_out_channel stdout)
+ Db_cache.DBCache.dump_db_cache (Unix.descr_of_out_channel stdout)
else
write_out_database !filename
end
-let do_read_gencount() =
- initialise_db_connections();
- let connections = Db_conn_store.read_db_connections () in
- Db_connections.init_gen_count connections;
- let maxgen = Generation.read_generation() in
- Printf.printf "%Ld\n" maxgen
-
let find_my_host_row() =
Xapi_inventory.read_inventory ();
let localhost_uuid = Xapi_inventory.lookup Xapi_inventory._installation_uuid in
match parse_operation !operation with
| Write_database ->
do_write_database()
- | Read_gencount ->
- do_read_gencount()
| Read_hostiqn ->
do_read_hostiqn()
| Write_hostiqn ->
let len = String.length minimally_compliant_miami_database in
ignore (Unix.write s minimally_compliant_miami_database 0 len)
else
- Db_cache.DBCache.dump_db_cache (Generation.read_generation()) s
+ Db_cache.DBCache.dump_db_cache s
(** Make sure the backup database version is compatible *)
let version_check manifest =
(fun dbconn ->
Threadext.Mutex.execute slave_backup_m
(fun () ->
- Db_connections.force_flush_specified_cache dbconn manifest.Db_cache_types.generation_count unmarshalled_db;
+ Db_connections.force_flush_specified_cache dbconn unmarshalled_db;
Slave_backup.notify_write dbconn (* update writes_this_period for this connection *)
)
)
begin
let hosts = Db.Host.get_all ~__context in
let hosts = List.filter (fun hostref -> hostref <> !Xapi_globs.localhost_ref) hosts in
+ let generation = Db_lock.with_lock (fun () -> Db_cache_types.generation_of_cache Db_backend.cache) in
let dohost host =
try
Thread.delay pool_db_sync_timer;
debug "Starting DB synchronise with host %s" (Ref.string_of host);
Helpers.call_api_functions ~__context
- (fun rpc session_id -> Client.Host.request_backup rpc session_id host (Generation.read_generation()) false);
+ (fun rpc session_id -> Client.Host.request_backup rpc session_id host generation false);
debug "Finished DB synchronise";
with
e ->
raise NoGeneration
| Some generation ->
(* Write the in-memory cache to the file *)
- let manifest = Db_cache_types.manifest_of_cache Db_backend.cache generation in
+ (* Make sure the generation count is right -- is this necessary? *)
+ Db_cache_types.set_generation Db_backend.cache generation;
+ let manifest = Db_cache_types.manifest_of_cache Db_backend.cache in
Db_xml.To.file staging_path (manifest, Db_backend.cache);
Unixext.write_string_to_file (staging_path ^ ".generation") (Generation.to_string generation)
end
(* ... *)
(* Make sure everyone's got a fresh database *)
+ let generation = Db_lock.with_lock (fun () -> Db_cache_types.generation_of_cache Db_backend.cache) in
let errors = thread_iter_all_exns
(fun host ->
debug "Synchronising database with host '%s' ('%s')" (Db.Host.get_name_label ~__context ~self:host) (Ref.string_of host);
- Client.Host.request_backup rpc session_id host (Generation.read_generation()) true;
+ Client.Host.request_backup rpc session_id host generation true;
count_call ()
) hosts in
List.iter
(* We need to delete all local dbs but leave remote ones alone *)
let local = List.filter (fun db -> not db.Parse_db_conf.is_on_remote_storage) dbs in
List.iter Unixext.unlink_safe (List.map (fun db->db.Parse_db_conf.path) local);
- List.iter Unixext.unlink_safe (List.map Generation.gen_count_file local);
+ List.iter Unixext.unlink_safe (List.map Generation.filename local);
(* remove any shared databases from my db.conf *)
(* XXX: on OEM edition the db.conf is rebuilt on every boot *)
Parse_db_conf.write_db_conf local;
then debug "flushed database to metadata VDI: assuming this is sufficient."
else begin
debug "flushing database to all online nodes";
+ let generation = Db_lock.with_lock (fun () -> Db_cache_types.generation_of_cache Db_backend.cache) in
Threadext.thread_iter
(fun host ->
Helpers.call_api_functions ~__context
- (fun rpc session_id -> Client.Host.request_backup rpc session_id host (Generation.read_generation()) true))
+ (fun rpc session_id -> Client.Host.request_backup rpc session_id host generation true))
(Db.Host.get_all ~__context)
end
)