]> xenbits.xensource.com Git - xcp/xen-api.git/commitdiff
Move the generation into the database cache rather than keeping it as a separate...
authorDavid Scott <dave.scott@eu.citrix.com>
Wed, 26 Jan 2011 17:39:05 +0000 (17:39 +0000)
committerDavid Scott <dave.scott@eu.citrix.com>
Wed, 26 Jan 2011 17:39:05 +0000 (17:39 +0000)
Signed-off-by: David Scott <dave.scott@eu.citrix.com>
12 files changed:
ocaml/database/backend_xml.ml
ocaml/database/db_cache.ml
ocaml/database/db_cache_types.ml
ocaml/database/db_cache_types.mli
ocaml/database/db_connections.ml
ocaml/database/db_xml.ml
ocaml/database/generation.ml
ocaml/db_process/xapi-db-process.ml
ocaml/xapi/pool_db_backup.ml
ocaml/xapi/redo_log_usage.ml
ocaml/xapi/xapi_ha.ml
ocaml/xapi/xapi_pool.ml

index 195fe1dc1fab6ed74905dcf7ab31c693e38eb531..23c3566954cb8a892978ec71840296464b821728 100644 (file)
@@ -80,11 +80,11 @@ let atomically_write_to_db_file filename marshall =
 let flush_db_to_redo_log cache_to_flush =
   if Redo_log.is_enabled () then begin
     (* Atomically read the generation count and take a deep copy of the cache *)
-    let (gen_count, cache_copy) = Db_lock.with_lock (fun () -> Generation.read_generation(), Db_cache_types.snapshot cache_to_flush) in
+    let cache_copy = Db_lock.with_lock (fun () -> Db_cache_types.snapshot cache_to_flush) in
     debug "Flushing cache to redo-log";
-    let db_cache_manifest = Db_cache_types.manifest_of_cache cache_to_flush gen_count in
+    let db_cache_manifest = Db_cache_types.manifest_of_cache cache_to_flush in
     let write_db_to_fd = (fun out_fd -> Db_xml.To.fd out_fd (db_cache_manifest, cache_copy)) in
