]> xenbits.xensource.com Git - xcp/xen-api.git/commitdiff
Add the possibility of automatically managed many-to-many relationships between objec...
authorDavid Scott <dave.scott@eu.citrix.com>
Wed, 26 Jan 2011 17:39:05 +0000 (17:39 +0000)
committerDavid Scott <dave.scott@eu.citrix.com>
Wed, 26 Jan 2011 17:39:05 +0000 (17:39 +0000)
1. Add the concept of a many-to-many mapping in the database schema
2. Loosen the existing datamodel sanity checks to permit this new configuration
3. Enforce referential integrity in the mid-level database update functions (remove_row; set_row_in_table)

Also add a set of unit-tests to check referential integity is maintained across create/destroy/modify

Signed-off-by: David Scott <dave.scott@eu.citrix.com>
ocaml/database/database_test.ml
ocaml/database/db_cache_types.ml
ocaml/database/schema.ml
ocaml/idl/datamodel_utils.ml
ocaml/idl/dm_api.ml

index 3a35073a40e4cd5e9c9d643ca8d42cfdf4b90514..aa7fcc4db28cc89ea2676b1fc138197a22371d1a 100644 (file)
@@ -174,6 +174,73 @@ module Tests = functor(Client: Db_interface.DB_ACCESS) -> struct
                        then failwith (Printf.sprintf "check_ref_index %s key %s: ref_index name_label = %s; db has %s" tblname key (Opt.default "None" name_label) (Opt.default "None" real_name_label))
                                        
 
+       open Pervasiveext
+       open Db_cache_types
+
+       let check_many_to_many () = 
+               let bar_column = { Schema.Column.name = "bars";
+               persistent = false;
+               empty = "()";
+               default = None;
+               issetref = false;
+               } in
+               let foo_column = { bar_column with Schema.Column.name = "foos" } in
+               let foo_table = { Schema.Table.name = "foo"; columns = [ bar_column ]; persistent = true } in
+               let bar_table = { Schema.Table.name = "bar"; columns = [ foo_column ]; persistent = true } in
+
+               let database = { Schema.Database.tables = [ foo_table; bar_table ] } in
+               let many_to_many = 
+                       Schema.StringMap.add "foo" [ "bars", "bar", "foos" ]
+                               (Schema.StringMap.add "bar" [ "foos", "foo", "bars" ]
+                                       Schema.StringMap.empty) in
+               let schema = { Schema.empty with
+                       Schema.database = database;
+                       many_to_many = many_to_many 
+               } in
+               let db = 
+                       ((fun x -> x)
+                       ++ (Db_backend.blow_away_non_persistent_fields schema)
+                       ++ (Db_upgrade.maybe_upgrade)
+                       ++ (Db_upgrade.generic_database_upgrade))
+                       (Database.make schema) in
+               (* make a foo with bars = [] *)
+               (* make a bar with foos = [] *)
+               (* add 'bar' to foo.bars *)
+               let db = 
+                       ((fun x -> x)
+                       ++ (set_field_in_row "foo" "foo:1" "bars" (add_to_set "bar:1" "()"))
+                       ++ (set_row_in_table "foo" "foo:1" (Row.add Db_names.ref "foo:1" (Row.add "bars" "()" Row.empty)))
+                       ++ (set_row_in_table "bar" "bar:1" (Row.add Db_names.ref "bar:1" (Row.add "foos" "()" Row.empty)))) db in
+               (* check that 'bar.foos' includes 'foo' *)
+               let bar_1 = Table.find "bar:1" (TableSet.find "bar" (Database.tableset db)) in
+               let bar_foos = Row.find "foos" bar_1 in
+               if bar_foos <> "('foo:1')"
+               then failwith (Printf.sprintf "check_many_to_many: bar(bar:1).foos expected ('foo:1') got %s" bar_foos);
+               
+               (* set foo.bars to [] *)
+               let foo_1 = Table.find "foo:1" (TableSet.find "foo" (Database.tableset db)) in
+               let db = set_field_in_row "foo" "foo:1" "bars" "()" db in
+               (* check that 'bar.foos' is empty *)
+               let bar_1 = Table.find "bar:1" (TableSet.find "bar" (Database.tableset db)) in
+               let bar_foos = Row.find "foos" bar_1 in
+               if bar_foos <> "()"
+               then failwith (Printf.sprintf "check_many_to_many: bar(bar:1).foos expected () got %s" bar_foos);               
+               (* add 'bar' to foo.bars *)
+               let db = set_field_in_row "foo" "foo:1" "bars" "('bar:1')" db in
+               (* check that 'bar.foos' includes 'foo' *)
+               let bar_1 = Table.find "bar:1" (TableSet.find "bar" (Database.tableset db)) in
+               let bar_foos = Row.find "foos" bar_1 in
+               if bar_foos <> "('foo:1')"
+               then failwith (Printf.sprintf "check_many_to_many: bar(bar:1).foos expected ('foo:1') got %s - 2" bar_foos);
+               (* delete 'bar' *)
+               let db = remove_row_from_table "bar" "bar:1" db in
+               (* check that 'foo.bars' is empty *)
+               let foo_1 = Table.find "foo:1" (TableSet.find "foo" (Database.tableset db)) in          
+               let foo_bars = Row.find "bars" foo_1 in
+               if foo_bars <> "()"
+               then failwith (Printf.sprintf "check_many_to_many: foo(foo:1).foos expected () got %s" foo_bars);                               
+               ()
+
        let main in_process =   
                (* reference which we create *)
                let valid_ref = "ref1" in
