我一直在努力使这段代码(在github上提供)工作,以便使用API流式传输帐户。但这给了我一个错误。它与代码行“p.start()”有关。我无法找出真正导致此错误的原因以及如何修复它
发动机(mysql://root:***@localhost:3306/local) 回溯(最近一次呼叫): 文件“C:\Users\Hafsa\Documents\Hafsa\designe\Data Collection\collect.py”,第177行,in 主() 文件“C:\Users\Hafsa\Documents\Hafsa\demission\Data Collection\collect.py”,第145行,在main中 p、 开始() 文件“C:\Users\Hafsa\Anaconda3\lib\multiprocessing\process.py”,第105行,开始 self.\u popen=self.\u popen(self) 文件“C:\Users\Hafsa\Anaconda3\lib\multiprocessing\context.py”,第223行,in\u Popen 返回\u default\u context.get\u context().Process.\u Popen(Process\u obj) 文件“C:\Users\Hafsa\Anaconda3\lib\multiprocessing\context.py”,第322行,in\u Popen 返回Popen(进程对象) 文件“C:\Users\Hafsa\Anaconda3\lib\multiprocessing\popen\u spawn\u win32.py”,第65行,在init reduction.dump(处理对象,到子对象) 转储文件“C:\Users\Hafsa\Anaconda3\lib\multiprocessing\reduction.py”第60行 ForkingPickler(文件,协议).dump(obj) TypeError:无法pickle模块对象 回溯(最近一次呼叫): 文件“”,第1行,在 文件“C:\Users\Hafsa\Anaconda3\lib\multiprocessing\spawn.py”,第99行,spawn\u main new\u handle=reduction.steal\u handle(父\u pid,管道\u handle) 文件“C:\Users\Hafsa\Anaconda3\lib\multiprocessing\reduction.py”,第87行,在U句柄中 _winapi.DUPLICATE|SAME|访问| | winapi.DUPLICATE|CLOSE|源) PermissionError:[WinError 5]访问被拒绝
def parse_args():
# Parses the command line arguments.
parser = argparse.ArgumentParser(
description='Enumerate public Twitter profiles and tweets')
parser.add_argument(
'--max-id',
type=int,
help='Max Twitter ID to use for enumeration',
default=DEFAULT_MAX_ID),
parser.add_argument(
'--min-id',
type=int,
help='Minimum ID to use for enumeration',
default=DEFAULT_MIN_ID)
parser.add_argument(
'--enum-percentage',
'-p',
type=int,
default=100,
help='The percentage of 32bit account space to enumerate (0-100).')
parser.add_argument(
'--no-stream',
dest='stream',
action='store_false',
help='Disable the streaming',
default=True)
parser.add_argument(
'--no-enum',
dest='enum',
action='store_false',
help='Disable the account id enumeration',
default=True)
parser.add_argument(
'--stream-query',
'-q',
type=str,
help='The query to use when streaming results',
default=None)
parser.add_argument(
'--account-filename',
'-af',
type=str,
help='The filename to store compressed account JSON data',
default='accounts.json.gz')
parser.add_argument(
'--stdout',
action='store_true',
dest='stdout',
help='Print JSON to stdout instead of a file',
default=False)
return parser.parse_args()
def main():
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p',
level=logging.INFO)
logger = logging.getLogger(__name__)
args = parse_args()
consumer_key = os.environ.get('TWEEPY_CONSUMER_KEY')
consumer_secret = os.environ.get('TWEEPY_CONSUMER_SECRET')
access_token = os.environ.get('TWEEPY_ACCESS_TOKEN')
access_token_secret = os.environ.get('TWEEPY_ACCESS_TOKEN_SECRET')
if not (consumer_key and consumer_secret and access_token
and access_token_secret):
logger.error('Need to specify the OAuth configuration.')
sys.exit(1)
user_auth = OAuthHandler(consumer_key, consumer_secret)
user_auth.set_access_token(access_token, access_token_secret)
user_api = API(
user_auth, wait_on_rate_limit_notify=True, wait_on_rate_limit=True)
api_auth = AppAuthHandler(consumer_key, consumer_secret)
app_api = API(
api_auth, wait_on_rate_limit_notify=True, wait_on_rate_limit=True)
account_queue = RedisQueue('accounts')
lookup_queue = RedisQueue('lookup')
streamer_class = JSONStreamer
if args.stdout:
streamer_class = StdoutStreamer
account_streamer = streamer_class(args.account_filename)
processes = []
if args.stream:
stream_process = Process(
target=start_streamer,
args=[user_api, account_queue, lookup_queue],
kwargs={'query': args.stream_query})
processes.append(stream_process)
else:
logger.info('Skipping stream')
if args.enum:
enumerate_process = Process(
target=fetch_accounts,
args=[user_api, account_queue],
kwargs={
'min_id': args.min_id,
'max_id': args.max_id,
'percentage': args.enum_percentage
})
processes.append(enumerate_process)
else:
logger.info('Skipping enum')
lookup_account_process = Process(target=start_lookup, args=[app_api, lookup_queue, account_queue])
processes.append(lookup_account_process)
for p in processes:
p.start()
# The main loop's job is simple - it simply fetches account dicts coming
# from the various processes and saves them to the database so the tweet
# fetcher can process them.
try:
account_count = 0
while True:
try:
account = account_queue.get()
# Verify the account isn't already in our database
if Account.exists(account['id']):
continue
account_count += 1
if account_count % CHECKIN_THRESHOLD == 0:
logger.info(
'Accounts discovered: {}'.format(account_count))
# Add the account to our database cache
Account.from_dict(account).save()
# Write the account to our account streamer
account_streamer.write_row(account)
except Exception as e:
print('Error fetching account: {}'.format(e))
except KeyboardInterrupt:
print('\nCtrl+C received. Shutting down...')
for p in processes:
p.terminate()
p.join()
account_streamer.close()
if __name__ == '__main__':
main()
目前没有回答
相关问题 更多 >
编程相关推荐