"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
import re
import logging
from datetime import timedelta,datetime
from time import time
[docs]
class EveryNSeconds:
def __init__(self, period: float, trigger_start = True, name: str = ""):
"""
:param period: Period expressed in seconds.
:param name: Optional name (for display purposes)
"""
# n = duration in seconds
self._period = period
self._trigger_start = trigger_start
self._absolute_start = self._mark = time()
self.name = name
def __str__(self):
return f"EveryNSeconds {self.name} p={self._period} mark:{self.mark}"
[docs]
def has_shot(self) -> bool:
"""Calling this method will clear the has_shot flag.
So if you call it twice one after the other, without waiting no more
than the period, it will give True one time and False the next time.
"""
t = time()
# How much time since last shot.
delta = t - self._mark
has_shot = delta >= self._period or self._trigger_start
if self._trigger_start:
self._trigger_start = False
# If we the timer has shot, then we plan immediately for
# the next shot.
if has_shot:
#print(f"Timer {self.name or ''}: {delta:.3f}/{self._period} Accuracy:{delta/self._period - 1:.2f}")
#old_mark = self._mark
self._mark += (int(delta / self._period)) * self._period
#print(f"mark={old_mark-self._absolute_start:.2f} delta={delta:.2f} => new mark = {self._mark-self._absolute_start:.2f}")
return has_shot
[docs]
def add_logging_level(levelName, levelNum, methodName = None):
""" Adds a new logging level to the `logging` module and the currently configured logging class.
`levelName` becomes an attribute of the `logging` module with the value `levelNum`.
`methodName` becomes a convenience method for both `logging` itself and the class returned by `logging.getLoggerClass()`
(usually just `logging.Logger`). If `methodName` is not specified, `levelName.lower()` is used.
To avoid accidental clobberings of existing attributes, this method will raise an `AttributeError` if the level name
is already an attribute of the `logging` module or if the method name is already present .
Example
-------
>>> add_logging_level('TRACE', logging.DEBUG - 5)
>>> logging.getLogger(__name__).setLevel("TRACE")
>>> logging.getLogger(__name__).trace('that worked')
>>> logging.trace('so did this')
>>> logging.TRACE
5
"""
if not methodName:
methodName = levelName.lower()
if hasattr(logging, levelName) or hasattr(logging, methodName) or hasattr(logging.getLoggerClass(), methodName):
return
def logForLevel(self, message, *args, **kwargs):
if self.isEnabledFor(levelNum):
self._log(levelNum, message, args, **kwargs)
def logToRoot(message, *args, **kwargs):
logging.log(levelNum, message, *args, **kwargs)
logging.addLevelName(levelNum, levelName)
setattr(logging, levelName, levelNum)
setattr(logging.getLoggerClass(), methodName, logForLevel)
setattr(logging, methodName, logToRoot)
[docs]
def init_global_logging(lvl=logging.INFO):
add_logging_level("TRACE", logging.DEBUG - 1)
# Import here to remove a dependency if one doesn't use
# init_global_logging.
import colorlog
logger = logging.getLogger() # Get the *root* logger (see python's doc)
logger.setLevel(lvl)
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
'%(log_color)s%(levelname)s%(reset)s:%(message)s'))
logger.handlers.clear() # Remove the current handlers to avoid double printing with my own handler
logger.addHandler(handler)
[docs]
def delete_dir_recursion(p):
"""Delete folder, sub-folders and files.
Taken from: https://stackoverflow.com/questions/70246591/delete-directory-and-all-symlinks-recursively
Use this to remove the content of a directory without removing the
directory itself.
"""
for f in p.glob('**/*'):
if f.is_symlink():
f.unlink(missing_ok=True) # missing_ok is added in python 3.8
#print(f'symlink {f.name} from path {f} was deleted')
elif f.is_file():
f.unlink()
#print(f'file: {f.name} from path {f} was deleted')
elif f.is_dir():
try:
f.rmdir() # delete empty sub-folder
#print(f'folder: {f.name} from path {f} was deleted')
except OSError: # sub-folder is not empty
delete_dir_recursion(f) # recurse the current sub-folder
except Exception as exception: # capture other exception
logging.error(exception)
[docs]
def nice_timestamp() -> str:
""" A nice timestamp which makes sure the day, month and year
cannot be mismatched.
"""
return datetime.now().strftime('%d %b %Y %X')
[docs]
PARSE_DURATION_REGEX = re.compile(r'^((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$')
[docs]
def parse_duration(time_str) -> timedelta:
"""
Parse a time string e.g. (2h13m) into a timedelta object.
Modified from virhilo's answer at https://stackoverflow.com/a/4628148/851699
:param time_str: A string identifying a duration. (eg. 2h13m)
:return datetime.timedelta: A datetime.timedelta object
Taken from https://stackoverflow.com/a/51916936/2030384
"""
parts = PARSE_DURATION_REGEX.match(time_str)
assert parts is not None, "Could not parse any time information from '{}'. Examples of valid strings: '8h', '2d8h5m20s', '2m4s'".format(time_str)
time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
return timedelta(**time_params)
[docs]
def parse_duration_to_seconds(time_str) -> float:
try:
return parse_duration(time_str).total_seconds()
except:
pass
try:
return parse_duration(time_str+"s").total_seconds()
except:
pass
try:
return float(time_str)
except:
pass
raise Exception(f"Unable to parse seconds in '{time_str}'")
[docs]
def seconds_to_duration_str(s:float):
days = int(s) // (3600*24)
s -= days * 3600*24
hours = int(s) // 3600
s -= hours * 3600
minutes = int(s) // 60
s -= minutes * 60
seconds = s
r = []
if days:
r.append(f"{days} days")
if hours:
r.append(f"{hours} h.")
if minutes:
r.append(f"{minutes} m.")
if seconds > 0 or (hours == 0 and days == 0 and minutes == 0):
r.append(f"{seconds:.3g} s.")
return " ".join(r)
if __name__ == "__main__":
assert seconds_to_duration_str(0) == "0 s."
assert seconds_to_duration_str(1) == "1 s."
assert seconds_to_duration_str(61) == "1 m. 1 s."
assert seconds_to_duration_str(3600+60+1) == "1 h. 1 m. 1 s.", f"{seconds_to_duration_str(3600+60+1)}"
assert seconds_to_duration_str(3600*24+3600+61) == "1 days 1 h. 1 m. 1 s."
assert seconds_to_duration_str(2*3600*24+2*3600+60*2+2.2) == "2 days 2 h. 2 m. 2.2 s.", f"{seconds_to_duration_str(2*3600*24+2*3600+60*2+2.2)}"
exit()
assert parse_duration("11").seconds == 11
assert parse_duration("10s").seconds == 10
assert parse_duration("0.25s").total_seconds() == 0.25,f"{parse_duration('0.25s').seconds}"
assert parse_duration("0s").total_seconds() == 0
assert parse_duration("1m").total_seconds() == 60
assert parse_duration("1m1.1s").total_seconds() == 61.1
from time import sleep
i= 0
start_t = time()
while i < 10:
print(f"{time() - start_t:.2f}")
if t.has_shot():
print(f"top {i} {time() - start_t}")
i += 1
else:
sleep(1.3)