-    Redo_log.write_db gen_count write_db_to_fd
+    Redo_log.write_db (Db_cache_types.generation_of_cache cache_copy) write_db_to_fd
   end
 
 (* atomically flush entire db cache to disk. If we are given a cache then flush that, otherwise flush the
@@ -104,7 +104,7 @@ let flush_common dbconn optional_cache =
 
   let do_flush cache_to_flush filename =
     flush_db_to_redo_log cache_to_flush;
-    let db_cache_manifest = Db_cache_types.manifest_of_cache cache (Generation.read_generation()) in
+    let db_cache_manifest = Db_cache_types.manifest_of_cache cache in
     if not dbconn.Parse_db_conf.compress
     then Db_xml.To.file filename (db_cache_manifest, cache_to_flush)
     else 
@@ -126,7 +126,9 @@ let flush_common dbconn optional_cache =
        (* if we were given a specific cache to flush then just do it; no locks required *)
     | Some c -> do_flush c filename in
   let filename = dbconn.Parse_db_conf.path in
+  let generation_filename = Generation.filename dbconn in
   atomically_write_to_db_file filename marshall;
+  Unixext.write_string_to_file generation_filename (Int64.to_string (Db_cache_types.generation_of_cache cache));
   debug "XML backend [%s] -- Write buffer flushed. Time: %f" filename (Unix.gettimeofday() -. time)
 
 
@@ -188,6 +190,5 @@ let create_empty_db (major, minor) dbconn =
   Db_cache_types.set_schema_vsn empty_cache (major, minor);
 
   List.iter (fun tbl -> set_table_in_cache empty_cache tbl (create_empty_table ())) db_table_names;
-  let manifest = Db_cache_types.make_manifest major minor 0L (* new gen count *) in
-  let marshall filename = Db_lock.with_lock (fun () -> Db_xml.To.file filename (manifest, empty_cache)) in
-  atomically_write_to_db_file dbconn.Parse_db_conf.path marshall;
+  flush_common dbconn (Some empty_cache)
+
index 1e86a3fe4eaf4bd18f0c0d2721b75eef35a69ea3..9b90d9e6c33772f6253c86ebece1200b4bc183f1 100644 (file)
@@ -52,7 +52,7 @@ exception Too_many_values of   (*class*) string * (*ref*) string * (*uuid*) stri
 module type DB_CACHE =
 sig
 
-  val dump_db_cache : Int64.t -> Unix.file_descr -> unit
+  val dump_db_cache : Unix.file_descr -> unit
 
   val stats : unit -> (string * int) list
 
@@ -159,7 +159,7 @@ struct
 
     let save_in_redo_log context entry =
       if Redo_log.is_enabled() then begin
-        Redo_log.write_delta (Generation.read_generation()) entry
+        Redo_log.write_delta (Db_cache_types.generation_of_cache Db_backend.cache) entry
           (fun () -> (* the function which will be invoked if a database write is required instead of a delta *)
             Backend_xml.flush_db_to_redo_log Db_backend.cache
           )
@@ -212,7 +212,7 @@ struct
                   (* Only flush to disk if persistent *)
                   Db_dirty.set_all_row_dirty_status objref Db_dirty.Modified;
                   Db_dirty.set_all_dirty_table_status tblname;
-                  Generation.increment();
+                  Db_cache_types.increment Db_backend.cache;
                   save_in_redo_log context (Redo_log.WriteField(tblname, objref, fldname, newval))
                 end;
 
@@ -358,7 +358,7 @@ struct
               (* Update cache dirty status *)
               Db_dirty.clear_all_row_dirty_status objref;
               Db_dirty.set_all_dirty_table_status tblname;
-              Generation.increment();
+                  Db_cache_types.increment Db_backend.cache;
               save_in_redo_log context (Redo_log.DeleteRow(tblname, objref))
             end;
           Ref_index.remove objref;
@@ -405,7 +405,7 @@ struct
             begin
               Db_dirty.set_all_row_dirty_status new_objref Db_dirty.New;
               Db_dirty.set_all_dirty_table_status tblname;
-              Generation.increment();
+                  Db_cache_types.increment Db_backend.cache;
               save_in_redo_log context (Redo_log.CreateRow(tblname, new_objref, kvs))
             end;
           add_ref_to_table_map new_objref tblname (* track ref against this table *);
@@ -492,7 +492,6 @@ struct
                if Sys.file_exists Xapi_globs.gen_metadata_db
                then fake_gen_dbconn :: connections else connections in
        
-      Db_connections.init_gen_count connections;
       (* If we have a temporary_restore_path (backup uploaded in previous run of xapi process) then restore from that *)
       let db = 
        if Sys.file_exists Xapi_globs.db_temporary_restore_path then begin
@@ -660,8 +659,8 @@ struct
           let processed_str = SExpr.string_of (SExpr.Node processed) in
           write_field context tbl objref fld processed_str)
            
-    let dump_db_cache generation fd =
-               let db_cache_manifest = Db_cache_types.manifest_of_cache Db_backend.cache generation in
+    let dump_db_cache fd =
+               let db_cache_manifest = Db_cache_types.manifest_of_cache Db_backend.cache in
       let time = Unix.gettimeofday() in
       (* Snapshot the cache (uses the lock) and then slowly serialise the copy *)
       Db_xml.To.fd fd (db_cache_manifest, snapshot cache);
@@ -855,7 +854,7 @@ struct
        "process_structured_field"
        (a,b,c,d,e)
 
-    let dump_db_cache _ = () (* this is master-only *)
+    let dump_db_cache _ = () (* this is master-only *)
 
     let apply_delta_to_cache _ = () (* this is master-only *)
 
@@ -936,8 +935,8 @@ struct
   let read_records_where s e =
     Stats.log_db_call None ("record(where):"^s) Stats.Read;
     (sw Master.read_records_where Slave.read_records_where) s e
-  let dump_db_cache manifest fd =
-    (sw Master.dump_db_cache Slave.dump_db_cache) manifest fd
+  let dump_db_cache fd =
+    (sw Master.dump_db_cache Slave.dump_db_cache) fd
   let apply_delta_to_cache entry =
     (sw Master.apply_delta_to_cache Slave.apply_delta_to_cache) entry
 
