# Copyright (c) 2011, Karl Gyllstrom
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing
# official policies, either expressed or implied, of the FreeBSD Project.
from __future__ import print_function
import os
import operator
from functools import partial, wraps
from itertools import starmap, permutations
import sys
# Python 2/3 compatibility layer: dictionary iteration helpers, a string type
# check, ``urlparse``, ``xrange`` and the config parser module (as ``cpars``).
if sys.version_info[0] == 3:
    from urllib.parse import urlparse
    import configparser as cpars

    xrange = range

    def iterkeys(x):
        ''' Return the keys of mapping ``x``. '''
        return x.keys()

    def iteritems(x):
        ''' Return the ``(key, value)`` pairs of mapping ``x``. '''
        return x.items()

    def isstring(x):
        ''' Tell whether ``x`` is a text string. '''
        return isinstance(x, str)
else:
    from urlparse import urlparse
    import ConfigParser as cpars

    def iterkeys(x):
        ''' Return an iterator over the keys of mapping ``x``. '''
        return x.iterkeys()

    def iteritems(x):
        ''' Return an iterator over the ``(key, value)`` pairs of ``x``. '''
        return x.iteritems()

    def isstring(x):
        ''' Tell whether ``x`` is a byte or unicode string. '''
        return isinstance(x, basestring)
#iterkeys
#def iterkeys(
def _ConfigCaster(f):
cfp = cpars.ConfigParser()
cfp.readfp(open(f))
for sec in cfp.sections():
for key, value in cfp.items(sec):
yield key, value
def _RangeCaster(value):
    ''' Convert a textual range description into an ``xrange``.

    ``value`` holds one to three integers separated by a space, ``:`` or
    ``-``; the integers are handed to ``xrange`` in order.

    :param value: text such as ``10``, ``0:10``, ``0-10`` or ``0 10 2``
    :returns: the corresponding ``xrange``
    :raises FormatError: if ``value`` does not describe a range
    '''
    def fail():
        raise FormatError(
            '%s is not range format: N:N+i, N-N+i, or N N+i' % value)

    # Delimiters are probed in this fixed order; the first one present wins.
    delimiter = next((c for c in ' :-' if c in value), None)
    pieces = [value] if delimiter is None else value.split(delimiter)

    if len(pieces) < 1 or len(pieces) > 3:
        fail()

    try:
        bounds = [int(piece) for piece in pieces]
        return xrange(*bounds)
    except ValueError:
        fail()
class _DirectoryOpenerCaster(object):
def __init__(self, create):
self._create = create
def __call__(self, name):
if not os.path.exists(name):
if self._create:
os.makedirs(name)
return name
raise IOError('%s does not exist' % name)
if not os.path.isdir(name):
raise IOError('%s is not directory' % name)
return name
def _FileOpenerCaster(mode=None, buffering=None):
kw = {}
if mode is not None:
kw['mode'] = mode
if buffering is not None:
kw['buffering'] = buffering
return partial(open, **kw)
# ---------- decorators ---------- #
def _names_to_options(f):
    ''' Decorator: replace string positional arguments (those after ``self``)
    with the matching entries of ``self._options``. '''
    @wraps(f)
    def inner(*args, **kwargs):
        owner = args[0]
        resolved = [owner._options[item] if isstring(item) else item
                    for item in args[1:]]
        return f(owner, *resolved, **kwargs)
    return inner
def _options_to_names(f):
    ''' Decorator: replace every positional :class:`Option` (but not
    :class:`Group`) after ``self`` with its ``argname``. '''
    @wraps(f)
    def inner(*args, **kwargs):
        def as_name(item):
            plain_option = (isinstance(item, Option)
                            and not isinstance(item, Group))
            return item.argname if plain_option else item

        return f(args[0], *[as_name(item) for item in args[1:]], **kwargs)
    return inner
def _verify_args_exist(f):
    ''' Decorator: reject string positional arguments that are not keys of
    ``self._readers`` before calling ``f``.

    :raises ValueError: for the first unknown name
    '''
    @wraps(f)
    def inner(*args, **kwargs):
        owner = args[0]
        for candidate in args[1:]:
            # non-string arguments are not checked
            if not isstring(candidate):
                continue
            if candidate not in owner._readers:
                raise ValueError('%s not known' % candidate)
        return f(*args, **kwargs)
    return inner
def _localize_all(f):
    ''' Decorator: pass every string positional argument after ``self``
    through ``self._localize``; other arguments are forwarded untouched. '''
    @wraps(f)
    def inner(*args, **kwargs):
        owner = args[0]
        converted = [owner._localize(item) if isstring(item) else item
                     for item in args[1:]]
        return f(owner, *converted, **kwargs)
    return inner
def localize(f):
    ''' Decorator: pass only the first positional argument after ``self``
    through ``self._localize``; everything else is forwarded as given. '''
    @wraps(f)
    def inner(*args, **kwargs):
        owner, first = args[0], args[1]
        return f(owner, owner._localize(first), *args[2:], **kwargs)
    return inner
