def read_and_process_input():
    """Read integers from stdin forever and print fib(n) for each one."""
    while True:
        n = int(input())
        print('fib({}) = {}'.format(n, timed_fib(n)))
def main():
    """Print hello from a background thread while the main thread reads input."""
    # Daemon threads do not keep the process alive, so the program can exit
    # even though this thread loops forever.
    hello_thread = Thread(target=print_hello)
    hello_thread.daemon = True
    hello_thread.start()
    # The main thread blocks on stdin.
    read_and_process_input()
def main():
    """Single-threaded loop: poll stdin via a selector, print hello every 3 s."""
    selector = selectors.DefaultSelector()
    # Poll for "read" readiness on stdin.
    selector.register(sys.stdin, selectors.EVENT_READ)
    last_hello = 0  # 0 makes the first hello fire immediately.
    while True:
        # Block for at most 100 ms waiting for input to become available.
        for key, mask in selector.select(0.1):
            process_input(key.fileobj)
        if time() - last_hello > 3:
            last_hello = time()
            print_hello()
from bisect import insort from fib import timed_fib from time import time import selectors import sys
class EventLoop(object):
    """
    A minimal callback-based, single-threaded event loop, written as a
    simple demonstration.
    """
    def __init__(self, *tasks):
        self._running = False
        self._stdin_handlers = []  # callbacks invoked with each stdin line
        self._timers = []          # sorted (deadline, handler) pairs
        self._selector = selectors.DefaultSelector()
        self._selector.register(sys.stdin, selectors.EVENT_READ)
defrun_forever(self): self._running = True while self._running: # First check for available IO input for key, mask in self._selector.select(0): line = key.fileobj.readline().strip() for callback in self._stdin_handlers: callback(line)
# Handle timer events while self._timers and self._timers[0][0] < time(): handler = self._timers[0][1] del self._timers[0] handler()
Python中 yield 是一个关键词,它可以用来创建协程。 1.当调用 yield value 的时候,这个 value 就被返回出去了,CPU控制权就交给了协程的调用方。调用 yield 之后,如果想要重新回到协程继续执行,需要调用Python中内置的 next 方法。 2.当调用 y = yield x 的时候,x被返回给调用方。要继续回到协程上下文,调用方需要再执行协程的 send 方法。在这个例子中,传给send方法的参数会被传入协程作为这个表达式的值(本例中,这个值会被y接收到)。
from bisect import insort from collections import deque from fib import timed_fib from functools import partial from time import time import selectors import sys import types
class sleep_for_seconds(object):
    """
    Sentinel yielded by a coroutine to ask the event loop to "sleep" it
    for the given number of seconds.
    """
    def __init__(self, wait_time):
        # Duration in seconds; read by the event loop when scheduling.
        self._wait_time = wait_time
class EventLoop(object):
    """
    A simplified coroutine-based event loop, for demonstration.

    Closely follows the "Trampoline" example from PEP 342, with exception
    handling omitted for simplicity and a selector added for file IO.
    """
    def __init__(self, *tasks):
        self._running = False
        self._selector = selectors.DefaultSelector()
        # FIFO of zero-argument callables ready to run.
        self._tasks = deque(tasks)
        # (coroutine, stack) pairs blocked waiting for a line from stdin.
        self._tasks_waiting_on_stdin = []
        # Sorted list of (time_to_run, task) pairs.
        self._timers = []
        # Poll stdin for readable input.
        self._selector.register(sys.stdin, selectors.EVENT_READ)
defschedule(self, coroutine, value=None, stack=(), when=None): """ Schedule a coroutine task to be run, with value to be sent to it, and stack containing the coroutines that are waiting for the value yielded by this coroutine. """ # Bind the parameters to a function to be scheduled as a function with # no parameters. task = partial(self.resume_task, coroutine, value, stack) if when: insort(self._timers, (when, task)) else: self._tasks.append(task)
defrun_forever(self): self._running = True while self._running: # First check for available IO input for key, mask in self._selector.select(0): line = key.fileobj.readline().strip() for task, stack in self._tasks_waiting_on_stdin: self.schedule(task, line, stack) self._tasks_waiting_on_stdin.clear()
# Next, run the next task if self._tasks: task = self._tasks.popleft() task()
# Finally run time scheduled tasks while self._timers and self._timers[0][0] < time(): task = self._timers[0][1] del self._timers[0] task()
self._running = False
def print_every(message, interval):
    """
    Coroutine task: print `message` (prefixed with a timestamp) forever,
    once every `interval` seconds.
    """
    while True:
        print("{} - {}".format(int(time()), message))
        yield sleep_for_seconds(interval)
def read_input(loop):
    """
    Coroutine task: repeatedly read a line from stdin, treat it as an
    integer n, and print fib(n). The line 'exit' stops the loop on the
    next tick.
    """
    while True:
        line = yield sys.stdin
        if line != 'exit':
            n = int(line)
            print("fib({}) = {}".format(n, timed_fib(n)))
        else:
            loop.do_on_next_tick(loop.stop)
