# Source code for coalib.core.Core
import asyncio
import concurrent.futures
import functools
import logging
import multiprocessing
def _get_cpu_count():
try:
return multiprocessing.cpu_count()
except NotImplementedError: # pragma: no cover
# cpu_count is not implemented for some CPU architectures/OSes
return 1
def cleanup_bear(bear,
                 running_tasks,
                 event_loop):
    """
    Cleans up bookkeeping state of an ongoing run for a bear.

    When the given bear has no running tasks left, its entry is dropped
    from the ``running_tasks`` dict. Once no bear has any tasks remaining,
    the event loop is stopped so the run can terminate.

    :param bear:
        The bear to clean up state for.
    :param running_tasks:
        The dict of running-tasks (bear instances mapped to their sets of
        still-running tasks).
    :param event_loop:
        The event-loop tasks are scheduled on.
    """
    remaining_tasks = running_tasks[bear]
    if remaining_tasks:
        # The bear still has work in flight; nothing to clean up yet.
        return

    del running_tasks[bear]
    if not running_tasks:
        event_loop.stop()
def schedule_bears(bears,
                   result_callback,
                   event_loop,
                   running_tasks,
                   executor):
    """
    Schedules the tasks of bears to the given executor and runs them on the
    given event loop.

    :param bears:
        A list of bear instances to be scheduled onto the process pool.
    :param result_callback:
        A callback function which is called when results are available.
    :param event_loop:
        The asyncio event loop to schedule bear tasks on.
    :param running_tasks:
        Tasks that are already scheduled, organized in a dict with
        bear instances as keys and asyncio-coroutines as values containing
        their scheduled tasks.
    :param executor:
        The executor to which the bear tasks are scheduled.
    """
    for bear in bears:
        offloaded_tasks = set()
        for bear_args, bear_kwargs in bear.generate_tasks():
            offloaded_tasks.add(event_loop.run_in_executor(
                executor, bear.execute_task, bear_args, bear_kwargs))

        running_tasks[bear] = offloaded_tasks

        # All tasks of this bear share the same completion handler.
        on_task_done = functools.partial(finish_task, bear, result_callback,
                                         running_tasks, event_loop, executor)
        for task in offloaded_tasks:
            task.add_done_callback(on_task_done)

        logging.debug('Scheduled {!r} (tasks: {})'.format(bear,
                                                          len(offloaded_tasks)))

        if not offloaded_tasks:
            # With no tasks offloaded, no completion callback will ever fire
            # for this bear, so re-check immediately whether the run is
            # already complete — otherwise the event-loop could hang.
            cleanup_bear(bear, running_tasks, event_loop)
def finish_task(bear,
                result_callback,
                running_tasks,
                event_loop,
                executor,
                task):
    """
    The callback for when a task of a bear completes. It checks whether the
    bear finished its execution and hands each produced result to the
    result callback.

    :param bear:
        The bear that the task belongs to.
    :param result_callback:
        A callback function which is called when results are available.
    :param running_tasks:
        Dictionary that keeps track of the remaining tasks of each bear.
    :param event_loop:
        The ``asyncio`` event loop bear-tasks are scheduled on.
    :param executor:
        The executor to which the bear tasks are scheduled.
    :param task:
        The task that completed.
    """
    try:
        results = task.result()
    except Exception as ex:
        # FIXME Try to display only the relevant traceback of the bear if
        # FIXME the error occurred there, not the complete event-loop
        # FIXME traceback.
        logging.error('An exception was thrown during bear execution.',
                      exc_info=ex)
        results = None
    finally:
        # Bookkeeping happens regardless of success so the run can
        # terminate even when a task failed.
        running_tasks[bear].remove(task)
        cleanup_bear(bear, running_tasks, event_loop)

    if results is None:
        return

    for result in results:
        try:
            # FIXME Long operations on the result-callback could block the
            # FIXME scheduler significantly. It should be possible to
            # FIXME schedule new Python Threads on the given event_loop
            # FIXME and process the callback there.
            result_callback(result)
        except Exception as ex:
            # FIXME Try to display only the relevant traceback of the
            # FIXME result handler if the error occurred there, not the
            # FIXME complete event-loop traceback.
            logging.error(
                'An exception was thrown during result-handling.',
                exc_info=ex)
def run(bears, result_callback, executor=None):
    """
    Runs a coala session.

    :param bears:
        The bear instances to run.
    :param result_callback:
        A callback function which is called when results are available. Must
        have following signature::

            def result_callback(result):
                pass
    :param executor:
        Optional ``concurrent.futures`` executor to schedule bear tasks on
        (e.g. for running with fewer cores or on a distributed system).
        If ``None``, a ``ProcessPoolExecutor`` using all available cores is
        created and shut down again once the session completes.
    """
    # Set up the event loop. A fresh loop keeps sessions independent of any
    # globally installed event loop.
    event_loop = asyncio.SelectorEventLoop()

    # Only shut down executors we created ourselves; a caller-provided
    # executor may be reused for further runs.
    owns_executor = executor is None
    if owns_executor:
        executor = concurrent.futures.ProcessPoolExecutor(
            max_workers=_get_cpu_count())

    # Let's go.
    schedule_bears(bears, result_callback, event_loop, {}, executor)
    try:
        event_loop.run_forever()
    finally:
        event_loop.close()
        if owns_executor:
            # Release the pool's worker processes; previously the pool was
            # never shut down and its workers were leaked.
            executor.shutdown()