# -*- coding: utf-8 -*- import unittest import os import tempfile import copy from tables import * from tables.index import Index, default_auto_index, default_index_filters from tables.idxutils import calc_chunksize from tables.tests.common import verbose, allequal, heavy, cleanup, \ PyTablesTestCase, TempFileMixin from tables.exceptions import OldIndexWarning # To delete the internal attributes automagically unittest.TestCase.tearDown = cleanup import numpy # Sensible parameters for indexing with small blocksizes minRowIndex = 10 small_blocksizes = (96, 24, 6, 3) class TDescr(IsDescription): var1 = StringCol(itemsize=4, dflt=b"", pos=1) var2 = BoolCol(dflt=0, pos=2) var3 = IntCol(dflt=0, pos=3) var4 = FloatCol(dflt=0, pos=4) class BasicTestCase(PyTablesTestCase): compress = 0 complib = "zlib" shuffle = 0 fletcher32 = 0 nrows = minRowIndex ss = small_blocksizes[2] def setUp(self): # Create an instance of an HDF5 Table self.file = tempfile.mktemp(".h5") self.fileh = open_file(self.file, "w") self.rootgroup = self.fileh.root self.populateFile() # Close the file self.fileh.close() def populateFile(self): group = self.rootgroup # Create a table title = "This is the IndexArray title" self.filters = Filters(complevel=self.compress, complib=self.complib, shuffle=self.shuffle, fletcher32=self.fletcher32) table = self.fileh.create_table(group, 'table', TDescr, title, self.filters, self.nrows) for i in range(self.nrows): table.row['var1'] = str(i).encode('ascii') # table.row['var2'] = i > 2 table.row['var2'] = i % 2 table.row['var3'] = i table.row['var4'] = float(self.nrows - i - 1) table.row.append() table.flush() # Index all entries: for col in table.colinstances.itervalues(): indexrows = col.create_index(_blocksizes=small_blocksizes) if verbose: print "Number of written rows:", self.nrows print "Number of indexed rows:", indexrows return def tearDown(self): self.fileh.close() # print "File %s not removed!" % self.file os.remove(self.file) cleanup(self) #---------------------------------------- def test00_flushLastRow(self): """Checking flushing an Index incrementing only the last row.""" if verbose: print '\n', '-=' * 30 print "Running %s.test00_flushLastRow..." % self.__class__.__name__ # Open the HDF5 file in append mode self.fileh = open_file(self.file, mode="a") table = self.fileh.root.table # Add just 3 rows more for i in range(3): table.row['var1'] = str(i).encode('ascii') table.row.append() table.flush() # redo the indexes idxcol = table.cols.var1.index if verbose: print "Max rows in buf:", table.nrowsinbuf print "Number of elements per slice:", idxcol.slicesize print "Chunk size:", idxcol.sorted.chunksize print "Elements in last row:", idxcol.indicesLR[-1] # Do a selection results = [p["var1"] for p in table.where('var1 == b"1"')] self.assertEqual(len(results), 2) self.assertEqual(results, [b'1']*2) def test00_update(self): """Checking automatic re-indexing after an update operation.""" if verbose: print '\n', '-=' * 30 print "Running %s.test00_update..." % self.__class__.__name__ # Open the HDF5 file in append mode self.fileh = open_file(self.file, mode="a") table = self.fileh.root.table # Modify a couple of columns for i, row in enumerate(table.where("(var3>1) & (var3<5)")): row['var1'] = str(i) row['var3'] = i row.update() table.flush() # redo the indexes idxcol1 = table.cols.var1.index idxcol3 = table.cols.var3.index if verbose: print "Dirtyness of var1 col:", idxcol1.dirty print "Dirtyness of var3 col:", idxcol3.dirty self.assertEqual(idxcol1.dirty, False) self.assertEqual(idxcol3.dirty, False) # Do a couple of selections results = [p["var1"] for p in table.where('var1 == b"1"')] self.assertEqual(len(results), 2) self.assertEqual(results, [b'1']*2) results = [p["var3"] for p in table.where('var3 == 0')] self.assertEqual(len(results), 2) self.assertEqual(results, [0]*2) def test01_readIndex(self): """Checking reading an Index (string flavor)""" if verbose: print '\n', '-=' * 30 print "Running %s.test01_readIndex..." % self.__class__.__name__ # Open the HDF5 file in read-only mode self.fileh = open_file(self.file, mode="r") table = self.fileh.root.table idxcol = table.cols.var1.index if verbose: print "Max rows in buf:", table.nrowsinbuf print "Number of elements per slice:", idxcol.slicesize print "Chunk size:", idxcol.sorted.chunksize # Do a selection results = [p["var1"] for p in table.where('var1 == b"1"')] self.assertEqual(len(results), 1) self.assertEqual(results, [b'1']) def test02_readIndex(self): """Checking reading an Index (bool flavor)""" if verbose: print '\n', '-=' * 30 print "Running %s.test02_readIndex..." % self.__class__.__name__ # Open the HDF5 file in read-only mode self.fileh = open_file(self.file, mode="r") table = self.fileh.root.table idxcol = table.cols.var2.index if verbose: print "Rows in table:", table.nrows print "Max rows in buf:", table.nrowsinbuf print "Number of elements per slice:", idxcol.slicesize print "Chunk size:", idxcol.sorted.chunksize # Do a selection results = [p["var2"] for p in table.where('var2 == True')] if verbose: print "Selected values:", results self.assertEqual(len(results), self.nrows // 2) self.assertEqual(results, [True]*(self.nrows // 2)) def test03_readIndex(self): """Checking reading an Index (int flavor)""" if verbose: print '\n', '-=' * 30 print "Running %s.test03_readIndex..." % self.__class__.__name__ # Open the HDF5 file in read-only mode self.fileh = open_file(self.file, mode="r") table = self.fileh.root.table idxcol = table.cols.var3.index if verbose: print "Max rows in buf:", table.nrowsinbuf print "Number of elements per slice:", idxcol.slicesize print "Chunk size:", idxcol.sorted.chunksize # Do a selection results = [p["var3"] for p in table.where('(1 0)')] # Now, modify just one row: for row in table: if row.nrow == 3: row['var1'] = "asa" row['var2'] = True row['var3'] = 3 row['var4'] = 3.1 row.update() table.flush() if self.reopen: self.fileh.close() self.fileh = open_file(self.file, "a") table = self.fileh.root.table # Do a query that uses indexes resq = [row.nrow for row in table.where('(var2 == True) & (var3 > 0)')] res_ = res + [3] if verbose: print "AutoIndex?:", table.autoindex print "Query results (original):", res print "Query results (after modifying table):", resq print "Should look like:", res_ self.assertEqual(res_, resq) def test07c_noauto(self): "Checking indexing queries (append, no-auto mode)" if verbose: print '\n', '-=' * 30 print "Running %s.test07c_noauto..." % self.__class__.__name__ table = self.table # Force a sync in indexes table.flush_rows_to_index() # Do a query that uses indexes res = [row.nrow for row in table.where('(var2 == True) & (var3 > 0)')] # Now, append three rows table.append([("asa", True, 1, 3.1)]) table.append([("asb", True, 2, 3.1)]) table.append([("asc", True, 3, 3.1)]) table.flush() if self.reopen: self.fileh.close() self.fileh = open_file(self.file, "a") table = self.fileh.root.table # Do a query that uses indexes resq = [row.nrow for row in table.where('(var2 == True) & (var3 > 0)')] res_ = res + [table.nrows-3, table.nrows-2, table.nrows-1] if verbose: print "AutoIndex?:", table.autoindex print "Query results (original):", res print "Query results (after modifying table):", resq print "Should look like:", res_ self.assertEqual(res_, resq) def test08_dirty(self): "Checking dirty flags (modify_columns)" if verbose: print '\n', '-=' * 30 print "Running %s.test08_dirty..." % self.__class__.__name__ table = self.table # Force a sync in indexes table.flush_rows_to_index() # Non indexated rows should remain here if self.iprops is not DefaultProps: indexedrows = table._indexedrows self.assertTrue(indexedrows is not None) unsavedindexedrows = table._unsaved_indexedrows self.assertTrue(unsavedindexedrows is not None) # Now, modify a couple of rows: table.modify_columns(1, columns=[["asa", "asb"], [1., 2.]], names=["var1", "var4"]) if self.reopen: self.fileh.close() self.fileh = open_file(self.file, "a") table = self.fileh.root.table # Check the counters self.assertEqual(table.nrows, self.nrows) if self.iprops is NoAutoProps: self.assertTrue(table.cols.var1.index.dirty) # Check the dirty flag for indexes if verbose: for colname in table.colnames: if table.cols._f_col(colname).index: print "dirty flag col %s: %s" % \ (colname, table.cols._f_col(colname).index.dirty) for colname in table.colnames: if table.cols._f_col(colname).index: if not table.autoindex: if colname in ["var1"]: self.assertEqual( table.cols._f_col(colname).index.dirty, True) else: self.assertEqual( table.cols._f_col(colname).index.dirty, False) else: self.assertEqual(table.cols._f_col(colname).index.dirty, False) def test09a_propIndex(self): "Checking propagate Index feature in Table.copy() (attrs)" if verbose: print '\n', '-=' * 30 print "Running %s.test09a_propIndex..." % self.__class__.__name__ table = self.table # Don't force a sync in indexes # table.flush_rows_to_index() # Non indexated rows should remain here if self.iprops is not DefaultProps: indexedrows = table._indexedrows self.assertTrue(indexedrows is not None) unsavedindexedrows = table._unsaved_indexedrows self.assertTrue(unsavedindexedrows is not None) # Now, remove some rows to make columns dirty # table.remove_rows(3,5) # Copy a Table to another location table2 = table.copy("/", 'table2', propindexes=True) if self.reopen: self.fileh.close() self.fileh = open_file(self.file, "a") table = self.fileh.root.table table2 = self.fileh.root.table2 index1 = table.cols.var1.index index2 = table2.cols.var1.index if verbose: print "Copied index:", index2 print "Original index:", index1 if index1: print "Elements in copied index:", index2.nelements print "Elements in original index:", index1.nelements # Check the counters self.assertEqual(table.nrows, table2.nrows) if table.indexed: self.assertTrue(table2.indexed) if self.iprops is DefaultProps: # No index: the index should not exist self.assertTrue(index1 is None) self.assertTrue(index2 is None) elif self.iprops is NoAutoProps: self.assertTrue(index2 is not None) # Check the dirty flag for indexes if verbose: for colname in table2.colnames: if table2.cols._f_col(colname).index: print "dirty flag col %s: %s" % \ (colname, table2.cols._f_col(colname).index.dirty) for colname in table2.colnames: if table2.cols._f_col(colname).index: self.assertEqual(table2.cols._f_col(colname).index.dirty, False) def test09b_propIndex(self): "Checking that propindexes=False works" if verbose: print '\n', '-=' * 30 print "Running %s.test09b_propIndex..." % self.__class__.__name__ table = self.table # Don't force a sync in indexes # table.flush_rows_to_index() # Non indexated rows should remain here if self.iprops is not DefaultProps: indexedrows = table._indexedrows self.assertTrue(indexedrows is not None) unsavedindexedrows = table._unsaved_indexedrows self.assertTrue(unsavedindexedrows is not None) # Now, remove some rows to make columns dirty # table.remove_rows(3,5) # Copy a Table to another location table2 = table.copy("/", 'table2', propindexes=False) if self.reopen: self.fileh.close() self.fileh = open_file(self.file, "a") table = self.fileh.root.table table2 = self.fileh.root.table2 if verbose: print "autoindex?:", self.iprops.auto print "Copied index indexed?:", table2.cols.var1.is_indexed print "Original index indexed?:", table.cols.var1.is_indexed if self.iprops is DefaultProps: # No index: the index should not exist self.assertFalse(table2.cols.var1.is_indexed) self.assertFalse(table.cols.var1.is_indexed) elif self.iprops is NoAutoProps: self.assertFalse(table2.cols.var1.is_indexed) self.assertTrue(table.cols.var1.is_indexed) def test10_propIndex(self): "Checking propagate Index feature in Table.copy() (values)" if verbose: print '\n', '-=' * 30 print "Running %s.test10_propIndex..." % self.__class__.__name__ table = self.table # Don't force a sync in indexes # table.flush_rows_to_index() # Non indexated rows should remain here if self.iprops is not DefaultProps: indexedrows = table._indexedrows self.assertTrue(indexedrows is not None) unsavedindexedrows = table._unsaved_indexedrows self.assertTrue(unsavedindexedrows is not None) # Now, remove some rows to make columns dirty # table.remove_rows(3,5) # Copy a Table to another location table2 = table.copy("/", 'table2', propindexes=True) if self.reopen: self.fileh.close() self.fileh = open_file(self.file, "a") table = self.fileh.root.table table2 = self.fileh.root.table2 index1 = table.cols.var3.index index2 = table2.cols.var3.index if verbose: print "Copied index:", index2 print "Original index:", index1 if index1: print "Elements in copied index:", index2.nelements print "Elements in original index:", index1.nelements def test11_propIndex(self): "Checking propagate Index feature in Table.copy() (dirty flags)" if verbose: print '\n', '-=' * 30 print "Running %s.test11_propIndex..." % self.__class__.__name__ table = self.table # Force a sync in indexes table.flush_rows_to_index() # Non indexated rows should remain here if self.iprops is not DefaultProps: indexedrows = table._indexedrows self.assertTrue(indexedrows is not None) unsavedindexedrows = table._unsaved_indexedrows self.assertTrue(unsavedindexedrows is not None) # Now, modify an indexed column and an unindexed one # to make the "var1" dirty table.modify_columns(1, columns=[["asa", "asb"], [1., 2.]], names=["var1", "var4"]) # Copy a Table to another location table2 = table.copy("/", 'table2', propindexes=True) if self.reopen: self.fileh.close() self.fileh = open_file(self.file, "a") table = self.fileh.root.table table2 = self.fileh.root.table2 index1 = table.cols.var1.index index2 = table2.cols.var1.index if verbose: print "Copied index:", index2 print "Original index:", index1 if index1: print "Elements in copied index:", index2.nelements print "Elements in original index:", index1.nelements # Check the dirty flag for indexes if verbose: for colname in table2.colnames: if table2.cols._f_col(colname).index: print "dirty flag col %s: %s" % \ (colname, table2.cols._f_col(colname).index.dirty) for colname in table2.colnames: if table2.cols._f_col(colname).index: if table2.autoindex: # All the destination columns should be non-dirty because # the copy removes the dirty state and puts the # index in a sane state self.assertEqual(table2.cols._f_col(colname).index.dirty, False) # minRowIndex = 10000 # just if one wants more indexed rows to be checked class AI1TestCase(AutomaticIndexingTestCase): # nrows = 10002 nrows = 102 reopen = 0 iprops = NoAutoProps colsToIndex = ['var1', 'var2', 'var3'] class AI2TestCase(AutomaticIndexingTestCase): # nrows = 10002 nrows = 102 reopen = 1 iprops = NoAutoProps colsToIndex = ['var1', 'var2', 'var3'] class AI4bTestCase(AutomaticIndexingTestCase): # nrows = 10012 nrows = 112 reopen = 1 iprops = NoAutoProps colsToIndex = ['var1', 'var2', 'var3'] class AI5TestCase(AutomaticIndexingTestCase): sbs, bs, ss, cs = calc_chunksize(minRowIndex, memlevel=1) nrows = ss * 11-1 reopen = 0 iprops = NoAutoProps colsToIndex = ['var1', 'var2', 'var3'] class AI6TestCase(AutomaticIndexingTestCase): sbs, bs, ss, cs = calc_chunksize(minRowIndex, memlevel=1) nrows = ss * 21 + 1 reopen = 1 iprops = NoAutoProps colsToIndex = ['var1', 'var2', 'var3'] class AI7TestCase(AutomaticIndexingTestCase): sbs, bs, ss, cs = calc_chunksize(minRowIndex, memlevel=1) nrows = ss * 12-1 # nrows = ss * 1-1 # faster test reopen = 0 iprops = NoAutoProps colsToIndex = ['var1', 'var2', 'var3'] class AI8TestCase(AutomaticIndexingTestCase): sbs, bs, ss, cs = calc_chunksize(minRowIndex, memlevel=1) nrows = ss * 15 + 100 # nrows = ss * 1 + 100 # faster test reopen = 1 iprops = NoAutoProps colsToIndex = ['var1', 'var2', 'var3'] class AI9TestCase(AutomaticIndexingTestCase): sbs, bs, ss, cs = calc_chunksize(minRowIndex, memlevel=1) nrows = ss reopen = 0 iprops = DefaultProps colsToIndex = [] class AI10TestCase(AutomaticIndexingTestCase): # nrows = 10002 nrows = 102 reopen = 1 iprops = DefaultProps colsToIndex = [] class AI11TestCase(AutomaticIndexingTestCase): # nrows = 10002 nrows = 102 reopen = 0 iprops = ChangeFiltersProps colsToIndex = ['var1', 'var2', 'var3'] class AI12TestCase(AutomaticIndexingTestCase): # nrows = 10002 nrows = 102 reopen = 0 iprops = ChangeFiltersProps colsToIndex = ['var1', 'var2', 'var3'] class ManyNodesTestCase(PyTablesTestCase): def setUp(self): self.file = tempfile.mktemp(".h5") self.fileh = open_file(self.file, "w", node_cache_slots=64) def test00(self): """Indexing many nodes in one single session (based on bug #26)""" IdxRecord = { 'f0': Int8Col(), 'f1': Int8Col(), 'f2': Int8Col(), } h5 = self.fileh for qn in range(5): for sn in range(5): qchr = 'chr' + str(qn) name = 'chr' + str(sn) path = "/at/%s/pt" % (qchr) table = h5.create_table(path, name, IdxRecord, createparents=1) table.cols.f0.create_index() table.cols.f1.create_index() table.cols.f2.create_index() table.row.append() table.flush() def tearDown(self): self.fileh.close() os.remove(self.file) cleanup(self) class IndexPropsChangeTestCase(TempFileMixin, PyTablesTestCase): """Test case for changing index properties in a table.""" class MyDescription(IsDescription): icol = IntCol() oldIndexProps = IndexProps() newIndexProps = IndexProps(auto=False, filters=Filters(complevel=9)) def setUp(self): super(IndexPropsChangeTestCase, self).setUp() table = self.h5file.create_table('/', 'test', self.MyDescription) table.autoindex = self.oldIndexProps.auto row = table.row for i in xrange(100): row['icol'] = i % 25 row.append() table.flush() self.table = table def tearDown(self): super(IndexPropsChangeTestCase, self).tearDown() def test_attributes(self): """Storing index properties as table attributes.""" for refprops in [self.oldIndexProps, self.newIndexProps]: self.assertEqual(self.table.autoindex, refprops.auto) self.table.autoindex = self.newIndexProps.auto def test_copyattrs(self): """Copying index properties attributes.""" oldtable = self.table newtable = oldtable.copy('/', 'test2') self.assertEqual(oldtable.autoindex, newtable.autoindex) class IndexFiltersTestCase(TempFileMixin, PyTablesTestCase): """Test case for setting index filters.""" def setUp(self): super(IndexFiltersTestCase, self).setUp() description = {'icol': IntCol()} self.table = self.h5file.create_table('/', 'test', description) def test_createIndex(self): """Checking input parameters in new indexes.""" # Different from default. argfilters = copy.copy(default_index_filters) argfilters.shuffle = not default_index_filters.shuffle # Different both from default and the previous one. idxfilters = copy.copy(default_index_filters) idxfilters.shuffle = not default_index_filters.shuffle idxfilters.fletcher32 = not default_index_filters.fletcher32 icol = self.table.cols.icol # First create icol.create_index(kind='ultralight', optlevel=4) self.assertEqual(icol.index.kind, 'ultralight') self.assertEqual(icol.index.optlevel, 4) self.assertEqual(icol.index.filters, default_index_filters) icol.remove_index() # Second create icol.create_index(kind='medium', optlevel=3, filters=argfilters) self.assertEqual(icol.index.kind, 'medium') self.assertEqual(icol.index.optlevel, 3) self.assertEqual(icol.index.filters, argfilters) icol.remove_index() def test_reindex(self): """Checking input parameters in recomputed indexes.""" icol = self.table.cols.icol icol.create_index( kind='full', optlevel=5, filters=Filters(complevel=3)) kind = icol.index.kind optlevel = icol.index.optlevel filters = icol.index.filters icol.reindex() ni = icol.index if verbose: print "Old parameters: %s, %s, %s" % (kind, optlevel, filters) print "New parameters: %s, %s, %s" % ( ni.kind, ni.optlevel, ni.filters) self.assertEqual(ni.kind, kind) self.assertEqual(ni.optlevel, optlevel) self.assertEqual(ni.filters, filters) class OldIndexTestCase(PyTablesTestCase): def test1_x(self): """Check that files with 1.x indexes are recognized and warned.""" fname = self._testFilename("idx-std-1.x.h5") f = open_file(fname) self.assertWarns(OldIndexWarning, f.get_node, "/table") f.close() # Sensible parameters for indexing with small blocksizes small_blocksizes = (512, 128, 32, 8) class CompletelySortedIndexTestCase(TempFileMixin, PyTablesTestCase): """Test case for testing a complete sort in a table.""" nrows = 100 nrowsinbuf = 11 class MyDescription(IsDescription): rcol = IntCol(pos=1) icol = IntCol(pos=2) def setUp(self): super(CompletelySortedIndexTestCase, self).setUp() table = self.h5file.create_table('/', 'table', self.MyDescription) row = table.row nrows = self.nrows for i in xrange(nrows): row['rcol'] = i row['icol'] = nrows - i row.append() table.flush() self.table = table self.icol = self.table.cols.icol # A full index with maximum optlevel should always be completely sorted self.icol.create_csindex(_blocksizes=small_blocksizes) def test00_isCompletelySortedIndex(self): """Testing the Column.is_csi property.""" icol = self.icol self.assertEqual(icol.index.is_csi, True) icol.remove_index() # Other kinds than full, should never return a CSI icol.create_index(kind="medium", optlevel=9) self.assertEqual(icol.index.is_csi, False) icol.remove_index() # As the table is small, lesser optlevels should be able to # create a completely sorted index too. icol.create_index(kind="full", optlevel=6) self.assertEqual(icol.index.is_csi, True) # Checking a CSI in a sorted copy self.table.copy("/", 'table2', sortby='icol', checkCSI=True) self.assertEqual(icol.index.is_csi, True) def test01_readSorted1(self): """Testing the Index.read_sorted() method with no arguments.""" icol = self.icol sortedcol = numpy.sort(icol[:]) sortedcol2 = icol.index.read_sorted() if verbose: print "Original sorted column:", sortedcol print "The values from the index:", sortedcol2 self.assertTrue(allequal(sortedcol, sortedcol2)) def test01_readSorted2(self): """Testing the Index.read_sorted() method with arguments (I).""" icol = self.icol sortedcol = numpy.sort(icol[:])[30:55] sortedcol2 = icol.index.read_sorted(30, 55) if verbose: print "Original sorted column:", sortedcol print "The values from the index:", sortedcol2 self.assertTrue(allequal(sortedcol, sortedcol2)) def test01_readSorted3(self): """Testing the Index.read_sorted() method with arguments (II).""" icol = self.icol sortedcol = numpy.sort(icol[:])[33:97] sortedcol2 = icol.index.read_sorted(33, 97) if verbose: print "Original sorted column:", sortedcol print "The values from the index:", sortedcol2 self.assertTrue(allequal(sortedcol, sortedcol2)) def test02_readIndices1(self): """Testing the Index.read_indices() method with no arguments.""" icol = self.icol indicescol = numpy.argsort(icol[:]).astype('uint64') indicescol2 = icol.index.read_indices() if verbose: print "Original indices column:", indicescol print "The values from the index:", indicescol2 self.assertTrue(allequal(indicescol, indicescol2)) def test02_readIndices2(self): """Testing the Index.read_indices() method with arguments (I).""" icol = self.icol indicescol = numpy.argsort(icol[:])[30:55].astype('uint64') indicescol2 = icol.index.read_indices(30, 55) if verbose: print "Original indices column:", indicescol print "The values from the index:", indicescol2 self.assertTrue(allequal(indicescol, indicescol2)) def test02_readIndices3(self): """Testing the Index.read_indices() method with arguments (II).""" icol = self.icol indicescol = numpy.argsort(icol[:])[33:97].astype('uint64') indicescol2 = icol.index.read_indices(33, 97) if verbose: print "Original indices column:", indicescol print "The values from the index:", indicescol2 self.assertTrue(allequal(indicescol, indicescol2)) def test02_readIndices4(self): """Testing the Index.read_indices() method with arguments (III).""" icol = self.icol indicescol = numpy.argsort(icol[:])[33:97:2].astype('uint64') indicescol2 = icol.index.read_indices(33, 97, 2) if verbose: print "Original indices column:", indicescol print "The values from the index:", indicescol2 self.assertTrue(allequal(indicescol, indicescol2)) def test02_readIndices5(self): """Testing the Index.read_indices() method with arguments (IV).""" icol = self.icol indicescol = numpy.argsort(icol[:])[33:55:5].astype('uint64') indicescol2 = icol.index.read_indices(33, 55, 5) if verbose: print "Original indices column:", indicescol print "The values from the index:", indicescol2 self.assertTrue(allequal(indicescol, indicescol2)) def test02_readIndices6(self): """Testing the Index.read_indices() method with step only.""" icol = self.icol indicescol = numpy.argsort(icol[:])[::3].astype('uint64') indicescol2 = icol.index.read_indices(step=3) if verbose: print "Original indices column:", indicescol print "The values from the index:", indicescol2 self.assertTrue(allequal(indicescol, indicescol2)) def test03_getitem1(self): """Testing the Index.__getitem__() method with no arguments.""" icol = self.icol indicescol = numpy.argsort(icol[:]).astype('uint64') indicescol2 = icol.index[:] if verbose: print "Original indices column:", indicescol print "The values from the index:", indicescol2 self.assertTrue(allequal(indicescol, indicescol2)) def test03_getitem2(self): """Testing the Index.__getitem__() method with start.""" icol = self.icol indicescol = numpy.argsort(icol[:])[31].astype('uint64') indicescol2 = icol.index[31] if verbose: print "Original indices column:", indicescol print "The values from the index:", indicescol2 self.assertTrue(allequal(indicescol, indicescol2)) def test03_getitem3(self): """Testing the Index.__getitem__() method with start, stop.""" icol = self.icol indicescol = numpy.argsort(icol[:])[2:16].astype('uint64') indicescol2 = icol.index[2:16] if verbose: print "Original indices column:", indicescol print "The values from the index:", indicescol2 self.assertTrue(allequal(indicescol, indicescol2)) def test04_itersorted1(self): """Testing the Table.itersorted() method with no arguments.""" table = self.table sortedtable = numpy.sort(table[:], order='icol') sortedtable2 = numpy.array( [row.fetch_all_fields() for row in table.itersorted( 'icol')], dtype=table._v_dtype) if verbose: print "Original sorted table:", sortedtable print "The values from the iterator:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test04_itersorted2(self): """Testing the Table.itersorted() method with a start.""" table = self.table sortedtable = numpy.sort(table[:], order='icol')[15:] sortedtable2 = numpy.array( [row.fetch_all_fields() for row in table.itersorted( 'icol', start=15)], dtype=table._v_dtype) if verbose: print "Original sorted table:", sortedtable print "The values from the iterator:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test04_itersorted3(self): """Testing the Table.itersorted() method with a stop.""" table = self.table sortedtable = numpy.sort(table[:], order='icol')[:20] sortedtable2 = numpy.array( [row.fetch_all_fields() for row in table.itersorted( 'icol', stop=20)], dtype=table._v_dtype) if verbose: print "Original sorted table:", sortedtable print "The values from the iterator:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test04_itersorted4(self): """Testing the Table.itersorted() method with a start and stop.""" table = self.table sortedtable = numpy.sort(table[:], order='icol')[15:20] sortedtable2 = numpy.array( [row.fetch_all_fields() for row in table.itersorted( 'icol', start=15, stop=20)], dtype=table._v_dtype) if verbose: print "Original sorted table:", sortedtable print "The values from the iterator:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test04_itersorted5(self): """Testing the Table.itersorted() method with a start, stop and step.""" table = self.table sortedtable = numpy.sort(table[:], order='icol')[15:45:4] sortedtable2 = numpy.array( [row.fetch_all_fields() for row in table.itersorted( 'icol', start=15, stop=45, step=4)], dtype=table._v_dtype) if verbose: print "Original sorted table:", sortedtable print "The values from the iterator:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test04_itersorted6(self): """Testing the Table.itersorted() method with a start, stop and step.""" table = self.table sortedtable = numpy.sort(table[:], order='icol')[33:55:5] sortedtable2 = numpy.array( [row.fetch_all_fields() for row in table.itersorted( 'icol', start=33, stop=55, step=5)], dtype=table._v_dtype) if verbose: print "Original sorted table:", sortedtable print "The values from the iterator:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test04_itersorted7(self): """Testing the Table.itersorted() method with checkCSI=True.""" table = self.table sortedtable = numpy.sort(table[:], order='icol') sortedtable2 = numpy.array( [row.fetch_all_fields() for row in table.itersorted( 'icol', checkCSI=True)], dtype=table._v_dtype) if verbose: print "Original sorted table:", sortedtable print "The values from the iterator:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test04_itersorted8(self): """Testing the Table.itersorted() method with a start, stop and negative step.""" # see also gh-252 table = self.table sortedtable = numpy.sort(table[:], order='icol')[55:33:-5] sortedtable2 = numpy.array( [row.fetch_all_fields() for row in table.itersorted( 'icol', start=55, stop=33, step=-5)], dtype=table._v_dtype) if verbose: print "Original sorted table:", sortedtable print "The values from the iterator:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test04_itersorted9(self): """Testing the Table.itersorted() method with a negative step.""" # see also gh-252 table = self.table sortedtable = numpy.sort(table[:], order='icol')[::-5] sortedtable2 = numpy.array( [row.fetch_all_fields() for row in table.itersorted( 'icol', step=-5)], dtype=table._v_dtype) if verbose: print "Original sorted table:", sortedtable print "The values from the iterator:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05_readSorted1(self): """Testing the Table.read_sorted() method with no arguments.""" table = self.table sortedtable = numpy.sort(table[:], order='icol') sortedtable2 = table.read_sorted('icol') if verbose: print "Original sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05_readSorted2(self): """Testing the Table.read_sorted() method with a start.""" table = self.table sortedtable = numpy.sort(table[:], order='icol')[16:17] sortedtable2 = table.read_sorted('icol', start=16) if verbose: print "Original sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05_readSorted3(self): """Testing the Table.read_sorted() method with a start and stop.""" table = self.table sortedtable = numpy.sort(table[:], order='icol')[16:33] sortedtable2 = table.read_sorted('icol', start=16, stop=33) if verbose: print "Original sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05_readSorted4(self): """Testing the Table.read_sorted() method with a start, stop and step.""" table = self.table sortedtable = numpy.sort(table[:], order='icol')[33:55:5] sortedtable2 = table.read_sorted('icol', start=33, stop=55, step=5) if verbose: print "Original sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05_readSorted5(self): """Testing the Table.read_sorted() method with only a step.""" table = self.table sortedtable = numpy.sort(table[:], order='icol')[::3] sortedtable2 = table.read_sorted('icol', step=3) if verbose: print "Original sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05_readSorted6(self): """Testing the Table.read_sorted() method with negative step.""" table = self.table sortedtable = numpy.sort(table[:], order='icol')[::-1] sortedtable2 = table.read_sorted('icol', step=-1) if verbose: print "Original sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05_readSorted7(self): """Testing the Table.read_sorted() method with negative step (II).""" table = self.table sortedtable = numpy.sort(table[:], order='icol')[::-2] sortedtable2 = table.read_sorted('icol', step=-2) if verbose: print "Original sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05_readSorted8(self): """Testing the Table.read_sorted() method with negative step (III)).""" table = self.table sstart = 100-24-1 sstop = 100-54-1 sortedtable = numpy.sort(table[:], order='icol')[sstart:sstop:-1] sortedtable2 = table.read_sorted('icol', start=24, stop=54, step=-1) if verbose: print "Original sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05_readSorted9(self): """Testing the Table.read_sorted() method with negative step (IV)).""" table = self.table sstart = 100-14-1 sstop = 100-54-1 sortedtable = numpy.sort(table[:], order='icol')[sstart:sstop:-3] sortedtable2 = table.read_sorted('icol', start=14, stop=54, step=-3) if verbose: print "Original sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05_readSorted10(self): """Testing the Table.read_sorted() method with negative step (V)).""" table = self.table sstart = 100-24-1 sstop = 100-25-1 sortedtable = numpy.sort(table[:], order='icol')[sstart:sstop:-2] sortedtable2 = table.read_sorted('icol', start=24, stop=25, step=-2) if verbose: print "Original sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05_readSorted11(self): """Testing the Table.read_sorted() method with start > stop.""" table = self.table sstart = 100-137-1 sstop = 100-25-1 sortedtable = numpy.sort(table[:], order='icol')[sstart:sstop:-2] sortedtable2 = table.read_sorted('icol', start=137, stop=25, step=-2) if verbose: print "Original sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05a_readSorted12(self): """Testing the Table.read_sorted() method with checkCSI (I).""" table = self.table sortedtable = numpy.sort(table[:], order='icol') sortedtable2 = table.read_sorted('icol', checkCSI=True) if verbose: print "Original sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test05b_readSorted12(self): """Testing the Table.read_sorted() method with checkCSI (II).""" table = self.table self.assertRaises(ValueError, table.read_sorted, "rcol", checkCSI=False) def test06_copy_sorted1(self): """Testing the Table.copy(sortby) method with no arguments.""" table = self.table # Copy to another table table.nrowsinbuf = self.nrowsinbuf table2 = table.copy("/", 'table2', sortby="icol") sortedtable = numpy.sort(table[:], order='icol') sortedtable2 = table2[:] if verbose: print "Original sorted table:", sortedtable print "The values from copy:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test06_copy_sorted2(self): """Testing the Table.copy(sortby) method with step=-1.""" table = self.table # Copy to another table table.nrowsinbuf = self.nrowsinbuf table2 = table.copy("/", 'table2', sortby="icol", step=-1) sortedtable = numpy.sort(table[:], order='icol')[::-1] sortedtable2 = table2[:] if verbose: print "Original sorted table:", sortedtable print "The values from copy:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test06_copy_sorted3(self): """Testing the Table.copy(sortby) method with only a start.""" table = self.table # Copy to another table table.nrowsinbuf = self.nrowsinbuf table2 = table.copy("/", 'table2', sortby="icol", start=3) sortedtable = numpy.sort(table[:], order='icol')[3:4] sortedtable2 = table2[:] if verbose: print "Original sorted table:", sortedtable print "The values from copy:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test06_copy_sorted4(self): """Testing the Table.copy(sortby) method with start, stop.""" table = self.table # Copy to another table table.nrowsinbuf = self.nrowsinbuf table2 = table.copy("/", 'table2', sortby="icol", start=3, stop=40) sortedtable = numpy.sort(table[:], order='icol')[3:40] sortedtable2 = table2[:] if verbose: print "Original sorted table:", sortedtable print "The values from copy:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test06_copy_sorted5(self): """Testing the Table.copy(sortby) method with start, stop, step.""" table = self.table # Copy to another table table.nrowsinbuf = self.nrowsinbuf table2 = table.copy("/", 'table2', sortby="icol", start=3, stop=33, step=5) sortedtable = numpy.sort(table[:], order='icol')[3:33:5] sortedtable2 = table2[:] if verbose: print "Original sorted table:", sortedtable print "The values from copy:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test06_copy_sorted6(self): """Testing the Table.copy(sortby) method after table re-opening.""" self._reopen(mode='a') table = self.h5file.root.table # Copy to another table table.nrowsinbuf = self.nrowsinbuf table2 = table.copy("/", 'table2', sortby="icol") sortedtable = numpy.sort(table[:], order='icol') sortedtable2 = table2[:] if verbose: print "Original sorted table:", sortedtable print "The values from copy:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test06_copy_sorted7(self): """Testing the `checkCSI` parameter of Table.copy() (I).""" table = self.table # Copy to another table table.nrowsinbuf = self.nrowsinbuf table2 = table.copy("/", 'table2', sortby="icol") self.assertRaises(ValueError, table2.copy, "/", 'table3', sortby="rcol", checkCSI=False) def test06_copy_sorted8(self): """Testing the `checkCSI` parameter of Table.copy() (II).""" table = self.table # Copy to another table table.nrowsinbuf = self.nrowsinbuf table2 = table.copy("/", 'table2', sortby="icol") self.assertRaises(ValueError, table2.copy, "/", 'table3', sortby="rcol", checkCSI=True) def test07_isCSI_noelements(self): """Testing the representation of an index with no elements.""" t2 = self.h5file.create_table('/', 't2', self.MyDescription) irows = t2.cols.rcol.create_csindex() if verbose: print "repr(t2)-->\n", repr(t2) self.assertEqual(irows, 0) self.assertEqual(t2.colindexes['rcol'].is_csi, False) class ReadSortedIndexTestCase(TempFileMixin, PyTablesTestCase): """Test case for testing sorted reading in a "full" sorted column.""" nrows = 100 nrowsinbuf = 11 class MyDescription(IsDescription): rcol = IntCol(pos=1) icol = IntCol(pos=2) def setUp(self): super(ReadSortedIndexTestCase, self).setUp() table = self.h5file.create_table('/', 'table', self.MyDescription) row = table.row nrows = self.nrows for i in xrange(nrows): row['rcol'] = i row['icol'] = nrows - i row.append() table.flush() self.table = table self.icol = self.table.cols.icol # A full index with maximum optlevel should always be completely sorted self.icol.create_index(optlevel=self.optlevel, kind="full", _blocksizes=small_blocksizes) def test01_readSorted1(self): """Testing the Table.read_sorted() method with no arguments.""" table = self.table sortedtable = numpy.sort(table[:], order='icol') sortedtable2 = table.read_sorted('icol') if verbose: print "Sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 # Compare with the sorted read table because we have no # guarantees that read_sorted returns a completely sorted table self.assertTrue(allequal(sortedtable, numpy.sort(sortedtable2, order="icol"))) def test01_readSorted2(self): """Testing the Table.read_sorted() method with no arguments (re-open).""" self._reopen() table = self.h5file.root.table sortedtable = numpy.sort(table[:], order='icol') sortedtable2 = table.read_sorted('icol') if verbose: print "Sorted table:", sortedtable print "The values from read_sorted:", sortedtable2 # Compare with the sorted read table because we have no # guarantees that read_sorted returns a completely sorted table self.assertTrue(allequal(sortedtable, numpy.sort(sortedtable2, order="icol"))) def test02_copy_sorted1(self): """Testing the Table.copy(sortby) method.""" table = self.table # Copy to another table table.nrowsinbuf = self.nrowsinbuf table2 = table.copy("/", 'table2', sortby="icol") sortedtable = numpy.sort(table[:], order='icol') sortedtable2 = numpy.sort(table2[:], order='icol') if verbose: print "Original table:", table2[:] print "The sorted values from copy:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) def test02_copy_sorted2(self): """Testing the Table.copy(sortby) method after table re-opening.""" self._reopen(mode='a') table = self.h5file.root.table # Copy to another table table.nrowsinbuf = self.nrowsinbuf table2 = table.copy("/", 'table2', sortby="icol") sortedtable = numpy.sort(table[:], order='icol') sortedtable2 = numpy.sort(table2[:], order='icol') if verbose: print "Original table:", table2[:] print "The sorted values from copy:", sortedtable2 self.assertTrue(allequal(sortedtable, sortedtable2)) class ReadSortedIndex0(ReadSortedIndexTestCase): optlevel = 0 class ReadSortedIndex3(ReadSortedIndexTestCase): optlevel = 3 class ReadSortedIndex6(ReadSortedIndexTestCase): optlevel = 6 class ReadSortedIndex9(ReadSortedIndexTestCase): optlevel = 9 class Issue156TestBase(PyTablesTestCase): # field name in table according to which test_copysort() sorts the table sort_field = None def setUp(self): # create hdf5 file self.filename = tempfile.mktemp(".hdf5") self.file = open_file(self.filename, mode="w") # create nested table class Foo(IsDescription): frame = UInt16Col() class Bar(IsDescription): code = UInt16Col() table = self.file.create_table('/', 'foo', Foo, filters=Filters(3, 'zlib'), createparents=True) self.file.flush() # fill table with 10 random numbers for k in xrange(10): row = table.row row['frame'] = numpy.random.random_integers(0, 2**16-1) row['Bar/code'] = numpy.random.random_integers(0, 2**16-1) row.append() self.file.flush() def tearDown(self): self.file.close() os.remove(self.filename) def test_copysort(self): # copy table oldNode = self.file.get_node('/foo') # create completely sorted index on a main column oldNode.colinstances[self.sort_field].create_csindex() # this fails on ade2ba123efd267fd31 # see gh-156 new_node = oldNode.copy(newname='foo2', overwrite=True, sortby=self.sort_field, checkCSI=True, propindexes=True) # check column is sorted self.assertTrue(numpy.all( new_node.col(self.sort_field) == sorted(oldNode.col(self.sort_field)))) # check index is available self.assertTrue(self.sort_field in new_node.colindexes) # check CSI was propagated self.assertTrue(new_node.colindexes[self.sort_field].is_csi) class Issue156TestCase01(Issue156TestBase): # sort by field from non nested entry sort_field = 'frame' class Issue156TestCase02(Issue156TestBase): # sort by field from nested entry sort_field = 'Bar/code' class Issue119Time32ColTestCase(PyTablesTestCase): """ TimeCol not properly indexing """ col_typ = Time32Col values = [ 0.93240451618785880, 0.76322375510776170, 0.16695030056300875, 0.91259117097807850, 0.93977847053454630, 0.51450406513503090, 0.24452129962257563, 0.85475938924825230, 0.32512326762476930, 0.75127635627046820, ] def setUp(self): # create hdf5 file self.filename = tempfile.mktemp(".hdf5") self.file = open_file(self.filename, mode="w") class Descr(IsDescription): when = self.col_typ(pos = 1) value = Float32Col(pos = 2) self.table = self.file.create_table('/', 'test', Descr) self.t = 1321031471.0 # 11/11/11 11:11:11 data = [(self.t + i, item) for i, item in enumerate(self.values)] self.table.append(data) self.file.flush() def tearDown(self): self.file.close() os.remove(self.filename) def test_timecol_issue(self): tbl = self.table t = self.t wherestr = '(when >= %d) & (when < %d)'%(t, t+5) no_index = tbl.read_where(wherestr) tbl.cols.when.create_index(_verbose = False) with_index = tbl.read_where(wherestr) self.assertTrue((no_index == with_index).all()) class Issue119Time64ColTestCase(Issue119Time32ColTestCase): col_typ = Time64Col #---------------------------------------------------------------------- def suite(): theSuite = unittest.TestSuite() niter = 1 # heavy = 1 # Uncomment this only for testing purposes! for n in range(niter): theSuite.addTest(unittest.makeSuite(BasicReadTestCase)) theSuite.addTest(unittest.makeSuite(ZlibReadTestCase)) theSuite.addTest(unittest.makeSuite(BloscReadTestCase)) theSuite.addTest(unittest.makeSuite(LZOReadTestCase)) theSuite.addTest(unittest.makeSuite(Bzip2ReadTestCase)) theSuite.addTest(unittest.makeSuite(ShuffleReadTestCase)) theSuite.addTest(unittest.makeSuite(Fletcher32ReadTestCase)) theSuite.addTest(unittest.makeSuite(ShuffleFletcher32ReadTestCase)) theSuite.addTest(unittest.makeSuite(OneHalfTestCase)) theSuite.addTest(unittest.makeSuite(UpperBoundTestCase)) theSuite.addTest(unittest.makeSuite(LowerBoundTestCase)) theSuite.addTest(unittest.makeSuite(AI1TestCase)) theSuite.addTest(unittest.makeSuite(AI2TestCase)) theSuite.addTest(unittest.makeSuite(AI9TestCase)) theSuite.addTest(unittest.makeSuite(DeepTableIndexTestCase)) theSuite.addTest(unittest.makeSuite(IndexPropsChangeTestCase)) theSuite.addTest(unittest.makeSuite(IndexFiltersTestCase)) theSuite.addTest(unittest.makeSuite(OldIndexTestCase)) theSuite.addTest(unittest.makeSuite(CompletelySortedIndexTestCase)) theSuite.addTest(unittest.makeSuite(ManyNodesTestCase)) theSuite.addTest(unittest.makeSuite(ReadSortedIndex0)) theSuite.addTest(unittest.makeSuite(ReadSortedIndex3)) theSuite.addTest(unittest.makeSuite(ReadSortedIndex6)) theSuite.addTest(unittest.makeSuite(ReadSortedIndex9)) theSuite.addTest(unittest.makeSuite(Issue156TestCase01)) theSuite.addTest(unittest.makeSuite(Issue156TestCase02)) theSuite.addTest(unittest.makeSuite(Issue119Time32ColTestCase)) theSuite.addTest(unittest.makeSuite(Issue119Time64ColTestCase)) if heavy: # These are too heavy for normal testing theSuite.addTest(unittest.makeSuite(AI4bTestCase)) theSuite.addTest(unittest.makeSuite(AI5TestCase)) theSuite.addTest(unittest.makeSuite(AI6TestCase)) theSuite.addTest(unittest.makeSuite(AI7TestCase)) theSuite.addTest(unittest.makeSuite(AI8TestCase)) theSuite.addTest(unittest.makeSuite(AI10TestCase)) theSuite.addTest(unittest.makeSuite(AI11TestCase)) theSuite.addTest(unittest.makeSuite(AI12TestCase)) return theSuite if __name__ == '__main__': unittest.main(defaultTest='suite')