Pythonでsocketサーバでリクエストを受け、それごとにmultiprocessingで何かを実行させ、外部からタイムアウトでプロセスをkillする。
前回、socketでrecvした直後に、何かをやらせるために、やらせる部分をThreadでやってみました。
でも、どうやらThreadが終了しないときに途中でkillすることが難しそうなので、ここはmultiprocessingでやってみようということになった解決編ですね。
いや、Linuxならforkならうまくいきそうな気がしていましたが、Windowsだと、multiprocessした先からsocketでクライアントに値を返すとかもしかしたら無理かも。。って思っていましたが、杞憂だったようで。
ということでとりあえず、processをタイムアウトでkillする部分。
class timeOuter:
AR = []
NN = 0
TO = 10 #TimeOutSec
#----------------------------------
def __init__(self,tsec):
self.TO = tsec
#----------------------------------
# インスタンス登録
#----------------------------------
def append(self,p):
nowtime = int(time.time()) #秒単位
self.NN = self.NN + 1
self.AR.append([p,nowtime,self.NN]) #self.AR.append((p,nowtime,self.NN)) #どう違う?
return 0
#----------------------------------
# LOOPのスレッド開始
#----------------------------------
def loop(self):
t = threading.Thread(target=self.looplocal)
t.start()
#----------------------------------
# LOOPのスレッド本体
#----------------------------------
def looplocal(self):
print("loop Start---------")
bwait = True
while True:
if bwait:
time.sleep(1)
else:
bwait = True
n = len(self.AR)
if n>0:
for w in range(n):
p = self.AR[w]
nowtime = int(time.time()) #秒単位
delttime = nowtime - int(p[1])
if delttime > self.TO:
print("【{}】 #削除# {}秒経過".format(p[2],delttime))
del self.AR[w] #pではダメよ。
bwait = False
#p[0].getP().sendall(b'TIMEOUT DAYO!') #CLIENTに戻す。同期ならここで呼ばないとterminateされて呼ばれない。
p[0].getP().terminate()
break;
else:
print("【{}】 ## {}秒経過".format(p[2],delttime))
return 0
#呼び出す方
T = timeOuter(5)#5秒でタイムアウトの場合
T.loop()
T = timeOuter(5)
でタイムアウト秒数を5秒に設定しながらインスタンス化し、
T.loop()
でチェックLOOPに入ります。
loop関数の中で、別スレッドでlooplocalを実行しながら待ち受けます。
最初は当然ARが長さ0のarrayなので、あれ、、Pythonだとリストか。
1秒のsleepを入れながら延々とLOOPすることになりますね。
このARに値を入れるのが、append関数ですね。
timeOuterのT.append関数では、あとから出てくるプロセス用クラスのインスタンス値をを渡し、
[インスタンス、現在のUNIX_TIME、N]のリストを、ARの1要素としてappendします。
そうすると、ARの要素すうが1になるので、timeOuterのlooplocalのなかで引っかかるっていうしくみです。
そうしてもう一度looplocal関数を見ると、現在のUNIX_TIMEと、appendされた要素の2番目の要素である、appendした時の UNIX_TIME を比較し、タイムアウト秒をこえていたら、処理することになります。
今回は、socketでrecvしたところから始まり、クライアント側はblockingで待っているという仮定なので、プロセスをkillするならば、クライアントに何らか送ってやらねばなりません。
サンプルでは非同期のためコメント化してありますが、
p[0].getP().sendall(b'TIMEOUT DAYO!') #CLIENTに戻す。同期ならここで呼ばないとterminateされて呼ばれない。
p[0].getP().terminate()
あとから出てきますが、
p[0].getP()で戻されるのは、
multiprocessing.Process
の戻り値(インスタンス)です。
それでは、multiprocessingする方のクラスです。
class procDorry:
#----------------------------------
# Processインスタンス
#----------------------------------
def getP(self):
return self.P
#----------------------------------
# クライアントAccept
#----------------------------------
def getC(self):
return self.C
#----------------------------------
# 拡張仕様用
#----------------------------------
def getA(self):
return self.A
#----------------------------------
# マルチプロセス開始
#----------------------------------
def exec(self,conn,mes,alter):
self.C = conn;
self.M = mes;
self.A = alter;
self.P = multiprocessing.Process(target=self.dorry)
self.P.start()
print("Process:")
print(self.P)
#----------------------------------
# マルチプロセス本体
#----------------------------------
def dorry(self):
#------------------------------
self.C.sendall(b'complete DAYO!') #非同期ならまず返す
#------------------------------
#execute sample
print("message:{}".format(self.M))
print("alternated:{}".format(self.A))
for w in range(10):
print(w)
time.sleep(1)
#------------------------------
#self.C.sendall(b'complete DAYO!') #同期なら最後に返す
#------------------------------
del self #このdelいるのか?
#----------------------------------
# STATIC。ここでインスタンス生成
#----------------------------------
def funcs(conn,mes,alter):
p = procDorry()
p.exec(conn,mes,alter)
#---------------------------
# terminateのサンプル
# 実際は、alterにクラスインスタンスの関数を登録しておいて
# そこにprocDorryのインスタンスを追加していく感じ
# クラスには、インスタンス、開始時刻、種別などを登録して、LOOPで検索しながらterminateさせるとかする。
#---------------------------
A = p.getA()
if callable(A): # if hasattr(A, '__call__'):
print(A)
A(p)
else:
print("alternated not callable.")
#---------------------------
#----------------------------------
Threadの時にもやりましたが、funcsというSTATIC関数の中で、procDorryのインスタンスを生成して、exec関数を実行します。
まぁ投げっぱなしってことです。
execするときに、実際にはクライアントsocket、message、Timerインスタンスを渡しています。
では、socketで受ける本体の方はというと、
class hiraSockSv:
dispct = 0
readbuf = 1024
bBlock = False #BlockingMode
address = 'localhost'
port = 12345
dorry = None #
alter = "ALTERNATED" #
#--------------------------
def setAlter(self,a):
self.alter = a
#--------------------------
def setFunc(self,f):
self.dorry = f
#--------------------------
# なんか動いてるっぽい表示
def dispN(self):
if self.dispct==0:
print("●\r",end="")
else:
print("〇\r",end="")
self.dispct = self.dispct + 1;
if self.dispct == 2:
self.dispct = 0
#--------------------------
def __init__(self,address,port):
self.address = address;
self.port = port;
#--------------------------
def setData(self,a,b):
self.address = address;
self.port = port;
#--------------------------
def test(self):
print("A:{} B:{}".format(self.address, self.port))
#--------------------------
def __enter__(self):
#print("前処理")
return self #必須
#--------------------------
def __exit__(self, exc_type, exc_value, traceback):
#print("後処理")
self.tmp = 0
#--------------------------
def localAccept(self,s):
try:
conn, addr = s.accept()
return conn, addr
except Exception as e:
if e.args[0]==10035 or e.args[0]==11:
return None,None
#--------------------------------------
def localRecv(self,conn,n):
try:
dat = conn.recv(n)
return dat
except Exception as e:
if e.args[0]==10035 or e.args[0]==11:
return None
#--------------------------------------
def exec(self):
print("Address:{} Port:{}".format(self.address, self.port))
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((self.address, self.port))
s.listen(10)
s.setblocking(False)
while True:
mes = "";
#------------------------------------------------
conn,addr = self.localAccept(s)
if conn==None:
self.dispN()
time.sleep(0.1)
continue
conn.setblocking(False) #LINUXだとこれをしないとrecvがblockingしてしまう
while True:
dat = self.localRecv(conn,self.readbuf)
if not dat:
print("complete?");
break
mes = mes + dat.decode('utf8')
#print(mes);
print("one cours !") #ここでThreadを生成させよう
if self.dorry:
self.dorry(conn,mes,self.alter)
else:
print("NOT DORRY?")
return 0
#--------------------------------------
で、最後に呼び出す方ですが、
if __name__ == '__main__':
T = timeOuter(5)
T.loop()
with hiraSockSv('127.0.0.1',12345)as S:
S.setAlter(T.append)
S.setFunc(procDorry.funcs)
S.exec()
こんな感じですね。
timeOuterを生成してチェックのloopを開始させる。
hiraSockSvのインスタンスを生成して、拡張情報としてT.appendの関数を渡す。
関数情報として、procDorry.funcsを渡す。
hiraSockSvの exec関数で実行(socketサーバの役割)を開始する。
まぁそんな感じです。
実際の挙動は、
socketでリクエストがくる➡
受信する➡
マルチプロセスを生成して受信したメッセージを渡す ➡
1秒ずつ10回LOOPして番号を表示させる ➡
プロセス終了
ということになりますが、実際は、5秒たったところで、タイムアウトチェックにひっかかり、
プロセスを終了させています。
実際に実行させたときの画面は以下で。
loop Start---------
.
Address:127.0.0.1 Port:12345
complete?
one cours !
Process:
<Process(Process-1, started)>
<bound method timeOuter.append of <__main__.timeOuter object at 0x000002380E097DA0>>
message:----+----ひらひら1----+----2----+----3----+----4----+----5----+----6
alternated:<bound method timeOuter.append of <__mp_main__.timeOuter object at 0x0000028A8EED6710>>
0
【1】 ## 1秒経過
1
【1】 ## 2秒経過
complete?
one cours !
Process:
<Process(Process-2, started)>
<bound method timeOuter.append of <__main__.timeOuter object at 0x000002380E097DA0>>
message:----+----ひらひら1----+----2----+----3----+----4----+----5----+----6
alternated:<bound method timeOuter.append of <__mp_main__.timeOuter object at 0x0000023D6A0966D8>>
0
2
【1】 ## 3秒経過
【2】 ## 1秒経過
1
3
【1】 ## 4秒経過
【2】 ## 2秒経過
complete?
one cours !
Process:
<Process(Process-3, started)>
<bound method timeOuter.append of <__main__.timeOuter object at 0x000002380E097DA0>>
message:----+----ひらひら1----+----2----+----3----+----4----+----5----+----6
alternated:<bound method timeOuter.append of <__mp_main__.timeOuter object at 0x0000025BD9CD6710>>
0
2
4
【1】 ## 5秒経過
【2】 ## 3秒経過
【3】 ## 1秒経過
1
complete?
one cours !
Process:
<Process(Process-4, started)>
<bound method timeOuter.append of <__main__.timeOuter object at 0x000002380E097DA0>>
message:----+----ひらひら1----+----2----+----3----+----4----+----5----+----6
alternated:<bound method timeOuter.append of <__mp_main__.timeOuter object at 0x0000028B3D816710>>
0
3
5
【1】 #削除# 6秒経過
【2】 ## 4秒経過
【3】 ## 2秒経過
【4】 ## 1秒経過
2
1
complete?
one cours !
Process:
<Process(Process-5, started)>
<bound method timeOuter.append of <__main__.timeOuter object at 0x000002380E097DA0>>
message:----+----ひらひら1----+----2----+----3----+----4----+----5----+----6
alternated:<bound method timeOuter.append of <__mp_main__.timeOuter object at 0x000001BB54C11CF8>>
0
4
【2】 ## 5秒経過
【3】 ## 3秒経過
【4】 ## 2秒経過
【5】 ## 1秒経過
3
2
1
5
【2】 #削除# 6秒経過
【3】 ## 4秒経過
【4】 ## 3秒経過
【5】 ## 2秒経過
4
3
2
【3】 ## 5秒経過
【4】 ## 4秒経過
【5】 ## 3秒経過
5
4
3
【3】 #削除# 6秒経過
【4】 ## 5秒経過
【5】 ## 4秒経過
5
4
【4】 #削除# 6秒経過
【5】 ## 5秒経過
5
【5】 #削除# 6秒経過
〇