我编写了一个脚本,该脚本应该使用多线程将多个主机名解析为ip地址。在
但是,它会在某个随机点发生故障并冻结。如何解决这个问题?在
num_threads = 100
conn = pymysql.connect(host='xx.xx.xx.xx', unix_socket='/tmp/mysql.sock', user='user', passwd='pw', db='database')
cur = conn.cursor()
def mexec(befehl):
cur = conn.cursor()
cur.execute(befehl)
websites = ['facebook.com','facebook.org' ... ... ... ...] \#10.000 websites in array
queue = Queue()
def getips(i, q):
while True:
#--resolve IP--
try:
result = socket.gethostbyname_ex(site)
print(result)
mexec("UPDATE sites2block SET ip='"+result+"', updated='yes' ") #puts site in mysqldb
except (socket.gaierror):
print("no ip")
mexec("UPDATE sites2block SET ip='no ip', updated='yes',")
q.task_done()
#Spawn thread pool
for i in range(num_threads):
worker = Thread(target=getips, args=(i, queue))
worker.setDaemon(True)
worker.start()
#Place work in queue
for site in websites:
queue.put(site)
#Wait until worker threads are done to exit
queue.join()
我的第一个想法是,你会因为DNS过载而出错——也许你的解析器不允许你每次执行超过一定数量的查询。在
此外,我发现了一些问题:
您忘了在
while
循环中正确地分配site
这可能最好用在队列上迭代的for
循环来代替。在您的版本中,您使用模块级命名空间中的site
变量,这可能导致查询加倍,其他查询被跳过。在在这种情况下,您可以控制队列是否仍有条目或等待一些条目。如果两者都不是,你可以退出你的线程。
出于安全考虑,你最好这样做
为了以后做
为了与未来的协议保持兼容,您应该使用}。在那里你可以得到你想要的所有IP(一开始,你可以限制到IPv4,但是切换到IPv6就更容易了),并且可以将它们全部放入数据库。在
socket.getaddrinfo()
,而不是{对于您的队列,代码示例可以是
应该会成功的。在
另一个问题是并行插入MySQL。一次只能执行一个MySQL查询。因此,您可以通过
threading.Lock()
或RLock()
来保护访问,也可以将答案放入另一个由另一个线程处理的队列中,该队列甚至可以捆绑它们。在您可能会发现使用^{} 比直接使用
threading
、multiprocessing
、Queue
更简单:注意:您可以轻松地从使用线程切换到进程:
^{pr2}$如果
gethostbyname_ex()
在您的操作系统上不是线程安全的,例如it might be the case on OSX,则需要它。在如果要处理
gethostbyname_ex()
中可能出现的异常:它类似于the example from the docs。在
您可以使用sentinel值向线程发出没有工作的信号,并连接线程,而不是}:
queue.task_done()
和{gethostbyname_ex()
函数已过时。要同时支持IPv4/v6地址,可以使用^{相关问题 更多 >
编程相关推荐