import re
import logging
from datetime import timedelta,datetime
from time import time
[docs]
class EveryNSeconds:
    """Poll-based timer that "fires" once per period of ``n`` seconds.

    The object does nothing by itself: call :meth:`has_shot` repeatedly
    (typically from a loop) and act whenever it returns a truthy value.
    """

    def __init__(self, n: float, trigger_start = True):
        """
        :param n: Period between two shots, in seconds. A period that is
            zero or negative makes every call to :meth:`has_shot` fire.
        :param trigger_start: When truthy, the very first call to
            :meth:`has_shot` fires regardless of the elapsed time.
        """
        self._n = n
        self._trigger_start = trigger_start
        # Start of the current period (wall-clock seconds, from time()).
        self._mark = time()
        # Creation time; never updated afterwards.
        self._absolute_start = self._mark

    def has_shot(self):
        """Tell whether the timer fires on this call.

        :return: A truthy value if at least ``n`` seconds elapsed since the
            start of the current period, or if this is the first call and
            ``trigger_start`` was set; a falsy value otherwise.
        """
        now = time()
        # How much time since last shot.
        delta = now - self._mark
        has_shot = delta >= self._n or self._trigger_start

        # The "fire immediately" behaviour is a one-time event.
        if self._trigger_start:
            self._trigger_start = False

        if has_shot:
            if self._n > 0:
                # Advance by a whole number of periods so the shots stay
                # aligned on the original grid even when polled late.
                self._mark += int(delta / self._n) * self._n
            else:
                # A non-positive period has no grid to align on (and would
                # divide by zero above): simply restart from now.
                self._mark = now
        return has_shot
[docs]
def add_logging_level(levelName, levelNum, methodName = None):
    """Register an extra level on the `logging` module and its logger class.

    After a successful call:

    * `logging.<levelName>` holds `levelNum`;
    * `logging.<methodName>(...)` logs at that level through the root logger;
    * every logger built from `logging.getLoggerClass()` (usually
      `logging.Logger`) gains a `<methodName>(...)` convenience method.

    If `methodName` is not given (or is empty), `levelName.lower()` is used.

    To avoid clobbering existing attributes, the function returns silently,
    without changing anything, when `levelName` is already an attribute of
    the `logging` module or when `methodName` already exists on the module
    or on the logger class. This also makes repeated calls harmless.

    Example
    -------
    >>> add_logging_level('TRACE', logging.DEBUG - 5)
    >>> logging.getLogger(__name__).setLevel("TRACE")
    >>> logging.getLogger(__name__).trace('that worked')
    >>> logging.trace('so did this')
    >>> logging.TRACE
    5
    """
    methodName = methodName or levelName.lower()
    logger_cls = logging.getLoggerClass()

    name_taken = (
        hasattr(logging, levelName)
        or hasattr(logging, methodName)
        or hasattr(logger_cls, methodName)
    )
    if name_taken:
        return

    # Bound as a method on the logger class.
    def logForLevel(self, message, *args, **kwargs):
        if not self.isEnabledFor(levelNum):
            return
        self._log(levelNum, message, args, **kwargs)

    # Module-level shortcut that goes through the root logger.
    def logToRoot(message, *args, **kwargs):
        logging.log(levelNum, message, *args, **kwargs)

    logging.addLevelName(levelNum, levelName)
    setattr(logging, levelName, levelNum)
    setattr(logger_cls, methodName, logForLevel)
    setattr(logging, methodName, logToRoot)
[docs]
def init_global_logging(lvl=logging.INFO):
    """Configure the root logger for coloured console output.

    Registers the extra "TRACE" level (one below DEBUG), sets the root
    logger's level to `lvl` and replaces all of its handlers by a single
    `colorlog` stream handler.

    :param lvl: Level applied to the root logger.
    """
    add_logging_level("TRACE", logging.DEBUG - 1)

    # Imported lazily so that colorlog is only required by code that
    # actually calls init_global_logging.
    import colorlog

    # logging.getLogger() without a name returns the *root* logger.
    root = logging.getLogger()
    root.setLevel(lvl)

    console = colorlog.StreamHandler()
    colour_format = colorlog.ColoredFormatter(
        '%(log_color)s%(levelname)s%(reset)s:%(message)s')
    console.setFormatter(colour_format)

    # Drop whatever handlers were installed before, otherwise every message
    # would be printed once per handler.
    root.handlers.clear()
    root.addHandler(console)
[docs]
def delete_dir_recursion(p):
    """Delete the whole content of a folder: files, symlinks and sub-folders.

    Adapted from: https://stackoverflow.com/questions/70246591/delete-directory-and-all-symlinks-recursively

    Use this to remove the content of a directory without removing the
    directory itself.

    Symbolic links are removed without being followed. Entries that are
    neither symlinks, regular files nor directories are left alone.

    :param p: `pathlib.Path` of the directory to empty.
    """
    # Snapshot the listing first: entries are deleted while we loop.
    for entry in list(p.iterdir()):
        if entry.is_symlink():
            # Checked first so a link to a directory is never traversed.
            entry.unlink(missing_ok=True)  # missing_ok is added in python 3.8
        elif entry.is_file():
            entry.unlink()
        elif entry.is_dir():
            # Depth-first: empty the sub-folder, then remove it. Removing
            # only already-empty folders would leave emptied ones behind.
            delete_dir_recursion(entry)
            try:
                entry.rmdir()
            except OSError as exception:
                # Best effort: report folders that could not be removed
                # (e.g. something undeletable remains inside) and carry on.
                logging.error(exception)
