Source code for nsot.api.filters

import logging

import django_filters
from django.db.models import Q

from .. import models
from ..util import qpbool

log = logging.getLogger(__name__)


[docs] class ChangeFilter(django_filters.rest_framework.FilterSet): """Filter for Change objects.""" user = django_filters.NumberFilter(field_name="user") change_at__gte = django_filters.IsoDateTimeFilter( field_name="change_at", lookup_expr="gte" ) change_at__lte = django_filters.IsoDateTimeFilter( field_name="change_at", lookup_expr="lte" ) class Meta: model = models.Change fields = [ "event", "resource_name", "resource_id", "user", "change_at__gte", "change_at__lte", ]
class AttributeFilter(django_filters.rest_framework.FilterSet): required = django_filters.BooleanFilter() display = django_filters.BooleanFilter() multi = django_filters.BooleanFilter() has_dependencies = django_filters.BooleanFilter( method="filter_has_dependencies", help_text="Filter attributes that have dependencies (true) or don't (false).", ) def filter_has_dependencies(self, queryset, name, value): if value: return queryset.filter(depends_on__isnull=False).distinct() return queryset.filter(depends_on__isnull=True) class Meta: model = models.Attribute fields = ["name", "resource_name", "required", "display", "multi"]
[docs] class ResourceFilter(django_filters.rest_framework.FilterSet): """Attribute-aware filtering for Resource objects.""" attributes = django_filters.CharFilter(method="filter_attributes") expired = django_filters.BooleanFilter( method="filter_expired", help_text="Filter by expiration status (true=expired, false=not expired).", ) expires_before = django_filters.IsoDateTimeFilter( field_name="expires_at", lookup_expr="lte", help_text="Resources expiring on or before this ISO timestamp.", ) expires_after = django_filters.IsoDateTimeFilter( field_name="expires_at", lookup_expr="gte", help_text="Resources expiring on or after this ISO timestamp.", )
[docs] def filter_expired(self, queryset, name, value): """Delegate to the queryset's ``.expired()`` method.""" return queryset.expired(expired=value)
[docs] def filter_attributes(self, queryset, name, value): """ Reads 'attributes' from query params and joins them together as an intersection set query. When an attribute is marked as ``inheritable``, resources that inherit the value from an ancestor (i.e. they don't have their own explicit value but an ancestor does) are included in the results. """ attributes = self.data.getlist("attributes", []) resource_name = queryset.model.__name__ # Iterate the attributes and try to look them up as if they are k=v # and naively do an intersection query. log.debug("GOT ATTRIBUTES: %r", attributes) for attribute in attributes: attr_name, _, attr_value = attribute.partition("=") # Retrieve next set of objects with explicit matches. explicit_ids = set( models.Value.objects.filter( name=attr_name, value=attr_value, resource_name=resource_name, ).values_list("resource_id", flat=True) ) matching_ids = set(explicit_ids) # Check if this attribute is inheritable and expand to descendants. try: # Derive site from the queryset (all resources share a site). site_id = None request = getattr(self, "request", None) if request is None: site_value = self.data.get("site_pk") or self.data.get( "site_id" ) if site_value: site_id = int(site_value) if site_id is None and request is not None: # Try from view kwargs view_obj = getattr(request, "parser_context", {}).get( "view" ) if view_obj: site_id = view_obj.kwargs.get("site_pk") if site_id is None: # Last resort: peek at queryset first = queryset.first() if first: site_id = first.site_id if site_id is not None: attr_obj = models.Attribute.objects.filter( name=attr_name, resource_name=resource_name, site_id=site_id, inheritable=True, ).first() if attr_obj is not None and hasattr( queryset.model, "get_descendants" ): # For each explicit match, get its descendants. explicit_resources = queryset.model.objects.filter( id__in=explicit_ids ) descendant_ids = set() for resource in explicit_resources: descendant_ids.update( resource.get_descendants().values_list( "id", flat=True ) ) # Exclude descendants that have their own explicit # value for this attribute (they override). # For multi-value attributes, a resource that has # BOTH the matching value and a different value is # NOT an overrider — it still matches. Only # resources that have a value for this attribute # but do NOT have the queried value are overriders. descendants_with_attr = set( models.Value.objects.filter( name=attr_name, resource_name=resource_name, resource_id__in=descendant_ids, ).values_list("resource_id", flat=True) ) descendants_with_match = set( models.Value.objects.filter( name=attr_name, resource_name=resource_name, resource_id__in=descendant_ids, value=attr_value, ).values_list("resource_id", flat=True) ) # Set difference: resources that have this attr # but NOT with the queried value. These are # "overriders" — children that set their own # different value, breaking the inheritance chain. # # Example: root has region=us-west, child has # region=us-east → child is an overrider. # But if child has region=us-west too (multi), # it's NOT an overrider (it still matches). overrider_ids = ( descendants_with_attr - descendants_with_match ) # Recursively exclude the entire subtree below # each overrider, since those descendants inherit # the overrider's value, not the original. # E.g. if B overrides A's region, then B's # children C, D also inherit B's value, not A's. overrider_subtree_ids = set() overrider_resources = queryset.model.objects.filter( id__in=overrider_ids ) for overrider in overrider_resources: overrider_subtree_ids.update( overrider.get_descendants().values_list( "id", flat=True ) ) # Re-override check: within the excluded subtrees, # find any descendant that sets the attr BACK to # the queried value. These "re-overriders" restart # the inheritance chain for their own subtree. # # Example: A(us-west) → B(us-east) → C(us-west) # B is an overrider, but C re-overrides back to # us-west, so C and C's children should match. re_overrider_ids = set( models.Value.objects.filter( name=attr_name, resource_name=resource_name, resource_id__in=overrider_subtree_ids, value=attr_value, ).values_list("resource_id", flat=True) ) re_overrider_subtree_ids = set() if re_overrider_ids: for ro in queryset.model.objects.filter( id__in=re_overrider_ids ): ro_descendants = set( ro.get_descendants().values_list( "id", flat=True ) ) # Apply the same overrider-exclusion # logic: exclude descendants that have # their own explicit override to a # DIFFERENT value within these subtrees. ro_desc_with_attr = set( models.Value.objects.filter( name=attr_name, resource_name=resource_name, resource_id__in=ro_descendants, ).values_list("resource_id", flat=True) ) ro_desc_with_match = set( models.Value.objects.filter( name=attr_name, resource_name=resource_name, resource_id__in=ro_descendants, value=attr_value, ).values_list("resource_id", flat=True) ) ro_overriders = ( ro_desc_with_attr - ro_desc_with_match ) ro_overrider_subtrees = set() for roo in queryset.model.objects.filter( id__in=ro_overriders ): ro_overrider_subtrees.update( roo.get_descendants().values_list( "id", flat=True ) ) re_overrider_subtree_ids.update( ro_descendants - ro_overriders - ro_overrider_subtrees ) # Final set assembly: # # explicit_ids — resources that explicitly # have the queried attr=value # # descendant_ids — ALL descendants of explicit # - overrider_ids matches, MINUS overriders # - overrider_subtree_ids (different value) and # their subtrees # # re_overrider_ids — descendants within excluded # re_overrider_subtree_ids subtrees that set the value # BACK to the queried value, # plus THEIR subtrees # # This produces the complete set of resources that # either explicitly have the value OR would inherit # it from an ancestor through the parent chain. matching_ids = ( explicit_ids | ( descendant_ids - overrider_ids - overrider_subtree_ids ) | re_overrider_ids | re_overrider_subtree_ids ) except ValueError: log.warning( "Inheritance expansion failed for %r, using explicit only", attr_name, exc_info=True, ) next_set = Q(id__in=matching_ids) queryset = queryset.filter(next_set) return queryset
[docs] class DeviceFilter(ResourceFilter): """Filter for Device objects.""" class Meta: model = models.Device fields = [ "hostname", "attributes", "expired", "expires_before", "expires_after", ]
[docs] class NetworkFilter(ResourceFilter): """Filter for Network objects.""" include_networks = django_filters.BooleanFilter( method="filter_include_networks" ) include_ips = django_filters.BooleanFilter(method="filter_include_ips") cidr = django_filters.CharFilter(method="filter_cidr") root_only = django_filters.BooleanFilter(method="filter_root_only") network_address = django_filters.CharFilter() # Override type class Meta: model = models.Network fields = [ "include_networks", "include_ips", "root_only", "cidr", "network_address", "prefix_length", "ip_version", "state", "attributes", "expired", "expires_before", "expires_after", ]
[docs] def filter_include_networks(self, queryset, name, value): """Converts ``include_networks`` to queryset filters.""" include_ips = qpbool(self.form.cleaned_data["include_ips"]) include_networks = qpbool(value) if not all([include_networks, include_ips]): if include_networks: return queryset.filter(is_ip=False) return queryset.exclude(is_ip=False) return queryset
[docs] def filter_include_ips(self, queryset, name, value): """Converts ``include_ips`` to queryset filters.""" include_ips = qpbool(value) include_networks = qpbool(self.form.cleaned_data["include_networks"]) if not all([include_networks, include_ips]): if include_ips: return queryset.filter(is_ip=True) return queryset.exclude(is_ip=True) return queryset
[docs] def filter_cidr(self, queryset, name, value): """Converts ``cidr`` to network/prefix filter.""" if value: network_address, _, prefix_length = value.partition("/") else: return queryset return queryset.filter( network_address=network_address, prefix_length=prefix_length )
[docs] def filter_root_only(self, queryset, name, value): """Converts ``root_only`` to null parent filter.""" if qpbool(value): return queryset.filter(parent=None) return queryset
[docs] class InterfaceFilter(ResourceFilter): """ Filter for Interface objects. Includes a custom override for filtering on mac_address because this is not a Django built-in field. """ mac_address = django_filters.CharFilter(method="filter_mac_address") class Meta: model = models.Interface # TODO: Remove `device__hostname` in a future release after # updating pynsot to use `device_hostname`. fields = [ "device", "device__hostname", "name", "speed", "mtu", "type", "mac_address", "description", "parent_id", "attributes", "device_hostname", "expired", "expires_before", "expires_after", ]
[docs] def filter_mac_address(self, queryset, name, value): """ Overloads queryset filtering to use built-in. Doesn't work by default because MACAddressField is not a Django built-in field type. """ return queryset.filter(mac_address=value)
[docs] class CircuitFilter(ResourceFilter): """Filter for Circuit objects.""" endpoint_a = django_filters.CharFilter(method="filter_endpoint_a") endpoint_z = django_filters.CharFilter(method="filter_endpoint_z") device_hostname = django_filters.CharFilter( method="filter_device_hostname" ) class Meta: model = models.Circuit fields = [ "endpoint_a", "endpoint_z", "name", "device_hostname", "attributes", "expired", "expires_before", "expires_after", ] # FIXME(jathan): The copy/pasted methods can be ripped out once we upgrade # filters in support of the V2 API. For now this is quicker and easier.
[docs] def filter_endpoint_a(self, queryset, name, value): """Overload to use natural key.""" if isinstance(value, int): value = str(value) if value.isdigit(): return queryset.filter(endpoint_a=value) return queryset.filter(endpoint_a__name_slug=value)
[docs] def filter_endpoint_z(self, queryset, name, value): """Overload to use natural key.""" if isinstance(value, int): value = str(value) if value.isdigit(): return queryset.filter(endpoint_z=value) return queryset.filter(endpoint_z__name_slug=value)
[docs] def filter_device_hostname(self, queryset, name, value): """Filter circuits by device hostname on either endpoint.""" return queryset.filter( Q(endpoint_a__device__hostname=value) | Q(endpoint_z__device__hostname=value) )
[docs] class ProtocolTypeFilter(django_filters.rest_framework.FilterSet): """Filter for ProtocolType (non-resource) objects.""" class Meta: model = models.ProtocolType fields = ["name", "description"]
[docs] class ProtocolFilter(ResourceFilter): """Filter for Protocol objects.""" device = django_filters.CharFilter(method="filter_device") type = django_filters.CharFilter(method="filter_type") interface = django_filters.CharFilter(method="filter_interface") circuit = django_filters.CharFilter(method="filter_circuit") class Meta: model = models.Protocol fields = [ "device", "type", "interface", "circuit", "description", "expired", "expires_before", "expires_after", ]
[docs] def filter_device(self, queryset, name, value): """Overload to use natural key.""" if isinstance(value, int): value = str(value) if value.isdigit(): return queryset.filter(device=value) return queryset.filter(device__hostname=value)
[docs] def filter_type(self, queryset, name, value): """Overload to use natural key.""" if isinstance(value, int): value = str(value) if value.isdigit(): return queryset.filter(type=value) return queryset.filter(type__name=value)
[docs] def filter_interface(self, queryset, name, value): """Overload to use natural key.""" if isinstance(value, int): value = str(value) if value.isdigit(): return queryset.filter(interface=value) return queryset.filter(interface__name_slug=value)
[docs] def filter_circuit(self, queryset, name, value): """Overload to use natural key.""" if isinstance(value, int): value = str(value) if value.isdigit(): return queryset.filter(circuit=value) return queryset.filter(circuit__name_slug=value)