@@ -183,6 +250,8 @@ module Tests = functor(Client: Db_interface.DB_ACCESS) -> struct
                
        let vbd_ref = "waz" in
                let vbd_uuid = "whatever" in
+
+               check_many_to_many ();
                
                (* Before we begin, clear out any old state: *)
                expect_missing_row "VM" valid_ref
index 197812b73b8c9e9909326035682c181b11364c41..2c239744f3ab35f110ae5e4e4521dc353015a2a3 100644 (file)
@@ -294,6 +294,13 @@ let remove_from_set key t =
        let processed = List.filter (function SExpr.String x -> x <> key | _ -> true) existing in
        SExpr.string_of (SExpr.Node processed)
 
+let set_of_string t = 
+       List.map 
+               (function SExpr.String x -> x
+                       | x -> failwith (Printf.sprintf "Unexpected sexpr: %s" t))
+               (Db_action_helper.parse_sexpr t)
+let string_of_set t = SExpr.string_of (SExpr.Node (List.map (fun x -> SExpr.String x) t))
+
 exception Duplicate
 let add_to_map key value t = 
        let existing = Db_action_helper.parse_sexpr t in
@@ -338,10 +345,26 @@ let update_one_to_many tblname objref f db =
                else db
        ) db (Schema.one_to_many tblname (Database.schema db))
 
+let update_many_to_many tblname objref f db = 
+       List.fold_left (fun db (this_fld, other_tbl, other_fld) ->
+               let this_fld_val = get_field tblname objref this_fld db in
+               let this_fld_refs = set_of_string this_fld_val in
+               (* for each of this_fld_refs, apply f *)
+               List.fold_left (fun db other_ref ->
+                       let valid = try ignore(Database.table_of_ref other_ref db); true with _ -> false in
+                       if valid
+                       then 
+                               let other_field = get_field other_tbl other_ref other_fld db in
+                               set_field other_tbl other_ref other_fld (f objref other_field) db
+                       else db)
+                       db this_fld_refs
+       ) db (Schema.many_to_many tblname (Database.schema db))
+
 let set_row_in_table tblname objref newval =
        id
-               (* For any field in a one-to-many, add objref to the foreign Set(Ref_) fields *)
+               (* Update foreign Set(Ref _) fields *)
                (* NB this requires the new row to exist already *)