# ---------- end decorators ---------- #
# ---------- exceptions ---------- #
class ArgumentError(ValueError):
    ''' Base class of every error raised because user input violates a rule
    set on the parser. Errors of this type are meant to be caught and
    reported to the user; by default the parser prints the error together
    with the `usage` line. '''
class ConditionError(ArgumentError):
    ''' Raised when a condition attached to an argument is not met. '''

    def __init__(self, argname, condition):
        # name of the argument and the condition that was not satisfied
        self.condition = condition
        self.argname = argname

    def __str__(self):
        parts = (self.argname, self.condition)
        return '%s required unless %s' % parts
class MissingRequiredArgumentError(ArgumentError):
    ''' Raised when a required argument was given no value. '''

    def __init__(self, arg):
        # accept either an Option or a plain argument name
        name = arg.argname if isinstance(arg, Option) else arg
        message = 'No value passed for %s' % name
        super(MissingRequiredArgumentError, self).__init__(message)
class ManyAllowedNoneSpecifiedArgumentError(ArgumentError):
    ''' Raised when none of a set of interchangeable required arguments was
    supplied, e.g., one of -a and -b is required and neither is given. '''

    def __init__(self, allowed):
        # list the candidates alphabetically by their string form
        labels = sorted(str(item) for item in allowed)
        message = '[%s] not specified' % ', '.join(labels)
        super(ManyAllowedNoneSpecifiedArgumentError, self).__init__(message)
class UnspecifiedArgumentError(ArgumentError):
    ''' Raised when the user supplies an argument the parser does not know.
    '''

    def __init__(self, arg):
        message = 'illegal option %s' % arg
        super(UnspecifiedArgumentError, self).__init__(message)
class MultipleSpecifiedArgumentError(ArgumentError):
    ''' Raised when the same argument is specified more than once. '''
class DependencyError(ArgumentError):
    ''' Raised when a specified argument requires another argument that was
    not specified. '''

    def __init__(self, arg1, arg2):
        message = '%s requires %s' % (arg1, arg2)
        super(DependencyError, self).__init__(message)
class ConflictError(ArgumentError):
    ''' Raised when a specified argument conflicts with another specified
    argument. '''

    def __init__(self, offender1, offender2):
        pair = (offender1, offender2)
        super(ConflictError, self).__init__('%s conflicts with %s' % pair)
class FailedConditionError(ArgumentError):
    ''' Raised when a user-supplied condition callable rejects the parsed
    values. '''
class MissingValueError(ArgumentError):
    ''' Raised when an argument label is given without the value it needs.
    '''
class InvalidEnumValueError(ArgumentError):
    ''' Enum value provided not allowed. '''
    pass


class FormatError(ArgumentError):
    ''' Argument value is not formatted as its type requires.

    This name is raised by the range cast, the URL type and the value casting
    step of the parser, but it had no definition in this module, so those
    error paths failed with ``NameError`` instead. '''
    pass
# ---------- end exceptions ---------- #
class Condition(object):
    ''' Base class of composable boolean conditions evaluated against the
    dictionary of parsed argument values.

    Subclasses implement :meth:`_inner_satisfied`. Conditions are combined
    with :meth:`and_` / :meth:`or_` and inverted with unary minus; each of
    these returns a new object and leaves the receiver untouched. '''

    def __init__(self):
        # conditions chained onto this one via and_/or_
        self._other_conditions = []
        # True: chained conditions are and-ed; False: they are or-ed
        self._and = True
        # True: the final result is inverted
        self._neg = False

    def copy(self):
        ''' Return a duplicate created without running ``__init__``; the list
        of chained conditions is copied, its members are shared. '''
        c = self.__new__(self.__class__)
        c._other_conditions = self._other_conditions[:]
        c._and = self._and
        c._neg = self._neg
        return c

    def __neg__(self):
        ''' Return the logical negation of this condition. '''
        c = self.copy()
        # Toggle instead of forcing True so that ``-(-cond)`` equals ``cond``.
        c._neg = not self._neg
        return c

    def _inner_satisfied(self, parsed):
        ''' Evaluate this condition alone; must be provided by subclasses. '''
        raise NotImplementedError

    def and_(self, condition):
        ''' Return a copy that additionally requires ``condition``.

        :raises ValueError: if this condition was already combined with
            :meth:`or_`
        '''
        if not self._and:
            raise ValueError('and/or both specified')
        c = self.copy()
        c._other_conditions.append(condition)
        return c

    def or_(self, condition):
        ''' Return a copy that is also satisfied by ``condition``. '''
        c = self.copy()
        c._other_conditions.append(condition)
        c._and = False
        return c

    def _is_satisfied(self, parsed):
        ''' Evaluate the condition, its chained conditions and the negation
        flag against ``parsed``.

        :param parsed: mapping of argument name to value
        :returns: truth value of the whole expression
        '''
        def _inner():
            for n in self._other_conditions:
                if not n._is_satisfied(parsed):
                    # and-mode: one failing member decides the outcome
                    if self._and:
                        return False
                elif not self._and:
                    # or-mode: one satisfied member decides the outcome
                    return True
            return self._inner_satisfied(parsed)

        result = _inner()
        if self._neg:
            result = not result
        return result
