Pythonでsocketサーバでリクエストを受け、それごとにmultiprocessingで何かを実行させ、外部からタイムアウトでプロセスをkillする。

前回、socketでrecvした直後に、何かをやらせるために、やらせる部分をThreadでやってみました。
でも、どうやらThreadが終了しないときに途中でkillすることが難しそうなので、ここはmultiprocessingでやってみようということになった解決編ですね。 

いや、Linuxならforkならうまくいきそうな気がしていましたが、Windowsだと、multiprocessした先からsocketでクライアントに値を返すとかもしかしたら無理かも。。って思っていましたが、杞憂だったようで。

ということでとりあえず、processをタイムアウトでkillする部分。

class	timeOuter:
	AR	= []
	NN	= 0
	TO	= 10	#TimeOutSec
	#----------------------------------
	def	__init__(self,tsec):
		self.TO		= tsec
	#----------------------------------
	#	インスタンス登録
	#----------------------------------
	def append(self,p):
		nowtime = int(time.time())	#秒単位
		self.NN = self.NN + 1
		self.AR.append([p,nowtime,self.NN])	#self.AR.append((p,nowtime,self.NN)) #どう違う?
		return	0
	#----------------------------------
	#	LOOPのスレッド開始
	#----------------------------------
	def	loop(self):
		t = threading.Thread(target=self.looplocal)
		t.start()
	#----------------------------------
	#	LOOPのスレッド本体
	#----------------------------------
	def	looplocal(self):
		print("loop Start---------")
		bwait	= True
		while True:
			if bwait:
				time.sleep(1)
			else:
				bwait = True
			n = len(self.AR)
			if n>0:
				for w in range(n):
					p = self.AR[w]
					nowtime		= int(time.time())	#秒単位
					delttime	= nowtime - int(p[1])
					if delttime > self.TO:
						print("【{}】 #削除# {}秒経過".format(p[2],delttime))
						del self.AR[w]	#pではダメよ。
						bwait = False
						#p[0].getP().sendall(b'TIMEOUT DAYO!')	#CLIENTに戻す。同期ならここで呼ばないとterminateされて呼ばれない。
						p[0].getP().terminate()
						break;
					else:
						print("【{}】 ## {}秒経過".format(p[2],delttime))
		return	0

#呼び出す方
	T = timeOuter(5)#5秒でタイムアウトの場合
	T.loop()

T = timeOuter(5)
でタイムアウト秒数を5秒に設定しながらインスタンス化し、
T.loop()
でチェックLOOPに入ります。
loop関数の中で、別スレッドでlooplocalを実行しながら待ち受けます。
最初は当然ARが長さ0のarrayなので、あれ、、Pythonだとリストか。
1秒のsleepを入れながら延々とLOOPすることになりますね。
このARに値を入れるのが、append関数ですね。

timeOuterのT.append関数では、あとから出てくるプロセス用クラスのインスタンス値をを渡し、
[インスタンス、現在のUNIX_TIME、N]のリストを、ARの1要素としてappendします。
そうすると、ARの要素すうが1になるので、timeOuterのlooplocalのなかで引っかかるっていうしくみです。

そうしてもう一度looplocal関数を見ると、現在のUNIX_TIMEと、appendされた要素の2番目の要素である、appendした時の UNIX_TIME を比較し、タイムアウト秒をこえていたら、処理することになります。
今回は、socketでrecvしたところから始まり、クライアント側はblockingで待っているという仮定なので、プロセスをkillするならば、クライアントに何らか送ってやらねばなりません。
サンプルでは非同期のためコメント化してありますが、

p[0].getP().sendall(b'TIMEOUT DAYO!')	#CLIENTに戻す。同期ならここで呼ばないとterminateされて呼ばれない。
p[0].getP().terminate()

あとから出てきますが、
p[0].getP()で戻されるのは、
multiprocessing.Process
の戻り値(インスタンス)です。

それでは、multiprocessingする方のクラスです。

