Source code for coalib.core.Core

import asyncio
import concurrent.futures
import functools
import logging

from coalib.core.DependencyTracker import DependencyTracker
from coalib.core.Graphs import traverse_graph
from coalib.core.PersistentHash import persistent_hash


[docs]def group(iterable, key=lambda x: x): """ Groups elements (out-of-order) together in the given iterable. Supports non-hashable keys by comparing keys with ``==``. Accessing the groups is supported using the iterator as follows: >>> for key, elements in group([1, 3, 7, 1, 2, 1, 2]): ... print(key, list(elements)) 1 [1, 1, 1] 3 [3] 7 [7] 2 [2, 2] You can control how elements are grouped by using the ``key`` parameter. It takes a function with a single parameter and maps to the group. >>> data = [(1, 2), (3, 4), (1, 9), (2, 10), (1, 11), (7, 2), (10, 2), ... (2, 1), (3, 7), (4, 5)] >>> for key, elements in group(data, key=sum): ... print(key, list(elements)) 3 [(1, 2), (2, 1)] 7 [(3, 4)] 10 [(1, 9), (3, 7)] 12 [(2, 10), (1, 11), (10, 2)] 9 [(7, 2), (4, 5)] :param iterable: The iterable to group elements in. :param key: The key-function mapping an element to its group. :return: An iterable yielding tuples with ``key, elements``, where ``elements`` is also an iterable yielding the elements grouped under ``key``. """ keys = [] elements = [] for element in iterable: k = key(element) try: position = keys.index(k) element_list = elements[position] except ValueError: keys.append(k) element_list = [] elements.append(element_list) element_list.append(element) return zip(keys, elements)
[docs]def initialize_dependencies(bears): """ Initializes and returns a ``DependencyTracker`` instance together with a set of bears ready for scheduling. This function acquires, processes and registers bear dependencies accordingly using a consumer-based system, where each dependency bear has only a single instance per section and file-dictionary. The bears set returned accounts for bears that have dependencies and excludes them accordingly. Dependency bears that have themselves no further dependencies are included so the dependency chain can be processed correctly. :param bears: The set of instantiated bears to run that serve as an entry-point. :return: A tuple with ``(dependency_tracker, bears_to_schedule)``. """ # Pre-collect bears in a set as we use them more than once. Especially # remove duplicate instances. bears = set(bears) dependency_tracker = DependencyTracker() # For a consumer-based system, we have a situation which can be visualized # with a graph. Each dependency relation from one bear-type to another # bear-type is represented with an arrow, starting from the dependent # bear-type and ending at the dependency: # # (section1, file_dict1) (section1, file_dict2) (section2, file_dict2) # | | | | # V V V V # bear1 bear2 bear3 bear4 # | | | | # V V | | # BearType1 BearType2 -----------------------| # | | | # | | V # ---------------------------------------------> BearType3 # # We need to traverse this graph and instantiate dependency bears # accordingly, one per section. # Group bears by sections and file-dictionaries. These will serve as # entry-points for the dependency-instantiation-graph. grouping = group(bears, key=lambda bear: (bear.section, bear.file_dict)) for (section, file_dict), bears_per_section in grouping: # Pre-collect bears as the iterator only works once. bears_per_section = list(bears_per_section) # Now traverse each edge of the graph, and instantiate a new dependency # bear if not already instantiated. For the entry point bears, we hack # in identity-mappings because those are already instances. Also map # the types of the instantiated bears to those instances, as if the # user already supplied an instance of a dependency, we reuse it # accordingly. type_to_instance_map = {} for bear in bears_per_section: type_to_instance_map[bear] = bear type_to_instance_map[type(bear)] = bear def get_successive_nodes_and_track(bear): for dependency_bear_type in bear.BEAR_DEPS: if dependency_bear_type not in type_to_instance_map: dependency_bear = dependency_bear_type(section, file_dict) type_to_instance_map[dependency_bear_type] = dependency_bear dependency_tracker.add( type_to_instance_map[dependency_bear_type], bear) # Return the dependencies of the instances instead of the types, so # bears are capable to specify dependencies at runtime. return (type_to_instance_map[dependency_bear_type] for dependency_bear_type in bear.BEAR_DEPS) traverse_graph(bears_per_section, get_successive_nodes_and_track) # Get all bears that aren't resolved and exclude those from scheduler set. bears -= {bear for bear in bears if dependency_tracker.get_dependencies(bear)} # Get all bears that have no further dependencies and shall be # scheduled additionally. for dependency in dependency_tracker.dependencies: if not dependency_tracker.get_dependencies(dependency): bears.add(dependency) return dependency_tracker, bears
[docs]class Session: """ Maintains a session for a coala execution. For each session, there are set of bears to run along with a callback function, which is called when results are available. Dependencies of bears (provided via ``bear.BEAR_DEPS``) are automatically handled. If BearA requires BearB as dependency, then on running BearA, first BearB will be executed, followed by BearA. """ def __init__(self, bears, result_callback, cache=None, executor=None): """ :param bears: The bear instances to run. :param result_callback: A callback function which is called when results are available. Must have following signature:: def result_callback(result): pass Only those results are passed for bears that were explicitly requested via the ``bears`` parameter, implicit dependency results do not call the callback. :param cache: A cache bears can use to speed up runs. If ``None``, no cache will be used. The cache stores the results that were returned last time from the parameters passed to ``execute_task`` in bears. If the parameters to ``execute_task`` are the same from a previous run, the cache will be queried instead of executing ``execute_task``. The cache has to be a dictionary-like object, that maps bear types to respective cache-tables. The cache-tables itself are dictionary-like objects that map hash-values (generated by ``PersistentHash.persistent_hash`` from the task objects) to actual bear results. When bears are about to be scheduled, the core performs a cache-lookup. If there's a hit, the results stored in the cache are returned and the task won't be scheduled. In case of a miss, ``execute_task`` is called normally in the executor. :param executor: Custom executor used to run the bears. If ``None``, a ``ProcessPoolExecutor`` is used using as many processes as cores available on the system. Note that a passed custom executor is closed after the core has finished. """ self.bears = bears self.result_callback = result_callback self.cache = cache # Set up event loop and executor. self.event_loop = asyncio.SelectorEventLoop() self.executor = (concurrent.futures.ProcessPoolExecutor() if executor is None else executor) self.running_futures = {} # Initialize dependency tracking. self.dependency_tracker, self.bears_to_schedule = ( initialize_dependencies(self.bears))
[docs] def run(self): """ Runs the coala session. """ try: if self.bears: self._schedule_bears(self.bears_to_schedule) try: self.event_loop.run_forever() finally: self.event_loop.close() finally: self.executor.shutdown()
def _schedule_bears(self, bears): """ Schedules the tasks of bears. :param bears: A list of bear instances to be scheduled onto the process pool. """ bears_without_tasks = [] for bear in bears: if self.dependency_tracker.get_dependencies( bear): # pragma: no cover logging.warning( 'Dependencies for {!r} not yet resolved, holding back. ' 'This should not happen, the dependency tracking system ' 'should be smarter. Please report this to the developers.' .format(bear)) else: futures = set() for task in bear.generate_tasks(): bear_args, bear_kwargs = task if self.cache is None: future = self.event_loop.run_in_executor( self.executor, bear.execute_task, bear_args, bear_kwargs) else: # Execute the cache lookup in the default # ThreadPoolExecutor, so cache updates reflect properly # in the main process. future = self.event_loop.run_in_executor( None, self._execute_task_with_cache, bear, task) futures.add(future) self.running_futures[bear] = futures # Cleanup bears without tasks after all bears had the chance to # schedule their tasks. Not doing so might stop the run too # early, as the cleanup is also responsible for stopping the # event-loop when no more tasks do exist. if not futures: logging.debug('{!r} scheduled no tasks.'.format(bear)) bears_without_tasks.append(bear) continue for future in futures: future.add_done_callback(functools.partial( self._finish_task, bear)) logging.debug('Scheduled {!r} (tasks: {})'.format( bear, len(futures))) for bear in bears_without_tasks: self._cleanup_bear(bear) def _cleanup_bear(self, bear): """ Cleans up state of an ongoing run for a bear. - If the given bear has no running tasks left: - Resolves its dependencies. - Schedules dependant bears. - Removes the bear from the ``running_tasks`` dict. - Checks whether there are any remaining tasks, and quits the event loop accordingly if none are left. :param bear: The bear to clean up state for. """ if not self.running_futures[bear]: resolved_bears = self.dependency_tracker.resolve(bear) if resolved_bears: self._schedule_bears(resolved_bears) del self.running_futures[bear] if not self.running_futures: # Check the DependencyTracker additionally for remaining # dependencies. resolved = self.dependency_tracker.are_dependencies_resolved if not resolved: # pragma: no cover logging.warning( 'Core finished with run, but it seems some dependencies ' 'were unresolved: {}. Ignoring them, but this is a bug, ' 'please report it to the developers.'.format(', '.join( repr(dependant) + ' depends on ' + repr(dependency) for dependency, dependant in self.dependency_tracker))) self.event_loop.stop() def _execute_task_with_cache(self, bear, task): if type(bear) not in self.cache: bear_cache = {} self.cache[type(bear)] = bear_cache else: bear_cache = self.cache[type(bear)] fingerprint = persistent_hash(task) if fingerprint in bear_cache: results = bear_cache[fingerprint] else: bear_args, bear_kwargs = task future = asyncio.run_coroutine_threadsafe( asyncio.wait_for( self.event_loop.run_in_executor( self.executor, bear.execute_task, bear_args, bear_kwargs), None, loop=self.event_loop), loop=self.event_loop) results = future.result() bear_cache[fingerprint] = results return results def _finish_task(self, bear, future): """ The callback for when a task of a bear completes. It is responsible for checking if the bear completed its execution and the handling of the result generated by the task. It also schedules new tasks if dependencies get resolved. :param bear: The bear that the task belongs to. :param future: The future that completed. """ try: results = future.result() for dependant in self.dependency_tracker.get_dependants(bear): dependant.dependency_results[type(bear)] += results except Exception as ex: # FIXME Try to display only the relevant traceback of the bear if # FIXME error occurred there, not the complete event-loop # FIXME traceback. logging.error('An exception was thrown during bear execution.', exc_info=ex) results = None # Unschedule/resolve dependent bears, as these can't run any more. dependants = self.dependency_tracker.get_all_dependants(bear) for dependant in dependants: self.dependency_tracker.resolve(dependant) logging.debug('Following dependent bears were unscheduled: ' + ', '.join(repr(dependant) for dependant in dependants)) finally: self.running_futures[bear].remove(future) self._cleanup_bear(bear) # Only pass results to the callback for bears that were desired during # init. if results is not None and bear in self.bears: for result in results: try: # FIXME Long operations on the result-callback could block # FIXME the scheduler significantly. It should be # FIXME possible to schedule new Python Threads on the # FIXME given event_loop and process the callback there. self.result_callback(result) except Exception as ex: # FIXME Try to display only the relevant traceback of the # FIXME result handler if error occurred there, not the # FIXME complete event-loop traceback. logging.error( 'An exception was thrown during result-handling.', exc_info=ex)
[docs]def run(bears, result_callback, cache=None, executor=None): """ Initiates a session with the given parameters and runs it. :param bears: The bear instances to run. :param result_callback: A callback function which is called when results are available. Must have following signature:: def result_callback(result): pass Only those results are passed for bears that were explicitly requested via the ``bears`` parameter, implicit dependency results do not call the callback. :param cache: A cache bears can use to speed up runs. If ``None``, no cache will be used. The cache stores the results that were returned last time from the parameters passed to ``execute_task`` in bears. If the parameters to ``execute_task`` are the same from a previous run, the cache will be queried instead of executing ``execute_task``. The cache has to be a dictionary-like object, that maps bear types to respective cache-tables. The cache-tables itself are dictionary-like objects that map hash-values (generated by ``PersistentHash.persistent_hash`` from the task objects) to actual bear results. When bears are about to be scheduled, the core performs a cache-lookup. If there's a hit, the results stored in the cache are returned and the task won't be scheduled. In case of a miss, ``execute_task`` is called normally in the executor. :param executor: Custom executor used to run the bears. If ``None``, a ``ProcessPoolExecutor`` is used using as many processes as cores available on the system. """ Session(bears, result_callback, cache, executor).run()