diff --git a/tools/otagui/ota_interface.py b/tools/otagui/ota_interface.py index 1760e8ecc..a39705318 100644 --- a/tools/otagui/ota_interface.py +++ b/tools/otagui/ota_interface.py @@ -31,6 +31,10 @@ class JobInfo: isIncremental: bool = False def __post_init__(self): + """ + If the output, stdout, stderr paths are not set, automatically use + the job id as the file name. + """ if not self.output: self.output = os.path.join('output', self.id + '.zip') if not self.stdout: @@ -48,6 +52,15 @@ class JobInfo: self.isPartial = True def to_sql_form_dict(self): + """ + Convert this instance to a dict, which can be later used to insert into + the SQL database. + Format: + id: string, target: string, incremental: string, verbose: int, + partial: string, output:string, status:string, + downgrade: bool, extra: string, stdout: string, stderr:string, + start_time:int, finish_time: int(not required) + """ sql_form_dict = asdict(self) sql_form_dict['partial'] = ','.join(sql_form_dict['partial']) def bool_to_int(t): return 1 if t else 0 @@ -57,6 +70,9 @@ class JobInfo: return sql_form_dict def to_dict_basic(self): + """ + Convert the instance to a dict, which includes the file name of target. + """ basic_info = asdict(self) basic_info['target_name'] = self.target.split('/')[-1] if self.isIncremental: @@ -64,6 +80,10 @@ class JobInfo: return basic_info def to_dict_detail(self, target_lib, offset=0): + """ + Convert this instance into a dict, which includes some detailed information + of the target/source build, i.e. build version and file name. + """ detail_info = asdict(self) try: with open(self.stdout, 'r') as fout: diff --git a/tools/otagui/test_ota_interface.py b/tools/otagui/test_ota_interface.py new file mode 100644 index 000000000..effbb3a7c --- /dev/null +++ b/tools/otagui/test_ota_interface.py @@ -0,0 +1,182 @@ +import unittest +from ota_interface import JobInfo, ProcessesManagement +from unittest.mock import patch, mock_open, Mock, MagicMock + +class TestJobInfo(unittest.TestCase): + def setUp(self): + self.test_id = '286feab0-f16b-11eb-a72a-f7b1de0921ef' + self.test_target = 'target/build.zip' + self.test_verbose = True + self.test_status = 'Running' + self.test_extra = '--downgrade' + self.test_start_time = 1628698830 + self.test_finish_time = 1628698831 + + def setup_job(self, incremental = '', partial = [], + output = '', stdout = '', stderr = ''): + job_info = JobInfo(id=self.test_id, + target=self.test_target, + incremental=incremental, + verbose=self.test_verbose, + partial=partial, + output=output, + status=self.test_status, + extra=self.test_extra, + start_time=self.test_start_time, + finish_time=self.test_finish_time, + stderr=stderr, + stdout=stdout + ) + return job_info + + def test_init(self): + # No incremental, no output, no stdout/stderr set. + job_info1 = self.setup_job() + for key, value in self.__dict__.items(): + if key.startswith('test_'): + self.assertEqual(job_info1.__dict__[key[5:]], value, + 'The value of ' + key + 'is not initialized correctly' + ) + self.assertEqual(job_info1.output, 'output/'+self.test_id+'.zip', + 'Default output cannot be setup correctly' + ) + self.assertEqual(job_info1.stderr, 'output/stderr.'+self.test_id, + 'Default stderr cannot be setup correctly' + ) + self.assertEqual(job_info1.stdout, 'output/stdout.'+self.test_id, + 'Default stdout cannot be setup correctly' + ) + # Test the incremental setup + job_info2 = self.setup_job(incremental='target/source.zip') + self.assertEqual(job_info2.incremental, 'target/source.zip', + 'incremental source cannot be initialized correctly' + ) + self.assertTrue(job_info2.isIncremental, + 'incremental status cannot be initialized correctly' + ) + # Test the stdout/stderr setup + job_info3 = self.setup_job(stderr='output/stderr', + stdout='output/stdout' + ) + self.assertEqual(job_info3.stderr, 'output/stderr', + 'the stderr cannot be setup manually' + ) + self.assertEqual(job_info3.stdout, 'output/stdout', + 'the stdout cannot be setup manually' + ) + # Test the output setup + job_info4 = self.setup_job(output='output/output.zip') + self.assertEqual(job_info4.output, 'output/output.zip', + 'output cannot be setup manually' + ) + # Test the partial setup + job_info5 = self.setup_job(partial=['system', 'vendor']) + self.assertEqual(job_info5.partial, ['system', 'vendor'], + 'partial list cannot be setup correctly' + ) + + def test_to_sql_form_dict(self): + partial_list = ['system', 'vendor'] + job_info = self.setup_job(partial=partial_list) + sql_dict = job_info.to_sql_form_dict() + test_dict = {'id': self.test_id, + 'target': self.test_target, + 'incremental': '', + 'verbose': int(self.test_verbose), + 'partial': ','.join(partial_list), + 'output': 'output/' + self.test_id + '.zip', + 'status': self.test_status, + 'extra': self.test_extra, + 'stdout': 'output/stdout.' + self.test_id, + 'stderr': 'output/stderr.' + self.test_id, + 'start_time': self.test_start_time, + 'finish_time': self.test_finish_time, + 'isPartial': True, + 'isIncremental': False + } + for key, value in test_dict.items(): + self.assertEqual(value, sql_dict[key], + 'the ' + key + ' is not converted to sql form dict correctly' + ) + + def test_to_dict_basic(self): + partial_list = ['system', 'vendor'] + job_info = self.setup_job(partial=partial_list) + basic_dict = job_info.to_dict_basic() + test_dict = {'id': self.test_id, + 'target': self.test_target, + 'target_name': self.test_target.split('/')[-1], + 'incremental': '', + 'verbose': int(self.test_verbose), + 'partial': partial_list, + 'output': 'output/' + self.test_id + '.zip', + 'status': self.test_status, + 'extra': self.test_extra, + 'stdout': 'output/stdout.' + self.test_id, + 'stderr': 'output/stderr.' + self.test_id, + 'start_time': self.test_start_time, + 'finish_time': self.test_finish_time, + 'isPartial': True, + 'isIncremental': False + } + for key, value in test_dict.items(): + self.assertEqual(value, basic_dict[key], + 'the ' + key + ' is not converted to basic form dict correctly' + ) + + def test_to_dict_detail(self): + partial_list = ['system', 'vendor'] + test_incremental = 'target/source.zip' + job_info = self.setup_job(partial=partial_list, incremental=test_incremental) + mock_target_lib = Mock() + mock_target_lib.get_build_by_path = Mock( + side_effect = [ + Mock(file_name='build.zip', build_version=''), + Mock(file_name='source.zip', build_version='') + ] + ) + test_dict = {'id': self.test_id, + 'target': self.test_target, + 'incremental': 'target/source.zip', + 'verbose': int(self.test_verbose), + 'partial': partial_list, + 'output': 'output/' + self.test_id + '.zip', + 'status': self.test_status, + 'extra': self.test_extra, + 'stdout': 'NO STD OUTPUT IS FOUND', + 'stderr': 'NO STD ERROR IS FOUND', + 'start_time': self.test_start_time, + 'finish_time': self.test_finish_time, + 'isPartial': True, + 'isIncremental': True + } + # Test with no stdout and stderr + dict_detail = job_info.to_dict_detail(mock_target_lib) + mock_target_lib.get_build_by_path.assert_any_call(self.test_target) + mock_target_lib.get_build_by_path.assert_any_call(test_incremental) + for key, value in test_dict.items(): + self.assertEqual(value, dict_detail[key], + 'the ' + key + ' is not converted to detailed dict correctly' + ) + # Test with mocked stdout and stderr + mock_target_lib.get_build_by_path = Mock( + side_effect = [ + Mock(file_name='build.zip', build_version=''), + Mock(file_name='source.zip', build_version='') + ] + ) + mock_file = mock_open(read_data="mock output") + with patch("builtins.open", mock_file): + dict_detail = job_info.to_dict_detail(mock_target_lib) + test_dict['stderr'] = 'mock output' + test_dict['stdout'] = 'mock output' + for key, value in test_dict.items(): + self.assertEqual(value, dict_detail[key], + 'the ' + key + ' is not converted to detailed dict correctly' + ) + +class TestProcessesManagement(unittest.TestCase): + pass + +if __name__ == '__main__': + unittest.main() \ No newline at end of file