index 7bfed2bc404c2466aa97d4d2a55ddca68630a5a9..5adb14f65ced555691c37af6acaba585091a9da2 100644 (file)
@@ -18,6 +18,7 @@ type table = (string, row) Hashtbl.t
 type cache = {
        cache: (string, table) Hashtbl.t;
        schema: (int * int) option ref;
+       generation: Generation.t ref;
 }
 
 type where_record = {table:string; return:string; where_field:string; where_value:string}
@@ -47,12 +48,18 @@ let schema_of_cache cache = match !(cache.schema) with
 | None -> (0, 0)
 | Some (major, minor) -> major, minor
 
-let manifest_of_cache cache gen_count 
+let manifest_of_cache cache = 
        let major, minor = schema_of_cache cache in
-       make_manifest major minor gen_count
+       make_manifest major minor !(cache.generation)
 
 let set_schema_vsn cache (major, minor) = cache.schema := Some (major, minor)
 
+let increment cache = cache.generation := Int64.add !(cache.generation) 1L
+
+let generation_of_cache cache = !(cache.generation)
+
+let set_generation cache generation = cache.generation := generation
+
 (* Our versions of hashtbl.find *)
 let lookup_field_in_row row fld =
   try
@@ -91,7 +98,7 @@ let create_empty_row () = Hashtbl.create 20
 
 let create_empty_table () = Hashtbl.create 20
 
-let create_empty_cache () = { cache = Hashtbl.create 20; schema = ref None }
+let create_empty_cache () = { cache = Hashtbl.create 20; schema = ref None; generation = ref Generation.null_generation }
 
 let fold_over_fields func row acc = Hashtbl.fold func row acc
 
@@ -140,4 +147,6 @@ let snapshot cache : cache =
        
        let newcache = create_empty_cache () in  
        iter_over_tables (table newcache) cache;
+
+          set_generation newcache (generation_of_cache cache);
        newcache)
index c101ecfa82a9475a3f8b3e879ea693c6114b8c91..f185d4d201d33750916f7d5ec6a5212c7ad03165 100644 (file)
@@ -29,11 +29,15 @@ type db_dump_manifest = {
 }
 
 val make_manifest : int -> int -> Int64.t -> db_dump_manifest
-val manifest_of_cache: cache -> Int64.t -> db_dump_manifest
+val manifest_of_cache: cache -> db_dump_manifest
 
 val schema_of_cache: cache -> int * int
 val set_schema_vsn: cache -> int * int -> unit
 
+val generation_of_cache: cache -> Generation.t
+val set_generation: cache -> Generation.t -> unit
+val increment: cache -> unit
+
 val lookup_field_in_row : row -> string -> string
 val lookup_table_in_cache : cache -> string -> table
 val lookup_row_in_table : table -> string -> string -> row
index 71e474e3b5b8144154575e636922c081dc5206a8..fd7e1ecde7cc5b6edf5d9f24ce4ae9582f51a904 100644 (file)
@@ -18,11 +18,6 @@ open D
 let get_dbs_and_gen_counts() =
   List.map (fun conn->(Generation.read conn, conn)) (Db_conn_store.read_db_connections())
     
