Source code for coalib.processes.Processing

import multiprocessing
import os
import platform
import queue
import subprocess
from itertools import chain

from coalib.collecting import Dependencies
from coalib.collecting.Collectors import collect_files
from coala_utils.string_processing.StringConverter import StringConverter
from coalib.output.printers.LOG_LEVEL import LOG_LEVEL
from coalib.processes.BearRunning import run
from coalib.processes.CONTROL_ELEMENT import CONTROL_ELEMENT
from coalib.processes.LogPrinterThread import LogPrinterThread
from coalib.results.Result import Result
from coalib.results.result_actions.ApplyPatchAction import ApplyPatchAction
from coalib.results.result_actions.IgnoreResultAction import IgnoreResultAction
from coalib.results.result_actions.PrintDebugMessageAction import (
    PrintDebugMessageAction)
from coalib.results.result_actions.ShowPatchAction import ShowPatchAction
from coalib.results.RESULT_SEVERITY import RESULT_SEVERITY
from coalib.results.SourceRange import SourceRange
from coalib.settings.Setting import glob_list
from coalib.parsing.Globbing import fnmatch


ACTIONS = [ApplyPatchAction,
           PrintDebugMessageAction,
           ShowPatchAction,
           IgnoreResultAction]


def get_cpu_count():
    try:
        return multiprocessing.cpu_count()
    # cpu_count is not implemented for some CPU architectures/OSes
    except NotImplementedError:  # pragma: no cover
        return 2


def fill_queue(queue_fill, any_list):
    """
    Takes elements from a list and populates a queue with those elements.

    :param queue_fill: The queue to be filled.
    :param any_list:   List containing the elements.
    """
    for elem in any_list:
        queue_fill.put(elem)


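# A minimal usage sketch (illustrative values, not part of the module):
#
#     q = multiprocessing.Queue()
#     fill_queue(q, ['setup.py', 'coalib/coala.py'])
#     q.get()  # -> 'setup.py'

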
def get_running_processes(processes):
    return sum((1 if process.is_alive() else 0) for process in processes)


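# Usage sketch (hypothetical workers; needs ``import time``):
#
#     workers = [multiprocessing.Process(target=time.sleep, args=(1,))
#                for _ in range(3)]
#     for worker in workers:
#         worker.start()
#     get_running_processes(workers)  # -> 3 while all are still sleeping

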
def create_process_group(command_array, **kwargs):
    if platform.system() == 'Windows':  # pragma: no cover
        proc = subprocess.Popen(
            command_array,
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
            **kwargs)
    else:
        proc = subprocess.Popen(command_array,
                                preexec_fn=os.setsid,
                                **kwargs)
    return proc


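# Usage sketch (POSIX variant; hypothetical command): putting the child
# into its own session/process group allows terminating it together with
# any subprocesses it spawns (needs ``import signal``).
#
#     proc = create_process_group(['sleep', '100'])
#     os.killpg(os.getpgid(proc.pid), signal.SIGTERM)

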
def get_default_actions(section):
    """
    Parses the key ``default_actions`` in the given section.

    :param section: The section to parse from.
    :return:        A dict with bearnames as keys and their default actions
                    as values, and another dict that maps bearnames to
                    invalid action names.
    """
    try:
        default_actions = dict(section['default_actions'])
    except IndexError:
        return {}, {}

    action_dict = {action.get_metadata().name: action for action in ACTIONS}
    invalid_action_set = default_actions.values() - action_dict.keys()
    invalid_actions = {}
    if len(invalid_action_set) != 0:
        invalid_actions = {
            bear: action
            for bear, action in default_actions.items()
            if action in invalid_action_set}
        for invalid in invalid_actions.keys():
            del default_actions[invalid]

    actions = {bearname: action_dict[action_name]
               for bearname, action_name in default_actions.items()}
    return actions, invalid_actions


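# Usage sketch, assuming a section parsed from a ``.coafile`` such as:
#
#     [python]
#     default_actions = SpaceConsistencyBear: ApplyPatchAction,
#                       *Bear: ShowPatchAction
#
# ``get_default_actions(section)`` then maps each bearname (globs are
# allowed, see ``autoapply_actions``) to its action class and reports
# unknown action names in the second return value.

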
def autoapply_actions(results,
                      file_dict,
                      file_diff_dict,
                      section,
                      log_printer):
    """
    Auto-applies actions as defined in the given section.

    :param results:        A list of results.
    :param file_dict:      A dictionary containing the names of files and
                           their contents.
    :param file_diff_dict: A dictionary that contains filenames as keys and
                           diff objects as values.
    :param section:        The section.
    :param log_printer:    A log printer instance to log messages on.
    :return:               A list of unprocessed results.
    """
    default_actions, invalid_actions = get_default_actions(section)

    for bearname, actionname in invalid_actions.items():
        log_printer.warn('Selected default action {!r} for bear {!r} does '
                         'not exist. Ignoring action.'.format(actionname,
                                                              bearname))

    if len(default_actions) == 0:
        # There's nothing to auto-apply.
        return results

    not_processed_results = []
    for result in results:
        try:
            # Match full bear names deterministically, prioritized!
            action = default_actions[result.origin]
        except KeyError:
            for bear_glob in default_actions:
                if fnmatch(result.origin, bear_glob):
                    action = default_actions[bear_glob]
                    break
            else:
                not_processed_results.append(result)
                continue

        if not action.is_applicable(result, file_dict, file_diff_dict):
            log_printer.warn('Selected default action {!r} for bear {!r} is '
                             'not applicable. Action not applied.'.format(
                                 action.get_metadata().name, result.origin))
            not_processed_results.append(result)
            continue

        try:
            action().apply_from_section(result,
                                        file_dict,
                                        file_diff_dict,
                                        section)
            log_printer.info('Applied {!r} on {} from {!r}.'.format(
                action.get_metadata().name,
                result.location_repr(),
                result.origin))
        except Exception as ex:
            not_processed_results.append(result)
            log_printer.log_exception(
                'Failed to execute action {!r} with error: {}.'.format(
                    action.get_metadata().name, ex),
                ex)
            log_printer.debug('-> for result ' + repr(result) + '.')

    return not_processed_results


def check_result_ignore(result, ignore_ranges):
    """
    Determines if the result has to be ignored.

    :param result:        The result that needs to be checked.
    :param ignore_ranges: A list of tuples, each containing a list of lower
                          cased affected bearnames and a SourceRange to
                          ignore. If any of the bearname lists is empty, it
                          is considered an ignore range for all bears. This
                          may be a list of globbed bear wildcards.
    :return:              True if the result has to be ignored.
    """
    for bears, range in ignore_ranges:
        orig = result.origin.lower()
        if (result.overlaps(range) and
                (len(bears) == 0 or orig in bears or fnmatch(orig, bears))):
            return True

    return False


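# Usage sketch: filter a result list against the ignore ranges collected
# from the source files (this mirrors how the printing code uses it):
#
#     ignore_ranges = list(yield_ignore_ranges(file_dict))
#     results = [result for result in results
#                if not check_result_ignore(result, ignore_ranges)]

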
def get_file_dict(filename_list, log_printer):
    """
    Reads all files into a dictionary.

    :param filename_list: List of paths to files whose contents are to be
                          read.
    :param log_printer:   The logger which logs errors.
    :return:              Dictionary with filenames as keys and tuples of
                          file lines as values.
    """
    file_dict = {}
    for filename in filename_list:
        try:
            with open(filename, 'r', encoding='utf-8') as _file:
                file_dict[filename] = tuple(_file.readlines())
        except UnicodeDecodeError:
            log_printer.warn("Failed to read file '{}'. It seems to contain "
                             'non-unicode characters. Leaving it '
                             'out.'.format(filename))
        except OSError as exception:  # pragma: no cover
            log_printer.log_exception("Failed to read file '{}' because of "
                                      'an unknown error. Leaving it '
                                      'out.'.format(filename),
                                      exception,
                                      log_level=LOG_LEVEL.WARNING)

    log_printer.debug('Files that will be checked:\n' +
                      '\n'.join(file_dict.keys()))

    return file_dict


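# Usage sketch (hypothetical filenames): each file is stored as a tuple
# of its lines, newlines included.
#
#     file_dict = get_file_dict(['setup.py', 'README.rst'], log_printer)
#     file_dict['setup.py'][0]  # first line of setup.py

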
def filter_raising_callables(it, exception, *args, **kwargs):
    """
    Invokes each callable item inside the given iterator and yields its
    result, discarding items that raise one of the given exceptions.

    :param it:        The iterator to filter.
    :param exception: The (tuple of) exception(s) to filter for.
    :param args:      Positional arguments to pass to the callables.
    :param kwargs:    Keyword arguments to pass to the callables.
    """
    for elem in it:
        try:
            yield elem(*args, **kwargs)
        except exception:
            pass


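# A minimal sketch with plain callables instead of bear classes:
#
#     def fail(x):
#         raise RuntimeError
#
#     list(filter_raising_callables([int, fail, str], RuntimeError, 7))
#     # -> [7, '7']; ``fail`` is silently dropped

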
def instantiate_bears(section,
                      local_bear_list,
                      global_bear_list,
                      file_dict,
                      message_queue,
                      console_printer):
    """
    Instantiates each bear with the arguments it needs.

    :param section:          The section the bears belong to.
    :param local_bear_list:  List of local bear classes to instantiate.
    :param global_bear_list: List of global bear classes to instantiate.
    :param file_dict:        Dictionary containing filenames and their
                             contents.
    :param message_queue:    Queue responsible to maintain the messages
                             delivered by the bears.
    :param console_printer:  Object to print messages on the console.
    :return:                 The local and global bear instance lists.
    """
    local_bear_list = [bear
                       for bear in filter_raising_callables(
                           local_bear_list,
                           RuntimeError,
                           section,
                           message_queue,
                           timeout=0.1)]

    global_bear_list = [bear
                        for bear in filter_raising_callables(
                            global_bear_list,
                            RuntimeError,
                            file_dict,
                            section,
                            message_queue,
                            timeout=0.1)]

    return local_bear_list, global_bear_list


def instantiate_processes(section,
                          local_bear_list,
                          global_bear_list,
                          job_count,
                          cache,
                          log_printer,
                          console_printer):
    """
    Instantiates the processes that will be responsible for running bears
    in a multiprocessing environment.

    :param section:          The section the bears belong to.
    :param local_bear_list:  List of local bears belonging to the section.
    :param global_bear_list: List of global bears belonging to the section.
    :param job_count:        Max number of processes to create.
    :param cache:            An instance of ``misc.Caching.FileCache`` to
                             use as a file cache buffer.
    :param log_printer:      The log printer to warn to.
    :param console_printer:  Object to print messages on the console.
    :return:                 A tuple containing a list of processes and the
                             arguments passed to each process, which are the
                             same for each object.
    """
    filename_list = collect_files(
        glob_list(section.get('files', '')),
        log_printer,
        ignored_file_paths=glob_list(section.get('ignore', '')),
        limit_file_paths=glob_list(section.get('limit_files', '')))

    # This stores all matched files irrespective of whether coala is run
    # only on changed files or not. Global bears require all the files
    complete_filename_list = filename_list

    # Start tracking all the files
    if cache:
        cache.track_files(set(complete_filename_list))

    changed_files = cache.get_uncached_files(
        set(filename_list)) if cache else filename_list

    # If caching is enabled then the local bears should process only the
    # changed files.
    log_printer.debug("coala is run only on changed files, bears' log "
                      'messages from previous runs may not appear. You may '
                      'use the `--flush-cache` flag to see them.')
    filename_list = changed_files

    # Note: the complete file dict is given as the file dict to bears and
    # the whole project is accessible to every bear. However, local bears
    # are run only for the changed files if caching is enabled.
    complete_file_dict = get_file_dict(complete_filename_list, log_printer)
    file_dict = {filename: complete_file_dict[filename]
                 for filename in filename_list
                 if filename in complete_file_dict}

    manager = multiprocessing.Manager()
    global_bear_queue = multiprocessing.Queue()
    filename_queue = multiprocessing.Queue()
    local_result_dict = manager.dict()
    global_result_dict = manager.dict()
    message_queue = multiprocessing.Queue()
    control_queue = multiprocessing.Queue()

    bear_runner_args = {'file_name_queue': filename_queue,
                        'local_bear_list': local_bear_list,
                        'global_bear_list': global_bear_list,
                        'global_bear_queue': global_bear_queue,
                        'file_dict': file_dict,
                        'local_result_dict': local_result_dict,
                        'global_result_dict': global_result_dict,
                        'message_queue': message_queue,
                        'control_queue': control_queue,
                        'timeout': 0.1}

    local_bear_list[:], global_bear_list[:] = instantiate_bears(
        section,
        local_bear_list,
        global_bear_list,
        complete_file_dict,
        message_queue,
        console_printer=console_printer)

    fill_queue(filename_queue, file_dict.keys())
    fill_queue(global_bear_queue, range(len(global_bear_list)))

    return ([multiprocessing.Process(target=run, kwargs=bear_runner_args)
             for i in range(job_count)],
            bear_runner_args)


def get_ignore_scope(line, keyword):
    """
    Retrieves the bears that are to be ignored defined in the given line.

    :param line:    The line containing the ignore declaration.
    :param keyword: The keyword that was found. Everything after the
                    rightmost occurrence of it will be considered for the
                    scope.
    :return:        A list of lower cased bearnames or an empty list
                    (-> "all")
    """
    toignore = line[line.rfind(keyword) + len(keyword):]
    if toignore.startswith('all'):
        return []
    else:
        return list(StringConverter(toignore, list_delimiters=', '))


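# Examples (callers lower-case the line first, see
# ``yield_ignore_ranges``):
#
#     get_ignore_scope('# start ignoring pep8bear, keywordbear',
#                      'start ignoring ')
#     # -> ['pep8bear', 'keywordbear']
#     get_ignore_scope('# ignore all', 'ignore ')
#     # -> []  (empty list means "ignore for all bears")

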
def yield_ignore_ranges(file_dict):
    """
    Yields tuples of affected bears and a SourceRange that shall be ignored
    for those.

    :param file_dict: The file dictionary.
    """
    for filename, file in file_dict.items():
        start = None
        bears = []
        stop_ignoring = False
        for line_number, line in enumerate(file, start=1):
            # Before lowering all lines ever read, first look for the
            # biggest common substring, case sensitive:
            # I*gnor*e, start i*gnor*ing
            if 'gnor' in line:
                line = line.lower()
                if 'start ignoring ' in line:
                    start = line_number
                    bears = get_ignore_scope(line, 'start ignoring ')
                elif 'stop ignoring' in line:
                    stop_ignoring = True
                    if start:
                        yield (bears,
                               SourceRange.from_values(
                                   filename,
                                   start, 1,
                                   line_number,
                                   len(file[line_number - 1])))
                elif 'ignore ' in line:
                    end_line = min(line_number + 1, len(file))
                    yield (get_ignore_scope(line, 'ignore '),
                           SourceRange.from_values(
                               filename,
                               line_number, 1,
                               end_line,
                               len(file[end_line - 1])))

        if stop_ignoring is False and start is not None:
            yield (bears,
                   SourceRange.from_values(filename,
                                           start, 1,
                                           len(file),
                                           len(file[-1])))


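# Usage sketch: for a file containing
#
#     # Start ignoring PEP8Bear
#     x=1
#     # Stop ignoring
#
# this yields ``(['pep8bear'], <SourceRange over lines 1-3>)``, so any
# overlapping PEP8Bear result gets suppressed via ``check_result_ignore``.

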
def get_file_list(results):
    """
    Get the set of files that are affected in the given results.

    :param results: A list of results from which the list of files is to be
                    extracted.
    :return:        A set of file paths containing the mentioned list of
                    files.
    """
    return {code.file for result in results for code in result.affected_code}


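# Usage sketch: collect every file touched by a batch of results, e.g. so
# the cache can keep tracking files that still produce results.
#
#     result_files = get_file_list(local_result_dict[index])

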
def process_queues(processes,
                   control_queue,
                   local_result_dict,
                   global_result_dict,
                   file_dict,
                   print_results,
                   section,
                   cache,
                   log_printer,
                   console_printer):
    """
    Iterates the control queue and sends the results received to the
    print_result method so that they can be presented to the user.

    :param processes:          List of processes which can be used to run
                               Bears.
    :param control_queue:      Containing control elements that indicate
                               whether there is a result available and which
                               bear it belongs to.
    :param local_result_dict:  Dictionary containing results respective to
                               local bears. It is modified by the processes,
                               i.e. results are added to it by multiple
                               processes.
    :param global_result_dict: Dictionary containing results respective to
                               global bears. It is modified by the
                               processes, i.e. results are added to it by
                               multiple processes.
    :param file_dict:          Dictionary containing file contents with
                               filenames as keys.
    :param print_results:      Prints all given results appropriate to the
                               output medium.
    :param cache:              An instance of ``misc.Caching.FileCache`` to
                               use as a file cache buffer.
    :return:                   Return True if all bears execute successfully
                               and results were delivered to the user. Else
                               False.
    """
    file_diff_dict = {}
    retval = False
    # Number of processes working on local/global bears. They are counted
    # down when the last queue element of that process is processed, which
    # may be *after* the process has ended!
    local_processes = len(processes)
    global_processes = len(processes)
    global_result_buffer = []
    result_files = set()
    ignore_ranges = list(yield_ignore_ranges(file_dict))

    # One process is the logger thread
    while local_processes > 1:
        try:
            control_elem, index = control_queue.get(timeout=0.1)

            if control_elem == CONTROL_ELEMENT.LOCAL_FINISHED:
                local_processes -= 1
            elif control_elem == CONTROL_ELEMENT.GLOBAL_FINISHED:
                global_processes -= 1
            elif control_elem == CONTROL_ELEMENT.LOCAL:
                assert local_processes != 0
                result_files.update(get_file_list(local_result_dict[index]))
                retval, res = print_result(local_result_dict[index],
                                           file_dict,
                                           retval,
                                           print_results,
                                           section,
                                           log_printer,
                                           file_diff_dict,
                                           ignore_ranges,
                                           console_printer=console_printer)
                local_result_dict[index] = res
            else:
                assert control_elem == CONTROL_ELEMENT.GLOBAL
                global_result_buffer.append(index)
        except queue.Empty:
            if get_running_processes(processes) < 2:  # pragma: no cover
                # Recover silently, those branches are only
                # nondeterministically covered.
                break

    # Flush global result buffer
    for elem in global_result_buffer:
        result_files.update(get_file_list(global_result_dict[elem]))
        retval, res = print_result(global_result_dict[elem],
                                   file_dict,
                                   retval,
                                   print_results,
                                   section,
                                   log_printer,
                                   file_diff_dict,
                                   ignore_ranges,
                                   console_printer=console_printer)
        global_result_dict[elem] = res

    # One process is the logger thread
    while global_processes > 1:
        try:
            control_elem, index = control_queue.get(timeout=0.1)

            if control_elem == CONTROL_ELEMENT.GLOBAL:
                result_files.update(get_file_list(global_result_dict[index]))
                retval, res = print_result(global_result_dict[index],
                                           file_dict,
                                           retval,
                                           print_results,
                                           section,
                                           log_printer,
                                           file_diff_dict,
                                           ignore_ranges,
                                           console_printer)
                global_result_dict[index] = res
            else:
                assert control_elem == CONTROL_ELEMENT.GLOBAL_FINISHED
                global_processes -= 1
        except queue.Empty:
            if get_running_processes(processes) < 2:  # pragma: no cover
                # Recover silently, those branches are only
                # nondeterministically covered.
                break

    if cache:
        cache.untrack_files(result_files)
    return retval


def simplify_section_result(section_result):
    """
    Takes in a section's result from ``execute_section`` and simplifies it
    for easy usage in other functions.

    :param section_result: The result of a section which was executed.
    :return:               Tuple containing:
                            - bool - True if results were yielded
                            - bool - True if unfixed results were yielded
                            - list - Results from all bears (local and
                              global)
    """
    section_yielded_result = section_result[0]
    results_for_section = []
    for value in chain(section_result[1].values(),
                       section_result[2].values()):
        if value is None:
            continue

        for result in value:
            results_for_section.append(result)

    section_yielded_unfixed_results = len(results_for_section) > 0

    return (section_yielded_result,
            section_yielded_unfixed_results,
            results_for_section)


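# Usage sketch, fed directly from ``execute_section``:
#
#     yielded, yielded_unfixed, results = simplify_section_result(
#         execute_section(section, global_bear_list, local_bear_list,
#                         print_results, cache, log_printer,
#                         console_printer))

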
def execute_section(section,
                    global_bear_list,
                    local_bear_list,
                    print_results,
                    cache,
                    log_printer,
                    console_printer):
    """
    Executes the section with the given bears.

    The execute_section method does the following things:

    1. Prepare a Process
       - Load files
       - Create queues
    2. Spawn one or more Processes
    3. Output results from the Processes
    4. Join all processes

    :param section:          The section to execute.
    :param global_bear_list: List of global bears belonging to the section.
                             Dependencies are already resolved.
    :param local_bear_list:  List of local bears belonging to the section.
                             Dependencies are already resolved.
    :param print_results:    Prints all given results appropriate to the
                             output medium.
    :param cache:            An instance of ``misc.Caching.FileCache`` to
                             use as a file cache buffer.
    :param log_printer:      The log_printer to warn to.
    :param console_printer:  Object to print messages on the console.
    :return:                 Tuple containing a bool (True if results were
                             yielded, False otherwise), a Manager.dict
                             containing all local results (filenames are
                             keys) and a Manager.dict containing all global
                             bear results (bear names are keys) as well as
                             the file dictionary.
    """
    try:
        running_processes = int(section['jobs'])
    except ValueError:
        log_printer.warn("Unable to convert setting 'jobs' into a number. "
                         'Falling back to CPU count.')
        running_processes = get_cpu_count()
    except IndexError:
        running_processes = get_cpu_count()

    processes, arg_dict = instantiate_processes(
        section,
        local_bear_list,
        global_bear_list,
        running_processes,
        cache,
        log_printer,
        console_printer=console_printer)

    logger_thread = LogPrinterThread(arg_dict['message_queue'],
                                     log_printer)
    # Start and join the logger thread along with the processes to run bears
    processes.append(logger_thread)

    for runner in processes:
        runner.start()

    try:
        return (process_queues(processes,
                               arg_dict['control_queue'],
                               arg_dict['local_result_dict'],
                               arg_dict['global_result_dict'],
                               arg_dict['file_dict'],
                               print_results,
                               section,
                               cache,
                               log_printer,
                               console_printer=console_printer),
                arg_dict['local_result_dict'],
                arg_dict['global_result_dict'],
                arg_dict['file_dict'])
    finally:
        logger_thread.running = False

        for runner in processes:
            runner.join()


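# Usage sketch (the collaborators, such as the bear lists, cache and
# printers, are assumed to be prepared by coala's main routine):
#
#     yielded, local_results, global_results, file_dict = execute_section(
#         section, global_bear_list, local_bear_list, print_results,
#         cache, log_printer, console_printer)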