class	procDorry:
	#----------------------------------
	#	Processインスタンス
	#----------------------------------
	def	getP(self):
		return	self.P
	#----------------------------------
	#	クライアントAccept
	#----------------------------------
	def	getC(self):
		return	self.C
	#----------------------------------
	#	拡張仕様用
	#----------------------------------
	def	getA(self):
		return	self.A
	#----------------------------------
	#	マルチプロセス開始
	#----------------------------------
	def	exec(self,conn,mes,alter):
		self.C	= conn;
		self.M	= mes;
		self.A	= alter;
		self.P = multiprocessing.Process(target=self.dorry)
		self.P.start()
		print("Process:")
		print(self.P)
	#----------------------------------
	#	マルチプロセス本体
	#----------------------------------
	def	dorry(self):
		#------------------------------
		self.C.sendall(b'complete DAYO!')	#非同期ならまず返す
		#------------------------------
		#execute sample
		print("message:{}".format(self.M))
		print("alternated:{}".format(self.A))
		for w in range(10):
			print(w)
			time.sleep(1)
		#------------------------------
		#self.C.sendall(b'complete DAYO!')	#同期なら最後に返す
		#------------------------------
		del	self	#このdelいるのか?
	#----------------------------------
	#	STATIC。ここでインスタンス生成
	#----------------------------------
	def	funcs(conn,mes,alter):
		p = procDorry()
		p.exec(conn,mes,alter)
		#---------------------------
		#	terminateのサンプル
		#	実際は、alterにクラスインスタンスの関数を登録しておいて
		#	そこにprocDorryのインスタンスを追加していく感じ
		#	クラスには、インスタンス、開始時刻、種別などを登録して、LOOPで検索しながらterminateさせるとかする。
		#---------------------------
		A = p.getA()
		if callable(A):		#	if hasattr(A, '__call__'):
			print(A)
			A(p)
		else:
			print("alternated not callable.")
		#---------------------------
	#----------------------------------

Threadの時にもやりましたが、funcsというSTATIC関数の中で、procDorryのインスタンスを生成して、exec関数を実行します。
まぁ投げっぱなしってことです。
execするときに、実際にはクライアントsocket、message、Timerインスタンスを渡しています。

では、socketで受ける本体の方はというと、

class	hiraSockSv:
	dispct	= 0
	readbuf	= 1024
	bBlock	= False	#BlockingMode
	address	= 'localhost'
	port	= 12345
	dorry	= None	#
	alter	= "ALTERNATED"	#
	#--------------------------
	def setAlter(self,a):
		self.alter = a
	#--------------------------
	def setFunc(self,f):
		self.dorry = f
	#--------------------------
	# なんか動いてるっぽい表示
	def dispN(self):
		if self.dispct==0:
			print("●\r",end="")
		else:
			print("〇\r",end="")
		self.dispct = self.dispct + 1;
		if self.dispct == 2:
			self.dispct = 0
	#--------------------------
	def	__init__(self,address,port):
		self.address	= address;
		self.port		= port;
	#--------------------------
	def	setData(self,a,b):
		self.address	= address;
		self.port		= port;
	#--------------------------
	def	test(self):
		print("A:{} B:{}".format(self.address,	self.port)) 
	#--------------------------
	def __enter__(self):
		#print("前処理")
		return self		#必須
	#--------------------------
	def __exit__(self, exc_type, exc_value, traceback):
		#print("後処理")
		self.tmp = 0
	#--------------------------
	def localAccept(self,s):
		try:
			conn, addr = s.accept()
			return	conn, addr
		except Exception as e:
			if e.args[0]==10035 or e.args[0]==11:
				return	None,None
	#--------------------------------------
	def localRecv(self,conn,n):
		try:
			dat = conn.recv(n)
			return	dat
		except Exception as e:
			if e.args[0]==10035 or e.args[0]==11:
				return	None
	#--------------------------------------
	def exec(self):
		print("Address:{} Port:{}".format(self.address,	self.port)) 
		with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
			s.bind((self.address, self.port))
			s.listen(10)
			s.setblocking(False)
			while True:
				mes = "";
				#------------------------------------------------
				conn,addr = self.localAccept(s)
				if conn==None:
					self.dispN()
					time.sleep(0.1)
					continue
				conn.setblocking(False)	#LINUXだとこれをしないとrecvがblockingしてしまう
				while True:
					dat = self.localRecv(conn,self.readbuf)
					if not dat:
						print("complete?");
						break
					mes = mes + dat.decode('utf8')
				#print(mes);
				print("one cours !")	#ここでThreadを生成させよう
				if self.dorry:
					self.dorry(conn,mes,self.alter)
				else:
					print("NOT DORRY?")
		return	0
	#--------------------------------------

