apache_beam.utils.shared module

Shared class.

Shared is a helper class for managing a single instance of an object shared by multiple threads within the same process. Instances of Shared are serializable objects that can be shared by all threads of each worker process. A Shared object encapsulates a weak reference to a singleton instance of the shared resource. The singleton is lazily initialized by calls to Shared.acquire().

Example usage:

To share a very large list across all threads of each worker in a DoFn:

    import apache_beam as beam
    from apache_beam.utils import shared

    # Several built-in types such as list and dict do not directly support weak
    # references but can add support through subclassing:
    # https://docs.python.org/3/library/weakref.html
    class WeakRefList(list):
      pass

    class GetNthStringFn(beam.DoFn):
      def __init__(self):
        self._shared_handle = shared.Shared()

      def setup(self):
        # setup is a good place to initialize transient in-memory resources.
        def initialize_list():
          # Build the giant initial list.
          return WeakRefList([str(i) for i in range(1000000)])

        self._giant_list = self._shared_handle.acquire(initialize_list)

      def process(self, element):
        yield self._giant_list[element]

    p = beam.Pipeline()
    (p | beam.Create([2, 4, 6, 8])
       | beam.ParDo(GetNthStringFn()))
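The need for a subclass such as WeakRefList can be checked with the standard library alone. The following is a minimal sketch that does not use Beam; the variable names are illustrative only. It shows that a plain list is rejected as a weak-reference target, while an instance of a trivial subclass is accepted.

```python
import weakref


class WeakRefList(list):
  """A list subclass; its instances can be weakly referenced."""


# A plain list cannot be the target of a weak reference.
try:
  weakref.ref([1, 2, 3])
  plain_list_supported = True
except TypeError:
  plain_list_supported = False

# An instance of the subclass can, so a cache may refer to it
# without keeping it alive on its own.
items = WeakRefList(["a", "b", "c"])
items_ref = weakref.ref(items)

# Calling the weak reference returns the original object for as long as
# a strong reference (here, `items`) still exists.
resolved = items_ref()
```

The same applies to dict and to other built-in types that lack weak-reference support: the object returned by the constructor function passed to acquire() has to be weakly referenceable.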

Real-world uses will typically involve using a side-input to a DoFn to initialize the shared resource in a way that can’t be done with just its constructor:

    import apache_beam as beam
    from apache_beam.utils import shared

    class RainbowTableLookupFn(beam.DoFn):
      def __init__(self):
        self._shared_handle = shared.Shared()

      def process(self, element, table_elements):
        def construct_table():
          # Construct the rainbow table from the table elements.
          # The table elements are (string, hash) pairs.
          result = {}
          for key, value in table_elements:
            result[value] = key
          return result

        rainbow_table = self._shared_handle.acquire(construct_table)
        unhashed_str = rainbow_table.get(element)
        if unhashed_str is not None:
          yield unhashed_str

    p = beam.Pipeline()
    reverse_hash_table = p | "ReverseHashTable" >> beam.Create([
        ('a', '0cc175b9c0f1b6a831c399e269772661'),
        ('b', '92eb5ffee6ae2fec3ad71c777531578f'),
        ('c', '4a8a08f09d37b73795649038408b5f33'),
        ('d', '8277e0910d750195b448797616e091ad')])
    unhashed = (p
                | 'Hashes' >> beam.Create([
                    '0cc175b9c0f1b6a831c399e269772661',
                    '8277e0910d750195b448797616e091ad'])
                | 'Unhash' >> beam.ParDo(
                    RainbowTableLookupFn(),
                    # Side inputs are passed wrapped in a view such as AsIter.
                    beam.pvalue.AsIter(reverse_hash_table)))
class apache_beam.utils.shared.Shared

Bases: object

Handle for managing shared per-process objects.

Each instance of a Shared object represents a distinct handle to a distinct object. Example usage is described in the file comment of shared.py.

This object has the following limitations:

  • A shared object won’t be GC’ed if there isn’t another acquire called for a different shared object.

  • Each stage can only use exactly one Shared token, otherwise only one Shared token, NOT NECESSARILY THE LATEST, will be “kept-alive”.

  • If there are two different stages using separate Shared tokens, but which get fused together, only one Shared token will be “kept-alive”.

(See documentation of _SharedMap for details.)

acquire(constructor_fn: Callable[[], Any], tag: Any | None = None) → Any

Acquire a reference to the object associated with this Shared handle.

Parameters:
  • constructor_fn – function that initialises / constructs the object if not present in the cache. This function should take no arguments. It should return an initialised object, or None if the object could not be initialised / constructed.

  • tag – an optional identifier to store with the cached object. If subsequent calls to acquire use different tags, the object will be reloaded rather than returned from cache.

Returns:

A reference to an initialised object, either from the cache, or newly-constructed.