r"""
Progress Manager
----------------
This is watch.utils.manager ported from geowatch.
CommandLine:
DEMO_PROGRESS=1 xdoctest -m progiter.manager __doc__
Example:
>>> # xdoctest: +REQUIRES(env:DEMO_PROGRESS)
>>> print('First a simple example')
>>> from progiter.manager import ProgressManager
>>> import time
>>> iterable = range(1000)
>>> pman = ProgressManager()
>>> with pman:
>>> for item in pman.progiter(iterable, desc='big loop'):
>>> time.sleep(0.005)
Example:
>>> # xdoctest: +REQUIRES(env:DEMO_PROGRESS)
>>> print('Choose your backend!')
>>> from progiter.manager import ProgressManager
>>> import time
>>> iterable = range(1000)
>>> print('threaded rich backend')
>>> pman = ProgressManager(backend='rich')
>>> with pman:
>>> for item in pman.progiter(iterable, desc='big loop'):
>>> time.sleep(0.005)
>>> ...
>>> print('unthreaded progiter backend')
>>> pman = ProgressManager(backend='progiter')
>>> with pman:
>>> for item in pman.progiter(iterable, desc='big loop'):
>>> time.sleep(0.005)
Example:
>>> # xdoctest: +REQUIRES(env:DEMO_PROGRESS)
>>> print('Now a more complex example')
>>> from progiter.manager import ProgressManager
>>> import time
>>> delay = 0.01
>>> # Can use plain progiter or rich
>>> # The usecase for plain progiter is when threads / live output
>>> # is not desirable and you just want plain stdout progress
>>> for backend in ['rich', 'progiter']:
>>> print(f'\n\n -- starting {backend} --\n\n')
>>> pman = ProgressManager(backend=backend)
>>> with pman:
>>> pbar1 = pman.progiter(range(5), desc='outer loop', verbose=3)
>>> for i in pbar1:
>>> pbar1.set_postfix(f'\\[step {i}]', refresh=False)
>>> for j1 in pman.progiter(range(100), desc=f'prepare inner loop {i}', transient=True):
>>> time.sleep(delay / 3)
>>> for j2 in pman.progiter(range(100), desc=f'execute inner loop {i}'):
>>> time.sleep(delay)
>>> for j3 in pman.progiter(range(100), desc=f'shutdown inner loop {i}', transient=True):
>>> time.sleep(delay / 3)
"""
import os
import weakref
from progiter.progiter import ProgIter
__all__ = ['ProgressManager']
# If truthy disable all threaded rich options
PROGITER_NOTHREAD = os.environ.get('PROGITER_NOTHREAD', 'auto')
if PROGITER_NOTHREAD == 'auto':
# Use rich outside of slurm
PROGITER_NOTHREAD = os.environ.get('SLURM_JOBID', '')
else:
PROGITER_NOTHREAD = bool(PROGITER_NOTHREAD)
LIVE_PROGRESS_MANAGERS = weakref.WeakValueDictionary()
class ManagedProgIter(ProgIter):
"""
Simple subclass of ProgIter to allowed it to be managed.
"""
def _set_manager(self, manager):
self.manager = weakref.proxy(manager)
def update_info(self, text):
self.info_text = text
info_text = getattr(self, 'info_text', None)
self.ensure_newline()
if info_text is not None:
# if self._cursor_at_newline:
print('+ --- Info --- +')
print(info_text)
print('+ ------------ +')
# self.display_message()
def update(self, n=1):
if not self.started:
self.begin()
manager = getattr(self, 'manager', None)
if manager is not None:
# TODO: vet this, not quite working. The idea is if part of a
# progress manager and this is not the "tail" progiter (i.e. it
# isn't the only progiter allowed to be clearing newlines) then we
# should ensure that the progiter that is allowed to clear the
# newline has its ensure_newline method called so we can actually
# write our progress without getting clobbered.
if len(manager.prog_iters) and manager.prog_iters[-1] is not self:
manager.prog_iters[-1].ensure_newline()
super().update(n=n)
def display_message(self):
super().display_message()
class RichProgIter:
"""
Ducktypes ManagedProgIter
TODO: enhance with the ability to have a update info panel that removes
the circular reference
Ignore:
from progiter import manager
manager.LIVE_PROGRESS_MANAGERS
print(len(manager.LIVE_PROGRESS_MANAGERS))
for v in manager.LIVE_PROGRESS_MANAGERS.values():
...
from progiter import * # NOQA
for _ in RichProgIter(range(1000)):
...
"""
def __init__(self, iterable=None, desc=None, total=None, freq=1, initial=0,
eta_window=64, clearline=True, adjust=True, time_thresh=2.0,
show_times=True, show_wall=False, enabled=True, verbose=None,
stream=None, chunksize=None, rel_adjust_limit=4.0,
transient=False, manager=None, spinner=False, **kwargs):
unhandled = {
'eta_window', 'clearline', 'adjust', 'time_thresh', 'show_times',
'show_wall', 'stream', 'chunksize', 'rel_adjust_limit',
}
# kwargs = udict(kwargs) - unhandled
kwargs = {k: v for k, v in kwargs.items() if k not in unhandled}
if manager is None:
manager = _RichProgIterManager()
self._self_managed = True
else:
manager = weakref.proxy(manager)
self._self_managed = False
self.manager = manager
self.iterable = iterable
self.enabled = enabled
self.spinner = spinner
if total is None:
try:
total = len(iterable)
except Exception:
...
self.total = total
self.desc = desc
addtask_kw = {}
if desc is not None:
addtask_kw['description'] = desc
else:
addtask_kw['description'] = ''
addtask_kw['total'] = self.total
self.task_id = self.manager.rich_progress.add_task(**addtask_kw)
self.transient = transient
self.extra = None
def start(self):
return self.begin()
def stop(self):
return self.end()
def begin(self):
if self._self_managed:
self.manager.start()
def end(self):
if self.transient:
self.remove()
if self._self_managed:
self.manager.stop()
def update(self, n=1):
self.manager.rich_progress.update(self.task_id, advance=n)
step = update
def __iter__(self):
if not self.enabled:
yield from self.iterable
else:
self.start()
for item in self.iterable:
yield item
self.manager.rich_progress.update(self.task_id, advance=1)
if self.total is None:
task = self.manager.rich_progress._tasks[self.task_id]
self.manager.rich_progress.update(self.task_id, total=task.completed)
self.stop()
def remove(self):
"""
Remove this progress task from its rich manager
"""
if self.enabled:
self.manager.rich_progress.remove_task(self.task_id)
def update_info(self, text):
if self.enabled:
# FIXME: remove circular reference
self.manager.update_info(text)
def ensure_newline(self):
...
def set_postfix_str(self, text, refresh=True):
self.extra = text
parts = [self.desc] if self.desc is not None else []
if self.extra is not None:
parts.append(self.extra)
if self.enabled:
description = ' '.join(parts)
self.manager.rich_progress.update(
self.task_id, description=description, refresh=refresh)
set_postfix = set_postfix_str
set_extra = set_postfix_str
class BaseProgIterManager:
def new(self, *args, **kw):
return self.progiter(*args, **kw)
def __call__(self, *args, **kw):
return self.progiter(*args, **kw)
def start(self):
return self
def begin(self):
return self.start()
def stop(self, **kwargs):
...
def __enter__(self):
return self.start()
def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
self.stop(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
# Global var
MAIN_RICH_PMAN = None
class _RichProgIterManager(BaseProgIterManager):
"""
rich specific backend.
"""
def __init__(self, **kwargs):
self.prog_iters = []
self.enabled = kwargs.get('enabled', True)
self.info_panel = None
self.rich_progress = None
self._is_main_manager = None
self.setup_rich()
self._active = False
# Can we make this work?
# def __del__(self):
# if self._active:
# self.stop()
def progiter(self, iterable=None, total=None, desc=None, transient=False, spinner=False, verbose='auto', **kw):
if not self._active:
self.start()
# Fixme remove circular ref
# self.rich_progress.pman = self
prog = RichProgIter(
manager=self, iterable=iterable, total=total, desc=desc,
transient=transient, spinner=spinner, **kw)
self.prog_iters.append(prog)
return prog
def setup_rich(self):
global MAIN_RICH_PMAN
import rich
import rich.progress
from rich.console import Group
from rich.live import Live
from rich.progress import BarColumn, TextColumn
from rich.progress import Progress as richProgress
from rich.progress import SpinnerColumn
from rich.progress import ProgressColumn, Text
# from rich.style import Style
if MAIN_RICH_PMAN is not None:
self._is_main_manager = False
self.live_context = None
self.rich_progress = MAIN_RICH_PMAN.rich_progress
self.progress_group = MAIN_RICH_PMAN.progress_group
else:
self._is_main_manager = True
class ProgressRateColumn(ProgressColumn):
"""Renders human readable transfer speed."""
def render(self, task) -> Text:
"""Show progress speed speed."""
_iters_per_second = task.finished_speed or task.speed
if _iters_per_second is not None:
rate_format = '4.2f' if _iters_per_second > .001 else 'g'
fmt = '{:' + rate_format + '} Hz'
text = fmt.format(_iters_per_second)
else:
text = '?'
# style = Style(color="red")
style = 'progress.data.speed'
renderable = Text(text, style=style)
return renderable
self.rich_progress = richProgress(
TextColumn("{task.description}"),
SpinnerColumn(),
BarColumn(),
"[progress.percentage]{task.percentage:>3.0f}%",
rich.progress.MofNCompleteColumn(),
# rich.progress.TransferSpeedColumn(),
ProgressRateColumn(),
'eta',
rich.progress.TimeRemainingColumn(),
'total',
rich.progress.TimeElapsedColumn(),
)
self.info_panel = None
self.progress_group = Group(
# self.info_panel,
self.rich_progress,
)
self.live_context = Live(self.progress_group)
MAIN_RICH_PMAN = self
def update_info(self, text):
from rich.panel import Panel
if self.info_panel is None:
self.info_panel = Panel(text)
self.progress_group.renderables.insert(0, self.info_panel)
else:
self.info_panel.renderable = text
def start(self):
if self.enabled and not self._active:
self._active = True
if self._is_main_manager:
return self.live_context.__enter__()
def stop(self, **kw):
if self.enabled and self._active:
if not kw:
kw['exc_type'] = None
kw['exc_val'] = None
kw['exc_tb'] = None
if self._is_main_manager:
global MAIN_RICH_PMAN
MAIN_RICH_PMAN = None
ret = self.live_context.__exit__(**kw)
self._is_main_manager = False
else:
ret = None
self._active = False
return ret
class _ProgIterManager(BaseProgIterManager):
"""
progiter specific backend
"""
def __init__(self, **kwargs):
self.enabled = kwargs.get('enabled', True)
# Default arguments for new progiters
self.default_progkw = {
'time_thresh': 2.0,
}
self.default_progkw.update(kwargs)
self.prog_iters = []
def progiter(self, iterable=None, total=None, desc=None, transient=False, spinner=False, verbose='auto', **kw):
progkw = self.default_progkw.copy()
progkw.update(kw)
progkw['verbose'] = verbose
if verbose == 'auto':
progkw['verbose'] = self.default_progkw.get('verbose', 1)
if True:
# Change all other - now outer - progiters to verbose=3 mode
for other in self.prog_iters:
other.ensure_newline()
if other.enabled:
other.clearline = False
other.adjust = False
other.freq = 1
prog = ManagedProgIter(iterable, total=total, desc=desc, **progkw)
prog._set_manager(self)
self.prog_iters.append(prog)
return prog
def update_info(self, text):
if len(self.prog_iters) == 0:
# if self._cursor_at_newline:
print('+ --- Info --- +')
print(text)
print('+ ------------ +')
else:
self.prog_iters[0].update_info(text)
[docs]class ProgressManager(BaseProgIterManager):
r"""
A progress manager.
Manage multiple progress bars, either with rich or ProgIter.
CommandLine:
xdoctest -m progiter.manager ProgressManager:0
xdoctest -m progiter.manager ProgressManager:1
xdoctest -m progiter.manager ProgressManager:2
Example:
>>> from progiter.manager import ProgressManager
>>> from progiter import progiter
>>> # Can use plain progiter or rich
>>> # The usecase for plain progiter is when threads / live output
>>> # is not desirable and you just want plain stdout progress
>>> pman = ProgressManager(backend='progiter')
>>> with pman:
>>> oprog = pman.progiter(range(20), desc='outer loop', verbose=3)
>>> for i in oprog:
>>> oprog.set_postfix(f'Doing step {i}', refresh=False)
>>> for i in pman.progiter(range(100), desc=f'inner loop {i}'):
>>> pass
>>> #
>>> # xdoctest: +REQUIRES(module:rich)
>>> self = pman = ProgressManager(backend='rich')
>>> pman = ProgressManager(backend='rich')
>>> with pman:
>>> oprog = pman.progiter(range(20), desc='outer loop', verbose=3)
>>> for i in oprog:
>>> oprog.set_postfix(f'Doing step {i}', refresh=False)
>>> for i in pman.progiter(range(100), desc=f'inner loop {i}'):
>>> pass
Example:
>>> # A fairly complex example
>>> # xdoctest: +REQUIRES(module:rich)
>>> from progiter.manager import ProgressManager
>>> import time
>>> delay = 0.00005
>>> N_inner = 300
>>> N_outer = 11
>>> self = pman = ProgressManager(backend='rich')
>>> with pman:
>>> oprog = pman(range(N_outer), desc='outer loop')
>>> for i in oprog:
>>> if i > 7:
>>> self.update_info(f'The info panel gives detailed updates\nWe are now at step {i}\nWe are just about done now')
>>> elif i > 5:
>>> self.update_info(f'The info panel gives detailed updates\nWe are now at step {i}')
>>> oprog.set_postfix(f'Doing step {i}')
>>> N = 1000
>>> for j in pman(iter(range(N_inner)), total=None if i % 2 == 0 else N_inner, desc=f'inner loop {i}', transient=i < 4):
>>> time.sleep(delay)
Example:
>>> # Test complex example over a grid of parameters
>>> # xdoctest: +REQUIRES(module:ubelt)
>>> # xdoctest: +REQUIRES(module:rich)
>>> import ubelt as ub
>>> from progiter.manager import ProgressManager, ManagedProgIter
>>> import time
>>> delay = 0.000005
>>> N_inner = 300
>>> N_outer = 11
>>> basis = {
>>> 'with_info': [0, 1],
>>> 'backend': ['progiter', 'rich'],
>>> 'enabled': [0, 1],
>>> #'with_info': [1],
>>> }
>>> grid = list(ub.named_product(basis))
>>> grid_prog = ManagedProgIter(grid, desc='Test cases over grid', verbose=3)
>>> grid_prog.update_info('Here we go')
>>> for item in grid:
>>> grid_prog.ensure_newline()
>>> grid_prog.update_info(f'Running grid test {ub.urepr(item, nl=1)}')
>>> print('\n\n')
>>> self = ProgressManager(backend=item['backend'], enabled=item['enabled'])
>>> with self:
>>> outer_prog = self.progiter(range(N_outer), desc='outer loop')
>>> for i in outer_prog:
>>> if item['with_info']:
>>> if i > 7:
>>> outer_prog.update_info(f'The info panel gives detailed updates\nWe are now at step {i}\nWe are just about done now')
>>> elif i > 5:
>>> outer_prog.update_info(f'The info panel gives detailed updates\nWe are now at step {i}')
>>> outer_prog.set_postfix(f'Doing step {i}')
>>> inner_kwargs = dict(
>>> total=None if i % 2 == 0 else N_inner,
>>> transient=i < 4,
>>> time_thresh=delay * 2.3,
>>> desc=f'inner loop {i}',
>>> )
>>> for j in self.progiter(iter(range(N_inner)), **inner_kwargs):
>>> time.sleep(delay)
>>> grid_prog.update_info(f'Finished test item')
Example:
>>> # Demo manual usage
>>> # xdoctest: +REQUIRES(module:rich)
>>> from progiter.manager import ProgressManager
>>> from progiter import manager
>>> import time
>>> pman = ProgressManager()
>>> pman.start()
>>> task1 = pman.progiter(desc='task1', total=100)
>>> task2 = pman.progiter(desc='task2')
>>> for i in range(100):
>>> task1.update()
>>> task2.update(2)
>>> time.sleep(0.001)
>>> ProgressManager.stopall()
Example:
>>> # Demo manual usage (progiter backend)
>>> from progiter.manager import ProgressManager
>>> from progiter import manager
>>> import time
>>> pman = ProgressManager(backend='progiter', adjust=0, freq=1)
>>> pman.start()
>>> task1 = pman.progiter(desc='task1', total=12)
>>> task2 = pman.progiter(desc='task2')
>>> task1.update()
>>> task2.update()
>>> for i in range(10):
>>> time.sleep(0.001)
>>> task1.update()
>>> time.sleep(0.001)
>>> task2.update(2)
>>> ProgressManager.stopall()
"""
def __init__(self, backend='rich', **kwargs):
LIVE_PROGRESS_MANAGERS[id(self)] = self
# TODO: check if we are being tee-d and use progiter instead if we are.
if PROGITER_NOTHREAD:
backend = 'progiter'
if backend == 'rich':
self.backend = _RichProgIterManager(**kwargs)
elif backend == 'progiter':
self.backend = _ProgIterManager(**kwargs)
else:
raise KeyError(backend)
[docs] def progiter(self, *args, **kw):
prog = self.backend.progiter(*args, **kw)
return prog
[docs] def update_info(self, text):
self.backend.update_info(text)
[docs] def start(self):
self.backend.start()
[docs] def stop(self, *args, **kwargs):
self.backend.stop(*args, **kwargs)
@property
def _is_main_manager(self):
return self.backend._is_main_manager
[docs] @classmethod
def stopall(self):
"""
Stop all background progress threads (likely only 1 exists)
Ignore:
from progiter import manager
manager.ProgressManager.stopall()
"""
for pman in LIVE_PROGRESS_MANAGERS.values():
pman.stop()