# Source code for nsot.api.filters

from __future__ import unicode_literals
import logging

from django.db.models import Q
import django_filters

from .. import models
from ..util import qpbool


log = logging.getLogger(__name__)


class AttributeFilter(django_filters.rest_framework.FilterSet):
    """Filter for Attribute objects."""

    # These three fields are declared explicitly as boolean filters; they are
    # also listed in ``Meta.fields`` below.
    required = django_filters.BooleanFilter()
    display = django_filters.BooleanFilter()
    multi = django_filters.BooleanFilter()

    class Meta:
        model = models.Attribute
        fields = ['name', 'resource_name', 'required', 'display', 'multi']

class ResourceFilter(django_filters.rest_framework.FilterSet):
    """Base filterset adding attribute-aware filtering for Resource objects."""

    attributes = django_filters.CharFilter(method='filter_attributes')

    def filter_attributes(self, queryset, name, value):
        """
        Narrow ``queryset`` by every ``attributes`` query parameter.

        All ``attributes`` entries are re-read from ``self.data`` (the
        ``name`` and ``value`` arguments passed in by the filter are not
        consulted). Each entry is split on the first ``=`` into an attribute
        name and value, and the queryset is filtered once per entry, so the
        result is the intersection of all of them.

        :param queryset: Queryset to narrow; its model's class name is used
            as the ``resource_name`` when looking up stored values.
        :param name: Filter field name (unused).
        :param value: Cleaned filter value (unused).
        :returns: The narrowed queryset.
        """
        pairs = self.data.getlist('attributes', [])
        model_name = queryset.model.__name__

        log.debug('GOT ATTRIBUTES: %r', pairs)

        # Naive intersection: each k=v pair adds another ``id IN (...)``
        # restriction on top of the previous ones.
        for pair in pairs:
            attr_name, _, attr_value = pair.partition('=')

            # IDs of the resources of this type holding a matching value.
            matching_ids = models.Value.objects.filter(
                name=attr_name, value=attr_value, resource_name=model_name
            ).values_list('resource_id', flat=True)

            queryset = queryset.filter(Q(id__in=matching_ids))

        return queryset


class DeviceFilter(ResourceFilter):
    """Filterset for Device objects (hostname plus attribute filtering)."""

    class Meta:
        model = models.Device
        fields = [
            'hostname',
            'attributes',
        ]


class NetworkFilter(ResourceFilter):
    """Filterset for Network objects."""

    include_networks = django_filters.BooleanFilter(
        method='filter_include_networks'
    )
    include_ips = django_filters.BooleanFilter(method='filter_include_ips')
    cidr = django_filters.CharFilter(method='filter_cidr')
    root_only = django_filters.BooleanFilter(method='filter_root_only')
    network_address = django_filters.CharFilter()  # Override type

    class Meta:
        model = models.Network
        fields = [
            'include_networks', 'include_ips', 'root_only', 'cidr',
            'network_address', 'prefix_length', 'ip_version', 'state',
            'attributes'
        ]

    def filter_include_networks(self, queryset, name, value):
        """
        Translate ``include_networks`` into an ``is_ip`` restriction.

        The sibling ``include_ips`` value is read from the form's cleaned
        data. If both flags are truthy the queryset is returned untouched;
        otherwise it is limited to ``is_ip=False`` rows when
        ``include_networks`` is truthy, and has those rows excluded when it
        is not.
        """
        want_ips = qpbool(self.form.cleaned_data['include_ips'])
        want_networks = qpbool(value)

        if want_networks and want_ips:
            return queryset
        if want_networks:
            return queryset.filter(is_ip=False)
        return queryset.exclude(is_ip=False)

    def filter_include_ips(self, queryset, name, value):
        """
        Translate ``include_ips`` into an ``is_ip`` restriction.

        The sibling ``include_networks`` value is read from the form's
        cleaned data. If both flags are truthy the queryset is returned
        untouched; otherwise it is limited to ``is_ip=True`` rows when
        ``include_ips`` is truthy, and has those rows excluded when it is
        not.
        """
        want_ips = qpbool(value)
        want_networks = qpbool(self.form.cleaned_data['include_networks'])

        if want_networks and want_ips:
            return queryset
        if want_ips:
            return queryset.filter(is_ip=True)
        return queryset.exclude(is_ip=True)

    def filter_cidr(self, queryset, name, value):
        """
        Translate ``cidr`` into a network address + prefix length filter.

        An empty value leaves the queryset unchanged. Otherwise the value is
        split on the first ``/``; the left part is matched against
        ``network_address`` and the right part against ``prefix_length``.
        """
        if not value:
            return queryset

        # NOTE(review): a value with no '/' yields an empty string for the
        # prefix length here -- confirm how the model field handles that.
        address, _, length = value.partition('/')
        return queryset.filter(network_address=address, prefix_length=length)

    def filter_root_only(self, queryset, name, value):
        """
        Translate ``root_only`` into a "no parent" filter.

        When the flag is truthy only rows with ``parent=None`` are kept;
        otherwise the queryset is returned unchanged.
        """
        if not qpbool(value):
            return queryset
        return queryset.filter(parent=None)


