# Source code for cherrypy.test.test_bus

import threading
import time
import unittest.mock

from cherrypy.process import wspbus


# Template for the entries that test listeners record: it is filled with the
# listener index, the channel name, and the argument the listener received.
msg = 'Listener %d on channel %s: %s.'


class PublishSubscribeTests(unittest.TestCase):
    """Check the order and arguments with which a Bus calls its listeners."""

    def get_listener(self, channel, index):
        """Build a listener that records each call in ``self.responses``.

        The recorded entry is ``msg`` filled with ``index``, ``channel`` and
        the argument the listener was called with (``None`` when omitted).
        """
        def listener(arg=None):
            entry = msg % (index, channel, arg)
            self.responses.append(entry)

        return listener

    def test_builtin_channels(self):
        """Listeners on each channel a new Bus already has run in order."""
        bus = wspbus.Bus()
        self.responses = []
        expected = []
        priorities = [100, 50, 0, 51]
        # Listener indices in the order they are expected to be called,
        # i.e. by ascending priority value (0, 50, 51, 100).
        call_order = (2, 1, 3, 0)

        for channel in bus.listeners:
            for index, priority in enumerate(priorities):
                recorder = self.get_listener(channel, index)
                bus.subscribe(channel, recorder, priority)

        for channel in bus.listeners:
            # Publish once without and once with an argument.
            bus.publish(channel)
            expected.extend(msg % (i, channel, None) for i in call_order)
            bus.publish(channel, arg=79347)
            expected.extend(msg % (i, channel, 79347) for i in call_order)

        self.assertEqual(self.responses, expected)

    def test_custom_channels(self):
        """Listeners on channels created by subscribing run in order."""
        bus = wspbus.Bus()
        self.responses = []
        expected = []
        custom_listeners = ('hugh', 'louis', 'dewey')
        priorities = [None, 10, 60, 40]
        # The listener subscribed with priority None (index 0) is expected
        # between the ones subscribed with priorities 40 and 60.
        call_order = (1, 3, 0, 2)

        for channel in custom_listeners:
            for index, priority in enumerate(priorities):
                recorder = self.get_listener(channel, index)
                bus.subscribe(channel, recorder, priority)

        for channel in custom_listeners:
            bus.publish(channel, 'ah so')
            expected.extend(msg % (i, channel, 'ah so') for i in call_order)
            bus.publish(channel)
            expected.extend(msg % (i, channel, None) for i in call_order)

        self.assertEqual(self.responses, expected)

    def test_listener_errors(self):
        """A failing listener raises ChannelFailures; the other still runs."""
        bus = wspbus.Bus()
        self.responses = []
        expected = []
        # Every channel of a new Bus except 'log'.
        channels = [name for name in bus.listeners if name != 'log']

        for channel in channels:
            bus.subscribe(channel, self.get_listener(channel, 1))
            # This will break since the lambda takes no args.
            bus.subscribe(channel, lambda: None, priority=20)

        for channel in channels:
            with self.assertRaises(wspbus.ChannelFailures):
                bus.publish(channel, 123)
            expected.append(msg % (1, channel, 123))

        self.assertEqual(self.responses, expected)
