diff --git a/Makefile b/Makefile index 8f8f5607..dcf0672d 100644 --- a/Makefile +++ b/Makefile @@ -72,7 +72,6 @@ test-xmlrpc: ## Run XMLRPC unit tests ./venv/bin/python ./unittest-xmlrpc.py test: test-core test-restful test-xmlrpc ## Run unit tests - ./venv-dev/bin/python -m black ./glances --check --exclude outputs/static test-with-upgrade: venv-upgrade venv-dev-upgrade test ## Upgrade deps and run unit tests diff --git a/glances/plugins/plugin/model.py b/glances/plugins/plugin/model.py index afc0b0ad..cfc846a8 100644 --- a/glances/plugins/plugin/model.py +++ b/glances/plugins/plugin/model.py @@ -236,30 +236,13 @@ class GlancesPluginModel(object): else: return None - def get_json_history(self, item=None, nb=0): - """Return the history (JSON format). - - - the stats history (dict of list) if item is None - - the stats history for the given item (list) instead - - None if item did not exist in the history - Limit to lasts nb items (all if nb=0) - """ - s = self.stats_history.get_json(nb=nb) - if item is None: - return s - else: - if item in s: - return s[item] - else: - return None - def get_export_history(self, item=None): """Return the stats history object to export.""" return self.get_raw_history(item=item) def get_stats_history(self, item=None, nb=0): """Return the stats history (JSON format).""" - s = self.get_json_history(nb=nb) + s = self.stats_history.get_json(nb=nb) if item is None: return json_dumps(s) @@ -751,6 +734,8 @@ class GlancesPluginModel(object): return {k: v for k, v in stats._asdict().items() if k in self.fields_description} elif isinstance(stats, dict): return {k: v for k, v in stats.items() if k in self.fields_description} + elif isinstance(stats, list): + return [self.filter_stats(s) for s in stats] else: return stats diff --git a/unittest-core.py b/unittest-core.py index 51f3fc2a..87a7718a 100755 --- a/unittest-core.py +++ b/unittest-core.py @@ -60,6 +60,113 @@ class TestGlances(unittest.TestCase): """The function is called *every time* before test_*.""" print('\n' + '=' * 78) + def _common_plugin_tests(self, plugin): + """Common method to test a Glances plugin + This method is called multiple time by test 100 to 1xx""" + + # Reset all the stats, history and views + plugin_instance = stats.get_plugin(plugin) + plugin_instance.reset() # reset stats + plugin_instance.reset_views() # reset views + plugin_instance.reset_stats_history() # reset history + + # Check before update + self.assertEqual(plugin_instance.get_raw(), plugin_instance.stats_init_value) + self.assertEqual(plugin_instance.is_enabled(), True) + self.assertEqual(plugin_instance.is_disabled(), False) + self.assertEqual(plugin_instance.get_views(), {}) + self.assertIsInstance(plugin_instance.get_raw(), (dict, list)) + if plugin_instance.history_enable() and isinstance(plugin_instance.get_raw(), dict): + self.assertEqual(plugin_instance.get_key(), None) + self.assertTrue(all([f in [h['name'] for h in plugin_instance.items_history_list] for f in plugin_instance.get_raw_history()])) + elif plugin_instance.history_enable() and isinstance(plugin_instance.get_raw(), list): + self.assertNotEqual(plugin_instance.get_key(), None) + + # Update stats (add first element) + plugin_instance.update() + plugin_instance.update_stats_history() + plugin_instance.update_views() + + # Check stats + self.assertIsInstance(plugin_instance.get_raw(), (dict, list)) + if isinstance(plugin_instance.get_raw(), dict): + # self.assertTrue(any([f in plugin_instance.get_raw() for f in plugin_instance.fields_description])) + res = False + for f in plugin_instance.fields_description: + if f not in plugin_instance.get_raw(): + print(f"WARNING: {f} not found in {plugin} plugin fields_description") + else: + res = True + self.assertTrue(res) + elif isinstance(plugin_instance.get_raw(), list): + res = False + for i in plugin_instance.get_raw(): + # self.assertTrue(all([f in plugin_instance.fields_description for f in i])) + for f in i: + if f in plugin_instance.fields_description: + res = True + self.assertTrue(res) + + self.assertEqual(plugin_instance.get_raw(), plugin_instance.get_export()) + self.assertEqual(plugin_instance.get_stats(), plugin_instance.get_json()) + self.assertEqual(json.loads(plugin_instance.get_stats()), plugin_instance.get_raw()) + if len(plugin_instance.fields_description) > 0: + # Get first item of the fields_description + first_field = list(plugin_instance.fields_description.keys())[0] + self.assertIsInstance(plugin_instance.get_raw_stats_item(first_field), dict) + self.assertIsInstance(json.loads(plugin_instance.get_stats_item(first_field)), dict) + self.assertIsInstance(plugin_instance.get_item_info(first_field, 'description'), str) + # Filter stats + current_stats = plugin_instance.get_raw() + if isinstance(plugin_instance.get_raw(), dict): + current_stats['foo'] = 'bar' + current_stats = plugin_instance.filter_stats(current_stats) + self.assertTrue('foo' not in current_stats) + elif isinstance(plugin_instance.get_raw(), list): + current_stats[0]['foo'] = 'bar' + current_stats = plugin_instance.filter_stats(current_stats) + self.assertTrue('foo' not in current_stats[0]) + + # Update stats (add second element) + plugin_instance.update() + plugin_instance.update_stats_history() + plugin_instance.update_views() + + # Check history + if plugin_instance.history_enable(): + if isinstance(plugin_instance.get_raw(), dict): + first_history_field = plugin_instance.get_items_history_list()[0]['name'] + elif isinstance(plugin_instance.get_raw(), list): + first_history_field = '_'.join([plugin_instance.get_raw()[0][plugin_instance.get_key()], + plugin_instance.get_items_history_list()[0]['name']]) + self.assertEqual(len(plugin_instance.get_raw_history(first_history_field)), 2) + self.assertGreater(plugin_instance.get_raw_history(first_history_field)[1][0], + plugin_instance.get_raw_history(first_history_field)[0][0]) + + # Update stats (add third element) + plugin_instance.update() + plugin_instance.update_stats_history() + plugin_instance.update_views() + + self.assertEqual(len(plugin_instance.get_raw_history(first_history_field)), 3) + self.assertEqual(len(plugin_instance.get_raw_history(first_history_field, 2)), 2) + self.assertIsInstance(json.loads(plugin_instance.get_stats_history()), dict) + + # Check views + self.assertIsInstance(plugin_instance.get_views(), dict) + if isinstance(plugin_instance.get_raw(), dict): + self.assertIsInstance(plugin_instance.get_views(first_history_field), dict) + self.assertTrue('decoration' in plugin_instance.get_views(first_history_field)) + elif isinstance(plugin_instance.get_raw(), list): + first_history_field = plugin_instance.get_items_history_list()[0]['name'] + first_item = plugin_instance.get_raw()[0][plugin_instance.get_key()] + self.assertIsInstance(plugin_instance.get_views(item=first_item, + key=first_history_field), dict) + self.assertTrue('decoration' in plugin_instance.get_views(item=first_item, + key=first_history_field)) + self.assertIsInstance(json.loads(plugin_instance.get_json_views()), dict) + self.assertEqual(json.loads(plugin_instance.get_json_views()), plugin_instance.get_views()) + def test_000_update(self): """Update stats (mandatory step for all the stats). @@ -126,8 +233,9 @@ class TestGlances(unittest.TestCase): def test_005_mem(self): """Check MEM plugin.""" + plugin_name = 'mem' stats_to_check = ['available', 'used', 'free', 'total'] - print('INFO: [TEST_005] Check MEM stats: %s' % ', '.join(stats_to_check)) + print('INFO: [TEST_005] Check {} stats: {}'.format(plugin_name, ', '.join(stats_to_check))) stats_grab = stats.get_plugin('mem').get_raw() for stat in stats_to_check: # Check that the key exist @@ -136,17 +244,17 @@ class TestGlances(unittest.TestCase): self.assertGreaterEqual(stats_grab[stat], 0) print('INFO: MEM stats: %s' % stats_grab) - def test_006_swap(self): + def test_006_memswap(self): """Check MEMSWAP plugin.""" stats_to_check = ['used', 'free', 'total'] - print('INFO: [TEST_006] Check SWAP stats: %s' % ', '.join(stats_to_check)) + print('INFO: [TEST_006] Check MEMSWAP stats: %s' % ', '.join(stats_to_check)) stats_grab = stats.get_plugin('memswap').get_raw() for stat in stats_to_check: # Check that the key exist self.assertTrue(stat in stats_grab, msg='Cannot find key: %s' % stat) # Check that % is > 0 self.assertGreaterEqual(stats_grab[stat], 0) - print('INFO: SWAP stats: %s' % stats_grab) + print('INFO: MEMSWAP stats: %s' % stats_grab) def test_007_network(self): """Check NETWORK plugin.""" @@ -481,42 +589,46 @@ class TestGlances(unittest.TestCase): bar.percent = 110 self.assertEqual(bar.get(), '|||||||||||||||||||||||||||||||||||||||||||| >100%') - def test_100_plugin_model_dict(self): - """Test a standard plugin returning a dict""" - print('INFO: [TEST_100] Test standard plugin returning a dict') - plugin_instance = stats.get_plugin('mem') - plugin_instance.reset() # reset stats - plugin_instance.reset_views() # reset views - plugin_instance.reset_stats_history() # reset history - # Check before update - self.assertEqual(plugin_instance.get_raw(), plugin_instance.stats_init_value) - self.assertIsInstance(plugin_instance.get_raw(), dict) - self.assertEqual(plugin_instance.get_key(), None) - self.assertEqual(plugin_instance.is_enabled(), True) - self.assertEqual(plugin_instance.is_disabled(), False) - self.assertEqual(plugin_instance.history_enable(), True) - self.assertTrue(all([f in [h['name'] for h in plugin_instance.items_history_list] for f in plugin_instance.get_raw_history()])) - self.assertEqual(plugin_instance.get_views(), {}) - # Update stats - plugin_instance.update() - plugin_instance.update_stats_history() - plugin_instance.update_views() - # Check stats - self.assertIsInstance(plugin_instance.get_raw(), dict) - self.assertTrue(all([f in plugin_instance.fields_description for f in plugin_instance.get_raw()])) - self.assertEqual(plugin_instance.get_raw(), plugin_instance.get_export()) - self.assertEqual(plugin_instance.get_stats(), plugin_instance.get_json()) - self.assertEqual(json.loads(plugin_instance.get_stats()), plugin_instance.get_raw()) - # Check history - pass - # Check views - pass + # def test_100_system_plugin_method(self): + # """Test system plugin methods""" + # print('INFO: [TEST_100] Test system plugin methods') + # self._common_plugin_tests('system') - def test_110_plugin_model_list(self): - """Test a standard plugin returning a list""" - print('INFO: [TEST_110] Test standard plugin returning a list') - plugin_instance = stats.get_plugin('network') - # + sorted_stats + def test_101_cpu_plugin_method(self): + """Test cpu plugin methods""" + print('INFO: [TEST_100] Test cpu plugin methods') + self._common_plugin_tests('cpu') + + @unittest.skipIf(WINDOWS, "Load average not available on Windows") + def test_102_load_plugin_method(self): + """Test load plugin methods""" + print('INFO: [TEST_102] Test load plugin methods') + self._common_plugin_tests('load') + + def test_103_mem_plugin_method(self): + """Test mem plugin methods""" + print('INFO: [TEST_103] Test mem plugin methods') + self._common_plugin_tests('mem') + + def test_104_memswap_plugin_method(self): + """Test memswap plugin methods""" + print('INFO: [TEST_104] Test memswap plugin methods') + self._common_plugin_tests('memswap') + + def test_105_network_plugin_method(self): + """Test network plugin methods""" + print('INFO: [TEST_105] Test network plugin methods') + self._common_plugin_tests('network') + + # def test_106_diskio_plugin_method(self): + # """Test diskio plugin methods""" + # print('INFO: [TEST_106] Test diskio plugin methods') + # self._common_plugin_tests('diskio') + + def test_107_fs_plugin_method(self): + """Test fs plugin methods""" + print('INFO: [TEST_107] Test fs plugin methods') + self._common_plugin_tests('fs') def test_700_secure(self): """Test secure functions"""