QPython與Termux的交互:PIOS.py
PIOS = Python Interactive Over Socket 基于套接字的Python交互
Version 版本:v1.1
默認(rèn)端口號 Default Port : 12000 .
In Termux:
? python -i PIOS/server.py
In QPython:
? from PIOS.client import *
? Connect()
In Termux:
? python -i PIOS/server.py 15234
In QPython:
? from PIOS.client import *
? Connect(15234)
In QPython:
? from PIOS.server import *
? Start()
In Termux:
? python -i PIOS/client.py
In QPython:
? from PIOS.server import *
? Start(18379)
In Termux:
? python -i PIOS/client.py 18379
Server Functions 服務(wù)器函數(shù):
? Start Server 啟動服務(wù)器:
? ? Start(port) # port 端口號,Return None
? ? Start() # port = 12000,Return None
Client Functions 客戶端函數(shù):
? Connect to Server 連接服務(wù)器:
? ? Connect(port) # port 端口號,Return None
? ? Connect() # port = 12000,Return None
? Remote Functions 遠(yuǎn)程函數(shù):
? ? RemoteEval(code) # Return: (Result,Error)
? ? RemoteExec(code) # Return: (Result,Error)
? ? RemoteSl4a(args) # Return: (Result,Error),Only for QPython is a server 僅針對QPython服務(wù)器
? ? RemoteShell(code) # Return: (Result,Error), if QPython is a client and Termux is a server, it will open Termux 如果QPython 3C是客戶端且Termux是服務(wù)器,將會打開Termux
by 乘著船 20220515
源代碼server.py:
def Now():
? ?t=time.localtime()
? ?return "%04d-%02d-%02d,%02d:%02d:%02d"%(t.tm_year,t.tm_mon,t.tm_mday,t.tm_hour,t.tm_min,t.tm_sec)
def Accept():
? ?global Conn
? ?Conn,addr = Socket.accept()
? ?print(Now(),'對接地址',addr)
def Send(Result,Error):
? ?Data=bytes(repr(Result)+'\x00'+Error,'utf-8')
? ?Conn.sendall(Data)
def Recv():
? ?a=bytearray()
? ?while True:
? ? ? ?b=Conn.recv(1024)
? ? ? ?a.extend(b)
? ? ? ?if len(b)<1024:
? ? ? ? ? ?break
? ?return str(a,'utf-8')
from traceback import format_exc
import socket,time,os,sys
class Out:
? ?result=[]
? ?error=''
? ?write=result.append
? ?def flush():
? ? ? ?pass
def RunEval(code):
? ?try:
? ? ? ?Out.result=eval(code,Env,Env)
? ?except:
? ? ? ?Out.error=format_exc()
? ? ? ?Out.result=''
def RunExec(code):
? ?sys.stdout=Out
? ?try:
? ? ? ?exec(code,Env,Env)
? ?except:
? ? ? ?Out.error=format_exc()
? ?Out.result=''.join(Out.result)
? ?sys.stdout=fileOut
def RunShell(code):
? ?code='os.system('+repr(code)+')'
? ?print()
? ?return RunEval(code)
def RunSl4a(code):
? ?code='droid.'+code
? ?try:
? ? ? ?data=eval(code,Env,Env)
? ? ? ?Out.result=data.result
? ? ? ?Out.error=data.error
? ? ? ?if Out.error==None:
? ? ? ? ? ?Out.error=''
? ?except:
? ? ? ?Out.error=format_exc()
? ? ? ?Out.result=''
def Run(cmd):
? ?mode,code=cmd.split(':',1)
? ?Glb['Run'+mode](code)
? ?data=Out.result,Out.error
? ?Out.result=[]
? ?Out.write=Out.result.append
? ?Out.error=''
? ?return data
Glb=globals()
def Once():
? ?data=Recv()
? ?print(Now(),'傳入指令',data)
? ?try:
? ? ? ?data=Run(data)
? ?except:
? ? ? ?print(Now(),'連接中斷')
? ? ? ?exit()
? ?print(Now(),'返回結(jié)果',data)
? ?Send(*data)
def Exit(Text='操作結(jié)束,謝謝使用!'):
? ?sys.stdout=fileOut
? ?Text=str(Text)
? ?Send(Text,'')
? ?Conn.close()
? ?Socket.close()
? ?print(Now(),'對接結(jié)束')
? ?exit()
Env={'Exit':Exit,'os':os}
def Start(port=12000):#對接流程
? ?global Socket
? ?print(Now(),'對接開始 端口號:%s'%port)
? ?Socket=socket.socket()
? ?host=socket.gethostname()
? ?try:
? ? ? ?Socket.bind((host,port))
? ? ? ?print(Now(),'已連接上',host,port)
? ?except:
? ? ? ?print(Now(),'連接失敗,對接結(jié)束',format_exc(),host,port)
? ? ? ?exit()
? ?Socket.listen(5)
? ?Socket.settimeout(120)
? ?Accept()
? ?while True:
? ? ? ?Once()
projectPath=__file__[:__file__.rfind('/')]
os.chdir(projectPath)
try:
? ?port=int(sys.argv[1])
except:
? ?port=12000
if os.environ['HOME'].find('qpython')!=-1:
? ?from androidhelper import Android as droid
? ?droid=droid()
? ?Env['droid']=droid
fileOut=sys.__stdout__
print('日志文件:',fileOut)
sys.stdout=fileOut
if __name__=='__main__':
? ?Start(port)
源代碼client.py:
import time,socket,os,sys
def Connect(port=12000):
? ?global Socket
? ?Socket = socket.socket()
? ?host = socket.gethostname()
? ?startTime = time.time()
? ?while time.time()-startTime<=5:
? ? ? ?try:
? ? ? ? ? ?Socket.connect((host, port))
? ? ? ? ? ?break
? ? ? ?except:
? ? ? ? ? ?pass
? ?else:
? ? ? ?print('連接到服務(wù)端超時')
? ? ? ?exit()
? ?global isTermux
? ?isTermux=RemoteEval('os.environ.get("HOME","").find("termux")!=-1')[0] and os.environ.get('HOME','').find('indi.czc.qpython')!=-1
def Recv():
? ?a=bytearray()
? ?while True:
? ? ? ?b=Socket.recv(1024)
? ? ? ?a.extend(b)
? ? ? ?if len(b)<1024:
? ? ? ? ? ?break
? ?s=str(a,'utf-8').rsplit('\x00',1)
? ?try:
? ? ? ?return eval(s[0]),s[1]
? ? ? ?#數(shù)據(jù)類型解析成功
? ?except:
? ? ? ?#數(shù)據(jù)類型解析失敗
? ? ? ?return [s[0],s[1]]
def Send(Data):
? ?Data=bytes(Data,'utf-8')
? ?Socket.sendall(Data)
def RemoteEval(code):
? ?try:
? ? ? ?Send('Eval:'+code)
? ?except:
? ? ? ?SendErrCode()
? ?return Recv()
def RemoteExec(code):
? ?try:
? ? ? ?Send('Exec:'+code)
? ?except:
? ? ? ?SendErrCode()
? ?return Recv()
def RemoteShell(code):
? ?try:
? ? ? ?Send('Shell:'+code)
? ?except:
? ? ? ?SendErrCode()
? ?if isTermux:
? ? ? ?try:
? ? ? ? ? ?droid.launch(None,'com.termux',False)
? ? ? ?except:
? ? ? ? ? ?pass
? ?return Recv()
def RemoteSl4a(*args):
? ?method=args[0]
? ?params=repr(args[1:])
? ?code=method+params
? ?try:
? ? ? ?Send('Sl4a:'+code)
? ?except:
? ? ? ?SendErrCode()
? ?return Recv()
def SendErrCode():
? ?Send('Eval:"Traceback: Error Code"')
isTermux=None
if os.environ.get('HOME','').find('qpython')!=-1:
? ?from androidhelper import Android as droid
? ?droid=droid()
try:
? ?port=int(sys.argv[1])
except:
? ?port=12000
if __name__=='__main__':
? ?Connect(port)
PIOS用于在QPython各分支之間,以及QPython、Termux、PyDroid之間互相通信傳送數(shù)據(jù)。
視頻演示:BV1LP411r7Wh
作者:乘著船@Bilibili
更多文章+下載鏈接:https://www.bilibili.com/read/readlist/rl321663