MOO
Moo Language
 
230231232 233234235236237238239240241242243244245246247248249250251 253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400    401402403 423424425       426427428 430431432433434435436437438439440                        441442443444445 446447448449  450451452453454455                            456457458459460461462463464465466467 468469470471472473474    475476477478479480481482483484485486487488 489490491492493494495496497  498499500     501502503504505506507508509510            511512513  514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623             624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696	## the handle must be the first field in this object to match
	## the internal representation used by various modules. (e.g. sck)
	var(#get) handle := -1.
 
 
	var inwc := 0, outwc := 0. ## input watcher count and ouput watcher count
	var insem, outsem.
	var(#get,#set) inputAction, outputAction.
	var(#get) inputReady := false, outputReady := false.
 
	method close
	{
		if (self.handle >= 0)
		{
			if (self.insem notNil) 
			{
				System unsignal: self.insem;
				       removeAsyncSemaphore: self.insem.
				self.insem := nil.
			}.
			if (self.outsem notNil)
			{
				System unsignal: self.outsem;
 
				self.outsem := nil.
			}.
 
			self.outwc := 0.
			self.inwc := 0.
 
			self _close.
			self.handle := -1.
		}
	}
 
## TODO: how to specify a timeout for an action? using another semaphore??
	method watchInput
	{
		if (self.inwc == 0) 
		{ 
			if (self.insem isNil)
			{
				self.insem := Semaphore new.
				self.insem signalAction: [:sem | 
					self.inputReady := true.
					self.inputAction value: self value: true
				].
				System addAsyncSemaphore: self.insem.
			}.
			self.inputReady := false.
			System signal: self.insem onInput: self.handle 
		}.
		self.inwc := self.inwc + 1.
	}
 
	method unwatchInput
	{
		if (self.inwc > 0)
		{
			self.inwc := self.inwc - 1.
			if (self.inwc == 0)
			{
				##if (self.insem notNil) { System unsignal: self.insem }.
				System unsignal: self.insem.
				System removeAsyncSemaphore: self.insem.
				self.insem := nil.
			}.
		}.
	}
 
	method watchOutput
	{
		if (self.outwc == 0)
		{
			if (self.outsem isNil)
			{
				self.outsem := Semaphore new.
				self.outsem signalAction: [:sem | 
					self.outputReady := true.
					self.outputAction value: self value: true 
				].
				System addAsyncSemaphore: self.outsem.
			}.
			self.outputReady := false.
			System signal: self.outsem onOutput: self.handle.
		}.
		self.outwc := self.outwc + 1.
	}
 
 
	method unwatchOutput
	{
		if (self.outwc > 0)
		{
			self.outwc := self.outwc - 1.
			if (self.outwc == 0)
			{
				## self.outsem must not be nil here.
				System unsignal: self.outsem.
				System removeAsyncSemaphore: self.outsem.
				self.outsem := nil.
			}.
		}.
	}
 
	method writeBytes: bytes offset: offset length: length signal: sem
	{
		| oldact n |
#######################################
## TODO: if data still in progress, failure... or success while concatening the message? 
##       for a stream, concatening is not bad. but it's not good if the socket requires message boundary preservation.
######################################
 
		if (self.outputReady)
		{
			## n >= 0: written
			## n <= -1: tolerable error (e.g. EAGAIN)
			## exception: fatal error
			##while (true) ## TODO: loop to write as much as possible
			##{
				n := self _writeBytes: bytes offset: offset length: length.
				if (n >= 0) 
				{ 
					if (sem notNil) { sem signal }.
					^n 
				}.
			##}.
 
			self.outputReady := false.
		}.
 
		oldact := self.outputAction.
		self.outputAction := [ :sck :state |
			##### schedule write.
			if (state)
			{
				if ((self _writeBytes: bytes offset: offset length: length) <= -1) 
				{
					## EAGAIN
					self.outputReady := false.
					^self.
				}.
## TODO: handle _writeBytes may not write in full.
			}.
 
			self.outputAction := oldact.
			self unwatchOutput.
		].
 
		## TODO: set timeout?
		self watchOutput.
	}
 
	method writeBytes: bytes signal: sem
	{
		^self writeBytes: bytes offset: 0 length: (bytes size) signal: sem.
	}
 
	method writeBytes: bytes offset: offset length: length
	{
		^self writeBytes: bytes offset: offset length: length signal: nil.
	}
 
	method writeBytes: bytes
	{
		^self writeBytes: bytes offset: 0 length: (bytes size) signal: nil.
	}
}
 
class Socket(AsyncHandle) from 'sck'
{
	method(#primitive) open(domain, type, proto).
 
 
 
 
	method(#primitive) _close.
	method(#primitive) bind: addr.
	method(#primitive) _listen: backlog.
 
	DGRAM  := 2.
}
 
 
 
 
 
 
 
 
extend Socket
{
	method(#class) new { self messageProhibited: #new }
 
 
	method(#class) domain: domain type: type
	{
		^super new open(domain, type, 0).
	}
 
	method listen: backlog do: acceptBlock
	{
		self.inputAction := acceptBlock.
		self watchInput.
		^self _listen: backlog.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
	}
 
	method connect: target do: connectBlock
	{
		| conblk oldact |
 
 
		if ((self _connect: target) <= -1)
		{
			## connection in progress
 
 
 
			oldact := self.outputAction.
			self.outputAction := [ :sck :state |
				| soerr |
 
				if (state)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
				{
					## i don't map a connection error to an exception.
					## it's not a typical exception. it is a normal failure
					## that is caused by an external system. 
					##
					## or should i map an error to an exception?
					## i can treat EINPROGRESS, ECONNREFUSED as failure.
					## all other errors may get treated as an exception?
					## what about timeout???
 
					soerr := self _socketError.
					if (soerr >= 0) 
 
					{
						## finalize connection if not in progress
						self.outputAction := oldact.
						self unwatchOutput.
						if (connectBlock notNil)
						{
							connectBlock value: sck value: (soerr == 0).
 
 
 
 
						}.
					}.
				}
				else
				{
					## timed out
					self.outputAction := oldact.
					self unwatchOutput.
					if (connectBlock notNil) 
					{
						## TODO: tri-state? success, failure, timeout? or boolean with extra error code
						connectBlock value: sck value: false. 
					}.
				}.
 
			].
 
			###self.outputTimeout: 10 do: xxxx.
			self watchOutput.
		}
		else
		{
			## connected immediately.
			if (connectBlock notNil) 
 
 
			{
				connectBlock value: self value: true.
			}
 
 
 
 
 
		}
	}
}
 
class MyObject(Object)
{
	method(#class) main
	{
		| conact inact outact accact |
 
 
 
 
 
 
 
 
 
 
 
 
 
 
(SocketAddress fromString: '192.168.123.232:99') dump.
'****************************' dump.
 
 
 
(*
s:= X new: 20.
s basicSize dump.
'****************************' dump.
 
s := Y new: 10.
s x.
s basicAt: 1 put: 20.
s dump.
s basicSize dump.
'****************************' dump.
*)
 
(***********************************
s := ByteArray new: 100.
s basicFillFrom: 0 with: ($a asInteger) count: 100.
s basicFillFrom: 50 with: ($b asInteger) count: 50.
(s basicShiftFrom: 50 to: 94 count: 10) dump.
s dump.
##thisProcess terminate.
 
s := IP4Address fromString: '192.168.123.232'.
s dump.
s basicSize dump.
 
s := IP6Address fromString: 'fe80::c225:e9ff:fe47:99.2.3.4'.
##s := IP6Address fromString: '::99.12.34.54'.
##s := IP6Address fromString: '::FFFF:0:0'.
##s := IP6Address fromString: 'fe80::'.
s dump.
s basicSize dump.
 
s := IP6Address fromString: 'fe80::c225:e9ff:fe47:b1b6'.
s dump.
s basicSize dump.
##s := IP6Address new.
##s dump.
##s := IP4SocketAddress new.
##s dump.
thisProcess terminate.
**************************)
 
		inact := [:sck :state |
			| data n |
(*
end of data -> 0.
no data -> -1. (e.g. EAGAIN)
has data -> 1 or more
error -> exception
*)
 
			data := ByteArray new: 100.
			do
			{
				n := sck readBytes: data.
				if (n <= 0)
				{
					if (n == 0) { sck close }. ## end of data
					break.
				}
				elsif (n > 0)
				{
					(n asString & ' bytes read') dump.
					data dump.
 
					##sck writeBytes: #[ $h, $e, $l, $l, $o, $., $., $., C'\n' ].
					sck writeBytes: data offset: 0 length: n.
				}.
			}
			while (true).
		].
 
## TODO: what should it accept as block parameter
## socket, output result? , output object?
		outact := [:sck :state |
			if (state)
			{
				## what if i want write more data???
				##[ sck writeBytes: #[ $h, $e, $l, $l, $o, C'\n' ] ] 
				##	on: Exception do: [:ex | sck close. ].
			}
			else
			{
			}
		].
 
	
		conact := [:sck :state |
 
			| x write_more count |
 
			count := 0.
			if (state)
			{
				'CONNECTED NOW.............' dump.
				###sck inputTimeout: 10; outputTimeout: 10; connectTimeout: 10.
 
#############################################
				write_more := [:sem |
					if (count <= 26)
					{
						sck writeBytes: %[ $h, $e, $l, $l, $o, $-, $m, $o, count + 65, $o, $o, C'\n' ] signal: x.
						count := count + 1.
					}
					else
					{
						System removeAsyncSemaphore: x.
					}.
				].
 
 
 
 
 
 
 
 
 
 
 
 
 
 
				x := Semaphore new.
				x signalAction: write_more.
				System addAsyncSemaphore: x.
				x signal.
 
				##sck outputAction: outact.
				##sck writeBytes: #[ $h, $e, $l, $l, $o, $-, $m, $o, $o, C'\n' ] signal: x.
###############################################
 
				sck inputAction: inact.
				sck watchInput.
			}
			else
			{
				'UNABLE TO CONNECT............' dump.
			}
		].
 
		## ------------------------------------------------------
		accact := [:sck :state |
			| newsck newaddr |
 
			newaddr := SocketAddress new.
			newsck := sck accept: newaddr.
 
			System log: 'new connection - '; log: newaddr; log: ' '; log: (newsck handle); logNl.
 
			newsck inputAction: inact; outputAction: outact.
			##newsck watchInput; watchOutput.
			newsck watchInput.
 
			
			newsck writeBytes: #[ $W, $e, $l, $c, $o, $m, $e, $., C'\n' ].
		].
 
 
 
		[
			| s s2 st sg ss |
			[
				s := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM.
				s connect: (SocketAddress fromString: '127.0.0.1:9999') do: conact.
 
##				s2 := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM.
##				s2 bind: (SocketAddress fromString: '0.0.0.0:9998').
##				##s2 inputAction: accact.
##				###s2 listen: 10; watchInput.
##				s2 listen: 10 do: accact.
 
(*
st := Semaphore new.
System addAsyncSemaphore: st.
System signal: st afterSecs: 5.
'JJJJJJJJJJJ' dump.
sg := SemaphoreGroup new.
'JJJJJJJJJJJ' dump.
sg wait.
'YYYYYYYYYYYYYYY' dump.
*)
 
###[ while (1) { '1111' dump. System sleepForSecs: 1 } ] fork.
 
(*
st := Semaphore new.
System addAsyncSemaphore: st.
System signal: st afterSecs: 20.
*)
 
 
				while (true)
				{
					ss := System handleAsyncEvent.
230231232233234235236237238239240241242243244245246247248249250251252 254255256   257258259260261                                                                                                                      262263264265266267268269270271272273274275276277278279280281282283284285286287 307308309310311312313314315316317318319 321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405    406407408409410411   412413414415416417418419           420421422423424425426427428429430431432433434435436437438439440441442 443444  445446447448449450451452453454455456457458459460461462463464465466467  468469      470471472473474475476 477                                                                      478479480481482483484  485486487488489490491492493494495496497498499500501502503504505506507508   509                                                                510511512	## the handle must be the first field in this object to match
	## the internal representation used by various modules. (e.g. sck)
	var(#get) handle := -1.
	var outsem := nil.
 
	##method initialize
	##{
	##	^super initialize
	##}
 
	method close
	{
		if (self.handle >= 0)
		{
			###if (self.insem notNil) 
			###{
			###	System unsignal: self.insem;
			###	       removeAsyncSemaphore: self.insem.
			###	self.insem := nil.
			###}.
			if (self.outsem notNil)
			{
				System unsignal: self.outsem;
 
				self.outsem := nil.
			}.
 
 
 
 
			self _close.
			self.handle := -1.
		}
	}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
	method writeBytes: bytes signal: sem
	{
		^self writeBytes: bytes offset: 0 length: (bytes size)
	}
 
	method writeBytes: bytes offset: offset length: length
	{
		^self writeBytes: bytes offset: offset length: length.
	}
 
	method writeBytes: bytes
	{
		^self writeBytes: bytes offset: 0 length: (bytes size)
	}
}
 
class Socket(AsyncHandle) from 'sck'
{
	var eventActions.
	var pending_bytes, pending_offset, pending_length.
	var outreadysem, outdonesem, inreadysem.
 
	method(#primitive) _open(domain, type, proto).
	method(#primitive) _close.
	method(#primitive) bind: addr.
	method(#primitive) _listen: backlog.
 
	DGRAM  := 2.
}
 
pooldic Socket.EventType
{
	CONNECTED := 0.
	DATA_IN := 1.
	DATA_OUT := 2.
}
 
extend Socket
{
	method(#class) new { self messageProhibited: #new }
 
 
	method(#class) domain: domain type: type
	{
		^(super new) open(domain, type, 0)
	}
 
	method initialize
	{
		super initialize.
		self.eventActions := #(nil nil nil).
 
		self.outdonesem := Semaphore new.
		self.outreadysem := Semaphore new.
		self.inreadysem := Semaphore new.
 
		self.outdonesem signalAction: [ :xsem |
			(self.eventActions at: Socket.EventType.DATA_OUT) value: self.
			System unsignal: self.outreadysem.
		].
 
		self.outreadysem signalAction: [ :xsem |
			| nwritten |
			nwritten := self _writeBytes: self.pending_bytes offset: self.pending_offset length: self.pending_length.
			if (nwritten >= 0)
			{
				self.pending_bytes := nil.
				self.pending_offset := 0.
				self.pending_length := 0.
				self.outdonesem signal.
			}
		].
 
		self.inreadysem signalAction: [ :ysem |
			(self.eventActions at: Socket.EventType.DATA_IN) value: self.
		].
	}
 
	method open(domain, type, proto)
	{
		| sck |
		sck := self _open(domain, type, proto).
 
		if (self.handle >= 0)
		{
			System addAsyncSemaphore: self.outdonesem.
			System addAsyncSemaphore: self.outreadysem.
		}.
 
		^sck
	}
 
 
	method close
	{
'Socket close' dump.
		System removeAsyncSemaphore: self.outdonesem.
		System removeAsyncSemaphore: self.outreadysem.
		^super close.
	}
###	method listen: backlog do: acceptBlock
###	{
###		self.inputAction := acceptBlock.
###		self watchInput.
###		^self _listen: backlog.
###	}
 
	method onEvent: event_type do: action_block
	{
		self.eventActions at: event_type put: action_block.
	}
 
	method connect: target
	{
		| sem |
		if ((self _connect: target) <= -1)
		{
			sem := Semaphore new.
			sem signalAction: [ :xsem |
				| soerr dra |
				soerr := self _socketError.
				if (soerr >= 0) 
				{
					## finalize connection if not in progress
'CHECKING FOR CONNECTION.....' dump.
					System unsignal: xsem.
					System removeAsyncSemaphore: xsem.
 
 
 
 
 
					(self.eventActions at: Socket.EventType.CONNECTED) value: self value: (soerr == 0).
 
					if (soerr == 0)
					{
						if ((self.eventActions at: Socket.EventType.DATA_IN) notNil)
 
 
 
						{
							xsem signalAction: [ :ysem |
								(self.eventActions at: Socket.EventType.DATA_IN) value: self.
							].
							System addAsyncSemaphore: xsem.
							System signal: xsem onInput: self.handle.
						}.
					}.
 
 
 
 
 
 
 
 
 
 
 
				}.
				(* HOW TO HANDLE TIMEOUT? *)
			].
 
			System addAsyncSemaphore: sem.
			System signal: sem onOutput: self.handle.
		}
		else
		{
			## connected immediately
'IMMEDIATELY CONNECTED.....' dump.
			(self.eventActions at: Socket.EventType.CONNECTED) value: self value: true.
			if ((self.eventActions at: Socket.EventType.DATA_IN) notNil)
			{
				sem := Semaphore new.
				sem signalAction: [ :xsem |
					(self.eventActions at: Socket.EventType.DATA_IN) value: self.
				].
				System addAsyncSemaphore: sem.
				System signal: sem onInput: self.handle.
			}.
		}
	}
 
 
	method writeBytes: bytes offset: offset length: length
 
 
	{
		| n |
 
		## n >= 0: written
		## n <= -1: tolerable error (e.g. EAGAIN)
		## exception: fatal error
		##while (true) ## TODO: loop to write as much as possible
		##{
			n := self _writeBytes: bytes offset: offset length: length.
			if (n >= 0) 
			{ 
				self.outdonesem signal.
				^n 
			}.
		##}.
 
		## TODO: adjust offset and length 
		self.pending_bytes := bytes.
		self.pending_offset := offset.
		self.pending_length := length.
 
		System signal: self.outreadysem onOutput: self.handle.
	}
 
 
 
}
 
 
 
 
 
 
 
class MyObject(Object)
{
	method(#class) main
	{
		[
			| s s2 st sg ss buf count |
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
			count := 0.
			[
				buf := ByteArray new: 128.
				s := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM.
				
				s onEvent: Socket.EventType.CONNECTED do: [ :sck :state |
					if (state)
 
 
					{
						'AAAAAAAA' dump.
						s writeBytes: #[ $a, $b, $c ] 
					}
					else
					{
						'FAILED TO CONNECT' dump.
					}.
				].
				s onEvent: Socket.EventType.DATA_IN do: [ :sck |
					| nbytes |
					nbytes := s readBytes: buf. 
					if (nbytes == 0)
					{
						sck close
					}.
					('Got ' & (nbytes asString)) dump.
					buf dump.
				].
				s onEvent: Socket.EventType.DATA_OUT do: [ :sck |
					if (count < 10) { s writeBytes: #[ $a, $b, C'\n' ]. count := count + 1. }.
				].
 
				s connect: (SocketAddress fromString: '127.0.0.1:9999').
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
				while (true)
				{
					ss := System handleAsyncEvent.