Source code for varlink.tests.test_orgexamplemore

#!/usr/bin/env python

"""Server and Client example of varlink for python

From the main git repository directory run::

    $ PYTHONPATH=$(pwd) python3 ./varlink/tests/test_orgexamplemore.py

or::

    $ PYTHONPATH=$(pwd) python3 ./varlink/tests/test_orgexamplemore.py --varlink="unix:@test" &
    Listening on @test
    [1] 6434
    $ PYTHONPATH=$(pwd) python3 ./varlink/tests/test_orgexamplemore.py --client --varlink="unix:@test"
    [...]

"""

from __future__ import print_function
from __future__ import unicode_literals

import argparse
import os
import shlex
import socket
import sys
import textwrap
import threading
import time
import unittest
from sys import platform

try:
    from builtins import int
    from builtins import next
    from builtins import object
    from builtins import range
except ImportError:
    pass

import varlink


######## CLIENT #############

def run_client(client):
    """Exercise the ``org.example.more`` interface through *client*.

    Two connections are opened.  The first one streams the replies of
    ``TestMore(10)`` and reports every state change on stderr; the second
    one sends a ``Ping`` for each progress report above 50 while the stream
    is still being consumed and prints the answer on stdout.

    :param client: object whose ``open(name, namespaced=True)`` returns a
        context-managed connection proxy (a ``varlink.Client`` in this file).
    :raises varlink.ConnectionError: re-raised after being printed.
    :raises varlink.VarlinkError: re-raised after the error, its name and
        its parameters have been printed.
    """
    print('Connecting to %s\n' % client)
    try:
        with \
                client.open('org.example.more', namespaced=True) as con1, \
                client.open('org.example.more', namespaced=True) as con2:

            for m in con1.TestMore(10, _more=True):
                state = m.state

                # A field that is missing, None or false is simply skipped.
                if getattr(state, 'start', None):
                    print("--- Start ---", file=sys.stderr)

                if getattr(state, 'end', None):
                    print("--- End ---", file=sys.stderr)

                # 0 is a valid progress value, so test against None only.
                progress = getattr(state, 'progress', None)
                if progress is not None:
                    print("Progress:", progress, file=sys.stderr)
                    if progress > 50:
                        # Use the second connection while the first one is
                        # still busy streaming replies.
                        ret = con2.Ping("Test")
                        print("Ping: ", ret.pong)

    except varlink.ConnectionError as e:
        print("ConnectionError:", e)
        raise
    except varlink.VarlinkError as e:
        print(e)
        print(e.error())
        print(e.parameters())
        raise


######## SERVER #############

# Module-level varlink service object; it is referenced by the request
# handler class and used as the ``@service.interface`` decorator below.
# ``interface_dir`` is the directory containing this file -- presumably where
# the ``org.example.more`` interface definition is looked up (confirm against
# the varlink.Service documentation).
service = varlink.Service(
    vendor='Varlink',
    product='Varlink Examples',
    version='1',
    url='http://varlink.org',
    interface_dir=os.path.dirname(__file__)
)


class ServiceRequestHandler(varlink.RequestHandler):
    """Request handler tied to this module's ``service`` object."""

    # Class attribute bound to the module-level ``service``; presumably this
    # is how ``varlink.RequestHandler`` finds the service to dispatch to.
    service = service
class ActionFailed(varlink.VarlinkError):
    """The ``org.example.more.ActionFailed`` error."""

    def __init__(self, reason):
        """Build the error message.

        :param reason: value stored under the ``field`` key of the error
            parameters.
        """
        varlink.VarlinkError.__init__(
            self,
            {
                'error': 'org.example.more.ActionFailed',
                'parameters': {'field': reason},
            }
        )
@service.interface('org.example.more')
class Example(object):
    """Server-side implementation of the ``org.example.more`` interface."""

    # Seconds slept between two progress replies of TestMore; the unit test
    # lowers this value to run faster.
    sleep_duration = 1

    def TestMore(self, n, _more=True, _server=None):
        """Stream a start marker, progress reports and an end marker.

        Yields one ``{'state': {'start': True}}`` reply, then ``n`` progress
        replies (``int(i * 100 / n)`` for ``i`` in ``range(n)``) with a
        pause of ``sleep_duration`` seconds after each, then a final
        ``progress: 100`` reply and an ``{'end': True}`` reply.  Every reply
        except the last carries ``'_continues': True``.

        :param n: number of intermediate progress steps.
        :param _more: false when the caller did not ask for a streamed
            reply; an ``InvalidParameter`` error object is yielded first in
            that case.
        :param _server: server object; when given it is shut down if any
            exception occurs while producing replies.
        """
        try:
            if not _more:
                # NOTE(review): the error object is yielded, not raised --
                # presumably the varlink server converts yielded exceptions
                # into error replies; confirm.  Raising here instead would be
                # caught by the handler below and shut the server down.
                yield varlink.InvalidParameter('more')

            yield {'state': {'start': True}, '_continues': True}

            for i in range(0, n):
                yield {'state': {'progress': int(i * 100 / n)}, '_continues': True}
                time.sleep(self.sleep_duration)

            yield {'state': {'progress': 100}, '_continues': True}

            yield {'state': {'end': True}, '_continues': False}
        except Exception as error:
            print("ERROR", error, file=sys.stderr)
            if _server:
                _server.shutdown()

    def Ping(self, ping):
        """Return *ping* unchanged under the ``pong`` key."""
        return {'pong': ping}

    def StopServing(self, reason=None, _request=None, _server=None):
        """Close the calling connection and shut the server down.

        :param reason: accepted for interface compatibility; not used.
        :param _request: the client request to shut down, if any.
        :param _server: the server to stop, if any.
        """
        print("Server ends.")

        # The request can only be shut down through the server object, so
        # both must be present.
        if _request and _server:
            print("Shutting down client connection")
            _server.shutdown_request(_request)

        if _server:
            print("Shutting down server")
            _server.shutdown()

    def TestMap(self, map):
        """Wrap every value of *map* together with its 1-based position.

        :param map: mapping to convert (the name is fixed by the interface).
        :returns: ``{'map': {key: {'i': position, 'val': value}}}`` where
            ``position`` follows the iteration order of ``map.items()``.
        """
        return {
            'map': {
                key: {"i": i, "val": val}
                for i, (key, val) in enumerate(map.items(), 1)
            }
        }

    def TestObject(self, object):
        """Return *object* after a JSON encode/decode round trip.

        :param object: any JSON-serialisable value (the name is fixed by the
            interface).
        """
        import json
        return {"object": json.loads(json.dumps(object))}


