#!-*-coding:utf8-*-
from __future__ import print_function
from __future__ import unicode_literals
try:
from builtins import str
from builtins import int
from builtins import object
except ImportError:
pass
import re
try:
basestring
except NameError:
basestring = str
try:
from types import SimpleNamespace
except: # Python 2
from argparse import Namespace as SimpleNamespace
from collections import (Set, OrderedDict, Mapping)
from .error import (MethodNotFound, InvalidParameter)
[docs]class Scanner(object):
"""Class for scanning a varlink interface definition."""
[docs] def __init__(self, string):
if hasattr(re, "ASCII"):
ASCII = re.ASCII
else:
ASCII = 0
self.whitespace = re.compile(r'([ \t\n]|#.*$)+', ASCII | re.MULTILINE)
self.docstring = re.compile(r'(?:.?)+#(.*)(?:\n|\r\n)')
# FIXME: nested ()
self.method_signature = re.compile(r'([ \t\n]|#.*$)*(\([^)]*\))([ \t\n]|#.*$)*->([ \t\n]|#.*$)*(\([^)]*\))',
ASCII | re.MULTILINE)
self.keyword_pattern = re.compile(r'\b[a-z]+\b|[:,(){}]|->|\[\]|\?|\[string\]\(\)|\[string\]', ASCII)
self.patterns = {
'interface-name': re.compile(r'[a-z]([-]*[a-z0-9])*([.][a-z0-9]([-]*[a-z0-9])*)+'),
'member-name': re.compile(r'\b[A-Z][A-Za-z0-9]*\b', ASCII),
'identifier': re.compile(r'\b[A-Za-z]([_]?[A-Za-z0-9])*\b', ASCII),
}
self.string = string
self.pos = 0
self.current_doc = ""
[docs] def get(self, expected):
m = self.whitespace.match(self.string, self.pos)
if m:
doc = self.docstring.findall(self.string[m.start():m.end()])
if len(doc):
try:
self.current_doc += "\n".join(doc)
except UnicodeError:
self.current_doc += "\n".join(
[el.decode("utf-8") for el in doc])
self.pos = m.end()
pattern = self.patterns.get(expected)
if pattern:
m = pattern.match(self.string, self.pos)
if m:
self.pos = m.end()
return m.group(0)
else:
m = self.keyword_pattern.match(self.string, self.pos)
if m and m.group(0) == expected:
self.pos = m.end()
return True
[docs] def expect(self, expected):
value = self.get(expected)
if not value:
raise SyntaxError("expected '{}'".format(expected))
return value
[docs] def end(self):
m = self.whitespace.match(self.string, self.pos)
if m:
doc = self.docstring.findall(self.string[m.start():m.end()])
if len(doc):
try:
self.current_doc += "\n".join(doc)
except UnicodeError:
self.current_doc += "\n".join(
[el.decode("utf-8") for el in doc])
self.pos = m.end()
return self.pos >= len(self.string)
[docs] def read_type(self, lastmaybe=False):
if self.get('?'):
if lastmaybe:
raise SyntaxError("double '??'")
return _Maybe(self.read_type(lastmaybe=True))
if self.get('[string]()'):
return set()
if self.get('[string]'):
return _Dict(self.read_type())
if self.get('[]'):
return _Array(self.read_type())
if self.get('object'):
return _Object()
if self.get('bool'):
t = bool()
elif self.get('int'):
t = int()
elif self.get('float'):
t = float()
elif self.get('string'):
t = str()
else:
name = self.get('member-name')
if name:
t = _CustomType(name)
else:
t = self.read_struct()
return t
[docs] def read_struct(self):
_isenum = None
self.expect('(')
fields = OrderedDict()
if not self.get(')'):
while True:
name = self.expect('identifier')
if _isenum == None:
if self.get(':'):
_isenum = False
fields[name] = self.read_type()
if not self.get(','):
break
continue
elif self.get(','):
_isenum = True
fields[name] = True
continue
else:
raise SyntaxError("after '{}'".format(name))
elif not _isenum:
try:
self.expect(':')
fields[name] = self.read_type()
except SyntaxError as e:
raise SyntaxError("after '{}': {}".format(name, e))
else:
fields[name] = True
if not self.get(','):
break
self.expect(')')
if _isenum:
return _Enum(fields.keys())
else:
return _Struct(fields)
[docs] def read_member(self):
if self.get('type'):
try:
_name = self.expect('member-name')
except SyntaxError:
m = self.whitespace.match(self.string, self.pos)
if m:
start = m.end()
else:
start = self.pos
m = self.whitespace.search(self.string, start)
if m:
stop = m.start()
else:
stop = start
raise SyntaxError("'{}' not a valid type name.".format(self.string[start:stop]))
try:
_type = self.read_type()
except SyntaxError as e:
raise SyntaxError("in '{}': {}".format(_name, e))
doc = self.current_doc
self.current_doc = ""
return _Alias(_name, _type, doc)
elif self.get('method'):
name = self.expect('member-name')
# FIXME
sig = self.method_signature.match(self.string, self.pos)
if sig:
sig = name + sig.group(0)
in_type = self.read_struct()
self.expect('->')
out_type = self.read_struct()
doc = self.current_doc
self.current_doc = ""
return _Method(name, in_type, out_type, sig, doc)
elif self.get('error'):
doc = self.current_doc
self.current_doc = ""
return _Error(self.expect('member-name'), self.read_type(), doc)
else:
raise SyntaxError('expected type, method, or error')
class _Object(object):
pass
class _Struct(object):
def __init__(self, fields):
self.fields = OrderedDict(fields)
class _Enum(object):
def __init__(self, fields):
self.fields = fields
class _Array(object):
def __init__(self, element_type):
self.element_type = element_type
class _Maybe(object):
def __init__(self, element_type):
self.element_type = element_type
class _Dict(object):
def __init__(self, element_type):
self.element_type = element_type
class _CustomType(object):
def __init__(self, name):
self.name = name
class _Alias(object):
def __init__(self, name, varlink_type, doc=None):
self.name = name
self.type = varlink_type
self.doc = doc
class _Method(object):
def __init__(self, name, in_type, out_type, _signature, doc=None):
self.name = name
self.in_type = in_type
self.out_type = out_type
self.signature = _signature
self.doc = doc
class _Error(object):
def __init__(self, name, varlink_type, doc=None):
self.name = name
self.type = varlink_type
self.doc = doc
[docs]class Interface(object):
"""Class for a parsed varlink interface definition."""
[docs] def __init__(self, description):
"""description -- description string in varlink interface definition language"""
self.description = description
scanner = Scanner(description)
scanner.expect('interface')
self.name = scanner.expect('interface-name')
self.doc = scanner.current_doc
scanner.current_doc = ""
self.members = OrderedDict()
while not scanner.end():
member = scanner.read_member()
self.members[member.name] = member
[docs] def get_description(self):
"""return the description string in varlink interface definition language"""
return self.description
[docs] def get_method(self, name):
method = self.members.get(name)
if method and isinstance(method, _Method):
return method
raise MethodNotFound(name)
[docs] def filter_params(self, parent_name, varlink_type, _namespaced, args, kwargs):
# print("filter_params", type(varlink_type), repr(varlink_type), args, kwargs, type(args))
if isinstance(varlink_type, _Maybe):
if args == None:
return None
return self.filter_params(parent_name, varlink_type.element_type, _namespaced, args, kwargs)
if isinstance(varlink_type, _Dict):
if args == None:
return {}
if isinstance(args, Mapping):
for (k, v) in args.items():
args[k] = self.filter_params(parent_name + '[' + k + ']', varlink_type.element_type, _namespaced, v,
None)
return args
else:
InvalidParameter(parent_name)
if isinstance(varlink_type, _CustomType):
# print("CustomType", varlink_type.name)
return self.filter_params(parent_name, self.members.get(varlink_type.name), _namespaced, args, kwargs)
if isinstance(varlink_type, _Alias):
# print("Alias", varlink_type.name)
return self.filter_params(parent_name, varlink_type.type, _namespaced, args, kwargs)
if isinstance(varlink_type, _Object):
return args
if isinstance(varlink_type, _Enum) and isinstance(args, str):
# print("Returned str:", args)
return args
if isinstance(varlink_type, _Array):
if args == None:
return []
return [self.filter_params(parent_name + '[]', varlink_type.element_type, _namespaced, x, None) for x in
args]
if isinstance(varlink_type, Set):
# print("Returned set:", set(args))
return set(args)
if isinstance(varlink_type, basestring) and isinstance(args, basestring):
return args
if isinstance(varlink_type, float) and (isinstance(args, float) or isinstance(args, int)):
# print("Returned float:", args)
return float(args)
if isinstance(varlink_type, bool) and isinstance(args, bool):
# print("Returned bool:", args)
return args
if isinstance(varlink_type, int) and (isinstance(args, float) or isinstance(args, int)):
# print("Returned int:", args)
if isinstance(args, float):
return int(args + 0.5)
return int(args)
if not isinstance(varlink_type, _Struct):
raise InvalidParameter(parent_name)
# SyntaxError("Expected type %s, got %s with value '%s'" % (type(varlink_type), type(args),
# args))
if _namespaced:
out = SimpleNamespace()
else:
out = {}
varlink_struct = None
if not isinstance(args, tuple):
varlink_struct = args
args = None
for name in varlink_type.fields:
if isinstance(args, tuple):
if args:
val = args[0]
if len(args) > 1:
args = args[1:]
else:
args = None
ret = self.filter_params(parent_name + "." + name, varlink_type.fields[name], _namespaced, val,
None)
if ret != None:
# print("SetOUT:", name)
if _namespaced:
setattr(out, name, ret)
else:
out[name] = ret
continue
else:
if name in kwargs:
ret = self.filter_params(parent_name + "." + name, varlink_type.fields[name], _namespaced,
kwargs[name], None)
if ret != None:
# print("SetOUT:", name)
if _namespaced:
setattr(out, name, ret)
else:
out[name] = ret
continue
if varlink_struct:
if isinstance(varlink_struct, Mapping):
if name not in varlink_struct:
continue
val = varlink_struct[name]
ret = self.filter_params(parent_name + "." + name, varlink_type.fields[name], _namespaced, val,
None)
if ret != None:
# print("SetOUT:", name)
if _namespaced:
setattr(out, name, ret)
else:
out[name] = ret
elif hasattr(varlink_struct, name):
val = getattr(varlink_struct, name)
ret = self.filter_params(parent_name + "." + name, varlink_type.fields[name], _namespaced, val,
None)
if ret != None:
# print("SetOUT:", name)
if _namespaced:
setattr(out, name, ret)
else:
out[name] = ret
else:
continue
return out