class CallableCondition(Condition):
    ''' Condition that applies a two-argument callable to a pair of operands;
    operands that are :class:`Option` objects are replaced by their parsed
    values first. '''

    def __init__(self, call, main, other):
        super(CallableCondition, self).__init__()
        self._call, self._main, self._other = call, main, other

    def copy(self):
        duplicate = super(CallableCondition, self).copy()
        for attr in ('_call', '_main', '_other'):
            setattr(duplicate, attr, getattr(self, attr))
        return duplicate

    def _inner_satisfied(self, parsed):
        def resolve(operand):
            # options stand for whatever value was parsed under their name
            if isinstance(operand, Option):
                return parsed.get(operand.argname)
            return operand

        return self._call(resolve(self._main), resolve(self._other))

    def __repr__(self):
        # only the six comparison operators have a printable symbol
        symbols = {operator.le: '<=',
                   operator.lt: '<',
                   operator.gt: '>',
                   operator.ge: '>=',
                   operator.eq: '==',
                   operator.ne: '!='}
        return ' '.join([self._main.argname,
                         symbols[self._call],
                         str(self._other)])
class Option(Condition):
    ''' A named argument bound to a :class:`Parser`.

    The configuration methods forward to the owning parser and return the
    option itself, so calls can be chained. Comparison operators do not
    compare options; they build :class:`CallableCondition` objects. '''

    def __init__(self, argname, parser):
        ''' Do not construct directly, as it will not be tethered to a
        :class:`Parser` object and thereby will not be handled in argument
        parsing. '''
        super(Option, self).__init__()
        self._parser = parser
        self.argname = argname
        # callables registered through :meth:`condition`
        self._conditions = []

    def copy(self):
        clone = super(Option, self).copy()
        clone._parser = self._parser
        clone.argname = self.argname
        # the list object itself is shared with the original, not copied
        clone._conditions = self._conditions
        return clone

    def requires(self, *conditions):
        ''' Specify other options/conditions which this argument requires.

        :param conditions: required conditions
        :type conditions: sequence of either :class:`Option` or
            :class:`Condition`
        '''
        for item in conditions:
            self._parser._set_requires(self.argname, item)
        return self

    def condition(self, func):
        ''' Register ``func``, which the parser calls with the dictionary of
        assigned values; a false result fails the parse. '''
        self._conditions.append(func)
        return self

    def conflicts(self, *conditions):
        ''' Specify other conditions which this argument conflicts with.

        :param conditions: conflicting options/conditions
        :type conditions: sequence of either :class:`Option` or
            :class:`Condition`
        '''
        for item in conditions:
            self._parser._set_conflicts(self.argname, item)
        return self

    def __str__(self):
        label = '%s%s' % (self._parser._double_prefix, self.argname)
        short = self._alias()
        if not short:
            return label
        return label + '/%s%s' % (self._parser._single_prefix, short)

    def shorthand(self, alias):
        ''' Set shorthand for this argument. Shorthand arguments are 1
        character in length and are prefixed by a single '-'. For example:

        >>> parser.str('option').shorthand('o')

        would cause '--option' and '-o' to be alias argument labels when
        invoked on the command line.

        :param alias: alias of argument
        '''
        self._parser._add_shorthand(self.argname, alias)
        return self

    def default(self, value):
        ''' Provide a default value for this argument. '''
        self._parser._set_default(self.argname, value)
        return self

    def environment(self):
        ''' Pull argument value from OS environment if unspecified. The
        argument name is tried as given, then lower-cased, then upper-cased.
        For example, if the argument name is `Port`, the following names are
        used for environment lookups: `Port`, `port`, `PORT`.

        >>> with Parser(locals()) as p:
        ...    p.int('port').environment()

        Both command lines work:

        ::

            python test.py --port 5000
            export PORT=5000; python test.py
        '''
        found = os.environ.get(self.argname)
        if found is None:
            found = os.environ.get(self.argname.lower())
        if found is None:
            found = os.environ.get(self.argname.upper())
        # the environment value, when present, becomes the default
        return self if found is None else self.default(found)

    def name(self):
        ''' Return the argument name. '''
        return self.argname

    def cast(self, cast):
        ''' Provide a casting value for this argument. '''
        self._parser._casts[self.argname] = cast
        return self

    def required(self):
        ''' Indicate that this argument is required. '''
        self._parser._set_required(self.argname)
        return self

    def if_(self, condition):
        ''' Argument is required if ``condition``. '''
        # stored as "required unless not condition"
        self._parser._set_required(self.argname, [-condition])
        return self

    def unless(self, condition):
        ''' Argument is required unless ``condition``. '''
        self._parser._set_required(self.argname, [condition])
        return self

    def unspecified_default(self):
        ''' Indicate that values passed without argument labels will be
        attributed to this argument. '''
        self._parser._set_unspecified_default(self.argname)
        return self

    def multiple(self):
        ''' Indicate that the argument can be specified multiple times. '''
        self._parser._set_multiple(self.argname)
        return self

    # --- conditions

    def _inner_satisfied(self, parsed):
        ''' An option is satisfied when its parsed value is neither ``None``
        nor equal to ``False``. '''
        value = parsed.get(self.argname)
        if value is None:
            return False
        return value != False

    def _make_condition(self, func, other):
        return CallableCondition(func, self, other)

    def __le__(self, other):
        return self._make_condition(operator.le, other)

    def __lt__(self, other):
        return self._make_condition(operator.lt, other)

    def __gt__(self, other):
        return self._make_condition(operator.gt, other)

    def __ge__(self, other):
        return self._make_condition(operator.ge, other)

    def __eq__(self, other):
        return self._make_condition(operator.eq, other)

    def __ne__(self, other):
        return self._make_condition(operator.ne, other)

    def __hash__(self):
        # hashing follows the argument name
        return hash(str(self.argname))

    # -- private access methods

    def _cast(self):
        return self._parser._casts.get(self.argname)

    def _isrequired(self):
        return self in self._parser._required

    def _getconflicts(self):
        return self._parser._conflicts.get(self, [])

    def _getreqs(self):
        return self._parser._requires.get(self, [])

    def _alias(self):
        return self._parser._source_to_alias.get(self.argname)
