# Source code for varlink.client

# coding=utf-8

from __future__ import print_function
from __future__ import unicode_literals

import json
import os
import shutil
import signal
import socket
import sys
import tempfile
import subprocess
import threading

try:
    from builtins import next
    from builtins import object
    from builtins import open
    from builtins import str
except ImportError:
    pass

from .error import (VarlinkError, InterfaceNotFound, VarlinkEncoder, BrokenPipeError)
from .scanner import (Interface, _Method)

# Interpreter major-version flags, derived once from sys.version_info.
PY2 = sys.version_info[0] == 2
PY3 = (sys.version_info[0] >= 3)

if PY2:
    # On Python 2 these names are bound to existing exception types so that
    # code in this module can reference them (e.g. in except clauses).
    FileNotFoundError = IOError
    ChildProcessError = OSError


class ConnectionError(OSError):
    """Error raised by the varlink client for connection level problems.

    Within this module it is raised for invalid service addresses, for a
    method call attempted while another call is still in progress on the same
    connection, and for a reply that unexpectedly announces more messages.
    """


class ClientInterfaceHandler(object):
    """Base class for varlink client, which wraps varlink methods of an interface to the class"""

    def __init__(self, interface, namespaced=False):
        """Base class for varlink client, which wraps varlink methods of an interface.

        The object allows to talk to a varlink service, which implements the specified interface
        transparently by calling the methods. The call blocks until enough messages are received.

        For monitor calls with '_more=True' a generator object is returned.

        :param interface: an Interface object
        :param namespaced: if True, varlink methods return SimpleNamespace objects instead of dictionaries
        :exception TypeError: if interface is not an Interface object

        """
        if not isinstance(interface, Interface):
            raise TypeError

        self._interface = interface
        self._namespaced = namespaced
        # True while a reply (or a stream of replies) is still outstanding.
        self._in_use = False

        # Install one callable attribute per varlink method of the interface.
        for member in interface.members.values():
            if isinstance(member, _Method):
                self._add_method(member)

    def close(self):
        """To be implemented."""
        raise NotImplementedError

    def _send_message(self, out):
        """To be implemented.

        This should send a varlink message to the varlink service adding a trailing zero byte.
        """
        raise NotImplementedError

    def _next_message(self):
        """To be implemented.

        This must be a generator yielding the next received varlink message without the trailing zero byte.
        """
        raise NotImplementedError

    def _add_method(self, method):
        """Install a wrapper for ``method`` as an attribute named ``method.name``.

        The wrapper dispatches to ``_call_more()`` when called with a true
        ``_more`` keyword argument and to ``_call()`` otherwise; ``_more`` itself
        is never forwarded.

        :param method: the method object taken from the interface members
        """

        def _wrapped(*args, **kwargs):
            if kwargs.pop("_more", False):
                return self._call_more(method.name, *args, **kwargs)
            return self._call(method.name, *args, **kwargs)

        if sys.version_info.major >= 3:
            _wrapped.__name__ = method.name
        else:
            # Python 2 function names must be byte strings.
            _wrapped.__name__ = method.name.encode("latin-1")

        # FIXME: add comments
        if method.signature:
            if method.doc:
                doc = method.doc + "\n"
            else:
                doc = "\n"
            _wrapped.__doc__ = doc + method.signature

        setattr(self, method.name, _wrapped)

    def _next_varlink_message(self):
        """Read and decode the next reply.

        :return: a tuple ``(parameters, continues)``; ``parameters`` defaults to
                 an empty dict and ``continues`` to False when absent from the reply
        :exception VarlinkError: if the reply carries a non-null ``error`` member;
                                 the handler is marked as no longer in use first
        """
        message = json.loads(next(self._next_message()))
        if 'parameters' not in message:
            message['parameters'] = {}

        if message.get('error') is not None:
            self._in_use = False
            raise VarlinkError.new(message, self._namespaced)

        return message['parameters'], ('continues' in message) and message['continues']

    def _call(self, method_name, *args, **kwargs):
        """Call a varlink method and return its single reply.

        :param method_name: name of the method, without the interface prefix
        :param _oneway: keyword only; if true the call is sent with ``oneway``
                        set and None is returned without waiting for a reply
        :return: the reply parameters, filtered through the interface definition
        :exception ConnectionError: if another call is still in progress, or if
                                    the service announces further replies (the
                                    connection is closed in that case)
        """
        if self._in_use:
            raise ConnectionError("Tried to call a varlink method, while other call still in progress")

        oneway = bool(kwargs.pop("_oneway", False))

        method = self._interface.get_method(method_name)
        parameters = self._interface.filter_params("client.call", method.in_type, False, args, kwargs)
        out = {'method': self._interface.name + "." + method_name}
        if oneway:
            out['oneway'] = True
        if parameters:
            out['parameters'] = parameters

        self._send_message(json.dumps(out, cls=VarlinkEncoder).encode('utf-8'))

        if oneway:
            return None

        self._in_use = True
        (message, more) = self._next_varlink_message()
        if more:
            # A plain call must get exactly one reply; the stream is unusable now.
            self.close()
            self._in_use = False
            raise ConnectionError("Server indicated more varlink messages")

        self._in_use = False
        if message:
            message = self._interface.filter_params("client.reply", method.out_type, self._namespaced, message, None)
        return message

    def _call_more(self, method_name, *args, **kwargs):
        """Call a varlink method with ``more`` set and yield every reply.

        This is a generator function: nothing (neither the in-use check nor
        sending the request) happens before the first item is requested. The
        handler stays marked as in use until the service sends a reply without
        ``continues``.

        :param method_name: name of the method, without the interface prefix
        :exception ConnectionError: if another call is still in progress
        """
        if self._in_use:
            raise ConnectionError("Tried to call a varlink method, while other call still in progress")

        method = self._interface.get_method(method_name)
        parameters = self._interface.filter_params("client.call", method.in_type, False, args, kwargs)
        out = {'method': self._interface.name + "." + method_name, 'more': True, 'parameters': parameters}

        self._send_message(json.dumps(out, cls=VarlinkEncoder).encode('utf-8'))

        more = True
        self._in_use = True
        while more:
            (message, more) = self._next_varlink_message()
            if message:
                message = self._interface.filter_params("client.reply", method.out_type, self._namespaced, message,
                                                        None)
            yield message
        self._in_use = False
