compiler/src/lib/ulm/ulmStreams.Mod
Norayr Chilingarian ce6490771e ulmPrint ported, ulmStreams modified.
needs test, I am not sure it will work correctly.
it is not clean how some type casts would work in voc.
2013-10-22 19:56:23 +04:00

2149 lines
61 KiB
Modula-2

(* Ulm's Oberon Library
Copyright (C) 1989-2001 by University of Ulm, SAI, D-89069 Ulm, Germany
----------------------------------------------------------------------------
Ulm's Oberon Library is free software; you can redistribute it
and/or modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either version
2 of the License, or (at your option) any later version.
Ulm's Oberon Library is distributed in the hope that it will be
useful, but WITHOUT ANY WARRANTY; without even the implied warranty
of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
----------------------------------------------------------------------------
E-mail contact: oberon@mathematik.uni-ulm.de
----------------------------------------------------------------------------
$Id: Streams.om,v 1.13 2005/02/14 23:36:35 borchert Exp $
----------------------------------------------------------------------------
$Log: Streams.om,v $
Revision 1.13 2005/02/14 23:36:35 borchert
bug fix: WritePart called InternalFlush without considering
that s.pos may be implicitly changed
(this assumption was wrong since revision 1.11)
Revision 1.12 2004/05/20 09:52:43 borchert
performance improvements:
- WritePart and Write take now the buffer by reference
- ReadByteFromBuf replaced by ReadBytesFromBuf
(contributed by Christian Ehrhardt)
Revision 1.11 2001/05/03 15:17:58 borchert
InternalFlush adapted for unidirectional pipelines to avoid
unintentional flushes due to buffer boundaries
Revision 1.10 2000/04/25 21:41:47 borchert
Streams.ReadPart loops now for unbuffered streams to collect input
until cnt is reached
Revision 1.9 1998/03/31 11:13:05 borchert
bug fix: NotificationHandler just reacted on Resources.unreferenced
but not on Resources.terminated
Revision 1.8 1998/03/24 22:58:28 borchert
bug fix in Copy: left was computed incorrectly in case of
copies with fixed length (# -1)
Revision 1.7 1997/04/02 07:50:05 borchert
Copy replaced by a slightly more efficient variant
Revision 1.6 1996/09/18 07:43:51 borchert
qualified references to own module (i.e. Streams.XXX) removed
Revision 1.5 1996/01/04 16:43:57 borchert
some bug fixes in the updates of read and write regions
Revision 1.4 1995/10/11 09:46:41 borchert
- closeEvent re-introduced (because it gets raised *before*
the actual close)
- bug fix: s.write was diminished in ReadPart but the write region
not properly adjusted
- bug fix: InternalSeek was setting s.left to negative values in
a special case
Revision 1.3 1995/04/18 12:17:12 borchert
- Streams.Stream is now an extension of Services.Object
- Library variant of assertions replaced by ASSERT
- support of Resources added
- EnableClose, PreventClose & closeEvent removed
Revision 1.2 1994/07/05 12:45:57 borchert
some minor bug fixes & enhancements:
- ReadPacket added
- streams which don't require cleanup are now subject to the GC
even if Close will never be called for them
- line buffered streams w/o bufio/addrio capability fill now buffer
up to the next line terminator only instead of trying to fill the
whole buffer
- ReadPart didn't set count correctly in all cases
- Touch calls now the flush interface procedure
Revision 1.1 1994/02/22 20:10:45 borchert
Initial revision
----------------------------------------------------------------------------
AFB 6/89
Major Revision: AFB 1/92: bufpool
----------------------------------------------------------------------------
*)
MODULE ulmStreams;
IMPORT Events := ulmEvents, Objects := ulmObjects, Priorities := ulmPriorities, Process := ulmProcess, RelatedEvents := ulmRelatedEvents, Resources := ulmResources,
Services := ulmServices, SYS := ulmSYSTEM, SYSTEM, Types := ulmTypes;
CONST
(* 3rd parameter of Seek *)
(* Whence = (fromStart, fromPos, fromEnd); *)
fromStart* = 0; fromPos* = 1; fromEnd* = 2;
(* capabilities of a stream *)
(* Capability = (read, write, addrio, bufio, seek, tell, trunc, close,
holes, handler);
*)
read* = 0; write* = 1; addrio* = 2; bufio* = 3; seek* = 4; tell* = 5;
trunc* = 6; flush* = 7; close* = 8; holes* = 9; handler* = 10;
(* BufMode = (nobuf, linebuf, onebuf, bufpool); *)
nobuf* = 0; linebuf* = 1; onebuf* = 2; bufpool* = 3;
(* ErrorCode = (NoHandlerDefined, CannotRead, CannotSeek, CloseFailed,
NotLineBuffered, SeekFailed, TellFailed, BadWhence,
CannotTell, WriteFailed, CannotWrite, ReadFailed,
Unbuffered, BadParameters, CannotTrunc, TruncFailed,
NestedCall, FlushFailed);
*)
NoHandlerDefined* = 0; (* no handler defined *)
CannotRead* = 1; (* stream is write only *)
CannotSeek* = 2; (* stream is not capable of seeking *)
CloseFailed* = 3; (* Flush or Close failed *)
NotLineBuffered* = 4; (* LineTerm must not be called *)
SeekFailed* = 5; (* seek operation failed *)
TellFailed* = 6; (* tell operation failed *)
BadWhence* = 7; (* whence value out of [fromStart..fromEnd] *)
CannotTell* = 8; (* stream does not have a current position *)
WriteFailed* = 9; (* write error *)
CannotWrite* = 10; (* stream is read only *)
ReadFailed* = 11; (* read error *)
Unbuffered* = 12; (* operation isn't valid for unbuff'd streams *)
BadParameters* = 13; (* e.g. wrong count or offset values *)
CannotTrunc* = 14; (* stream is not capable of truncating *)
TruncFailed* = 15; (* trunc operation failed *)
NestedCall* = 16; (* nested stream operation *)
FlushFailed* = 17; (* flush operation failed *)
errorcodes* = 18; (* number of error codes *)
(* === private constants ======================================= *)
bufsize = 8192; (* should be the file system block size *)
defaulttermch = 0AX; (* default line terminator (for linebuf) *)
TYPE
Address* = Types.Address;
Count* = Types.Count;
Byte* = Types.Byte;
Whence* = SHORTINT; (* Whence = (fromStart, fromPos, fromEnd); *)
CapabilitySet* = SET; (* OF Capability; *)
BufMode* = SHORTINT;
ErrorCode* = SHORTINT;
Stream* = POINTER TO StreamRec;
Message* = RECORD (Objects.ObjectRec) END;
(* the buffering system:
buffers are always on bufsize-boundaries
ok: the other components are defined
pos: file position of cont[0] (pos MOD bufsize = 0)
cont: valid data: cont[rbegin]..cont[rend-1] (read-region)
written data: cont[wbegin]..cont[wend-1] (write-region)
both regions are maintained (even for non-rw streams)
*)
Buffer = POINTER TO BufferRec;
BufferRec =
RECORD
ok: BOOLEAN; (* TRUE if other components are valid *)
pos: Count; (* file position which corresponds to cont[0] *)
rbegin: Count; (* read-region: starting index *)
rend: Count; (* read-region: ending index *)
wbegin: Count; (* write-region: starting index of dirty region *)
wend: Count; (* write-region: ending index *)
cont: ARRAY bufsize OF Byte; (* buffer contents *)
nextfree: Buffer; (* only needed for released buffers *)
(* components for buffers which are members of a buffer pool *)
prevh, nexth: Buffer; (* next buffer with same the hash value *)
preva, nexta: Buffer; (* sorted list of buffers (access time) *)
END;
CONST
hashtabsize = 128; (* size of bucket table *)
TYPE
BucketTable = ARRAY hashtabsize OF Buffer;
BufferPool = POINTER TO BufferPoolRec;
BufferPoolRec =
RECORD
maxbuf: INTEGER; (* maximal number of buffers to be used *)
nbuf: INTEGER; (* number of buffers in use *)
bucket: BucketTable;
(* list of all buffers sorted after the last access time;
tail points to the buffer most recently accessed
*)
head, tail: Buffer;
END;
TYPE
AddrIOProc* = PROCEDURE (s: Stream; ptr: Address; cnt: Count) : Count;
BufIOProc* = PROCEDURE (s: Stream; VAR buf: ARRAY OF Byte;
off, cnt: Count) : Count;
SeekProc* = PROCEDURE (s: Stream; cnt: Count; whence: Whence) : BOOLEAN;
TellProc* = PROCEDURE (s: Stream; VAR cnt: Count) : BOOLEAN;
ReadProc* = PROCEDURE (s: Stream; VAR byte: Byte) : BOOLEAN;
WriteProc* = PROCEDURE (s: Stream; byte: Byte) : BOOLEAN;
TruncProc* = PROCEDURE (s: Stream; cnt: Count) : BOOLEAN;
FlushProc* = PROCEDURE (s: Stream) : BOOLEAN;
CloseProc* = PROCEDURE (s: Stream) : BOOLEAN;
HandlerProc* = PROCEDURE (s: Stream; VAR msg: Message);
Interface* = POINTER TO InterfaceRec;
InterfaceRec* =
RECORD
(Objects.ObjectRec)
addrread*: AddrIOProc; (* read, addrio *)
addrwrite*: AddrIOProc; (* write, addrio *)
bufread*: BufIOProc; (* read, bufio *)
bufwrite*: BufIOProc; (* write, bufio *)
read*: ReadProc; (* read *)
write*: WriteProc; (* write *)
seek*: SeekProc; (* seek *)
tell*: TellProc; (* tell *)
trunc*: TruncProc; (* trunc *)
flush*: FlushProc; (* flush *)
close*: CloseProc; (* close *)
handler*: HandlerProc; (* handler *)
END;
StreamRec* =
RECORD
(Services.ObjectRec)
(* following components are set after i/o-operations *)
count*: Count; (* resulting count of last operation *)
errors*: INTEGER; (* incremented for each error; may be set to 0 *)
error*: BOOLEAN; (* last operation successful? *)
lasterror*: ErrorCode; (* error code of last error *)
eof*: BOOLEAN; (* last read-operation with count=0 returned *)
(* === private part ============================================ *)
prev, next: Stream; (* list of open streams *)
if: Interface; caps: CapabilitySet;
bufmode: BufMode; (* buffering mode *)
bidirect: BOOLEAN; (* bidirectional buffering? *)
termch: Byte; (* flush on termch (linebuf only) *)
inlist: BOOLEAN; (* member of the list of opened streams? *)
tiedStream: Stream; (* to be flushed before read operations *)
buf: Buffer; (* current buffer; = NIL for unbuffered streams *)
wbuf: Buffer; (* buffer for writing (only if bidirect = TRUE) *)
bufpool: BufferPool; (* only if bufmode = bufpool *)
validpos: BOOLEAN; (* pos valid? *)
pos: Count; (* current position in stream *)
maxpos: Count; (* maximal position until now (only if buf # NIL) *)
left: Count; (* number of bytes left in buf (after pos) *)
write: Count; (* number of bytes which can be written in buf *)
rpos: Count; (* current position of if.tell *)
wextensible: BOOLEAN; (* write region extensible? *)
eofFound: BOOLEAN; (* eof seen yet? temporary use only *)
lock: BOOLEAN; (* avoid recursive operations *)
flushEvent: Events.EventType; (* valid if # NIL *)
closeEvent: Events.EventType; (* valid if # NIL *)
END;
VAR
type: Services.Type;
TYPE
(* each error causes an event;
the error number is stored in event.errorcode;
the associated text can be taken from event.message
*)
Event* = POINTER TO EventRec;
EventRec* =
RECORD
(Events.EventRec)
stream*: Stream;
errorcode*: ErrorCode;
END;
VAR
null*: Stream; (* accepts any output; does not return input *)
(* these streams are set by other modules;
after initialization of Streams they equal `null';
so, connections with the standard UNIX streams must be
done by other modules
*)
stdin*, stdout*, stderr*: Stream;
errormsg*: ARRAY errorcodes OF Events.Message;
error*: Events.EventType;
(* === private variables ========================================== *)
opened: Stream; (* list of opened streams *)
(* this list has been reduced to the set of streams which
need to be cleaned up explicitly;
all other streams are subject to the garbage collection
even if Close has never been called for them
*)
freelist: Buffer; (* list of free buffers *)
nullif: Interface; (* interface of null-devices *)
(* === private procedures ========================================= *)
PROCEDURE NewStream(s: Stream);
BEGIN
IF s.inlist THEN
s.prev := NIL;
s.next := opened;
IF opened # NIL THEN
opened.prev := s;
END;
opened := s;
END;
END NewStream;
PROCEDURE OldStream(s: Stream);
BEGIN
IF s.inlist THEN
IF s.prev # NIL THEN
s.prev.next := s.next;
ELSE
opened := s.next;
END;
IF s.next # NIL THEN
s.next.prev := s.prev;
END;
END;
END OldStream;
PROCEDURE NewBuffer(VAR b: Buffer);
BEGIN
IF freelist # NIL THEN
b := freelist;
freelist := freelist.nextfree;
ELSE
NEW(b);
END;
b.nextfree := NIL;
b.ok := FALSE;
END NewBuffer;
PROCEDURE OldBuffer(VAR b: Buffer);
BEGIN
b.nextfree := freelist;
freelist := b;
b := NIL;
END OldBuffer;
PROCEDURE Error(s: Stream; code: ErrorCode);
VAR
event: Event;
BEGIN
IF s # NIL THEN
INC(s.errors);
s.error := TRUE;
s.lasterror := code;
(* generate error event *)
NEW(event);
event.type := error;
event.message := errormsg[code];
event.stream := s;
event.errorcode := code;
RelatedEvents.Raise(s, event);
END;
END Error;
PROCEDURE ^ InternalFlush(s: Stream) : BOOLEAN;
(* ===== management of buffer pool ================================== *)
PROCEDURE InitBufPool(s: Stream);
VAR
index: INTEGER;
BEGIN
s.bufpool.maxbuf := 16; (* default size *)
s.bufpool.nbuf := 0; (* currently, no buffers are allocated *)
s.bufpool.head := NIL; s.bufpool.tail := NIL;
index := 0;
WHILE index < hashtabsize DO
s.bufpool.bucket[index] := NIL;
INC(index);
END;
END InitBufPool;
PROCEDURE HashValue(pos: Count) : INTEGER;
(* HashValue returns a hash value for pos *)
BEGIN
RETURN SHORT(pos DIV bufsize) MOD hashtabsize
END HashValue;
PROCEDURE FindBuffer(s: Stream; pos: Count; VAR buf: Buffer) : BOOLEAN;
VAR
index: INTEGER;
bp: Buffer;
BEGIN
index := HashValue(pos);
bp := s.bufpool.bucket[index];
WHILE bp # NIL DO
IF bp.pos = pos THEN
buf := bp; RETURN TRUE
END;
bp := bp.nexth; (* next buffer with same hash value *)
END;
buf := NIL;
RETURN FALSE
END FindBuffer;
PROCEDURE GetBuffer(s: Stream);
(* look for buffer for s.pos and make it to the current buffer;
set s.left and s.write in dependance of s.pos
*)
VAR
buf: Buffer;
pos: Count; (* buffer boundary for s.pos *)
posindex: Count; (* buf[posindex] corresponds to s.pos *)
index: INTEGER; (* index into bucket table of the buffer pool *)
PROCEDURE InitBuf(buf: Buffer);
VAR
index: INTEGER; (* of bucket table *)
BEGIN
buf.ok := TRUE;
buf.pos := pos;
buf.rbegin := posindex; buf.rend := posindex; s.left := 0;
buf.wbegin := posindex; buf.wend := posindex;
s.write := bufsize - posindex;
buf.nextfree := NIL;
(* insert buf into hash list *)
index := HashValue(pos);
buf.prevh := NIL;
buf.nexth := s.bufpool.bucket[index];
IF buf.nexth # NIL THEN
buf.nexth.prevh := buf;
END;
s.bufpool.bucket[index] := buf;
(* buf is already at the end of the sorted list if we
re-use an old buffer
*)
IF s.bufpool.tail # buf THEN
(* append buf to the sorted list *)
buf.nexta := NIL;
IF s.bufpool.tail = NIL THEN
s.bufpool.head := buf;
buf.preva := NIL;
ELSE
s.bufpool.tail.nexta := buf;
buf.preva := s.bufpool.tail;
END;
s.bufpool.tail := buf;
END;
END InitBuf;
PROCEDURE UseBuffer(s: Stream; buf: Buffer);
(* make buf to the current buffer of s *)
BEGIN
IF s.buf # buf THEN
(* remove buf from sorted list *)
IF buf.preva # NIL THEN
buf.preva.nexta := buf.nexta;
ELSE
s.bufpool.head := buf.nexta;
END;
IF buf.nexta # NIL THEN
buf.nexta.preva := buf.preva;
ELSE
s.bufpool.tail := buf.preva;
END;
(* append buf to sorted list *)
buf.nexta := NIL;
IF s.bufpool.tail = NIL THEN
s.bufpool.head := buf;
buf.preva := NIL;
ELSE
s.bufpool.tail.nexta := buf;
buf.preva := s.bufpool.tail;
END;
s.bufpool.tail := buf;
(* set current buf of s to buf *)
s.buf := buf;
(* update s.left and s.write *)
IF buf.rbegin = buf.rend THEN
buf.rbegin := posindex; buf.rend := posindex; s.left := 0;
ELSIF (posindex >= buf.rbegin) & (posindex < buf.rend) THEN
s.left := buf.rend - posindex;
ELSE
s.left := 0;
END;
IF buf.wbegin = buf.wend THEN
buf.wbegin := posindex; buf.wend := posindex;
s.write := bufsize - posindex;
ELSIF (posindex >= buf.wbegin) & (posindex < buf.wend) THEN
s.write := bufsize - posindex;
ELSE
s.write := 0;
END;
END;
END UseBuffer;
BEGIN (* GetBuffer *)
posindex := s.pos MOD bufsize;
pos := s.pos - posindex;
IF ~s.buf.ok THEN
(* init first allocated buffer which has not been used until now *)
InitBuf(s.buf);
INC(s.bufpool.nbuf);
ELSIF s.buf.pos # pos THEN
IF FindBuffer(s, pos, buf) THEN
UseBuffer(s, buf);
ELSE
IF s.bufpool.nbuf >= s.bufpool.maxbuf THEN
(* re-use already allocated buffer *)
buf := s.bufpool.head;
UseBuffer(s, buf);
IF buf.wbegin # buf.wend THEN
IF ~InternalFlush(s) THEN END;
END;
(* remove buf from hash list *)
IF buf.prevh # NIL THEN
buf.prevh.nexth := buf.nexth;
ELSE
index := HashValue(buf.pos);
s.bufpool.bucket[index] := buf.nexth;
END;
IF buf.nexth # NIL THEN
buf.nexth.prevh := buf.prevh;
END;
InitBuf(buf);
ELSE
(* allocate and initialize new buffer *)
NewBuffer(buf);
InitBuf(buf);
INC(s.bufpool.nbuf);
END;
s.buf := buf;
END;
END;
END GetBuffer;
PROCEDURE FlushBufPool(s: Stream) : BOOLEAN;
VAR
buf: Buffer;
ok: BOOLEAN;
BEGIN
ok := TRUE;
IF s.bufpool.nbuf > 0 THEN
buf := s.bufpool.head;
WHILE buf # NIL DO
s.buf := buf;
ok := InternalFlush(s) & ok;
buf := buf.nexta;
END;
END;
RETURN ok
END FlushBufPool;
PROCEDURE ReleaseBufPool(s: Stream);
(* precondition: all buffers are flushed *)
VAR
buf: Buffer;
BEGIN
IF s.bufpool.nbuf > 0 THEN
buf := s.bufpool.head;
WHILE buf # NIL DO
s.buf := buf;
OldBuffer(s.buf);
buf := buf.nexta;
END;
END;
NewBuffer(s.buf);
InitBufPool(s);
END ReleaseBufPool;
(* ================================================================== *)
PROCEDURE GetBufMode*(s: Stream) : BufMode;
BEGIN
RETURN s.bufmode
END GetBufMode;
PROCEDURE LineTerm*(s: Stream; termch: Byte);
(* set line terminator of `s' (linebuf) to `termch' *)
BEGIN
s.error := FALSE;
IF s.bufmode = linebuf THEN
s.termch := termch;
ELSE
Error(s, NotLineBuffered);
END;
END LineTerm;
PROCEDURE Tie*(in, out: Stream);
(* PRE: `in' is an line buffered input stream,
`out' an output stream,
and `in' # `out';
causes `out' to be flushed before reading from `in';
`out' may be NIL to undo the effect
*)
BEGIN
in.error := FALSE;
IF in.bufmode # linebuf THEN
Error(in, NotLineBuffered); RETURN
END;
IF (in = out) OR ~(read IN in.caps) OR
(out # NIL) & ~(write IN out.caps) THEN
Error(in, BadParameters); RETURN
END;
in.tiedStream := out;
END Tie;
PROCEDURE SetBufferPoolSize*(s: Stream; nbuf: INTEGER);
BEGIN
s.error := FALSE;
IF SYS.TAS(s.lock) THEN
Error(s, NestedCall); RETURN
END;
IF (s.bufmode = bufpool) & (nbuf >= 1) THEN
s.bufpool.maxbuf := nbuf;
END;
s.lock := FALSE;
END SetBufferPoolSize;
PROCEDURE GetBufferPoolSize*(s: Stream; VAR nbuf: INTEGER);
BEGIN
s.error := FALSE;
CASE s.bufmode OF
| nobuf: nbuf := 0;
| linebuf: nbuf := 1;
| onebuf: nbuf := 1;
| bufpool: nbuf := s.bufpool.maxbuf;
END;
END GetBufferPoolSize;
PROCEDURE Capabilities*(s: Stream) : CapabilitySet;
BEGIN
s.error := FALSE;
RETURN s.caps
END Capabilities;
PROCEDURE GetFlushEvent*(s: Stream; VAR type: Events.EventType);
(* `type' will be raised BEFORE every flush operation *)
BEGIN
s.error := FALSE;
IF s.flushEvent = NIL THEN
Events.Define(s.flushEvent);
END;
type := s.flushEvent;
END GetFlushEvent;
PROCEDURE GetCloseEvent*(s: Stream; VAR type: Events.EventType);
(* `type' will be raised BEFORE the stream gets closed;
that means write operations etc. are legal
*)
BEGIN
s.error := FALSE;
IF s.closeEvent = NIL THEN
Events.Define(s.closeEvent);
END;
type := s.closeEvent;
END GetCloseEvent;
PROCEDURE Close*(s: Stream) : BOOLEAN;
VAR
event: Event;
type: Events.EventType;
otherStream: Stream;
BEGIN
s.error := FALSE;
IF (s.closeEvent # NIL) & ~SYS.TAS(s.lock) THEN
type := s.closeEvent; s.closeEvent := NIL;
s.lock := FALSE;
Events.SetPriority(type, Events.GetPriority() + 1);
NEW(event);
event.type := type;
event.message := "close event of Streams";
event.stream := s;
Events.Raise(event);
END;
IF ~SYS.TAS(s.lock) THEN
IF write IN s.caps THEN
IF s.bufmode = bufpool THEN
IF ~FlushBufPool(s) THEN END;
ELSE
IF ~InternalFlush(s) THEN END;
END;
END;
IF close IN s.caps THEN
IF ~s.if.close(s) THEN
Error(s, CloseFailed);
END;
END;
IF s.buf # NIL THEN
IF s.bufmode = bufpool THEN
ReleaseBufPool(s);
END;
OldBuffer(s.buf);
END;
OldStream(s);
(* check if this stream has been tied to another stream *)
otherStream := opened;
WHILE otherStream # NIL DO
IF otherStream.tiedStream = s THEN
otherStream.tiedStream := NIL; (* undo tie operation *)
END;
otherStream := otherStream.next;
END;
(* s.lock remains TRUE to prevent further operations *)
Resources.Notify(s, Resources.terminated);
RETURN ~s.error
ELSE
Error(s, NestedCall);
RETURN FALSE
END;
END Close;
PROCEDURE Release*(s: Stream);
BEGIN
IF ~Close(s) THEN END;
END Release;
PROCEDURE CloseAll*;
BEGIN
WHILE opened # NIL DO
(* that's no endless loop; see Close/OldStream *)
Release(opened);
END;
END CloseAll;
PROCEDURE NotificationHandler(event: Events.Event);
VAR
s: Stream;
BEGIN
IF ~(event IS Resources.Event) THEN RETURN END;
WITH event: Resources.Event DO
IF ~(event.resource IS Stream) THEN RETURN END;
s := event.resource(Stream);
IF event.change IN {Resources.unreferenced, Resources.terminated} THEN
IF ~s.lock THEN
Release(s);
END;
END;
END;
END NotificationHandler;
PROCEDURE Init*(s: Stream; if: Interface; caps: CapabilitySet;
bufmode: BufMode);
VAR
eventType: Events.EventType;
type: Services.Type;
PROCEDURE InitBidirectionalBuffering(s: Stream);
BEGIN
s.validpos := TRUE;
s.pos := 0;
NewBuffer(s.wbuf);
s.buf.ok := TRUE; s.buf.rbegin := 0; s.buf.rend := 0; s.buf.pos := 0;
s.wbuf.ok := TRUE; s.wbuf.wbegin := 0; s.wbuf.wend := 0;
s.wbuf.pos := 0;
s.left := 0; s.write := bufsize;
END InitBidirectionalBuffering;
BEGIN
ASSERT((s # NIL) & (if # NIL) & ({read, write} * caps # {}));
Services.GetType(s, type); ASSERT(type # NIL);
s.inlist := (close IN caps) OR (bufmode # nobuf) & (write IN caps);
NewStream(s);
(* initialize public part *)
s.count := 0;
s.errors := 0;
s.error := FALSE;
s.lasterror := 0;
s.eof := FALSE;
(* private part *)
s.if := if; s.caps := caps;
s.bufmode := bufmode;
s.validpos := FALSE;
s.left := 0; s.write := 0;
s.tiedStream := NIL;
IF bufmode IN {linebuf, onebuf, bufpool} THEN
NewBuffer(s.buf);
IF (bufmode = bufpool) & ~(seek IN caps) THEN
bufmode := onebuf;
END;
CASE bufmode OF
| linebuf: s.termch := defaulttermch;
| bufpool: NEW(s.bufpool); InitBufPool(s);
ELSE
END;
s.maxpos := 0;
s.wextensible := {read, write, seek, tell, holes} * caps =
{read, write, seek, tell};
s.bidirect := {read, write, seek, tell, trunc} * caps = {read, write};
IF s.bidirect THEN
InitBidirectionalBuffering(s);
ELSE
s.wbuf := NIL;
END;
ELSE
s.buf := NIL;
s.wbuf := NIL;
s.wextensible := FALSE;
s.bidirect := FALSE;
END;
s.flushEvent := NIL;
s.closeEvent := NIL;
Resources.TakeInterest(s, eventType);
Events.Handler(eventType, NotificationHandler);
s.lock := FALSE;
END Init;
PROCEDURE Send*(s: Stream; VAR message: Message);
BEGIN
IF ~SYS.TAS(s.lock) THEN
IF handler IN s.caps THEN
s.if.handler(s, message);
ELSE
Error(s, NoHandlerDefined);
END;
s.lock := FALSE;
ELSE
Error(s, NestedCall);
END;
END Send;
(* === private i/o procedures ================================= *)
PROCEDURE ValidPos(s: Stream);
BEGIN
IF ~s.validpos THEN
IF tell IN s.caps THEN
IF ~s.if.tell(s, s.pos) OR (s.pos < 0) THEN
Error(s, TellFailed);
s.pos := 0;
END;
ELSE
s.pos := 0;
END;
s.rpos := s.pos;
s.validpos := TRUE;
s.left := 0;
s.write := 0;
END;
END ValidPos;
PROCEDURE InitBuf(s: Stream);
BEGIN
IF s.bufmode = bufpool THEN
GetBuffer(s);
ELSE
s.buf.pos := s.pos - s.pos MOD bufsize;
s.buf.wbegin := s.pos MOD bufsize;
s.write := bufsize - s.buf.wbegin;
s.buf.wend := s.buf.wbegin;
s.buf.rbegin := s.buf.wbegin;
s.buf.rend := s.buf.wbegin;
s.left := 0;
s.buf.ok := TRUE;
END;
END InitBuf;
PROCEDURE FillBuf(s: Stream) : BOOLEAN;
(* return FALSE on EOF or errors *)
VAR
offset, count: Count;
posindex: Count; (* s.pos MOD bufsize *)
PROCEDURE Fill(s: Stream; VAR offset, count: Count) : BOOLEAN;
(* try to fill buf.cont[offset]..buf.cont[offset+count-1];
return FALSE on EOF;
Fill always extends a read region:
s.buf.rend is set to offset + the number of bytes read
*)
VAR
linetermseen: BOOLEAN;
byte: Byte;
BEGIN
IF s.eofFound THEN
RETURN FALSE
END;
IF addrio IN s.caps THEN
s.buf.rend := s.if.addrread(s, SYSTEM.ADR(s.buf.cont[offset]), count) +
offset;
ELSIF bufio IN s.caps THEN
s.buf.rend := s.if.bufread(s, s.buf.cont, offset, count) + offset;
ELSIF s.bufmode = linebuf THEN
s.buf.rend := offset; linetermseen := FALSE;
WHILE ~linetermseen & (s.buf.rend < offset+count) &
s.if.read(s, byte) DO
s.buf.cont[s.buf.rend] := byte; INC(s.buf.rend);
linetermseen := byte = s.termch;
END;
s.eofFound := ~linetermseen &
(s.buf.rend < offset+count); (* s.if.read failed? *)
ELSE
s.buf.rend := offset;
WHILE (s.buf.rend < offset+count) &
s.if.read(s, s.buf.cont[s.buf.rend]) DO
INC(s.buf.rend);
END;
s.eofFound := s.buf.rend < offset+count; (* s.if.read failed? *)
END;
(* negative counts of addrread or bufread indicate read errors *)
IF s.buf.rend < offset THEN
(* note error and recover s.buf.rend *)
Error(s, ReadFailed);
s.buf.rend := offset;
END;
INC(s.rpos, s.buf.rend - offset);
IF s.buf.rend > offset THEN
DEC(count, s.buf.rend - offset);
offset := s.buf.rend;
RETURN TRUE
ELSE
s.eofFound := TRUE;
RETURN FALSE
END;
END Fill;
BEGIN (* FillBuf *)
ValidPos(s);
posindex := s.pos MOD bufsize;
s.eofFound := FALSE;
(* flush associated output streams (line buffered streams only) *)
IF s.bufmode = linebuf THEN
IF write IN s.caps THEN
IF ~InternalFlush(s) THEN END;
END;
IF (s.tiedStream # NIL) & ~SYS.TAS(s.tiedStream.lock) THEN
IF ~InternalFlush(s.tiedStream) THEN END;
s.tiedStream.lock := FALSE;
END;
END;
(* get a valid buffer and set
offset and count to the buffer range which is to be filled;
on default, we want to fill the whole buffer
*)
offset := 0; count := bufsize; (* default *)
IF ~s.buf.ok THEN
InitBuf(s);
ELSIF s.bidirect THEN
s.buf.rbegin := 0; s.buf.rend := 0; s.pos := 0; posindex := 0;
ELSE
IF s.bufmode = bufpool THEN
GetBuffer(s);
IF s.left > 0 THEN
(* buffer is already filled *)
s.eof := FALSE; RETURN TRUE
END;
ELSIF s.buf.pos # s.pos - posindex THEN
(* reuse filled buffer *)
IF write IN s.caps THEN
IF ~InternalFlush(s) THEN END;
END;
InitBuf(s);
END;
IF s.buf.rbegin # s.buf.rend THEN
IF (write IN s.caps) &
(s.buf.wbegin <= posindex) & (s.buf.wend > posindex) THEN
(* set read region to write region *)
s.buf.rbegin := s.buf.wbegin; s.buf.rend := s.buf.wend;
s.left := s.buf.wend - posindex;
s.eof := FALSE; RETURN TRUE
ELSIF s.buf.rend = posindex THEN
(* stream position equals end of read region *)
offset := s.buf.rend; count := bufsize - offset;
END;
END;
(* take care of the write region by limiting count;
note that s.pos does *not* point into the write region;
this is guaranteed by WritePart and other operations
which would have extended the read region in such a case
*)
IF (write IN s.caps) & (s.buf.wbegin # s.buf.wend) THEN
IF s.buf.wbegin >= offset THEN
IF s.buf.wbegin > posindex THEN
(* write-region behind current position *)
count := s.buf.wbegin - offset;
ELSE
(* write-region before current position *)
offset := s.buf.wend; count := bufsize - offset;
END;
END;
IF (s.buf.pos + s.buf.wbegin = s.rpos) & ~(seek IN s.caps) THEN
(* flush if the start of write region corresponds to real
file position and we are not able to change the position
*)
IF ~InternalFlush(s) THEN END;
END;
END;
END;
(* set the real position to the position we want to read from *)
IF ~s.bidirect & (s.buf.pos + offset # s.rpos) THEN
IF (seek IN s.caps) & s.if.seek(s, s.buf.pos+offset, fromStart) THEN
s.rpos := s.buf.pos + offset;
ELSIF s.pos = s.rpos THEN
DEC(count, posindex - offset);
offset := posindex;
ELSIF seek IN s.caps THEN
Error(s, SeekFailed); RETURN FALSE
ELSE
Error(s, CannotSeek); RETURN FALSE
END;
END;
(* try to fill buf[offset..offset+count-1];
and set s.buf.rbegin & s.buf.rend to the new read region
*)
IF s.buf.rend # offset THEN
(* forget old read region if we cannot extend it *)
s.buf.rbegin := offset; s.buf.rend := offset;
END;
WHILE Fill(s, offset, count) & (posindex >= s.buf.rend) DO END;
IF posindex >= s.buf.rend THEN
(* read operation failed *)
IF (s.pos > s.rpos) &
(seek IN s.caps) & s.if.seek(s, s.pos, fromStart) THEN
s.rpos := s.pos;
(* second try: we were not able to fill the whole buffer
but perhaps we are able to read what we were requested for
*)
DEC(count, posindex - offset);
offset := posindex;
s.buf.rbegin := offset; s.buf.rend := offset;
s.eofFound := FALSE; (* retry it *)
s.eof := ~Fill(s, offset, count);
ELSE
s.eof := TRUE;
END;
ELSE
s.eof := FALSE;
END;
IF s.eof THEN
s.left := 0;
ELSE
s.left := s.buf.rend - posindex;
END;
RETURN ~s.eof
END FillBuf;
(* ==== i/o operations ============================================== *)
PROCEDURE ReadPart*(s: Stream; VAR buf: ARRAY OF Byte;
off, cnt: Count) : BOOLEAN;
(* fill buf[off..off+cnt-1] *)
VAR
pos: Count;
partcnt: Count;
PROCEDURE ReadBytesFromBuf(s: Stream;
VAR to: ARRAY OF Byte;
off, cnt: Count) : BOOLEAN;
VAR
bytes, max, spos: Count;
BEGIN
IF s.left = 0 THEN
IF s.eofFound OR ~FillBuf(s) THEN RETURN FALSE END;
END;
spos := s.pos MOD bufsize;
max := s.left;
IF max > cnt THEN
max := cnt;
END;
bytes := 0;
WHILE bytes < max DO
to[off] := s.buf.cont[spos];
INC(off); INC(spos); INC(bytes);
END;
INC(s.pos, bytes); DEC(s.left, bytes); INC(s.count, bytes);
IF ~s.bidirect THEN
IF s.write >= bytes THEN
DEC(s.write, bytes);
ELSE
s.write := 0;
END;
END;
RETURN TRUE
END ReadBytesFromBuf;
BEGIN (* ReadPart *)
IF SYS.TAS(s.lock) THEN
Error(s, NestedCall);
RETURN FALSE
END;
s.error := FALSE; s.count := 0;
IF ~(read IN s.caps) THEN
s.lock := FALSE; Error(s, CannotRead); RETURN FALSE
ELSIF (off < 0) OR (off+cnt > LEN(buf)) OR (cnt < 0) THEN
s.lock := FALSE; Error(s, BadParameters); RETURN FALSE
END;
IF cnt = 0 THEN s.lock := FALSE; RETURN TRUE END;
IF s.buf # NIL THEN
s.eofFound := FALSE;
WHILE (s.count < cnt) &
ReadBytesFromBuf(s, buf, s.count + off, cnt - s.count) DO
(* s.count is already incremented by ReadBytesFromBuf *)
END;
(* extend write region, if necessary *)
IF ~s.bidirect THEN
pos := s.pos MOD bufsize;
IF (s.write > 0) & (s.buf.wend < pos) THEN
IF s.buf.wbegin = s.buf.wend THEN
s.buf.wbegin := pos;
END;
s.buf.wend := pos;
END;
END;
ELSE
IF addrio IN s.caps THEN
s.count := s.if.addrread(s, SYSTEM.ADR(buf[off]), cnt);
IF (s.count > 0) & (s.count < cnt) THEN
LOOP
partcnt := s.if.addrread(s,
SYSTEM.ADR(buf[off + s.count]), cnt - s.count);
IF (partcnt < 0) OR (partcnt = 0) THEN EXIT END;
ASSERT(partcnt <= cnt - s.count);
INC(s.count, partcnt);
IF s.count = cnt THEN EXIT END;
END;
END;
ELSIF bufio IN s.caps THEN
s.count := s.if.bufread(s, buf, off, cnt);
IF (s.count > 0) & (s.count < cnt) THEN
LOOP
partcnt := s.if.bufread(s, buf, off + s.count, cnt - s.count);
IF (partcnt < 0) OR (partcnt = 0) THEN EXIT END;
ASSERT(partcnt <= cnt - s.count);
INC(s.count, partcnt);
IF s.count = cnt THEN EXIT END;
END;
END;
ELSE
s.count := 0;
WHILE (s.count < cnt) & s.if.read(s, buf[s.count+off]) DO
INC(s.count);
END;
END;
IF s.count < 0 THEN
s.count := 0;
Error(s, ReadFailed);
ELSE
s.eof := s.count = 0;
END;
END;
s.lock := FALSE;
RETURN s.count = cnt
END ReadPart;
PROCEDURE Read*(s: Stream; VAR buf: ARRAY OF Byte) : BOOLEAN;
BEGIN
RETURN ReadPart(s, buf, 0, LEN(buf))
END Read;
PROCEDURE ReadByte*(s: Stream; VAR byte: Byte) : BOOLEAN;
VAR
ok: BOOLEAN;
pos: Count;
BEGIN
IF SYS.TAS(s.lock) THEN
Error(s, NestedCall); RETURN FALSE
END;
s.error := FALSE;
IF s.left = 0 THEN
IF ~(read IN s.caps) THEN
s.lock := FALSE; Error(s, CannotRead); s.count := 0; RETURN FALSE
END;
IF s.buf # NIL THEN
IF ~FillBuf(s) THEN
(* FillBuf sets s.eof *)
s.lock := FALSE;
s.count := 0;
RETURN FALSE
END;
ELSE
ok := s.if.read(s, byte);
IF ok THEN
s.count := 1;
ELSE
s.count := 0;
END;
s.eof := ~ok;
s.lock := FALSE;
RETURN ok
END;
END;
(* s.left > 0 *)
s.count := 1;
byte := s.buf.cont[s.pos MOD bufsize];
INC(s.pos); DEC(s.left);
IF ~s.bidirect & (s.write # 0) THEN
DEC(s.write);
pos := s.pos MOD bufsize;
IF s.buf.wend < pos THEN
IF s.buf.wbegin = s.buf.wend THEN
s.buf.wbegin := pos;
END;
s.buf.wend := pos;
END;
END;
(* s.eof has been set by FillBuf *)
s.lock := FALSE;
RETURN TRUE
END ReadByte;
PROCEDURE ReadPacket*(s: Stream; VAR buf: ARRAY OF Byte;
off, maxcnt: Count) : Count;
(* fill buf[off..] with next packet *)
BEGIN
IF s.left > 0 THEN
IF maxcnt > s.left THEN
maxcnt := s.left;
END;
IF ReadPart(s, buf, off, maxcnt) THEN END;
RETURN s.count
END;
IF SYS.TAS(s.lock) THEN
Error(s, NestedCall);
s.count := 0;
RETURN 0
END;
s.error := FALSE; s.count := 0;
IF ~(read IN s.caps) THEN
s.lock := FALSE; Error(s, CannotRead); s.count := 0; RETURN 0
ELSIF (off < 0) OR (off+maxcnt > LEN(buf)) OR (maxcnt < 0) THEN
s.lock := FALSE; Error(s, BadParameters); s.count := 0; RETURN 0
END;
IF maxcnt = 0 THEN s.lock := FALSE; RETURN 0 END;
IF s.buf # NIL THEN
(* s.left = 0 *)
IF ~FillBuf(s) THEN
(* FillBuf sets s.eof *)
s.lock := FALSE;
RETURN 0
END;
s.lock := FALSE;
IF maxcnt > s.left THEN
maxcnt := s.left;
END;
IF ReadPart(s, buf, off, maxcnt) THEN END;
RETURN s.count
END;
(* s.buf = NIL *)
IF addrio IN s.caps THEN
s.count := s.if.addrread(s, SYSTEM.ADR(buf[off]), maxcnt);
ELSIF bufio IN s.caps THEN
s.count := s.if.bufread(s, buf, off, maxcnt);
ELSE
s.count := 0;
WHILE (s.count < maxcnt) & s.if.read(s, buf[s.count+off]) DO
INC(s.count);
END;
END;
IF s.count < 0 THEN
s.count := 0;
Error(s, ReadFailed);
ELSE
s.eof := s.count = 0;
END;
s.lock := FALSE;
RETURN s.count
END ReadPacket;
PROCEDURE WritePart*(s: Stream;
(* read-only *) VAR buf: ARRAY OF Byte;
off, cnt: Count) : BOOLEAN;
(* write buf[off..off+cnt-1] to s *)
VAR
posindex: Count;
PROCEDURE NewBuffer(s: Stream) : BOOLEAN;
(* flush and get new buffer *)
BEGIN
IF s.pos - posindex # s.buf.pos THEN
IF s.bufmode # bufpool THEN
IF ~InternalFlush(s) THEN RETURN FALSE END;
END;
InitBuf(s);
IF s.write # 0 THEN RETURN TRUE END;
END;
IF s.buf.wbegin = s.buf.wend THEN
(* nothing written into this buffer until now *)
s.buf.wbegin := posindex; s.buf.wend := posindex;
s.write := bufsize - posindex;
ELSIF s.wextensible & (s.buf.rbegin # s.buf.rend) THEN
(* check if the write region may be extended
over parts of the read region
*)
IF s.buf.wend < posindex THEN
(* write region before current position *)
IF (s.buf.rbegin <= s.buf.wend) & (s.buf.rend >= posindex) THEN
s.buf.wend := posindex;
s.write := bufsize - posindex;
END;
ELSE (* s.wbegin > posindex *)
(* write region after current position *)
IF (s.buf.rbegin <= posindex) & (s.buf.rend >= s.buf.wbegin) THEN
s.buf.wbegin := posindex;
s.write := bufsize - posindex;
END;
END;
END;
IF (* still *) s.write = 0 THEN
(* Flush necessary *)
IF ~InternalFlush(s) THEN RETURN FALSE END;
s.buf.wbegin := posindex; s.buf.wend := posindex;
s.write := bufsize - posindex;
END;
RETURN TRUE
END NewBuffer;
PROCEDURE UpdateReadRegion(s: Stream);
BEGIN
(* update s.left and extend read region, if possible *)
IF s.buf.rbegin = s.buf.rend THEN
(* set read region to write region *)
s.buf.rbegin := s.buf.wbegin; s.buf.rend := s.buf.wend;
s.left := s.buf.rend - posindex;
ELSIF (s.buf.rbegin < s.buf.wbegin) & (s.buf.rend >= s.buf.wbegin) THEN
(* forward extension of read region possible *)
IF s.buf.rend < s.buf.wend THEN
s.buf.rend := s.buf.wend;
END;
s.left := s.buf.rend - posindex;
ELSIF (s.buf.rbegin <= s.buf.wend) & (s.buf.rend > s.buf.wend) THEN
(* backward extension of read region possible *)
IF s.buf.rbegin > s.buf.wbegin THEN
s.buf.rbegin := s.buf.wend;
END;
s.left := s.buf.rend - posindex;
ELSE
(* posindex does not fall into [s.buf.rbegin..s.buf.rend-1] *)
s.left := 0;
END;
IF s.pos = s.buf.pos + bufsize THEN
s.left := 0;
END;
END UpdateReadRegion;
BEGIN
IF SYS.TAS(s.lock) THEN
Error(s, NestedCall); RETURN FALSE
END;
s.error := FALSE; s.count := 0;
IF ~(write IN s.caps) THEN
s.lock := FALSE; Error(s, CannotWrite); RETURN FALSE
ELSIF (off < 0) OR (off+cnt > LEN(buf)) OR (cnt < 0) THEN
s.lock := FALSE; Error(s, BadParameters); RETURN FALSE
ELSIF cnt = 0 THEN
s.lock := FALSE; RETURN TRUE
END;
IF s.buf # NIL THEN
IF s.bidirect THEN
WHILE s.count < cnt DO
IF (s.write = 0) & ~InternalFlush(s) THEN
s.lock := FALSE; RETURN FALSE
END;
s.wbuf.cont[s.wbuf.wend] := buf[off + s.count];
INC(s.wbuf.wend); INC(s.count); DEC(s.write);
IF (s.bufmode = linebuf) &
(buf[s.count+off-1] = s.termch) THEN
IF ~InternalFlush(s) THEN
s.lock := FALSE; RETURN FALSE
END;
END;
END;
ELSE
ValidPos(s);
posindex := s.pos MOD bufsize;
IF ~s.buf.ok THEN
InitBuf(s);
END;
(* copy from buf to s.buf *)
WHILE s.count < cnt DO
IF s.write = 0 THEN
posindex := s.pos MOD bufsize;
IF s.count > 0 THEN
UpdateReadRegion(s);
END;
IF ~NewBuffer(s) THEN
s.lock := FALSE; RETURN FALSE
END;
END;
s.buf.cont[posindex] := buf[off + s.count];
IF s.buf.wend = posindex THEN
INC(s.buf.wend);
END;
INC(s.count); INC(s.pos); DEC(s.write); INC(posindex);
IF (s.bufmode = linebuf) &
(buf[s.count+off-1] = s.termch) THEN
UpdateReadRegion(s);
IF ~InternalFlush(s) THEN
s.lock := FALSE; RETURN FALSE
END;
(* s.pos can be changed by InternalFlush *)
posindex := s.pos MOD bufsize;
END;
END;
UpdateReadRegion(s);
END;
ELSE (* unbuffered stream *)
IF addrio IN s.caps THEN
s.count := s.if.addrwrite(s, SYSTEM.ADR(buf[off]), cnt);
ELSIF bufio IN s.caps THEN
s.count := s.if.bufwrite(s, buf, off, cnt);
ELSE
s.count := 0;
WHILE (s.count < cnt) & s.if.write(s, buf[off+s.count]) DO
INC(s.count);
END;
END;
IF s.count # cnt THEN
Error(s, WriteFailed);
END;
END;
s.lock := FALSE;
RETURN s.count = cnt
END WritePart;
PROCEDURE Write*(s: Stream;
(* read-only *) VAR buf: ARRAY OF Byte) : BOOLEAN;
BEGIN
RETURN WritePart(s, buf, 0, LEN(buf))
END Write;
PROCEDURE WritePartC*(s: Stream; buf: ARRAY OF Byte;
off, cnt: Count) : BOOLEAN;
(* write buf[off..off+cnt-1] to s *)
BEGIN
RETURN WritePart(s, buf, off, cnt)
END WritePartC;
PROCEDURE WriteC*(s: Stream; buf: ARRAY OF Byte) : BOOLEAN;
BEGIN
RETURN WritePart(s, buf, 0, LEN(buf))
END WriteC;
PROCEDURE WriteByte*(s: Stream; byte: Byte) : BOOLEAN;
VAR
posindex: Count;
BEGIN
IF (s.write > 0) & ~SYS.TAS(s.lock) THEN
s.error := FALSE; s.count := 1;
IF s.bidirect THEN
s.wbuf.cont[s.wbuf.wend] := byte; INC(s.wbuf.wend); DEC(s.write);
ELSE
(* put byte into s.buf *)
posindex := s.pos MOD bufsize;
s.buf.cont[posindex] := byte;
IF s.buf.wend = posindex THEN
INC(s.buf.wend);
END;
DEC(s.write);
(* update s.buf.rend and s.left, if necessary *)
IF s.buf.rend = posindex THEN
INC(s.buf.rend);
END;
IF s.left # 0 THEN
DEC(s.left);
ELSIF s.buf.rbegin = s.buf.rend THEN
(* set read-region to write-region *)
s.buf.rbegin := s.buf.wbegin; s.buf.rend := s.buf.wend;
s.left := s.buf.wend - posindex;
END;
INC(s.pos);
END;
IF (s.bufmode = linebuf) & (byte = s.termch) THEN
IF ~InternalFlush(s) THEN
s.lock := FALSE; RETURN FALSE
END;
IF ~s.bidirect THEN
s.buf.wbegin := s.pos MOD bufsize;
END;
END;
s.lock := FALSE; RETURN TRUE
ELSE
RETURN WritePart(s, byte, 0, 1)
END;
END WriteByte;
PROCEDURE InternalSeek(s: Stream; offset: Count; whence: Whence) : BOOLEAN;
VAR
oldpos: Count; pos: Count;
BEGIN
s.error := FALSE;
IF s.bidirect THEN
Error(s, CannotSeek); RETURN FALSE
ELSIF s.buf = NIL THEN
IF ~(seek IN s.caps) THEN
Error(s, CannotSeek); RETURN FALSE
ELSIF ~s.if.seek(s, offset, whence) THEN
Error(s, SeekFailed); RETURN FALSE
END;
ELSE
IF ~s.validpos & (seek IN s.caps) THEN
IF (write IN s.caps) & ~InternalFlush(s) THEN END;
IF ~s.if.seek(s, offset, whence) THEN
Error(s, SeekFailed); RETURN FALSE
END;
IF whence = fromStart THEN
s.validpos := TRUE;
s.pos := offset; s.rpos := offset;
END;
ELSE
ValidPos(s); oldpos := s.pos;
IF s.pos > s.maxpos THEN
s.maxpos := s.pos;
END;
CASE whence OF
| fromStart: IF offset < 0 THEN
Error(s, SeekFailed); RETURN FALSE
END;
s.pos := offset;
| fromPos: IF s.pos + offset < 0 THEN
Error(s, SeekFailed); RETURN FALSE
END;
INC(s.pos, offset);
| fromEnd: IF (write IN s.caps) & ~InternalFlush(s) THEN END;
IF ~(seek IN s.caps) OR
~s.if.seek(s, offset, whence) THEN
Error(s, SeekFailed); RETURN FALSE
END;
s.validpos := FALSE; ValidPos(s);
ELSE
Error(s, BadWhence); RETURN FALSE
END;
IF ~(holes IN s.caps) & (s.pos > s.maxpos) THEN
(* if holes are not permitted
we need to check the new position
*)
IF ~(seek IN s.caps) THEN
Error(s, CannotSeek); RETURN FALSE
ELSIF s.if.seek(s, s.pos, fromStart) THEN
s.rpos := s.pos; s.maxpos := s.pos;
ELSE
Error(s, SeekFailed); RETURN FALSE
END;
END;
IF s.buf.ok & (s.pos # oldpos) THEN
(* set s.left and s.write *)
IF (s.pos < s.buf.pos) OR (s.pos >= s.buf.pos + bufsize) THEN
s.left := 0; s.write := 0;
ELSE
pos := s.pos MOD bufsize;
IF s.buf.rbegin = s.buf.rend THEN
s.buf.rbegin := pos; s.buf.rend := pos;
END;
IF s.buf.wbegin = s.buf.wend THEN
s.buf.wbegin := pos; s.buf.wend := pos;
END;
IF s.pos > oldpos THEN
IF (pos >= s.buf.rbegin) & (pos < s.buf.rend) THEN
s.left := s.buf.rend - pos;
ELSE
s.left := 0;
END;
IF (pos >= s.buf.wbegin) & (pos <= s.buf.wend) THEN
s.write := bufsize - pos;
ELSE
s.write := 0;
END;
IF s.wextensible &
(s.write < s.left) & (s.buf.wbegin # s.buf.wend) THEN
(* s.write = 0 (else s.write >= s.left);
try to extend write-region to avoid
an unnecessary flush operation
*)
IF (s.buf.wbegin < pos) &
(s.buf.wend >= s.buf.rbegin) THEN
(* write-region is followed by read-region *)
s.buf.wend := pos; s.write := bufsize - pos;
ELSIF (pos < s.buf.wbegin) &
(s.buf.wbegin >= s.buf.rend) THEN
(* read-region is followed by write-region *)
s.buf.wbegin := pos; s.write := bufsize - pos;
END;
END;
ELSE (* s.pos < oldpos *)
IF (pos < s.buf.rbegin) OR (pos > s.buf.rend) THEN
s.left := 0;
ELSE
s.left := s.buf.rend - pos;
END;
IF (pos < s.buf.wbegin) OR (pos > s.buf.wend) THEN
s.write := 0;
ELSE
s.write := bufsize - pos;
END;
END;
END;
END;
END;
END;
IF s.left > 0 THEN
s.eof := FALSE;
END;
RETURN TRUE
END InternalSeek;
PROCEDURE Seek*(s: Stream; offset: Count; whence: Whence) : BOOLEAN;
VAR
rval: BOOLEAN;
BEGIN
IF ~SYS.TAS(s.lock) THEN
rval := InternalSeek(s, offset, whence);
s.lock := FALSE;
RETURN rval
ELSE
Error(s, NestedCall);
RETURN FALSE
END;
END Seek;
PROCEDURE Tell*(s: Stream; VAR offset: Count) : BOOLEAN;
BEGIN
IF ~SYS.TAS(s.lock) THEN
s.error := FALSE;
IF tell IN s.caps THEN
IF s.buf # NIL THEN
IF s.validpos THEN
offset := s.pos;
ELSIF s.if.tell(s, s.rpos) THEN
s.validpos := TRUE;
s.pos := s.rpos;
offset := s.pos;
ELSE
s.lock := FALSE;
Error(s, TellFailed);
END;
ELSIF ~s.if.tell(s, offset) THEN
s.lock := FALSE;
Error(s, TellFailed);
END;
ELSE
s.lock := FALSE;
Error(s, CannotTell);
END;
s.lock := FALSE;
ELSE
Error(s, NestedCall);
END;
RETURN ~s.error
END Tell;
PROCEDURE GetPos*(s: Stream; VAR offset: Count);
(* IF ~Tell(s, offset) THEN offset := internal position END; *)
BEGIN
IF ~Tell(s, offset) THEN
IF SYS.TAS(s.lock) THEN
Error(s, NestedCall);
ELSE
ValidPos(s);
offset := s.pos;
s.lock := FALSE;
END;
END;
END GetPos;
PROCEDURE SetPos*(s: Stream; offset: Count);
(* IF ~Seek(s, offset, fromStart) THEN END; *)
BEGIN
IF ~Seek(s, offset, fromStart) THEN END;
END SetPos;
PROCEDURE ^ Touch*(s: Stream);
PROCEDURE Trunc*(s: Stream; length: Count) : BOOLEAN;
(* truncate `s' to a total length of `length';
following holds if holes are permitted:
(1) the current position remains unchanged
(2) the contents between `length' and
the current position is undefined
this call fails if holes are not permitted and the
current position is beyond `length'
*)
VAR
ok: BOOLEAN;
BEGIN
IF ~SYS.TAS(s.lock) THEN
IF (trunc IN s.caps) & (length >= 0) THEN
s.error := FALSE; ok := TRUE;
IF s.buf # NIL THEN
ValidPos(s);
IF ~(holes IN s.caps) & (s.pos > length) THEN
ok := FALSE;
ELSIF (s.bufmode = bufpool) OR s.buf.ok &
(s.buf.pos DIV bufsize >= length DIV bufsize) THEN
Touch(s);
END;
END;
IF ~ok OR ~s.if.trunc(s, length) THEN
s.lock := FALSE; Error(s, TruncFailed);
END;
ELSE
s.lock := FALSE; Error(s, CannotTrunc);
END;
s.lock := FALSE;
ELSE
Error(s, NestedCall);
END;
RETURN ~s.error
END Trunc;
PROCEDURE Back*(s: Stream) : BOOLEAN;
(* undo last read operation (one byte);
because of the delayed buffer filling
Back is always successful for buffered streams
immediately after read-operations
*)
VAR
rval: BOOLEAN;
BEGIN
IF ~SYS.TAS(s.lock) THEN
s.error := FALSE;
IF read IN s.caps THEN
IF seek IN s.caps THEN
(* fails if s.pos = 0 *)
rval := InternalSeek(s, -1, 1)
ELSIF s.bidirect & s.buf.ok THEN
IF s.pos > 0 THEN
DEC(s.pos); INC(s.left);
rval := TRUE;
ELSE
rval := FALSE;
END;
ELSIF (s.buf # NIL) & s.buf.ok THEN
rval := InternalSeek(s, -1, 1) & (s.left > 0)
ELSE
rval := FALSE
END;
ELSE
s.lock := FALSE; Error(s, CannotRead);
rval := FALSE
END;
s.lock := FALSE;
RETURN rval
ELSE
Error(s, NestedCall);
RETURN FALSE
END;
END Back;
PROCEDURE Insert*(s: Stream; byte: Byte) : BOOLEAN;
(* return `byte' on next read-operation *)
BEGIN
IF ~SYS.TAS(s.lock) THEN
s.error := FALSE;
IF read IN s.caps THEN
IF s.buf # NIL THEN
(* seek in buffer possible? *)
IF s.bidirect THEN
IF s.pos > 0 THEN
DEC(s.pos); s.buf.cont[s.pos] := byte;
RETURN TRUE
ELSE
RETURN FALSE
END;
ELSIF s.buf.ok &
(s.pos > s.buf.pos+s.buf.rbegin) &
(s.pos < s.buf.pos+s.buf.rend) &
InternalSeek(s, -1, 1) THEN
s.buf.cont[s.pos MOD bufsize] := byte;
s.lock := FALSE;
RETURN TRUE
ELSE
s.lock := FALSE;
RETURN FALSE
END;
ELSE
s.lock := FALSE; Error(s, Unbuffered); RETURN FALSE
END;
ELSE
s.lock := FALSE; Error(s, CannotRead); RETURN FALSE
END;
ELSE
Error(s, NestedCall);
RETURN FALSE
END;
END Insert;
PROCEDURE InternalFlush(s: Stream) : BOOLEAN;
PROCEDURE Write(s: Stream; buf: Buffer) : BOOLEAN;
VAR
count: Count;
BEGIN
IF addrio IN s.caps THEN
count := s.if.addrwrite(s, SYSTEM.ADR(buf.cont[buf.wbegin]),
buf.wend-buf.wbegin);
ELSIF bufio IN s.caps THEN
count := s.if.bufwrite(s, buf.cont,
buf.wbegin, buf.wend-buf.wbegin);
ELSIF s.if.write(s, buf.cont[buf.wbegin]) THEN
count := 1;
ELSE
count := 0;
END;
IF count < 0 THEN
count := 0;
END;
INC(buf.wbegin, count); INC(s.rpos, count);
RETURN count > 0
END Write;
PROCEDURE FlushEvent;
VAR
event: Event;
BEGIN
IF s.flushEvent # NIL THEN
NEW(event);
event.type := s.flushEvent;
event.message := "flush event of Streams";
event.stream := s;
Events.Raise(event);
END;
END FlushEvent;
BEGIN
s.error := FALSE;
IF (write IN s.caps) & (s.buf # NIL) & s.buf.ok THEN
IF s.bidirect & (s.wbuf.wend > s.wbuf.wbegin) THEN
FlushEvent;
WHILE (s.wbuf.wend > s.wbuf.wbegin) & Write(s, s.wbuf) DO END;
IF s.wbuf.wend > s.wbuf.wbegin THEN
s.wbuf.wbegin := 0; s.wbuf.wend := 0; s.write := bufsize;
Error(s, WriteFailed); RETURN FALSE
END;
s.wbuf.wbegin := 0; s.wbuf.wend := 0; s.write := bufsize;
ELSIF ~s.bidirect & (s.buf.wend > s.buf.wbegin) THEN
FlushEvent;
ValidPos(s);
IF s.buf.pos + s.buf.wbegin # s.rpos THEN
IF ~(seek IN s.caps) THEN
Error(s, CannotSeek);
(* write in this case at the current position
else there is no easy way to write anyhow
*)
ELSIF ~s.if.seek(s, s.buf.pos + s.buf.wbegin, fromStart) THEN
s.buf.wend := s.buf.wbegin; s.write := 0;
Error(s, SeekFailed); RETURN FALSE
END;
s.rpos := s.buf.pos + s.buf.wbegin;
END;
WHILE (s.buf.wend > s.buf.wbegin) & Write(s, s.buf) DO END;
IF s.buf.wend > s.buf.wbegin THEN
s.buf.wend := s.buf.wbegin; s.write := bufsize - s.buf.wbegin;
Error(s, WriteFailed); RETURN FALSE
END;
IF {seek, tell, trunc} * s.caps = {} THEN
(* unidirectional pipeline; reset s.pos to avoid
unintentional flushes due to buffer boundaries
*)
s.pos := 0; s.rpos := 0; s.buf.pos := 0;
s.buf.wbegin := 0; s.buf.wend := 0; s.write := bufsize;
ELSE
IF (s.pos >= s.buf.pos) & (s.pos < s.buf.pos + bufsize) THEN
s.buf.wbegin := s.pos MOD bufsize;
s.buf.wend := s.buf.wbegin;
s.write := bufsize - s.buf.wbegin;
ELSE
s.write := 0;
END;
END;
END;
END;
RETURN TRUE
END InternalFlush;
PROCEDURE Flush*(s: Stream) : BOOLEAN;
VAR
ok: BOOLEAN;
BEGIN
IF ~SYS.TAS(s.lock) THEN
IF s.bufmode = bufpool THEN
ok := FlushBufPool(s);
ELSE
ok := InternalFlush(s);
END;
IF ok & (flush IN s.caps) THEN
ok := s.if.flush(s);
IF ~ok THEN
Error(s, FlushFailed);
END;
END;
s.lock := FALSE;
RETURN ok
ELSE
Error(s, NestedCall);
RETURN FALSE
END;
END Flush;
PROCEDURE InputInBuffer*(s: Stream) : BOOLEAN;
(* returns TRUE if the next byte to be read is buffered *)
VAR
buf: Buffer;
pos: Count;
BEGIN
IF s.bufmode = bufpool THEN
IF ~s.buf.ok THEN RETURN FALSE END;
pos := s.pos - s.pos MOD bufsize;
IF s.buf.pos # pos THEN
IF ~FindBuffer(s, pos, buf) THEN
RETURN FALSE
END;
pos := s.pos - buf.pos;
RETURN (pos >= buf.rbegin) & (pos < buf.rend)
END;
ELSIF s.bidirect THEN
RETURN s.left > 0
END;
pos := s.pos MOD bufsize;
RETURN (read IN s.caps) & (s.buf # NIL) & s.buf.ok &
((s.left > 0) OR
(write IN s.caps) & (s.buf.wbegin <= pos) & (s.buf.wend > pos))
END InputInBuffer;
PROCEDURE OutputInBuffer*(s: Stream) : BOOLEAN;
(* returns TRUE if Flush would lead to a write-operation *)
VAR
buf: Buffer;
BEGIN
IF s.bufmode = bufpool THEN
buf := s.bufpool.head;
WHILE buf # NIL DO
IF buf.wbegin # buf.wend THEN RETURN TRUE END;
buf := buf.nexta;
END;
RETURN FALSE
ELSIF s.bidirect THEN
RETURN s.wbuf.wend > s.wbuf.wbegin
ELSE
RETURN (write IN s.caps) & (s.buf # NIL) & s.buf.ok &
(s.buf.wend > s.buf.wbegin)
END;
END OutputInBuffer;
PROCEDURE OutputWillBeBuffered*(s: Stream) : BOOLEAN;
(* returns TRUE if the next written byte will be buffered *)
VAR
buf: Buffer;
pos: Count;
BEGIN
IF s.bufmode = bufpool THEN
IF s.bufpool.nbuf < s.bufpool.maxbuf THEN RETURN TRUE END;
pos := s.pos - s.pos MOD bufsize;
IF s.buf.pos # pos THEN
IF ~FindBuffer(s, pos, buf) THEN RETURN FALSE END;
IF s.buf.wbegin = s.buf.wend THEN RETURN TRUE END;
pos := s.pos - buf.pos;
RETURN (pos >= buf.wbegin) & (pos <= buf.wend) OR
(buf.wbegin > 0) & (pos + 1 = buf.wbegin)
END;
ELSIF s.bidirect THEN
RETURN s.write > 0
END;
RETURN (write IN s.caps) & (s.buf # NIL) &
((s.write > 0) OR ~s.buf.ok)
END OutputWillBeBuffered;
PROCEDURE Touch*(s: Stream);
(* forget any buffer contents *)
BEGIN
IF ~SYS.TAS(s.lock) THEN
s.error := FALSE;
IF write IN s.caps THEN
IF s.bufmode = bufpool THEN
IF ~FlushBufPool(s) THEN END;
ReleaseBufPool(s);
ELSE
IF ~InternalFlush(s) THEN END;
END;
END;
IF flush IN s.caps THEN
IF ~s.if.flush(s) THEN
Error(s, FlushFailed);
END;
END;
IF s.bidirect THEN
s.buf.rbegin := 0; s.buf.rend := 0; s.left := 0;
ELSE
s.validpos := FALSE;
IF s.buf # NIL THEN
s.buf.ok := FALSE;
s.left := 0;
s.write := 0;
s.eofFound := FALSE;
END;
END;
s.lock := FALSE;
ELSE
Error(s, NestedCall);
END;
END Touch;
PROCEDURE Copy*(source, dest: Stream; maxcnt: Count) : BOOLEAN;
(* more efficient variants are possible *)
VAR
left, count, copied, read, written: Count;
buffer: ARRAY bufsize OF Byte;
ok: BOOLEAN;
BEGIN
IF maxcnt >= 0 THEN
read := 0; written := 0; ok := TRUE;
left := maxcnt;
LOOP
IF left = 0 THEN
EXIT
END;
ASSERT(left > 0);
IF left > bufsize THEN
count := bufsize;
ELSE
count := left;
END;
ok := ReadPacket(source, buffer, 0, count) > 0;
ASSERT(source.count <= count);
INC(read, source.count);
IF ~ok THEN EXIT END;
ok := WritePart(dest, buffer, 0, source.count);
ASSERT(dest.count <= source.count);
INC(written, dest.count);
IF ~ok THEN EXIT END;
DEC(left, dest.count);
END;
source.count := read; dest.count := written;
RETURN ok
ELSE
copied := 0;
WHILE (ReadPacket(source, buffer, 0, bufsize) > 0) &
WritePart(dest, buffer, 0, source.count) DO
INC(copied, source.count);
END;
source.count := copied; dest.count := copied;
RETURN ~source.error & ~dest.error
END;
END Copy;
(* === nulldev procedures ========================================== *)
PROCEDURE NulldevRead(s: Stream; VAR byte: Byte) : BOOLEAN;
BEGIN
byte := 0X;
RETURN FALSE
END NulldevRead;
PROCEDURE NulldevWrite(s: Stream; byte: Byte) : BOOLEAN;
BEGIN
RETURN TRUE
END NulldevWrite;
PROCEDURE InitNullIf(VAR nullif: Interface);
BEGIN
NEW(nullif);
nullif.read := NulldevRead;
nullif.write := NulldevWrite;
END InitNullIf;
PROCEDURE OpenNulldev(VAR s: Stream);
BEGIN
NEW(s);
Services.Init(s, type);
Init(s, nullif, {read, write}, nobuf);
END OpenNulldev;
PROCEDURE ExitHandler(event: Events.Event);
(* flush all streams on exit;
we do not close them to allow output by other exit event handlers
*)
VAR s: Stream;
BEGIN
s := opened;
WHILE s # NIL DO
IF (s.bufmode # nobuf) & (write IN s.caps) THEN
IF ~Flush(s) THEN END;
END;
s := s.next;
END;
END ExitHandler;
PROCEDURE FreeHandler(event: Events.Event);
(* set all free lists to NIL to return the associated storage
to the garbage collector
*)
BEGIN
freelist := NIL;
END FreeHandler;
BEGIN
Services.CreateType(type, "Streams.Stream", "");
errormsg[NoHandlerDefined] := "no handler defined";
errormsg[CannotRead] := "not opened for reading";
errormsg[CannotSeek] := "not capable of seeking";
errormsg[CloseFailed] := "close operation failed";
errormsg[NotLineBuffered] := "stream is not line buffered";
errormsg[SeekFailed] := "seek operation failed";
errormsg[TellFailed] := "tell operation failed";
errormsg[BadWhence] := "bad value of whence parameter";
errormsg[CannotTell] := "not capable of telling current position";
errormsg[WriteFailed] := "write operation failed";
errormsg[CannotWrite] := "not opened for writing";
errormsg[ReadFailed] := "read operation failed";
errormsg[Unbuffered] := "operation is not valid for unbuffered streams";
errormsg[BadParameters] := "bad parameter values";
errormsg[CannotTrunc] := "not capable of truncating";
errormsg[TruncFailed] := "trunc operation failed";
errormsg[NestedCall] := "nested stream operation";
errormsg[FlushFailed] := "flush operation failed";
Events.Define(error); Events.SetPriority(error, Priorities.liberrors);
Events.Ignore(error);
opened := NIL;
InitNullIf(nullif);
OpenNulldev(null); stdin := null; stdout := null; stderr := null;
Events.Handler(Process.termination, ExitHandler);
Events.Handler(Process.startOfGarbageCollection, FreeHandler);
END ulmStreams.