+       ++ (update_many_to_many tblname objref add_to_set)
        ++ (update_one_to_many tblname objref add_to_set)
        ++ ((Database.update ++ (TableSet.update tblname Table.empty) ++ (Table.update objref Row.empty))
                (fun _ -> newval))
@@ -358,10 +381,10 @@ let remove_row tblname objref uuid =
        id
        ++ ((Database.update ++ (TableSet.update tblname Table.empty))
                (Table.remove objref))
-               (* For any field in a one-to-many, remove objref from the foreign Set(Ref_) fields *)
+               (* Update foreign (Set(Ref _)) fields *)
                (* NB this requires the original row to still exist *)
        ++ (update_one_to_many tblname objref remove_from_set)
-
+       ++ (update_many_to_many tblname objref remove_from_set)
        ++ (Database.update_keymap (KeyMap.remove (Ref objref)))
        ++ (Database.update_keymap (fun m ->
                match uuid with
index 2818e3f179d05a60912c6c3c7eda8eaa27b05ecc..2fb2ee8b0acf3c8223ae93d84eaf368e568f712b 100644 (file)
@@ -40,6 +40,7 @@ type t = {
        database: Database.t;
        (** indexed by table name, a list of (this field, foreign table, foreign field) *)
        one_to_many: ((string * string * string) list) StringMap.t;
+       many_to_many: ((string * string * string) list) StringMap.t;
 }
 
 let database x = x.database
@@ -56,6 +57,7 @@ let empty = {
        minor_vsn = 0;
        database = { Database.tables = [] };
        one_to_many = StringMap.empty;
+       many_to_many = StringMap.empty;
 }
 
 let is_table_persistent schema tblname = 
@@ -77,6 +79,12 @@ let one_to_many tblname schema =
                StringMap.find tblname schema.one_to_many
        with Not_found -> []
 
+let many_to_many tblname schema = 
+       (* If there is no entry in the map it means that the table has no many-to-many relationships *)
+       try
+               StringMap.find tblname schema.many_to_many
+       with Not_found -> []
+
 (* This code could live higher up the stack *)
 let of_datamodel () = 
        let rec flatten_fields fs acc =
@@ -84,14 +92,19 @@ let of_datamodel () =
                                [] -> acc
                        | (Datamodel_types.Field f)::fs -> flatten_fields fs (f::acc)
                        | (Datamodel_types.Namespace (_,internal_fs))::fs -> flatten_fields fs (flatten_fields internal_fs acc) in
-       let column f = 
+       let column obj f = 
                let issetref = match f.Datamodel_types.ty with
                        | Datamodel_types.Set (Datamodel_types.Ref _) -> true
                        | _ -> false in 
+               let is_many_to_many f =
+                       let api = Datamodel.all_api in
+                       let this = obj.Datamodel_types.name, f.Datamodel_types.field_name in
+                       Datamodel_utils.Relations.is_in_relation api this && 
+                               (Datamodel_utils.Relations.classify api (this,(Datamodel_utils.Relations.other_end_of api this)) = (`Many, `Many)) in
                {
                        Column.name = Escaping.escape_id f.Datamodel_types.full_name;
-                       (* NB we always regenerate Set(Ref _) fields *)
-                       persistent = f.Datamodel_types.field_persist && not issetref;
+                       (* NB we always regenerate one-to-many Set(Ref _) fields *)
+                       persistent = f.Datamodel_types.field_persist && (is_many_to_many f || not issetref);
                        empty = Datamodel_values.gen_empty_db_val f.Datamodel_types.ty;
                        (* NB Set(Ref _) fields aren't allowed to have a default value specified so we hardcode one here *)
                        default = 
@@ -112,13 +125,22 @@ let of_datamodel () =
 
        let table obj = {
                Table.name = Escaping.escape_obj obj.Datamodel_types.name;
-               columns = _ref :: (List.map column (flatten_fields obj.Datamodel_types.contents []));
+               columns = _ref :: (List.map (column obj) (flatten_fields obj.Datamodel_types.contents []));
                persistent = obj.Datamodel_types.persist = Datamodel_types.PersistEverything;
        } in
-       let one_to_many t ((one_tbl, one_fld), (many_tbl, many_fld)) =
-               let key = (one_fld, many_tbl, many_fld) in
+       let is_one_to_many x = 
+               match Datamodel_utils.Relations.classify Datamodel.all_api x with
+                       | `One, `Many | `Many, `One -> true
+                       | _ -> false in
+       let is_many_to_many x = 
+               match Datamodel_utils.Relations.classify Datamodel.all_api x with
+                       | `Many, `Many -> true
+                       | _ -> false in
+       let add_relation p t (((one_tbl, one_fld), (many_tbl, many_fld)) as r) =
                let l = if StringMap.mem one_tbl t then StringMap.find one_tbl t else [] in
-               StringMap.add one_tbl ((one_fld, many_tbl, many_fld) :: l) t in
+               if p r 
+               then StringMap.add one_tbl ((one_fld, many_tbl, many_fld) :: l) t
+               else t in
 
        let database api = {
                Database.tables = List.map table (Dm_api.objects_of_api api)
@@ -127,5 +149,6 @@ let of_datamodel () =
                major_vsn = Datamodel.schema_major_vsn;
                minor_vsn = Datamodel.schema_minor_vsn;
                database = database Datamodel.all_api;
-               one_to_many = List.fold_left one_to_many StringMap.empty (Dm_api.relations_of_api Datamodel.all_api);
+               one_to_many = List.fold_left (add_relation is_one_to_many) StringMap.empty (Dm_api.relations_of_api Datamodel.all_api);
+               many_to_many = List.fold_left (add_relation is_many_to_many) StringMap.empty (Dm_api.relations_of_api Datamodel.all_api);
        }
index c41a2f7b464d2086137d7edcaf66af8af8eb16a6..ab38fddc7c6517666d7fe47ba850b952777bb973 100644 (file)
@@ -109,6 +109,10 @@ module Relations = struct
     | [ other_end ] -> other_end
     | [] -> failwith (Printf.sprintf "Couldn't find other end of relation (%s,%s)" a b)
     | _ -> failwith ("Found multiple other ends of relation?!")
+
+  let is_in_relation api x = 
+         let rels = relations_of_api api in
+         List.mem_assoc x rels || (List.mem_assoc x (List.map (fun (k, v) -> v, k) rels))
   
 end
 
@@ -246,9 +250,16 @@ let new_messages_of_field x order fld =
                                (String.concat "/" fld.full_name) x.name);
                   msg_allowed_roles = fld.field_setter_roles;
                   msg_tag = FromField(Setter, fld) } in
-  match (fld.ty, fld.field_ignore_foreign_key) with
-  | Set(Ref _), false -> if order = 0 then [getter] else []
-  | Set(t), _ -> 
+  (* Set(Ref _) fields in a many-to-many generate symmetrical add_to, remove_from etc *)
+  let is_many_to_many =
+         let api = Datamodel.all_api in
+         let this = x.name, fld.field_name in
+         Relations.is_in_relation api this && 
+                 (Relations.classify api (this,(Relations.other_end_of api this)) = (`Many, `Many)) in
+
+  match (fld.ty, fld.field_ignore_foreign_key, is_many_to_many) with
+  | Set(Ref _), false, false -> if order = 0 then [getter] else []
+  | Set(t), _, _ -> 
       if order = 0 then [getter] else [
        setter; (* only makes sense to the database *)
        { common with
@@ -272,7 +283,7 @@ let new_messages_of_field x order fld =
            msg_allowed_roles = fld.field_setter_roles;
            msg_tag = FromField(Remove, fld) };
       ]
-  | Map(k, v), _ -> 
+  | Map(k, v), _, _ -> 
       if order = 0 then [getter] else [
        setter; (* only makes sense to the database *)
        { common with
@@ -299,7 +310,7 @@ let new_messages_of_field x order fld =
            msg_map_keys_roles = List.map (fun (k,(w))->(k,w)) fld.field_map_keys_roles;
            msg_tag = FromField(Remove, fld) };
       ]
-  | t, _ -> [
+  | t, _, _ -> [
       if order = 0 then getter else setter
     ] 
 
index 69917bc270023c6f4f64323820273fefca409a9e..8012b324bdab9d1acbca6487fc542d591abbe5a7 100644 (file)
@@ -161,11 +161,42 @@ let check api emergency_calls =
   let (_: obj list) = map_field (function { ty = Ref _; field_has_effect = true } ->
                                   failwith "Can't have a Ref field with a side-effect: it makes the destructors too complicated"
                                 | x -> x) system in
-  (* Sanity check 3: all Set(Ref _) fields should be DynamicRO *)
-  let (_: obj list) = map_field (function { ty = Set(Ref _); qualifier = q; field_ignore_foreign_key=false } as x-> 
-                                  if q <> DynamicRO
-                                  then failwith (Printf.sprintf "Can't have a Set(Ref _) field which isn't DynamicRO: %s" (String.concat "/" x.full_name)) else x
-                                | x -> x) system in
+  (* Sanity check: all Set(Ref _) fields should be one of:
+        1. one-to-many: the many end should be DynamicRO
+        2. many-to-many: the many end should be DynamicRO or RW
+        3. something else with field_ignore_foreign_key
+  *)
+  let rec flatten_fields fs acc =
+         match fs with
+                         [] -> acc
+                 | (Field f)::fs -> flatten_fields fs (f::acc)
+                 | (Namespace (_,internal_fs))::fs -> flatten_fields fs (flatten_fields internal_fs acc) in
+  let _ = 
+         let field objname = function
+                 { ty = Set(Ref y); qualifier = q; field_ignore_foreign_key = false } as x ->
+                         let relations = relations @ (List.map (fun (x, y) -> y, x) relations) in
+                         if not(List.mem_assoc (objname, x.field_name) relations)
+                         then failwith (Printf.sprintf "Set(Ref _) field is not in relations table: %s.%s" objname x.field_name);
+                         let other_obj, other_fld = List.assoc (objname, x.field_name) relations in
+                         let other_f = get_field_by_name api ~objname:other_obj ~fieldname:other_fld in
+                         begin match other_f.ty with
+                                 | Set(Ref _) ->
+                                         if q <> DynamicRO && q <> RW
+                                         then failwith (Printf.sprintf "many-to-many Set(Ref _) is not RW or DynamicRO: %s.%s" objname x.field_name);
+                                         if not x.field_persist
+                                         then failwith (Printf.sprintf "many-to-many Set(Ref _) is not persistent: %s.%s" objname x.field_name);
+                                         if not other_f.field_persist
+                                         then failwith (Printf.sprintf "many-to-many Set(Ref _) is not persistent: %s.%s" other_obj other_fld);
+                                 | Ref _ ->
+                                         if q <> DynamicRO
+                                         then failwith (Printf.sprintf "many-to-many Set(Ref _) is not DynamicRO: %s.%s" objname x.field_name)
+                                 | ty ->
+                                         failwith (Printf.sprintf "field in relationship has bad type (Ref or Set(Ref) only): %s.%s" other_obj other_fld)
+                         end
+                 | _ -> () in
+         let obj o = List.iter (field o.name) (flatten_fields o.contents []) in
+         List.iter obj (objects_of_api api) in
+                 
 
   (* Sanity check 4: all fields not in rel_rio and not dynamic_RO must have default values *)
   let (_: obj list) = map_field (function { qualifier=q; release={internal=ir}; default_value=None } as x ->