Mercurial > lbo > hg > pcombinators
view pcombinators/state.py @ 37:423f7851fe6d draft
Genericize ParseState and implement first prototype of stream parser.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Wed, 22 May 2019 22:48:28 +0200 |
parents | 859c9eaa90c2 |
children | 8b9c4713b049 |
line wrap: on
line source
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Parse state containers: one backed by an in-memory sequence, one reading
lazily from a stream.

Created on Wed May 22 21:41:32 2019

@author: lbo
"""

import io


def ps(s):
    """Shorthand for ``ParseState(s)``."""
    return ParseState(s)


class _State:
    """Generic parsing state representation.

    Subclasses provide next(), peek(), index(), len() and remaining().
    This base class supplies the hold bookkeeping, the iterator protocol
    and error reporting on top of those primitives.
    """

    # Holds are a simple garbage collection mechanism by which parsers should
    # indicate which parts of state they may still backtrack to.
    class ParserHold:
        """Token handed out by hold(); records the index that was pinned."""

        total_index = 0

        def __init__(self, i):
            self.total_index = i

    class ParseException(Exception):
        """Raised by error()."""

    def __init__(self):
        # Indices that are still marked as needed. This must be a
        # per-instance list: a class-level list would be shared (and
        # mutated) by every state object.
        self._holds = []

    def next(self):
        """Consume and return the current element."""
        raise NotImplementedError()

    def peek(self):
        """Return the current element without consuming it."""
        raise NotImplementedError()

    def index(self):
        """Return the absolute position in the input."""
        raise NotImplementedError()

    def len(self):
        """Return the length of the input."""
        raise NotImplementedError()

    def _maybe_collect(self):
        """Hook called after a hold is released; the default does nothing."""

    def hold(self):
        """Pin the current index and return a ParserHold describing it."""
        position = self.index()
        self._holds.append(position)
        return self.ParserHold(position)

    def release(self, hold):
        """Drop a hold obtained from hold() and give _maybe_collect() a chance.

        Raises ValueError if the hold is not (or no longer) registered.
        """
        # total_index is a position in the input, not a position in the
        # _holds list, so the entry has to be removed by value.
        self._holds.remove(hold.total_index)
        self._maybe_collect()

    def __iter__(self):
        return self

    def __next__(self):
        # Honour the iterator protocol so that `for x in state` terminates
        # cleanly instead of failing inside next() at the end of input.
        if self.finished():
            raise StopIteration
        return self.next()

    def finished(self):
        """Return True once index() has reached len()."""
        return self.index() == self.len()

    def remaining(self):
        """Return the input that has not been consumed yet."""
        raise NotImplementedError()

    def error(self, msg):
        """Raise ParseException carrying msg."""
        # ParseException is nested in this class, so it must be looked up
        # through the instance; the bare name is not visible in a method.
        raise self.ParseException(msg)

    def reset(self, ix=None):
        """Not supported by the generic state; always raises NotImplementedError."""
        raise NotImplementedError('use holds!')


class ParseFileState(_State):
    """A lazy parsing state implementation, reading from stream."""

    # Number of characters requested from the stream per refill.
    PREFILL = 256

    # Class-level default so that __del__ is safe even if __init__ raised
    # before a file object was assigned.
    _fobj = None

    def __init__(self, f):
        """Create a state reading from f.

        f is either a path (str), which is opened for reading, or an
        io.IOBase instance, which is used directly. Anything else raises
        NotImplementedError.
        """
        super().__init__()
        self._buf = []  # List of characters.
        self._index = 0  # Index in current _buf
        self._total_offset = 0  # Index of first _buf entry in stream since start
        if isinstance(f, str):
            self._fobj = open(f, 'r')
        elif isinstance(f, io.IOBase):
            self._fobj = f
        else:
            raise NotImplementedError('unknown input source {}'.format(f))

    def __del__(self):
        # Closes the underlying stream, including one passed in by the caller.
        if self._fobj:
            self._fobj.close()

    def _maybe_collect(self):
        """Discard buffered characters that no hold can refer to anymore."""
        if self._holds:
            # Keep everything from the oldest outstanding hold onwards.
            discard = min(self._holds) - self._total_offset
        else:
            # No holds left, forget everything up to now.
            discard = self._index
        # Never drop characters that have not been consumed yet.
        discard = min(discard, self._index)
        if discard <= 0:
            return
        del self._buf[:discard]
        # Shift both counters together so that index() is unaffected.
        self._total_offset += discard
        self._index -= discard

    def index(self):
        """Return the absolute position in the stream."""
        return self._total_offset + self._index

    def fill_buffer(self, min=0):
        """Read PREFILL more characters if at most `min` are buffered ahead."""
        if len(self._buf) - self._index <= min:
            self._buf.extend(self._fobj.read(self.PREFILL))

    def peek(self):
        """Return the current character; IndexError at end of stream."""
        self.fill_buffer()
        return self._buf[self._index]

    def next(self):
        """Consume and return the current character; IndexError at end of stream."""
        self.fill_buffer()
        current = self._buf[self._index]
        self._index += 1
        return current

    def finished(self):
        """Return True when no unconsumed character is left in the stream."""
        # len() only knows about what has been buffered so far, so the
        # inherited index() == len() test would report a freshly opened
        # stream as finished. Try to read ahead before deciding.
        self.fill_buffer()
        return self._index >= len(self._buf)

    def remaining(self):
        """Return the buffered, unconsumed characters as a list.

        Prints a warning: only the buffered lookahead is returned, not the
        whole rest of the stream.
        """
        print('warning: remaining() on ParseFileState is only accurate to up to {} characters lookahead and expensive'.format(self.PREFILL))
        self.fill_buffer(self.PREFILL)
        return self._buf[self._index:]

    def len(self):
        """Return the number of characters seen so far (prints a warning)."""
        print('warning: len() is inaccurate on ParseFileState, returning only past and present state')
        return self._total_offset + len(self._buf)


class ParseState(_State):
    """Encapsulates state as the parser goes through input supplied as string."""

    _input = ''
    _index = 0

    def __init__(self, s):
        """Create a ParseState object from str s, representing the input to be parsed."""
        super().__init__()
        self._input = s
        self._index = 0

    def __repr__(self):
        if self._index < len(self._input):
            return 'ParseState({}< {} >{})'.format(
                self._input[0:self._index],
                self._input[self._index],
                self._input[self._index+1:])
        else:
            return 'ParseState({}<>)'.format(self._input)

    def next(self):
        """Consume and return the current element; IndexError at end of input."""
        current = self.peek()
        self._index += 1
        return current

    def peek(self):
        """Return the current element; IndexError at end of input."""
        return self._input[self._index]

    def index(self):
        """Return the current position in the input."""
        return self._index

    def len(self):
        """Return the length of the input."""
        return len(self._input)

    def reset(self, ix):
        """Move the current position to ix."""
        self._index = ix

    def finished(self):
        """Return True once the whole input has been consumed."""
        return self._index == len(self._input)

    def remaining(self):
        """Return the unconsumed rest of the input ('' when finished)."""
        if self.finished():
            return ''
        return self._input[self._index:]