-- Data validation utilities local M = {} local type_utils = require('notex.utils.types') -- Validation rules local VALIDATION_RULES = { string = { min_length = 0, max_length = 1000, required = false, pattern = nil, enum = nil }, number = { min_value = nil, max_value = nil, integer = false, required = false }, boolean = { required = false }, date = { required = false, min_date = nil, max_date = nil, format = "iso8601" }, email = { required = false, domain_whitelist = nil }, url = { required = false, schemes = {"http", "https"}, domain_whitelist = nil }, array = { min_items = 0, max_items = 100, item_type = nil, required = false }, object = { required_fields = {}, optional_fields = {}, field_types = {}, strict = false, required = false } } -- Validate value against schema function M.validate_value(value, schema) if not schema or not schema.type then return false, "Schema must specify type" end -- Handle null/nil values if value == nil then if schema.required == true then return false, "Value is required" end return true, "Value is optional and nil" end -- Type validation local detected_type = type_utils.detect_type(value) if detected_type ~= schema.type then local converted = type_utils.convert_to_type(value, schema.type) if converted == nil then return false, string.format("Expected %s, got %s", schema.type, detected_type) end value = converted -- Use converted value for further validation end -- Type-specific validation local type_validator = "validate_" .. schema.type if M[type_validator] then local valid, error = M[type_validator](value, schema) if not valid then return false, error end end return true, "Validation passed" end -- Validate string function M.validate_string(value, schema) local str = tostring(value) -- Length validation if schema.min_length and #str < schema.min_length then return false, string.format("String too short (min %d characters)", schema.min_length) end if schema.max_length and #str > schema.max_length then return false, string.format("String too long (max %d characters)", schema.max_length) end -- Pattern validation if schema.pattern then if not str:match(schema.pattern) then return false, "String does not match required pattern" end end -- Enum validation if schema.enum then local found = false for _, enum_value in ipairs(schema.enum) do if str == tostring(enum_value) then found = true break end end if not found then return false, string.format("Value must be one of: %s", vim.inspect(schema.enum)) end end return true, "String validation passed" end -- Validate number function M.validate_number(value, schema) local num = tonumber(value) if not num then return false, "Value is not a number" end -- Integer validation if schema.integer and num ~= math.floor(num) then return false, "Value must be an integer" end -- Range validation if schema.min_value and num < schema.min_value then return false, string.format("Value too small (minimum: %s)", tostring(schema.min_value)) end if schema.max_value and num > schema.max_value then return false, string.format("Value too large (maximum: %s)", tostring(schema.max_value)) end return true, "Number validation passed" end -- Validate boolean function M.validate_boolean(value, schema) local bool = value if type(value) ~= "boolean" then bool = type_utils.convert_to_boolean(tostring(value)) if bool == nil then return false, "Value is not a boolean" end end return true, "Boolean validation passed" end -- Validate date function M.validate_date(value, schema) local date_parser = require('notex.utils.date') local timestamp = date_parser.parse_date(tostring(value)) if not timestamp then return false, "Invalid date format" end -- Range validation if schema.min_date then local min_timestamp = date_parser.parse_date(schema.min_date) if min_timestamp and timestamp < min_timestamp then return false, "Date is before minimum allowed date" end end if schema.max_date then local max_timestamp = date_parser.parse_date(schema.max_date) if max_timestamp and timestamp > max_timestamp then return false, "Date is after maximum allowed date" end end return true, "Date validation passed" end -- Validate email function M.validate_email(value, schema) local email = tostring(value) if not email:match("^[^@]+@[^@]+%.[^@]+$") then return false, "Invalid email format" end -- Domain whitelist validation if schema.domain_whitelist then local domain = email:match("@([^@]+)$") local found = false for _, allowed_domain in ipairs(schema.domain_whitelist) do if domain == allowed_domain then found = true break end end if not found then return false, string.format("Domain not in whitelist: %s", domain) end end return true, "Email validation passed" end -- Validate URL function M.validate_url(value, schema) local url = tostring(value) -- Basic URL validation if not url:match("^https?://") then -- Check if scheme is required if schema.schemes and #schema.schemes > 0 then return false, string.format("URL must start with: %s", table.concat(schema.schemes, ", ")) end end -- Scheme validation if schema.schemes then local scheme = url:match("^(https?)://") if not scheme then return false, "Invalid URL scheme" end local found = false for _, allowed_scheme in ipairs(schema.schemes) do if scheme == allowed_scheme then found = true break end end if not found then return false, string.format("URL scheme not allowed: %s", scheme) end end return true, "URL validation passed" end -- Validate array function M.validate_array(value, schema) local arr = value if type(arr) ~= "table" then -- Try to convert to array arr = type_utils.convert_to_array(tostring(value)) if type(arr) ~= "table" then return false, "Value is not an array" end end -- Length validation if schema.min_items and #arr < schema.min_items then return false, string.format("Array too short (min %d items)", schema.min_items) end if schema.max_items and #arr > schema.max_items then return false, string.format("Array too long (max %d items)", schema.max_items) end -- Item type validation if schema.item_type then for i, item in ipairs(arr) do local valid, error = M.validate_value(item, { type = schema.item_type, required = false }) if not valid then return false, string.format("Array item %d invalid: %s", i, error) end end end return true, "Array validation passed" end -- Validate object function M.validate_object(value, schema) local obj = value if type(obj) ~= "table" then return false, "Value is not an object" end -- Required fields validation for _, field in ipairs(schema.required_fields or {}) do if obj[field] == nil then return false, string.format("Required field missing: %s", field) end end -- Field validation local all_fields = {} for field, _ in pairs(obj) do table.insert(all_fields, field) end for _, field in ipairs(all_fields) do local field_schema = schema.field_types and schema.field_types[field] if field_schema then local valid, error = M.validate_value(obj[field], field_schema) if not valid then return false, string.format("Field '%s' invalid: %s", field, error) end elseif schema.strict then return false, string.format("Unexpected field '%s' (strict mode)", field) end end return true, "Object validation passed" end -- Validate document schema function M.validate_document_properties(properties, schema_definition) local errors = {} local warnings = {} if not properties then return false, {"No properties provided"} end -- Validate each property against schema for prop_name, prop_value in pairs(properties) do local prop_schema = schema_definition[prop_name] if prop_schema then local valid, error = M.validate_value(prop_value, prop_schema) if not valid then table.insert(errors, string.format("Property '%s': %s", prop_name, error)) end else table.insert(warnings, string.format("Unknown property '%s'", prop_name)) end end -- Check for missing required properties for prop_name, prop_schema in pairs(schema_definition) do if prop_schema.required and properties[prop_name] == nil then table.insert(errors, string.format("Missing required property: %s", prop_name)) end end return #errors == 0, {errors = errors, warnings = warnings} end -- Create validation schema function M.create_schema(field_name, options) options = options or {} local base_schema = VALIDATION_RULES[options.type] or VALIDATION_RULES.string local schema = vim.tbl_deep_extend("force", base_schema, options) schema.field_name = field_name return schema end -- Validate query parameters function M.validate_query_params(params, allowed_params) local errors = {} for param_name, param_value in pairs(params) do if not allowed_params[param_name] then table.insert(errors, string.format("Unknown parameter: %s", param_name)) end end for param_name, param_schema in pairs(allowed_params) do if params[param_name] == nil and param_schema.required then table.insert(errors, string.format("Required parameter missing: %s", param_name)) elseif params[param_name] ~= nil then local valid, error = M.validate_value(params[param_name], param_schema) if not valid then table.insert(errors, string.format("Parameter '%s': %s", param_name, error)) end end end return #errors == 0, errors end -- Sanitize input function M.sanitize_input(value, options) options = options or {} local max_length = options.max_length or 1000 if not value then return nil end local sanitized = tostring(value) -- Remove potentially dangerous characters sanitized = sanitized:gsub("[<>\"'&]", "") -- Trim whitespace sanitized = sanitized:trim() -- Limit length if #sanitized > max_length then sanitized = sanitized:sub(1, max_length) end return sanitized end -- Validate file path function M.validate_file_path(path) if not path or path == "" then return false, "Empty file path" end -- Check for invalid characters if path:match('[<>:"|?*]') then return false, "Invalid characters in file path" end -- Check for directory traversal if path:match("%.%.") then return false, "Directory traversal not allowed" end return true, "File path is valid" end -- Return validation summary function M.create_validation_summary(results) local summary = { total = #results, valid = 0, invalid = 0, errors = {}, warnings = {} } for _, result in ipairs(results) do if result.valid then summary.valid = summary.valid + 1 else summary.invalid = summary.invalid + 1 if result.errors then for _, error in ipairs(result.errors) do table.insert(summary.errors, error) end end if result.warnings then for _, warning in ipairs(result.warnings) do table.insert(summary.warnings, warning) end end end end return summary end return M