Source code for wolfgpu.utils

import re
import logging
from datetime import timedelta,datetime
from time import time

class EveryNSeconds:
    """Polling timer that "shoots" at most once per period of `n` seconds.

    Call :meth:`has_shot` repeatedly (typically from a loop); it returns a
    truthy value when at least `n` seconds have elapsed since the last
    period boundary, and a falsy value otherwise.

    :param n: Period, in seconds. A non-positive period shoots on every call.
    :param trigger_start: If truthy, the very first call to :meth:`has_shot`
        shoots regardless of the elapsed time.
    """

    def __init__(self, n: float, trigger_start = True):
        # n = duration in seconds
        self._n = n
        self._trigger_start = trigger_start
        # Start of the current period, as a `time()` timestamp.
        self._mark = time()
        # Creation time; not used by the class itself, kept for debugging.
        self._absolute_start = self._mark

    def has_shot(self):
        """Tell whether the timer shoots now, advancing the period if so.

        :return: Truthy if a full period has elapsed since the last mark, or
            if this is the first call and `trigger_start` was set.
        """
        t = time()

        # How much time since last shot.
        delta = t - self._mark

        has_shot = delta >= self._n or self._trigger_start

        # The "shoot immediately" request is only honoured once.
        if self._trigger_start:
            self._trigger_start = False

        if has_shot:
            if self._n > 0:
                # Advance by a whole number of periods so that shots stay
                # aligned on the original time grid instead of drifting.
                self._mark += int(delta / self._n) * self._n
            else:
                # A non-positive period has no grid to stay aligned on
                # (and n == 0 would divide by zero): restart from "now".
                self._mark = t

        return has_shot
def add_logging_level(levelName, levelNum, methodName = None):
    """
    Register an extra logging level with the `logging` module and with the
    currently configured logger class.

    After the call, `levelName` is an attribute of the `logging` module whose
    value is `levelNum`, and `methodName` is a convenience method available
    both on `logging` itself and on the class returned by
    `logging.getLoggerClass()` (usually just `logging.Logger`). When
    `methodName` is not given, `levelName.lower()` is used.

    To avoid clobbering existing attributes, the function does nothing and
    returns silently if the level name is already an attribute of the
    `logging` module, or if the method name is already present on the
    `logging` module or on the logger class.

    Example
    -------
    >>> add_logging_level('TRACE', logging.DEBUG - 5)
    >>> logging.getLogger(__name__).setLevel("TRACE")
    >>> logging.getLogger(__name__).trace('that worked')
    >>> logging.trace('so did this')
    >>> logging.TRACE
    5

    """
    method = methodName or levelName.lower()
    logger_class = logging.getLoggerClass()

    name_taken = (
        hasattr(logging, levelName)
        or hasattr(logging, method)
        or hasattr(logger_class, method)
    )
    if name_taken:
        return

    def _log_on_logger(self, message, *args, **kwargs):
        # Bound as a method on the logger class: same shape as Logger.info().
        if self.isEnabledFor(levelNum):
            self._log(levelNum, message, args, **kwargs)

    def _log_on_root(message, *args, **kwargs):
        # Module-level shortcut, the counterpart of logging.info().
        logging.log(levelNum, message, *args, **kwargs)

    logging.addLevelName(levelNum, levelName)
    setattr(logging, levelName, levelNum)
    setattr(logger_class, method, _log_on_logger)
    setattr(logging, method, _log_on_root)
def init_global_logging(lvl=logging.INFO):
    """
    Configure the root logger to print coloured messages at level `lvl`.

    Registers a "TRACE" level just below DEBUG, then replaces every handler
    currently attached to the root logger with a single `colorlog` stream
    handler.

    :param lvl: Level applied to the root logger (defaults to `logging.INFO`).
    """
    add_logging_level("TRACE", logging.DEBUG - 1)

    # Imported here rather than at module level so that colorlog is only
    # a dependency for callers of init_global_logging.
    import colorlog

    root_logger = logging.getLogger()  # No name given => the *root* logger.
    root_logger.setLevel(lvl)

    colored_formatter = colorlog.ColoredFormatter(
        '%(log_color)s%(levelname)s%(reset)s:%(message)s')
    stream_handler = colorlog.StreamHandler()
    stream_handler.setFormatter(colored_formatter)

    # Drop the handlers already installed, otherwise each message would be
    # printed once per handler.
    root_logger.handlers.clear()
    root_logger.addHandler(stream_handler)
def delete_dir_recursion(p):
    """Delete the sub-folders, files and symlinks found under `p`.

    Adapted from:
    https://stackoverflow.com/questions/70246591/delete-directory-and-all-symlinks-recursively

    Use this to remove the content of a directory without removing the
    directory itself.

    :param p: Directory to empty, as a `pathlib.Path`-like object (it must
        provide `glob`).

    Errors raised while removing a file or a symlink propagate to the caller.
    A sub-folder that cannot be removed (e.g. because it still holds an entry
    that is neither a file, a symlink nor a directory) is reported with
    `logging.error` and left in place.
    """
    # Take a snapshot of the tree before touching it (so nothing is deleted
    # under a live glob generator) and process the deepest paths first: by
    # the time a sub-folder is reached its content is already gone, so it
    # can itself be removed instead of being left behind empty.
    entries = sorted(p.glob('**/*'), key=lambda entry: len(entry.parts), reverse=True)

    for f in entries:
        if f.is_symlink():
            # Tested first so that a link is unlinked itself, never followed.
            f.unlink(missing_ok=True)  # missing_ok is added in python 3.8
        elif f.is_file():
            f.unlink()
        elif f.is_dir():
            try:
                f.rmdir()
            except OSError as exception:
                logging.error(exception)
