From: David Scott Date: Wed, 26 Jan 2011 17:39:05 +0000 (+0000) Subject: Move the generation into the database cache rather than keeping it as a separate... X-Git-Url: http://xenbits.xensource.com/gitweb?a=commitdiff_plain;h=1e1c00699935f9a4c52f334c52243fab2b34521f;p=xcp%2Fxen-api.git Move the generation into the database cache rather than keeping it as a separate global thing. Signed-off-by: David Scott --- diff --git a/ocaml/database/backend_xml.ml b/ocaml/database/backend_xml.ml index 195fe1dc..23c35669 100644 --- a/ocaml/database/backend_xml.ml +++ b/ocaml/database/backend_xml.ml @@ -80,11 +80,11 @@ let atomically_write_to_db_file filename marshall = let flush_db_to_redo_log cache_to_flush = if Redo_log.is_enabled () then begin (* Atomically read the generation count and take a deep copy of the cache *) - let (gen_count, cache_copy) = Db_lock.with_lock (fun () -> Generation.read_generation(), Db_cache_types.snapshot cache_to_flush) in + let cache_copy = Db_lock.with_lock (fun () -> Db_cache_types.snapshot cache_to_flush) in debug "Flushing cache to redo-log"; - let db_cache_manifest = Db_cache_types.manifest_of_cache cache_to_flush gen_count in + let db_cache_manifest = Db_cache_types.manifest_of_cache cache_to_flush in let write_db_to_fd = (fun out_fd -> Db_xml.To.fd out_fd (db_cache_manifest, cache_copy)) in - Redo_log.write_db gen_count write_db_to_fd + Redo_log.write_db (Db_cache_types.generation_of_cache cache_copy) write_db_to_fd end (* atomically flush entire db cache to disk. If we are given a cache then flush that, otherwise flush the @@ -104,7 +104,7 @@ let flush_common dbconn optional_cache = let do_flush cache_to_flush filename = flush_db_to_redo_log cache_to_flush; - let db_cache_manifest = Db_cache_types.manifest_of_cache cache (Generation.read_generation()) in + let db_cache_manifest = Db_cache_types.manifest_of_cache cache in if not dbconn.Parse_db_conf.compress then Db_xml.To.file filename (db_cache_manifest, cache_to_flush) else @@ -126,7 +126,9 @@ let flush_common dbconn optional_cache = (* if we were given a specific cache to flush then just do it; no locks required *) | Some c -> do_flush c filename in let filename = dbconn.Parse_db_conf.path in + let generation_filename = Generation.filename dbconn in atomically_write_to_db_file filename marshall; + Unixext.write_string_to_file generation_filename (Int64.to_string (Db_cache_types.generation_of_cache cache)); debug "XML backend [%s] -- Write buffer flushed. Time: %f" filename (Unix.gettimeofday() -. time) @@ -188,6 +190,5 @@ let create_empty_db (major, minor) dbconn = Db_cache_types.set_schema_vsn empty_cache (major, minor); List.iter (fun tbl -> set_table_in_cache empty_cache tbl (create_empty_table ())) db_table_names; - let manifest = Db_cache_types.make_manifest major minor 0L (* new gen count *) in - let marshall filename = Db_lock.with_lock (fun () -> Db_xml.To.file filename (manifest, empty_cache)) in - atomically_write_to_db_file dbconn.Parse_db_conf.path marshall; + flush_common dbconn (Some empty_cache) + diff --git a/ocaml/database/db_cache.ml b/ocaml/database/db_cache.ml index 1e86a3fe..9b90d9e6 100644 --- a/ocaml/database/db_cache.ml +++ b/ocaml/database/db_cache.ml @@ -52,7 +52,7 @@ exception Too_many_values of (*class*) string * (*ref*) string * (*uuid*) stri module type DB_CACHE = sig - val dump_db_cache : Int64.t -> Unix.file_descr -> unit + val dump_db_cache : Unix.file_descr -> unit val stats : unit -> (string * int) list @@ -159,7 +159,7 @@ struct let save_in_redo_log context entry = if Redo_log.is_enabled() then begin - Redo_log.write_delta (Generation.read_generation()) entry + Redo_log.write_delta (Db_cache_types.generation_of_cache Db_backend.cache) entry (fun () -> (* the function which will be invoked if a database write is required instead of a delta *) Backend_xml.flush_db_to_redo_log Db_backend.cache ) @@ -212,7 +212,7 @@ struct (* Only flush to disk if persistent *) Db_dirty.set_all_row_dirty_status objref Db_dirty.Modified; Db_dirty.set_all_dirty_table_status tblname; - Generation.increment(); + Db_cache_types.increment Db_backend.cache; save_in_redo_log context (Redo_log.WriteField(tblname, objref, fldname, newval)) end; @@ -358,7 +358,7 @@ struct (* Update cache dirty status *) Db_dirty.clear_all_row_dirty_status objref; Db_dirty.set_all_dirty_table_status tblname; - Generation.increment(); + Db_cache_types.increment Db_backend.cache; save_in_redo_log context (Redo_log.DeleteRow(tblname, objref)) end; Ref_index.remove objref; @@ -405,7 +405,7 @@ struct begin Db_dirty.set_all_row_dirty_status new_objref Db_dirty.New; Db_dirty.set_all_dirty_table_status tblname; - Generation.increment(); + Db_cache_types.increment Db_backend.cache; save_in_redo_log context (Redo_log.CreateRow(tblname, new_objref, kvs)) end; add_ref_to_table_map new_objref tblname (* track ref against this table *); @@ -492,7 +492,6 @@ struct if Sys.file_exists Xapi_globs.gen_metadata_db then fake_gen_dbconn :: connections else connections in - Db_connections.init_gen_count connections; (* If we have a temporary_restore_path (backup uploaded in previous run of xapi process) then restore from that *) let db = if Sys.file_exists Xapi_globs.db_temporary_restore_path then begin @@ -660,8 +659,8 @@ struct let processed_str = SExpr.string_of (SExpr.Node processed) in write_field context tbl objref fld processed_str) - let dump_db_cache generation fd = - let db_cache_manifest = Db_cache_types.manifest_of_cache Db_backend.cache generation in + let dump_db_cache fd = + let db_cache_manifest = Db_cache_types.manifest_of_cache Db_backend.cache in let time = Unix.gettimeofday() in (* Snapshot the cache (uses the lock) and then slowly serialise the copy *) Db_xml.To.fd fd (db_cache_manifest, snapshot cache); @@ -855,7 +854,7 @@ struct "process_structured_field" (a,b,c,d,e) - let dump_db_cache _ _ = () (* this is master-only *) + let dump_db_cache _ = () (* this is master-only *) let apply_delta_to_cache _ = () (* this is master-only *) @@ -936,8 +935,8 @@ struct let read_records_where s e = Stats.log_db_call None ("record(where):"^s) Stats.Read; (sw Master.read_records_where Slave.read_records_where) s e - let dump_db_cache manifest fd = - (sw Master.dump_db_cache Slave.dump_db_cache) manifest fd + let dump_db_cache fd = + (sw Master.dump_db_cache Slave.dump_db_cache) fd let apply_delta_to_cache entry = (sw Master.apply_delta_to_cache Slave.apply_delta_to_cache) entry diff --git a/ocaml/database/db_cache_types.ml b/ocaml/database/db_cache_types.ml index 7bfed2bc..5adb14f6 100644 --- a/ocaml/database/db_cache_types.ml +++ b/ocaml/database/db_cache_types.ml @@ -18,6 +18,7 @@ type table = (string, row) Hashtbl.t type cache = { cache: (string, table) Hashtbl.t; schema: (int * int) option ref; + generation: Generation.t ref; } type where_record = {table:string; return:string; where_field:string; where_value:string} @@ -47,12 +48,18 @@ let schema_of_cache cache = match !(cache.schema) with | None -> (0, 0) | Some (major, minor) -> major, minor -let manifest_of_cache cache gen_count = +let manifest_of_cache cache = let major, minor = schema_of_cache cache in - make_manifest major minor gen_count + make_manifest major minor !(cache.generation) let set_schema_vsn cache (major, minor) = cache.schema := Some (major, minor) +let increment cache = cache.generation := Int64.add !(cache.generation) 1L + +let generation_of_cache cache = !(cache.generation) + +let set_generation cache generation = cache.generation := generation + (* Our versions of hashtbl.find *) let lookup_field_in_row row fld = try @@ -91,7 +98,7 @@ let create_empty_row () = Hashtbl.create 20 let create_empty_table () = Hashtbl.create 20 -let create_empty_cache () = { cache = Hashtbl.create 20; schema = ref None } +let create_empty_cache () = { cache = Hashtbl.create 20; schema = ref None; generation = ref Generation.null_generation } let fold_over_fields func row acc = Hashtbl.fold func row acc @@ -140,4 +147,6 @@ let snapshot cache : cache = let newcache = create_empty_cache () in iter_over_tables (table newcache) cache; + + set_generation newcache (generation_of_cache cache); newcache) diff --git a/ocaml/database/db_cache_types.mli b/ocaml/database/db_cache_types.mli index c101ecfa..f185d4d2 100644 --- a/ocaml/database/db_cache_types.mli +++ b/ocaml/database/db_cache_types.mli @@ -29,11 +29,15 @@ type db_dump_manifest = { } val make_manifest : int -> int -> Int64.t -> db_dump_manifest -val manifest_of_cache: cache -> Int64.t -> db_dump_manifest +val manifest_of_cache: cache -> db_dump_manifest val schema_of_cache: cache -> int * int val set_schema_vsn: cache -> int * int -> unit +val generation_of_cache: cache -> Generation.t +val set_generation: cache -> Generation.t -> unit +val increment: cache -> unit + val lookup_field_in_row : row -> string -> string val lookup_table_in_cache : cache -> string -> table val lookup_row_in_table : table -> string -> string -> row diff --git a/ocaml/database/db_connections.ml b/ocaml/database/db_connections.ml index 71e474e3..fd7e1ecd 100644 --- a/ocaml/database/db_connections.ml +++ b/ocaml/database/db_connections.ml @@ -18,11 +18,6 @@ open D let get_dbs_and_gen_counts() = List.map (fun conn->(Generation.read conn, conn)) (Db_conn_store.read_db_connections()) -let init_gen_count connections = - let gens = List.map Generation.read connections in - let max = List.fold_left (fun max x -> if x>max then x else max) 0L gens in - Generation.set_count max - (* This returns the most recent of the db connections to populate from. It also initialises the in-memory generation count to the largest of the db connections' generation counts *) let pick_most_recent_db connections = @@ -78,17 +73,11 @@ let pre_exit_hook () = let flush_dirty_and_maybe_exit dbconn exit_spec = Db_conn_store.with_db_conn_lock dbconn (fun () -> - let flush_conn dbconn = - let was_anything_flushed = Backend_xml.flush_dirty dbconn in - if was_anything_flushed then - Generation.write_out dbconn; - was_anything_flushed in - (* if we're being told to shutdown by signal handler then flush every connection - the rationale is that we're not sure which db connections will be available on next restart *) if !exit_on_next_flush then begin - flush_conn dbconn; + Backend_xml.flush_dirty dbconn; let refcount = dec_and_read_db_flush_thread_refcount() in (* last flushing thread close the door on the way out.. *) if refcount = 0 then @@ -101,7 +90,7 @@ let flush_dirty_and_maybe_exit dbconn exit_spec = debug "refcount is %d; not exiting" refcount end; - let was_anything_flushed = flush_conn dbconn in + let was_anything_flushed = Backend_xml.flush_dirty dbconn in (* exit if we've been told to by caller *) begin @@ -109,7 +98,7 @@ let flush_dirty_and_maybe_exit dbconn exit_spec = None -> () | (Some ret_code) -> pre_exit_hook(); exit ret_code end; - was_anything_flushed + was_anything_flushed ) (* let create_empty_db (major, minor) dbconn = @@ -117,27 +106,19 @@ let create_empty_db (major, minor) dbconn = Backend_xml.create_empty_db (major, minor) dbconn *) let maybe_create_new_db (major,minor) dbconn = - if not (Sys.file_exists dbconn.Parse_db_conf.path) then - begin - Generation.create_fresh dbconn; - Backend_xml.create_empty_db (major,minor) dbconn - end + if not (Sys.file_exists dbconn.Parse_db_conf.path) + then Backend_xml.create_empty_db (major,minor) dbconn let force_flush_all dbconn = - debug "About to flush database: %s" dbconn.Parse_db_conf.path; - Db_conn_store.with_db_conn_lock dbconn - (fun () -> - begin - Backend_xml.force_flush_all dbconn None - end; - Generation.write_out dbconn - ) + debug "About to flush database: %s" dbconn.Parse_db_conf.path; + Db_conn_store.with_db_conn_lock dbconn + (fun () -> + Backend_xml.force_flush_all dbconn None + ) + +let force_flush_specified_cache dbconn cache = + Db_conn_store.with_db_conn_lock dbconn + (fun () -> + Backend_xml.force_flush_all dbconn (Some cache) + ) -let force_flush_specified_cache dbconn generation_count cache = - Db_conn_store.with_db_conn_lock dbconn - (fun () -> - begin - Backend_xml.force_flush_all dbconn (Some cache) - end; - Generation.write_out_specified_count dbconn generation_count - ) diff --git a/ocaml/database/db_xml.ml b/ocaml/database/db_xml.ml index bf0357ef..5b61aebe 100644 --- a/ocaml/database/db_xml.ml +++ b/ocaml/database/db_xml.ml @@ -124,12 +124,15 @@ module From = struct | _ -> f acc in let (cache, _, manifest) = f (create_empty_cache (), create_empty_table (), []) in + let generation_count = Int64.of_string (List.assoc _generation_count manifest) in + Db_cache_types.set_generation cache generation_count; (* Manifest is actually a record *) let manifest = { schema_major_vsn = int_of_string (List.assoc _schema_major_vsn manifest); schema_minor_vsn = int_of_string (List.assoc _schema_minor_vsn manifest); - generation_count = Int64.of_string (List.assoc _generation_count manifest) + generation_count = generation_count } in + manifest, cache let file xml_filename = diff --git a/ocaml/database/generation.ml b/ocaml/database/generation.ml index 105a8c84..3502ecee 100644 --- a/ocaml/database/generation.ml +++ b/ocaml/database/generation.ml @@ -11,6 +11,7 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. *) + (* Simple generation count implementation *) module D = Debug.Debugger(struct let name = "sql" end) open D @@ -22,61 +23,8 @@ let to_string g = Int64.to_string g let add_int a b = Int64.add a (Int64.of_int b) let null_generation = -1L -(* the current generation (initialised on population) is stored here *) -let current_generation : t ref = ref 0L -let set_count g = - Db_lock.with_lock - (fun ()-> - debug "Set generation count to %Ld" g; - current_generation := g) - -let gen_count_file dbconn = dbconn.Parse_db_conf.path^".generation" - -let read_generation() = Db_lock.with_lock (fun () -> !current_generation) - -let create_with_contents dbconn contents = - Db_lock.with_lock - (fun () -> - Unixext.write_string_to_file (gen_count_file dbconn) (Int64.to_string contents) - ) - -(* Create a fresh generation count file *) -let create_fresh dbconn = create_with_contents dbconn 0L -let create_current dbconn = Db_lock.with_lock (fun ()->create_with_contents dbconn !current_generation) - -(* Increment generation count *) -let increment() = - Db_lock.with_lock (fun ()->current_generation := Int64.add !current_generation 1L) - -(* Write out the cache's current generation count to the specified db connection *) -let write_out dbconn = - Db_lock.with_lock - (fun () -> - try - let gencount_fname = gen_count_file dbconn in - debug "Writing generation count %Ld to file %s" !current_generation gencount_fname; - (* write new generation count atomically *) - Unixext.write_string_to_file gencount_fname (Int64.to_string !current_generation) - with e -> - debug "Exception incrementing generation count: %s" (Printexc.to_string e); - log_backtrace(); - raise e - ) - -let write_out_specified_count dbconn count = - Db_lock.with_lock - (fun () -> - try - let gencount_fname = gen_count_file dbconn in - debug "Writing specified generation count %Ld to file %s" count gencount_fname; - (* write new generation count atomically *) - Unixext.write_string_to_file gencount_fname (Int64.to_string count) - with e -> - debug "Exception incrementing generation count: %s" (Printexc.to_string e); - log_backtrace(); - raise e - ) +let filename dbconn = dbconn.Parse_db_conf.path^".generation" let read dbconn = - let gencount_fname = gen_count_file dbconn in + let gencount_fname = filename dbconn in try Int64.of_string (Unixext.string_of_file gencount_fname) with _ -> 0L diff --git a/ocaml/db_process/xapi-db-process.ml b/ocaml/db_process/xapi-db-process.ml index 807aa64b..7651e689 100644 --- a/ocaml/db_process/xapi-db-process.ml +++ b/ocaml/db_process/xapi-db-process.ml @@ -94,18 +94,11 @@ let do_write_database() = begin read_in_database(); if !xmltostdout then - Db_cache.DBCache.dump_db_cache (Generation.read_generation()) (Unix.descr_of_out_channel stdout) + Db_cache.DBCache.dump_db_cache (Unix.descr_of_out_channel stdout) else write_out_database !filename end -let do_read_gencount() = - initialise_db_connections(); - let connections = Db_conn_store.read_db_connections () in - Db_connections.init_gen_count connections; - let maxgen = Generation.read_generation() in - Printf.printf "%Ld\n" maxgen - let find_my_host_row() = Xapi_inventory.read_inventory (); let localhost_uuid = Xapi_inventory.lookup Xapi_inventory._installation_uuid in @@ -171,8 +164,6 @@ let _ = match parse_operation !operation with | Write_database -> do_write_database() - | Read_gencount -> - do_read_gencount() | Read_hostiqn -> do_read_hostiqn() | Write_hostiqn -> diff --git a/ocaml/xapi/pool_db_backup.ml b/ocaml/xapi/pool_db_backup.ml index 5676aeb1..34b66512 100644 --- a/ocaml/xapi/pool_db_backup.ml +++ b/ocaml/xapi/pool_db_backup.ml @@ -40,7 +40,7 @@ let write_database (s: Unix.file_descr) ~__context = let len = String.length minimally_compliant_miami_database in ignore (Unix.write s minimally_compliant_miami_database 0 len) else - Db_cache.DBCache.dump_db_cache (Generation.read_generation()) s + Db_cache.DBCache.dump_db_cache s (** Make sure the backup database version is compatible *) let version_check manifest = @@ -242,7 +242,7 @@ let fetch_database_backup ~master_address ~pool_secret ~force = (fun dbconn -> Threadext.Mutex.execute slave_backup_m (fun () -> - Db_connections.force_flush_specified_cache dbconn manifest.Db_cache_types.generation_count unmarshalled_db; + Db_connections.force_flush_specified_cache dbconn unmarshalled_db; Slave_backup.notify_write dbconn (* update writes_this_period for this connection *) ) ) @@ -260,12 +260,13 @@ let pool_db_backup_thread () = begin let hosts = Db.Host.get_all ~__context in let hosts = List.filter (fun hostref -> hostref <> !Xapi_globs.localhost_ref) hosts in + let generation = Db_lock.with_lock (fun () -> Db_cache_types.generation_of_cache Db_backend.cache) in let dohost host = try Thread.delay pool_db_sync_timer; debug "Starting DB synchronise with host %s" (Ref.string_of host); Helpers.call_api_functions ~__context - (fun rpc session_id -> Client.Host.request_backup rpc session_id host (Generation.read_generation()) false); + (fun rpc session_id -> Client.Host.request_backup rpc session_id host generation false); debug "Finished DB synchronise"; with e -> diff --git a/ocaml/xapi/redo_log_usage.ml b/ocaml/xapi/redo_log_usage.ml index 8317a7b7..5ed8d4bb 100644 --- a/ocaml/xapi/redo_log_usage.ml +++ b/ocaml/xapi/redo_log_usage.ml @@ -91,7 +91,9 @@ let read_from_redo_log staging_path = raise NoGeneration | Some generation -> (* Write the in-memory cache to the file *) - let manifest = Db_cache_types.manifest_of_cache Db_backend.cache generation in + (* Make sure the generation count is right -- is this necessary? *) + Db_cache_types.set_generation Db_backend.cache generation; + let manifest = Db_cache_types.manifest_of_cache Db_backend.cache in Db_xml.To.file staging_path (manifest, Db_backend.cache); Unixext.write_string_to_file (staging_path ^ ".generation") (Generation.to_string generation) end diff --git a/ocaml/xapi/xapi_ha.ml b/ocaml/xapi/xapi_ha.ml index a0320ee9..02705c4a 100644 --- a/ocaml/xapi/xapi_ha.ml +++ b/ocaml/xapi/xapi_ha.ml @@ -1524,10 +1524,11 @@ let enable __context heartbeat_srs configuration = (* ... *) (* Make sure everyone's got a fresh database *) + let generation = Db_lock.with_lock (fun () -> Db_cache_types.generation_of_cache Db_backend.cache) in let errors = thread_iter_all_exns (fun host -> debug "Synchronising database with host '%s' ('%s')" (Db.Host.get_name_label ~__context ~self:host) (Ref.string_of host); - Client.Host.request_backup rpc session_id host (Generation.read_generation()) true; + Client.Host.request_backup rpc session_id host generation true; count_call () ) hosts in List.iter diff --git a/ocaml/xapi/xapi_pool.ml b/ocaml/xapi/xapi_pool.ml index ab0051f5..1bce625e 100644 --- a/ocaml/xapi/xapi_pool.ml +++ b/ocaml/xapi/xapi_pool.ml @@ -763,7 +763,7 @@ let eject ~__context ~host = (* We need to delete all local dbs but leave remote ones alone *) let local = List.filter (fun db -> not db.Parse_db_conf.is_on_remote_storage) dbs in List.iter Unixext.unlink_safe (List.map (fun db->db.Parse_db_conf.path) local); - List.iter Unixext.unlink_safe (List.map Generation.gen_count_file local); + List.iter Unixext.unlink_safe (List.map Generation.filename local); (* remove any shared databases from my db.conf *) (* XXX: on OEM edition the db.conf is rebuilt on every boot *) Parse_db_conf.write_db_conf local; @@ -794,10 +794,11 @@ let sync_database ~__context = then debug "flushed database to metadata VDI: assuming this is sufficient." else begin debug "flushing database to all online nodes"; + let generation = Db_lock.with_lock (fun () -> Db_cache_types.generation_of_cache Db_backend.cache) in Threadext.thread_iter (fun host -> Helpers.call_api_functions ~__context - (fun rpc session_id -> Client.Host.request_backup rpc session_id host (Generation.read_generation()) true)) + (fun rpc session_id -> Client.Host.request_backup rpc session_id host generation true)) (Db.Host.get_all ~__context) end )