class BusMethodTests(unittest.TestCase):
    """Exercise the state-changing and logging methods of ``wspbus.Bus``."""

    def log(self, bus):
        """Start capturing messages published on the bus 'log' channel.

        Resets ``self._log_entries`` and subscribes a listener that appends
        every message to it; the ``level`` argument is ignored.
        """
        self._log_entries = []

        def logit(msg, level):
            self._log_entries.append(msg)

        bus.subscribe('log', logit)

    def assertLog(self, entries):
        """Assert that the captured log messages equal ``entries``."""
        self.assertEqual(self._log_entries, entries)

    def get_listener(self, channel, index):
        """Build a listener that records each call in ``self.responses``.

        The recorded entry is ``msg`` filled with ``index``, ``channel`` and
        the argument the listener was called with (``None`` when omitted).
        """
        def listener(arg=None):
            self.responses.append(msg % (index, channel, arg))

        return listener

    def test_start(self):
        """start() calls 'start' listeners, reaches STARTED and logs."""
        b = wspbus.Bus()
        self.log(b)

        self.responses = []
        num = 3
        for index in range(num):
            b.subscribe('start', self.get_listener('start', index))

        b.start()
        try:
            # The start method MUST call all 'start' listeners.
            self.assertEqual(
                set(self.responses),
                {msg % (i, 'start', None) for i in range(num)})
            # The start method MUST move the state to STARTED
            # (or EXITING, if errors occur)
            self.assertEqual(b.state, b.states.STARTED)
            # The start method MUST log its states.
            self.assertLog(['Bus STARTING', 'Bus STARTED'])
        finally:
            # Exit so the atexit handler doesn't complain.
            b.exit()

    def test_stop(self):
        """stop() calls 'stop' listeners, reaches STOPPED and logs."""
        b = wspbus.Bus()
        self.log(b)

        self.responses = []
        num = 3
        for index in range(num):
            b.subscribe('stop', self.get_listener('stop', index))

        b.stop()

        # The stop method MUST call all 'stop' listeners.
        self.assertEqual(
            set(self.responses),
            {msg % (i, 'stop', None) for i in range(num)})
        # The stop method MUST move the state to STOPPED
        self.assertEqual(b.state, b.states.STOPPED)
        # The stop method MUST log its states.
        self.assertLog(['Bus STOPPING', 'Bus STOPPED'])

    def test_graceful(self):
        """graceful() calls 'graceful' listeners and logs."""
        b = wspbus.Bus()
        self.log(b)

        self.responses = []
        num = 3
        for index in range(num):
            b.subscribe('graceful', self.get_listener('graceful', index))

        b.graceful()

        # The graceful method MUST call all 'graceful' listeners.
        self.assertEqual(
            set(self.responses),
            {msg % (i, 'graceful', None) for i in range(num)})
        # The graceful method MUST log its states.
        self.assertLog(['Bus graceful'])

    def test_exit(self):
        """exit() calls 'stop' and 'exit' listeners, reaches EXITING, logs."""
        b = wspbus.Bus()
        self.log(b)

        self.responses = []
        num = 3
        for index in range(num):
            b.subscribe('stop', self.get_listener('stop', index))
            b.subscribe('exit', self.get_listener('exit', index))

        b.exit()

        # The exit method MUST call all 'stop' listeners,
        # and then all 'exit' listeners.
        # (The set comparison checks that all were called, not the order.)
        self.assertEqual(
            set(self.responses),
            {msg % (i, 'stop', None) for i in range(num)}
            | {msg % (i, 'exit', None) for i in range(num)})
        # The exit method MUST move the state to EXITING
        self.assertEqual(b.state, b.states.EXITING)
        # The exit method MUST log its states.
        self.assertLog(
            ['Bus STOPPING', 'Bus STOPPED', 'Bus EXITING', 'Bus EXITED'])

    def test_wait(self):
        """wait() returns only once the bus is in one of the given states."""
        b = wspbus.Bus()

        def f(method):
            # Delay the state change so that wait() really has to wait.
            time.sleep(0.2)
            getattr(b, method)()

        flow = [
            ('start', [b.states.STARTED]),
            ('stop', [b.states.STOPPED]),
            ('start', [b.states.STARTING, b.states.STARTED]),
            ('exit', [b.states.EXITING]),
        ]

        workers = []
        try:
            for method, states in flow:
                worker = threading.Thread(target=f, args=(method,))
                workers.append(worker)
                worker.start()
                b.wait(states)

                # The wait method MUST wait for the given state(s).
                if b.state not in states:
                    self.fail('State %r not in %r' % (b.state, states))
        finally:
            # Do not leave worker threads running into later tests.
            for worker in workers:
                worker.join()

    def test_wait_publishes_periodically(self):
        """wait() publishes on the given channel while it is waiting."""
        bus = wspbus.Bus()
        callback = unittest.mock.MagicMock()
        bus.subscribe('main', callback)

        def set_start():
            time.sleep(0.05)
            bus.start()

        starter = threading.Thread(target=set_start)
        starter.start()
        try:
            bus.wait(bus.states.STARTED, interval=0.01, channel='main')
            self.assertGreater(callback.call_count, 3)
        finally:
            # Make sure the starter thread is done, then exit so the atexit
            # handler doesn't complain about a bus that is still running.
            starter.join()
            bus.exit()

    def test_block(self):
        """block() waits for EXITING and for the other non-daemon threads."""
        b = wspbus.Bus()
        self.log(b)

        def f():
            time.sleep(0.2)
            b.exit()

        def g():
            time.sleep(0.4)

        threading.Thread(target=f).start()
        threading.Thread(target=g).start()
        # Expected count: the two threads just started plus one more
        # (presumably the main thread running this test).
        threads = [t for t in threading.enumerate() if not t.daemon]
        self.assertEqual(len(threads), 3)

        b.block()

        # The block method MUST wait for the EXITING state.
        self.assertEqual(b.state, b.states.EXITING)
        # The block method MUST wait for ALL non-main, non-daemon threads to
        # finish.
        threads = [t for t in threading.enumerate() if not t.daemon]
        self.assertEqual(len(threads), 1)
        # The last message will mention an indeterminable thread name; ignore
        # it
        self.assertEqual(self._log_entries[:-1],
                         ['Bus STOPPING', 'Bus STOPPED',
                          'Bus EXITING', 'Bus EXITED',
                          'Waiting for child threads to terminate...'])

    def test_start_with_callback(self):
        """start_with_callback() runs the callback after 'start' listeners."""
        b = wspbus.Bus()
        self.log(b)
        try:
            events = []

            def f(*args, **kwargs):
                events.append(('f', args, kwargs))

            def g():
                events.append('g')

            b.subscribe('start', g)
            b.start_with_callback(f, (1, 3, 5), {'foo': 'bar'})
            # Give wait() time to run f()
            time.sleep(0.2)

            # The callback method MUST wait for the STARTED state.
            self.assertEqual(b.state, b.states.STARTED)
            # The callback method MUST run after all start methods.
            self.assertEqual(events, ['g', ('f', (1, 3, 5), {'foo': 'bar'})])
        finally:
            b.exit()

    def test_log(self):
        """log() publishes messages, optionally with the current traceback."""
        b = wspbus.Bus()
        self.log(b)
        self.assertLog([])

        # Try a normal message.
        expected = []
        # The loop variable is deliberately not called ``msg`` so that it
        # does not shadow the module-level template.
        for text in ["O mah darlin'"] * 3 + ['Clementiiiiiiiine']:
            b.log(text)
            expected.append(text)
            self.assertLog(expected)

        # Try an error message
        try:
            foo  # noqa: F821 -- deliberately undefined, raises NameError
        except NameError:
            b.log('You are lost and gone forever', traceback=True)
            lastmsg = self._log_entries[-1]
            if 'Traceback' not in lastmsg or 'NameError' not in lastmsg:
                self.fail('Last log message %r did not contain '
                          'the expected traceback.' % lastmsg)
        else:
            self.fail('NameError was not raised as expected.')
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__': unittest.main()