# ---------- Argument readers ---------- #
# Argument readers help parse the command line. The flow is as follows:
#
# 1) We create a reader for each argument based on the type specified by the
# programmer (e.g., a bool will get a _FlagArgumentReader).
#
# 2) The reader is used when we hit the argument label
class _ArgumentReader(object):
class _UNSPECIFIED(object):
def __repr__(self):
return 'UNSPECIFIED'
UNSPECIFIED = _UNSPECIFIED()
def __init__(self, parent):
self.value = _ArgumentReader.UNSPECIFIED
self.parent = parent
self._init()
def consume_or_skip(self, arg):
raise NotImplementedError()
def _init(self):
raise NotImplementedError()
@classmethod
def default(cls):
return _ArgumentReader.UNSPECIFIED
class _MultiWordArgumentReader(_ArgumentReader):
    ''' Reader that keeps taking tokens until the next argument label. '''

    def _init(self):
        self.value = []

    def consume_or_skip(self, arg):
        # an argument label ends the run of words
        accepted = not self.parent._is_argument_label(arg)
        if accepted:
            self.value.append(arg)
        return accepted

    def get(self):
        ''' Return the collected words joined by single spaces.

        :raises MissingValueError: if no word was collected
        '''
        if not self.value:
            raise MissingValueError
        return ' '.join(self.value)
class _FlagArgumentReader(_ArgumentReader):
    ''' Reader for boolean flags: takes no tokens and is ``True`` as soon as
    it is created. '''

    @classmethod
    def default(cls):
        ''' An absent flag is ``False``. '''
        return False

    def _init(self):
        self.value = True

    def consume_or_skip(self, arg):
        # flags never take a value
        return False

    def get(self):
        return self.value
class _SingleWordReader(_ArgumentReader):
    ''' Reader that accepts exactly one token as its value. '''

    def _init(self):
        pass

    def consume_or_skip(self, arg):
        # only the first token offered is accepted
        vacant = self.value is _ArgumentReader.UNSPECIFIED
        if vacant:
            self.value = arg
        return vacant

    def get(self):
        ''' Return the stored token.

        :raises MissingValueError: if no token was stored
        '''
        if self.value is _ArgumentReader.UNSPECIFIED:
            raise MissingValueError
        return self.value
# ---------- end Argument readers ---------- #
class Group(Option):
    ''' A set of options/conditions treated as one condition that holds as
    soon as any member holds. '''

    def __init__(self, parser, *names):
        # NOTE: the Option/Condition initialisers are not invoked here, so a
        # group only carries the three attributes below.
        self._names = names
        self._parser = parser
        # a group stands in for its own argument name
        self.argname = self

    def default(self, name):
        ''' Record ``name`` as the default member of the group.

        :raises ValueError: if ``name`` is not one of the members
        '''
        if name not in self._names:
            raise ValueError('%s not in group' % name)
        self._default = name
        return self

    def _is_satisfied(self, parsed):
        return any(member._is_satisfied(parsed) for member in self._names)

    def __str__(self):
        return ', '.join(str(member) for member in self._names)
class FakeSystemExit(ValueError):
    ''' Stand-in for ``SystemExit`` used for testing: raised in place of
    exiting when exits are suppressed on the parser. '''