で、最後に呼び出す方ですが、

if __name__ == '__main__':
	T = timeOuter(5)
	T.loop()
	with hiraSockSv('127.0.0.1',12345)as S:
		S.setAlter(T.append)
		S.setFunc(procDorry.funcs)
		S.exec()

こんな感じですね。
timeOuterを生成してチェックのloopを開始させる。
hiraSockSvのインスタンスを生成して、拡張情報としてT.appendの関数を渡す。
関数情報として、procDorry.funcsを渡す。
hiraSockSvの exec関数で実行(socketサーバの役割)を開始する。

まぁそんな感じです。

実際の挙動は、
socketでリクエストがくる➡
受信する➡
マルチプロセスを生成して受信したメッセージを渡す ➡
1秒ずつ10回LOOPして番号を表示させる ➡
プロセス終了

ということになりますが、実際は、5秒たったところで、タイムアウトチェックにひっかかり、
プロセスを終了させています。
実際に実行させたときの画面は以下で。

loop Start---------
.
Address:127.0.0.1 Port:12345
complete?
one cours !
Process:
<Process(Process-1, started)>
<bound method timeOuter.append of <__main__.timeOuter object at 0x000002380E097DA0>>
message:----+----ひらひら1----+----2----+----3----+----4----+----5----+----6
alternated:<bound method timeOuter.append of <__mp_main__.timeOuter object at 0x0000028A8EED6710>>
0
【1】 ## 1秒経過
1
【1】 ## 2秒経過
complete?
one cours !
Process:
<Process(Process-2, started)>
<bound method timeOuter.append of <__main__.timeOuter object at 0x000002380E097DA0>>
message:----+----ひらひら1----+----2----+----3----+----4----+----5----+----6
alternated:<bound method timeOuter.append of <__mp_main__.timeOuter object at 0x0000023D6A0966D8>>
0
2
【1】 ## 3秒経過
【2】 ## 1秒経過
1
3
【1】 ## 4秒経過
【2】 ## 2秒経過
complete?
one cours !
Process:
<Process(Process-3, started)>
<bound method timeOuter.append of <__main__.timeOuter object at 0x000002380E097DA0>>
message:----+----ひらひら1----+----2----+----3----+----4----+----5----+----6
alternated:<bound method timeOuter.append of <__mp_main__.timeOuter object at 0x0000025BD9CD6710>>
0
2
4
【1】 ## 5秒経過
【2】 ## 3秒経過
【3】 ## 1秒経過
1
complete?
one cours !
Process:
<Process(Process-4, started)>
<bound method timeOuter.append of <__main__.timeOuter object at 0x000002380E097DA0>>
message:----+----ひらひら1----+----2----+----3----+----4----+----5----+----6
alternated:<bound method timeOuter.append of <__mp_main__.timeOuter object at 0x0000028B3D816710>>
0
3
5
【1】 #削除# 6秒経過
【2】 ## 4秒経過
【3】 ## 2秒経過
【4】 ## 1秒経過
2
1
complete?
one cours !
Process:
<Process(Process-5, started)>
<bound method timeOuter.append of <__main__.timeOuter object at 0x000002380E097DA0>>
message:----+----ひらひら1----+----2----+----3----+----4----+----5----+----6
alternated:<bound method timeOuter.append of <__mp_main__.timeOuter object at 0x000001BB54C11CF8>>
0
4
【2】 ## 5秒経過
【3】 ## 3秒経過
【4】 ## 2秒経過
【5】 ## 1秒経過
3
2
1
5
【2】 #削除# 6秒経過
【3】 ## 4秒経過
【4】 ## 3秒経過
【5】 ## 2秒経過
4
3
2
【3】 ## 5秒経過
【4】 ## 4秒経過
【5】 ## 3秒経過
5
4
3
【3】 #削除# 6秒経過
【4】 ## 5秒経過
【5】 ## 4秒経過
5
4
【4】 #削除# 6秒経過
【5】 ## 5秒経過
5
【5】 #削除# 6秒経過
〇




コメントする

メールアドレスが公開されることはありません。 が付いている欄は必須項目です