class SimpleClientInterfaceHandler(ClientInterfaceHandler):
    """A varlink client for an interface doing send/write and receive/read on a socket or file stream"""

    def __init__(self, interface, file_or_socket, namespaced=False):
        """Creates an object with the varlink methods of an interface installed.

        The object allows to talk to a varlink service, which implements the specified interface
        transparently by calling the methods. The call blocks until enough messages are received.

        For monitor calls with '_more=True' a generator object is returned.

        :param interface: an Interface object
        :param file_or_socket: an open socket or io stream
        :param namespaced: if True, varlink methods return SimpleNamespace objects instead of dictionaries
        :exception TypeError: if file_or_socket offers none of send_bytes/sendall/write
                              or none of recv_bytes/recv/read

        """
        ClientInterfaceHandler.__init__(self, interface, namespaced=namespaced)
        self._connection = file_or_socket

        # Pick the send primitive once: send_bytes() wins over sendall(),
        # which wins over write().
        self._send_bytes = hasattr(self._connection, 'send_bytes')
        self._sendall = not self._send_bytes and hasattr(self._connection, 'sendall')
        if not (self._send_bytes or self._sendall or hasattr(self._connection, 'write')):
            raise TypeError

        # Same for receiving: recv_bytes() wins over recv(), which wins over read().
        self._recv_bytes = hasattr(self._connection, 'recv_bytes')
        self._recv = not self._recv_bytes and hasattr(self._connection, 'recv')
        if not (self._recv_bytes or self._recv or hasattr(self._connection, 'read')):
            raise TypeError

        # Bytes received but not yet split into messages.
        self._in_buffer = b''

    def __enter__(self):
        return self

    def __exit__(self, _type, _value, _traceback):
        self.close()

    def close(self):
        """Shut down (where the connection supports it) and close the connection."""
        try:
            if hasattr(self._connection, 'shutdown'):
                self._connection.shutdown(socket.SHUT_RDWR)
        except Exception:
            # Best effort: the peer may already be gone; close() below still runs.
            pass

        self._connection.close()

    def _send_message(self, out):
        """Send one message, terminated by a zero byte.

        :param out: the encoded message as bytes, without the terminator
        """
        data = out + b'\0'
        if self._send_bytes:
            self._connection.send_bytes(data)
        elif self._sendall:
            self._connection.sendall(data)
        else:
            self._connection.write(data)
            # Buffered streams would otherwise hold the request back while
            # the caller already blocks waiting for the reply.
            if hasattr(self._connection, 'flush'):
                self._connection.flush()

    def _next_message(self):
        """Generator yielding received messages, decoded as UTF-8, without the zero byte.

        Empty frames (a zero byte with nothing in front of it) are skipped.

        :exception BrokenPipeError: if the connection delivers no more data
        """
        while True:
            message, sep, rest = self._in_buffer.partition(b'\0')

            if sep:
                self._in_buffer = rest
                if message:
                    yield message.decode('utf-8')
                # Keep draining the buffer before asking the connection for
                # more data; another complete message may already be there.
                continue

            # No zero byte found: the buffer holds at most a partial message.
            if self._recv_bytes:
                data = self._connection.recv_bytes(8192)
            elif self._recv:
                data = self._connection.recv(8192)
            else:
                data = self._connection.read(1)

            if len(data) == 0:
                raise BrokenPipeError("Disconnected")

            self._in_buffer += data