class Parser(object):
    ''' Command line parser.

    Arguments are declared through the type methods (:meth:`int`,
    :meth:`str`, :meth:`flag`, ...), each of which returns an
    :class:`Option` for further configuration. Used as a context manager,
    the command line is processed when the ``with`` block ends normally. '''

    def __init__(self, store=None, default_help=True):
        '''
        :param store: dictionary that receives the parsed values; a new
            dictionary is used when omitted
        :param default_help: when true, register a ``help`` flag with the
            shorthand ``h``
        '''
        self._options = {}
        self._readers = {}
        self._option_labels = {}
        self._multiple = set()
        self._casts = {}
        self._defaults = {}
        self._extras = []
        self._unspecified_default = None
        self.require_n = {}

        # dict of A (required) -> args that could replace A if A is not
        # specified
        self._required = {}

        # dict of A -> args that A depends on
        self._requires = {}

        # dict of A -> args that A conflicts with
        self._conflicts = {}

        self._alias = {}
        self._source_to_alias = {}

        # convert arg labels to underscore in value dict
        self._to_underscore = False

        # prefix to shorthand args
        self._single_prefix = '-'

        # prefix to fullname args
        self._double_prefix = '--'

        # help message
        self._help_prefix = None

        self._suppress_sys_exit = False
        self._out = sys.stdout

        # set by user
        self._init_user_set(store)

        if default_help:
            self.flag('help').shorthand('h')

    def _init_user_set(self, store=None):
        if store is None:
            store = {}
        self._store = store
        self._namemaps = {}
        self._rnamemaps = {}

    def set_help_prefix(self, message):
        ''' Indicate text to appear before argument list when the ``help``
        function is triggered. '''
        self._help_prefix = message
        return self

    def underscore(self):
        ''' Convert '-' to '_' in argument names. This is enabled if
        ``with_locals`` is used, as variable naming rules are applied. '''
        self._to_underscore = True
        return self

    def set_single_prefix(self, flag):
        ''' Set the single flag prefix. This appears before short arguments
        (e.g., -a).

        :raises ValueError: if ``flag`` contains the double prefix
        '''
        if self._double_prefix in flag:
            raise ValueError('single_prefix cannot be superset of double_prefix')
        self._single_prefix = flag
        return self

    def set_double_prefix(self, flag):
        ''' Set the double flag prefix. This appears before long arguments
        (e.g., --arg).

        :raises ValueError: if ``flag`` is contained in the single prefix
        '''
        if flag in self._single_prefix:
            raise ValueError('single_flag cannot be superset of double_flag')
        self._double_prefix = flag
        return self

    def use_aliases(self):
        raise NotImplementedError

    @classmethod
    def with_locals(cls):
        ''' Create :class:`Parser` using locals() dict. '''
        import inspect
        # the local variables of the caller's frame become the value store
        vals = inspect.currentframe().f_back.f_locals
        return Parser(vals).underscore()

    # --- types --- #

    def config(self, name):
        ''' Add configuration file, whose key/value pairs will provide/replace
        any arguments created for this parser. For example:

        >>> with Parser() as p:
        ...   p.int('a')
        ...   p.str('b')
        ...   p.config('conf')

        Now, arg ``a`` can be specified on the command line, or in the
        configuration file passed to ``conf``. For example:

        ::

            python test.py --a 3
            python test.py --conf myconfig.cfg

        Where myconfig.cfg:

        ::

            [myconfigfile]
            a = 5
            b = 9
            x = 'hello'

        Note that any parameters in the config that aren't created as
        arguments via this parser are ignored. In the example above, the
        values of variables ``a`` and ``b`` would be assigned, while ``x``
        would be ignored (as the developer did not create an ``x``
        argument).

        If anything is specified on the command line, its value is not
        taken from the config file. For example:

        ::

            python test.py --a 3 --conf myconfig.cfg

        In this case, the value of ``a`` is 3 (from the command line) and
        not 5 (from the config file).
        '''
        return self.str(name).cast(_ConfigCaster)

    def int(self, name):
        ''' Add integer argument. '''
        return self._add_option(name).cast(int)

    def float(self, name):
        ''' Add float argument. '''
        return self._add_option(name).cast(float)

    def str(self, name):
        ''' Add :py:class:`str` argument. '''
        return self._add_option(name)

    def range(self, name):
        ''' Range type. Accepts similar values to that of python's py:`range`
        and py:`xrange`. Accepted delimiters are space, -, and :.

        >>> with Parser() as p:
        ...   p.range('values')

        Now accepts:

        ::

            python test.py --values 10  # -> xrange(10)
            python test.py --values 0-1  # -> xrange(0, 1)
            python test.py --values 0:10:2  # -> xrange(0, 10, 2)
            python test.py --values 0 10 3  # -> xrange(0, 10, 3)
        '''
        result = self._add_option(name).cast(_RangeCaster)
        self._set_reader(name, _MultiWordArgumentReader)
        return result

    def multiword(self, name):
        ''' Accepts multiple terms as an argument. For example:

        >>> with Parser() as p:
        ...   p.multiword('multi')

        Now accepts:

        ::

            python test.py --multi path to something
        '''
        result = self._add_option(name)
        self._set_reader(name, _MultiWordArgumentReader)
        return result

    def bool(self, name):
        ''' Alias of :func:`flag`. '''
        return self.flag(name)

    def flag(self, name):
        ''' Boolean value. The presence of this flag indicates a true value,
        while an absence indicates false. No arguments. '''
        result = self._add_option(name)
        self._set_reader(name, _FlagArgumentReader)
        return result

    def file(self, name, mode=None, buffering=None):
        ''' Opens the file indicated by the name passed by the user. ``mode``
        and ``buffering`` are arguments passed to ``open``.

        The example below implements a file copy operation:

        >>> with Parser(locals()) as p:
        ...    p.file('input_file')
        ...    p.file('output_file', mode='w')
        ...
        ... output_file.write(input_file.read())
        '''
        return self.multiword(name).cast(_FileOpenerCaster(mode, buffering))

    def directory(self, name, create=False):
        ''' File directory value. Checks to ensure that the user passed file
        name exists and is a directory (i.e., not some other file object). If
        ``create`` is specified, creates the directory using ``os.makedirs``;
        any intermediate directories are also created. '''
        return self.multiword(name).cast(_DirectoryOpenerCaster(create))

    def url(self, name):
        ''' URL value; verifies that argument has a scheme (e.g., http, ftp,
        file). '''
        def parse(value):
            if urlparse(value).scheme == '':
                raise FormatError('%s not valid URL' % value)
            return value

        # return the option like every other type method so calls can chain
        return self.str(name).cast(parse)

    # --- aggregate calls --- #

    def at_least_one(self, *args):
        ''' Require at least one of ``args``. '''
        return self._require_at_least_one(*args)

    def require_one(self, *args):
        ''' Require one and only one of ``args``. '''
        self._set_one_required(*args)
        return Group(self, *args)

    def all_if_any(self, *args):
        ''' If *any* of ``args`` is specified, then all of ``args`` must be
        specified. '''
        # every ordered pair (a, b) records "a requires b"
        for first, second in permutations(args, 2):
            self._set_requires(first, second)
        return Group(self, *args)

    def only_one_if_any(self, *args):
        ''' If *any* of ``args`` is specified, then none of the remaining
        ``args`` may be specified.'''
        # every ordered pair (a, b) records "a conflicts with b"
        for first, second in permutations(args, 2):
            self._set_conflicts(first, second)
        return Group(self, *args)

    def __getitem__(self, name):
        ''' Return a new :class:`Option` object for ``name``. '''
        return Option(name, self)

    # --- private --- #

    def _require_at_least_one(self, *names):
        ''' At least one of the arguments is required. '''
        s = set(names)
        for v in s:
            # each member is required unless one of the others is satisfied
            self._set_required(v, s - set([v]))
        return Group(self, *names)

    @localize
    def _set_unspecified_default(self, name):
        if self._unspecified_default is not None:
            raise ValueError('Trying to specify multiple unspecified defaults')
        self._unspecified_default = name

    @localize
    @_verify_args_exist
    @_names_to_options
    def _set_required(self, arg, replacements=None):
        newreplacements = []
        for item in replacements or []:
            # plain names are resolved to their registered option
            if not isinstance(item, (Option, Condition)):
                item = self._options[item]
            newreplacements.append(item)
        self._required.setdefault(arg, []).extend(newreplacements)

    @localize
    def _set_reader(self, name, option):
        self._readers[name] = option

    def _add_shorthand(self, source, alias):
        if source not in self._readers:
            raise ValueError('%s not an option' % source)
        if alias in self._alias:
            raise ValueError('{0} already shorthand for {1}'.format(
                alias, self._alias[alias]))
        self._alias[alias] = source
        self._source_to_alias[source] = alias

    def _add_option(self, name, argument_label=None):
        ''' Register a new argument read by :class:`_SingleWordReader` and
        return its :class:`Option`.

        :raises ValueError: if ``name`` was registered before
        '''
        name = self._localize(name)
        if name in self._readers:
            raise ValueError('multiple types specified for %s' % name)
        self._readers[name] = _SingleWordReader
        if argument_label is not None:
            self._option_labels[name] = argument_label
        o = Option(name, self)
        self._options[name] = o
        return o

    def _getoption(self, option):
        ''' Return ``(name, reader)`` for a full name or a shorthand; the
        reader is ``None`` when nothing matches. '''
        o = self._readers.get(option, None)
        if o is not None:
            return option, o
        source = self._alias.get(option, None)
        if source is None:
            return option, None
        return source, self._readers.get(source, None)

    def _localize(self, key):
        ''' Map an argument name to its stored form ('-' replaced by '_' when
        underscore mode is on). Anything that is not a string (an
        :class:`Option` or :class:`Group`) is returned unchanged instead of
        failing on ``.replace``. '''
        if self._to_underscore and isstring(key):
            modified = key.replace('-', '_')
            self._rnamemaps.setdefault(modified, key)
            return self._namemaps.setdefault(key, modified)
        return key

    def _unlocalize(self, key):
        v = self._rnamemaps.get(key, None)
        if v is None:
            # already unlocalized(?)
            return key
        return v

    def _tokenize(self, args):
        ''' Split ``label=value`` tokens into ``label`` and ``value``.

        Only tokens that look like argument labels are split, and only at the
        first '=', so values that contain '=' themselves (URLs, expressions)
        stay intact. '''
        new_args = []
        for arg in args:
            if '=' in arg and self._is_argument_label(arg):
                label, _, value = arg.partition('=')
                new_args.extend((label, value))
            else:
                new_args.append(arg)
        return new_args

    def _is_argument_label(self, arg):
        return (arg.startswith(self._single_prefix) or
                arg.startswith(self._double_prefix))

    def _parse(self, tokenized):
        ''' Group the tokens by argument.

        :returns: dict of argument name -> list of readers, one reader per
            occurrence of the argument
        :raises UnspecifiedArgumentError: for a label that matches no
            registered argument or shorthand
        '''
        reader = None
        parsed = {}

        for arg in tokenized:
            # the reader opened by the previous label gets first refusal
            if reader is not None:
                if reader.consume_or_skip(arg):
                    continue
                reader = None

            argument_name = None
            if self._is_argument_label(arg):
                if arg.startswith(self._double_prefix):
                    key = self._localize(arg[len(self._double_prefix):])
                else:
                    # shorthand: translate the alias to the full name
                    short = self._localize(arg[len(self._single_prefix):])
                    key = self._alias.get(short)

                option = self._options.get(key)
                reader_class = None
                if option is not None:
                    reader_class = self._readers.get(option.argname)
                # unknown labels are reported as a user error rather than
                # failing on the attribute access of a missing option
                if reader_class is None:
                    raise UnspecifiedArgumentError(arg)

                argument_name = option.argname
                reader = reader_class(self)
            elif self._unspecified_default is not None:
                argument_name = self._unspecified_default
                # push value onto _SingleWordReader
                reader = _SingleWordReader(self)
                reader.consume_or_skip(arg)

            if argument_name:
                parsed.setdefault(argument_name, []).append(reader)
            else:
                self._extras.append(arg)

        return parsed

    def _help_if_necessary(self, processed):
        if 'help' in processed:
            self.print_help()
            sys.exit(0)

    def _assign_defaults(self, parsed):
        ''' Return name -> value for every argument absent from ``parsed``:
        the configured default if there is one, otherwise the default of the
        argument's reader. '''
        assigned = {}
        for key, reader in iteritems(self._readers):
            if key in parsed:
                continue
            if key in self._defaults:
                assigned[key] = self._defaults[key]
            else:
                assigned[key] = reader.default()
        return assigned

    def _assign_configs(self, parsed, valid_args):
        ''' Return the values read from configuration-file arguments,
        restricted to the names in ``valid_args``. '''
        argvalues = {}
        for key, values in iteritems(parsed):
            cast = self._casts.get(key)
            if cast and cast == _ConfigCaster:
                value = values[0].get()
                for k, v in cast(value):
                    if k in valid_args:
                        argvalues[k] = v
        return argvalues

    def _assign(self, parsed):
        ''' Turn the readers in ``parsed`` into final, cast values.

        Defaults are laid down first, then configuration-file values for
        arguments not given on the command line, then the command line
        values.

        :raises MissingValueError: if a label was given without a value
        :raises FormatError: if a value cannot be cast
        '''
        argvalues = {}
        argvalues.update(self._assign_defaults(parsed))
        argvalues.update(self._assign_configs(parsed, set(argvalues.keys())))

        for key, values in iteritems(parsed):
            try:
                if key not in self._multiple:
                    value = values[0].get()
                else:
                    value = [v.get() for v in values]
                if value is _ArgumentReader.UNSPECIFIED:
                    continue
                argvalues[key] = value
            except MissingValueError:
                raise MissingValueError('%s specified but missing given value'
                                        % key)

        assigned = {}
        for key, value in iteritems(argvalues):
            cast = self._casts.get(key)
            if value is _ArgumentReader.UNSPECIFIED:
                value = None
            elif cast:
                # config-file arguments are consumed above, not stored
                if cast == _ConfigCaster:
                    continue
                try:
                    if isinstance(value, list):
                        newv = []
                        values = value
                        # ``value`` is rebound on purpose: on failure the
                        # error below names the offending element
                        for value in values:
                            newv.append(cast(value))
                        value = newv
                    else:
                        value = cast(value)
                except FormatError:
                    raise
                except ValueError:
                    raise FormatError('Cannot cast \'%s\' to %s'
                                      % (value, cast))
            assigned[key] = value
        return assigned

    def _check_multiple(self, assigned):
        for key, values in iteritems(assigned):
            if len(values) > 1 and key not in self._multiple:
                raise MultipleSpecifiedArgumentError(
                    ('%s specified multiple' + ' times') % self._options[key])

    def _check_conditions(self, assigned):
        for arg in iterkeys(assigned):
            for cond in self._options[arg]._conditions:
                if not cond(assigned):
                    raise FailedConditionError()

    def _check_required(self, assigned):
        ''' Raise if a required argument is unsatisfied and none of its
        replacements is satisfied either. '''
        for arg, replacements in iteritems(self._required):
            missing = []
            if not arg._is_satisfied(assigned):
                for v in replacements:
                    if isinstance(v, Group):
                        missing += v._names
                    elif not isinstance(v, Condition):
                        missing.append(v)
                    if v._is_satisfied(assigned):
                        break
                else:
                    # reached only when no replacement was satisfied
                    if missing:
                        raise ManyAllowedNoneSpecifiedArgumentError(
                            [arg] + missing)
                    else:
                        raise MissingRequiredArgumentError(arg)

    def _check_dependencies(self, assigned):
        for arg, deps in iteritems(self._requires):
            if arg._is_satisfied(assigned):
                for v in deps:
                    if not v._is_satisfied(assigned):
                        if isinstance(v, CallableCondition):
                            raise ConditionError(arg.argname, v)
                        raise DependencyError(arg, v)

    def _check_conflicts(self, assigned):
        for arg, conflicts in iteritems(self._conflicts):
            if arg._is_satisfied(assigned):
                for conflict in conflicts:
                    if conflict._is_satisfied(assigned):
                        # plain conditions carry no ``argname``; report the
                        # condition itself in that case
                        offender = getattr(conflict, 'argname', conflict)
                        raise ConflictError(arg.argname, offender)

    def _verify(self, assigned):
        self._check_conditions(assigned)
        self._check_required(assigned)
        self._check_dependencies(assigned)
        self._check_conflicts(assigned)

    def _assign_to_store(self, assigned):
        for key, value in iteritems(assigned):
            self._store[key] = value

    @_options_to_names
    def _set_one_required(self, *names):
        self.only_one_if_any(*names)
        self._require_at_least_one(*names)

    def _get_args(self, args):
        ''' Return ``args``, or ``sys.argv[1:]`` when ``args`` is ``None``.

        :raises TypeError: if ``args`` is not a list
        '''
        if args is None:
            args = sys.argv[1:]
        if not isinstance(args, list):
            raise TypeError('%s not list' % args)
        return args

    def _process_command_line(self, args=None):
        ''' Parse, verify and store the arguments; :class:`ArgumentError`
        propagates to the caller.

        :returns: the value store
        '''
        args = self._get_args(args)
        tokenized = self._tokenize(args)
        parsed = self._parse(tokenized)
        self._help_if_necessary(parsed)
        self._check_multiple(parsed)
        assigned = self._assign(parsed)
        self._verify(assigned)
        self._assign_to_store(assigned)
        return self._store

    def emit(self, *args):
        print(*args, file=self._out)

    def process_command_line(self, args=None):
        ''' Process ``args`` (``sys.argv[1:]`` by default) and return the
        value store; on an :class:`ArgumentError` the error and usage are
        printed via :meth:`bail`. '''
        try:
            return self._process_command_line(args)
        except ArgumentError as e:
            self.bail(e)

    def bail(self, e):
        ''' Print the error and the usage line, then exit with status 1 (or
        raise :class:`FakeSystemExit` when exits are suppressed). '''
        msg = []
        msg.append('Error: ' + str(e))
        msg.append('usage: %s' % sys.argv[0])
        msg.append(' '.join('[%s]' % self._label(value)
                            for value in self._options.values()))
        self.emit('\n'.join(msg))
        if self._suppress_sys_exit:
            raise FakeSystemExit
        sys.exit(1)

    @localize
    @_options_to_names
    def _set_multiple(self, name):
        self._multiple.add(name)

    @localize
    @_options_to_names
    def _set_default(self, name, value):
        self._defaults[name] = value

    @_localize_all
    @_verify_args_exist
    @_names_to_options
    def _set_requires(self, a, b):
        self._requires.setdefault(a, []).append(b)

    @_localize_all
    @_verify_args_exist
    @_options_to_names
    def set_requires_n_of(self, a, n, *others):
        self.require_n[a] = (n, others)

    @_localize_all
    @_verify_args_exist
    @_names_to_options
    def _set_conflicts(self, a, b):
        self._conflicts.setdefault(a, []).append(b)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): ``exc_type`` is a class, so the ``isinstance`` test
        # below can never be true and the command line is processed only when
        # the block ended without an exception. Left as is because switching
        # to ``issubclass`` would start suppressing errors -- confirm intent.
        if exc_type is None or isinstance(exc_type, ArgumentError):
            self.process_command_line()
            return True
        return False

    def _option_label(self, name, opt):
        ''' Return the placeholder shown for the value of ``opt`` in the help
        and usage text. '''
        label = self._option_labels.get(name)
        if label is not None:
            return label
        cast = opt._cast()
        if cast is int:
            return 'int'
        if cast is float:
            return 'float'
        if cast is _RangeCaster:
            return 'range'
        return 'option'

    def _label(self, opt):
        pkey = str(opt)
        reader = self._readers[opt.argname]
        # flags take no value, so they get no placeholder
        if reader != _FlagArgumentReader:
            pkey = '%s <%s>' % (pkey, self._option_label(opt.argname, opt))
        return pkey

    def print_help(self):
        ''' Print the help prefix (if any) followed by one line per argument
        with its conflicts and requirements. '''
        if self._help_prefix:
            self.emit(self._help_prefix)
        self.emit('Arguments: (! denotes required argument)')

        # pad every label to the width of the longest one
        column_width = -1
        for value in self._options.values():
            column_width = max(column_width, len(self._label(value)))
        fmt = '   %-' + str(column_width) + 's'

        for value in self._options.values():
            msg = fmt % self._label(value)
            if value._isrequired():
                msg = '!' + msg[1:]
            conflicts = value._getconflicts()
            if conflicts:
                msg += ' (Conflicts with %s)' % ', '.join(
                    str(item) for item in conflicts)
            reqs = value._getreqs()
            if reqs:
                msg += ' (Requires %s)' % ', '.join(
                    str(item) for item in reqs)
            self.emit(msg)
# Public API: ``Parser`` is the only name exported by a star-import.
__all__ = ['Parser']
# Version string of this module.
__version__ = '0.2.22a'