import re
from django.conf import settings
from django.db import models
from .. import exc, fields, validators
from . import constants
# Internal key used to wrap default values for JSON serialization.
# The django-extensions JSONField doesn't properly serialize plain strings,
# so we wrap them in a dict with this key.
_DEFAULT_VALUE_KEY = "_v"
[docs]
class Attribute(models.Model):
"""Represents a flexible attribute for Resource objects."""
# This is purposely not unique as there is a compound index with site_id.
name = models.CharField(
max_length=64,
null=False,
db_index=True,
help_text="The name of the Attribute.",
)
description = models.CharField(
max_length=255,
default="",
blank=True,
null=False,
help_text="A helpful description of the Attribute.",
)
# The resource must contain a key and value
required = models.BooleanField(
default=False,
null=False,
help_text="Whether the Attribute should be required.",
)
# Whether attribute values propagate from parent to child resources.
# Only valid for resource types with parent chains (Network, Interface).
inheritable = models.BooleanField(
default=False,
null=False,
help_text=(
"Whether this Attribute's values are inherited by child resources "
"that do not define their own explicit value. Only allowed for "
"resource types with parent-child hierarchies (Network, Interface)."
),
)
# In UIs this attribute will be displayed by default. Required implies
# display.
display = models.BooleanField(
default=False,
null=False,
help_text=(
"Whether the Attribute should be be displayed by default in "
"UIs. If required is set, this is also set."
),
)
# Attribute values are expected as lists of strings.
multi = models.BooleanField(
default=False,
null=False,
help_text="Whether the Attribute should be treated as a list type.",
)
constraints = fields.JSONField(
"Constraints",
null=False,
blank=True,
help_text="Dictionary of Attribute constraints.",
)
# Internal storage for default value. Use the `default` property instead.
# Stored as {"_v": value} to ensure proper JSON serialization.
_default = fields.JSONField(
"Default",
null=True,
blank=True,
default=None,
db_column="default",
help_text="Default value applied when this attribute is not provided on resource creation.",
)
@property
def default(self):
"""Get the default value, unwrapping from internal storage format."""
if self._default is None or self._default == {}:
return None
if (
isinstance(self._default, dict)
and _DEFAULT_VALUE_KEY in self._default
):
return self._default[_DEFAULT_VALUE_KEY]
# Fallback for any legacy data
return self._default
@default.setter
def default(self, value):
"""Set the default value, wrapping for internal storage format."""
if value is None:
self._default = None
else:
self._default = {_DEFAULT_VALUE_KEY: value}
depends_on = models.ManyToManyField(
"self",
symmetrical=False,
blank=True,
related_name="dependents",
help_text="Attributes that must also be present on a resource when this Attribute is set.",
)
site = models.ForeignKey(
"Site",
db_index=True,
related_name="attributes",
on_delete=models.PROTECT,
verbose_name="Site",
help_text="Unique ID of the Site this Attribute is under.",
)
resource_name = models.CharField(
"Resource Name",
max_length=20,
null=False,
db_index=True,
choices=constants.RESOURCE_CHOICES,
help_text="The name of the Resource to which this Attribute is bound.",
)
def __str__(self):
return "%s %s (site_id: %s)" % (
self.resource_name,
self.name,
self.site_id,
)
class Meta:
unique_together = ("site", "resource_name", "name")
@classmethod
def all_by_name(cls, resource_name=None, site=None):
if resource_name is None:
raise SyntaxError("You must provided a resource_name.")
if site is None:
raise SyntaxError("You must provided a site.")
query = cls.objects.filter(resource_name=resource_name, site=site)
return {attribute.name: attribute for attribute in query.all()}
[docs]
def clean_constraints(self, value):
"""Enforce formatting of constraints."""
if not isinstance(value, dict):
msg = f"Expected dictionary but received {type(value)}"
raise exc.ValidationError({"constraints": msg})
constraints = {
"allow_empty": value.get("allow_empty", False),
"pattern": value.get("pattern", ""),
"valid_values": value.get("valid_values", []),
}
if not isinstance(constraints["allow_empty"], bool):
raise exc.ValidationError(
{"constraints": "allow_empty expected type bool."}
)
if not isinstance(constraints["pattern"], str):
raise exc.ValidationError(
{"constraints": "pattern expected type string."}
)
if not isinstance(constraints["valid_values"], list):
raise exc.ValidationError(
{"constraints": "valid_values expected type list."}
)
return constraints
def clean_display(self, value):
if self.required:
return True
return value
def clean_resource_name(self, value):
if value not in constants.VALID_ATTRIBUTE_RESOURCES:
raise exc.ValidationError(
{"resource_name": "Invalid resource name: %r." % value}
)
return value
def clean_name(self, value):
value = validators.validate_name(value)
if not settings.ATTRIBUTE_NAME.match(value):
raise exc.ValidationError(
{
"name": "Invalid name: %r. Names must match: %s"
% (value, settings.ATTRIBUTE_NAME.pattern)
}
)
return value or False
[docs]
def clean_default(self, value):
"""Validate the default value against multi type and constraints."""
# No default set
if value is None:
return None
# Validate type based on multi flag
if self.multi:
if not isinstance(value, list):
raise exc.ValidationError(
{"default": "Default for multi attribute must be a list."}
)
# Ensure all items are strings
if not all(isinstance(v, str) for v in value):
raise exc.ValidationError(
{"default": "Default list items must be strings."}
)
else:
if not isinstance(value, str):
raise exc.ValidationError(
{
"default": "Default for single-value attribute must be a string."
}
)
# Validate the default value against constraints using validate_value()
# This checks pattern, valid_values, and allow_empty
try:
self.validate_value(value)
except exc.ValidationError as e:
# Re-raise with "default" key for clarity
raise exc.ValidationError({"default": str(e)})
return value
[docs]
def clean_inheritable(self, value):
"""Validate that inheritable is only set for hierarchical resources."""
if value and self.resource_name not in constants.INHERITABLE_RESOURCES:
raise exc.ValidationError(
{
"inheritable": (
"Inheritable attributes are only supported for "
"resource types with parent-child hierarchies: %s. "
"Got: %r."
% (
", ".join(constants.INHERITABLE_RESOURCES),
self.resource_name,
)
)
}
)
return value
[docs]
def clean_fields(self, exclude=None):
self.constraints = self.clean_constraints(self.constraints)
self.display = self.clean_display(self.display)
self.resource_name = self.clean_resource_name(self.resource_name)
self.name = self.clean_name(self.name)
self.default = self.clean_default(self.default)
self.inheritable = self.clean_inheritable(self.inheritable)
def _validate_single_value(self, value, constraints=None):
if not isinstance(value, str):
raise exc.ValidationError(
{"value": "Attribute values must be a string type"}
)
if constraints is None:
constraints = self.constraints
allow_empty = constraints.get("allow_empty", False)
if not allow_empty and not value:
msg = f"Attribute {self.name} doesn't allow empty values"
raise exc.ValidationError({"constraints": msg})
pattern = constraints.get("pattern")
if pattern and not re.match(pattern, value):
msg = f"Attribute value {value} for {self.name} didn't match pattern: {pattern}"
raise exc.ValidationError({"pattern": msg})
valid_values = set(constraints.get("valid_values", []))
if valid_values and value not in valid_values:
raise exc.ValidationError(
"Attribute value {} for {} not a valid value: {}".format(
value, self.name, ", ".join(valid_values)
)
)
return {
"attribute_id": self.id,
"value": value,
}
def validate_value(self, value):
if self.multi:
if not isinstance(value, list):
raise exc.ValidationError(
{"multi": "Attribute values must be a list type"}
)
else:
value = [value]
inserts = []
# This does a deserialization so save the result
constraints = self.constraints
for val in value:
inserts.append(self._validate_single_value(val, constraints))
return inserts
[docs]
def get_all_dependencies(self):
"""Return the full set of transitive dependencies (not just direct)."""
visited = set()
stack = list(self.depends_on.all())
while stack:
dep = stack.pop()
if dep.pk not in visited:
visited.add(dep.pk)
stack.extend(dep.depends_on.all())
return Attribute.objects.filter(pk__in=visited)
[docs]
def validate_dependencies(self):
"""Validate that depends_on entries are valid (same site/resource_name, no cycles)."""
for dep in self.depends_on.all():
if dep.pk == self.pk:
raise exc.ValidationError(
{"depends_on": "An attribute cannot depend on itself."}
)
if dep.site_id != self.site_id:
raise exc.ValidationError(
{
"depends_on": f"Dependency '{dep.name}' belongs to a different site."
}
)
if dep.resource_name != self.resource_name:
raise exc.ValidationError(
{
"depends_on": f"Dependency '{dep.name}' is for resource type "
f"'{dep.resource_name}', expected '{self.resource_name}'."
}
)
# Cycle detection via DFS
visited = set()
stack = list(self.depends_on.all())
while stack:
dep = stack.pop()
if dep.pk == self.pk:
raise exc.ValidationError(
{"depends_on": "Circular dependency detected."}
)
if dep.pk not in visited:
visited.add(dep.pk)
stack.extend(dep.depends_on.all())
[docs]
def delete(self, *args, **kwargs):
"""Prevent deletion if other attributes depend on this one."""
dependents = self.dependents.all()
if dependents.exists():
names = ", ".join(sorted(d.name for d in dependents))
raise exc.ValidationError(
{
"depends_on": f"Cannot delete attribute '{self.name}' because "
f"the following attributes depend on it: {names}"
}
)
return super().delete(*args, **kwargs)
[docs]
def save(self, *args, **kwargs):
"""Always enforce constraints."""
self.full_clean()
super().save(*args, **kwargs)
def to_dict(self):
return {
"id": self.id,
"site_id": self.site_id,
"description": self.description,
"name": self.name,
"resource_name": self.resource_name,
"required": self.required,
"display": self.display,
"multi": self.multi,
"constraints": self.constraints,
"default": self.default,
"inheritable": self.inheritable,
}