-let init_gen_count connections =
-  let gens = List.map Generation.read connections in
-  let max = List.fold_left (fun max x -> if x>max then x else max) 0L gens in
-  Generation.set_count max
-
 (* This returns the most recent of the db connections to populate from. It also initialises the in-memory
    generation count to the largest of the db connections' generation counts *)
 let pick_most_recent_db connections =
@@ -78,17 +73,11 @@ let pre_exit_hook () =
 let flush_dirty_and_maybe_exit dbconn exit_spec =
   Db_conn_store.with_db_conn_lock dbconn
     (fun () ->
-       let flush_conn dbconn =
-        let was_anything_flushed = Backend_xml.flush_dirty dbconn in
-        if was_anything_flushed then
-          Generation.write_out dbconn;
-     was_anything_flushed in
-
        (* if we're being told to shutdown by signal handler then flush every connection
          - the rationale is that we're not sure which db connections will be available on next restart *)
        if !exit_on_next_flush then
         begin
-          flush_conn dbconn;
+          Backend_xml.flush_dirty dbconn;
           let refcount = dec_and_read_db_flush_thread_refcount() in
           (* last flushing thread close the door on the way out.. *)
           if refcount = 0 then
@@ -101,7 +90,7 @@ let flush_dirty_and_maybe_exit dbconn exit_spec =
             debug "refcount is %d; not exiting" refcount
         end;
        
-       let was_anything_flushed = flush_conn dbconn in
+       let was_anything_flushed = Backend_xml.flush_dirty dbconn in
        
        (* exit if we've been told to by caller *)
        begin
@@ -109,7 +98,7 @@ let flush_dirty_and_maybe_exit dbconn exit_spec =
           None -> ()
         | (Some ret_code) -> pre_exit_hook(); exit ret_code
        end;
-       was_anything_flushed
+          was_anything_flushed
     )
 (*
 let create_empty_db (major, minor) dbconn =
@@ -117,27 +106,19 @@ let create_empty_db (major, minor) dbconn =
   Backend_xml.create_empty_db (major, minor) dbconn
   *)
 let maybe_create_new_db (major,minor) dbconn =
-  if not (Sys.file_exists dbconn.Parse_db_conf.path) then
-    begin
-      Generation.create_fresh dbconn;
-      Backend_xml.create_empty_db (major,minor) dbconn
-    end
+       if not (Sys.file_exists dbconn.Parse_db_conf.path) 
+       then Backend_xml.create_empty_db (major,minor) dbconn
 
 let force_flush_all dbconn =
-  debug "About to flush database: %s" dbconn.Parse_db_conf.path;
-  Db_conn_store.with_db_conn_lock dbconn
-    (fun () ->
-       begin
-        Backend_xml.force_flush_all dbconn None
-       end;
-       Generation.write_out dbconn
-    )
+       debug "About to flush database: %s" dbconn.Parse_db_conf.path;
+       Db_conn_store.with_db_conn_lock dbconn
+               (fun () ->
+                       Backend_xml.force_flush_all dbconn None
+               )
+
+let force_flush_specified_cache dbconn cache =
+       Db_conn_store.with_db_conn_lock dbconn
+               (fun () ->
+                       Backend_xml.force_flush_all dbconn (Some cache)
+               )
 
-let force_flush_specified_cache dbconn generation_count cache =
-  Db_conn_store.with_db_conn_lock dbconn
-    (fun () ->
-       begin
-        Backend_xml.force_flush_all dbconn (Some cache)
-       end;
-       Generation.write_out_specified_count dbconn generation_count
-    )
index bf0357efff3e6fd6abe7ed4807e4d239a69a38e8..5b61aebe5a0474f99b6408abd4f4a8565690ed3a 100644 (file)
@@ -124,12 +124,15 @@ module From = struct
     | _ -> f acc
     in
     let (cache, _, manifest) = f (create_empty_cache (), create_empty_table (), []) in
+       let generation_count = Int64.of_string (List.assoc _generation_count manifest) in
+       Db_cache_types.set_generation cache generation_count;
     (* Manifest is actually a record *)
     let manifest = { 
       schema_major_vsn = int_of_string (List.assoc _schema_major_vsn manifest);
       schema_minor_vsn = int_of_string (List.assoc _schema_minor_vsn manifest);
-      generation_count = Int64.of_string (List.assoc _generation_count manifest)
+      generation_count = generation_count
     } in
+
     manifest, cache
 
   let file xml_filename =
index 105a8c84b83fd7a14b6d82a5408b00ddcaba2f50..3502ecee8f0f1cb3f3d3daaef11db31945c7b6b2 100644 (file)
@@ -11,6 +11,7 @@
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU Lesser General Public License for more details.
  *)
+
 (* Simple generation count implementation *)
 module D = Debug.Debugger(struct let name = "sql" end)
 open D
@@ -22,61 +23,8 @@ let to_string g = Int64.to_string g
 let add_int a b = Int64.add a (Int64.of_int b)
 let null_generation = -1L
 
-(* the current generation (initialised on population) is stored here *)
-let current_generation : t ref = ref 0L
-let set_count g =
-  Db_lock.with_lock
-    (fun ()->
-       debug "Set generation count to %Ld" g;
-       current_generation := g)
-
-let gen_count_file dbconn = dbconn.Parse_db_conf.path^".generation"
-
-let read_generation() = Db_lock.with_lock (fun () -> !current_generation)
-
-let create_with_contents dbconn contents =
-  Db_lock.with_lock
-    (fun () ->
-       Unixext.write_string_to_file (gen_count_file dbconn) (Int64.to_string contents)
-    )
-
-(* Create a fresh generation count file *)
-let create_fresh dbconn = create_with_contents dbconn 0L
-let create_current dbconn = Db_lock.with_lock (fun ()->create_with_contents dbconn !current_generation)
-
-(* Increment generation count *)
-let increment() =
-  Db_lock.with_lock (fun ()->current_generation := Int64.add !current_generation 1L)
-
-(* Write out the cache's current generation count to the specified db connection *)
-let write_out dbconn =
-  Db_lock.with_lock
-    (fun () ->
-       try
-        let gencount_fname = gen_count_file dbconn in
-        debug "Writing generation count %Ld to file %s" !current_generation gencount_fname;
-        (* write new generation count atomically *)
-        Unixext.write_string_to_file gencount_fname (Int64.to_string !current_generation)
-       with e ->
-        debug "Exception incrementing generation count: %s" (Printexc.to_string e);
-        log_backtrace();
-        raise e
-    )
-
-let write_out_specified_count dbconn count =
-  Db_lock.with_lock
-    (fun () ->
-       try
-        let gencount_fname = gen_count_file dbconn in
-        debug "Writing specified generation count %Ld to file %s" count gencount_fname;
-        (* write new generation count atomically *)
-        Unixext.write_string_to_file gencount_fname (Int64.to_string count)
-       with e ->
-        debug "Exception incrementing generation count: %s" (Printexc.to_string e);
-        log_backtrace();
-        raise e
-    )
+let filename dbconn = dbconn.Parse_db_conf.path^".generation"
 
 let read dbconn =
-  let gencount_fname = gen_count_file dbconn in
+  let gencount_fname = filename dbconn in
   try Int64.of_string (Unixext.string_of_file gencount_fname) with _ -> 0L
index 807aa64b6941879b0b09984f7d01c46876070c17..7651e689891e8d9ab4b2b691c17adb8f022b0710 100644 (file)
@@ -94,18 +94,11 @@ let do_write_database() =
   begin
     read_in_database();
     if !xmltostdout then
-               Db_cache.DBCache.dump_db_cache (Generation.read_generation()) (Unix.descr_of_out_channel stdout)
+               Db_cache.DBCache.dump_db_cache (Unix.descr_of_out_channel stdout)
     else
       write_out_database !filename
   end
     
-let do_read_gencount() =
-  initialise_db_connections();
-  let connections = Db_conn_store.read_db_connections () in
-  Db_connections.init_gen_count connections;
-  let maxgen = Generation.read_generation() in
-  Printf.printf "%Ld\n" maxgen
-
 let find_my_host_row() =
   Xapi_inventory.read_inventory ();
   let localhost_uuid = Xapi_inventory.lookup Xapi_inventory._installation_uuid in
@@ -171,8 +164,6 @@ let _ =
   match parse_operation !operation with
   | Write_database ->
       do_write_database()
-  | Read_gencount ->
-      do_read_gencount()
   | Read_hostiqn ->
       do_read_hostiqn()
   | Write_hostiqn ->
index 5676aeb110d13033b37815b25dbe529b2aa4ebe8..34b6651246ac1dc8f3d331b9fa573ddcf799f34a 100644 (file)
@@ -40,7 +40,7 @@ let write_database (s: Unix.file_descr) ~__context =
                let len = String.length minimally_compliant_miami_database in
                ignore (Unix.write s minimally_compliant_miami_database 0 len)
        else
-               Db_cache.DBCache.dump_db_cache (Generation.read_generation()) s
+               Db_cache.DBCache.dump_db_cache s
 
 (** Make sure the backup database version is compatible *)
 let version_check manifest =
@@ -242,7 +242,7 @@ let fetch_database_backup ~master_address ~pool_secret ~force =
        (fun dbconn ->
           Threadext.Mutex.execute slave_backup_m
             (fun () ->
-               Db_connections.force_flush_specified_cache dbconn manifest.Db_cache_types.generation_count unmarshalled_db;
+               Db_connections.force_flush_specified_cache dbconn unmarshalled_db;
                Slave_backup.notify_write dbconn (* update writes_this_period for this connection *)
             )
        )
@@ -260,12 +260,13 @@ let pool_db_backup_thread () =
       begin
        let hosts = Db.Host.get_all ~__context in
        let hosts = List.filter (fun hostref -> hostref <> !Xapi_globs.localhost_ref) hosts in
+       let generation = Db_lock.with_lock (fun () -> Db_cache_types.generation_of_cache Db_backend.cache) in
        let dohost host =
          try
            Thread.delay pool_db_sync_timer;
            debug "Starting DB synchronise with host %s" (Ref.string_of host);
            Helpers.call_api_functions ~__context
-             (fun rpc session_id -> Client.Host.request_backup rpc session_id host (Generation.read_generation()) false);
+             (fun rpc session_id -> Client.Host.request_backup rpc session_id host generation false);
            debug "Finished DB synchronise";
          with
            e -> 
index 8317a7b75216fa30defdb163b61ad32a6f204d3a..5ed8d4bbd05591934e22896bfcfc1ae5b744a53c 100644 (file)
@@ -91,7 +91,9 @@ let read_from_redo_log staging_path =
         raise NoGeneration
       | Some generation ->
         (* Write the in-memory cache to the file *)
-        let manifest = Db_cache_types.manifest_of_cache Db_backend.cache generation in
+                 (* Make sure the generation count is right -- is this necessary? *)
+                 Db_cache_types.set_generation Db_backend.cache generation;
+          let manifest = Db_cache_types.manifest_of_cache Db_backend.cache in
         Db_xml.To.file staging_path (manifest, Db_backend.cache);
         Unixext.write_string_to_file (staging_path ^ ".generation") (Generation.to_string generation)
     end
index a0320ee94e992b443c85d74058600599451f760c..02705c4a94f1ec6a0a6d924a1357fd6717c6b7bb 100644 (file)
@@ -1524,10 +1524,11 @@ let enable __context heartbeat_srs configuration =
 
                                (* ... *)
                                (* Make sure everyone's got a fresh database *)
+                               let generation = Db_lock.with_lock (fun () -> Db_cache_types.generation_of_cache Db_backend.cache) in
                                let errors = thread_iter_all_exns
                                        (fun host ->
                                                debug "Synchronising database with host '%s' ('%s')" (Db.Host.get_name_label ~__context ~self:host) (Ref.string_of host);
-                                               Client.Host.request_backup rpc session_id host (Generation.read_generation()) true;
+                                               Client.Host.request_backup rpc session_id host generation true;
                                                count_call ()
                                        ) hosts in
                                List.iter
index ab0051f582ab7d625dd50fd9edcf5cf9ce9d9b6a..1bce625e77b4d1c26fd2576f636efd2ec5b72d50 100644 (file)
@@ -763,7 +763,7 @@ let eject ~__context ~host =
                        (* We need to delete all local dbs but leave remote ones alone *)
                        let local = List.filter (fun db -> not db.Parse_db_conf.is_on_remote_storage) dbs in
                        List.iter Unixext.unlink_safe (List.map (fun db->db.Parse_db_conf.path) local);
-                       List.iter Unixext.unlink_safe (List.map Generation.gen_count_file local);
+                       List.iter Unixext.unlink_safe (List.map Generation.filename local);
                        (* remove any shared databases from my db.conf *)
                        (* XXX: on OEM edition the db.conf is rebuilt on every boot *)
                        Parse_db_conf.write_db_conf local;
@@ -794,10 +794,11 @@ let sync_database ~__context =
        then debug "flushed database to metadata VDI: assuming this is sufficient."
        else begin
         debug "flushing database to all online nodes";
+                  let generation = Db_lock.with_lock (fun () -> Db_cache_types.generation_of_cache Db_backend.cache) in
         Threadext.thread_iter
           (fun host ->
              Helpers.call_api_functions ~__context
-               (fun rpc session_id -> Client.Host.request_backup rpc session_id host (Generation.read_generation()) true))
+               (fun rpc session_id -> Client.Host.request_backup rpc session_id host generation true))
           (Db.Host.get_all ~__context)
        end
     )