|
| 1 | +r""" |
| 2 | +Multithreading from Python |
| 3 | +""" |
| 4 | + |
| 5 | +from libc.stdlib cimport malloc, calloc, free |
| 6 | + |
| 7 | +from .types cimport * |
| 8 | +from .paridecl cimport * |
| 9 | +from gen cimport Gen, objtogen |
| 10 | + |
| 11 | +cdef class PariThreadPool: |
| 12 | + r""" |
| 13 | + Pari thread allocator |
| 14 | +
|
| 15 | + This class is intended to be used in conjunction with the multithreading |
| 16 | + capabilities of the ``ThreadPoolExecutor`` from the ``concurrent.futures`` |
| 17 | + Python library. |
| 18 | +
|
| 19 | + Examples: |
| 20 | +
|
| 21 | + >>> from concurrent.futures import ThreadPoolExecutor, as_completed |
| 22 | + >>> from cypari2 import Pari, PariThreadPool |
| 23 | + >>> pari = Pari() |
| 24 | + >>> pari.default('nbthreads', 1) |
| 25 | + >>> max_workers = 4 |
| 26 | + >>> pari_pool = PariThreadPool(max_workers) |
| 27 | + >>> square_free = [] |
| 28 | + >>> with ThreadPoolExecutor(max_workers=max_workers, initializer=pari_pool.initializer) as executor: |
| 29 | + ... futures = {executor.submit(pari.issquarefree, n): n for n in range(10**6, 10**6 + 1000)} |
| 30 | + ... for future in as_completed(futures): |
| 31 | + ... n = futures[future] |
| 32 | + ... if future.result(): |
| 33 | + ... square_free.append(n) |
| 34 | + >>> square_free.sort() |
| 35 | + >>> square_free |
| 36 | + [1000001, 1000002, 1000003, 1000005, 1000006, ..., 1000994, 1000995, 1000997, 1000999] |
| 37 | + """ |
| 38 | + def __init__(self, size_t nbthreads, size_t size=8000000, size_t sizemax=0): |
| 39 | + r""" |
| 40 | + INPUT: |
| 41 | +
|
| 42 | + - ``nbthreads`` -- the number of threads to allocate |
| 43 | +
|
| 44 | + - ``size`` -- (default: 8000000) the number of bytes for the |
| 45 | + initial PARI stack (see notes below) |
| 46 | +
|
| 47 | + - ``sizemax`` -- (default: 0) the maximal number of bytes for the |
| 48 | + dynamically increasing PARI stack. |
| 49 | + """ |
| 50 | + cdef size_t i |
| 51 | + size = max(size, pari_mainstack.rsize) |
| 52 | + sizemax = max(max(size, pari_mainstack.vsize), sizemax) |
| 53 | + self.pths = <pari_thread *> calloc(nbthreads, sizeof(pari_thread)) |
| 54 | + for i in range(nbthreads): |
| 55 | + pari_thread_valloc(self.pths + i, size, sizemax, NULL) |
| 56 | + self.ithread = 0 |
| 57 | + self.nbthreads = nbthreads |
| 58 | + |
| 59 | + def __dealloc__(self): |
| 60 | + cdef size_t i |
| 61 | + for i in range(self.ithread): |
| 62 | + pari_thread_free(self.pths + i) |
| 63 | + free(self.pths) |
| 64 | + |
| 65 | + def __repr__(self): |
| 66 | + return 'Pari thread pool with {} threads'.format(self.nbthreads) |
| 67 | + |
| 68 | + def initializer(self): |
| 69 | + if self.ithread >= self.nbthreads: |
| 70 | + raise ValueError('no more thread available') |
| 71 | + pari_thread_start(self.pths + self.ithread) |
| 72 | + self.ithread += 1 |
0 commit comments