def pipe_bridge(reader, writer):
    """Copy data from ``reader`` to ``writer`` until either side fails or ends.

    ``reader`` is read with ``recv(8192)`` if it has a ``recv`` attribute,
    otherwise with ``read(1)``. ``writer`` is written with ``sendall()`` if
    available, otherwise with ``write()`` followed by ``flush()``. A read
    error is treated like end of data. Both objects are closed before the
    function returns.

    :param reader: a socket or readable stream
    :param writer: a socket or writable stream
    """
    use_recv = hasattr(reader, "recv")
    use_sendall = hasattr(writer, "sendall")

    try:
        while True:
            try:
                data = reader.recv(8192) if use_recv else reader.read(1)
            except Exception:
                data = b''

            if not data:
                return

            try:
                if use_sendall:
                    writer.sendall(data)
                else:
                    writer.write(data)
                    writer.flush()
            except Exception:
                return
    finally:
        # Best-effort close of both ends; a failure closing one side must
        # not keep the other side open.
        for endpoint in (reader, writer):
            try:
                endpoint.close()
            except Exception:
                pass
class Client(object):
    """Varlink client class.

    >>> with varlink.Client("unix:/run/org.example.ping") as client, client.open('org.example.ping') as connection:
    >>>     assert connection.Ping("Test")["pong"] == "Test"

    If the varlink resolver is running:

    >>> client = varlink.Client(resolve_interface='com.redhat.logging')
    >>> print(client.get_interfaces()['com.redhat.logging'].get_description())
    # Query and monitor the log messages of a system.
    interface com.redhat.logging

    type Entry (cursor: string, time: string, message: string, process: string, priority: string)

    # Monitor the log. Returns the @initial_lines most recent entries in the
    # first reply and then continuously replies when new entries are available.
    method Monitor(initial_lines: int) -> (entries: Entry[])

    >>> connection = client.open("com.redhat.logging")

    connection now holds an object with all the varlink methods available.

    Do varlink method call with varlink arguments and a
    single varlink return structure wrapped in a namespace class:

    >>> ret = connection.Monitor(initial_lines=1)
    >>> ret
    namespace(entries=[namespace(cursor='s=[…]',
       message="req:1 'dhcp4-change' [wlp3s0][…]", priority='critical',
       process='nm-dispatcher', time='2018-01-29 12:19:59Z')])
    >>> ret.entries[0].process
    'nm-dispatcher'

    Do varlink method call with varlink arguments and a
    multiple return values in monitor mode, using the "_more" keyword:

    >>> for m in connection.Monitor(_more=True):
    >>>     for e in m.entries:
    >>>         print("%s: %s" % (e.time, e.message))
    2018-01-29 12:19:59Z: [system] Activating via systemd: service name='[…]
    2018-01-29 12:19:59Z: Starting Network Manager Script Dispatcher Service...
    2018-01-29 12:19:59Z: bound to 10.200.159.150 -- renewal in 1423 seconds.
    2018-01-29 12:19:59Z: [system] Successfully activated service 'org.freedesktop.nm_dispatcher'
    2018-01-29 12:19:59Z: Started Network Manager Script Dispatcher Service.
    2018-01-29 12:19:59Z: req:1 'dhcp4-change' [wlp3s0]: new request (6 scripts)
    2018-01-29 12:19:59Z: req:1 'dhcp4-change' [wlp3s0]: start running ordered scripts...

    "_more" is special to this python varlink binding. If "_more=True", then the method call does
    not return a normal namespace wrapped varlink return value, but a generator,
    which yields the return values and waits (blocks) for the service to return more return values
    in the generator's .__next__() call.
    """

    # Class used by open() to wrap a connection; subclasses may override it.
    handler = SimpleClientInterfaceHandler

    def __init__(self, address=None, resolve_interface=None, resolver=None):
        """Creates a Client object to reach the interfaces of a varlink service.

        For more constructors see the class constructor methods new_with_*() returning an Client object.

        :param address: the exact address like "unix:/run/org.varlink.resolver"
        :param resolve_interface: an interface name, which is resolved with the system wide resolver
        :param resolver: the exact address of the resolver to be used to resolve the interface name
        :exception ConnectionError: could not connect to the service or resolver

        """
        self._interfaces = {}
        self._socket = None
        self._socket_fn = None
        self._tmpdir = None
        self._child_pid = 0
        self._str = "Client<uninitialized>"

        # The org.varlink.service interface definition ships next to this module.
        with open(os.path.join(os.path.dirname(__file__), 'org.varlink.service.varlink')) as f:
            interface = Interface(f.read())
            self.add_interface(interface)

        if resolve_interface:
            self._with_resolved_interface(resolve_interface, resolver)

        if address:
            self._with_address(address)

    def __enter__(self):
        return self

    def __exit__(self, exc, value, tb):
        self.cleanup()

    def __str__(self):
        return self._str

    @classmethod
    def new_with_activate(cls, argv):
        """Creates a Client object to a varlink service server started via socket activation.

        :param argv: executable in argv[0] and parameters in argv[1:] to run the varlink service server via
                     socket activation.

        """
        return cls()._with_activate(argv)

    def _with_activate(self, argv):
        """Fork and exec ``argv`` with a listening unix socket passed as file descriptor 3.

        The socket is bound inside a fresh temporary directory, which
        cleanup() removes again. "$VARLINK_ADDRESS" in the arguments is
        replaced by the socket address.

        :return: self
        """
        s = socket.socket(socket.AF_UNIX)
        s.setblocking(False)
        self._tmpdir = tempfile.mkdtemp()
        address = self._tmpdir + "/" + str(os.getpid())
        s.bind(address)
        s.listen(100)

        self._child_pid = os.fork()
        if self._child_pid == 0:
            # child
            try:
                n = s.fileno()
                if n == 3:
                    # without dup() the socket is closed with the python destructor
                    n = os.dup(3)
                    del s
                else:
                    try:
                        os.close(3)
                    except OSError:
                        pass

                os.dup2(n, 3)
                address = address.replace('\0', '@', 1)

                child_argv = [argv[0]]
                for arg in argv[1:]:
                    child_argv.append(arg.replace("$VARLINK_ADDRESS", "unix:" + address))

                os.environ["VARLINK_ADDRESS"] = "unix:" + address
                os.environ["LISTEN_FDS"] = "1"
                os.environ["LISTEN_FDNAMES"] = "varlink"
                os.environ["LISTEN_PID"] = str(os.getpid())
                os.execvp(child_argv[0], child_argv)
            except Exception as e:
                sys.stderr.write("%s\n" % e)
            finally:
                # Only reached if the exec did not happen. Leave immediately,
                # without unwinding into the copy of the parent's call stack
                # (which would run the parent's cleanup handlers in the child).
                os._exit(1)

        # parent
        s.close()
        self._with_address("unix:" + address)
        return self

    @classmethod
    def new_with_bridge(cls, argv):
        """Creates a Client object to a varlink service started via the bridge command.

        The bridge command like "ssh <host> varlink bridge" is executed for every connection.
        This client object will do IO via stdio to the bridge command.

        :param argv: executable in argv[0] and parameters in argv[1:] to run the varlink service server via
                     the bridge connection.

        """
        return cls()._with_bridge(argv)

    def _with_bridge(self, argv):
        """Arrange for every new connection to spawn ``argv`` and talk to its stdio.

        :return: self
        """

        def new_bridge_socket():
            # The child gets one end of a socket pair as both stdin and stdout.
            sp = socket.socketpair()
            try:
                p = subprocess.Popen(argv, stdin=sp[1], stdout=sp[1], close_fds=True)
            except Exception:
                sp[0].close()
                raise
            finally:
                sp[1].close()
            self._child_pid = p.pid
            return sp[0]

        def new_bridge_socket_compat():
            # Variant using pipes plus two copier threads between the pipes
            # and a local socket pair.
            sp = socket.socketpair()
            # NOTE(review): the command line is built by joining argv and is
            # run through the shell; arguments are not quoted.
            p = subprocess.Popen(" ".join(argv), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                 close_fds=True)
            self._child_pid = p.pid
            thread1 = threading.Thread(daemon=True, target=pipe_bridge, args=(sp[1], p.stdin))
            thread1.start()
            thread2 = threading.Thread(daemon=True, target=pipe_bridge, args=(p.stdout, sp[1]))
            thread2.start()
            return sp[0]

        self._str = "Bridge with: '%s'" % " ".join(argv)

        if sys.platform == 'win32':
            self._socket_fn = new_bridge_socket_compat
        else:
            self._socket_fn = new_bridge_socket

        return self

    @classmethod
    def new_with_address(cls, address):
        """Creates a Client object to reach the interfaces of a varlink service.

        :param address: the exact address like "unix:/run/org.varlink.resolver"
        :exception ConnectionError: could not connect to the service or resolver

        """
        return cls()._with_address(address)

    def _with_address(self, address):
        """Parse ``address`` and install the matching connection factory.

        Supported forms are "unix:<path>[;<parameters>]" (a leading "@" in the
        path is replaced by a zero byte) and "tcp:<host>:<port>" (square
        brackets around the host are removed). No connection is made here.

        :return: self
        :exception ConnectionError: if the address is not in a supported form
        """
        if address.startswith("unix:"):
            self._str = address
            # Drop the scheme and anything after the first ';'.
            address = address[5:].partition(';')[0]
            if not address:
                raise ConnectionError("Invalid address '%s'" % self._str)

            if address.startswith('@'):
                address = address.replace('@', '\0', 1)

            def open_unix():
                s = socket.socket(socket.AF_UNIX)
                s.setblocking(True)
                s.connect(address)
                return s

            self._socket_fn = open_unix

        elif address.startswith("tcp:"):
            self._str = address
            address = address[4:]
            # Split at the last ':' so IPv6 hosts keep their inner colons.
            p = address.rfind(':')
            if p == -1:
                raise ConnectionError("Invalid address 'tcp:%s'" % address)
            port = address[p + 1:]
            address = address[:p]
            address = address.replace('[', '')
            address = address.replace(']', '')

            def open_tcp():
                s = socket.create_connection((address, int(port)))
                s.setblocking(True)
                return s

            self._socket_fn = open_tcp

        else:
            raise ConnectionError("Invalid address '%s'" % address)

        return self

    @classmethod
    def new_with_resolved_interface(cls, interface, resolver_address=None):
        """Creates a Client object to reach the interfaces of a varlink service.

        :param interface: an interface name, which is resolved with the system wide resolver
        :param resolver_address: the exact address of the resolver to be used to resolve the interface name
        :exception ConnectionError: could not connect to the service or resolver

        """
        return cls()._with_resolved_interface(interface, resolver_address)

    def _with_resolved_interface(self, interface, resolver_address=None):
        """Look up the address serving ``interface`` via the resolver and use it.

        :param interface: the interface name to resolve
        :param resolver_address: resolver address; defaults to
                                 "unix:/run/org.varlink.resolver"
        :return: self
        """
        if not resolver_address:
            resolver_address = "unix:/run/org.varlink.resolver"

        if interface == 'org.varlink.resolver':
            # The resolver serves its own interface; no lookup needed.
            self._with_address(resolver_address)
        else:
            with Client.new_with_address(resolver_address) as client, \
                    client.open('org.varlink.resolver') as _rc:
                # noinspection PyUnresolvedReferences
                _r = _rc.Resolve(interface)
                self._with_address(_r['address'])

        return self

    def cleanup(self):
        """Remove the temporary directory and terminate the child process, if any.

        Safe to call more than once: the recorded directory and process id are
        cleared before they are acted upon.
        """
        tmpdir = getattr(self, "_tmpdir", None)
        if tmpdir is not None:
            self._tmpdir = None
            try:
                shutil.rmtree(tmpdir)
            except FileNotFoundError:
                pass

        child_pid = getattr(self, "_child_pid", 0)
        if child_pid != 0:
            # Forget the pid first so a later call cannot signal an unrelated
            # process that got the same id.
            self._child_pid = 0
            try:
                os.kill(child_pid, signal.SIGTERM)
            except OSError:
                pass
            try:
                os.waitpid(child_pid, 0)
            except OSError:
                # Not our child (anymore) or already reaped.
                pass

    def open(self, interface_name, namespaced=False, connection=None):
        """Open a new connection and get a client interface handle with the varlink methods installed.

        :param interface_name: an interface name, which the service this client object is
                               connected to, provides.
        :param namespaced: If arguments and return values are instances of SimpleNamespace
                           rather than dictionaries.
        :param connection: If set, get the interface handle for an already opened connection.
        :exception InterfaceNotFound: if the interface is not found

        """
        own_connection = not connection
        if own_connection:
            connection = self.open_connection()

        try:
            if interface_name not in self._interfaces:
                self.get_interface(interface_name, socket_connection=connection)

            if interface_name not in self._interfaces:
                raise InterfaceNotFound(interface_name)

            return self.handler(self._interfaces[interface_name], connection, namespaced=namespaced)
        except Exception:
            # Do not leak a connection nobody else holds a reference to.
            if own_connection:
                connection.close()
            raise

    def open_connection(self):
        """Open a new connection and return the socket.

        :exception OSError: anything socket.connect() throws

        """
        return self._socket_fn()

    def get_interfaces(self, socket_connection=None):
        """Returns the 'interfaces' entry of the service's GetInfo() reply.

        The complete reply is also stored in ``self.info``.

        :param socket_connection: an already opened connection to use; if not set,
                                  a connection is opened for the call and closed afterwards
        """
        close_socket = not socket_connection
        if close_socket:
            socket_connection = self.open_connection()

        try:
            _service = self.handler(self._interfaces["org.varlink.service"], socket_connection)
            # noinspection PyUnresolvedReferences
            self.info = _service.GetInfo()
        finally:
            if close_socket:
                socket_connection.close()

        return self.info['interfaces']

    def get_interface(self, interface_name, socket_connection=None):
        """Fetch, parse and remember the description of an interface.

        The description is requested with GetInterfaceDescription() and the
        resulting Interface object is stored under its own name.

        :param interface_name: the name of the interface to fetch
        :param socket_connection: an already opened connection to use; if not set,
                                  a connection is opened for the call and closed afterwards
        :return: the Interface object
        """
        close_socket = not socket_connection
        if close_socket:
            socket_connection = self.open_connection()

        try:
            _service = self.handler(self._interfaces["org.varlink.service"], socket_connection)
            # noinspection PyUnresolvedReferences
            desc = _service.GetInterfaceDescription(interface_name)
            interface = Interface(desc['description'])
            self._interfaces[interface.name] = interface
        finally:
            if close_socket:
                socket_connection.close()

        return interface

    def add_interface(self, interface):
        """Manually add or overwrite an interface definition from an Interface object.

        :param interface: an Interface() object
        :exception TypeError: if interface is not an Interface object

        """
        if not isinstance(interface, Interface):
            raise TypeError

        self._interfaces[interface.name] = interface