def run_server(address):
    """Serve ``org.example.more`` on *address* until interrupted.

    :param address: varlink address string to listen on.
    """
    with varlink.ThreadingServer(address, ServiceRequestHandler) as server:
        print("Listening on", server.server_address)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the example server.
            pass


######## MAIN #############

def epilog():
    """Return the usage examples shown at the end of ``--help``."""
    return textwrap.dedent("""
        Examples:
        \tSelf Exec: $ {0}
        \tServer   : $ {0} --varlink=<varlink address>
        \tClient   : $ {0} --client --varlink=<varlink address>
        \tClient   : $ {0} --client --bridge=<bridge command>
        \tClient   : $ {0} --client --activate=<activation command>
    """.format(sys.argv[0]))


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Varlink org.example.more test case',
        epilog=epilog(),
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        '--varlink',
        type=str,
        help='The varlink address'
    )
    parser.add_argument(
        '-b', '--bridge',
        type=str,
        help='bridge command'
    )
    parser.add_argument(
        '-A', '--activate',
        type=str,
        help='activation command'
    )
    parser.add_argument(
        '--client',
        action='store_true',
        help='launch the client mode'
    )

    args = parser.parse_args()

    address = args.varlink
    client_mode = args.client
    activate = args.activate
    bridge = args.bridge

    client = None

    if client_mode:
        # Only one client is created.  When several options are given the
        # precedence is --varlink, then --activate, then --bridge.
        if address:
            client = varlink.Client.new_with_address(address)
        elif activate:
            client = varlink.Client.new_with_activate(shlex.split(activate))
        elif bridge:
            client = varlink.Client.new_with_bridge(shlex.split(bridge))
        else:
            parser.error(
                '--client requires one of --varlink, --bridge or --activate')

    if not address and not client_mode:
        # Self-exec mode: activate this very file as the server and run the
        # client against it.
        if not hasattr(socket, "AF_UNIX"):
            print("varlink activate: not supported on platform %s" % platform, file=sys.stderr)
            parser.print_help()
            sys.exit(2)

        with varlink.Client.new_with_activate(
                [__file__, "--varlink=$VARLINK_ADDRESS"]) as client:
            run_client(client)
    elif client_mode:
        with client:
            run_client(client)
    else:
        run_server(address)

    sys.exit(0)


######## UNITTEST #############

class TestService(unittest.TestCase):
    """End-to-end test of the example service over a local TCP socket."""

    def test_service(self):
        """Run the client against a threaded server and check every reply."""
        address = "tcp:127.0.0.1:23451"
        # Shorten the pause between progress replies so the test stays fast.
        Example.sleep_duration = 0.1

        server = varlink.ThreadingServer(address, ServiceRequestHandler)
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()

        try:
            # The client is used as a context manager, as in the script
            # entry point, so it is released when the test is done.
            with varlink.Client.new_with_address(address) as client:
                run_client(client)

                with \
                        client.open('org.example.more', namespaced=True) as con1, \
                        client.open('org.example.more', namespaced=True) as con2:

                    self.assertEqual(con1.Ping("Test").pong, "Test")

                    it = con1.TestMore(10, _more=True)

                    # First reply: only the start marker.
                    m = next(it)
                    self.assertTrue(hasattr(m.state, 'start'))
                    self.assertFalse(hasattr(m.state, 'end'))
                    self.assertFalse(hasattr(m.state, 'progress'))
                    self.assertIsNotNone(m.state.start)

                    # Then progress 0, 10, ..., 100.
                    for i in range(0, 110, 10):
                        m = next(it)
                        self.assertTrue(hasattr(m.state, 'progress'))
                        self.assertFalse(hasattr(m.state, 'start'))
                        self.assertFalse(hasattr(m.state, 'end'))
                        self.assertIsNotNone(m.state.progress)
                        self.assertEqual(i, m.state.progress)

                        if i > 50:
                            # The second connection stays usable while the
                            # first one is streaming.
                            ret = con2.Ping("Test")
                            self.assertEqual("Test", ret.pong)

                    # Last reply: only the end marker, then the stream ends.
                    m = next(it)
                    self.assertTrue(hasattr(m.state, 'end'))
                    self.assertFalse(hasattr(m.state, 'start'))
                    self.assertFalse(hasattr(m.state, 'progress'))
                    self.assertIsNotNone(m.state.end)

                    self.assertRaises(StopIteration, next, it)

                    con1.StopServing(_oneway=True)
                    # Give the server time to close the connection.
                    time.sleep(0.5)
                    self.assertRaises(varlink.ConnectionError, con1.Ping, "Test")
        finally:
            server.shutdown()
            server.server_close()