def nice_timestamp() -> str:
    """
    Return the current local date and time as a human-readable string.

    The month is written as an abbreviated name rather than a number, so the
    day, month and year cannot be mistaken for one another.
    """
    layout = '%d %b %Y %X'
    now = datetime.now()
    return now.strftime(layout)
# Duration made of optional days/hours/minutes/seconds parts, in that order,
# each written as a number immediately followed by its unit letter
# (e.g. '8h', '2d8h5m20s', '0.25s').
PARSE_DURATION_REGEX = re.compile(r'^((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$')


def parse_duration(time_str) -> timedelta:
    """
    Parse a time string e.g. (2h13m) into a timedelta object.

    Modified from virhilo's answer at https://stackoverflow.com/a/4628148/851699
    Taken from https://stackoverflow.com/a/51916936/2030384

    Every part of the duration is optional, so an empty string is accepted
    and yields a zero duration. A number without a unit (e.g. '11') is not
    accepted.

    :param time_str: A string identifying a duration. (eg. 2h13m)
    :return: A datetime.timedelta object
    :raises AssertionError: if `time_str` does not match
        `PARSE_DURATION_REGEX`.
    :raises ValueError: if a part matches the pattern but is not a valid
        number (e.g. '1.2.3s').
    """
    parts = PARSE_DURATION_REGEX.match(time_str)
    if parts is None:
        # Raised explicitly instead of through an `assert` statement so the
        # check is still performed when Python runs with -O; the exception
        # type is kept for callers that already catch AssertionError.
        raise AssertionError("Could not parse any time information from '{}'.  Examples of valid strings: '8h', '2d8h5m20s', '2m4s'".format(time_str))

    # Groups that did not take part in the match are None and are skipped.
    time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
    return timedelta(**time_params)
def parse_duration_to_seconds(time_str) -> float:
    """
    Convert a duration given in one of several notations to seconds.

    The following interpretations are tried in order, and the first one that
    succeeds gives the result:

    1. a duration string understood by `parse_duration` (e.g. '2h13m');
    2. the same with an 's' appended, so that a bare number of seconds such
       as '11' is accepted;
    3. anything `float()` can convert (e.g. '1e3', or an actual number).

    :param time_str: The duration to interpret.
    :return: The duration in seconds.
    :raises ValueError: if none of the interpretations applies.
    """
    attempts = (
        lambda: parse_duration(time_str).total_seconds(),
        lambda: parse_duration(time_str + "s").total_seconds(),
        lambda: float(time_str),
    )

    for attempt in attempts:
        try:
            return attempt()
        except Exception:
            # This interpretation does not apply: fall through to the next
            # one. `Exception` (rather than a bare except) lets
            # KeyboardInterrupt and SystemExit propagate.
            continue

    raise ValueError(f"Unable to parse seconds in '{time_str}'")
def seconds_to_duration_str(s:float):
    """
    Format a number of seconds as a short text such as '1 h. 1 m. 1 s.'.

    Days, hours and minutes are whole numbers and are only shown when they
    are not zero. The seconds keep their fractional part, are printed with
    three significant digits, and are shown when they are positive or when
    there is nothing else to show.

    :param s: A duration, in seconds.
    :return: The formatted duration.
    """
    left_over = s
    whole_counts = []
    # Peel off the larger units first; only whole units are removed, so the
    # fractional part of `s` ends up in the seconds.
    for unit_length in (3600*24, 3600, 60):
        count = int(left_over) // unit_length
        left_over -= count * unit_length
        whole_counts.append(count)

    days, hours, minutes = whole_counts
    seconds = left_over

    labelled = (
        (days, f"{days} days"),
        (hours, f"{hours} h."),
        (minutes, f"{minutes} m."),
    )
    pieces = [text for count, text in labelled if count]

    if seconds > 0 or not (hours or days or minutes):
        pieces.append(f"{seconds:.3g} s.")

    return " ".join(pieces)
if __name__ == "__main__":
    # Self-checks and a small demo, run with `python <this file>`.

    # --- seconds_to_duration_str ------------------------------------------
    assert seconds_to_duration_str(0) == "0 s."
    assert seconds_to_duration_str(1) == "1 s."
    assert seconds_to_duration_str(61) == "1 m. 1 s."
    assert seconds_to_duration_str(3600+60+1) == "1 h. 1 m. 1 s.", f"{seconds_to_duration_str(3600+60+1)}"
    assert seconds_to_duration_str(3600*24+3600+61) == "1 days 1 h. 1 m. 1 s."
    assert seconds_to_duration_str(2*3600*24+2*3600+60*2+2.2) == "2 days 2 h. 2 m. 2.2 s.", f"{seconds_to_duration_str(2*3600*24+2*3600+60*2+2.2)}"

    # --- parse_duration / parse_duration_to_seconds -----------------------
    # A bare number carries no unit letter, so PARSE_DURATION_REGEX does not
    # match it; parse_duration_to_seconds is the entry point accepting it.
    assert parse_duration_to_seconds("11") == 11
    assert parse_duration("10s").seconds == 10
    assert parse_duration("0.25s").total_seconds() == 0.25,f"{parse_duration('0.25s').seconds}"
    assert parse_duration("0s").total_seconds() == 0
    assert parse_duration("1m").total_seconds() == 60
    assert parse_duration("1m1.1s").total_seconds() == 61.1

    # --- EveryNSeconds demo -----------------------------------------------
    # Polls a 1-second timer, sleeping 1.3 s whenever it has not shot, until
    # it has shot 10 times.
    from time import sleep

    t = EveryNSeconds(1)

    i = 0
    start_t = time()
    while i < 10:
        print(f"{time() - start_t:.2f}")
        if t.has_shot():
            print(f"top {i} {time() - start_t}")
            i += 1
        else:
            sleep(1.3)