Source code for wolfgpu.utils

"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import re
import logging
from datetime import timedelta,datetime
from time import time

[docs] class EveryNSeconds: def __init__(self, period: float, trigger_start = True, name: str = ""): """ :param period: Period expressed in seconds. :param name: Optional name (for display purposes) """ # n = duration in seconds self._period = period self._trigger_start = trigger_start self._absolute_start = self._mark = time() self.name = name def __str__(self): return f"EveryNSeconds {self.name} p={self._period} mark:{self.mark}"
[docs] def has_shot(self) -> bool: """Calling this method will clear the has_shot flag. So if you call it twice one after the other, without waiting no more than the period, it will give True one time and False the next time. """ t = time() # How much time since last shot. delta = t - self._mark has_shot = delta >= self._period or self._trigger_start if self._trigger_start: self._trigger_start = False # If we the timer has shot, then we plan immediately for # the next shot. if has_shot: #print(f"Timer {self.name or ''}: {delta:.3f}/{self._period} Accuracy:{delta/self._period - 1:.2f}") #old_mark = self._mark self._mark += (int(delta / self._period)) * self._period #print(f"mark={old_mark-self._absolute_start:.2f} delta={delta:.2f} => new mark = {self._mark-self._absolute_start:.2f}") return has_shot
[docs] def add_logging_level(levelName, levelNum, methodName = None): """ Adds a new logging level to the `logging` module and the currently configured logging class. `levelName` becomes an attribute of the `logging` module with the value `levelNum`. `methodName` becomes a convenience method for both `logging` itself and the class returned by `logging.getLoggerClass()` (usually just `logging.Logger`). If `methodName` is not specified, `levelName.lower()` is used. To avoid accidental clobberings of existing attributes, this method will raise an `AttributeError` if the level name is already an attribute of the `logging` module or if the method name is already present . Example ------- >>> add_logging_level('TRACE', logging.DEBUG - 5) >>> logging.getLogger(__name__).setLevel("TRACE") >>> logging.getLogger(__name__).trace('that worked') >>> logging.trace('so did this') >>> logging.TRACE 5 """ if not methodName: methodName = levelName.lower() if hasattr(logging, levelName) or hasattr(logging, methodName) or hasattr(logging.getLoggerClass(), methodName): return def logForLevel(self, message, *args, **kwargs): if self.isEnabledFor(levelNum): self._log(levelNum, message, args, **kwargs) def logToRoot(message, *args, **kwargs): logging.log(levelNum, message, *args, **kwargs) logging.addLevelName(levelNum, levelName) setattr(logging, levelName, levelNum) setattr(logging.getLoggerClass(), methodName, logForLevel) setattr(logging, methodName, logToRoot)
[docs] def init_global_logging(lvl=logging.INFO): add_logging_level("TRACE", logging.DEBUG - 1) # Import here to remove a dependency if one doesn't use # init_global_logging. import colorlog logger = logging.getLogger() # Get the *root* logger (see python's doc) logger.setLevel(lvl) handler = colorlog.StreamHandler() handler.setFormatter(colorlog.ColoredFormatter( '%(log_color)s%(levelname)s%(reset)s:%(message)s')) logger.handlers.clear() # Remove the current handlers to avoid double printing with my own handler logger.addHandler(handler)
[docs] def delete_dir_recursion(p): """Delete folder, sub-folders and files. Taken from: https://stackoverflow.com/questions/70246591/delete-directory-and-all-symlinks-recursively Use this to remove the content of a directory without removing the directory itself. """ for f in p.glob('**/*'): if f.is_symlink(): f.unlink(missing_ok=True) # missing_ok is added in python 3.8 #print(f'symlink {f.name} from path {f} was deleted') elif f.is_file(): f.unlink() #print(f'file: {f.name} from path {f} was deleted') elif f.is_dir(): try: f.rmdir() # delete empty sub-folder #print(f'folder: {f.name} from path {f} was deleted') except OSError: # sub-folder is not empty delete_dir_recursion(f) # recurse the current sub-folder except Exception as exception: # capture other exception logging.error(exception)
[docs] def nice_timestamp() -> str: """ A nice timestamp which makes sure the day, month and year cannot be mismatched. """ return datetime.now().strftime('%d %b %Y %X')
[docs] PARSE_DURATION_REGEX = re.compile(r'^((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$')
[docs] def parse_duration(time_str) -> timedelta: """ Parse a time string e.g. (2h13m) into a timedelta object. Modified from virhilo's answer at https://stackoverflow.com/a/4628148/851699 :param time_str: A string identifying a duration. (eg. 2h13m) :return datetime.timedelta: A datetime.timedelta object Taken from https://stackoverflow.com/a/51916936/2030384 """ parts = PARSE_DURATION_REGEX.match(time_str) assert parts is not None, "Could not parse any time information from '{}'. Examples of valid strings: '8h', '2d8h5m20s', '2m4s'".format(time_str) time_params = {name: float(param) for name, param in parts.groupdict().items() if param} return timedelta(**time_params)
[docs] def parse_duration_to_seconds(time_str) -> float: try: return parse_duration(time_str).total_seconds() except: pass try: return parse_duration(time_str+"s").total_seconds() except: pass try: return float(time_str) except: pass raise Exception(f"Unable to parse seconds in '{time_str}'")
[docs] def seconds_to_duration_str(s:float): days = int(s) // (3600*24) s -= days * 3600*24 hours = int(s) // 3600 s -= hours * 3600 minutes = int(s) // 60 s -= minutes * 60 seconds = s r = [] if days: r.append(f"{days} days") if hours: r.append(f"{hours} h.") if minutes: r.append(f"{minutes} m.") if seconds > 0 or (hours == 0 and days == 0 and minutes == 0): r.append(f"{seconds:.3g} s.") return " ".join(r)
if __name__ == "__main__": assert seconds_to_duration_str(0) == "0 s." assert seconds_to_duration_str(1) == "1 s." assert seconds_to_duration_str(61) == "1 m. 1 s." assert seconds_to_duration_str(3600+60+1) == "1 h. 1 m. 1 s.", f"{seconds_to_duration_str(3600+60+1)}" assert seconds_to_duration_str(3600*24+3600+61) == "1 days 1 h. 1 m. 1 s." assert seconds_to_duration_str(2*3600*24+2*3600+60*2+2.2) == "2 days 2 h. 2 m. 2.2 s.", f"{seconds_to_duration_str(2*3600*24+2*3600+60*2+2.2)}" exit() assert parse_duration("11").seconds == 11 assert parse_duration("10s").seconds == 10 assert parse_duration("0.25s").total_seconds() == 0.25,f"{parse_duration('0.25s').seconds}" assert parse_duration("0s").total_seconds() == 0 assert parse_duration("1m").total_seconds() == 60 assert parse_duration("1m1.1s").total_seconds() == 61.1 from time import sleep
[docs] t = EveryNSeconds(1)
i= 0 start_t = time() while i < 10: print(f"{time() - start_t:.2f}") if t.has_shot(): print(f"top {i} {time() - start_t}") i += 1 else: sleep(1.3)