# This file is part of Streams.
#
# Streams is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Streams is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Streams. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from abc import ABCMeta, abstractmethod
from collections.abc import (
Callable,
Container,
Iterable,
Iterator,
)
from typing import (
Optional,
TypeVar,
Union,
)
# Names exported by a star-import of this module: the two abstract
# stream base classes defined below.
__all__ = (
'LinearStream',
'Stream',
)
# Type variables used to parameterise the stream classes.
MT = TypeVar('MT') # type of the values produced by a mapping function (the return type of ``fn`` in ``Stream.map``)
VT = TypeVar('VT') # type of the values a stream holds before, or without, any mapping
class Stream(Container[VT], metaclass=ABCMeta):
    """An abstract base class for stream classes (i.e. an interface for
    robust objects that are capable of holding an indefinite amount of
    data). All stream classes should directly or indirectly derive from
    this class.

    Concrete subclasses must provide the ``value`` property (getter and
    setter) and the ``map`` class method.
    """

    # No per-instance attributes are declared at this level; subclasses
    # declare their own slots.
    __slots__: Iterable[str] = ()

    def __repr__(self) -> str:
        """Returns the canonical string representation of the node,
        in the form ``ClassName(<repr of value>)``.
        """
        return f'{self.__class__.__name__}({self.value!r})'

    @property
    @abstractmethod
    def value(self) -> VT:
        """Returns the value of the node."""
        raise NotImplementedError

    @value.setter
    @abstractmethod
    def value(self, value: VT) -> None:
        """Sets the value of the node.

        :param value: the new value to store in the node
        """
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def map(
        cls,
        fn: Callable[..., MT],
        *streams: Stream,
        does_memoize: bool=True
    ) -> Stream[MT]:
        """Returns a new stream that contains the return values of the
        function applied to each item in the streams.

        :param fn: the function to be applied to each value in the
            stream
        :param streams: the tuple of streams that contain the values to
            be mapped
        :param does_memoize: By default, the node will cache the result
            of ``next_thunk``. This can potentially hog a lot of memory.
            To turn caching off, set ``does_memoize`` to ``False``. It
            might be desirable to propagate this to composite streams
            generated by custom functions.
        :returns: a stream of the mapped values
        """
        raise NotImplementedError
class LinearStream(Stream[VT], Iterable[VT], metaclass=ABCMeta):
    """An abstract linearly linked list class implemented as a stream
    class.

    Indexing, slicing and iteration are implemented here in terms of
    the abstract hooks ``next``, ``_starter``, ``_stopper`` and
    ``_stepper`` that concrete subclasses must supply.
    """

    # No per-instance attributes are declared at this level; subclasses
    # declare their own slots.
    __slots__: Iterable[str] = ()

    def __getitem__(
        self,
        key: Union[int, slice],
    ) -> Union[VT, LinearStream[VT]]:
        """Returns the value contained at a particular index or the
        items contained within a particular slice.

        :param key: an integer or a slice object whose ``start`` and
            ``stop`` values are integers. Negative indices are relative
            to ``self``.
        :returns: the value at the index when ``key`` is an integer,
            otherwise the stream node produced by applying the slice
        :raises ValueError: if the slice's ``start`` or ``stop`` is
            negative, if ``start`` is greater than ``stop``, or if
            ``step`` is not positive
        """
        if isinstance(key, int):
            return self._starter(key).value
        start, stop, step = key.start, key.stop, key.step
        node = self
        if start is not None:
            if start < 0:
                raise ValueError(
                    f'start must be nonnegative integer, not {start}'
                )
            node = node._starter(start)
        if stop is not None:
            if stop < 0:
                raise ValueError(
                    f'stop must be nonnegative integer, not {stop}'
                )
            if start is not None:
                # Compare the absolute indices *before* turning ``stop``
                # into a length relative to the new start node. Doing
                # the subtraction first would compare ``start`` against
                # the slice length, rejecting valid slices such as
                # ``[5:8]`` and letting reversed ones such as ``[2:1]``
                # through with a negative length.
                if start > stop:
                    raise ValueError(
                        'start must be less than or equal to stop'
                    )
                stop -= start
            node = node._stopper(stop)
        if step is not None:
            if step <= 0:
                raise ValueError(
                    f'step must be positive integer, not {step}'
                )
            node = node._stepper(step)
        return node

    def __iter__(self) -> Iterator[VT]:
        """Returns an iterator that yields the values from the stream,
        following ``next`` links until a node's ``next`` is ``None``.
        """
        node = self
        while node is not None:
            yield node.value
            node = node.next

    @property
    @abstractmethod
    def next(self) -> Optional[LinearStream[VT]]:
        """Returns the next node, or ``None`` at the end of the stream.
        """
        raise NotImplementedError

    @abstractmethod
    def filter(
        self,
        predicate: Optional[Callable[[VT], bool]]=None,
    ) -> LinearStream[VT]:
        """Returns a new stream that filters out the values that do not
        satisfy the predicate.

        :param predicate: the function to apply to the values in the
            stream. It defaults to testing each value itself for
            validity.
        """
        raise NotImplementedError

    @classmethod
    def from_iterable(
        cls,
        iterable: Iterable[VT],
        does_memoize: bool=True,
    ) -> Optional[LinearStream[VT]]:
        """Returns a new stream that contains data from an iterable.
        Use of the iterable elsewhere afterward is generally
        inadvisable. Otherwise, the stream might become out of sync.

        :param iterable: the iterable from which to create the stream
        :param does_memoize: By default, the node will cache the result
            of ``next_thunk``. This can potentially hog a lot of memory.
            To turn caching off, set ``does_memoize`` to ``False``. It
            might be desirable to propagate this to composite streams
            generated by custom functions.
        :returns: whatever ``_from_iterator`` returns for an iterator
            obtained from ``iterable``
        """
        return cls._from_iterator(iter(iterable), does_memoize)

    @classmethod
    @abstractmethod
    def _from_iterator(
        cls,
        iterator: Iterator[VT],
        does_memoize: bool=True,
    ) -> Optional[LinearStream[VT]]:
        """Returns a new stream that contains data from an iterator. Use
        of the iterator elsewhere afterward is generally inadvisable.
        Otherwise, the stream might become out of sync.

        :param iterator: the iterator that will be used to retrieve the
            values for the stream
        :param does_memoize: By default, the node will cache the result
            of ``next_thunk``. This can potentially hog a lot of memory.
            To turn caching off, set ``does_memoize`` to ``False``. It
            might be desirable to propagate this to composite streams
            generated by custom functions.
        """
        raise NotImplementedError

    @abstractmethod
    def _starter(self, n: int) -> LinearStream[VT]:
        """Returns the node that is ``n`` nodes away from ``self``.

        :param n: the number of nodes away to start from ``self``
        """
        raise NotImplementedError

    @abstractmethod
    def _stepper(self, n: int) -> LinearStream[VT]:
        """Returns a new stream that skips every ``n - 1`` nodes.

        :param n: the number of nodes to skip between nodes
        """
        raise NotImplementedError

    @abstractmethod
    def _stopper(self, n: int) -> LinearStream[VT]:
        """Returns a new stream that is limited to ``n`` nodes.

        :param n: the number of nodes to which to limit the stream
        """
        raise NotImplementedError