diff --git a/happybase/table.py b/happybase/table.py index d0469e1..a763a1e 100644 --- a/happybase/table.py +++ b/happybase/table.py @@ -4,14 +4,14 @@ import logging from numbers import Integral -from struct import Struct - -from six import iteritems from Hbase_thrift import TScan +from six import iteritems +from struct import Struct +from thriftpy.transport import TTransportException -from .util import thrift_type_to_dict, bytes_increment, OrderedDict from .batch import Batch +from .util import OrderedDict, bytes_increment, thrift_type_to_dict logger = logging.getLogger(__name__) @@ -38,12 +38,26 @@ def make_ordered_row(sorted_columns, include_timestamp): return od +def safe_call(function): + def safe(self, *args, **kwargs): + try: + return function(self, *args, **kwargs) + except (BrokenPipeError, TTransportException): + logger.debug("Network error: refresh thrift connection") + self.connection._refresh_thrift_client() + self.connection.open() + return function(self, *args, **kwargs) + + return safe + + class Table(object): """HBase table abstraction class. This class cannot be instantiated directly; use :py:meth:`Connection.table` instead. """ + def __init__(self, name, connection): self.name = name self.connection = connection @@ -55,6 +69,7 @@ def __repr__(self): self.name, ) + @safe_call def families(self): """Retrieve the column families for this table. @@ -68,11 +83,13 @@ def families(self): families[name] = thrift_type_to_dict(descriptor) return families + @safe_call def _column_family_names(self): """Retrieve the column family names for this table (internal use)""" names = self.connection.client.getColumnDescriptors(self.name).keys() return [name.rstrip(b':') for name in names] + @safe_call def regions(self): """Retrieve the regions for this table. @@ -86,6 +103,7 @@ def regions(self): # Data retrieval # + @safe_call def row(self, row, columns=None, timestamp=None, include_timestamp=False): """Retrieve a single row of data. @@ -131,6 +149,7 @@ def row(self, row, columns=None, timestamp=None, include_timestamp=False): return make_row(rows[0].columns, include_timestamp) + @safe_call def rows(self, rows, columns=None, timestamp=None, include_timestamp=False): """Retrieve multiple rows of data. @@ -176,6 +195,7 @@ def rows(self, rows, columns=None, timestamp=None, return [(r.row, make_row(r.columns, include_timestamp)) for r in results] + @safe_call def cells(self, row, column, versions=None, timestamp=None, include_timestamp=False): """Retrieve multiple versions of a single cell from the table. @@ -219,6 +239,7 @@ def cells(self, row, column, versions=None, timestamp=None, for c in cells ] + @safe_call def scan(self, row_start=None, row_stop=None, row_prefix=None, columns=None, filter=None, timestamp=None, include_timestamp=False, batch_size=1000, scan_batching=None, @@ -439,7 +460,7 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None, # # Data manipulation # - + @safe_call def put(self, row, data, timestamp=None, wal=True): """Store data in the table. @@ -463,6 +484,7 @@ def put(self, row, data, timestamp=None, wal=True): with self.batch(timestamp=timestamp, wal=wal) as batch: batch.put(row, data) + @safe_call def delete(self, row, columns=None, timestamp=None, wal=True): """Delete data from the table. @@ -483,6 +505,7 @@ def delete(self, row, columns=None, timestamp=None, wal=True): with self.batch(timestamp=timestamp, wal=wal) as batch: batch.delete(row, columns) + @safe_call def batch(self, timestamp=None, batch_size=None, transaction=False, wal=True): """Create a new batch operation for this table. @@ -529,6 +552,7 @@ def batch(self, timestamp=None, batch_size=None, transaction=False, # Atomic counters # + @safe_call def counter_get(self, row, column): """Retrieve the current value of a counter column. @@ -550,6 +574,7 @@ def counter_get(self, row, column): # is correctly initialised if didn't exist yet. return self.counter_inc(row, column, value=0) + @safe_call def counter_set(self, row, column, value=0): """Set a counter column to a specific value. @@ -567,6 +592,7 @@ def counter_set(self, row, column, value=0): """ self.put(row, {column: pack_i64(value)}) + @safe_call def counter_inc(self, row, column, value=1): """Atomically increment (or decrements) a counter column. @@ -586,6 +612,7 @@ def counter_inc(self, row, column, value=1): return self.connection.client.atomicIncrement( self.name, row, column, value) + @safe_call def counter_dec(self, row, column, value=1): """Atomically decrement (or increments) a counter column.