[docs]
def nice_timestamp() -> str:
    """Return the current local date and time as a human-readable string.

    The month is written as an abbreviated name and the year with four
    digits, so that day, month and year cannot be mistaken for one another.
    """
    unambiguous_format = '%d %b %Y %X'
    now = datetime.now()
    return now.strftime(unambiguous_format)
[docs]
# Matches an optional run of "<number><unit>" groups, in the fixed order
# days (d), hours (h), minutes (m), seconds (s). A number is any mix of
# digits and dots; its validity is only checked later, by float().
PARSE_DURATION_REGEX = re.compile(r'^((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$')


def parse_duration(time_str) -> timedelta:
    """
    Parse a time string e.g. (2h13m) into a timedelta object.

    Every group is optional, so an empty string yields a zero duration.
    A number without unit (e.g. "11") is not accepted.

    Modified from virhilo's answer at https://stackoverflow.com/a/4628148/851699
    Taken from https://stackoverflow.com/a/51916936/2030384

    :param time_str: A string identifying a duration. (eg. 2h13m)
    :return datetime.timedelta: A datetime.timedelta object
    :raises AssertionError: If the string does not follow the expected format.
    :raises ValueError: If a matched number is malformed (e.g. "1.2.3s").
    """
    parts = PARSE_DURATION_REGEX.match(time_str)
    if parts is None:
        # Raised explicitly instead of using `assert`: assert statements are
        # removed when Python runs with -O, which would disable this check.
        raise AssertionError(
            "Could not parse any time information from '{}'. Examples of valid strings: '8h', '2d8h5m20s', '2m4s'".format(time_str))
    # Only the units present in the string are forwarded to timedelta.
    time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
    return timedelta(**time_params)
[docs]
def parse_duration_to_seconds(time_str) -> float:
    """Convert a duration to a number of seconds, trying several notations.

    The following interpretations are tried in order and the first one that
    succeeds wins:

    1. a duration string understood by parse_duration (e.g. "2h13m");
    2. the same with an "s" unit appended (so "11" is read as "11s");
    3. anything float() accepts (e.g. "1e3", or a value that is already a
       number).

    :param time_str: The duration to interpret.
    :return: The duration in seconds.
    :raises ValueError: If none of the interpretations applies.
    """
    attempts = (
        lambda: parse_duration(time_str).total_seconds(),
        lambda: parse_duration(time_str + "s").total_seconds(),
        lambda: float(time_str),
    )
    for attempt in attempts:
        try:
            return attempt()
        except Exception:
            # Deliberately broad: a failed attempt may raise several error
            # types (no match, bad number, non-string input) and must simply
            # fall through to the next one. Unlike a bare `except:`, this
            # lets KeyboardInterrupt and SystemExit propagate.
            continue
    raise ValueError(f"Unable to parse seconds in '{time_str}'")
[docs]
def seconds_to_duration_str(s:float):
    """Format a number of seconds as a short human-readable duration.

    The value is broken down into days, hours, minutes and seconds; units
    whose count is zero are omitted. The seconds part is printed with three
    significant digits, and is always shown when it is the only part.

    Examples: ``0 -> "0 s."``, ``61 -> "1 m. 1 s."``,
    ``90061 -> "1 days 1 h. 1 m. 1 s."``.

    :param s: Duration in seconds.
    :return: The formatted duration.
    """
    remaining = s
    counts = []
    # Peel off whole days, then hours, then minutes; what is left at the end
    # is the (possibly fractional) number of seconds.
    for unit_length in (3600*24, 3600, 60):
        whole_units = int(remaining) // unit_length
        remaining -= whole_units * unit_length
        counts.append(whole_units)

    pieces = [
        f"{count} {label}"
        for count, label in zip(counts, ("days", "h.", "m."))
        if count
    ]
    # Seconds are shown when non-zero, or when nothing else would be printed.
    if remaining > 0 or not any(counts):
        pieces.append(f"{remaining:.3g} s.")
    return " ".join(pieces)
if __name__ == "__main__":
    # Quick self-checks, run when the module is executed as a script.
    assert seconds_to_duration_str(0) == "0 s."
    assert seconds_to_duration_str(1) == "1 s."
    assert seconds_to_duration_str(61) == "1 m. 1 s."
    assert seconds_to_duration_str(3600+60+1) == "1 h. 1 m. 1 s.", f"{seconds_to_duration_str(3600+60+1)}"
    assert seconds_to_duration_str(3600*24+3600+61) == "1 days 1 h. 1 m. 1 s."
    assert seconds_to_duration_str(2*3600*24+2*3600+60*2+2.2) == "2 days 2 h. 2 m. 2.2 s.", f"{seconds_to_duration_str(2*3600*24+2*3600+60*2+2.2)}"

    # A bare number has no unit, so it is only understood by
    # parse_duration_to_seconds (parse_duration itself rejects it).
    assert parse_duration_to_seconds("11") == 11
    assert parse_duration("10s").seconds == 10
    assert parse_duration("0.25s").total_seconds() == 0.25,f"{parse_duration('0.25s').seconds}"
    assert parse_duration("0s").total_seconds() == 0
    assert parse_duration("1m").total_seconds() == 60
    assert parse_duration("1m1.1s").total_seconds() == 61.1

    # The EveryNSeconds demonstration below sleeps repeatedly, so it only
    # runs on request: `python <this file> --demo`.
    import sys
    if "--demo" not in sys.argv[1:]:
        sys.exit()

    from time import sleep

    # NOTE(review): the timer was never created in the original script; the
    # 2 second period is an arbitrary choice for the demonstration.
    t = EveryNSeconds(2)
    i = 0
    start_t = time()
    while i < 10:
        print(f"{time() - start_t:.2f}")
        if t.has_shot():
            print(f"top {i} {time() - start_t}")
            i += 1
        else:
            sleep(1.3)