class InterfaceFilter(ResourceFilter):
    """
    Filterset for Interface objects.

    ``mac_address`` gets an explicit filter method because its model field is
    not one of Django's built-in field types.
    """

    mac_address = django_filters.CharFilter(method='filter_mac_address')

    class Meta:
        model = models.Interface
        # TODO: Remove `device__hostname` in a future release after
        # updating pynsot to use `device_hostname`.
        fields = [
            'device',
            'device__hostname',
            'name',
            'speed',
            'type',
            'mac_address',
            'description',
            'parent_id',
            'attributes',
            'device_hostname',
        ]

    def filter_mac_address(self, queryset, name, value):
        """
        Filter on ``mac_address`` with a plain exact-match lookup.

        Needed because the default filter generation does not work for
        MACAddressField, which is not a Django built-in field type.
        """
        matches = queryset.filter(mac_address=value)
        return matches


class CircuitFilter(ResourceFilter):
    """Filterset for Circuit objects."""

    endpoint_a = django_filters.CharFilter(method='filter_endpoint_a')
    endpoint_z = django_filters.CharFilter(method='filter_endpoint_z')

    class Meta:
        model = models.Circuit
        fields = ['endpoint_a', 'endpoint_z', 'name', 'attributes']

    # FIXME(jathan): The copy/pasted methods can be ripped out once we upgrade
    # filters in support of the V2 API. For now this is quicker and easier.

    @staticmethod
    def _filter_endpoint(queryset, endpoint, value):
        """
        Filter ``queryset`` on ``endpoint`` by ID or by natural key.

        An all-digit ``value`` is matched against the endpoint relation
        itself; anything else is matched against the endpoint's
        ``name_slug``.
        """
        if isinstance(value, int):
            value = str(value)

        lookup = endpoint if value.isdigit() else endpoint + '__name_slug'
        return queryset.filter(**{lookup: value})

    def filter_endpoint_a(self, queryset, name, value):
        """Filter the A-side endpoint by ID, falling back to its name slug."""
        return self._filter_endpoint(queryset, 'endpoint_a', value)

    def filter_endpoint_z(self, queryset, name, value):
        """Filter the Z-side endpoint by ID, falling back to its name slug."""
        return self._filter_endpoint(queryset, 'endpoint_z', value)


class ProtocolTypeFilter(django_filters.rest_framework.FilterSet):
    """Filterset for ProtocolType objects, which are not Resources."""

    class Meta:
        model = models.ProtocolType
        fields = [
            'name',
            'description',
        ]


class ProtocolFilter(ResourceFilter):
    """Filterset for Protocol objects."""

    device = django_filters.CharFilter(method='filter_device')
    type = django_filters.CharFilter(method='filter_type')
    interface = django_filters.CharFilter(method='filter_interface')
    circuit = django_filters.CharFilter(method='filter_circuit')

    class Meta:
        model = models.Protocol
        fields = ['device', 'type', 'interface', 'circuit', 'description']

    @staticmethod
    def _by_id_or_natural_key(queryset, relation, natural_lookup, value):
        """
        Filter ``queryset`` on ``relation`` by ID or by natural key.

        An all-digit ``value`` is matched against ``relation`` itself;
        anything else is matched against ``natural_lookup``.
        """
        if isinstance(value, int):
            value = str(value)

        lookup = relation if value.isdigit() else natural_lookup
        return queryset.filter(**{lookup: value})

    def filter_device(self, queryset, name, value):
        """Filter by device ID, falling back to the device hostname."""
        return self._by_id_or_natural_key(
            queryset, 'device', 'device__hostname', value
        )

    def filter_type(self, queryset, name, value):
        """Filter by protocol type ID, falling back to the type name."""
        return self._by_id_or_natural_key(
            queryset, 'type', 'type__name', value
        )

    def filter_interface(self, queryset, name, value):
        """Filter by interface ID, falling back to the interface name slug."""
        return self._by_id_or_natural_key(
            queryset, 'interface', 'interface__name_slug', value
        )

    def filter_circuit(self, queryset, name, value):
        """Filter by circuit ID, falling back to the circuit name slug."""
        return self._by_id_or_natural_key(
            queryset, 'circuit', 'circuit__name_slug', value
        )