From 45e6a03dc862e2bdee222e61bbea874972c9f49a Mon Sep 17 00:00:00 2001 From: David Scott Date: Wed, 26 Jan 2011 17:39:05 +0000 Subject: [PATCH] Add a component-level test for the database server subcomponent. Signed-off-by: David Scott --- ocaml/database/OMakefile | 13 +- ocaml/database/backend_xml.ml | 2 +- ocaml/database/database_server_main.ml | 80 ++ ocaml/database/database_test.ml | 426 +++++++++ ocaml/database/db_cache.ml | 956 +------------------ ocaml/database/db_cache_impl.ml | 623 ++++++++++++ ocaml/database/db_cache_impl.mli | 19 + ocaml/database/db_cache_types.ml | 4 +- ocaml/database/db_cache_types.mli | 6 + ocaml/database/db_connections.ml | 31 +- ocaml/database/db_exn.ml | 7 + ocaml/database/db_filter_types.ml | 3 +- ocaml/database/db_interface.ml | 98 ++ ocaml/database/db_remote_cache_access.ml | 135 --- ocaml/database/db_remote_cache_access_v1.ml | 131 +++ ocaml/database/db_remote_cache_access_v1.mli | 2 + ocaml/database/db_remote_cache_access_v2.ml | 78 ++ ocaml/database/db_remote_cache_access_v2.mli | 2 + ocaml/database/db_remote_marshall.ml | 307 ------ ocaml/database/db_rpc_client_v1.ml | 169 ++++ ocaml/database/db_rpc_client_v1.mli | 3 + ocaml/database/db_rpc_client_v2.ml | 114 +++ ocaml/database/db_rpc_client_v2.mli | 2 + ocaml/database/db_rpc_common_v1.ml | 311 ++++++ ocaml/database/db_rpc_common_v2.ml | 66 ++ ocaml/database/eventgen.ml | 8 +- ocaml/database/master_connection.ml | 16 +- ocaml/database/unit_test_marshall.ml | 2 +- ocaml/db_process/xapi-db-process.ml | 6 +- ocaml/idl/constants.ml | 1 + ocaml/idl/datamodel.ml | 1 + ocaml/idl/ocaml_backend/OMakefile | 10 +- ocaml/idl/ocaml_backend/exnHelper.ml | 4 +- ocaml/idl/ocaml_backend/gen_db_actions.ml | 37 +- ocaml/xapi/console.ml | 4 +- ocaml/xapi/db.ml | 3 +- ocaml/xapi/db_gc.ml | 3 +- ocaml/xapi/helpers.ml | 3 +- ocaml/xapi/import_raw_vdi.ml | 2 +- ocaml/xapi/message_forwarding.ml | 4 +- ocaml/xapi/monitor_dbcalls.ml | 2 +- ocaml/xapi/monitor_self.ml | 2 +- ocaml/xapi/pool_db_backup.ml | 2 +- ocaml/xapi/redo_log_usage.ml | 7 +- ocaml/xapi/workload_balancing.ml | 12 +- ocaml/xapi/xapi.ml | 34 +- ocaml/xapi/xapi_fuse.ml | 4 +- ocaml/xapi/xapi_pool_transition.ml | 4 +- ocaml/xapi/xapi_vm_placement.ml | 3 +- ocaml/xapi/xapi_vm_snapshot.ml | 3 +- 50 files changed, 2301 insertions(+), 1464 deletions(-) create mode 100644 ocaml/database/database_server_main.ml create mode 100644 ocaml/database/database_test.ml create mode 100644 ocaml/database/db_cache_impl.ml create mode 100644 ocaml/database/db_cache_impl.mli create mode 100644 ocaml/database/db_interface.ml delete mode 100644 ocaml/database/db_remote_cache_access.ml create mode 100644 ocaml/database/db_remote_cache_access_v1.ml create mode 100644 ocaml/database/db_remote_cache_access_v1.mli create mode 100644 ocaml/database/db_remote_cache_access_v2.ml create mode 100644 ocaml/database/db_remote_cache_access_v2.mli delete mode 100644 ocaml/database/db_remote_marshall.ml create mode 100644 ocaml/database/db_rpc_client_v1.ml create mode 100644 ocaml/database/db_rpc_client_v1.mli create mode 100644 ocaml/database/db_rpc_client_v2.ml create mode 100644 ocaml/database/db_rpc_client_v2.mli create mode 100644 ocaml/database/db_rpc_common_v1.ml create mode 100644 ocaml/database/db_rpc_common_v2.ml diff --git a/ocaml/database/OMakefile b/ocaml/database/OMakefile index 7f26b752..0460194d 100644 --- a/ocaml/database/OMakefile +++ b/ocaml/database/OMakefile @@ -1,8 +1,11 @@ + OCAMLINCLUDES = ../idl/ocaml_backend ../xapi ../idl ../util ../autogen OCAMLPACKS = xml-light2 stdext stunnel http-svr log sexpr #OCAMLPPFLAGS = -pp "camlp4o" #OCAMLDEPFLAGS = -pp "camlp4o" +UseCamlp4(rpc-light.syntax, db_rpc_common_v2 db_cache_types db_filter_types) + OCamlGeneratedFiles(db_filter_parse.ml db_filter_parse.mli db_filter_lex.ml) OCamlProgram(unit_test_sql, unit_test_sql) @@ -16,9 +19,17 @@ BLOCK_DEVICE_IO_FILES = \ OCamlProgram(block_device_io, $(BLOCK_DEVICE_IO_FILES)) OCamlDocProgram(block_device_io, $(BLOCK_DEVICE_IO_FILES)) +DATABASE_SERVER_FILES = database_server_main ../autogen/db_actions +DATABASE_TEST_FILES = database_test +section: + #XXX there are lots of interdependencies which we should be aim to remove + OCAML_LIBS += ../util/version ../idl/ocaml_backend/common ../idl/ocaml_backend/client ../util/stats ../idl/ocaml_backend/server + OCamlProgram(database_server, $(DATABASE_SERVER_FILES)) + OCamlProgram(database_test, $(DATABASE_TEST_FILES)) + section: OCAML_LIBS += ../idl/ocaml_backend/common ../idl/ocaml_backend/client ../idl/ocaml_backend/server - OCamlProgram(unit_test_marshall, unit_test_marshall db_remote_cache_access) + OCamlProgram(unit_test_marshall, unit_test_marshall db_remote_cache_access_v1) .PHONY: install install: diff --git a/ocaml/database/backend_xml.ml b/ocaml/database/backend_xml.ml index 23c35669..90960fb2 100644 --- a/ocaml/database/backend_xml.ml +++ b/ocaml/database/backend_xml.ml @@ -53,7 +53,7 @@ let unmarshall dbconn = (* Given table name, read all rows from db and store in cache *) let populate_and_read_manifest dbconn = - debug "attempting to restore database from %s" dbconn.Parse_db_conf.path; + Printf.printf "attempting to restore database from %s\n" dbconn.Parse_db_conf.path; let manifest, unmarshalled_db = unmarshall dbconn in debug "finished parsing xml"; (* version_check manifest; *) diff --git a/ocaml/database/database_server_main.ml b/ocaml/database/database_server_main.ml new file mode 100644 index 00000000..ec2f0d35 --- /dev/null +++ b/ocaml/database/database_server_main.ml @@ -0,0 +1,80 @@ +open Threadext + +type mode = + | Slave of string (* master IP *) + | Master of string (* database filename *) + +let mode = ref None + +let finished = ref false +let m = Mutex.create () +let c = Condition.create () + +(** Handler for the remote database access URL *) +let remote_database_access_handler_v1 req bio = + try + Db_remote_cache_access_v1.handler req bio + with e -> + Printf.printf "Caught: %s\n" (Printexc.to_string e); + Printexc.print_backtrace stdout; + flush stdout; + raise e + +(** Handler for the remote database access URL *) +let remote_database_access_handler_v2 req bio = + try + Db_remote_cache_access_v2.handler req bio + with e -> + Printf.printf "Caught: %s\n" (Printexc.to_string e); + Printexc.print_backtrace stdout; + flush stdout; + raise e + +let _ = + let listen_path = ref "./database" in + Printexc.record_backtrace true; + + Arg.parse [ + "--slave-of", Arg.String (fun master -> mode := Some(Slave master)), "run as a slave of a remote db"; + "--master", Arg.String (fun db -> mode := Some(Master db)), "run as a master from the given db filename"; + "--listen-on", Arg.Set_string listen_path, Printf.sprintf "listen for requests on path (default %s)" !listen_path; + ] (fun x -> Printf.fprintf stderr "Ignoring unknown parameter: %s\n%!" x) + "run a stand-alone database server"; + + match !mode with + | None -> failwith "Requires either --slave-of or --master arguments" + | Some mode -> + begin match mode with + | Slave _ -> failwith "unimplemented" + | Master db_filename -> + Printf.printf "Database path: %s\n%!" db_filename; + let db = { Parse_db_conf.dummy_conf with + Parse_db_conf.path = db_filename + } in + Db_conn_store.initialise_db_connections [ db ]; + Printf.printf "About to create new dbs\n%!"; + List.iter (Db_connections.maybe_create_new_db (0,0)) (Db_conn_store.read_db_connections()); + Printf.printf "dbs created\n%!"; + Db_cache.set_master true; + Db_dirty.make_blank_dirty_records(); + + Db_cache_impl.initialise (); + + Unixext.unlink_safe !listen_path; + let sockaddr = Unix.ADDR_UNIX !listen_path in + let socket = Http_svr.bind sockaddr in + + Http_svr.add_handler Http.Post "/post_remote_db_access" (Http_svr.BufIO remote_database_access_handler_v1); + Http_svr.add_handler Http.Post "/post_remote_db_access_v2" (Http_svr.BufIO remote_database_access_handler_v2); + let server = Http_svr.start (socket, "http") in + Printf.printf "server listening\n%!"; + (* Wait for either completion *) + Mutex.execute m + (fun () -> + while not (!finished) do + Condition.wait c m + done + ); + Http_svr.stop server + end + diff --git a/ocaml/database/database_test.ml b/ocaml/database/database_test.ml new file mode 100644 index 00000000..e626bb9c --- /dev/null +++ b/ocaml/database/database_test.ml @@ -0,0 +1,426 @@ +(* Supported operations: *) + +let path = ref "./database" + +let rpc_common url content_type request = + let version = "1.1" in + let content_length = String.length request in + let headers = [ + Printf.sprintf "POST %s HTTP/%s" url version; + Printf.sprintf "User-Agent: xapi/%s" Xapi_globs.api_version_string; + "Content-Type: text/json"; + Printf.sprintf "Content-length: %d" content_length; + ] in + Xmlrpcclient.do_http_rpc "" 0 headers ~unixsock:(Some (!path)) request + (fun content_length _ fd -> + let buffer = String.make content_length '\000' in + Unixext.really_read fd buffer 0 content_length; + buffer) + +module Client_v1 = Db_rpc_client_v1.Make(struct + let initialise () = () + let rpc request = rpc_common "/post_remote_db_access" "text/xml" request +end) + +module Client_v2 = Db_rpc_client_v2.Make(struct + let initialise () = () + let rpc request = rpc_common "/post_remote_db_access_v2" "text/json" request +end) + +module Client = Client_v2 + +let name = "thevmname" +let invalid_name = "notavmname" + +let make_vm r uuid = + [ + "ref", r; + "uuid", uuid; + "memory__static_max", "0"; + "memory__overhead", "0"; + "PV__ramdisk", ""; + "is_control_domain", "false"; + "actions__after_crash", "restart"; + "resident_on", "OpaqueRef:NULL"; + "snapshot_info", "()"; + "PCI_bus", ""; + "PV__args", ""; + "last_boot_CPU_flags", "()"; + "memory__target", "536870912"; + "is_a_template", "true"; + "user_version", "1"; + "HVM__shadow_multiplier", "1"; + "affinity", "OpaqueRef:NULL"; + "name__description", ""; + "PV__legacy_args", ""; + "parent", "OpaqueRef:NULL"; + "snapshot_metadata", ""; + "memory__dynamic_max", "0"; + "ha_always_run", "false"; + "other_config", "()"; + "PV__bootloader_args" ,""; + "VCPUs__at_startup", "1"; + "bios_strings", "()"; + "actions__after_shutdown", "destroy"; + "blocked_operations", "()"; + "tags", "()"; + "PV__kernel", ""; + "name__label", name; + "is_a_snapshot", "false"; + "VCPUs__params", "()"; + "VCPUs__max", "1"; + "allowed_operations", "()"; + "protection_policy", "OpaqueRef:NULL"; + "memory__static_min", "268435456"; + "domid", "-1"; + "power_state", "Halted"; + "HVM__boot_policy", ""; + "ha_restart_priority", ""; + "suspend_VDI", "OpaqueRef:NULL"; + "HVM__boot_params", "()"; + "PV__bootloader", "eliloader"; + "transportable_snapshot_id", ""; + "snapshot_of", "OpaqueRef:NULL"; + "guest_metrics", "OpaqueRef:NULL"; + "platform", "()"; + "scheduled_to_be_resident_on", "OpaqueRef:NULL"; + "is_snapshot_from_vmpp", "false"; + "current_operations", "()"; + "recommendations", ""; + "last_booted_record", ""; + "blobs", "()"; + "domarch", ""; + "memory__dynamic_min", "0"; + "metrics", "OpaqueRef:NULL"; + "actions__after_reboot", "restart"; + "xenstore_data", "()"; + "snapshot_time", "19700101T00:00:00Z" + ] + +let make_vbd vm r uuid = [ + "ref", r; + "qos__supported_algorithms", "()"; + "other_config", "(('owner' ''))"; + "uuid", uuid; + "allowed_operations", "('attach')"; + "qos__algorithm_params", "()"; + "type", "Disk"; + "VM", vm; + "VDI", "OpaqueRef:NULL"; + "qos__algorithm_type", ""; + "metrics", "OpaqueRef:NULL"; + "device", ""; + "empty", "false"; + "bootable", "false"; + "current_operations", "()"; + "unpluggable", "true"; + "status_detail", ""; + "runtime_properties", "()"; + "userdevice", "0"; + "mode", "RW"; + "storage_lock", "false"; + "status_code", "0"; + "currently_attached", "false"; +] + +let expect_missing_row tbl r f = + try + f () + with Db_exn.DBCache_NotFound("missing row", tbl', r') when tbl' = tbl && r = r' -> () + +let expect_missing_tbl tbl f = + try + f () + with Db_exn.DBCache_NotFound("missing table", tbl', "") when tbl' = tbl -> () + +let expect_uniqueness_violation tbl fld v f = + try + f () + with Db_exn.Uniqueness_constraint_violation(tbl', fld', v') when tbl' = tbl && fld' = fld && v' = v -> () + +let expect_missing_uuid tbl uuid f = + try + f () + with Db_exn.Read_missing_uuid(tbl', "", uuid') when tbl' = tbl && uuid' = uuid -> () + +let expect_missing_field name f = + try + f () + with Db_exn.DBCache_NotFound("missing field", name', "") when name' = name -> () + +let test_invalid_where_record fn_name fn = + Printf.printf "%s ...\n" fn_name; + expect_missing_tbl "Vm" + (fun () -> + let (_: string list) = fn { Db_cache_types.table = "Vm"; return = ""; where_field = ""; where_value = "" } in + failwith (Printf.sprintf "%s " fn_name) + ); + Printf.printf "%s \n" fn_name; + expect_missing_field "wibble" + (fun () -> + let (_: string list) = fn { Db_cache_types.table = "VM"; return = "wibble"; where_field = Escaping.escape_id [ "name"; "label" ]; where_value = name } in + failwith (Printf.sprintf "%s " fn_name) + ); + Printf.printf "%s \n" fn_name; + expect_missing_field "wibble" + (fun () -> + let (_: string list) = fn { Db_cache_types.table = "VM"; return = Escaping.escape_id [ "name"; "label" ]; where_field = "wibble"; where_value = "" } in + failwith (Printf.sprintf "%s " fn_name) + ) + + +let _ = + Printexc.record_backtrace true; + Arg.parse [ + "--connect-to", Arg.Set_string path, Printf.sprintf "connect to server on path (default %s)" !path; + ] (fun x -> Printf.fprintf stderr "Ignoring unknown parameter: %s\n%!" x) + "query a database server"; + + (* reference which we create *) + let valid_ref = "ref1" in + let valid_uuid = "uuid1" in + let invalid_ref = "foo" in + let invalid_uuid = "bar" in + + let vbd_ref = "waz" in + let vbd_uuid = "whatever" in + + (* Before we begin, clear out any old state: *) + expect_missing_row "VM" valid_ref + (fun () -> + Client.delete_row "VM" valid_ref; + ); + expect_missing_row "VBD" vbd_ref + (fun () -> + Client.delete_row "VBD" vbd_ref; + ); + Printf.printf "Deleted stale state from previous test\n"; + + Printf.printf "get_table_from_ref \n"; + begin + match Client.get_table_from_ref invalid_ref with + | None -> Printf.printf "Reference '%s' has no associated table\n" invalid_ref + | Some t -> failwith (Printf.sprintf "Reference '%s' exists in table '%s'" invalid_ref t) + end; + Printf.printf "is_valid_ref \n"; + if Client.is_valid_ref invalid_ref then failwith "is_valid_ref = true"; + + Printf.printf "read_refs \n"; + let existing_refs = Client.read_refs "VM" in + Printf.printf "VM refs: [ %s ]\n" (String.concat "; " existing_refs); + Printf.printf "read_refs \n"; + expect_missing_tbl "Vm" + (fun () -> + let (_: string list) = Client.read_refs "Vm" in + () + ); + Printf.printf "delete_row \n"; + expect_missing_row "VM" invalid_ref + (fun () -> + Client.delete_row "VM" invalid_ref; + failwith "delete_row of a non-existent row silently succeeded" + ); + Printf.printf "create_row \n"; + Client.create_row "VM" (make_vm valid_ref valid_uuid) valid_ref; + Printf.printf "is_valid_ref \n"; + if not (Client.is_valid_ref valid_ref) + then failwith "is_valid_ref = false, after create_row"; + Printf.printf "get_table_from_ref \n"; + begin match Client.get_table_from_ref valid_ref with + | Some "VM" -> () + | Some t -> failwith "get_table_from_ref : invalid table" + | None -> failwith "get_table_from_ref : None" + end; + Printf.printf "read_refs includes \n"; + if not (List.mem valid_ref (Client.read_refs "VM")) + then failwith "read_refs did not include "; + + Printf.printf "create_row \n"; + expect_uniqueness_violation "VM" "_ref" valid_ref + (fun () -> + Client.create_row "VM" (make_vm valid_ref (valid_uuid ^ "unique")) valid_ref; + failwith "create_row " + ); + Printf.printf "create_row \n"; + expect_uniqueness_violation "VM" "uuid" valid_uuid + (fun () -> + Client.create_row "VM" (make_vm (valid_ref ^ "unique") valid_uuid) (valid_ref ^ "unique"); + failwith "create_row " + ); + Printf.printf "db_get_by_uuid \n"; + if Client.db_get_by_uuid "VM" valid_uuid <> valid_ref + then failwith "db_get_by_uuid "; + Printf.printf "db_get_by_uuid \n"; + expect_missing_uuid "VM" invalid_uuid + (fun () -> + let (_: string) = Client.db_get_by_uuid "VM" invalid_uuid in + failwith "db_get_by_uuid " + ); + Printf.printf "get_by_name_label \n"; + if Client.db_get_by_name_label "VM" invalid_name <> [] + then failwith "db_get_by_name_label "; + + Printf.printf "get_by_name_label \n"; + if Client.db_get_by_name_label "VM" name <> [ valid_ref ] + then failwith "db_get_by_name_label "; + + Printf.printf "read_field \n"; + if Client.read_field "VM" "name__label" valid_ref <> name + then failwith "read_field : invalid name"; + Printf.printf "read_field \n"; + expect_missing_row "VM" invalid_ref + (fun () -> + let (_: string) = Client.read_field "VM" "name__label" invalid_ref in + failwith "read_field " + ); + Printf.printf "read_field \n"; + expect_missing_field "name_label" + (fun () -> + let (_: string) = Client.read_field "VM" "name_label" valid_ref in + failwith "read_field " + ); + Printf.printf "read_field \n"; + expect_missing_row "VM" invalid_ref + (fun () -> + let (_: string) = Client.read_field "VM" "name_label" invalid_ref in + failwith "read_field " + ); + Printf.printf "read_field_where \n"; + let where_name_label = + { Db_cache_types.table = "VM"; return = Escaping.escape_id(["name"; "label"]); where_field="uuid"; where_value = valid_uuid } in + let xs = Client.read_field_where where_name_label in + if not (List.mem name xs) + then failwith "read_field_where "; + test_invalid_where_record "read_field_where" Client.read_field_where; + + let xs = Client.read_set_ref where_name_label in + if not (List.mem name xs) + then failwith "read_set_ref "; + test_invalid_where_record "read_set_ref" Client.read_set_ref; + + Printf.printf "write_field \n"; + expect_missing_tbl "Vm" + (fun () -> + let (_: unit) = Client.write_field "Vm" "" "" "" in + failwith "write_field " + ); + Printf.printf "write_field \n"; + expect_missing_row "VM" invalid_ref + (fun () -> + let (_: unit) = Client.write_field "VM" invalid_ref "" "" in + failwith "write_field " + ); + Printf.printf "write_field \n"; + expect_missing_field "wibble" + (fun () -> + let (_: unit) = Client.write_field "VM" valid_ref "wibble" "" in + failwith "write_field " + ); + Printf.printf "write_field \n"; + let (_: unit) = Client.write_field "VM" valid_ref (Escaping.escape_id ["name"; "description"]) "description" in + + Printf.printf "read_record \n"; + expect_missing_tbl "Vm" + (fun () -> + let _ = Client.read_record "Vm" invalid_ref in + failwith "read_record " + ); + Printf.printf "read_record \n"; + expect_missing_row "VM" invalid_ref + (fun () -> + let _ = Client.read_record "VM" invalid_ref in + failwith "read_record " + ); + Printf.printf "read_record \n"; + let fv_list, fvs_list = Client.read_record "VM" valid_ref in + if not(List.mem_assoc (Escaping.escape_id [ "name"; "label" ]) fv_list) + then failwith "read_record 1"; + if List.assoc "VBDs" fvs_list <> [] + then failwith "read_record 2"; + Printf.printf "read_record foreign key\n"; + Client.create_row "VBD" (make_vbd valid_ref vbd_ref vbd_uuid) vbd_ref; + let fv_list, fvs_list = Client.read_record "VM" valid_ref in + if List.assoc "VBDs" fvs_list <> [ vbd_ref ] + then failwith "read_record 3"; + Printf.printf "read_record deleted foreign key\n"; + Client.delete_row "VBD" vbd_ref; + let fv_list, fvs_list = Client.read_record "VM" valid_ref in + if List.assoc "VBDs" fvs_list <> [] + then failwith "read_record 4"; + Printf.printf "read_record overwritten foreign key\n"; + Client.create_row "VBD" (make_vbd valid_ref vbd_ref vbd_uuid) vbd_ref; + let fv_list, fvs_list = Client.read_record "VM" valid_ref in + if List.assoc "VBDs" fvs_list = [] + then failwith "read_record 5"; + Client.write_field "VBD" vbd_ref (Escaping.escape_id [ "VM" ]) "overwritten"; + let fv_list, fvs_list = Client.read_record "VM" valid_ref in + if List.assoc "VBDs" fvs_list <> [] + then failwith "read_record 6"; + + expect_missing_tbl "Vm" + (fun () -> + let _ = Client.read_records_where "Vm" Db_filter_types.True in + () + ); + let xs = Client.read_records_where "VM" Db_filter_types.True in + if List.length xs <> 1 + then failwith "read_records_where 2"; + let xs = Client.read_records_where "VM" Db_filter_types.False in + if xs <> [] + then failwith "read_records_where 3"; + + expect_missing_tbl "Vm" + (fun () -> + let xs = Client.find_refs_with_filter "Vm" Db_filter_types.True in + failwith "find_refs_with_filter "; + ); + let xs = Client.find_refs_with_filter "VM" Db_filter_types.True in + if List.length xs <> 1 + then failwith "find_refs_with_filter 1"; + let xs = Client.find_refs_with_filter "VM" Db_filter_types.False in + if xs <> [] + then failwith "find_refs_with_filter 2"; + + expect_missing_tbl "Vm" + (fun () -> + Client.process_structured_field ("","") "Vm" "wibble" invalid_ref Db_cache_types.AddSet; + failwith "process_structure_field " + ); + expect_missing_field "wibble" + (fun () -> + Client.process_structured_field ("","") "VM" "wibble" valid_ref Db_cache_types.AddSet; + failwith "process_structure_field " + ); + expect_missing_row "VM" invalid_ref + (fun () -> + Client.process_structured_field ("","") "VM" (Escaping.escape_id ["name"; "label"]) invalid_ref Db_cache_types.AddSet; + failwith "process_structure_field " + ); + Client.process_structured_field ("foo", "") "VM" "tags" valid_ref Db_cache_types.AddSet; + if Client.read_field "VM" "tags" valid_ref <> "('foo')" + then failwith "process_structure_field expected ('foo')"; + Client.process_structured_field ("foo", "") "VM" "tags" valid_ref Db_cache_types.AddSet; + if Client.read_field "VM" "tags" valid_ref <> "('foo')" + then failwith "process_structure_field expected ('foo') 2"; + Client.process_structured_field ("foo", "bar") "VM" "other_config" valid_ref Db_cache_types.AddMap; + + if Client.read_field "VM" "other_config" valid_ref <> "(('foo' 'bar'))" + then failwith "process_structure_field expected (('foo' 'bar')) 3"; + + begin + try + Client.process_structured_field ("foo", "bar") "VM" "other_config" valid_ref Db_cache_types.AddMap; + with Db_exn.Duplicate_key("VM", "other_config", r', "foo") when r' = valid_ref -> () + end; + if Client.read_field "VM" "other_config" valid_ref <> "(('foo' 'bar'))" + then failwith "process_structure_field expected (('foo' 'bar')) 4"; + + (* Performance test *) + let start = Unix.gettimeofday () in + let n = 10000 in + for i = 0 to n do + let (_: bool) = Client.is_valid_ref valid_ref in + () + done; + let total = Unix.gettimeofday () -. start in + Printf.printf "%.2f RPC calls/sec\n" (float_of_int n /. total) diff --git a/ocaml/database/db_cache.ml b/ocaml/database/db_cache.ml index 9b90d9e6..2d7e8ebc 100644 --- a/ocaml/database/db_cache.ml +++ b/ocaml/database/db_cache.ml @@ -13,932 +13,44 @@ *) -(* --------------------------------------------------------------------------------------------- - The dbcache provides low-level access to the data in the db (both getting and setting). - It is below the event-level: calling the functions in this module will not cause any events - to be generated. For that reason, regular xapi code accessing the database must call in via - the Db.* API. - - The DBCache module contains 2 instances of the DB_CACHE module type: one instance for pool - masters (that is an in-memory db-cache, with async write log pushed to database for persistent - store); and one instance for pool slaves (that calls a marshalling protocol that retrieves/ - modifies directly from the ppol master). - - The DBCache module is also itself an instance of DB_CACHE: it's DB_CACHE fns are just a - dispatcher (yes I know it's just a vtable and I could use objects ;) that calls Master/Slave - depending on value in Pool_role.is_master () - --------------------------------------------------------------------------------------------- *) - -(* Note: read_record could be improved - see comment below. It transfers data over network needlessly - (although not much data really) -- main problem is it's somewhat confusing in its current - form *) - -open Db_lock -open Db_exn +open Db_interface module D=Debug.Debugger(struct let name="db_cache" end) open D -type database_mode = Master | Slave - -let database_mode : database_mode option ref = ref None - -exception Must_initialise_database_mode - -exception Read_missing_uuid of (*class*) string * (*ref*) string * (*uuid*) string -exception Too_many_values of (*class*) string * (*ref*) string * (*uuid*) string - -(** Interface of DB_Cache implementations *) -module type DB_CACHE = -sig - - val dump_db_cache : Unix.file_descr -> unit - - val stats : unit -> (string * int) list - - val populate_cache : unit -> unit - val populate_from : Parse_db_conf.db_connection -> unit (* populate cache from specified file/format *) - val spawn_db_flush_threads : unit -> unit - - (* Called by server to initialise db resources. - Master implementation populates cache _and_ spawns db flush threads; - Slave implementation sets up resources reqd for master connection + associated watchdog - *) - val initialise_db_cache : unit -> unit - - (* This will initialise a dbcache, but without syncing all db connections to the latest vsn; - it's used by xapi-db-tool -- allowing it to read the xapi database without generating - any writes to disk.. *) - val initialise_db_cache_nosync : unit -> unit - - val display_sql_writelog : bool -> unit - val flush_dirty : Parse_db_conf.db_connection -> bool - val flush_and_exit : Parse_db_conf.db_connection -> int (* exit code *) -> unit - - val get_table_from_ref : string -> string option - val is_valid_ref : string -> bool - val read_refs : string -> string list - val read_field_where : Db_cache_types.where_record -> string list - val db_get_by_uuid : string -> string -> string - val db_get_by_name_label : string -> string -> string list - val read_set_ref : Db_cache_types.where_record -> string list - val create_row : Context.t -> string -> (string*string) list -> string -> unit - val delete_row : Context.t -> string -> string -> unit - val write_field : Context.t -> string -> string -> string -> string -> unit - val read_field : Context.t -> string -> string -> string -> string - val find_refs_with_filter : string -> Db_filter_types.expr -> string list - - val process_structured_field : Context.t -> (string*string) -> string -> string -> string -> Db_cache_types.structured_op_t -> unit - val apply_delta_to_cache : Redo_log.t -> unit - - type db_record = (string * string) list * (string * (string list)) list (* dictionary of regular fields x dictionary of associated set_ref values *) - val read_record : string -> string -> db_record - val read_records_where : string -> Db_filter_types.expr -> (string * db_record) list -end - -(** The dbcache that switches between master/slave accordingly *) -module DBCache : DB_CACHE = -struct - type db_record = (string * string) list * (string * (string list)) list - (* --------------------------------------------------------------------------------------------------- - Pool master implementation of DB_CACHE - --------------------------------------------------------------------------------------------------- *) - - (** An in-memory cache, used by pool master [not exposed beyond internals of DBCache] *) - module Master : DB_CACHE = - struct - type db_record = (string * string) list * (string * (string list)) list - module D = Debug.Debugger(struct let name = "sql" end) - open D - module W = Debug.Debugger(struct let name = "db_write" end) - module R = Debug.Debugger(struct let name = "redo_log" end) - - open Db_cache_types - open Db_action_helper - open Db_backend - - let display_sql_writelog b = display_sql_writelog_val := b - - let redo_log_context_name = "redo_log" - - (* This fn is part of external interface, so need to take lock *) - let get_table_from_ref objref = - with_lock - (fun () -> - try - Some (Hashtbl.find ref_table_map objref) - with Not_found -> None) - - let is_valid_ref objref = - match (get_table_from_ref objref) with - Some _ -> true - | None -> false - - (** Return an association list of table name * record count *) - let stats () = - with_lock - (fun () -> - fold_over_tables (fun name tbl acc -> - let size = fold_over_rows (fun _ _ acc -> acc + 1) tbl 0 in - (name, size) :: acc) cache []) - - let flush_dirty dbconn = Db_connections.flush_dirty_and_maybe_exit dbconn None - let flush_and_exit dbconn ret_code = ignore (Db_connections.flush_dirty_and_maybe_exit dbconn (Some ret_code)) - - (* Read field from cache *) - let read_field context tblname fldname objref = - with_lock - (fun () -> - let row = find_row cache tblname objref in - lookup_field_in_row row fldname) - - let table_of_kvs kvs = - let row = create_empty_row () in - List.iter (fun (k,v)-> set_field_in_row row k v) kvs; - row - - let save_in_redo_log context entry = - if Redo_log.is_enabled() then begin - Redo_log.write_delta (Db_cache_types.generation_of_cache Db_backend.cache) entry - (fun () -> (* the function which will be invoked if a database write is required instead of a delta *) - Backend_xml.flush_db_to_redo_log Db_backend.cache - ) - end - - (** Finds the longest XML-compatible UTF-8 prefix of the given *) - (** string, by truncating the string at the first incompatible *) - (** character. Writes a warning to the debug log if truncation *) - (** occurs. *) - let ensure_utf8_xml string = - let length = String.length string in - let prefix = Encodings.UTF8_XML.longest_valid_prefix string in - if length > String.length prefix then - warn "string truncated to: '%s'." prefix; - prefix - - (* Write field in cache *) - let write_field context tblname objref fldname newval = - with_lock - (fun () -> - (* if uuid or reference then check uniqueness constraints: *) - if fldname=uuid_fname then begin - check_unique_table_constraints tblname (table_of_kvs [(uuid_fname, newval)]); - Ref_index.update_uuid objref newval; - end else if fldname=reference_fname then - check_unique_table_constraints tblname (table_of_kvs [(reference_fname, newval)]) - else if fldname=name_label_fname then - Ref_index.update_name_label objref newval; - - let row = find_row cache tblname objref in - let current_val = lookup_field_in_row row fldname in - - let other_tbl_refs = Eventgen.follow_references tblname in - let other_tbl_refs_for_this_field = - List.filter (fun (_,fld) -> fld=fldname) other_tbl_refs in - - let newval = ensure_utf8_xml newval in - - if current_val<>newval then - begin - W.debug "write_field %s,%s: %s |-> %s" tblname objref fldname newval; - invalidate_indexes_for_specific_field tblname fldname; - - (* Update the field in the cache whether it's persistent or not *) - set_field_in_row row fldname newval; - - (* then only mark written row as dirty if we persist writes on this table && persist changes on this field *) - if (this_table_persists tblname) && (persist_field_changes tblname fldname) then - begin - (* Only flush to disk if persistent *) - Db_dirty.set_all_row_dirty_status objref Db_dirty.Modified; - Db_dirty.set_all_dirty_table_status tblname; - Db_cache_types.increment Db_backend.cache; - save_in_redo_log context (Redo_log.WriteField(tblname, objref, fldname, newval)) - end; - - let events_old_val = - if is_valid_ref current_val then - Eventgen.events_of_other_tbl_refs - (List.map (fun (tbl,fld) -> - (tbl, current_val, Eventgen.find_get_record tbl ~__context:context ~self:current_val)) other_tbl_refs_for_this_field) - else [] - in - - let events_new_val = - if is_valid_ref newval then - Eventgen.events_of_other_tbl_refs - (List.map (fun (tbl,fld) -> - (tbl, newval, Eventgen.find_get_record tbl ~__context:context ~self:newval)) other_tbl_refs_for_this_field) - else [] - in - - (* Generate event *) - let snapshot = Eventgen.find_get_record tblname ~__context:context ~self:objref in - let record = snapshot() in - List.iter (fun (tbl, ref, s) -> events_notify ~snapshot:s tbl "mod" ref) events_old_val; - events_notify ~snapshot:record tblname "mod" objref; - List.iter (fun (tbl, ref, s) -> events_notify ~snapshot:s tbl "mod" ref) events_new_val; - end) - - (* Read specified field from tbl where where_field == where_value, using indexing *) - let read_set_ref rcd = - with_lock - (fun () -> - (* See if index exists for this lookup, if not make index *) - let index = - try Hashtbl.find indexes (rcd.table, rcd.where_field, rcd.return) - with _ -> - begin - let tbl = lookup_table_in_cache cache rcd.table in - let rows = get_rowlist tbl in - let new_index = Hashtbl.create (List.length rows) in - let rec populate_index rows = - match rows with - [] -> () - | (r::rs) -> - let indexed_field_value = lookup_field_in_row r rcd.where_field in - let result_field_value = lookup_field_in_row r rcd.return in - add_to_index new_index (indexed_field_value, result_field_value); - populate_index rs in - populate_index rows; (* populate new index *) - Hashtbl.replace indexes (rcd.table, rcd.where_field, rcd.return) new_index; - new_index - end in - (* Lookup query in index *) - try Hashtbl.find index rcd.where_value with _ -> []) - - (* setrefs contain the relationships from tbl to other tables in the form: - local-classname, local-fieldname, remote-classname, remote-fieldname. - db_read_record reads row from tbl with reference==objref [returning (fieldname, fieldvalue) list]. - and iterates through set-refs [returning (fieldname, ref list) list; where fieldname is the - name of the Set Ref field in tbl; and ref list is the list of foreign keys from related - table with remote-fieldname=objref] *) - let read_record tbl objref = - with_lock - (fun ()-> - let row = find_row cache tbl objref (* !! need fields as well as values here !! *) in - let fvlist = fold_over_fields (fun k d env -> (k,d)::env) row [] in - let get_set_ref tbl fld objref = - read_set_ref {table=tbl; return=reference_fname; - where_field=fld; where_value=objref} in - - let look_up_related_table_and_field obj other full_name = - (* Set(Ref t) is actually stored in the table t *) - let this_end = obj.Datamodel_types.name, List.hd (full_name) in - (* XXX: relationships should store full names *) - let obj', fld' = Datamodel_utils.Relations.other_end_of Datamodel.all_api this_end in - (obj', fld') in - - (* find datamodel object that corresponds to this table *) - let obj = List.find (fun obj -> obj.Datamodel_types.name = tbl) api_objs in - (* read its fields *) - let obj_fields = Datamodel_utils.fields_of_obj obj in - - let rec set_refs ls = - match ls with - [] -> [] - | ({Datamodel_types.ty = Datamodel_types.Set(Datamodel_types.Ref clsname); full_name = full_name}::fs) -> - let obj', fld' = look_up_related_table_and_field obj clsname full_name in - (Escaping.escape_obj obj.Datamodel_types.name, (* local classname *) - Escaping.escape_id full_name, (* local field *) - Escaping.escape_obj obj', (* remote classname *) - fld' (* remote fieldname *))::(set_refs fs) - | _::fs -> set_refs fs in - - let setrefs = set_refs obj_fields in - - let sr_fields = - List.map (fun (_,local_fieldname,remote_classname,remote_fieldname)-> - (local_fieldname, - get_set_ref remote_classname remote_fieldname objref)) setrefs in - (fvlist, sr_fields)) - - (* Delete row from tbl *) - let delete_row context tblname objref = - - (* NB we generate the delete event BEFORE deleting the object - but then generate the mod events afterwards *) - let generate_delete_event () = - let snapshot = Eventgen.find_get_record tblname ~__context:context ~self:objref () in - events_notify ~snapshot tblname "del" objref in - (* Return a thunk which will cause the mod events to be generated - containing the object states at the time the thunk is evaluated. - We create this closure while the objref is still valid *) - let lazily_generate_mod_events () = - let other_tbl_refs = Eventgen.follow_references tblname in - let other_tbl_refs = - List.fold_left (fun accu (remote_tbl,fld) -> - let (kv,_) = read_record tblname objref in - let fld_value = List.assoc fld kv in - if is_valid_ref fld_value - then (remote_tbl, fld_value, Eventgen.find_get_record remote_tbl ~__context:context ~self:fld_value) :: accu - else accu) - [] other_tbl_refs in - fun () -> - let other_tbl_ref_events = Eventgen.events_of_other_tbl_refs other_tbl_refs in - List.iter (fun (tbl, ref, s) -> - events_notify ~snapshot:s tbl "mod" ref) other_tbl_ref_events in - with_lock - (fun () -> - W.debug "delete_row %s (%s)" tblname objref; - (* send event *) - generate_delete_event(); - let mod_events = lazily_generate_mod_events () in - - invalidate_indexes tblname; - let tbl = lookup_table_in_cache cache tblname in - - remove_row_from_table tbl objref; - - (* Notify each db connection of delete *) - List.iter (fun dbconn->Backend_xml.notify_delete dbconn tblname objref) (Db_conn_store.read_db_connections()); - - if (this_table_persists tblname) then - begin - (* Update cache dirty status *) - Db_dirty.clear_all_row_dirty_status objref; - Db_dirty.set_all_dirty_table_status tblname; - Db_cache_types.increment Db_backend.cache; - save_in_redo_log context (Redo_log.DeleteRow(tblname, objref)) - end; - Ref_index.remove objref; - remove_ref_from_table_map objref; - (* send the rest of the events *) - mod_events ()) - - (* Create new row in tbl containing specified k-v pairs *) - let create_row context tblname kvs new_objref = - - (* Ensure values are valid for UTF-8-encoded XML. *) - let kvs = List.map (fun (key, value) -> (key, ensure_utf8_xml value)) kvs in - - (* fill in default values specifed in datamodel if kv pairs for these are not supplied already *) - let kvs = add_default_kvs kvs tblname in - - (* add the reference to the row itself *) - let kvs = (reference, new_objref) :: kvs in - - let generate_create_event() = - let snapshot = Eventgen.find_get_record tblname ~__context:context ~self:new_objref in - let other_tbl_refs = Eventgen.follow_references tblname in - let other_tbl_refs = - List.fold_left (fun accu (tbl,fld) -> - let fld_value = List.assoc fld kvs in - if is_valid_ref fld_value - then (tbl, fld_value, Eventgen.find_get_record tbl ~__context:context ~self:fld_value) :: accu - else accu) - [] other_tbl_refs in - let record = snapshot() in - let other_tbl_events = Eventgen.events_of_other_tbl_refs other_tbl_refs in - events_notify ~snapshot:(snapshot ()) tblname "add" new_objref; - List.iter (fun (tbl, ref, s) -> events_notify ~snapshot:s tbl "mod" ref) other_tbl_events in - - with_lock - (fun () -> - W.debug "create_row %s (%s) [%s]" tblname new_objref (String.concat "," (List.map (fun (k,v)->"("^k^","^"v"^")") kvs)); - invalidate_indexes tblname; - let newrow = table_of_kvs kvs in - let tbl = lookup_table_in_cache cache tblname in - check_unique_table_constraints tblname newrow; - set_row_in_table tbl new_objref newrow; - if (this_table_persists tblname) then - begin - Db_dirty.set_all_row_dirty_status new_objref Db_dirty.New; - Db_dirty.set_all_dirty_table_status tblname; - Db_cache_types.increment Db_backend.cache; - save_in_redo_log context (Redo_log.CreateRow(tblname, new_objref, kvs)) - end; - add_ref_to_table_map new_objref tblname (* track ref against this table *); - let uuid = lookup_field_in_row newrow uuid_fname in - let name_label = try Some (lookup_field_in_row newrow name_label_fname) with _ -> None in - Ref_index.insert {Ref_index.name_label = name_label; Ref_index.uuid = uuid; Ref_index._ref = new_objref }; - - (* generate events *) - generate_create_event(); - ) - - (* Do linear scan to find field values which match where clause *) - let read_field_where rcd = - with_lock - (fun () -> - let tbl = lookup_table_in_cache cache rcd.table in - let rec do_find tbl acc = - match tbl with - [] -> acc - | (r::rs) -> - let fv = lookup_field_in_row r rcd.where_field in - if fv=rcd.where_value then do_find rs ((lookup_field_in_row r rcd.return)::acc) - else do_find rs acc in - let rows = get_rowlist tbl in - do_find rows [] - ) - let db_get_by_uuid tbl uuid_val = - match (read_field_where - {table=tbl; return=reference; - where_field=uuid; where_value=uuid_val}) with - | [] -> raise (Read_missing_uuid (tbl, "", uuid_val)) - | [r] -> r - | _ -> raise (Too_many_values (tbl, "", uuid_val)) +(** Masters will use this to modify the in-memory cache directly *) +module Local_db : DB_ACCESS = Db_cache_impl - (** Return reference fields from tbl that matches specified name_label field *) - let db_get_by_name_label tbl label = - read_field_where - {table=tbl; return=reference; - where_field=(Escaping.escape_id ["name"; "label"]); - where_value=label} +(** Slaves will use this to call the master by XMLRPC *) +module Remote_db : DB_ACCESS = Db_rpc_client_v1.Make(struct + let initialise () = + ignore (Master_connection.start_master_connection_watchdog()); + ignore (Master_connection.open_secure_connection()) + let rpc request = Master_connection.execute_remote_fn request Constants.remote_db_access_uri +end) - (* Read references from tbl *) - let read_refs tblname = - with_lock - (fun () -> - get_reflist (lookup_table_in_cache cache tblname)) - - let populate_from connection_spec = - Backend_xml.populate connection_spec; - Db_backend.blow_away_non_persistent_fields() - - let sync_all_db_connections() = - (* Unconditionally force-flush all databases. *) - List.iter Db_connections.force_flush_all (List.map snd (Db_connections.get_dbs_and_gen_counts())) - - (* Executed on the master to post-process database after populating cache from db stored on disk *) - let post_populate_hook () = - (* Remove the temporary file used for staging from the metadata LUN -- - * there's no need to keep it and it's preferable for it not to hang - * around. *) - Unixext.unlink_safe Xapi_globs.ha_metadata_db; - (* non-persistent fields will have been flushed to disk anyway [since non-persistent just means dont trigger a flush - if I change]. Hence we blank non-persistent fields with a suitable empty value, depending on their type *) - Db_backend.blow_away_non_persistent_fields(); - (* Flush the in-memory cache to the redo-log *) - Backend_xml.flush_db_to_redo_log Db_backend.cache - - let populate_cache () = - let connections = Db_conn_store.read_db_connections () in - - (* Include a fake connection representing the HA metadata db - (if available). This isn't a full flushing connection per-se but - is only considered as a population source. *) - let fake_ha_dbconn = { Parse_db_conf.dummy_conf with - Parse_db_conf.path = Xapi_globs.ha_metadata_db } in - let connections = - if Sys.file_exists Xapi_globs.ha_metadata_db - then fake_ha_dbconn :: connections else connections in - - let fake_gen_dbconn = { Parse_db_conf.dummy_conf with - Parse_db_conf.path = Xapi_globs.gen_metadata_db } in - let connections = - if Sys.file_exists Xapi_globs.gen_metadata_db - then fake_gen_dbconn :: connections else connections in - - (* If we have a temporary_restore_path (backup uploaded in previous run of xapi process) then restore from that *) - let db = - if Sys.file_exists Xapi_globs.db_temporary_restore_path then begin - (* we know that the backup is XML format so, to get the manifest, we jump right in and use the xml backend directly here.. *) - let manifest = Backend_xml.populate_and_read_manifest Parse_db_conf.backup_file_dbconn in - Db_backend.post_restore_hook manifest; - (* delete file that contained backup *) - Db_backend.try_and_delete_db_file Xapi_globs.db_temporary_restore_path; - Parse_db_conf.backup_file_dbconn - end - else (* if there's no backup to restore from then.. *) - begin - (* Check schema vsn is current; if not try and upgrade; if can't do that then fail startup.. *) - let most_recent_db = Db_connections.pick_most_recent_db connections in - (* populate gets all field names from the existing (old) db file, not the (current) schema... which is nice: *) - Backend_xml.populate most_recent_db; - most_recent_db - end in - (* Always perform the generic database upgrade stuff *) - Db_upgrade.generic_database_upgrade (); - - (* Then look to see whether we have specific upgrade rules to consider *) - if Sys.file_exists db.Parse_db_conf.path then Db_upgrade.maybe_upgrade db; - - post_populate_hook () - - let spawn_db_flush_threads() = - (* Spawn threads that flush cache to db connections at regular intervals *) - List.iter - (fun dbconn -> - ignore (Thread.create - (fun ()-> - Db_connections.inc_db_flush_thread_refcount(); - let db_path = dbconn.Parse_db_conf.path in - Debug.name_thread ("dbflush ["^db_path^"]"); - let my_writes_this_period = ref 0 in - - (* the collesce_period_start records the time of the last write *) - let coallesce_period_start = ref (Unix.gettimeofday()) in - let period_start = ref (Unix.gettimeofday()) in - - (* we set a coallesce period of min(5 mins, write_limit_period / write_limit_write_cycles) *) - let min (x,y) = if x<=y then x else y in - (* if we're not write limiting then set the coallesce period to 5 minutes; otherwise set coallesce period to divide the - number of write cycles across the ... - *) - let coallesce_time = float_of_int (5*60) (* coallesce writes for 5 minutes to avoid serializing db to disk all the time. *) in - debug "In memory DB flushing thread created [%s]. %s" db_path - (if dbconn.Parse_db_conf.mode <> Parse_db_conf.No_limit then - "Write limited with coallesce_time="^(string_of_float coallesce_time) - else ""); - (* check if we are currently in a coallescing_period *) - let in_coallescing_period() = - (Unix.gettimeofday() -. !coallesce_period_start < coallesce_time) in - - while (true) do - try - begin - Thread.delay db_FLUSH_TIMER; - (* If I have some writing capacity left in this write period then consider doing a write; or - if the connection is not write-limited then consider doing a write too. - We also have to consider doing a write if exit_on_next_flush is set: because when this is - set (by a signal handler) we want to do a flush whether or not our write limit has been - exceeded. - *) - if !Db_connections.exit_on_next_flush (* always flush straight away; this request is urgent *) || - (* otherwise, we only write if (i) "coalesscing period has come to an end"; and (ii) "write limiting requirements are met": *) - ((not (in_coallescing_period())) (* see (i) above *) && - ((!my_writes_this_period < dbconn.Parse_db_conf.write_limit_write_cycles) || dbconn.Parse_db_conf.mode = Parse_db_conf.No_limit (* (ii) above *) - ) - ) - then - begin - (* debug "[%s] considering flush" db_path; *) - let was_anything_flushed = Threadext.Mutex.execute Db_lock.global_flush_mutex (fun ()->flush_dirty dbconn) in - if was_anything_flushed then - begin - my_writes_this_period := !my_writes_this_period + 1; - (* when we do a write, reset the coallesce_period_start to now -- recall that this - variable tracks the time since last write *) - coallesce_period_start := Unix.gettimeofday() - end - end; - (* else debug "[%s] not flushing because write-limit exceeded" db_path; *) - (* Check to see if the current write period has finished yet.. *) - if (Unix.gettimeofday() -. !period_start > (float_of_int dbconn.Parse_db_conf.write_limit_period)) then - begin - (* debug "[%s] resetting write-limit counters: start of new period" db_path; *) - (* We're at the start of a new writing period! *) - period_start := Unix.gettimeofday(); - my_writes_this_period := 0; - end - (* else debug "[%s] not resetting write-limit counters: not in new period yet" db_path *) - end - with - e -> debug "Exception in DB flushing thread: %s" (Printexc.to_string e) - done) ()) - ) (Db_conn_store.read_db_connections()) - - (* Called by server at start-of-day to initialiase cache. Populates cache and starts flushing threads *) - let initialise_db_cache() = - populate_cache(); - sync_all_db_connections(); - spawn_db_flush_threads() - - (* entry point for xapi-db-process; initialises a db cache without syncing all db connections "to tip" *) - let initialise_db_cache_nosync() = - populate_cache(); - spawn_db_flush_threads() - - (* Return a list of all the references for which the expression returns true. *) - let find_refs_with_filter (tblname: string) (expr: Db_filter_types.expr) = - with_lock - (fun ()-> - let tbl = lookup_table_in_cache cache tblname in - let rows = get_rowlist tbl in - let eval_val row = function - | Db_filter_types.Literal x -> x - | Db_filter_types.Field x -> lookup_field_in_row row x in - let rows = List.filter (fun row ->Db_filter.eval_expr (eval_val row) expr) rows in - List.map (fun row -> lookup_field_in_row row reference_fname) rows) - - let read_records_where tbl expr = - with_lock - (fun ()-> - let reqd_refs = find_refs_with_filter tbl expr in - List.map (fun ref->ref, read_record tbl ref) reqd_refs - ) - - let process_structured_field context (key,value) tbl fld objref proc_fn_selector = - - (* Ensure that both keys and values are valid for UTF-8-encoded XML. *) - let key = ensure_utf8_xml key in - let value = ensure_utf8_xml value in - - let add_set = (fun fv->add_key_to_set key fv) in - let remove_set = (fun fv->List.filter (function SExpr.String x -> x <> key | _ -> true) fv) in - let add_map = (fun fv-> - let kv = SExpr.Node [ SExpr.String key; SExpr.String value ] in - let duplicate = List.fold_left (||) false - (List.map (function SExpr.Node (SExpr.String k :: _) when k = key -> true - | _ -> false) fv) in - if duplicate then begin - error "Duplicate key in set or map: table %s; field %s; ref %s; key %s" tbl fld objref key; - raise (Duplicate_key (tbl,fld,objref,key)); - end; - kv::fv) in - let remove_map = - (fun fv->List.filter (function SExpr.Node [ SExpr.String x; _ ] -> x <> key - | _ -> true) fv) in - let proc_fn = - begin - match proc_fn_selector with - AddSet -> add_set - | RemoveSet -> remove_set - | AddMap -> add_map - | RemoveMap -> remove_map - end in - with_lock - (fun () -> - let row = find_row cache tbl objref in - let existing_str = lookup_field_in_row row fld in - let existing = parse_sexpr existing_str in - let processed = proc_fn existing in - let processed_str = SExpr.string_of (SExpr.Node processed) in - write_field context tbl objref fld processed_str) - - let dump_db_cache fd = - let db_cache_manifest = Db_cache_types.manifest_of_cache Db_backend.cache in - let time = Unix.gettimeofday() in - (* Snapshot the cache (uses the lock) and then slowly serialise the copy *) - Db_xml.To.fd fd (db_cache_manifest, snapshot cache); - debug "Written xml to fd: (time %f)" (Unix.gettimeofday() -. time) - - let apply_delta_to_cache entry = - let context = Context.make redo_log_context_name in - match entry with - | Redo_log.CreateRow(tblname, objref, kvs) -> - R.debug "Redoing create_row %s (%s)" tblname objref; - create_row context tblname kvs objref - | Redo_log.DeleteRow(tblname, objref) -> - R.debug "Redoing delete_row %s (%s)" tblname objref; - delete_row context tblname objref - | Redo_log.WriteField(tblname, objref, fldname, newval) -> - R.debug "Redoing write_field %s (%s) [%s -> %s]" tblname objref fldname newval; - write_field context tblname objref fldname newval - - end (* of DBCache.Master *) - - - (* --------------------------------------------------------------------------------------------------- - Pool slave implementation of DB_CACHE - --------------------------------------------------------------------------------------------------- *) - - module Slave : DB_CACHE = - struct - open Db_remote_marshall - type db_record = (string * string) list * (string * (string list)) list - module D = Debug.Debugger(struct let name = "slave_db_cache" end) - open D - - let populate_from _ = () - let populate_cache() = () - let spawn_db_flush_threads() = () - - let initialise_db_cache () = - ignore (Master_connection.start_master_connection_watchdog()); - ignore (Master_connection.open_secure_connection()) - - let initialise_db_cache_nosync () = () (* not used on slave *) - - let flush_dirty _ = false - let display_sql_writelog b = () - let flush_and_exit _ i = exit i - - let stats () = [] - - exception Remote_db_server_returned_unknown_exception - (* Process an exception returned from server, throwing local exception *) - let process_exception_xml xml = - match XMLRPC.From.array (fun x->x) xml with - [exn_name_xml; exn_params_xml] -> - let exn_name = XMLRPC.From.string exn_name_xml in - begin - match exn_name with - | "dbcache_notfound" -> - let (x,y,z) = unmarshall_3strings exn_params_xml in - raise (DBCache_NotFound (x,y,z)) - | "duplicate_key_of" -> - let (w,x,y,z) = unmarshall_4strings exn_params_xml in - raise (Duplicate_key (w,x,y,z)) - | "uniqueness_constraint_violation" -> - let (x,y,z) = unmarshall_3strings exn_params_xml in - raise (Uniqueness_constraint_violation (x,y,z)) - | "read_missing_uuid" -> - let (x,y,z) = unmarshall_3strings exn_params_xml in - raise (Read_missing_uuid (x,y,z)) - | "too_many_values" -> - let (x,y,z) = unmarshall_3strings exn_params_xml in - raise (Too_many_values (x,y,z)) - | _ -> raise DB_remote_marshall_error - end - | _ -> raise Remote_db_server_returned_unknown_exception - - exception Remote_db_server_returned_bad_message - let do_remote_call marshall_args unmarshall_resp fn_name args = - let xml = marshall_args args in - let xml = XMLRPC.To.array [XMLRPC.To.string fn_name; XMLRPC.To.string "" (* unused *); xml] in - let resp = Master_connection.execute_remote_fn xml Constants.remote_db_access_uri in - match XMLRPC.From.array (fun x->x) resp with - [status_xml; resp_xml] -> - let status = XMLRPC.From.string status_xml in - if status="success" then unmarshall_resp resp_xml - else process_exception_xml resp_xml - | _ -> raise Remote_db_server_returned_bad_message - - let get_table_from_ref x = - do_remote_call - marshall_get_table_from_ref_args - unmarshall_get_table_from_ref_response - "get_table_from_ref" - x - - let is_valid_ref x = - do_remote_call - marshall_is_valid_ref_args - unmarshall_is_valid_ref_response - "is_valid_ref" - x - - let read_refs x = - do_remote_call - marshall_read_refs_args - unmarshall_read_refs_response - "read_refs" - x - - let read_field_where x = - do_remote_call - marshall_read_field_where_args - unmarshall_read_field_where_response - "read_field_where" - x - - let db_get_by_uuid t u = - do_remote_call - marshall_db_get_by_uuid_args - unmarshall_db_get_by_uuid_response - "db_get_by_uuid" - (t,u) - - let db_get_by_name_label t l = - do_remote_call - marshall_db_get_by_name_label_args - unmarshall_db_get_by_name_label_response - "db_get_by_name_label" - (t,l) - - let read_set_ref x = - do_remote_call - marshall_read_set_ref_args - unmarshall_read_set_ref_response - "read_set_ref" - x - - - let create_row _ x y z = - do_remote_call - marshall_create_row_args - unmarshall_create_row_response - "create_row" - (x,y,z) - - let delete_row _ x y = - do_remote_call - marshall_delete_row_args - unmarshall_delete_row_response - "delete_row" - (x,y) - - let write_field _ a b c d = - do_remote_call - marshall_write_field_args - unmarshall_write_field_response - "write_field" - (a,b,c,d) - - let read_field context x y z = - do_remote_call - marshall_read_field_args - unmarshall_read_field_response - "read_field" - (x,y,z) - - let find_refs_with_filter s e = - do_remote_call - marshall_find_refs_with_filter_args - unmarshall_find_refs_with_filter_response - "find_refs_with_filter" - (s,e) - - let read_record x y = - do_remote_call - marshall_read_record_args - unmarshall_read_record_response - "read_record" - (x,y) - - let read_records_where x e = - do_remote_call - marshall_read_records_where_args - unmarshall_read_records_where_response - "read_records_where" - (x,e) - - let process_structured_field _ a b c d e = - do_remote_call - marshall_process_structured_field_args - unmarshall_process_structured_field_response - "process_structured_field" - (a,b,c,d,e) - - let dump_db_cache _ = () (* this is master-only *) - - let apply_delta_to_cache _ = () (* this is master-only *) - - end (* of DBCache.Slave *) - - - (* --------------------------------------------------------------------------------------------------- - Master/slave dispatch table. - --------------------------------------------------------------------------------------------------- *) - - module D = Debug.Debugger(struct let name = "sql" end) - open D - - let sw master slave = match !database_mode with - | Some Master -> master - | Some Slave -> slave - | None -> raise Must_initialise_database_mode - - let context_to_task_string context = - let task,ctxname = Context.get_task_id_string_name context in - let task_name = match Ref_index.lookup task with - | Some x -> (match x.Ref_index.name_label with Some y -> Printf.sprintf " (%s)" y | None -> "") - | None -> "" - in - task ^ task_name ^ " (" ^ ctxname ^ ")" - - let populate_from s = - (sw Master.populate_from Slave.populate_from) s - let populate_cache() = - (sw Master.populate_cache Slave.populate_cache) () - let spawn_db_flush_threads() = - (sw Master.spawn_db_flush_threads Slave.spawn_db_flush_threads) () - let initialise_db_cache () = - (sw Master.initialise_db_cache Slave.initialise_db_cache) () - let initialise_db_cache_nosync () = - (sw Master.initialise_db_cache_nosync Slave.initialise_db_cache_nosync) () - let display_sql_writelog b = - (sw Master.display_sql_writelog Slave.display_sql_writelog) b - let flush_dirty dbconn = - (sw Master.flush_dirty Slave.flush_dirty) dbconn - let flush_and_exit dbconn ret_code = - (sw Master.flush_and_exit Slave.flush_and_exit) dbconn ret_code - let stats () = - (sw Master.stats Slave.stats) () - let get_table_from_ref s = - (sw Master.get_table_from_ref Slave.get_table_from_ref) s - let is_valid_ref s = - (sw Master.is_valid_ref Slave.is_valid_ref) s - let read_refs s = - (sw Master.read_refs Slave.read_refs) s - let read_field_where w = - (sw Master.read_field_where Slave.read_field_where) w - let db_get_by_uuid t u = - (sw Master.db_get_by_uuid Slave.db_get_by_uuid) t u - let db_get_by_name_label t l = - (sw Master.db_get_by_name_label Slave.db_get_by_name_label) t l - let read_set_ref w = - (sw Master.read_set_ref Slave.read_set_ref) w - let create_row context s1 s2 s3 = - Stats.log_db_call (Some (context_to_task_string context)) (s1) Stats.Create; - (sw Master.create_row Slave.create_row) context s1 s2 s3 - let delete_row context s1 s2 = - Stats.log_db_call (Some (context_to_task_string context)) (s1) Stats.Drop; - (sw Master.delete_row Slave.delete_row) context s1 s2 - let write_field context s1 s2 s3 s4 = - Stats.log_db_call (Some (context_to_task_string context)) (s1^"."^s3) Stats.Write; - (sw Master.write_field Slave.write_field) context s1 s2 s3 s4 - let read_field context s1 s2 s3 = - Stats.log_db_call (Some (context_to_task_string context)) (s1^"."^s2) Stats.Read; - (sw Master.read_field Slave.read_field) context s1 s2 s3 - let find_refs_with_filter s e = - (sw Master.find_refs_with_filter Slave.find_refs_with_filter) s e - let process_structured_field context s1 s2 s3 s4 op = - (sw Master.process_structured_field Slave.process_structured_field) context s1 s2 s3 s4 op - let read_record s1 s2 = - Stats.log_db_call None ("record:"^s1) Stats.Read; - (sw Master.read_record Slave.read_record) s1 s2 - let read_records_where s e = - Stats.log_db_call None ("record(where):"^s) Stats.Read; - (sw Master.read_records_where Slave.read_records_where) s e - let dump_db_cache fd = - (sw Master.dump_db_cache Slave.dump_db_cache) fd - let apply_delta_to_cache entry = - (sw Master.apply_delta_to_cache Slave.apply_delta_to_cache) entry - -end - +exception Must_initialise_database_mode +let implementation = ref None +let set_master = function + | false -> + implementation := Some (module Remote_db : DB_ACCESS) + | true -> + implementation := Some (module Local_db : DB_ACCESS) +let get () = match !implementation with + | None -> raise Must_initialise_database_mode + | Some m -> m + +let apply_delta_to_cache entry = + let module DB = (val (get ()) : DB_ACCESS) in + let context = Context.make "redo_log" in + match entry with + | Redo_log.CreateRow(tblname, objref, kvs) -> + debug "Redoing create_row %s (%s)" tblname objref; + DB.create_row tblname kvs objref + | Redo_log.DeleteRow(tblname, objref) -> + debug "Redoing delete_row %s (%s)" tblname objref; + DB.delete_row tblname objref + | Redo_log.WriteField(tblname, objref, fldname, newval) -> + debug "Redoing write_field %s (%s) [%s -> %s]" tblname objref fldname newval; + DB.write_field tblname objref fldname newval diff --git a/ocaml/database/db_cache_impl.ml b/ocaml/database/db_cache_impl.ml new file mode 100644 index 00000000..a21c6edf --- /dev/null +++ b/ocaml/database/db_cache_impl.ml @@ -0,0 +1,623 @@ +(* + * Copyright (C) 2006-2010 Citrix Systems Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** An in-memory cache, used by pool master *) + +open Db_exn +open Db_lock + +module D = Debug.Debugger(struct let name = "sql" end) +open D +module W = Debug.Debugger(struct let name = "db_write" end) + +open Db_cache_types +open Db_action_helper +open Db_backend + +let context = Context.make "db_cache" + +(* This fn is part of external interface, so need to take lock *) +let get_table_from_ref objref = + with_lock + (fun () -> + if Hashtbl.mem ref_table_map objref + then Some (Hashtbl.find ref_table_map objref) + else None) + +let is_valid_ref objref = + match (get_table_from_ref objref) with + | Some _ -> true + | None -> false + +(* Read field from cache *) +let read_field tblname fldname objref = + with_lock + (fun () -> + let row = find_row cache tblname objref in + lookup_field_in_row row fldname) + +let table_of_kvs kvs = + let row = create_empty_row () in + List.iter (fun (k,v)-> set_field_in_row row k v) kvs; + row + +let save_in_redo_log context entry = + if Redo_log.is_enabled() then begin + Redo_log.write_delta (Db_cache_types.generation_of_cache Db_backend.cache) entry + (fun () -> (* the function which will be invoked if a database write is required instead of a delta *) + Backend_xml.flush_db_to_redo_log Db_backend.cache + ) + end + +(** Finds the longest XML-compatible UTF-8 prefix of the given *) +(** string, by truncating the string at the first incompatible *) +(** character. Writes a warning to the debug log if truncation *) +(** occurs. *) +let ensure_utf8_xml string = + let length = String.length string in + let prefix = Encodings.UTF8_XML.longest_valid_prefix string in + if length > String.length prefix then + warn "string truncated to: '%s'." prefix; + prefix + +(* Write field in cache *) +let write_field tblname objref fldname newval = + with_lock + (fun () -> + (* if uuid or reference then check uniqueness constraints: *) + if fldname=uuid_fname then begin + check_unique_table_constraints tblname (table_of_kvs [(uuid_fname, newval)]); + Ref_index.update_uuid objref newval; + end else if fldname=reference_fname then + check_unique_table_constraints tblname (table_of_kvs [(reference_fname, newval)]) + else if fldname=name_label_fname then + Ref_index.update_name_label objref newval; + + let row = find_row cache tblname objref in + let current_val = lookup_field_in_row row fldname in + + let other_tbl_refs = Eventgen.follow_references tblname in + let other_tbl_refs_for_this_field = + List.filter (fun (_,fld) -> fld=fldname) other_tbl_refs in + + let newval = ensure_utf8_xml newval in + + if current_val<>newval then + begin + W.debug "write_field %s,%s: %s |-> %s" tblname objref fldname newval; + invalidate_indexes_for_specific_field tblname fldname; + + (* Update the field in the cache whether it's persistent or not *) + set_field_in_row row fldname newval; + + (* then only mark written row as dirty if we persist writes on this table && persist changes on this field *) + if (this_table_persists tblname) && (persist_field_changes tblname fldname) then + begin + (* Only flush to disk if persistent *) + Db_dirty.set_all_row_dirty_status objref Db_dirty.Modified; + Db_dirty.set_all_dirty_table_status tblname; + Db_cache_types.increment Db_backend.cache; + save_in_redo_log context (Redo_log.WriteField(tblname, objref, fldname, newval)) + end; + + let events_old_val = + if is_valid_ref current_val then + Eventgen.events_of_other_tbl_refs + (List.map (fun (tbl,fld) -> + (tbl, current_val, Eventgen.find_get_record tbl ~__context:context ~self:current_val)) other_tbl_refs_for_this_field) + else [] + in + + let events_new_val = + if is_valid_ref newval then + Eventgen.events_of_other_tbl_refs + (List.map (fun (tbl,fld) -> + (tbl, newval, Eventgen.find_get_record tbl ~__context:context ~self:newval)) other_tbl_refs_for_this_field) + else [] + in + + (* Generate event *) + let snapshot = Eventgen.find_get_record tblname ~__context:context ~self:objref in + let record = snapshot() in + List.iter (function + | tbl, ref, None -> + error "Failed to send MOD event for %s %s" tbl ref; + Printf.printf "Failed to send MOD event for %s %s\n%!" tbl ref; + | tbl, ref, Some s -> + events_notify ~snapshot:s tbl "mod" ref + ) events_old_val; + begin match record with + | None -> + error "Failed to send MOD event for %s %s" tblname objref; + Printf.printf "Failed to send MOD event for %s %s\n%!" tblname objref; + | Some record -> + events_notify ~snapshot:record tblname "mod" objref; + end; + List.iter (function + | tbl, ref, None -> + error "Failed to send MOD event for %s %s" tbl ref; + Printf.printf "Failed to send MOD event for %s %s\n%!" tbl ref; + | tbl, ref, Some s -> + events_notify ~snapshot:s tbl "mod" ref + ) events_new_val; + end) + +(* Read specified field from tbl where where_field == where_value, using indexing *) +let read_set_ref rcd = + with_lock + (fun () -> + (* See if index exists for this lookup, if not make index *) + let index = + try Hashtbl.find indexes (rcd.table, rcd.where_field, rcd.return) + with _ -> + begin + let tbl = lookup_table_in_cache cache rcd.table in + let rows = get_rowlist tbl in + let new_index = Hashtbl.create (List.length rows) in + let rec populate_index rows = + match rows with + [] -> () + | (r::rs) -> + let indexed_field_value = lookup_field_in_row r rcd.where_field in + let result_field_value = lookup_field_in_row r rcd.return in + add_to_index new_index (indexed_field_value, result_field_value); + populate_index rs in + populate_index rows; (* populate new index *) + Hashtbl.replace indexes (rcd.table, rcd.where_field, rcd.return) new_index; + new_index + end in + (* Lookup query in index *) + try Hashtbl.find index rcd.where_value with _ -> []) + +(* setrefs contain the relationships from tbl to other tables in the form: + local-classname, local-fieldname, remote-classname, remote-fieldname. + db_read_record reads row from tbl with reference==objref [returning (fieldname, fieldvalue) list]. + and iterates through set-refs [returning (fieldname, ref list) list; where fieldname is the + name of the Set Ref field in tbl; and ref list is the list of foreign keys from related + table with remote-fieldname=objref] *) +let read_record tbl objref = + with_lock + (fun ()-> + let row = find_row cache tbl objref (* !! need fields as well as values here !! *) in + let fvlist = fold_over_fields (fun k d env -> (k,d)::env) row [] in + let get_set_ref tbl fld objref = + read_set_ref {table=tbl; return=reference_fname; + where_field=fld; where_value=objref} in + + let look_up_related_table_and_field obj other full_name = + (* Set(Ref t) is actually stored in the table t *) + let this_end = obj.Datamodel_types.name, List.hd (full_name) in + (* XXX: relationships should store full names *) + let obj', fld' = Datamodel_utils.Relations.other_end_of Datamodel.all_api this_end in + (obj', fld') in + + (* find datamodel object that corresponds to this table *) + let obj = List.find (fun obj -> obj.Datamodel_types.name = tbl) api_objs in + (* read its fields *) + let obj_fields = Datamodel_utils.fields_of_obj obj in + + let rec set_refs ls = + match ls with + [] -> [] + | ({Datamodel_types.ty = Datamodel_types.Set(Datamodel_types.Ref clsname); full_name = full_name}::fs) -> + let obj', fld' = look_up_related_table_and_field obj clsname full_name in + (Escaping.escape_obj obj.Datamodel_types.name, (* local classname *) + Escaping.escape_id full_name, (* local field *) + Escaping.escape_obj obj', (* remote classname *) + fld' (* remote fieldname *))::(set_refs fs) + | _::fs -> set_refs fs in + + let setrefs = set_refs obj_fields in + + let sr_fields = + List.map (fun (_,local_fieldname,remote_classname,remote_fieldname)-> + (local_fieldname, + get_set_ref remote_classname remote_fieldname objref)) setrefs in + (fvlist, sr_fields)) + +(* Delete row from tbl *) +let delete_row tblname objref = + let tbl = lookup_table_in_cache cache tblname in + (* Look up the row first: in the event it doesn't exist, this will + immediately failed with a DBCache_NotFound *) + let (_: row) = lookup_row_in_table tbl tblname objref in + (* NB we generate the delete event BEFORE deleting the object + but then generate the mod events afterwards *) + let generate_delete_event () = + match Eventgen.find_get_record tblname ~__context:context ~self:objref () with + | None -> + error "Failed to generate DEL event for %s %s" tblname objref; + Printf.printf "Failed to generate DEL event for %s %s\n%!" tblname objref; + | Some snapshot -> + events_notify ~snapshot tblname "del" objref in + (* Return a thunk which will cause the mod events to be generated + containing the object states at the time the thunk is evaluated. + We create this closure while the objref is still valid *) + let lazily_generate_mod_events () = + let other_tbl_refs = Eventgen.follow_references tblname in + let other_tbl_refs = + List.fold_left (fun accu (remote_tbl,fld) -> + let (kv,_) = read_record tblname objref in + let fld_value = List.assoc fld kv in + if is_valid_ref fld_value + then (remote_tbl, fld_value, Eventgen.find_get_record remote_tbl ~__context:context ~self:fld_value) :: accu + else accu) + [] other_tbl_refs in + fun () -> + let other_tbl_ref_events = Eventgen.events_of_other_tbl_refs other_tbl_refs in + List.iter (function + | tbl, ref, None -> + error "Failed to generate MOD event on %s %s" tbl ref; + Printf.printf "Failed to generate MOD event on %s %s\n%!" tbl ref; + | tbl, ref, Some s -> + events_notify ~snapshot:s tbl "mod" ref + ) other_tbl_ref_events in + with_lock + (fun () -> + W.debug "delete_row %s (%s)" tblname objref; + (* send event *) + generate_delete_event(); + let mod_events = lazily_generate_mod_events () in + + invalidate_indexes tblname; + + remove_row_from_table tbl objref; + + (* Notify each db connection of delete *) + List.iter (fun dbconn->Backend_xml.notify_delete dbconn tblname objref) (Db_conn_store.read_db_connections()); + + if (this_table_persists tblname) then + begin + (* Update cache dirty status *) + Db_dirty.clear_all_row_dirty_status objref; + Db_dirty.set_all_dirty_table_status tblname; + Db_cache_types.increment Db_backend.cache; + save_in_redo_log context (Redo_log.DeleteRow(tblname, objref)) + end; + Ref_index.remove objref; + remove_ref_from_table_map objref; + (* send the rest of the events *) + mod_events ()) + +(* Create new row in tbl containing specified k-v pairs *) +let create_row tblname kvs new_objref = + + (* Ensure values are valid for UTF-8-encoded XML. *) + let kvs = List.map (fun (key, value) -> (key, ensure_utf8_xml value)) kvs in + + (* fill in default values specifed in datamodel if kv pairs for these are not supplied already *) + let kvs = add_default_kvs kvs tblname in + + (* add the reference to the row itself *) + let kvs = (reference, new_objref) :: kvs in + + let generate_create_event() = + let snapshot = Eventgen.find_get_record tblname ~__context:context ~self:new_objref in + let other_tbl_refs = Eventgen.follow_references tblname in + let other_tbl_refs = + List.fold_left (fun accu (tbl,fld) -> + let fld_value = List.assoc fld kvs in + if is_valid_ref fld_value + then (tbl, fld_value, Eventgen.find_get_record tbl ~__context:context ~self:fld_value) :: accu + else accu) + [] other_tbl_refs in + let other_tbl_events = Eventgen.events_of_other_tbl_refs other_tbl_refs in + begin match snapshot() with + | None -> + error "Failed to generate ADD event for %s %s" tblname new_objref; + Printf.printf "Failed to generate ADD event for %s %s\n%!" tblname new_objref; + | Some snapshot -> + events_notify ~snapshot tblname "add" new_objref; + end; + List.iter (function + | tbl, ref, None -> + error "Failed to generate MOD event for %s %s" tbl ref; + Printf.printf "Failed to generate MOD event for %s %s\n%!" tbl ref; + | tbl, ref, Some s -> + events_notify ~snapshot:s tbl "mod" ref + ) other_tbl_events in + + with_lock + (fun () -> + W.debug "create_row %s (%s) [%s]" tblname new_objref (String.concat "," (List.map (fun (k,v)->"("^k^","^"v"^")") kvs)); + invalidate_indexes tblname; + let newrow = table_of_kvs kvs in + let tbl = lookup_table_in_cache cache tblname in + check_unique_table_constraints tblname newrow; + set_row_in_table tbl new_objref newrow; + if (this_table_persists tblname) then + begin + Db_dirty.set_all_row_dirty_status new_objref Db_dirty.New; + Db_dirty.set_all_dirty_table_status tblname; + Db_cache_types.increment Db_backend.cache; + save_in_redo_log context (Redo_log.CreateRow(tblname, new_objref, kvs)) + end; + add_ref_to_table_map new_objref tblname (* track ref against this table *); + let uuid = lookup_field_in_row newrow uuid_fname in + let name_label = try Some (lookup_field_in_row newrow name_label_fname) with _ -> None in + Ref_index.insert {Ref_index.name_label = name_label; Ref_index.uuid = uuid; Ref_index._ref = new_objref }; + + (* generate events *) + begin + try + generate_create_event(); + with Not_found -> + error "Failed to send a create event for %s %s" tblname new_objref + end + ) + +(* Do linear scan to find field values which match where clause *) +let read_field_where rcd = + with_lock + (fun () -> + let tbl = lookup_table_in_cache cache rcd.table in + let rec do_find tbl acc = + match tbl with + [] -> acc + | (r::rs) -> + let fv = lookup_field_in_row r rcd.where_field in + if fv=rcd.where_value then do_find rs ((lookup_field_in_row r rcd.return)::acc) + else do_find rs acc in + let rows = get_rowlist tbl in + do_find rows [] + ) + +let db_get_by_uuid tbl uuid_val = + match (read_field_where + {table=tbl; return=reference; + where_field=uuid; where_value=uuid_val}) with + | [] -> raise (Read_missing_uuid (tbl, "", uuid_val)) + | [r] -> r + | _ -> raise (Too_many_values (tbl, "", uuid_val)) + +(** Return reference fields from tbl that matches specified name_label field *) +let db_get_by_name_label tbl label = + read_field_where + {table=tbl; return=reference; + where_field=(Escaping.escape_id ["name"; "label"]); + where_value=label} + +(* Read references from tbl *) +let read_refs tblname = + with_lock + (fun () -> + get_reflist (lookup_table_in_cache cache tblname)) + +(* Return a list of all the references for which the expression returns true. *) +let find_refs_with_filter (tblname: string) (expr: Db_filter_types.expr) = + with_lock + (fun ()-> + let tbl = lookup_table_in_cache cache tblname in + let rows = get_rowlist tbl in + let eval_val row = function + | Db_filter_types.Literal x -> x + | Db_filter_types.Field x -> lookup_field_in_row row x in + let rows = List.filter (fun row ->Db_filter.eval_expr (eval_val row) expr) rows in + List.map (fun row -> lookup_field_in_row row reference_fname) rows) + +let read_records_where tbl expr = + with_lock + (fun ()-> + let reqd_refs = find_refs_with_filter tbl expr in + List.map (fun ref->ref, read_record tbl ref) reqd_refs + ) + +let process_structured_field (key,value) tbl fld objref proc_fn_selector = + + (* Ensure that both keys and values are valid for UTF-8-encoded XML. *) + let key = ensure_utf8_xml key in + let value = ensure_utf8_xml value in + + let add_set = (fun fv->add_key_to_set key fv) in + let remove_set = (fun fv->List.filter (function SExpr.String x -> x <> key | _ -> true) fv) in + let add_map = (fun fv-> + let kv = SExpr.Node [ SExpr.String key; SExpr.String value ] in + let duplicate = List.fold_left (||) false + (List.map (function SExpr.Node (SExpr.String k :: _) when k = key -> true + | _ -> false) fv) in + if duplicate then begin + error "Duplicate key in set or map: table %s; field %s; ref %s; key %s" tbl fld objref key; + raise (Duplicate_key (tbl,fld,objref,key)); + end; + kv::fv) in + let remove_map = + (fun fv->List.filter (function SExpr.Node [ SExpr.String x; _ ] -> x <> key + | _ -> true) fv) in + let proc_fn = + begin + match proc_fn_selector with + AddSet -> add_set + | RemoveSet -> remove_set + | AddMap -> add_map + | RemoveMap -> remove_map + end in + with_lock + (fun () -> + let row = find_row cache tbl objref in + let existing_str = lookup_field_in_row row fld in + let existing = parse_sexpr existing_str in + let processed = proc_fn existing in + let processed_str = SExpr.string_of (SExpr.Node processed) in + write_field tbl objref fld processed_str) + + +(* -------------------------------------------------------------------- *) + + +(* Executed on the master to post-process database after populating cache from db stored on disk *) +let post_populate_hook () = + (* Remove the temporary file used for staging from the metadata LUN -- + * there's no need to keep it and it's preferable for it not to hang + * around. *) + Unixext.unlink_safe Xapi_globs.ha_metadata_db; + (* non-persistent fields will have been flushed to disk anyway [since non-persistent just means dont trigger a flush + if I change]. Hence we blank non-persistent fields with a suitable empty value, depending on their type *) + Db_backend.blow_away_non_persistent_fields(); + (* Flush the in-memory cache to the redo-log *) + Backend_xml.flush_db_to_redo_log Db_backend.cache + +let populate_cache () = + let connections = Db_conn_store.read_db_connections () in + + (* Include a fake connection representing the HA metadata db + (if available). This isn't a full flushing connection per-se but + is only considered as a population source. *) + let fake_ha_dbconn = { Parse_db_conf.dummy_conf with + Parse_db_conf.path = Xapi_globs.ha_metadata_db } in + let connections = + if Sys.file_exists Xapi_globs.ha_metadata_db + then fake_ha_dbconn :: connections else connections in + + let fake_gen_dbconn = { Parse_db_conf.dummy_conf with + Parse_db_conf.path = Xapi_globs.gen_metadata_db } in + let connections = + if Sys.file_exists Xapi_globs.gen_metadata_db + then fake_gen_dbconn :: connections else connections in + + (* If we have a temporary_restore_path (backup uploaded in previous run of xapi process) then restore from that *) + let db = + if Sys.file_exists Xapi_globs.db_temporary_restore_path then begin + (* we know that the backup is XML format so, to get the manifest, we jump right in and use the xml backend directly here.. *) + let manifest = Backend_xml.populate_and_read_manifest Parse_db_conf.backup_file_dbconn in + Db_backend.post_restore_hook manifest; + (* delete file that contained backup *) + Db_backend.try_and_delete_db_file Xapi_globs.db_temporary_restore_path; + Parse_db_conf.backup_file_dbconn + end + else (* if there's no backup to restore from then.. *) + begin + (* Check schema vsn is current; if not try and upgrade; if can't do that then fail startup.. *) + let most_recent_db = Db_connections.pick_most_recent_db connections in + (* populate gets all field names from the existing (old) db file, not the (current) schema... which is nice: *) + Backend_xml.populate most_recent_db; + most_recent_db + end in + (* Always perform the generic database upgrade stuff *) + Db_upgrade.generic_database_upgrade (); + + (* Then look to see whether we have specific upgrade rules to consider *) + if Sys.file_exists db.Parse_db_conf.path then Db_upgrade.maybe_upgrade db; + + post_populate_hook () + +let sync_all_db_connections() = + (* Unconditionally force-flush all databases. *) + List.iter Db_connections.force_flush_all (List.map snd (Db_connections.get_dbs_and_gen_counts())) + +let flush_dirty dbconn = Db_connections.flush_dirty_and_maybe_exit dbconn None +let flush_and_exit dbconn ret_code = ignore (Db_connections.flush_dirty_and_maybe_exit dbconn (Some ret_code)) + + +let spawn_db_flush_threads() = + (* Spawn threads that flush cache to db connections at regular intervals *) + List.iter + (fun dbconn -> + ignore (Thread.create + (fun ()-> + Db_connections.inc_db_flush_thread_refcount(); + let db_path = dbconn.Parse_db_conf.path in + Debug.name_thread ("dbflush ["^db_path^"]"); + let my_writes_this_period = ref 0 in + + (* the collesce_period_start records the time of the last write *) + let coallesce_period_start = ref (Unix.gettimeofday()) in + let period_start = ref (Unix.gettimeofday()) in + + (* we set a coallesce period of min(5 mins, write_limit_period / write_limit_write_cycles) *) + let min (x,y) = if x<=y then x else y in + (* if we're not write limiting then set the coallesce period to 5 minutes; otherwise set coallesce period to divide the + number of write cycles across the ... + *) + let coallesce_time = float_of_int (5*60) (* coallesce writes for 5 minutes to avoid serializing db to disk all the time. *) in + debug "In memory DB flushing thread created [%s]. %s" db_path + (if dbconn.Parse_db_conf.mode <> Parse_db_conf.No_limit then + "Write limited with coallesce_time="^(string_of_float coallesce_time) + else ""); + (* check if we are currently in a coallescing_period *) + let in_coallescing_period() = + (Unix.gettimeofday() -. !coallesce_period_start < coallesce_time) in + + while (true) do + try + begin + Thread.delay Db_backend.db_FLUSH_TIMER; + (* If I have some writing capacity left in this write period then consider doing a write; or + if the connection is not write-limited then consider doing a write too. + We also have to consider doing a write if exit_on_next_flush is set: because when this is + set (by a signal handler) we want to do a flush whether or not our write limit has been + exceeded. + *) + if !Db_connections.exit_on_next_flush (* always flush straight away; this request is urgent *) || + (* otherwise, we only write if (i) "coalesscing period has come to an end"; and (ii) "write limiting requirements are met": *) + ((not (in_coallescing_period())) (* see (i) above *) && + ((!my_writes_this_period < dbconn.Parse_db_conf.write_limit_write_cycles) || dbconn.Parse_db_conf.mode = Parse_db_conf.No_limit (* (ii) above *) + ) + ) + then + begin + (* debug "[%s] considering flush" db_path; *) + let was_anything_flushed = Threadext.Mutex.execute Db_lock.global_flush_mutex (fun ()->flush_dirty dbconn) in + if was_anything_flushed then + begin + my_writes_this_period := !my_writes_this_period + 1; + (* when we do a write, reset the coallesce_period_start to now -- recall that this + variable tracks the time since last write *) + coallesce_period_start := Unix.gettimeofday() + end + end; + (* else debug "[%s] not flushing because write-limit exceeded" db_path; *) + (* Check to see if the current write period has finished yet.. *) + if (Unix.gettimeofday() -. !period_start > (float_of_int dbconn.Parse_db_conf.write_limit_period)) then + begin + (* debug "[%s] resetting write-limit counters: start of new period" db_path; *) + (* We're at the start of a new writing period! *) + period_start := Unix.gettimeofday(); + my_writes_this_period := 0; + end + (* else debug "[%s] not resetting write-limit counters: not in new period yet" db_path *) + end + with + e -> debug "Exception in DB flushing thread: %s" (Printexc.to_string e) + done) ()) + ) (Db_conn_store.read_db_connections()) + + +(* Called by server at start-of-day to initialiase cache. Populates cache and starts flushing threads *) +let initialise () = + populate_cache(); + sync_all_db_connections(); + spawn_db_flush_threads() + +(* entry point for xapi-db-process; initialises a db cache without syncing all db connections "to tip" *) +let initialise_db_cache_nosync() = + populate_cache(); + spawn_db_flush_threads() + +let dump_db_cache fd = + let db_cache_manifest = Db_cache_types.manifest_of_cache Db_backend.cache in + let time = Unix.gettimeofday() in + (* Snapshot the cache (uses the lock) and then slowly serialise the copy *) + Db_xml.To.fd fd (db_cache_manifest, snapshot Db_backend.cache); + debug "Written xml to fd: (time %f)" (Unix.gettimeofday() -. time) + +(** Return an association list of table name * record count *) +let stats () = + with_lock + (fun () -> + fold_over_tables (fun name tbl acc -> + let size = fold_over_rows (fun _ _ acc -> acc + 1) tbl 0 in + (name, size) :: acc) Db_backend.cache []) + + + diff --git a/ocaml/database/db_cache_impl.mli b/ocaml/database/db_cache_impl.mli new file mode 100644 index 00000000..71c1db9a --- /dev/null +++ b/ocaml/database/db_cache_impl.mli @@ -0,0 +1,19 @@ +include Db_interface.DB_ACCESS + +(** [initialise ()] initialises the in-memory cache *) +val initialise : unit -> unit + +(** [flush_and_exit db code] flushes the specific backend [db] and exits + xapi with [code] *) +val flush_and_exit : Parse_db_conf.db_connection -> int -> unit + +(** [initialise_db_cache_nosync ()] is the same as [initialise ()] without + the side-effect of writing to any database files *) +val initialise_db_cache_nosync : unit -> unit + +(** [dump_db_cache fd] writes a snapshot of the database to file descriptor + [fd] *) +val dump_db_cache : Unix.file_descr -> unit + +(** [stats ()] returns some stats data for logging *) +val stats : unit -> (string * int) list diff --git a/ocaml/database/db_cache_types.ml b/ocaml/database/db_cache_types.ml index 5adb14f6..6f51f326 100644 --- a/ocaml/database/db_cache_types.ml +++ b/ocaml/database/db_cache_types.ml @@ -21,8 +21,8 @@ type cache = { generation: Generation.t ref; } -type where_record = {table:string; return:string; where_field:string; where_value:string} -type structured_op_t = AddSet | RemoveSet | AddMap | RemoveMap +type where_record = {table:string; return:string; where_field:string; where_value:string} with rpc +type structured_op_t = AddSet | RemoveSet | AddMap | RemoveMap with rpc let string_of_structured_op op = match op with | AddSet -> "add_set" diff --git a/ocaml/database/db_cache_types.mli b/ocaml/database/db_cache_types.mli index f185d4d2..58583e69 100644 --- a/ocaml/database/db_cache_types.mli +++ b/ocaml/database/db_cache_types.mli @@ -21,7 +21,13 @@ type where_record = { where_field : string; where_value : string; } +val rpc_of_where_record: where_record -> Rpc.t +val where_record_of_rpc: Rpc.t -> where_record + type structured_op_t = AddSet | RemoveSet | AddMap | RemoveMap +val rpc_of_structured_op_t: structured_op_t -> Rpc.t +val structured_op_t_of_rpc: Rpc.t -> structured_op_t + type db_dump_manifest = { schema_major_vsn : int; schema_minor_vsn : int; diff --git a/ocaml/database/db_connections.ml b/ocaml/database/db_connections.ml index fd7e1ecd..89e19f34 100644 --- a/ocaml/database/db_connections.ml +++ b/ocaml/database/db_connections.ml @@ -11,32 +11,27 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. *) -module D = Debug.Debugger(struct let name = "sql" end) +module D = Debug.Debugger(struct let name = "xapi" end) module R = Debug.Debugger(struct let name = "redo_log" end) open D let get_dbs_and_gen_counts() = List.map (fun conn->(Generation.read conn, conn)) (Db_conn_store.read_db_connections()) +exception No_databases + (* This returns the most recent of the db connections to populate from. It also initialises the in-memory generation count to the largest of the db connections' generation counts *) -let pick_most_recent_db connections = - let conns_and_gens = List.map (fun conn -> Generation.read conn, conn) connections in - let _ = List.iter (fun (g,conn)->debug "Dbconf contains: %Ld, %s" g conn.Parse_db_conf.path) conns_and_gens in - let conn = ref Parse_db_conf.dummy_conf in - let rec pick_largest l sofar = - match l,sofar with - [], g -> !conn - | (this_g, this_conn)::cs, largest_g -> - if this_g > largest_g then - begin - conn := this_conn; - pick_largest cs this_g - end - else pick_largest cs largest_g in - let most_recent = pick_largest conns_and_gens (Int64.sub 0L 1L) in - debug "Most recent db in db.conf file is '%s'" most_recent.Parse_db_conf.path; - most_recent +let pick_most_recent_db = function +| [] -> raise No_databases +| (c :: cs) as connections -> + List.iter (fun c -> debug "Dbconf contains: %s (generation %Ld)" c.Parse_db_conf.path (Generation.read c)) connections; + let gen, most_recent = List.fold_left (fun (g, c) c' -> + let g' = Generation.read c' in + if g' > g then (g', c') else (g, c)) + (Generation.read c, c) cs in + debug "Most recent db is %s (generation %Ld)" most_recent.Parse_db_conf.path gen; + most_recent let preferred_write_db () = List.hd (Db_conn_store.read_db_connections()) (* !!! FIX ME *) diff --git a/ocaml/database/db_exn.ml b/ocaml/database/db_exn.ml index e0ee2903..05355dd1 100644 --- a/ocaml/database/db_exn.ml +++ b/ocaml/database/db_exn.ml @@ -15,3 +15,10 @@ exception Duplicate_key of (*class*) string * (*field*) string * (*uuid*) string * (*key*) string exception DBCache_NotFound of string*string*string exception Uniqueness_constraint_violation of string*string*string + +exception Read_missing_uuid of (*class*) string * (*ref*) string * (*uuid*) string +exception Too_many_values of (*class*) string * (*ref*) string * (*uuid*) string + +exception Remote_db_server_returned_unknown_exception +exception Remote_db_server_returned_bad_message + diff --git a/ocaml/database/db_filter_types.ml b/ocaml/database/db_filter_types.ml index a9b44135..2413b29c 100644 --- a/ocaml/database/db_filter_types.ml +++ b/ocaml/database/db_filter_types.ml @@ -15,6 +15,7 @@ type _val = | Field of string | Literal of string +with rpc (** Represent a predicate: table row -> bool *) type expr = @@ -24,4 +25,4 @@ type expr = | Eq of _val * _val | And of expr * expr | Or of expr * expr - +with rpc diff --git a/ocaml/database/db_interface.ml b/ocaml/database/db_interface.ml new file mode 100644 index 00000000..8188b00e --- /dev/null +++ b/ocaml/database/db_interface.ml @@ -0,0 +1,98 @@ +(* + * Copyright (C) 2006-2009 Citrix Systems Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** A generic RPC interface *) +module type RPC = sig + + (** [initialise ()] should be called before [rpc] *) + val initialise : unit -> unit + + (** [rpc request] transmits [request] and receives a response *) + val rpc : string -> string +end + +(** dictionary of regular fields x dictionary of associated set_ref values *) +type db_record = (string * string) list * (string * (string list)) list + +(** The client interface to the database *) +module type DB_ACCESS = sig + (** [initialise ()] must be called before any other function in this + interface *) + val initialise : unit -> unit + + (** [get_table_from_ref ref] returns [Some tbl] if [ref] is a + valid reference; None otherwise *) + val get_table_from_ref : string -> string option + + (** [is_valid_ref ref] returns true if [ref] is valid; false otherwise *) + val is_valid_ref : string -> bool + + (** [read_refs tbl] returns a list of all references in table [tbl] *) + val read_refs : string -> string list + + (** [find_refs_with_filter tbl expr] returns a list of all references + to rows which match [expr] *) + val find_refs_with_filter : + string -> Db_filter_types.expr -> string list + + (** [read_field_where {tbl,return,where_field,where_value}] returns a + list of the [return] fields in table [tbl] where the [where_field] + equals [where_value] *) + val read_field_where : Db_cache_types.where_record -> string list + + (** [db_get_by_uuid tbl uuid] returns the single object reference + associated with [uuid] *) + val db_get_by_uuid : string -> string -> string + + (** [db_get_by_name_label tbl label] returns the list of object references + associated with [label] *) + val db_get_by_name_label : string -> string -> string list + + (** [read_set_ref {tbl,return,where_field,where_value}] is identical + to [read_field_where ...] except it builds and consults an index *) + val read_set_ref : Db_cache_types.where_record -> string list + + (** [create_row tbl kvpairs ref] create a new row in [tbl] with + key [ref] and contents [kvpairs] *) + val create_row : + string -> (string * string) list -> string -> unit + + (** [delete_row context tbl ref] deletes row [ref] from table [tbl] *) + val delete_row : string -> string -> unit + + (** [write_field context tbl ref fld val] changes field [fld] to [val] in + row [ref] in table [tbl] *) + val write_field : string -> string -> string -> string -> unit + + (** [read_field context tbl ref fld] returns the value of field [fld] + in row [ref] in table [tbl] *) + val read_field : string -> string -> string -> string + + (** [read_record tbl ref] returns + [ (field, value) ] * [ (set_ref fieldname * [ ref ]) ] *) + val read_record : string -> string -> db_record + + (** [read_records_where tbl expr] returns a list of the values returned + by read_record that match the expression *) + val read_records_where : string -> Db_filter_types.expr -> + (string * db_record) list + + (** [process_structured_field context kv tbl fld ref op] modifies the + value of field [fld] in row [ref] in table [tbl] according to [op] + which may be one of AddSet RemoveSet AddMap RemoveMap with + arguments [kv] *) + val process_structured_field : + string * string -> + string -> string -> string -> Db_cache_types.structured_op_t -> unit +end diff --git a/ocaml/database/db_remote_cache_access.ml b/ocaml/database/db_remote_cache_access.ml deleted file mode 100644 index 4a78d5f8..00000000 --- a/ocaml/database/db_remote_cache_access.ml +++ /dev/null @@ -1,135 +0,0 @@ -(* - * Copyright (C) 2006-2009 Citrix Systems Inc. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; version 2.1 only. with the special - * exception on linking described in file LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - *) - -module DBCacheRemoteListener = -struct - open Db_remote_marshall - open Db_action_helper - open Db_cache - open Db_exn - - exception DBCacheListenerInvalidMessageReceived - exception DBCacheListenerUnknownMessageName of string - - module D = Debug.Debugger(struct let name = "db_server" end) - open D - - let ctr_mutex = Mutex.create() - let calls_processed = ref 0 - let total_recv_len = ref 0 - let total_transmit_len = ref 0 - - (* Performance counters for debugging *) - let update_lengths msg resp = - Mutex.lock ctr_mutex; - total_transmit_len := (!total_transmit_len) + (String.length (Xml.to_string_fmt resp)); - total_recv_len := (!total_recv_len) + (String.length (Xml.to_string_fmt msg)); - Mutex.unlock ctr_mutex - - let success xml = - let resp = - XMLRPC.To.array - [XMLRPC.To.string "success"; - xml] in - (* update_lengths xml resp; *) - (* let s = Xml.to_string_fmt resp in *) - (* debug "Resp [Len = %d]: %s" (String.length s) s; *) - debug "Call succeeded"; - resp - - let failure exn_name xml = - let resp = - XMLRPC.To.array - [XMLRPC.To.string "failure"; - XMLRPC.To.array - [XMLRPC.To.string exn_name; - xml]] in - (* update_lengths xml resp; *) - debug "Call failed"; - resp - - (** Unmarshals the request, calls the DBCache function and marshals the result. - Note that, although the messages still contain the pool_secret for historical reasons, - access has already been applied by the RBAC code in Xapi_http.add_handler. *) - let process_xmlrpc xml = - Mutex.lock ctr_mutex; - calls_processed := !calls_processed + 1; - Mutex.unlock ctr_mutex; - let fn_name, args = - match (XMLRPC.From.array (fun x->x) xml) with - [fn_name; _; args] -> - XMLRPC.From.string fn_name, args - | _ -> raise DBCacheListenerInvalidMessageReceived in - try - debug "Received [total=%d rx=%d tx=%d] %s" !calls_processed !total_recv_len !total_transmit_len fn_name; - match fn_name with - "get_table_from_ref" -> - let s = unmarshall_get_table_from_ref_args args in - success (marshall_get_table_from_ref_response (DBCache.get_table_from_ref s)) - | "is_valid_ref" -> - let s = unmarshall_is_valid_ref_args args in - success (marshall_is_valid_ref_response (DBCache.is_valid_ref s)) - | "read_refs" -> - let s = unmarshall_read_refs_args args in - success (marshall_read_refs_response (DBCache.read_refs s)) - | "read_field_where" -> - let w = unmarshall_read_field_where_args args in - success (marshall_read_field_where_response (DBCache.read_field_where w)) - | "read_set_ref" -> - let w = unmarshall_read_set_ref_args args in - success (marshall_read_set_ref_response (DBCache.read_field_where w)) - | "create_row" -> - let (s1,ssl,s2) = unmarshall_create_row_args args in - success (marshall_create_row_response (DBCache.create_row Context.initial s1 ssl s2)) - | "delete_row" -> - let (s1,s2) = unmarshall_delete_row_args args in - success (marshall_delete_row_response (DBCache.delete_row Context.initial s1 s2)) - | "write_field" -> - let (s1,s2,s3,s4) = unmarshall_write_field_args args in - success (marshall_write_field_response (DBCache.write_field Context.initial s1 s2 s3 s4)) - | "read_field" -> - let (s1,s2,s3) = unmarshall_read_field_args args in - success (marshall_read_field_response (DBCache.read_field Context.initial s1 s2 s3)) - | "find_refs_with_filter" -> - let (s,e) = unmarshall_find_refs_with_filter_args args in - success (marshall_find_refs_with_filter_response (DBCache.find_refs_with_filter s e)) - | "process_structured_field" -> - let (ss,s1,s2,s3,op) = unmarshall_process_structured_field_args args in - success (marshall_process_structured_field_response (DBCache.process_structured_field Context.initial ss s1 s2 s3 op)) - | "read_record" -> - let (s1,s2) = unmarshall_read_record_args args in - success (marshall_read_record_response (DBCache.read_record s1 s2)) - | "read_records_where" -> - let (s,e) = unmarshall_read_records_where_args args in - success (marshall_read_records_where_response (DBCache.read_records_where s e)) - | "db_get_by_uuid" -> - let (s,e) = unmarshall_db_get_by_uuid_args args in - success (marshall_db_get_by_uuid_response (DBCache.db_get_by_uuid s e)) - | "db_get_by_name_label" -> - let (s,e) = unmarshall_db_get_by_name_label_args args in - success (marshall_db_get_by_name_label_response (DBCache.db_get_by_name_label s e)) - | _ -> raise (DBCacheListenerUnknownMessageName fn_name) - with - Duplicate_key (c,f,u,k) -> - failure "duplicate_key_of" (marshall_4strings (c,f,u,k)) - | DBCache_NotFound (s1,s2,s3) -> - failure "dbcache_notfound" (marshall_3strings (s1,s2,s3)) - | Uniqueness_constraint_violation (s1,s2,s3) -> - failure "uniqueness_constraint_violation" (marshall_3strings (s1,s2,s3)) - | Read_missing_uuid (s1,s2,s3) -> - failure "read_missing_uuid" (marshall_3strings (s1,s2,s3)) - | Too_many_values (s1,s2,s3) -> - failure "too_many_values" (marshall_3strings (s1,s2,s3)) - | e -> raise e -end diff --git a/ocaml/database/db_remote_cache_access_v1.ml b/ocaml/database/db_remote_cache_access_v1.ml new file mode 100644 index 00000000..3e893771 --- /dev/null +++ b/ocaml/database/db_remote_cache_access_v1.ml @@ -0,0 +1,131 @@ + +module DBCacheRemoteListener = struct + open Db_rpc_common_v1 + open Db_action_helper + open Db_cache + open Db_exn + + exception DBCacheListenerInvalidMessageReceived + exception DBCacheListenerUnknownMessageName of string + + module D = Debug.Debugger(struct let name = "db_server" end) + open D + + let ctr_mutex = Mutex.create() + let calls_processed = ref 0 + let total_recv_len = ref 0 + let total_transmit_len = ref 0 + + (* Performance counters for debugging *) + let update_lengths msg resp = + Mutex.lock ctr_mutex; + total_transmit_len := (!total_transmit_len) + (String.length (Xml.to_string_fmt resp)); + total_recv_len := (!total_recv_len) + (String.length (Xml.to_string_fmt msg)); + Mutex.unlock ctr_mutex + + let success xml = + let resp = + XMLRPC.To.array + [XMLRPC.To.string "success"; + xml] in + (* update_lengths xml resp; *) + (* let s = Xml.to_string_fmt resp in *) + (* debug "Resp [Len = %d]: %s" (String.length s) s; *) + debug "Call succeeded"; + resp + + let failure exn_name xml = + let resp = + XMLRPC.To.array + [XMLRPC.To.string "failure"; + XMLRPC.To.array + [XMLRPC.To.string exn_name; + xml]] in + (* update_lengths xml resp; *) + debug "Call failed"; + resp + + module DBCache : Db_interface.DB_ACCESS = Db_cache_impl + + (** Unmarshals the request, calls the DBCache function and marshals the result. + Note that, although the messages still contain the pool_secret for historical reasons, + access has already been applied by the RBAC code in Xapi_http.add_handler. *) + let process_xmlrpc xml = + Mutex.lock ctr_mutex; + calls_processed := !calls_processed + 1; + Mutex.unlock ctr_mutex; + let fn_name, args = + match (XMLRPC.From.array (fun x->x) xml) with + [fn_name; _; args] -> + XMLRPC.From.string fn_name, args + | _ -> raise DBCacheListenerInvalidMessageReceived in + try + debug "Received [total=%d rx=%d tx=%d] %s" !calls_processed !total_recv_len !total_transmit_len fn_name; + match fn_name with + "get_table_from_ref" -> + let s = unmarshall_get_table_from_ref_args args in + success (marshall_get_table_from_ref_response (DBCache.get_table_from_ref s)) + | "is_valid_ref" -> + let s = unmarshall_is_valid_ref_args args in + success (marshall_is_valid_ref_response (DBCache.is_valid_ref s)) + | "read_refs" -> + let s = unmarshall_read_refs_args args in + success (marshall_read_refs_response (DBCache.read_refs s)) + | "read_field_where" -> + let w = unmarshall_read_field_where_args args in + success (marshall_read_field_where_response (DBCache.read_field_where w)) + | "read_set_ref" -> + let w = unmarshall_read_set_ref_args args in + success (marshall_read_set_ref_response (DBCache.read_field_where w)) + | "create_row" -> + let (s1,ssl,s2) = unmarshall_create_row_args args in + success (marshall_create_row_response (DBCache.create_row s1 ssl s2)) + | "delete_row" -> + let (s1,s2) = unmarshall_delete_row_args args in + success (marshall_delete_row_response (DBCache.delete_row s1 s2)) + | "write_field" -> + let (s1,s2,s3,s4) = unmarshall_write_field_args args in + success (marshall_write_field_response (DBCache.write_field s1 s2 s3 s4)) + | "read_field" -> + let (s1,s2,s3) = unmarshall_read_field_args args in + success (marshall_read_field_response (DBCache.read_field s1 s2 s3)) + | "find_refs_with_filter" -> + let (s,e) = unmarshall_find_refs_with_filter_args args in + success (marshall_find_refs_with_filter_response (DBCache.find_refs_with_filter s e)) + | "process_structured_field" -> + let (ss,s1,s2,s3,op) = unmarshall_process_structured_field_args args in + success (marshall_process_structured_field_response (DBCache.process_structured_field ss s1 s2 s3 op)) + | "read_record" -> + let (s1,s2) = unmarshall_read_record_args args in + success (marshall_read_record_response (DBCache.read_record s1 s2)) + | "read_records_where" -> + let (s,e) = unmarshall_read_records_where_args args in + success (marshall_read_records_where_response (DBCache.read_records_where s e)) + | "db_get_by_uuid" -> + let (s,e) = unmarshall_db_get_by_uuid_args args in + success (marshall_db_get_by_uuid_response (DBCache.db_get_by_uuid s e)) + | "db_get_by_name_label" -> + let (s,e) = unmarshall_db_get_by_name_label_args args in + success (marshall_db_get_by_name_label_response (DBCache.db_get_by_name_label s e)) + | _ -> raise (DBCacheListenerUnknownMessageName fn_name) + with + Duplicate_key (c,f,u,k) -> + failure "duplicate_key_of" (marshall_4strings (c,f,u,k)) + | DBCache_NotFound (s1,s2,s3) -> + failure "dbcache_notfound" (marshall_3strings (s1,s2,s3)) + | Uniqueness_constraint_violation (s1,s2,s3) -> + failure "uniqueness_constraint_violation" (marshall_3strings (s1,s2,s3)) + | Read_missing_uuid (s1,s2,s3) -> + failure "read_missing_uuid" (marshall_3strings (s1,s2,s3)) + | Too_many_values (s1,s2,s3) -> + failure "too_many_values" (marshall_3strings (s1,s2,s3)) + | e -> raise e +end + +let handler req bio = + let fd = Buf_io.fd_of bio in (* fd only used for writing *) + let body = Http_svr.read_body ~limit:Xapi_globs.http_limit_max_rpc_size req bio in + let body_xml = Xml.parse_string body in + let response = Xml.to_bigbuffer (DBCacheRemoteListener.process_xmlrpc body_xml) in + Http_svr.response_fct req fd (Bigbuffer.length response) + (fun fd -> Bigbuffer.to_fct response (fun s -> ignore(Unix.write fd s 0 (String.length s)))) diff --git a/ocaml/database/db_remote_cache_access_v1.mli b/ocaml/database/db_remote_cache_access_v1.mli new file mode 100644 index 00000000..8253fee4 --- /dev/null +++ b/ocaml/database/db_remote_cache_access_v1.mli @@ -0,0 +1,2 @@ +(** HTTP handler for v1 of the remote DB access protocol *) +val handler: Http.request -> Buf_io.t -> unit diff --git a/ocaml/database/db_remote_cache_access_v2.ml b/ocaml/database/db_remote_cache_access_v2.ml new file mode 100644 index 00000000..1b313724 --- /dev/null +++ b/ocaml/database/db_remote_cache_access_v2.ml @@ -0,0 +1,78 @@ +(* + * Copyright (C) 2010 Citrix Systems Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** server-side for remote database access protocol v2 *) + +open Db_rpc_common_v2 +open Db_exn + +(** Convert a marshalled Request Rpc.t into a marshalled Response Rpc.t *) +let process_rpc (req: Rpc.t) = + let module DB = (Db_cache_impl : Db_interface.DB_ACCESS) in + Response.rpc_of_t + (try + match Request.t_of_rpc req with + | Request.Get_table_from_ref x -> + Response.Get_table_from_ref (DB.get_table_from_ref x) + | Request.Is_valid_ref x -> + Response.Is_valid_ref (DB.is_valid_ref x) + | Request.Read_refs x -> + Response.Read_refs (DB.read_refs x) + | Request.Find_refs_with_filter (x, e) -> + Response.Find_refs_with_filter (DB.find_refs_with_filter x e) + | Request.Read_field_where w -> + Response.Read_field_where (DB.read_field_where w) + | Request.Db_get_by_uuid (a, b) -> + Response.Db_get_by_uuid (DB.db_get_by_uuid a b) + | Request.Db_get_by_name_label (a, b) -> + Response.Db_get_by_name_label (DB.db_get_by_name_label a b) + | Request.Read_set_ref w -> + Response.Read_set_ref (DB.read_set_ref w) + | Request.Create_row (a, b, c) -> + Response.Create_row (DB.create_row a b c) + | Request.Delete_row (a, b) -> + Response.Delete_row (DB.delete_row a b) + | Request.Write_field (a, b, c, d) -> + Response.Write_field (DB.write_field a b c d) + | Request.Read_field (a, b, c) -> + Response.Read_field (DB.read_field a b c) + | Request.Read_record (a, b) -> + let a', b' = DB.read_record a b in + Response.Read_record (a', b') + | Request.Read_records_where (a, b) -> + Response.Read_records_where (DB.read_records_where a b) + | Request.Process_structured_field (a, b, c, d, e) -> + Response.Process_structured_field (DB.process_structured_field a b c d e) + with + | DBCache_NotFound (x,y,z) -> + Response.Dbcache_notfound (x, y, z) + | Duplicate_key (w,x,y,z) -> + Response.Duplicate_key_of (w, x, y, z) + | Uniqueness_constraint_violation (x,y,z) -> + Response.Uniqueness_constraint_violation (x, y, z) + | Read_missing_uuid (x,y,z) -> + Response.Read_missing_uuid (x, y, z) + | Too_many_values (x,y,z) -> + Response.Too_many_values (x, y, z) + + ) + +let handler req bio = + let fd = Buf_io.fd_of bio in (* fd only used for writing *) + let body = Http_svr.read_body ~limit:Xapi_globs.http_limit_max_rpc_size req bio in + let request_rpc = Jsonrpc.of_string body in + (* XXX: need to cope with > 16MiB responses *) + let response = Jsonrpc.to_string (process_rpc request_rpc) in + Http_svr.response_str req fd response + diff --git a/ocaml/database/db_remote_cache_access_v2.mli b/ocaml/database/db_remote_cache_access_v2.mli new file mode 100644 index 00000000..b9b7a86f --- /dev/null +++ b/ocaml/database/db_remote_cache_access_v2.mli @@ -0,0 +1,2 @@ +(** HTTP handler for v2 of the remote DB access protocol *) +val handler: Http.request -> Buf_io.t -> unit diff --git a/ocaml/database/db_remote_marshall.ml b/ocaml/database/db_remote_marshall.ml deleted file mode 100644 index 540d8cce..00000000 --- a/ocaml/database/db_remote_marshall.ml +++ /dev/null @@ -1,307 +0,0 @@ -(* - * Copyright (C) 2006-2009 Citrix Systems Inc. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; version 2.1 only. with the special - * exception on linking described in file LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - *) -open Db_cache_types - -exception DB_remote_marshall_error - -let marshall_2strings (x,y) = - XMLRPC.To.array [XMLRPC.To.string x; XMLRPC.To.string y] -let unmarshall_2strings xml = - match (XMLRPC.From.array (fun x->x) xml) with - [x1;x2] -> - (XMLRPC.From.string x1, - XMLRPC.From.string x2) - | _ -> raise DB_remote_marshall_error - -let marshall_4strings (x,y,w,z) = - XMLRPC.To.array [XMLRPC.To.string x; XMLRPC.To.string y; XMLRPC.To.string w; XMLRPC.To.string z] -let unmarshall_4strings xml = - match (XMLRPC.From.array (fun x->x) xml) with - [x1;x2;x3;x4] -> - (XMLRPC.From.string x1, - XMLRPC.From.string x2, - XMLRPC.From.string x3, - XMLRPC.From.string x4) - | _ -> raise DB_remote_marshall_error - -let marshall_3strings (x,y,w) = - XMLRPC.To.array [XMLRPC.To.string x; XMLRPC.To.string y; XMLRPC.To.string w] -let unmarshall_3strings xml = - match (XMLRPC.From.array (fun x->x) xml) with - [x1;x2;x3] -> - (XMLRPC.From.string x1, - XMLRPC.From.string x2, - XMLRPC.From.string x3) - | _ -> raise DB_remote_marshall_error - -let marshall_stringlist sl = - XMLRPC.To.array (List.map XMLRPC.To.string sl) -let unmarshall_stringlist xml = - List.map XMLRPC.From.string (XMLRPC.From.array (fun x->x) xml) - -let marshall_stringstringlist ssl = - XMLRPC.To.array (List.map marshall_2strings ssl) -let unmarshall_stringstringlist xml = - List.map unmarshall_2strings (XMLRPC.From.array (fun x->x) xml) - -let marshall_stringopt x = - match x with - None -> XMLRPC.To.array [] - | (Some x) -> XMLRPC.To.array [XMLRPC.To.string x] -let unmarshall_stringopt xml = - match (XMLRPC.From.array (fun x->x) xml) with - [] -> None - | [xml] -> Some (XMLRPC.From.string xml) - -let marshall_expr x = - Db_filter.xml_of_expr x -let unmarshall_expr xml = - Db_filter.expr_of_xml xml - -let marshall_structured_op x = - let str = - match x with - AddSet -> "addset" - | RemoveSet -> "removeset" - | AddMap -> "addmap" - | RemoveMap -> "removemap" in - XMLRPC.To.string str -let unmarshall_structured_op xml = - match (XMLRPC.From.string xml) with - "addset" -> AddSet - | "removeset" -> RemoveSet - | "addmap" -> AddMap - | "removemap" -> RemoveMap - -let marshall_where_rec r = - XMLRPC.To.array [XMLRPC.To.string r.table; - XMLRPC.To.string r.return; - XMLRPC.To.string r.where_field; - XMLRPC.To.string r.where_value] -let unmarshall_where_rec xml = - match (XMLRPC.From.array (fun x->x) xml) with - [t;r;wf;wv] -> - {table=XMLRPC.From.string t; return=XMLRPC.From.string r; - where_field=XMLRPC.From.string wf; where_value=XMLRPC.From.string wv} - | _ -> raise DB_remote_marshall_error - -let marshall_unit () = - XMLRPC.To.string "" -let unmarshall_unit xml = - match XMLRPC.From.string xml with - "" -> () - | _ -> raise DB_remote_marshall_error - -(* get_table_from_ref *) -let marshall_get_table_from_ref_args s = XMLRPC.To.string s -let unmarshall_get_table_from_ref_args xml = XMLRPC.From.string xml -let marshall_get_table_from_ref_response so = marshall_stringopt so -let unmarshall_get_table_from_ref_response so = unmarshall_stringopt so - -(* is_valid_ref *) -let marshall_is_valid_ref_args s = XMLRPC.To.string s -let unmarshall_is_valid_ref_args xml = XMLRPC.From.string xml -let marshall_is_valid_ref_response b = XMLRPC.To.boolean b -let unmarshall_is_valid_ref_response xml = XMLRPC.From.boolean xml - -(* read_refs *) -let marshall_read_refs_args s = XMLRPC.To.string s -let unmarshall_read_refs_args s = XMLRPC.From.string s -let marshall_read_refs_response sl = marshall_stringlist sl -let unmarshall_read_refs_response xml = unmarshall_stringlist xml - -(* read_field_where *) -let marshall_read_field_where_args w = marshall_where_rec w -let unmarshall_read_field_where_args xml = unmarshall_where_rec xml -let marshall_read_field_where_response sl = - marshall_stringlist sl -let unmarshall_read_field_where_response xml = - unmarshall_stringlist xml - -(* db_get_by_uuid *) -let marshall_db_get_by_uuid_args (s1,s2) = - marshall_2strings (s1,s2) -let unmarshall_db_get_by_uuid_args xml = - unmarshall_2strings xml -let marshall_db_get_by_uuid_response s = - XMLRPC.To.string s -let unmarshall_db_get_by_uuid_response xml = - XMLRPC.From.string xml - -(* db_get_by_name_label *) -let marshall_db_get_by_name_label_args (s1,s2) = - marshall_2strings (s1,s2) -let unmarshall_db_get_by_name_label_args xml = - unmarshall_2strings xml -let marshall_db_get_by_name_label_response sl = - marshall_stringlist sl -let unmarshall_db_get_by_name_label_response xml = - unmarshall_stringlist xml - -(* read_set_ref *) -let marshall_read_set_ref_args w = marshall_where_rec w -let unmarshall_read_set_ref_args xml = unmarshall_where_rec xml -let marshall_read_set_ref_response sl = - marshall_stringlist sl -let unmarshall_read_set_ref_response xml = - unmarshall_stringlist xml - - -(* create_row *) -let marshall_create_row_args (s1,ssl,s2) = - XMLRPC.To.array - [XMLRPC.To.string s1; - XMLRPC.To.array (List.map marshall_2strings ssl); - XMLRPC.To.string s2] -let unmarshall_create_row_args xml = - match (XMLRPC.From.array (fun x->x) xml) with - [s1_xml; ssl_xml; s2_xml] -> - (XMLRPC.From.string s1_xml, - List.map unmarshall_2strings (XMLRPC.From.array (fun x->x) ssl_xml), - XMLRPC.From.string s2_xml) - | _ -> raise DB_remote_marshall_error -let marshall_create_row_response () = - marshall_unit () -let unmarshall_create_row_response xml = - unmarshall_unit xml - -(* delete_row *) -let marshall_delete_row_args (s1,s2) = - XMLRPC.To.array - [XMLRPC.To.string s1; - XMLRPC.To.string s2] -let unmarshall_delete_row_args xml = - match (XMLRPC.From.array (fun x->x) xml) with - [s1_xml; s2_xml] -> - (XMLRPC.From.string s1_xml, XMLRPC.From.string s2_xml) - | _ -> raise DB_remote_marshall_error -let marshall_delete_row_response () = - marshall_unit () -let unmarshall_delete_row_response xml = - unmarshall_unit xml - -(* write_field *) -let marshall_write_field_args (s1,s2,s3,s4) = - XMLRPC.To.array - (List.map XMLRPC.To.string [s1;s2;s3;s4]) -let unmarshall_write_field_args xml = - match (XMLRPC.From.array (fun x->x) xml) with - ([s1x;s2x;s3x;s4x] as l) -> - (XMLRPC.From.string s1x, XMLRPC.From.string s2x, - XMLRPC.From.string s3x, XMLRPC.From.string s4x) - | _ -> raise DB_remote_marshall_error -let marshall_write_field_response () = - marshall_unit () -let unmarshall_write_field_response xml = - unmarshall_unit xml - -(* read_field *) -let marshall_read_field_args (s1,s2,s3) = - XMLRPC.To.array - (List.map XMLRPC.To.string [s1;s2;s3]) -let unmarshall_read_field_args xml = - match (XMLRPC.From.array (fun x->x) xml) with - ([s1x;s2x;s3x] as l) -> - (XMLRPC.From.string s1x, XMLRPC.From.string s2x, - XMLRPC.From.string s3x) - | _ -> raise DB_remote_marshall_error -let marshall_read_field_response s = - XMLRPC.To.string s -let unmarshall_read_field_response xml = - XMLRPC.From.string xml - -(* find_refs_with_filter *) -let marshall_find_refs_with_filter_args (s,e) = - XMLRPC.To.array - [XMLRPC.To.string s; marshall_expr e] -let unmarshall_find_refs_with_filter_args xml = - match (XMLRPC.From.array (fun x->x) xml) with - [s;e] -> (XMLRPC.From.string s, unmarshall_expr e) -let marshall_find_refs_with_filter_response sl = - marshall_stringlist sl -let unmarshall_find_refs_with_filter_response xml = - unmarshall_stringlist xml - -(* process_structured_field *) -let marshall_process_structured_field_args (ss,s1,s2,s3,op) = - XMLRPC.To.array - [marshall_2strings ss; - XMLRPC.To.string s1; - XMLRPC.To.string s2; - XMLRPC.To.string s3; - marshall_structured_op op] -let unmarshall_process_structured_field_args xml = - match (XMLRPC.From.array (fun x->x) xml) with - [ss_xml;s1_xml;s2_xml;s3_xml;op_xml] -> - (unmarshall_2strings ss_xml, - XMLRPC.From.string s1_xml, - XMLRPC.From.string s2_xml, - XMLRPC.From.string s3_xml, - unmarshall_structured_op op_xml) - | _ -> raise DB_remote_marshall_error -let marshall_process_structured_field_response () = - marshall_unit () -let unmarshall_process_structured_field_response xml = - unmarshall_unit xml - -(* read_record *) -let marshall_read_record_args = marshall_2strings -let unmarshall_read_record_args = unmarshall_2strings -let marshall_read_record_response (ssl, ssll) = - XMLRPC.To.array - [XMLRPC.To.array (List.map marshall_2strings ssl); - XMLRPC.To.array - (List.map - (fun (s,sl) -> - XMLRPC.To.array [XMLRPC.To.string s; - XMLRPC.To.array (List.map XMLRPC.To.string sl)]) ssll)] -let unmarshall_read_record_response xml = - match (XMLRPC.From.array (fun x->x) xml) with - [ssl_xml; ssll_xml] -> - (List.map unmarshall_2strings (XMLRPC.From.array (fun x->x) ssl_xml), - List.map - (fun xml -> - match XMLRPC.From.array (fun x->x) xml with - [s_xml; sl_xml] -> (XMLRPC.From.string s_xml, unmarshall_stringlist sl_xml) - | _ -> raise DB_remote_marshall_error) - (XMLRPC.From.array (fun x->x) ssll_xml)) - | _ -> raise DB_remote_marshall_error - -(* read_records_where *) -let marshall_read_records_where_args (s,e) = - XMLRPC.To.array - [XMLRPC.To.string s; marshall_expr e] -let unmarshall_read_records_where_args xml = - match (XMLRPC.From.array (fun x->x) xml) with - [s_xml; expr_xml] -> - (XMLRPC.From.string s_xml, - unmarshall_expr expr_xml) - | _ -> raise DB_remote_marshall_error - -let marshall_read_records_where_response refs_and_recs_list = - XMLRPC.To.array - (List.map - (fun (ref,record)-> - XMLRPC.To.array - [XMLRPC.To.string ref; - marshall_read_record_response record]) refs_and_recs_list) -let unmarshall_read_records_where_response xml = - match (XMLRPC.From.array (fun x->x) xml) with - xml_refs_and_recs_list -> - List.map - (fun xml_ref_and_rec -> - match (XMLRPC.From.array (fun x->x) xml_ref_and_rec) with - [ref_xml; rec_xml] -> (XMLRPC.From.string ref_xml, unmarshall_read_record_response rec_xml) - | _ -> raise DB_remote_marshall_error) - xml_refs_and_recs_list diff --git a/ocaml/database/db_rpc_client_v1.ml b/ocaml/database/db_rpc_client_v1.ml new file mode 100644 index 00000000..27891f04 --- /dev/null +++ b/ocaml/database/db_rpc_client_v1.ml @@ -0,0 +1,169 @@ +(* + * Copyright (C) 2006-2010 Citrix Systems Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +open Db_rpc_common_v1 +open Db_exn + +module Make = functor(RPC: Db_interface.RPC) -> struct + exception Remote_db_server_returned_unknown_exception + + (* Process an exception returned from server, throwing local exception *) + let process_exception_xml xml = + match XMLRPC.From.array (fun x->x) xml with + [exn_name_xml; exn_params_xml] -> + let exn_name = XMLRPC.From.string exn_name_xml in + begin + match exn_name with + | "dbcache_notfound" -> + let (x,y,z) = unmarshall_3strings exn_params_xml in + raise (DBCache_NotFound (x,y,z)) + | "duplicate_key_of" -> + let (w,x,y,z) = unmarshall_4strings exn_params_xml in + raise (Duplicate_key (w,x,y,z)) + | "uniqueness_constraint_violation" -> + let (x,y,z) = unmarshall_3strings exn_params_xml in + raise (Uniqueness_constraint_violation (x,y,z)) + | "read_missing_uuid" -> + let (x,y,z) = unmarshall_3strings exn_params_xml in + raise (Read_missing_uuid (x,y,z)) + | "too_many_values" -> + let (x,y,z) = unmarshall_3strings exn_params_xml in + raise (Too_many_values (x,y,z)) + | _ -> raise DB_remote_marshall_error + end + | _ -> raise Remote_db_server_returned_unknown_exception + + + exception Remote_db_server_returned_bad_message + let do_remote_call marshall_args unmarshall_resp fn_name args = + let xml = marshall_args args in + let xml = XMLRPC.To.array [XMLRPC.To.string fn_name; XMLRPC.To.string "" (* unused *); xml] in + let resp = Xml.parse_string (RPC.rpc (Xml.to_string xml)) in + match XMLRPC.From.array (fun x->x) resp with + [status_xml; resp_xml] -> + let status = XMLRPC.From.string status_xml in + if status="success" then unmarshall_resp resp_xml + else process_exception_xml resp_xml + | _ -> raise Remote_db_server_returned_bad_message + + let get_table_from_ref x = + do_remote_call + marshall_get_table_from_ref_args + unmarshall_get_table_from_ref_response + "get_table_from_ref" + x + + let is_valid_ref x = + do_remote_call + marshall_is_valid_ref_args + unmarshall_is_valid_ref_response + "is_valid_ref" + x + + let read_refs x = + do_remote_call + marshall_read_refs_args + unmarshall_read_refs_response + "read_refs" + x + + let read_field_where x = + do_remote_call + marshall_read_field_where_args + unmarshall_read_field_where_response + "read_field_where" + x + + + let db_get_by_uuid t u = + do_remote_call + marshall_db_get_by_uuid_args + unmarshall_db_get_by_uuid_response + "db_get_by_uuid" + (t,u) + + let db_get_by_name_label t l = + do_remote_call + marshall_db_get_by_name_label_args + unmarshall_db_get_by_name_label_response + "db_get_by_name_label" + (t,l) + + let read_set_ref x = + do_remote_call + marshall_read_set_ref_args + unmarshall_read_set_ref_response + "read_set_ref" + x + + + let create_row x y z = + do_remote_call + marshall_create_row_args + unmarshall_create_row_response + "create_row" + (x,y,z) + + let delete_row x y = + do_remote_call + marshall_delete_row_args + unmarshall_delete_row_response + "delete_row" + (x,y) + + let write_field a b c d = + do_remote_call + marshall_write_field_args + unmarshall_write_field_response + "write_field" + (a,b,c,d) + + let read_field x y z = + do_remote_call + marshall_read_field_args + unmarshall_read_field_response + "read_field" + (x,y,z) + + let find_refs_with_filter s e = + do_remote_call + marshall_find_refs_with_filter_args + unmarshall_find_refs_with_filter_response + "find_refs_with_filter" + (s,e) + + let read_record x y = + do_remote_call + marshall_read_record_args + unmarshall_read_record_response + "read_record" + (x,y) + + let read_records_where x e = + do_remote_call + marshall_read_records_where_args + unmarshall_read_records_where_response + "read_records_where" + (x,e) + + let process_structured_field a b c d e = + do_remote_call + marshall_process_structured_field_args + unmarshall_process_structured_field_response + "process_structured_field" + (a,b,c,d,e) + + let initialise = RPC.initialise + +end diff --git a/ocaml/database/db_rpc_client_v1.mli b/ocaml/database/db_rpc_client_v1.mli new file mode 100644 index 00000000..9a7fc986 --- /dev/null +++ b/ocaml/database/db_rpc_client_v1.mli @@ -0,0 +1,3 @@ + +(** Constructs a database RPC client speaking protocol v1 *) +module Make : functor (RPC : Db_interface.RPC) -> Db_interface.DB_ACCESS diff --git a/ocaml/database/db_rpc_client_v2.ml b/ocaml/database/db_rpc_client_v2.ml new file mode 100644 index 00000000..01a37e7b --- /dev/null +++ b/ocaml/database/db_rpc_client_v2.ml @@ -0,0 +1,114 @@ +(* + * Copyright (C) 2010 Citrix Systems Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** client-side for remote database access protocol v2 *) + +open Db_rpc_common_v2 +open Db_exn + +module Make = functor(RPC: Db_interface.RPC) -> struct + + let initialise = RPC.initialise + let rpc x = Jsonrpc.of_string (RPC.rpc (Jsonrpc.to_string x)) + + let process (x: Request.t) = + let y : Response.t = Response.t_of_rpc (rpc (Request.rpc_of_t x)) in + match y with + | Response.Dbcache_notfound (x, y, z) -> + raise (DBCache_NotFound (x,y,z)) + | Response.Duplicate_key_of (w, x, y, z) -> + raise (Duplicate_key (w,x,y,z)) + | Response.Uniqueness_constraint_violation (x, y, z) -> + raise (Uniqueness_constraint_violation (x,y,z)) + | Response.Read_missing_uuid (x, y, z) -> + raise (Read_missing_uuid (x,y,z)) + | Response.Too_many_values (x, y, z) -> + raise (Too_many_values (x,y,z)) + | y -> y + + let get_table_from_ref x = + match process (Request.Get_table_from_ref x) with + | Response.Get_table_from_ref y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let is_valid_ref x = + match process (Request.Is_valid_ref x) with + | Response.Is_valid_ref y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let read_refs x = + match process (Request.Read_refs x) with + | Response.Read_refs y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let read_field_where x = + match process (Request.Read_field_where x) with + | Response.Read_field_where y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let db_get_by_uuid t u = + match process (Request.Db_get_by_uuid (t, u)) with + | Response.Db_get_by_uuid y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let db_get_by_name_label t l = + match process (Request.Db_get_by_name_label (t, l)) with + | Response.Db_get_by_name_label y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let read_set_ref x = + match process (Request.Read_set_ref x) with + | Response.Read_set_ref y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let create_row x y z = + match process (Request.Create_row (x, y, z)) with + | Response.Create_row y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let delete_row x y = + match process (Request.Delete_row (x, y)) with + | Response.Delete_row y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let write_field a b c d = + match process (Request.Write_field (a, b, c, d)) with + | Response.Write_field y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let read_field x y z = + match process (Request.Read_field (x, y, z)) with + | Response.Read_field y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let find_refs_with_filter s e = + match process (Request.Find_refs_with_filter (s, e)) with + | Response.Find_refs_with_filter y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let read_record x y = + match process (Request.Read_record (x, y)) with + | Response.Read_record (x, y) -> x, y + | _ -> raise Remote_db_server_returned_bad_message + + let read_records_where x e = + match process (Request.Read_records_where (x, e)) with + | Response.Read_records_where y -> y + | _ -> raise Remote_db_server_returned_bad_message + + let process_structured_field a b c d e = + match process (Request.Process_structured_field(a, b, c, d, e)) with + | Response.Process_structured_field y -> y + | _ -> raise Remote_db_server_returned_bad_message +end diff --git a/ocaml/database/db_rpc_client_v2.mli b/ocaml/database/db_rpc_client_v2.mli new file mode 100644 index 00000000..ee56b798 --- /dev/null +++ b/ocaml/database/db_rpc_client_v2.mli @@ -0,0 +1,2 @@ +(** Constructs a database RPC client speaking protocol v2 *) +module Make : functor (RPC : Db_interface.RPC) -> Db_interface.DB_ACCESS diff --git a/ocaml/database/db_rpc_common_v1.ml b/ocaml/database/db_rpc_common_v1.ml new file mode 100644 index 00000000..bb8c396b --- /dev/null +++ b/ocaml/database/db_rpc_common_v1.ml @@ -0,0 +1,311 @@ +(* + * Copyright (C) 2006-2009 Citrix Systems Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** Marshall/unmarshall functions for relevant types to XMLRPC *) + +open Db_cache_types + +exception DB_remote_marshall_error + +let marshall_2strings (x,y) = + XMLRPC.To.array [XMLRPC.To.string x; XMLRPC.To.string y] +let unmarshall_2strings xml = + match (XMLRPC.From.array (fun x->x) xml) with + [x1;x2] -> + (XMLRPC.From.string x1, + XMLRPC.From.string x2) + | _ -> raise DB_remote_marshall_error + +let marshall_4strings (x,y,w,z) = + XMLRPC.To.array [XMLRPC.To.string x; XMLRPC.To.string y; XMLRPC.To.string w; XMLRPC.To.string z] +let unmarshall_4strings xml = + match (XMLRPC.From.array (fun x->x) xml) with + [x1;x2;x3;x4] -> + (XMLRPC.From.string x1, + XMLRPC.From.string x2, + XMLRPC.From.string x3, + XMLRPC.From.string x4) + | _ -> raise DB_remote_marshall_error + +let marshall_3strings (x,y,w) = + XMLRPC.To.array [XMLRPC.To.string x; XMLRPC.To.string y; XMLRPC.To.string w] +let unmarshall_3strings xml = + match (XMLRPC.From.array (fun x->x) xml) with + [x1;x2;x3] -> + (XMLRPC.From.string x1, + XMLRPC.From.string x2, + XMLRPC.From.string x3) + | _ -> raise DB_remote_marshall_error + +let marshall_stringlist sl = + XMLRPC.To.array (List.map XMLRPC.To.string sl) +let unmarshall_stringlist xml = + List.map XMLRPC.From.string (XMLRPC.From.array (fun x->x) xml) + +let marshall_stringstringlist ssl = + XMLRPC.To.array (List.map marshall_2strings ssl) +let unmarshall_stringstringlist xml = + List.map unmarshall_2strings (XMLRPC.From.array (fun x->x) xml) + +let marshall_stringopt x = + match x with + None -> XMLRPC.To.array [] + | (Some x) -> XMLRPC.To.array [XMLRPC.To.string x] +let unmarshall_stringopt xml = + match (XMLRPC.From.array (fun x->x) xml) with + [] -> None + | [xml] -> Some (XMLRPC.From.string xml) + +let marshall_expr x = + Db_filter.xml_of_expr x +let unmarshall_expr xml = + Db_filter.expr_of_xml xml + +let marshall_structured_op x = + let str = + match x with + AddSet -> "addset" + | RemoveSet -> "removeset" + | AddMap -> "addmap" + | RemoveMap -> "removemap" in + XMLRPC.To.string str +let unmarshall_structured_op xml = + match (XMLRPC.From.string xml) with + "addset" -> AddSet + | "removeset" -> RemoveSet + | "addmap" -> AddMap + | "removemap" -> RemoveMap + +let marshall_where_rec r = + XMLRPC.To.array [XMLRPC.To.string r.table; + XMLRPC.To.string r.return; + XMLRPC.To.string r.where_field; + XMLRPC.To.string r.where_value] +let unmarshall_where_rec xml = + match (XMLRPC.From.array (fun x->x) xml) with + [t;r;wf;wv] -> + {table=XMLRPC.From.string t; return=XMLRPC.From.string r; + where_field=XMLRPC.From.string wf; where_value=XMLRPC.From.string wv} + | _ -> raise DB_remote_marshall_error + +let marshall_unit () = + XMLRPC.To.string "" +let unmarshall_unit xml = + match XMLRPC.From.string xml with + "" -> () + | _ -> raise DB_remote_marshall_error + +(* get_table_from_ref *) +let marshall_get_table_from_ref_args s = XMLRPC.To.string s +let unmarshall_get_table_from_ref_args xml = XMLRPC.From.string xml +let marshall_get_table_from_ref_response so = marshall_stringopt so +let unmarshall_get_table_from_ref_response so = unmarshall_stringopt so + +(* is_valid_ref *) +let marshall_is_valid_ref_args s = XMLRPC.To.string s +let unmarshall_is_valid_ref_args xml = XMLRPC.From.string xml +let marshall_is_valid_ref_response b = XMLRPC.To.boolean b +let unmarshall_is_valid_ref_response xml = XMLRPC.From.boolean xml + +(* read_refs *) +let marshall_read_refs_args s = XMLRPC.To.string s +let unmarshall_read_refs_args s = XMLRPC.From.string s +let marshall_read_refs_response sl = marshall_stringlist sl +let unmarshall_read_refs_response xml = unmarshall_stringlist xml + +(* read_field_where *) +let marshall_read_field_where_args w = marshall_where_rec w +let unmarshall_read_field_where_args xml = unmarshall_where_rec xml +let marshall_read_field_where_response sl = + marshall_stringlist sl +let unmarshall_read_field_where_response xml = + unmarshall_stringlist xml + +(* db_get_by_uuid *) +let marshall_db_get_by_uuid_args (s1,s2) = + marshall_2strings (s1,s2) +let unmarshall_db_get_by_uuid_args xml = + unmarshall_2strings xml +let marshall_db_get_by_uuid_response s = + XMLRPC.To.string s +let unmarshall_db_get_by_uuid_response xml = + XMLRPC.From.string xml + +(* db_get_by_name_label *) +let marshall_db_get_by_name_label_args (s1,s2) = + marshall_2strings (s1,s2) +let unmarshall_db_get_by_name_label_args xml = + unmarshall_2strings xml +let marshall_db_get_by_name_label_response sl = + marshall_stringlist sl +let unmarshall_db_get_by_name_label_response xml = + unmarshall_stringlist xml + +(* read_set_ref *) +let marshall_read_set_ref_args w = marshall_where_rec w +let unmarshall_read_set_ref_args xml = unmarshall_where_rec xml +let marshall_read_set_ref_response sl = + marshall_stringlist sl +let unmarshall_read_set_ref_response xml = + unmarshall_stringlist xml + + +(* create_row *) +let marshall_create_row_args (s1,ssl,s2) = + XMLRPC.To.array + [XMLRPC.To.string s1; + XMLRPC.To.array (List.map marshall_2strings ssl); + XMLRPC.To.string s2] +let unmarshall_create_row_args xml = + match (XMLRPC.From.array (fun x->x) xml) with + [s1_xml; ssl_xml; s2_xml] -> + (XMLRPC.From.string s1_xml, + List.map unmarshall_2strings (XMLRPC.From.array (fun x->x) ssl_xml), + XMLRPC.From.string s2_xml) + | _ -> raise DB_remote_marshall_error +let marshall_create_row_response () = + marshall_unit () +let unmarshall_create_row_response xml = + unmarshall_unit xml + +(* delete_row *) +let marshall_delete_row_args (s1,s2) = + XMLRPC.To.array + [XMLRPC.To.string s1; + XMLRPC.To.string s2] +let unmarshall_delete_row_args xml = + match (XMLRPC.From.array (fun x->x) xml) with + [s1_xml; s2_xml] -> + (XMLRPC.From.string s1_xml, XMLRPC.From.string s2_xml) + | _ -> raise DB_remote_marshall_error +let marshall_delete_row_response () = + marshall_unit () +let unmarshall_delete_row_response xml = + unmarshall_unit xml + +(* write_field *) +let marshall_write_field_args (s1,s2,s3,s4) = + XMLRPC.To.array + (List.map XMLRPC.To.string [s1;s2;s3;s4]) +let unmarshall_write_field_args xml = + match (XMLRPC.From.array (fun x->x) xml) with + ([s1x;s2x;s3x;s4x] as l) -> + (XMLRPC.From.string s1x, XMLRPC.From.string s2x, + XMLRPC.From.string s3x, XMLRPC.From.string s4x) + | _ -> raise DB_remote_marshall_error +let marshall_write_field_response () = + marshall_unit () +let unmarshall_write_field_response xml = + unmarshall_unit xml + +(* read_field *) +let marshall_read_field_args (s1,s2,s3) = + XMLRPC.To.array + (List.map XMLRPC.To.string [s1;s2;s3]) +let unmarshall_read_field_args xml = + match (XMLRPC.From.array (fun x->x) xml) with + ([s1x;s2x;s3x] as l) -> + (XMLRPC.From.string s1x, XMLRPC.From.string s2x, + XMLRPC.From.string s3x) + | _ -> raise DB_remote_marshall_error +let marshall_read_field_response s = + XMLRPC.To.string s +let unmarshall_read_field_response xml = + XMLRPC.From.string xml + +(* find_refs_with_filter *) +let marshall_find_refs_with_filter_args (s,e) = + XMLRPC.To.array + [XMLRPC.To.string s; marshall_expr e] +let unmarshall_find_refs_with_filter_args xml = + match (XMLRPC.From.array (fun x->x) xml) with + [s;e] -> (XMLRPC.From.string s, unmarshall_expr e) +let marshall_find_refs_with_filter_response sl = + marshall_stringlist sl +let unmarshall_find_refs_with_filter_response xml = + unmarshall_stringlist xml + +(* process_structured_field *) +let marshall_process_structured_field_args (ss,s1,s2,s3,op) = + XMLRPC.To.array + [marshall_2strings ss; + XMLRPC.To.string s1; + XMLRPC.To.string s2; + XMLRPC.To.string s3; + marshall_structured_op op] +let unmarshall_process_structured_field_args xml = + match (XMLRPC.From.array (fun x->x) xml) with + [ss_xml;s1_xml;s2_xml;s3_xml;op_xml] -> + (unmarshall_2strings ss_xml, + XMLRPC.From.string s1_xml, + XMLRPC.From.string s2_xml, + XMLRPC.From.string s3_xml, + unmarshall_structured_op op_xml) + | _ -> raise DB_remote_marshall_error +let marshall_process_structured_field_response () = + marshall_unit () +let unmarshall_process_structured_field_response xml = + unmarshall_unit xml + +(* read_record *) +let marshall_read_record_args = marshall_2strings +let unmarshall_read_record_args = unmarshall_2strings +let marshall_read_record_response (ssl, ssll) = + XMLRPC.To.array + [XMLRPC.To.array (List.map marshall_2strings ssl); + XMLRPC.To.array + (List.map + (fun (s,sl) -> + XMLRPC.To.array [XMLRPC.To.string s; + XMLRPC.To.array (List.map XMLRPC.To.string sl)]) ssll)] +let unmarshall_read_record_response xml = + match (XMLRPC.From.array (fun x->x) xml) with + [ssl_xml; ssll_xml] -> + (List.map unmarshall_2strings (XMLRPC.From.array (fun x->x) ssl_xml), + List.map + (fun xml -> + match XMLRPC.From.array (fun x->x) xml with + [s_xml; sl_xml] -> (XMLRPC.From.string s_xml, unmarshall_stringlist sl_xml) + | _ -> raise DB_remote_marshall_error) + (XMLRPC.From.array (fun x->x) ssll_xml)) + | _ -> raise DB_remote_marshall_error + +(* read_records_where *) +let marshall_read_records_where_args (s,e) = + XMLRPC.To.array + [XMLRPC.To.string s; marshall_expr e] +let unmarshall_read_records_where_args xml = + match (XMLRPC.From.array (fun x->x) xml) with + [s_xml; expr_xml] -> + (XMLRPC.From.string s_xml, + unmarshall_expr expr_xml) + | _ -> raise DB_remote_marshall_error + +let marshall_read_records_where_response refs_and_recs_list = + XMLRPC.To.array + (List.map + (fun (ref,record)-> + XMLRPC.To.array + [XMLRPC.To.string ref; + marshall_read_record_response record]) refs_and_recs_list) +let unmarshall_read_records_where_response xml = + match (XMLRPC.From.array (fun x->x) xml) with + xml_refs_and_recs_list -> + List.map + (fun xml_ref_and_rec -> + match (XMLRPC.From.array (fun x->x) xml_ref_and_rec) with + [ref_xml; rec_xml] -> (XMLRPC.From.string ref_xml, unmarshall_read_record_response rec_xml) + | _ -> raise DB_remote_marshall_error) + xml_refs_and_recs_list + diff --git a/ocaml/database/db_rpc_common_v2.ml b/ocaml/database/db_rpc_common_v2.ml new file mode 100644 index 00000000..35c8c811 --- /dev/null +++ b/ocaml/database/db_rpc_common_v2.ml @@ -0,0 +1,66 @@ +(* + * Copyright (C) 2010 Citrix Systems Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** Marshall/unmarshall functions and types for db remote access protocol v2 *) + +module Request = struct + + (** All possible request messages *) + type t = + | Get_table_from_ref of string + | Is_valid_ref of string + | Read_refs of string + | Find_refs_with_filter of string * Db_filter_types.expr + | Read_field_where of Db_cache_types.where_record + | Db_get_by_uuid of string * string + | Db_get_by_name_label of string * string + | Read_set_ref of Db_cache_types.where_record + | Create_row of string * (string * string) list * string + | Delete_row of string * string + | Write_field of string * string * string * string + | Read_field of string * string * string + | Read_record of string * string + | Read_records_where of string * Db_filter_types.expr + | Process_structured_field of (string * string) * string * string * string * Db_cache_types.structured_op_t + with rpc +end + +module Response = struct + + (** All possible response messages *) + type t = + | Get_table_from_ref of string option + | Is_valid_ref of bool + | Read_refs of string list + | Find_refs_with_filter of string list + | Read_field_where of string list + | Db_get_by_uuid of string + | Db_get_by_name_label of string list + | Read_set_ref of string list + | Create_row of unit + | Delete_row of unit + | Write_field of unit + | Read_field of string + | Read_record of (string * string) list * (string * string list) list + | Read_records_where of (string * ((string * string) list * (string * string list) list )) list + | Process_structured_field of unit + + | Dbcache_notfound of string * string * string + | Duplicate_key_of of string * string * string * string + | Uniqueness_constraint_violation of string * string * string + | Read_missing_uuid of string * string * string + | Too_many_values of string * string * string + with rpc +end + diff --git a/ocaml/database/eventgen.ml b/ocaml/database/eventgen.ml index 1c0ba503..76764c60 100644 --- a/ocaml/database/eventgen.ml +++ b/ocaml/database/eventgen.ml @@ -15,8 +15,14 @@ module D = Debug.Debugger(struct let name = "sql" end) open D type getrecord = unit -> XMLRPC.xmlrpc + let get_record_table : (string, __context:Context.t -> self:string -> getrecord ) Hashtbl.t = Hashtbl.create 20 -let find_get_record x = try Hashtbl.find get_record_table x with Not_found as e -> debug "Failed to find get_record function for class: %s" x; raise e + +let find_get_record x ~__context ~self () : XMLRPC.xmlrpc option = + if Hashtbl.mem get_record_table x + then Some (Hashtbl.find get_record_table x ~__context ~self ()) + else None + (* If a record is created or destroyed, then for any (Ref _) field which is one end of a relationship, need to send modified events for all those other objects. *) diff --git a/ocaml/database/master_connection.ml b/ocaml/database/master_connection.ml index 5e5afe72..1dc1823f 100644 --- a/ocaml/database/master_connection.ml +++ b/ocaml/database/master_connection.ml @@ -107,10 +107,10 @@ let connection_timeout = ref 10. (* -ve means retry forever *) are exceeded *) let restart_on_connection_timeout = ref true -let do_db_xml_rpc_persistent_with_reopen ~host ~path (req: Xml.xml) : Xml.xml = +let do_db_xml_rpc_persistent_with_reopen ~host ~path (req: string) : string = let time_call_started = Unix.gettimeofday() in let write_ok = ref false in - let result = ref (Xml.PCData "") in + let result = ref "" in let surpress_no_timeout_logs = ref false in let backoff_delay = ref 2.0 in (* initial delay = 2s *) let update_backoff_delay () = @@ -122,7 +122,7 @@ let do_db_xml_rpc_persistent_with_reopen ~host ~path (req: Xml.xml) : Xml.xml = do begin try - let req_string = Xml.to_string_fmt req in + let req_string = req in let headers = Xmlrpcclient.xmlrpc_headers ~version:"1.1" host path (String.length req_string + 0) in (* The pool_secret is added here and checked by the Xapi_http.add_handler RBAC code. *) let headers = headers @ ["Cookie: pool_secret="^(!Xapi_globs.pool_secret)] in @@ -135,7 +135,11 @@ let do_db_xml_rpc_persistent_with_reopen ~host ~path (req: Xml.xml) : Xml.xml = in_channel function: the input channel will buffer an arbitrary amount of stuff and we'll be out of sync with the next request. *) if content_length < 0 then raise Xmlrpcclient.Content_length_required; - let res = with_timestamp (fun ()->Xmlrpcclient.read_xml_rpc_response content_length task_id fd) in + let res = with_timestamp (fun ()-> + let buffer = String.make content_length '\000' in + Unixext.really_read fd buffer 0 content_length; + buffer + ) in write_ok := true; result := res (* yippeee! return and exit from while loop *) with @@ -193,9 +197,9 @@ let do_db_xml_rpc_persistent_with_reopen ~host ~path (req: Xml.xml) : Xml.xml = done; !result -let execute_remote_fn xml path = +let execute_remote_fn string path = let host = Pool_role.get_master_address () in Db_lock.with_lock (fun () -> (* Ensure that this function is always called under mutual exclusion (provided by the recursive db lock) *) - do_db_xml_rpc_persistent_with_reopen ~host ~path xml) + do_db_xml_rpc_persistent_with_reopen ~host ~path string) diff --git a/ocaml/database/unit_test_marshall.ml b/ocaml/database/unit_test_marshall.ml index b1130063..c04733be 100644 --- a/ocaml/database/unit_test_marshall.ml +++ b/ocaml/database/unit_test_marshall.ml @@ -11,7 +11,7 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. *) -open Db_remote_marshall +open Db_rpc_common_v1 open Db_cache_types open Db_filter open Db_filter_types diff --git a/ocaml/db_process/xapi-db-process.ml b/ocaml/db_process/xapi-db-process.ml index 7651e689..ba28679d 100644 --- a/ocaml/db_process/xapi-db-process.ml +++ b/ocaml/db_process/xapi-db-process.ml @@ -51,11 +51,11 @@ let initialise_db_connections() = let read_in_database() = (* Make sure we're running in master mode: we cannot be a slave and then access the dbcache *) - Db_cache.database_mode := Some Db_cache.Master; + Db_cache.set_master true; initialise_db_connections(); Db_dirty.make_blank_dirty_records(); (* Initialiase in-memory database cache *) - Db_cache.DBCache.initialise_db_cache_nosync() + Db_cache_impl.initialise_db_cache_nosync() let write_out_databases() = List.iter @@ -94,7 +94,7 @@ let do_write_database() = begin read_in_database(); if !xmltostdout then - Db_cache.DBCache.dump_db_cache (Unix.descr_of_out_channel stdout) + Db_cache_impl.dump_db_cache (Unix.descr_of_out_channel stdout) else write_out_database !filename end diff --git a/ocaml/idl/constants.ml b/ocaml/idl/constants.ml index f00017d0..d5fdf923 100644 --- a/ocaml/idl/constants.ml +++ b/ocaml/idl/constants.ml @@ -33,6 +33,7 @@ let vm_connect_uri = "http" (* ocaml/xapi/xapi_udhcpd. let vncsnapshot_uri = "/vncsnapshot" (* ocaml/xapi/xapi_vncsnapshot.ml *) let system_status_uri = "/system-status" (* ocaml/xapi/system_status.ml *) let remote_db_access_uri = "/remote_db_access" (* ocaml/xapi/xapi.ml *) +let remote_db_access_uri_v2 = "/remote_db_access_v2" (* ocaml/xapi/xapi.ml *) let remote_stats_uri = "/remote_stats" (* ocaml/xapi/xapi.ml *) let json_uri = "/json" (* ocaml/xapi/xapi.ml *) let cli_uri = "/cli" (* ocaml/xapi/xapi_cli.ml *) diff --git a/ocaml/idl/datamodel.ml b/ocaml/idl/datamodel.ml index 401ad04e..86df34de 100644 --- a/ocaml/idl/datamodel.ml +++ b/ocaml/idl/datamodel.ml @@ -6768,6 +6768,7 @@ let rbac_http_permission_prefix = "http/" *) let http_actions = [ ("post_remote_db_access", (Post, Constants.remote_db_access_uri, false, [], _R_POOL_ADMIN, [])); + ("post_remote_db_access_v2", (Post, Constants.remote_db_access_uri_v2, false, [], _R_POOL_ADMIN, [])); ("connect_migrate", (Connect, Constants.migrate_uri, false, [], _R_VM_POWER_ADMIN, [])); ("put_import", (Put, Constants.import_uri, true, [Bool_query_arg "restore"; Bool_query_arg "force"; String_query_arg "sr_id"], _R_VM_ADMIN, [])); diff --git a/ocaml/idl/ocaml_backend/OMakefile b/ocaml/idl/ocaml_backend/OMakefile index 29a0771f..862363bc 100644 --- a/ocaml/idl/ocaml_backend/OMakefile +++ b/ocaml/idl/ocaml_backend/OMakefile @@ -63,13 +63,19 @@ SERVER_OBJS = ../../database/escaping locking_helpers \ $(AUTOGEN_HELPER_DIR)/xml_spaces \ $(AUTOGEN_HELPER_DIR)/db_action_helper \ $(AUTOGEN_HELPER_DIR)/db_xml \ - $(AUTOGEN_HELPER_DIR)/db_remote_marshall \ - $(AUTOGEN_HELPER_DIR)/db_remote_cache_access \ + $(AUTOGEN_HELPER_DIR)/db_remote_cache_access_v1 \ + $(AUTOGEN_HELPER_DIR)/db_rpc_common_v1 \ + $(AUTOGEN_HELPER_DIR)/db_rpc_client_v1 \ + $(AUTOGEN_HELPER_DIR)/db_remote_cache_access_v2 \ + $(AUTOGEN_HELPER_DIR)/db_rpc_common_v2 \ + $(AUTOGEN_HELPER_DIR)/db_rpc_client_v2 \ $(AUTOGEN_HELPER_DIR)/db_cache_types \ $(AUTOGEN_HELPER_DIR)/db_filter \ + $(AUTOGEN_HELPER_DIR)/db_filter_types \ $(AUTOGEN_HELPER_DIR)/db_filter_parse \ $(AUTOGEN_HELPER_DIR)/db_filter_lex \ $(AUTOGEN_HELPER_DIR)/db_cache \ + $(AUTOGEN_HELPER_DIR)/db_cache_impl \ $(AUTOGEN_HELPER_DIR)/db_names \ $(AUTOGEN_HELPER_DIR)/db_upgrade \ $(AUTOGEN_HELPER_DIR)/db_exn \ diff --git a/ocaml/idl/ocaml_backend/exnHelper.ml b/ocaml/idl/ocaml_backend/exnHelper.ml index aa2f0635..f76b0b3e 100644 --- a/ocaml/idl/ocaml_backend/exnHelper.ml +++ b/ocaml/idl/ocaml_backend/exnHelper.ml @@ -33,7 +33,7 @@ let error_of_exn e = | Db_exn.DBCache_NotFound ("missing reference", tblname, reference) -> (* whenever a reference has been destroyed *) handle_invalid, [tblname; reference ] - | Db_cache.Too_many_values(tbl, objref, uuid) -> + | Db_exn.Too_many_values(tbl, objref, uuid) -> (* Very bad: database has duplicate references or UUIDs *) internal_error, [ sprintf "duplicate objects in database: tbl='%s'; object_ref='%s'; uuid='%s'" tbl objref uuid ] | Db_action_helper.Db_set_or_map_parse_fail s -> @@ -46,7 +46,7 @@ let error_of_exn e = end | Db_exn.Duplicate_key (tbl,fld,uuid,key) -> map_duplicate_key, [ tbl; fld; uuid; key ] - | Db_cache.Read_missing_uuid (tbl,ref,uuid) -> + | Db_exn.Read_missing_uuid (tbl,ref,uuid) -> uuid_invalid, [ tbl; uuid ] | Db_actions.DM_to_String.StringEnumTypeError s | Db_actions.DM_to_String.DateTimeError s diff --git a/ocaml/idl/ocaml_backend/gen_db_actions.ml b/ocaml/idl/ocaml_backend/gen_db_actions.ml index 68f623fb..bdd42467 100644 --- a/ocaml/idl/ocaml_backend/gen_db_actions.ml +++ b/ocaml/idl/ocaml_backend/gen_db_actions.ml @@ -167,7 +167,7 @@ let look_up_related_table_and_field obj other full_name = let read_set_ref obj other full_name = (* Set(Ref t) is actually stored in the table t *) let obj', fld' = look_up_related_table_and_field obj other full_name in - Printf.sprintf "ignore (read_field __context \"%s\" \"%s\" %s); List.map %s.%s (read_set_ref {table=\"%s\"; return=Db_action_helper.reference; where_field=\"%s\"; where_value=%s})" + Printf.sprintf "ignore (DB.read_field (*__context*) \"%s\" \"%s\" %s); List.map %s.%s (DB.read_set_ref {table=\"%s\"; return=Db_action_helper.reference; where_field=\"%s\"; where_value=%s})" (Escaping.escape_obj obj.DT.name) "uuid" Client._self _string_to_dm (OU.alias_of_ty (DT.Ref other)) (Escaping.escape_obj obj') fld' Client._self @@ -175,7 +175,7 @@ let read_set_ref obj other full_name = let get_record (obj: obj) aux_fn_name = let body = [ - Printf.sprintf "let (__regular_fields, __set_refs) = read_record \"%s\" %s in" + Printf.sprintf "let (__regular_fields, __set_refs) = DB.read_record \"%s\" %s in" (Escaping.escape_obj obj.DT.name) Client._self; aux_fn_name^" ~__regular_fields ~__set_refs"; ] in @@ -215,6 +215,8 @@ let make_shallow_copy api (obj: obj) (src: string) (dst: string) (all_fields: fi (String.concat "; " (List.map (fun f -> "\"" ^ f ^ "\"") sql_fields)) *) +let open_db_module = "let module DB = (val (Db_cache.get ()) : Db_interface.DB_ACCESS) in\n" + let db_action api : O.Module.t = let api = make_db_api api in @@ -227,7 +229,7 @@ let db_action api : O.Module.t = ~name: "get_refs_where" ~params: [ Gen_common.context_arg; expr_arg ] ~ty: ( OU.alias_of_ty (Ref obj.DT.name) ^ " list") - ~body: [ "let refs = (find_refs_with_filter \"" ^ tbl ^ "\" " ^ expr ^ ") in "; + ~body: [ open_db_module; "let refs = (DB.find_refs_with_filter \"" ^ tbl ^ "\" " ^ expr ^ ") in "; "List.map Ref.of_string refs " ] () in let get_record_aux_fn_body ?(m="API.") (obj: obj) (all_fields: field list) = @@ -273,7 +275,8 @@ let db_action api : O.Module.t = ~name: name ~params: [ Gen_common.context_arg; expr_arg ] ~ty: ("'a") - ~body: [ Printf.sprintf "let records = read_records_where \"%s\" %s in" + ~body: [ open_db_module; + Printf.sprintf "let records = DB.read_records_where \"%s\" %s in" (Escaping.escape_obj obj.DT.name) expr; Printf.sprintf "List.map (fun (ref,(__regular_fields,__set_refs)) -> Ref.of_string ref, %s __regular_fields __set_refs) records" conversion_fn] () in @@ -301,7 +304,7 @@ let db_action api : O.Module.t = let body = match tag with | FromField(Setter, fld) -> - Printf.sprintf "write_field __context \"%s\" %s \"%s\" value" + Printf.sprintf "DB.write_field (*__context*) \"%s\" %s \"%s\" value" (Escaping.escape_obj obj.DT.name) Client._self (Escaping.escape_id fld.DT.full_name) @@ -318,31 +321,31 @@ let db_action api : O.Module.t = (Escaping.escape_obj obj') fld' Client._self *) | FromField(Getter, { DT.ty = ty; full_name = full_name }) -> - Printf.sprintf "%s.%s (read_field __context \"%s\" \"%s\" %s)" + Printf.sprintf "%s.%s (DB.read_field (*__context*) \"%s\" \"%s\" %s)" _string_to_dm (OU.alias_of_ty ty) (Escaping.escape_obj obj.DT.name) (Escaping.escape_id full_name) Client._self | FromField(Add, { DT.ty = DT.Map(_, _); full_name = full_name }) -> - Printf.sprintf "process_structured_field __context (%s,%s) \"%s\" \"%s\" %s AddMap" + Printf.sprintf "DB.process_structured_field (*__context*) (%s,%s) \"%s\" \"%s\" %s AddMap" Client._key Client._value (Escaping.escape_obj obj.DT.name) (Escaping.escape_id full_name) Client._self | FromField(Add, { DT.ty = DT.Set(_); full_name = full_name }) -> - Printf.sprintf "process_structured_field __context (%s,\"\") \"%s\" \"%s\" %s AddSet" + Printf.sprintf "DB.process_structured_field (*__context*) (%s,\"\") \"%s\" \"%s\" %s AddSet" Client._value (Escaping.escape_obj obj.DT.name) (Escaping.escape_id full_name) Client._self | FromField(Remove, { DT.ty = DT.Map(_, _); full_name = full_name }) -> - Printf.sprintf "process_structured_field __context (%s,\"\") \"%s\" \"%s\" %s RemoveMap" + Printf.sprintf "DB.process_structured_field (*__context*) (%s,\"\") \"%s\" \"%s\" %s RemoveMap" Client._key (Escaping.escape_obj obj.DT.name) (Escaping.escape_id full_name) Client._self | FromField(Remove, { DT.ty = DT.Set(_); full_name = full_name }) -> - Printf.sprintf "process_structured_field __context (%s,\"\") \"%s\" \"%s\" %s RemoveSet" + Printf.sprintf "DB.process_structured_field (*__context*) (%s,\"\") \"%s\" \"%s\" %s RemoveSet" Client._value (Escaping.escape_obj obj.DT.name) (Escaping.escape_id full_name) @@ -351,7 +354,7 @@ let db_action api : O.Module.t = | FromField((Add | Remove), _) -> failwith "Cannot generate db add/remove for non sets and maps" | FromObject(Delete) -> - (Printf.sprintf "delete_row __context \"%s\" %s" + (Printf.sprintf "DB.delete_row (*__context*) \"%s\" %s" (Escaping.escape_obj obj.DT.name) Client._self) | FromObject(Make) -> let fields = List.filter field_in_this_table (DU.fields_of_obj obj) in @@ -361,13 +364,13 @@ let db_action api : O.Module.t = OU.escape (OU.ocaml_of_id fld.full_name)) fields in let kvs' = List.map (fun (sql, o) -> Printf.sprintf "(\"%s\", %s)" sql o) kvs in - Printf.sprintf "create_row __context \"%s\" [ %s ] ref" + Printf.sprintf "DB.create_row (*__context*) \"%s\" [ %s ] ref" (Escaping.escape_obj obj.DT.name) (String.concat "; " kvs') | FromObject(GetByUuid) -> begin match x.msg_params, x.msg_result with | [ {param_type=ty; param_name=name} ], Some (result_ty, _) -> - let query = Printf.sprintf "db_get_by_uuid \"%s\" %s" + let query = Printf.sprintf "DB.db_get_by_uuid \"%s\" %s" (Escaping.escape_obj obj.DT.name) (OU.escape name) in _string_to_dm ^ "." ^ (OU.alias_of_ty result_ty) ^ " (" ^ query ^ ")" @@ -376,7 +379,7 @@ let db_action api : O.Module.t = | FromObject(GetByLabel) -> begin match x.msg_params, x.msg_result with | [ {param_type=ty; param_name=name} ], Some (Set result_ty, _) -> - let query = Printf.sprintf "db_get_by_name_label \"%s\" %s" + let query = Printf.sprintf "DB.db_get_by_name_label \"%s\" %s" (Escaping.escape_obj obj.DT.name) (OU.escape name) in if DU.obj_has_get_by_name_label obj @@ -392,7 +395,7 @@ let db_action api : O.Module.t = Eventually we'll need to provide user filtering for the public version *) begin match x.msg_result with | Some (Set result_ty, _) -> - let query = Printf.sprintf "read_refs \"%s\"" + let query = Printf.sprintf "DB.read_refs \"%s\"" (Escaping.escape_obj obj.DT.name) in "List.map " ^ _string_to_dm ^ "." ^ (OU.alias_of_ty result_ty) ^ "(" ^ query ^ ")" | _ -> failwith "GetAll call needs a result type" @@ -414,12 +417,11 @@ let db_action api : O.Module.t = end *) in - O.Let.make ~name: x.msg_name ~params: (Gen_common.context_arg :: args) ~ty: "'a" - ~body: (List.map to_string args @ [ body ]) () in + ~body: (List.map to_string args @ [ open_db_module; body ]) () in let obj (obj: obj) = let others = @@ -450,7 +452,6 @@ let db_action api : O.Module.t = O.Module.make ~name:_db_action ~preamble:[ - "open Db_cache.DBCache"; "open Db_cache_types"; "module D=Debug.Debugger(struct let name=\"db\" end)"; "open D"; diff --git a/ocaml/xapi/console.ml b/ocaml/xapi/console.ml index fa1ffd63..a7f006cf 100644 --- a/ocaml/xapi/console.ml +++ b/ocaml/xapi/console.ml @@ -78,7 +78,9 @@ let console_of_request __context req = (* The _ref may be either a VM ref in which case we look for a default VNC console or it may be a console ref in which case we go for that. *) - let is_vm, is_console = match Db_cache.DBCache.get_table_from_ref _ref with + let is_vm, is_console = + let module DB = (val (Db_cache.get ()) : Db_interface.DB_ACCESS) in + match DB.get_table_from_ref _ref with | Some c when c = Db_names.vm -> true, false | Some c when c = Db_names.console -> false, true | _ -> diff --git a/ocaml/xapi/db.ml b/ocaml/xapi/db.ml index c40b5b1a..4956dfb9 100644 --- a/ocaml/xapi/db.ml +++ b/ocaml/xapi/db.ml @@ -17,4 +17,5 @@ include Db_actions.DB_Action let is_valid_ref r = - Db_cache.DBCache.is_valid_ref (Ref.string_of r) + let module DB = (val (Db_cache.get ()) : Db_interface.DB_ACCESS) in + DB.is_valid_ref (Ref.string_of r) diff --git a/ocaml/xapi/db_gc.ml b/ocaml/xapi/db_gc.ml index 56ebfd5e..a192313c 100644 --- a/ocaml/xapi/db_gc.ml +++ b/ocaml/xapi/db_gc.ml @@ -39,6 +39,7 @@ let _time = "time" let valid_ref x = Db.is_valid_ref x let gc_connector ~__context get_all get_record valid_ref1 valid_ref2 delete_record = + let module DB = (val (Db_cache.get ()) : Db_interface.DB_ACCESS) in let all_refs = get_all ~__context in let do_gc ref = let print_valid b = if b then "valid" else "INVALID" in @@ -49,7 +50,7 @@ let gc_connector ~__context get_all get_record valid_ref1 valid_ref2 delete_reco if not (ref_1_valid && ref_2_valid) then begin let table,reference,valid1,valid2 = - (match Db_cache.DBCache.get_table_from_ref (Ref.string_of ref) with + (match DB.get_table_from_ref (Ref.string_of ref) with None -> "UNKNOWN CLASS" | Some c -> c), (Ref.string_of ref), diff --git a/ocaml/xapi/helpers.ml b/ocaml/xapi/helpers.ml index 8363dbbb..b12a0780 100644 --- a/ocaml/xapi/helpers.ml +++ b/ocaml/xapi/helpers.ml @@ -784,7 +784,8 @@ let vm_to_string vm = if not (Db.is_valid_ref vm) then raise (Api_errors.Server_error(Api_errors.invalid_value ,[str])); - let fields = fst (Db_cache.DBCache.read_record Db_names.vm str) in + let module DB = (val (Db_cache.get ()) : Db_interface.DB_ACCESS) in + let fields = fst (DB.read_record Db_names.vm str) in let sexpr = SExpr.Node (List.map (fun (key,value) -> SExpr.Node [SExpr.String key; SExpr.String value]) fields) in SExpr.string_of sexpr diff --git a/ocaml/xapi/import_raw_vdi.ml b/ocaml/xapi/import_raw_vdi.ml index dfe64523..f0328c63 100644 --- a/ocaml/xapi/import_raw_vdi.ml +++ b/ocaml/xapi/import_raw_vdi.ml @@ -34,7 +34,7 @@ let vdi_of_req ~__context (req: request) = if List.mem_assoc "vdi" all then List.assoc "vdi" all else raise (Failure "Missing vdi query parameter") in - if Db_cache.DBCache.is_valid_ref vdi + if Db.is_valid_ref (Ref.of_string vdi) then Ref.of_string vdi else Db.VDI.get_by_uuid ~__context ~uuid:vdi diff --git a/ocaml/xapi/message_forwarding.ml b/ocaml/xapi/message_forwarding.ml index d0a6fcc6..80e73939 100644 --- a/ocaml/xapi/message_forwarding.ml +++ b/ocaml/xapi/message_forwarding.ml @@ -999,8 +999,8 @@ module Forward = functor(Local: Custom_actions.CUSTOM_ACTIONS) -> struct try Db.SR.get_by_uuid ~__context ~uuid:sr with - Db_cache.Read_missing_uuid (_,_,_) - | Db_cache.Too_many_values (_,_,_) -> + Db_exn.Read_missing_uuid (_,_,_) + | Db_exn.Too_many_values (_,_,_) -> begin match (Db.SR.get_by_name_label ~__context ~label:sr) with [] -> raise Not_forwarding (* couldn't find it. Do it locally and will report correct error *) diff --git a/ocaml/xapi/monitor_dbcalls.ml b/ocaml/xapi/monitor_dbcalls.ml index 0559745a..1a578c72 100644 --- a/ocaml/xapi/monitor_dbcalls.ml +++ b/ocaml/xapi/monitor_dbcalls.ml @@ -184,7 +184,7 @@ let full_update_fn () = if Pool_role.is_master () then Monitor_master.update_all ~__context host_stats (* read stats locally and update locally *) else - ignore (Master_connection.execute_remote_fn (Monitor_transfer.marshall host_stats) Constants.remote_stats_uri); + ignore (Master_connection.execute_remote_fn (Xml.to_string_fmt (Monitor_transfer.marshall host_stats)) Constants.remote_stats_uri); ) (* End of legacy function *) diff --git a/ocaml/xapi/monitor_self.ml b/ocaml/xapi/monitor_self.ml index 3687fb5d..29cde4ac 100644 --- a/ocaml/xapi/monitor_self.ml +++ b/ocaml/xapi/monitor_self.ml @@ -100,7 +100,7 @@ let string_of_process_memory_info (x: process_memory_info) = Printf.sprintf "size: %d KiB; rss: %d KiB; data: %d KiB; stack: %d KiB" x.size x.rss x.data x.stack -let summarise_db_size () = match Db_cache.DBCache.stats () with +let summarise_db_size () = match Db_cache_impl.stats () with | [] -> "(running as slave; no in-memory db cache)" | xs -> Printf.sprintf "(%s)" (String.concat "; " (List.map (fun (tbl, x) -> Printf.sprintf "%s[%d records]" tbl x) xs)) diff --git a/ocaml/xapi/pool_db_backup.ml b/ocaml/xapi/pool_db_backup.ml index 34b66512..a3df4a6a 100644 --- a/ocaml/xapi/pool_db_backup.ml +++ b/ocaml/xapi/pool_db_backup.ml @@ -40,7 +40,7 @@ let write_database (s: Unix.file_descr) ~__context = let len = String.length minimally_compliant_miami_database in ignore (Unix.write s minimally_compliant_miami_database 0 len) else - Db_cache.DBCache.dump_db_cache s + Db_cache_impl.dump_db_cache s (** Make sure the backup database version is compatible *) let version_check manifest = diff --git a/ocaml/xapi/redo_log_usage.ml b/ocaml/xapi/redo_log_usage.ml index 5ed8d4bb..5f7013a0 100644 --- a/ocaml/xapi/redo_log_usage.ml +++ b/ocaml/xapi/redo_log_usage.ml @@ -13,7 +13,7 @@ *) open Pervasiveext (* for ignore_exn *) -module R = Debug.Debugger(struct let name = "redo_log" end) +module R = Debug.Debugger(struct let name = "xapi" end) exception NoGeneration exception DeltaTooOld @@ -48,7 +48,7 @@ let read_from_redo_log staging_path = } in (* ideally, the reading from the file would also respect the latest_response_time *) ignore (Backend_xml.populate_and_read_manifest fake_conn_db_file); - R.debug "Finished reading database from %s into cache" temp_file; + R.debug "Finished reading database from %s into cache (generation = %Ld)" temp_file gen_count; (* Set the generation count *) latest_generation := Some gen_count @@ -61,7 +61,7 @@ let read_from_redo_log staging_path = let read_delta gen_count delta = (* Apply the delta *) - Db_cache.DBCache.apply_delta_to_cache delta; + Db_cache.apply_delta_to_cache delta; (* Update the generation count *) match !latest_generation with | None -> raise NoGeneration (* we should have already read in a database with a generation count *) @@ -90,6 +90,7 @@ let read_from_redo_log staging_path = R.debug "No database was read, so no staging is necessary"; raise NoGeneration | Some generation -> + R.debug "Database from redo log has generation %Ld" generation; (* Write the in-memory cache to the file *) (* Make sure the generation count is right -- is this necessary? *) Db_cache_types.set_generation Db_backend.cache generation; diff --git a/ocaml/xapi/workload_balancing.ml b/ocaml/xapi/workload_balancing.ml index 2bd0fcbe..17c259f2 100755 --- a/ocaml/xapi/workload_balancing.ml +++ b/ocaml/xapi/workload_balancing.ml @@ -401,8 +401,8 @@ let retrieve_vm_recommendations ~__context ~vm = | Xml_parse_failure error -> (* let this parse error carry on upwards, perform_wlb_request will catch it and check the rest of the xml for an error code *) raise (Xml_parse_failure error) - | Db_cache.Read_missing_uuid (_,_,_) - | Db_cache.Too_many_values (_,_,_) -> + | Db_exn.Read_missing_uuid (_,_,_) + | Db_exn.Too_many_values (_,_,_) -> raise_malformed_response' "VMGetRecommendations" "Invalid VM or host UUID" "unknown" in @@ -613,8 +613,8 @@ let get_opt_recommendations ~__context = match result_map with (map, opt_id) -> List.map (fun kvs -> remap None None None None None opt_id kvs) map with - | Db_cache.Read_missing_uuid (_,_,_) - | Db_cache.Too_many_values (_,_,_) -> + | Db_exn.Read_missing_uuid (_,_,_) + | Db_exn.Too_many_values (_,_,_) -> raise_malformed_response' "GetOptimizationRecommendations" "Invalid VM or host UUID" "unknown" @@ -689,8 +689,8 @@ let get_evacuation_recoms ~__context ~uuid = try List.map (fun kvs -> remap None None None kvs) result_map with - | Db_cache.Read_missing_uuid (_,_,_) - | Db_cache.Too_many_values (_,_,_) -> + | Db_exn.Read_missing_uuid (_,_,_) + | Db_exn.Too_many_values (_,_,_) -> raise_malformed_response' "HostGetRecommendations" "Invalid VM or host UUID" "unknown" diff --git a/ocaml/xapi/xapi.ml b/ocaml/xapi/xapi.ml index 8b6ccfed..e9ce71d2 100644 --- a/ocaml/xapi/xapi.ml +++ b/ocaml/xapi/xapi.ml @@ -49,7 +49,7 @@ let startup_check () = (* Tell the dbcache whether we're a master or a slave *) let set_db_mode() = - Db_cache.database_mode := Some (if Pool_role.is_master () then Db_cache.Master else Db_cache.Slave) + Db_cache.set_master (Pool_role.is_master ()) (* Parse db conf file from disk and use this to initialise database connections. This is done on both master and slave. On masters the parsed data is used to flush databases to and to populate @@ -78,7 +78,7 @@ let start_database_engine () = (* Initialise in-memory database cache *) debug "Populating db cache"; - Db_cache.DBCache.initialise_db_cache(); + Db_cache_impl.initialise (); debug "Performing initial DB GC"; Db_gc.single_pass (); @@ -104,23 +104,20 @@ let wait_until_database_is_ready_for_clients () = (fun () -> while not !database_ready_for_clients do Condition.wait database_ready_for_clients_c database_ready_for_clients_m done) -let read_and_parse_body req bio = - let fd = Buf_io.fd_of bio in (* fd only used for writing *) - let body = Http_svr.read_body ~limit:Xapi_globs.http_limit_max_rpc_size req bio in - fd, Xml.parse_string body - (** Handler for the remote database access URL *) let remote_database_access_handler req bio = - wait_until_database_is_ready_for_clients (); + wait_until_database_is_ready_for_clients (); + Db_remote_cache_access_v1.handler req bio - let fd, body_xml = read_and_parse_body req bio in - let response = Xml.to_bigbuffer (Db_remote_cache_access.DBCacheRemoteListener.process_xmlrpc body_xml) in - Http_svr.response_fct req fd (Bigbuffer.length response) - (fun fd -> Bigbuffer.to_fct response (fun s -> ignore(Unix.write fd s 0 (String.length s)))) +(** Handler for the remote database access URL *) +let remote_database_access_handler_v2 req bio = + wait_until_database_is_ready_for_clients (); + Db_remote_cache_access_v2.handler req bio (** Handler for the legacy remote stats URL *) let remote_stats_handler req bio = wait_until_database_is_ready_for_clients (); + let fd = Buf_io.fd_of bio in (* fd only used for writing *) (* CA-20487: need to authenticate this URL, but only when we're not in pool rolling-upgrade mode; this URL is depricated and should be removed ASAP.. *) @@ -137,13 +134,15 @@ let remote_stats_handler req bio = with _ -> auth_failed() end; - let fd, body_xml = read_and_parse_body req bio in + + let body = Http_svr.read_body ~limit:Xapi_globs.http_limit_max_rpc_size req bio in + let body_xml = Xml.parse_string body in Stats.time_this "remote_stats" (fun () -> let stats = Monitor_transfer.unmarshall body_xml in Server_helpers.exec_with_new_task "performance monitor" (fun __context -> Monitor_master.update_all ~__context stats); - let response = Xml.to_string (Db_remote_marshall.marshall_unit ()) in + let response = Xml.to_string (Db_rpc_common_v1.marshall_unit ()) in Http_svr.response_str req fd response ) @@ -245,7 +244,6 @@ let init_args() = "-logconfig", Arg.Set_string Xapi_globs.log_config_file, "set log config file to use"; "-writereadyfile", Arg.Set_string Xapi_globs.ready_file, "touch specified file when xapi is ready to accept requests"; "-writeinitcomplete", Arg.Set_string Xapi_globs.init_complete, "touch specified file when xapi init process is complete"; - "-writelog", Arg.Set writelog, "display sql writelog"; "-nowatchdog", Arg.Set nowatchdog, "turn watchdog off, avoiding initial fork"; "-setdom0mem", Arg.Unit (fun () -> ()), "(ignored)"; "-dom0memgradient", Arg.Unit (fun () -> ()), "(ignored)"; @@ -255,8 +253,7 @@ let init_args() = "-dummydata", Arg.Set debug_dummy_data, "populate with dummy data for demo/debugging purposes"; "-version", Arg.Unit show_version, "show version of the binary" ] (fun x -> printf "Warning, ignoring unknown argument: %s" x) - "Citrix XenServer API server"; - if !writelog then Db_cache.DBCache.display_sql_writelog !writelog + "Citrix XenServer API server" let wait_to_die() = (* don't call Thread.join cos this interacts strangely with OCAML runtime and stops @@ -616,6 +613,7 @@ let startup_script () = let master_only_http_handlers = [ (* CA-26044: don't let people DoS random slaves *) ("post_remote_db_access", (Http_svr.BufIO remote_database_access_handler)); + ("post_remote_db_access_v2", (Http_svr.BufIO remote_database_access_handler_v2)); ] let common_http_handlers = [ @@ -856,7 +854,7 @@ let server_init() = if this happens and try again later.. *) Master_connection.restart_on_connection_timeout := false; Master_connection.connection_timeout := 10.; (* give up retrying after 10s *) - Db_cache.DBCache.initialise_db_cache(); + Db_cache_impl.initialise (); Dbsync.setup () with e -> begin diff --git a/ocaml/xapi/xapi_fuse.ml b/ocaml/xapi/xapi_fuse.ml index c17fc618..59c58921 100644 --- a/ocaml/xapi/xapi_fuse.ml +++ b/ocaml/xapi/xapi_fuse.ml @@ -38,7 +38,7 @@ let light_fuse_and_run ?(fuse_length=Xapi_globs.fuse_time) () = *) try let dbconn = Db_connections.preferred_write_db () in - Db_cache.DBCache.flush_and_exit dbconn Xapi_globs.restart_return_code + Db_cache_impl.flush_and_exit dbconn Xapi_globs.restart_return_code with e -> warn "Caught an exception flushing database (perhaps it hasn't been initialised yet): %s; restarting immediately" (ExnHelper.string_of_exn e); exit Xapi_globs.restart_return_code @@ -66,7 +66,7 @@ let light_fuse_and_dont_restart ?(fuse_length=Xapi_globs.fuse_time) () = debug "light_fuse_and_dont_restart: calling Monitor_rrds.backup to save current RRDs locally"; Monitor_rrds.backup (); Thread.delay (float_of_int fuse_length); - Db_cache.DBCache.flush_and_exit (Db_connections.preferred_write_db ()) 0) ()); + Db_cache_impl.flush_and_exit (Db_connections.preferred_write_db ()) 0) ()); (* This is a best-effort attempt to use the database. We must not block the flush_and_exit above, hence the use of a background thread. *) Helpers.log_exn_continue "setting Host.enabled to false" diff --git a/ocaml/xapi/xapi_pool_transition.ml b/ocaml/xapi/xapi_pool_transition.ml index c0ddd6d5..5311f425 100644 --- a/ocaml/xapi/xapi_pool_transition.ml +++ b/ocaml/xapi/xapi_pool_transition.ml @@ -129,7 +129,7 @@ let attempt_two_phase_commit_of_new_master ~__context (manual: bool) (peer_addre error "Some hosts failed to commit [ %s ]: this node will now restart in a broken state" (String.concat "; " !hosts_which_failed); (* Immediately just in case *) - Db_cache.DBCache.flush_and_exit (Db_connections.preferred_write_db ()) Xapi_globs.restart_return_code + Db_cache_impl.flush_and_exit (Db_connections.preferred_write_db ()) Xapi_globs.restart_return_code end; (* If this is an automatic transition then there is no other master to clash with and so we can restart immediately. NB if we are the master (and this code is being used to assert @@ -137,7 +137,7 @@ let attempt_two_phase_commit_of_new_master ~__context (manual: bool) (peer_addre if not(manual) then if am_master_already then info "Not restarting since we are the master already" - else Db_cache.DBCache.flush_and_exit (Db_connections.preferred_write_db ()) Xapi_globs.restart_return_code; + else Db_cache_impl.flush_and_exit (Db_connections.preferred_write_db ()) Xapi_globs.restart_return_code; (* If manual, periodicly access to the database to check whether the old master has restarted. *) if manual then diff --git a/ocaml/xapi/xapi_vm_placement.ml b/ocaml/xapi/xapi_vm_placement.ml index 9cc0493c..9878b5b8 100644 --- a/ocaml/xapi/xapi_vm_placement.ml +++ b/ocaml/xapi/xapi_vm_placement.ml @@ -75,8 +75,7 @@ let create_pool_snapshot_summary __context extra_guests pool = (** Returns a list of affinity host identifiers for the given [guest]. *) let affinity_host_ids_of_guest __context guest = let affinity_host = Db.VM.get_affinity ~__context ~self:guest in - let affinity_host_is_valid = Db_cache.DBCache.is_valid_ref - (Ref.string_of affinity_host) in + let affinity_host_is_valid = Db.is_valid_ref affinity_host in if affinity_host_is_valid then [Db.Host.get_uuid __context affinity_host] else [] diff --git a/ocaml/xapi/xapi_vm_snapshot.ml b/ocaml/xapi/xapi_vm_snapshot.ml index a110219b..b12b0a5e 100644 --- a/ocaml/xapi/xapi_vm_snapshot.ml +++ b/ocaml/xapi/xapi_vm_snapshot.ml @@ -249,6 +249,7 @@ let checkpoint ~__context ~vm ~new_name = let copy_vm_fields ~__context ~metadata ~dst ~do_not_copy ~default_values = assert (Pool_role.is_master ()); debug "copying metadata into %s" (Ref.string_of dst); + let module DB = (val (Db_cache.get ()) : Db_interface.DB_ACCESS) in List.iter (fun (key,value) -> let value = @@ -256,7 +257,7 @@ let copy_vm_fields ~__context ~metadata ~dst ~do_not_copy ~default_values = then List.assoc key default_values else value in if not (List.mem key do_not_copy) - then Db_cache.DBCache.write_field __context Db_names.vm (Ref.string_of dst) key value) + then DB.write_field Db_names.vm (Ref.string_of dst) key value) metadata let safe_destroy_vbd ~__context ~rpc ~session_id vbd = -- 2.39.5