How to fix this TypeError "can't pickle module objects" when trying to collect tweets from an API

Posted 2024-10-01 07:40:36


I have been trying to get this code (available on GitHub) to work so that I can stream accounts using the API, but it gives me an error. The error is raised at the line p.start(), and I can't figure out what is actually causing it or how to fix it.

Engine(mysql://root:***@localhost:3306/local)
Traceback (most recent call last):
  File "C:\Users\Hafsa\Documents\Hafsa\designe\Data Collection\collect.py", line 177, in <module>
    main()
  File "C:\Users\Hafsa\Documents\Hafsa\demission\Data Collection\collect.py", line 145, in main
    p.start()
  File "C:\Users\Hafsa\Anaconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Users\Hafsa\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\Hafsa\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\Hafsa\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\Hafsa\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle module objects
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\Hafsa\Anaconda3\lib\multiprocessing\spawn.py", line 99, in spawn_main
    new_handle = reduction.steal_handle(parent_pid, pipe_handle)
  File "C:\Users\Hafsa\Anaconda3\lib\multiprocessing\reduction.py", line 87, in steal_handle
    _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
PermissionError: [WinError 5] Access is denied
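
For context (this note is not part of the original post): on Windows, multiprocessing starts child processes with the "spawn" method, which pickles the target and every argument passed to Process, and module objects cannot be pickled. A minimal sketch of the same failure, assuming one of the objects handed to Process (for example a tweepy API instance or a RedisQueue) holds a reference to a module somewhere in its attributes:

import pickle
import sys

class Wrapper:
    def __init__(self):
        # A module stored as an attribute makes the whole instance
        # unpicklable, which is exactly what spawn trips over at p.start().
        self.mod = sys

try:
    pickle.dumps(Wrapper())
except TypeError as e:
    print(e)  # e.g. "cannot pickle 'module' object"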

def parse_args():
    # Parses the command line arguments.
    parser = argparse.ArgumentParser(
        description='Enumerate public Twitter profiles and tweets')
    parser.add_argument(
        '--max-id',
        type=int,
        help='Max Twitter ID to use for enumeration',
        default=DEFAULT_MAX_ID)
    parser.add_argument(
        '--min-id',
        type=int,
        help='Minimum ID to use for enumeration',
        default=DEFAULT_MIN_ID)
    parser.add_argument(
        '--enum-percentage',
        '-p',
        type=int,
        default=100,
        help='The percentage of 32bit account space to enumerate (0-100).')
    parser.add_argument(
        '--no-stream',
        dest='stream',
        action='store_false',
        help='Disable the streaming',
        default=True)
    parser.add_argument(
        '--no-enum',
        dest='enum',
        action='store_false',
        help='Disable the account id enumeration',
        default=True)
    parser.add_argument(
        '--stream-query',
        '-q',
        type=str,
        help='The query to use when streaming results',
        default=None)
    parser.add_argument(
        '--account-filename',
        '-af',
        type=str,
        help='The filename to store compressed account JSON data',
        default='accounts.json.gz')
    parser.add_argument(
        '--stdout',
        action='store_true',
        dest='stdout',
        help='Print JSON to stdout instead of a file',
        default=False)
    return parser.parse_args()
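
# Illustrative usage (not part of the original post): assuming the script is
# saved as collect.py, the flags above combine like:
#   python collect.py -q "python" --account-filename accounts.json.gz
#   python collect.py --no-enum --stdout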


def main():
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%m/%d/%Y %I:%M:%S %p',
        level=logging.INFO)
    logger = logging.getLogger(__name__)

    args = parse_args()
    consumer_key = os.environ.get('TWEEPY_CONSUMER_KEY')
    consumer_secret = os.environ.get('TWEEPY_CONSUMER_SECRET')
    access_token = os.environ.get('TWEEPY_ACCESS_TOKEN')
    access_token_secret = os.environ.get('TWEEPY_ACCESS_TOKEN_SECRET')

    if not (consumer_key and consumer_secret and access_token
            and access_token_secret):
        logger.error('Need to specify the OAuth configuration.')
        sys.exit(1)

    user_auth = OAuthHandler(consumer_key, consumer_secret)
    user_auth.set_access_token(access_token, access_token_secret)
    user_api = API(
        user_auth, wait_on_rate_limit_notify=True, wait_on_rate_limit=True)

    api_auth = AppAuthHandler(consumer_key, consumer_secret)
    app_api = API(
        api_auth, wait_on_rate_limit_notify=True, wait_on_rate_limit=True)

    account_queue = RedisQueue('accounts')
    lookup_queue = RedisQueue('lookup')

    streamer_class = JSONStreamer
    if args.stdout:
        streamer_class = StdoutStreamer

    account_streamer = streamer_class(args.account_filename)

    processes = []

    if args.stream:
        stream_process = Process(
            target=start_streamer,
            args=[user_api, account_queue, lookup_queue],
            kwargs={'query': args.stream_query})
        processes.append(stream_process)
    else:
        logger.info('Skipping stream')

    if args.enum:
        enumerate_process = Process(
            target=fetch_accounts,
            args=[user_api, account_queue],
            kwargs={
                'min_id': args.min_id,
                'max_id': args.max_id,
                'percentage': args.enum_percentage
            })
        processes.append(enumerate_process)
    else:
        logger.info('Skipping enum')

    lookup_account_process = Process(
        target=start_lookup,
        args=[app_api, lookup_queue, account_queue])
    processes.append(lookup_account_process)

    for p in processes:
        p.start()

    # The main loop's job is simple - it simply fetches account dicts coming
    # from the various processes and saves them to the database so the tweet
    # fetcher can process them.
    try:
        account_count = 0
        while True:
            try:
                account = account_queue.get()
                # Verify the account isn't already in our database
                if Account.exists(account['id']):
                    continue
                account_count += 1
                if account_count % CHECKIN_THRESHOLD == 0:
                    logger.info(
                        'Accounts discovered: {}'.format(account_count))
                # Add the account to our database cache
                Account.from_dict(account).save()
                # Write the account to our account streamer
                account_streamer.write_row(account)
            except Exception as e:
                print('Error fetching account: {}'.format(e))
    except KeyboardInterrupt:
        print('\nCtrl+C received. Shutting down...')
        for p in processes:
            p.terminate()
            p.join()
        account_streamer.close()


if __name__ == '__main__':
    main()
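
Not part of the original post, but for reference: a pattern that often avoids "can't pickle" errors with spawn-based multiprocessing is to pass only plain, picklable values (here, the four OAuth strings) into the child and construct the tweepy API object inside the worker, so nothing unpicklable ever crosses the process boundary. A sketch under that assumption; streamer_worker below is illustrative, not the repository's actual start_streamer:

from multiprocessing import Process
from tweepy import OAuthHandler, API

def streamer_worker(credentials, query=None):
    # Build the unpicklable API client inside the child process.
    consumer_key, consumer_secret, access_token, access_token_secret = credentials
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    api = API(auth, wait_on_rate_limit=True)
    # ... hand api to the streaming logic here ...

# In main(), pass the strings rather than the API object:
# creds = (consumer_key, consumer_secret, access_token, access_token_secret)
# Process(target=streamer_worker, args=[creds], kwargs={'query': args.stream_query})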
