diff options
Diffstat (limited to 'tools/releasetools/test_common.py')
-rw-r--r-- | tools/releasetools/test_common.py | 249 |
1 files changed, 218 insertions, 31 deletions
diff --git a/tools/releasetools/test_common.py b/tools/releasetools/test_common.py index 5fdc132..a861346 100644 --- a/tools/releasetools/test_common.py +++ b/tools/releasetools/test_common.py @@ -29,15 +29,54 @@ def random_string_with_holes(size, block_size, step_size): data[begin:end] = os.urandom(block_size) return "".join(data) +def get_2gb_string(): + kilobytes = 1024 + megabytes = 1024 * kilobytes + gigabytes = 1024 * megabytes + + size = int(2 * gigabytes + 1) + block_size = 4 * kilobytes + step_size = 4 * megabytes + two_gb_string = random_string_with_holes( + size, block_size, step_size) + return two_gb_string + class CommonZipTest(unittest.TestCase): + def _verify(self, zip_file, zip_file_name, arcname, contents, + test_file_name=None, expected_stat=None, expected_mode=0o644, + expected_compress_type=zipfile.ZIP_STORED): + # Verify the stat if present. + if test_file_name is not None: + new_stat = os.stat(test_file_name) + self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode)) + self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime)) + + # Reopen the zip file to verify. + zip_file = zipfile.ZipFile(zip_file_name, "r") + + # Verify the timestamp. + info = zip_file.getinfo(arcname) + self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0)) + + # Verify the file mode. + mode = (info.external_attr >> 16) & 0o777 + self.assertEqual(mode, expected_mode) + + # Verify the compress type. + self.assertEqual(info.compress_type, expected_compress_type) + + # Verify the zip contents. + self.assertEqual(zip_file.read(arcname), contents) + self.assertIsNone(zip_file.testzip()) + def _test_ZipWrite(self, contents, extra_zipwrite_args=None): extra_zipwrite_args = dict(extra_zipwrite_args or {}) test_file = tempfile.NamedTemporaryFile(delete=False) - zip_file = tempfile.NamedTemporaryFile(delete=False) - test_file_name = test_file.name + + zip_file = tempfile.NamedTemporaryFile(delete=False) zip_file_name = zip_file.name # File names within an archive strip the leading slash. @@ -52,31 +91,103 @@ class CommonZipTest(unittest.TestCase): test_file.write(contents) test_file.close() - old_stat = os.stat(test_file_name) + expected_stat = os.stat(test_file_name) expected_mode = extra_zipwrite_args.get("perms", 0o644) - + expected_compress_type = extra_zipwrite_args.get("compress_type", + zipfile.ZIP_STORED) time.sleep(5) # Make sure the atime/mtime will change measurably. common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args) + common.ZipClose(zip_file) - new_stat = os.stat(test_file_name) - self.assertEqual(int(old_stat.st_mode), int(new_stat.st_mode)) - self.assertEqual(int(old_stat.st_mtime), int(new_stat.st_mtime)) - self.assertIsNone(zip_file.testzip()) - - zip_file.close() - zip_file = zipfile.ZipFile(zip_file_name, "r") - info = zip_file.getinfo(arcname) - - self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0)) - mode = (info.external_attr >> 16) & 0o777 - self.assertEqual(mode, expected_mode) - self.assertEqual(zip_file.read(arcname), contents) - self.assertIsNone(zip_file.testzip()) + self._verify(zip_file, zip_file_name, arcname, contents, test_file_name, + expected_stat, expected_mode, expected_compress_type) finally: os.remove(test_file_name) os.remove(zip_file_name) + def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None): + extra_args = dict(extra_args or {}) + + zip_file = tempfile.NamedTemporaryFile(delete=False) + zip_file_name = zip_file.name + zip_file.close() + + zip_file = zipfile.ZipFile(zip_file_name, "w") + + try: + expected_compress_type = extra_args.get("compress_type", + zipfile.ZIP_STORED) + time.sleep(5) # Make sure the atime/mtime will change measurably. + + if not isinstance(zinfo_or_arcname, zipfile.ZipInfo): + arcname = zinfo_or_arcname + expected_mode = extra_args.get("perms", 0o644) + else: + arcname = zinfo_or_arcname.filename + expected_mode = extra_args.get("perms", + zinfo_or_arcname.external_attr >> 16) + + common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args) + common.ZipClose(zip_file) + + self._verify(zip_file, zip_file_name, arcname, contents, + expected_mode=expected_mode, + expected_compress_type=expected_compress_type) + finally: + os.remove(zip_file_name) + + def _test_ZipWriteStr_large_file(self, large, small, extra_args=None): + extra_args = dict(extra_args or {}) + + zip_file = tempfile.NamedTemporaryFile(delete=False) + zip_file_name = zip_file.name + + test_file = tempfile.NamedTemporaryFile(delete=False) + test_file_name = test_file.name + + arcname_large = test_file_name + arcname_small = "bar" + + # File names within an archive strip the leading slash. + if arcname_large[0] == "/": + arcname_large = arcname_large[1:] + + zip_file.close() + zip_file = zipfile.ZipFile(zip_file_name, "w") + + try: + test_file.write(large) + test_file.close() + + expected_stat = os.stat(test_file_name) + expected_mode = 0o644 + expected_compress_type = extra_args.get("compress_type", + zipfile.ZIP_STORED) + time.sleep(5) # Make sure the atime/mtime will change measurably. + + common.ZipWrite(zip_file, test_file_name, **extra_args) + common.ZipWriteStr(zip_file, arcname_small, small, **extra_args) + common.ZipClose(zip_file) + + # Verify the contents written by ZipWrite(). + self._verify(zip_file, zip_file_name, arcname_large, large, + test_file_name, expected_stat, expected_mode, + expected_compress_type) + + # Verify the contents written by ZipWriteStr(). + self._verify(zip_file, zip_file_name, arcname_small, small, + expected_compress_type=expected_compress_type) + finally: + os.remove(zip_file_name) + os.remove(test_file_name) + + def _test_reset_ZIP64_LIMIT(self, func, *args): + default_limit = (1 << 31) - 1 + self.assertEqual(default_limit, zipfile.ZIP64_LIMIT) + func(*args) + self.assertEqual(default_limit, zipfile.ZIP64_LIMIT) + def test_ZipWrite(self): file_contents = os.urandom(1024) self._test_ZipWrite(file_contents) @@ -88,23 +199,99 @@ class CommonZipTest(unittest.TestCase): "perms": 0o777, "compress_type": zipfile.ZIP_DEFLATED, }) + self._test_ZipWrite(file_contents, { + "arcname": "foobar", + "perms": 0o700, + "compress_type": zipfile.ZIP_STORED, + }) def test_ZipWrite_large_file(self): - kilobytes = 1024 - megabytes = 1024 * kilobytes - gigabytes = 1024 * megabytes - - size = int(2 * gigabytes + 1) - block_size = 4 * kilobytes - step_size = 4 * megabytes - file_contents = random_string_with_holes( - size, block_size, step_size) + file_contents = get_2gb_string() self._test_ZipWrite(file_contents, { "compress_type": zipfile.ZIP_DEFLATED, }) def test_ZipWrite_resets_ZIP64_LIMIT(self): - default_limit = (1 << 31) - 1 - self.assertEqual(default_limit, zipfile.ZIP64_LIMIT) - self._test_ZipWrite('') - self.assertEqual(default_limit, zipfile.ZIP64_LIMIT) + self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "") + + def test_ZipWriteStr(self): + random_string = os.urandom(1024) + # Passing arcname + self._test_ZipWriteStr("foo", random_string) + + # Passing zinfo + zinfo = zipfile.ZipInfo(filename="foo") + self._test_ZipWriteStr(zinfo, random_string) + + # Timestamp in the zinfo should be overwritten. + zinfo.date_time = (2015, 3, 1, 15, 30, 0) + self._test_ZipWriteStr(zinfo, random_string) + + def test_ZipWriteStr_with_opts(self): + random_string = os.urandom(1024) + # Passing arcname + self._test_ZipWriteStr("foo", random_string, { + "perms": 0o700, + "compress_type": zipfile.ZIP_DEFLATED, + }) + self._test_ZipWriteStr("bar", random_string, { + "compress_type": zipfile.ZIP_STORED, + }) + + # Passing zinfo + zinfo = zipfile.ZipInfo(filename="foo") + self._test_ZipWriteStr(zinfo, random_string, { + "compress_type": zipfile.ZIP_DEFLATED, + }) + self._test_ZipWriteStr(zinfo, random_string, { + "perms": 0o600, + "compress_type": zipfile.ZIP_STORED, + }) + + def test_ZipWriteStr_large_file(self): + # zipfile.writestr() doesn't work when the str size is over 2GiB even with + # the workaround. We will only test the case of writing a string into a + # large archive. + long_string = get_2gb_string() + short_string = os.urandom(1024) + self._test_ZipWriteStr_large_file(long_string, short_string, { + "compress_type": zipfile.ZIP_DEFLATED, + }) + + def test_ZipWriteStr_resets_ZIP64_LIMIT(self): + self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "") + zinfo = zipfile.ZipInfo(filename="foo") + self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "") + + def test_bug21309935(self): + zip_file = tempfile.NamedTemporaryFile(delete=False) + zip_file_name = zip_file.name + zip_file.close() + + try: + random_string = os.urandom(1024) + zip_file = zipfile.ZipFile(zip_file_name, "w") + # Default perms should be 0o644 when passing the filename. + common.ZipWriteStr(zip_file, "foo", random_string) + # Honor the specified perms. + common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755) + # The perms in zinfo should be untouched. + zinfo = zipfile.ZipInfo(filename="baz") + zinfo.external_attr = 0o740 << 16 + common.ZipWriteStr(zip_file, zinfo, random_string) + # Explicitly specified perms has the priority. + zinfo = zipfile.ZipInfo(filename="qux") + zinfo.external_attr = 0o700 << 16 + common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400) + common.ZipClose(zip_file) + + self._verify(zip_file, zip_file_name, "foo", random_string, + expected_mode=0o644) + self._verify(zip_file, zip_file_name, "bar", random_string, + expected_mode=0o755) + self._verify(zip_file, zip_file_name, "baz", random_string, + expected_mode=0o740) + self._verify(zip_file, zip_file_name, "qux", random_string, + expected_mode=0o400) + finally: + os.remove(zip_file_name) |