Skip to content

NotImplementedError - can't pickle _thread._local objects #64

@pwnslinger

Description

@pwnslinger

Hi, I am importing data into a huge database, so I decided to use multiprocessing. Since multiprocessing failed to pickle my functions, I switched to multiprocess, but I kept receiving the following error:

  File "/home/pwnslinger/.virtualenvs/twitter/lib/python3.6/site-packages/multiprocess/pool.py", line 528, in __reduce__
    'pool objects cannot be passed between processes or pickled'
NotImplementedError: pool objects cannot be passed between processes or pickled

Then I tried to remove the pool object from __dict__ (via __getstate__), but now I receive the same error I was getting when I was using the multiprocessing package.

File "/home/pwnslinger/.virtualenvs/twitter/lib/python3.6/site-packages/dill/_dill.py", line 910, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 496, in save
    rv = reduce(self.proto)
TypeError: can't pickle _thread._local objects

Here is part of my code related to this matter. Any help would be appreciated.

class Worker(object):
    """Imports a batch of JSON files into the database over one DB connection.

    Instances are pickled when handed to a process pool, so unpicklable
    attributes must be stripped in __getstate__.
    """

    def __init__(self, files, engine):
        self.files = files    # list of JSON file paths this worker owns
        self.engine = engine  # DB engine; NOTE(review): engines hold _thread._local
                              # state and generally cannot be pickled — create the
                              # engine inside the child process instead. Confirm.

    # BUG FIX: the original declared this @staticmethod while taking `self`,
    # so `pool.apply_async(w.worker)` invoked the bare function with no
    # arguments. It is an ordinary instance method.
    def worker(self):
        conn = None  # BUG FIX: was unbound if raw_connect() raised PoolError
        try:
            # NOTE(review): SQLAlchemy spells this Engine.raw_connection() —
            # confirm raw_connect() exists on this engine.
            conn = self.engine.raw_connect()
        except PoolError:
            l.debug('cannot dedicate pool!')
        if conn:
            cur = conn.cursor()
        pid = os.getpid()
        l.debug("started process %d on %d files..."%(pid, len(self.files)))
        for fname in self.files:
            l.debug("working on %s file"%os.path.basename(fname))
            df = pd.read_json(fname, lines=True)
            for i in range(len(df)):
                content = json.loads(df.iloc[i].to_json())
                if content is None:
                    continue
                # "info" rows are summary records, not activities — skip them.
                if "info" in content.keys():
                    l.debug("%d activities added to the database..."%content["info"]["activity_count"])
                    continue
                populate_data(content, cur, conn)

    def __getstate__(self):
        """Return picklable state, dropping the pool handle if present."""
        state = self.__dict__.copy()
        # BUG FIX: `del state['pool']` raised KeyError because Worker never
        # sets a `pool` attribute; pop with a default is safe either way.
        state.pop('pool', None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

class Controller(object):
    """Fans the file list out across a process pool of Worker jobs."""

    def __init__(self, count, maxchild, engine):
        self.cpu_count = count        # number of pool workers
        self.maxchild = maxchild      # maxtasksperchild: recycle children to cap memory
        self.engine = engine
        self.procs = []
        self.fname_list = glob.glob(os.path.expanduser('~/Projects/[CENSORED]*'))
        # Kept for backward compatibility with any external readers.
        self.files_per_cpu = len(self.fname_list)//count
        self.init()

    @staticmethod
    def _partition(fnames, count):
        """Split fnames into `count` contiguous chunks.

        BUG FIX: the original integer-division slicing silently dropped the
        trailing len(fnames) % count files; the last chunk now absorbs the
        remainder. (The original's `except IndexError` was dead code — a
        slice never raises IndexError.)
        """
        if count <= 0:
            return []
        per = len(fnames) // count
        chunks = []
        for i in range(count):
            start = i * per
            end = start + per if i < count - 1 else len(fnames)
            chunks.append(fnames[start:end])
        return chunks

    def init(self):
        # maxtasksperchild recycles workers to avoid unbounded memory growth.
        proc_pool = mp.Pool(processes=self.cpu_count, maxtasksperchild=self.maxchild)
        for files in self._partition(self.fname_list, self.cpu_count):
            # BUG FIX: the original passed the bare name `engine` (NameError);
            # the engine lives on self.
            w = Worker(files, self.engine)
            self.procs.append(proc_pool.apply_async(w.worker))

        # Propagate any worker exception to the parent.
        for proc in self.procs:
            proc.get()

        proc_pool.close()
        proc_pool.join()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions