让这个代码来读取并处理它。这个文件相当大,有1200万行,所以目前我手动将其拆分为1000行文件,并按每1000行顺序启动每个进程(bash脚本)。在
有没有一种方法可以使用Twisted加载一个文件,并从一个文件中处理1000个项目(进度条会很好),而不需要我手动拆分它?在
在扫描仪.py在
import argparse
from tqdm import tqdm
from sys import argv
from pprint import pformat
from twisted.internet.task import react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers
import lxml.html
from geoip import geolite2
import pycountry
from tld import get_tld
import json
import socket
poweredby = ""
server = ""
ip = ""
def cbRequest(response, url):
global poweredby, server, ip
# print 'Response version:', response.version
# print 'Response code:', response.code
# print 'Response phrase:', response.phrase
# print 'Response headers:'
# print pformat(list(response.headers.getAllRawHeaders()))
poweredby = response.headers.getRawHeaders("X-Powered-By")[0]
server = response.headers.getRawHeaders("Server")[0]
#print poweredby
#print server
d = readBody(response)
d.addCallback(cbBody, url)
return d
def cbBody(body, ourl):
global poweredby, server,ip
#print body
html_element = lxml.html.fromstring(body)
generator = html_element.xpath("//meta[@name='generator']/@content")
ip = socket.gethostbyname(ourl)
try:
match = geolite2.lookup(ip)
if match is not None:
country = match.country
try:
c = pycountry.countries.lookup(country)
country = c.name
except:
country = ""
except:
country = ""
try:
res = get_tld("http://www" + ourl, as_object=True)
tld = res.suffix
except:
tld = ""
try:
match = re.search(r'[\w\.-]+@[\w\.-]+', body)
email = match.group(0)
except:
email = ""
permalink=ourl.rstrip().replace(".","-")
try:
item = generator[0]
val = "{ \"Domain\":" + json.dumps(
"http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
str(server)) + ",\"PoweredBy\":" + json.dumps(
str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }"
except:
val = "{ \"Domain\":" + json.dumps(
"http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
str(server)) + ",\"PoweredBy\":" + json.dumps(
str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }"
print val
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Scanner v0.99')
parser.add_argument(
'-i', '--input', help='Input list of domains', required=True)
args = parser.parse_args()
input = args.input
with open(input) as f:
urls = f.read().splitlines()
def mainjob(reactor, urls=urls):
for url in tqdm(urls):
agent = Agent(reactor)
d = agent.request(
'GET', "http://" + url,
Headers({'User-Agent': ['bot']}),
None)
d.addCallback(cbRequest, url)
d.addErrback(lambda x: None) # ignore errors
return d
react(mainjob, argv[3:])
更新1:
现在我这样做:
在文件.txt-12000000条线路
chunk01.txt-包含1000行的文件 . . . 在
我为每个块文件执行一个脚本。在
^{pr2}$要执行脚本一次:
python scanner.py file.txt
问题在于,我需要将url作为参数传递给react()。如果我把它作为12000000个文件读入内存(通过f.read()),它就太大了。因此,我分割了文件并在每个小文件上执行脚本。在
希望现在更清楚。。。在
更新2:
基于@Jean-Paul-Calderone的回答,我编写了这段代码。在
似乎很管用,但从那以后我遇到了麻烦:
180000次迭代。。。。我假设有180000个域(输入文件中的每一行),脚本只打印/输出了大约35707行(条目)。我想大概是18万。。。我知道有些域会超时。当我用“旧”的方式运行它时,它更加一致,数量更接近,即输入域的数量接近输出文件中的输出行。在
代码有什么“坏”吗?有什么想法吗?在
python scanner.py > out.txt
181668it [1:47:36, 4.82it/s]
数数台词:
wc -l out.txt
36840 out.txt
在扫描仪.py在
import argparse
from tqdm import tqdm
from sys import argv
from pprint import pformat
from twisted.internet.task import react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers
from twisted.internet.task import cooperate
from twisted.internet.defer import gatherResults
import lxml.html
from geoip import geolite2
import pycountry
from tld import get_tld
import json
import socket
poweredby = ""
server = ""
ip = ""
def cbRequest(response, url):
global poweredby, server, ip
# print 'Response version:', response.version
# print 'Response code:', response.code
# print 'Response phrase:', response.phrase
# print 'Response headers:'
# print pformat(list(response.headers.getAllRawHeaders()))
poweredby = response.headers.getRawHeaders("X-Powered-By")[0]
server = response.headers.getRawHeaders("Server")[0]
#print poweredby
#print server
d = readBody(response)
d.addCallback(cbBody, url)
return d
def cbBody(body, ourl):
global poweredby, server,ip
#print body
html_element = lxml.html.fromstring(body)
generator = html_element.xpath("//meta[@name='generator']/@content")
ip = socket.gethostbyname(ourl)
try:
match = geolite2.lookup(ip)
if match is not None:
country = match.country
try:
c = pycountry.countries.lookup(country)
country = c.name
except:
country = ""
except:
country = ""
try:
res = get_tld("http://www" + ourl, as_object=True)
tld = res.suffix
except:
tld = ""
try:
match = re.search(r'[\w\.-]+@[\w\.-]+', body)
email = match.group(0)
except:
email = ""
permalink=ourl.rstrip().replace(".","-")
try:
item = generator[0]
val = "{ \"Domain\":" + json.dumps(
"http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
str(server)) + ",\"PoweredBy\":" + json.dumps(
str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }"
except:
val = "{ \"Domain\":" + json.dumps(
"http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
str(server)) + ",\"PoweredBy\":" + json.dumps(
str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }"
print val
def main(reactor, url_path):
urls = open(url_path)
return mainjob(reactor, (url.strip() for url in urls))
def mainjob(reactor, urls=argv[2:]):
#for url in urls:
# print url
agent = Agent(reactor)
work = (process(agent, url) for url in tqdm(urls))
tasks = list(cooperate(work) for i in range(100))
return gatherResults(list(task.whenDone() for task in tasks))
def process(agent, url):
d = agent.request(
'GET', "http://" + url,
Headers({'User-Agent': ['bot']}),
None)
d.addCallback(cbRequest, url)
d.addErrback(lambda x: None) # ignore errors
return d
react(main, ["./domains.txt"])
更新3:
已更新将错误打印到的代码错误.txt在
import argparse
from tqdm import tqdm
from sys import argv
from pprint import pformat
from twisted.internet.task import react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers
from twisted.internet.task import cooperate
from twisted.internet.defer import gatherResults
import lxml.html
from geoip import geolite2
import pycountry
from tld import get_tld
import json
import socket
poweredby = ""
server = ""
ip = ""
f = open("errors.txt", "w")
def error(response, url):
f.write("Error: "+url+"\n")
def cbRequest(response, url):
global poweredby, server, ip
# print 'Response version:', response.version
# print 'Response code:', response.code
# print 'Response phrase:', response.phrase
# print 'Response headers:'
# print pformat(list(response.headers.getAllRawHeaders()))
poweredby = response.headers.getRawHeaders("X-Powered-By")[0]
server = response.headers.getRawHeaders("Server")[0]
#print poweredby
#print server
d = readBody(response)
d.addCallback(cbBody, url)
return d
def cbBody(body, ourl):
global poweredby, server,ip
#print body
html_element = lxml.html.fromstring(body)
generator = html_element.xpath("//meta[@name='generator']/@content")
ip = socket.gethostbyname(ourl)
try:
match = geolite2.lookup(ip)
if match is not None:
country = match.country
try:
c = pycountry.countries.lookup(country)
country = c.name
except:
country = ""
except:
country = ""
try:
res = get_tld("http://www" + ourl, as_object=True)
tld = res.suffix
except:
tld = ""
try:
match = re.search(r'[\w\.-]+@[\w\.-]+', body)
email = match.group(0)
except:
email = ""
permalink=ourl.rstrip().replace(".","-")
try:
item = generator[0]
val = "{ \"Domain\":" + json.dumps(
"http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
str(server)) + ",\"PoweredBy\":" + json.dumps(
str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }"
except:
val = "{ \"Domain\":" + json.dumps(
"http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
str(server)) + ",\"PoweredBy\":" + json.dumps(
str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }"
print val
def main(reactor, url_path):
urls = open(url_path)
return mainjob(reactor, (url.strip() for url in urls))
def mainjob(reactor, urls=argv[2:]):
#for url in urls:
# print url
agent = Agent(reactor)
work = (process(agent, url) for url in tqdm(urls))
tasks = list(cooperate(work) for i in range(100))
return gatherResults(list(task.whenDone() for task in tasks))
def process(agent, url):
d = agent.request(
'GET', "http://" + url,
Headers({'User-Agent': ['crawler']}),
None)
d.addCallback(cbRequest, url)
d.addErrback(error, url)
return d
react(main, ["./domains.txt"])
f.close()
更新4:
我用Wireshark捕捉了流量,只有2个域,这些域以前出错了:
user@laptop:~/crawler$ python scanner.py
2it [00:00, 840.71it/s]
user@laptop:~/crawler$ cat errors.txt
Error: google.al
Error: fau.edu.al
正如你所看到的,他们有错误,但是在Wireshark中,我看到了这样的反应:
您需要为程序创建的并发量添加一个限制。目前,您同时处理所有给定的URL,或者至少尝试:
这将为每个URL发出一个请求,而不等待任何一个URL完成。相反,使用
^{pr2}$twisted.internet.task.cooperate
一次运行一个有限的数字。一次运行一个请求:你可能想要的不止这些。所以,再调用cooperative()几次:
一次最多可以有100个请求。每个任务从
work
提取下一个元素并等待它。gatherResults
等待所有100个任务完成。在现在只需避免一次将完整的输入加载到内存中:
这将打开url文件,但只在需要时读取其中的行。在
相关问题 更多 >
编程相关推荐