def coroutine():
    """Generator-based coroutine demonstrating pause/resume and throw()."""
    print("Starting")
    try:
        yield "Let's pause until continued."
        print("Continuing")
    except Exception as e:
        # An exception thrown in at the paused yield lands here.
        yield "Got an exception: " + str(e)
def main():
    """Drive the demo coroutine and throw an exception into it."""
    c = coroutine()
    next(c)  # Run up to the first yield.
    # Raise inside the coroutine at its paused point; it answers with a
    # second yielded value, which we print.
    value = c.throw(Exception("Have an exceptional day!"))
    print(value)
if __name__ == '__main__':
    main()
输出如下:
1 2
Starting Got an exception: Have an exceptional day!
@asyncio.coroutine
def write_headers(writer):
    """Send the `request_headers` mapping as HTTP header lines and flush."""
    # NOTE(review): relies on a module-level `request_headers` dict —
    # confirm it is defined elsewhere in this module.
    for name, value in request_headers.items():
        writer.write((name + ': ' + value + '\r\n').encode())
    writer.write(b'\r\n')  # Blank line terminates the header section.
    yield from writer.drain()
@asyncio.coroutine
def get_my_ip_address(verbose):
    """
    Query the ipify service over a raw HTTP/1.1 connection and return the
    caller's public IP address as a string.
    """
    # NOTE(review): `host` and `read_headers` are defined elsewhere in
    # this module.
    reader, writer = yield from asyncio.open_connection(host, 80)
    writer.write(b'GET /?format=json HTTP/1.1\r\n')
    yield from write_headers(writer)

    status_line = yield from reader.readline()
    status_line = status_line.decode().strip()
    http_version, status_code, status = status_line.split(' ')
    if verbose:
        print('Got status {} {}'.format(status_code, status))

    response_headers = yield from read_headers(reader)
    if verbose:
        print('Response headers:')
        for key, value in response_headers.items():
            print(key + ': ' + value)

    # Assume the server sends Content-Length (ipify does), so we can read
    # exactly that many body bytes.
    content_length = int(response_headers['Content-Length'])
    response_body_bytes = yield from reader.read(content_length)
    response_object = json.loads(response_body_bytes.decode())
    writer.close()
    return response_object['ip']
@asyncio.coroutine
def print_my_ip_address(verbose):
    """Fetch and display the public IP address, reporting any failure."""
    try:
        ip_address = yield from get_my_ip_address(verbose)
    except Exception as e:
        print("Error: ", e)
    else:
        print("My IP address is:")
        print(ip_address)
$ python3.4 ipify.py Got status 200 OK Response headers: Content-Length: 21 Server: Cowboy Connection: keep-alive Via: 1.1 vegur Content-Type: application/json Date: Fri, 10 Oct 2014 03:46:31 GMT My IP address is: # <my IP address here, hidden for privacy!>
@asyncio.coroutine
def get_url(url):
    """Simulate fetching `url`: sleep 1-4 s, report, return (url, seconds)."""
    wait_time = random.randint(1, 4)
    yield from asyncio.sleep(wait_time)
    print('Done: URL {} took {}s to get!'.format(url, wait_time))
    return url, wait_time
@asyncio.coroutine
def process_as_results_come_in():
    """Handle each simulated download as soon as it completes."""
    coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
    # as_completed yields futures in finish order, not submission order.
    for next_done in asyncio.as_completed(coroutines):
        url, wait_time = yield from next_done
        print('Coroutine for {} is done'.format(url))
@asyncio.coroutine
def process_once_everything_ready():
    """Wait for all simulated downloads, then print the collected results."""
    coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
    # gather preserves submission order in its result list.
    results = yield from asyncio.gather(*coroutines)
    print(results)
def main():
    """Run both result-gathering strategies on the demo coroutines."""
    loop = asyncio.get_event_loop()
    try:
        print("First, process results as they come in:")
        loop.run_until_complete(process_as_results_come_in())
        print("\nNow, process results once they are all ready:")
        loop.run_until_complete(process_once_everything_ready())
    finally:
        # Fix: the loop was never closed; release its resources even if a
        # task raises.
        loop.close()
if __name__ == '__main__':
    main()
输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
$ python3.4 gather.py First, process results as they come in: Done: URL URL2 took 2s to get! Coroutine for URL2 is done Done: URL URL3 took 3s to get! Coroutine for URL3 is done Done: URL URL1 took 4s to get! Coroutine for URL1 is done
Now, process results once they are all ready: Done: URL URL1 took 1s to get! Done: URL URL2 took 3s to get! Done: URL URL3 took 4s to get! [('URL1', 1), ('URL2', 3), ('URL3', 4)]