Source code for streams.abc

# This file is part of Streams.
#
# Streams is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Streams is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Streams.  If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations
from abc import ABCMeta, abstractmethod
from collections.abc import (
    Callable,
    Container,
    Iterable,
    Iterator,
)
from typing import (
    Optional,
    TypeVar,
    Union,
)

# Public names exported by this module.
__all__ = ('LinearStream', 'Stream')

# Type variables shared by the generic stream classes below.
MT = TypeVar('MT')  # value type produced by a mapping function
VT = TypeVar('VT')  # value type held by a stream (before or without mapping)


class Stream(Container[VT], metaclass=ABCMeta):
    """Abstract root of the stream class hierarchy.

    A stream is a robust object that is capable of holding an indefinite
    amount of data.  Every stream class should derive from this class,
    either directly or indirectly.
    """

    __slots__: Iterable[str] = ()

    def __repr__(self) -> str:
        """Returns the canonical string representation of the node.

        The result has the form ``ClassName(<repr of value>)``.
        """
        class_name = self.__class__.__name__
        return f'{class_name}({self.value!r})'

    @property
    @abstractmethod
    def value(self) -> VT:
        """Returns the value of the node."""
        raise NotImplementedError

    @value.setter
    @abstractmethod
    def value(self, value: VT) -> None:
        """Sets the value of the node.

        :param value: the new value to store in the node
        """
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def map(
            cls,
            fn: Callable[..., MT],
            *streams: Stream,
            does_memoize: bool = True
    ) -> Stream[MT]:
        """Returns a new stream holding the results of applying the
        function to the items of the given streams.

        :param fn: the function to be applied to each value in the
            stream
        :param streams: the tuple of streams that contain the values to
            be mapped
        :param does_memoize: By default, the node will cache the result
            of ``next_thunk``. This can potentially hog a lot of memory.
            To turn caching off, set ``does_memoize`` to ``False``. It
            might be desirable to propagate this to composite streams
            generated by custom functions.
        """
        raise NotImplementedError
class LinearStream(Stream[VT], Iterable[VT], metaclass=ABCMeta):
    """An abstract linearly linked list class implemented as a stream
    class.

    Indexing, slicing and iteration are implemented here in terms of
    the abstract members ``next``, ``_starter``, ``_stopper`` and
    ``_stepper``.
    """

    __slots__: Iterable[str] = ()

    def __getitem__(
            self,
            key: Union[int, slice],
    ) -> Union[VT, LinearStream[VT]]:
        """Returns the value contained at a particular index or the
        items contained within a particular slice.

        Indices are counted from ``self``.  An integer key is handed to
        ``_starter`` unchanged; slice bounds are validated here before
        any node is visited.

        :param key: an integer or a slice object whose ``start`` and
            ``stop`` values are nonnegative integers (or ``None``) and
            whose ``step`` value is a positive integer (or ``None``)
        :raises ValueError: if ``start`` or ``stop`` is negative, if
            ``start`` is greater than ``stop``, or if ``step`` is not
            positive
        """
        if isinstance(key, int):
            return self._starter(key).value

        start, stop, step = key.start, key.stop, key.step

        # Validate the whole slice up front so that an invalid request
        # is rejected before the stream is traversed.
        if start is not None and start < 0:
            raise ValueError(
                f'start must be nonnegative integer, not {start}'
            )
        if stop is not None and stop < 0:
            raise ValueError(
                f'stop must be nonnegative integer, not {stop}'
            )
        # Compare the absolute bounds.  This check has to happen before
        # ``stop`` is turned into a length relative to ``start``;
        # otherwise valid slices such as ``[5:8]`` would be rejected.
        if start is not None and stop is not None and start > stop:
            raise ValueError(
                'start must be less than or equal to stop'
            )
        if step is not None and step <= 0:
            raise ValueError(
                f'step must be positive integer, not {step}'
            )

        node = self
        if start is not None:
            node = node._starter(start)
        if stop is not None:
            # ``_stopper`` takes a node count, and ``node`` has already
            # been advanced by ``start``, so pass the slice length.
            length = stop if start is None else stop - start
            node = node._stopper(length)
        if step is not None:
            node = node._stepper(step)
        return node

    def __iter__(self) -> Iterator[VT]:
        """Returns an iterator that yields the values from the stream,
        following ``next`` until it is ``None``.
        """
        node = self
        while node is not None:
            yield node.value
            node = node.next

    @property
    @abstractmethod
    def next(self) -> Optional[LinearStream[VT]]:
        """Returns the next node."""
        raise NotImplementedError

    @abstractmethod
    def filter(
            self,
            predicate: Optional[Callable[[VT], bool]] = None,
    ) -> LinearStream[VT]:
        """Returns a new stream that filters out the values that do not
        satisfy the predicate.

        :param predicate: the function to apply to the values in the
            stream. It defaults to testing each value itself for
            validity.
        """
        raise NotImplementedError

    @classmethod
    def from_iterable(
            cls,
            iterable: Iterable[VT],
            does_memoize: bool = True,
    ) -> Optional[LinearStream[VT]]:
        """Returns a new stream that contains data from an iterable.

        Use of the iterable elsewhere afterward is generally
        inadvisable. Otherwise, the stream might become out of sync.

        :param iterable: the iterable from which to create the stream
        :param does_memoize: By default, the node will cache the result
            of ``next_thunk``. This can potentially hog a lot of memory.
            To turn caching off, set ``does_memoize`` to ``False``. It
            might be desirable to propagate this to composite streams
            generated by custom functions.
        """
        return cls._from_iterator(iter(iterable), does_memoize)

    @classmethod
    @abstractmethod
    def _from_iterator(
            cls,
            iterator: Iterator[VT],
            does_memoize: bool = True,
    ) -> Optional[LinearStream[VT]]:
        """Returns a new stream that contains data from an iterator.

        Use of the iterator elsewhere afterward is generally
        inadvisable. Otherwise, the stream might become out of sync.

        :param iterator: the iterator that will be used to retrieve the
            values for the stream
        :param does_memoize: By default, the node will cache the result
            of ``next_thunk``. This can potentially hog a lot of memory.
            To turn caching off, set ``does_memoize`` to ``False``. It
            might be desirable to propagate this to composite streams
            generated by custom functions.
        """
        raise NotImplementedError

    @abstractmethod
    def _starter(self, n: int) -> LinearStream[VT]:
        """Returns the node that is ``n`` nodes away from ``self``.

        :param n: the number of nodes away to start from ``self``
        """
        raise NotImplementedError

    @abstractmethod
    def _stepper(self, n: int) -> LinearStream[VT]:
        """Returns a new stream that skips every ``n - 1`` nodes.

        :param n: the number of nodes to skip between nodes
        """
        raise NotImplementedError

    @abstractmethod
    def _stopper(self, n: int) -> LinearStream[VT]:
        """Returns a new stream that is limited to ``n`` nodes.

        :param n: the number of nodes to which to limit the stream
        """
        raise NotImplementedError