"""
Project-wide utilities.
"""
import collections
import logging
import shlex
from cryptography.fernet import Fernet
from django.core.exceptions import FieldDoesNotExist
from logan.runner import run_app
log = logging.getLogger(__name__)
_TRUTHY = set([
'true', 'yes', 'on', '1', ''
])
__all__ = (
'qpbool', 'normalize_auth_header', 'generate_secret_key', 'get_field_attr',
'SetQuery', 'parse_set_query', 'generate_settings', 'initialize_app',
'main', 'cidr_to_dict', 'slugify', 'slugify_interface'
)
[docs]def qpbool(arg):
"""
Convert "truthy" strings into Booleans.
>>> qpbool('true')
True
:param arg:
Truthy string
"""
return str(arg).lower() in _TRUTHY
[docs]def generate_secret_key():
"""
Return a secret key suitable for use w/ Fernet.
>>> generate_secret_key()
'1BpuqeM5d5pi-U2vIsqeQ8YnTrXRRUAfqV-hu6eQ5Gw='
"""
return Fernet.generate_key()
[docs]def get_field_attr(model, field_name, attr_name):
"""
Return the specified attribute value from a model field.
This is used for field overrides in API serializers to retrieve the
``verbose_name`` and ``help_text`` model field attributes so they don't
need to be duplicated in code.
Example::
>>> get_field_attr(models.Interface, 'parent', 'help_text')
'Unique ID of the parent Interface.'
:param model:
Model class
:param field_name:
Model field name
:param attr_name:
Model field attribute name
"""
try:
field = model._meta.get_field(field_name)
except (AttributeError, FieldDoesNotExist):
return ''
else:
return getattr(field, attr_name, '')
[docs]def cidr_to_dict(cidr):
"""
Take a cidr and return it as a dictionary.
>>> cidr_to_dict('192.168.0.0/16')
{'network_address': '192.168.0.0', 'prefix_length': 16}
:param cidr:
IPv4/IPv6 CIDR string
"""
from .. import validators
cidr = validators.validate_cidr(cidr)
return {
'network_address': cidr.network_address,
'prefix_length': cidr.prefixlen,
}
[docs]def slugify(s):
"""
Slugify a string.
This works in a less-agressive manner than Django's slugify, which simply
drops most drops most non-alphanumeric characters and lowercases the entire
string. It would likely to cause uniqueness conflicts for things like
interface names, such as Eth1/2/3 and Eth12/3, which would slugify to be
the same.
>>> slugify('switch-foo01:Ethernet1/2')
'switch-foo01:Ethernet1_2'
:param s:
String to slugify
"""
disallowed_chars = ['/']
replacement = '_'
for char in disallowed_chars:
s = s.replace(char, replacement)
return s
[docs]def slugify_interface(interface=None, device_hostname=None, name=None,
**kwargs):
"""
Return a slug (natural key) representation of an Interface.
If ``interface`` is not provided, ``device_hostname`` and ``name`` are
required. If all are provided, ``interface`` will take precedence.
:param interface:
Interface dict
:param device_hostname:
Device hostname
:param name:
Interface name
:rtype: str
"""
if not (interface or (device_hostname and name)):
raise RuntimeError('Either interface or device_hostname/name required')
if interface is None:
interface = dict(device_hostname=device_hostname, name=name)
slug = '{device_hostname}:{name}'.format(**interface)
return slug
#: Namedtuple for resultant items from ``parse_set_query()``
SetQuery = collections.namedtuple('SetQuery', 'action name value')
[docs]def parse_set_query(query):
"""
Parse a representation of set operations for attribute/value pairs into
(action, name, value) and return a list of ``SetQuery`` objects.
Computes left-to-right evaluation, where the first character indicates the
set operation:
+ "+" indicates a union
+ "-" indicates a difference
+ no marker indicates an intersection
For example::
>>> parse_set_query('+owner=team-networking')
[SetQuery(action='union', name='owner', value='team-networking')]
>>> parse_set_query('foo=bar')
[SetQuery(action='intersection', name='foo', value='bar')]
>>> parse_set_query('foo=bar -owner=team-networking')
[SetQuery(action='intersection', name='foo', value='bar'),
SetQuery(action='difference', name='owner', value='team-networking')]
:param query:
Set query string
"""
log.debug('Incoming query = %r' % (query,))
if not isinstance(query, basestring):
raise TypeError('Query must be a string.')
queries = shlex.split(query)
attributes = []
for q in queries:
if q.startswith('+'):
action = 'union'
q = q[1:]
elif q.startswith('-'):
action = 'difference'
q = q[1:]
else:
action = 'intersection'
name, _, value = q.partition('=')
attributes.append(SetQuery(action, name, value))
log.debug('Outgoing attributes = %r' % (attributes,))
return attributes
#: Configuration template emitted when a user runs ``nsot-server init``.
CONFIG_TEMPLATE = '''
"""
This configuration file is just Python code. You may override any global
defaults by specifying them here.
For more information on this file, see
https://docs.djangoproject.com/en/1.8/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.8/ref/settings/
"""
from nsot.conf.settings import *
import os.path
# Path where the config is found.
CONF_ROOT = os.path.dirname(__file__)
# A boolean that turns on/off debug mode. Never deploy a site into production
# with DEBUG turned on.
# Default: False
DEBUG = False
############
# Database #
############
# https://docs.djangoproject.com/en/dev/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': os.path.join(CONF_ROOT, 'nsot.sqlite3'),
'USER': 'nsot',
'PASSWORD': '',
'HOST': '',
'PORT': '',
}
}
###############
# Application #
###############
# The address on which the application will listen.
# Default: localhost
NSOT_HOST = 'localhost'
# The port on which the application will be accessed.
# Default: 8990
NSOT_PORT = 8990
# The number of gunicorn worker processes for handling requests.
# Default: 4
NSOT_NUM_WORKERS = 4
# Timeout in seconds before gunicorn workers are killed/restarted.
# Default: 30
NSOT_WORKER_TIMEOUT = 30
# If True, serve static files directly from the app.
# Default: True
SERVE_STATIC_FILES = True
############
# Security #
############
# A URL-safe base64-encoded 32-byte key. This must be kept secret. Anyone with
# this key is able to create and read messages. This key is used for
# encryption/decryption of sessions and auth tokens.
SECRET_KEY = %(secret_key)r
# Header to check for Authenticated Email. This is intended for use behind an
# authenticating reverse proxy.
USER_AUTH_HEADER = 'X-NSoT-Email'
# The age, in seconds, until an AuthToken granted by the API will expire.
# Default: 600
AUTH_TOKEN_EXPIRY = 600 # 10 minutes
# A list of strings representing the host/domain names that this Django site
# can serve. This is a security measure to prevent an attacker from poisoning
# caches and triggering password reset emails with links to malicious hosts by
# submitting requests with a fake HTTP Host header, which is possible even
# under many seemingly-safe web server configurations.
# https://docs.djangoproject.com/en/1.8/ref/settings/#allowed-hosts
ALLOWED_HOSTS = ['*']
'''
[docs]def generate_settings(config_template=None):
"""
Used to emit a generated configuration from ``config_template``.
:param config_template:
Config template
"""
if config_template is None:
config_template = CONFIG_TEMPLATE.strip()
secret_key = generate_secret_key()
return config_template % dict(secret_key=secret_key)
[docs]def initialize_app(config):
"""
Actions to be performed prior to creating the Application object.
:param config:
Config object
"""
# FIXME(jathan): Move this to a setting/parameter that can be toggled when
# gevent workers are not used.
USE_GEVENT = True # Hard-coding gevent for now.
if USE_GEVENT:
# TODO(jathan): We need to keep an eye on this. If we run into any race
# conditions w/ database updates, this should be the first place we
# look.
from django.db import connections
connections['default'].allow_thread_sharing = True
[docs]def main():
"""CLI application used to manage NSoT."""
run_app(
project='nsot',
default_config_path='~/.nsot/nsot.conf.py',
default_settings='nsot.conf.settings',
settings_initializer=generate_settings,
settings_envvar='NSOT_CONF',
initializer=initialize_app,
)
if __name__ == '__main__':
main()