允许一次运行一个Python应用程序实例

Socket-Singleton的Python项目详细描述


插座_单例.py在

基于套接字的单实例Python应用程序,具有干净的接口

,没有锁文件、互斥锁、依赖项或tomoolery

安装:

pip install Socket_Singleton

Import:

From Socket_Singleton import Socket_Singleton

构造函数:

Socket_Singleton(address="127.0.0.1", port=1337, timeout=0, client=True, strict=True, max_clients=0)

用法:

Socket_Singleton()

或保留参考:

app = Socket_Singleton()

然后附加一个回调,并从应用程序的后续调用中捕获参数:

def my_callback(arg):
    print(arg)

app.trace(my_callback)

另请参见:

Common TCP/UDP Port Numbers

建议在构造函数中指定端口*

示例:

假设我们有申请,应用程序副本,我们希望限制为单个实例。在

^{pr2}$

第一次应用程序副本启动时间:

>> C:\current\working\directory λ python app.py
>> 

在应用程序副本正常执行。(这里,应用程序副本阻止,直到满足input()。用你自己的逻辑来代替这个。本页上的示例和基本配方包含这些调用,仅用于演示目的。)

现在,在另一个壳中,如果我们尝试:

>> C:\current\working\directory λ python app.py
>> C:\current\working\directory λ

口译员马上就离开了,我们在提示符处就回来了。在


我们还可以访问参数,这些参数是在随后使用arguments属性运行python app.py时传递的,尽管不打算直接访问,但使用trace()方法可能更方便。这允许您注册一个回调,当arguments被追加时,它将被调用(就像其他实例try一样)。在

Socket_Singleton.trace(observer, *args, **kwargs)

#app.py

from Socket_Singleton import Socket_Singleton

def callback(argument, *args, **kwargs):
    print(argument)
    #do_a_thing(argument)

def main():
    app = Socket_Singleton()
    app.trace(callback, *args, **kwargs)
    input() #Blocking call to simulate your_business_logic() 

if __name__ == "__main__":
    main()

在终点站:

>> C:\current\working\directory λ python app.py
>> 

在另一个shell中,python app.py的后续尝试如下所示:

>> C:\current\working\directory λ python app.py foo bar baz
>> C:\current\working\directory λ

同时,我们对原始的python app.pyshell的输出如下所示:

>> C:\current\working\directory λ python app.py
>> foo
>> bar
>> baz

我们还可以使用类似的接口^{str1}$通过untrace()分离给定的观察者/回调。在

Socket_Singleton.untrace(observer)


如果您希望disconnect过早地从端口断开连接,从而释放“锁”,有一种close()方法:

#app.py

from Socket_Singleton import Socket_Singleton

def main():
    app = Socket_Singleton()
    app.close()
    print("Running!")
    input()

if __name__ == "__main__":
    main()

在终点站:

>> C:\current\working\directory λ python app.py
>> Running!
>> 

在一个新的外壳里:

>> C:\current\working\directory λ python app.py
>> Running!
>> 

还实现了Context manager协议:

with Socket_Singleton():
    input() #Blocking call to simulate your_business_logic()

Socket_Singleton.__enter__()返回self,以便在需要时可以访问对象:

with Socket_Singleton() as ss:
    ss.trace(callback)
    input() #Blocking call to simulate your_business_logic()

  • timeout

以秒为单位的持续时间,指定保持套接字的时间。默认为0(无超时,保持活动状态)。在初始化结束时,即套接字绑定成功后,立即开始倒计时。在


  • clients

一个整数属性,描述自实例化后连接了多少个客户端进程。在


  • max_clients

一个正整数,描述在内部调用close()并释放端口之前要捕获多少个客户端进程。默认为0(无最大值)


  • strict=False

我们可以引发并捕获一个自定义的exceptionMultipleSingletonsError,而不是完全破坏未能成为单例的进程。在

from Socket_Singleton import Socket_Singleton, MultipleSingletonsError

def callback(arg):
    print(f"callback: {arg}")

def main():
    try:
        app = Socket_Singleton(strict=False)
    except MultipleSingletonsError as err:
        print("We are not the singleton.")
        print(err)
    else:
        print("We are the singleton!")
        app.trace(callback)
        [print(arg) for arg in app.arguments]
        # print(app)
        # print(repr(app))
        # help(app)

    input()

if __name__ == "__main__":
    main()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何在主类中调用不同的类方法时删除空指针异常?   java充气器和循环不能正常工作   json无法在java中正确传递参数,只有字符串可以正常工作   xml解析JAXB:如何解组由多个标记或属性值组成的Java对象   java如何在Eclipse的Tomcat7上运行web应用程序而不出现“启动期间子容器失败”错误?   如何从jna引用指针检索java类   java推入堆栈意味着什么?移动还是复制方法?   java仅获取相同的udp数据包   java OpenAPI/SwaggerUI注释和@BeanParam   java如何解决Jenkins用户界面缓慢的问题?   amazon web服务批量写入DynamoDB[Java]   java从Redis集合检索数据   图像构造函数中的java差异