aboutsummaryrefslogtreecommitdiffstats
path: root/python/google/protobuf/internal/message_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/google/protobuf/internal/message_test.py')
-rwxr-xr-xpython/google/protobuf/internal/message_test.py618
1 files changed, 13 insertions, 605 deletions
diff --git a/python/google/protobuf/internal/message_test.py b/python/google/protobuf/internal/message_test.py
index 48b7ffd..73a9a3a 100755
--- a/python/google/protobuf/internal/message_test.py
+++ b/python/google/protobuf/internal/message_test.py
@@ -2,7 +2,7 @@
#
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc. All rights reserved.
-# https://developers.google.com/protocol-buffers/
+# http://code.google.com/p/protobuf/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
@@ -43,639 +43,47 @@ abstract interface.
__author__ = 'gps@google.com (Gregory P. Smith)'
-import copy
-import math
-import operator
-import pickle
-import sys
-
-from google.apputils import basetest
+import unittest
+from google.protobuf import unittest_import_pb2
from google.protobuf import unittest_pb2
-from google.protobuf.internal import api_implementation
from google.protobuf.internal import test_util
-from google.protobuf import message
-
-# Python pre-2.6 does not have isinf() or isnan() functions, so we have
-# to provide our own.
-def isnan(val):
- # NaN is never equal to itself.
- return val != val
-def isinf(val):
- # Infinity times zero equals NaN.
- return not isnan(val) and isnan(val * 0)
-def IsPosInf(val):
- return isinf(val) and (val > 0)
-def IsNegInf(val):
- return isinf(val) and (val < 0)
-class MessageTest(basetest.TestCase):
-
- def testBadUtf8String(self):
- if api_implementation.Type() != 'python':
- self.skipTest("Skipping testBadUtf8String, currently only the python "
- "api implementation raises UnicodeDecodeError when a "
- "string field contains bad utf-8.")
- bad_utf8_data = test_util.GoldenFileData('bad_utf8_string')
- with self.assertRaises(UnicodeDecodeError) as context:
- unittest_pb2.TestAllTypes.FromString(bad_utf8_data)
- self.assertIn('field: protobuf_unittest.TestAllTypes.optional_string',
- str(context.exception))
+class MessageTest(unittest.TestCase):
def testGoldenMessage(self):
- golden_data = test_util.GoldenFileData(
- 'golden_message_oneof_implemented')
+ golden_data = test_util.GoldenFile('golden_message').read()
golden_message = unittest_pb2.TestAllTypes()
golden_message.ParseFromString(golden_data)
test_util.ExpectAllFieldsSet(self, golden_message)
- self.assertEqual(golden_data, golden_message.SerializeToString())
- golden_copy = copy.deepcopy(golden_message)
- self.assertEqual(golden_data, golden_copy.SerializeToString())
+ self.assertTrue(golden_message.SerializeToString() == golden_data)
def testGoldenExtensions(self):
- golden_data = test_util.GoldenFileData('golden_message')
+ golden_data = test_util.GoldenFile('golden_message').read()
golden_message = unittest_pb2.TestAllExtensions()
golden_message.ParseFromString(golden_data)
all_set = unittest_pb2.TestAllExtensions()
test_util.SetAllExtensions(all_set)
self.assertEquals(all_set, golden_message)
- self.assertEqual(golden_data, golden_message.SerializeToString())
- golden_copy = copy.deepcopy(golden_message)
- self.assertEqual(golden_data, golden_copy.SerializeToString())
+ self.assertTrue(golden_message.SerializeToString() == golden_data)
def testGoldenPackedMessage(self):
- golden_data = test_util.GoldenFileData('golden_packed_fields_message')
+ golden_data = test_util.GoldenFile('golden_packed_fields_message').read()
golden_message = unittest_pb2.TestPackedTypes()
golden_message.ParseFromString(golden_data)
all_set = unittest_pb2.TestPackedTypes()
test_util.SetAllPackedFields(all_set)
self.assertEquals(all_set, golden_message)
- self.assertEqual(golden_data, all_set.SerializeToString())
- golden_copy = copy.deepcopy(golden_message)
- self.assertEqual(golden_data, golden_copy.SerializeToString())
+ self.assertTrue(all_set.SerializeToString() == golden_data)
def testGoldenPackedExtensions(self):
- golden_data = test_util.GoldenFileData('golden_packed_fields_message')
+ golden_data = test_util.GoldenFile('golden_packed_fields_message').read()
golden_message = unittest_pb2.TestPackedExtensions()
golden_message.ParseFromString(golden_data)
all_set = unittest_pb2.TestPackedExtensions()
test_util.SetAllPackedExtensions(all_set)
self.assertEquals(all_set, golden_message)
- self.assertEqual(golden_data, all_set.SerializeToString())
- golden_copy = copy.deepcopy(golden_message)
- self.assertEqual(golden_data, golden_copy.SerializeToString())
-
- def testPickleSupport(self):
- golden_data = test_util.GoldenFileData('golden_message')
- golden_message = unittest_pb2.TestAllTypes()
- golden_message.ParseFromString(golden_data)
- pickled_message = pickle.dumps(golden_message)
-
- unpickled_message = pickle.loads(pickled_message)
- self.assertEquals(unpickled_message, golden_message)
-
-
- def testPickleIncompleteProto(self):
- golden_message = unittest_pb2.TestRequired(a=1)
- pickled_message = pickle.dumps(golden_message)
-
- unpickled_message = pickle.loads(pickled_message)
- self.assertEquals(unpickled_message, golden_message)
- self.assertEquals(unpickled_message.a, 1)
- # This is still an incomplete proto - so serializing should fail
- self.assertRaises(message.EncodeError, unpickled_message.SerializeToString)
-
- def testPositiveInfinity(self):
- golden_data = (b'\x5D\x00\x00\x80\x7F'
- b'\x61\x00\x00\x00\x00\x00\x00\xF0\x7F'
- b'\xCD\x02\x00\x00\x80\x7F'
- b'\xD1\x02\x00\x00\x00\x00\x00\x00\xF0\x7F')
- golden_message = unittest_pb2.TestAllTypes()
- golden_message.ParseFromString(golden_data)
- self.assertTrue(IsPosInf(golden_message.optional_float))
- self.assertTrue(IsPosInf(golden_message.optional_double))
- self.assertTrue(IsPosInf(golden_message.repeated_float[0]))
- self.assertTrue(IsPosInf(golden_message.repeated_double[0]))
- self.assertEqual(golden_data, golden_message.SerializeToString())
-
- def testNegativeInfinity(self):
- golden_data = (b'\x5D\x00\x00\x80\xFF'
- b'\x61\x00\x00\x00\x00\x00\x00\xF0\xFF'
- b'\xCD\x02\x00\x00\x80\xFF'
- b'\xD1\x02\x00\x00\x00\x00\x00\x00\xF0\xFF')
- golden_message = unittest_pb2.TestAllTypes()
- golden_message.ParseFromString(golden_data)
- self.assertTrue(IsNegInf(golden_message.optional_float))
- self.assertTrue(IsNegInf(golden_message.optional_double))
- self.assertTrue(IsNegInf(golden_message.repeated_float[0]))
- self.assertTrue(IsNegInf(golden_message.repeated_double[0]))
- self.assertEqual(golden_data, golden_message.SerializeToString())
-
- def testNotANumber(self):
- golden_data = (b'\x5D\x00\x00\xC0\x7F'
- b'\x61\x00\x00\x00\x00\x00\x00\xF8\x7F'
- b'\xCD\x02\x00\x00\xC0\x7F'
- b'\xD1\x02\x00\x00\x00\x00\x00\x00\xF8\x7F')
- golden_message = unittest_pb2.TestAllTypes()
- golden_message.ParseFromString(golden_data)
- self.assertTrue(isnan(golden_message.optional_float))
- self.assertTrue(isnan(golden_message.optional_double))
- self.assertTrue(isnan(golden_message.repeated_float[0]))
- self.assertTrue(isnan(golden_message.repeated_double[0]))
-
- # The protocol buffer may serialize to any one of multiple different
- # representations of a NaN. Rather than verify a specific representation,
- # verify the serialized string can be converted into a correctly
- # behaving protocol buffer.
- serialized = golden_message.SerializeToString()
- message = unittest_pb2.TestAllTypes()
- message.ParseFromString(serialized)
- self.assertTrue(isnan(message.optional_float))
- self.assertTrue(isnan(message.optional_double))
- self.assertTrue(isnan(message.repeated_float[0]))
- self.assertTrue(isnan(message.repeated_double[0]))
-
- def testPositiveInfinityPacked(self):
- golden_data = (b'\xA2\x06\x04\x00\x00\x80\x7F'
- b'\xAA\x06\x08\x00\x00\x00\x00\x00\x00\xF0\x7F')
- golden_message = unittest_pb2.TestPackedTypes()
- golden_message.ParseFromString(golden_data)
- self.assertTrue(IsPosInf(golden_message.packed_float[0]))
- self.assertTrue(IsPosInf(golden_message.packed_double[0]))
- self.assertEqual(golden_data, golden_message.SerializeToString())
-
- def testNegativeInfinityPacked(self):
- golden_data = (b'\xA2\x06\x04\x00\x00\x80\xFF'
- b'\xAA\x06\x08\x00\x00\x00\x00\x00\x00\xF0\xFF')
- golden_message = unittest_pb2.TestPackedTypes()
- golden_message.ParseFromString(golden_data)
- self.assertTrue(IsNegInf(golden_message.packed_float[0]))
- self.assertTrue(IsNegInf(golden_message.packed_double[0]))
- self.assertEqual(golden_data, golden_message.SerializeToString())
-
- def testNotANumberPacked(self):
- golden_data = (b'\xA2\x06\x04\x00\x00\xC0\x7F'
- b'\xAA\x06\x08\x00\x00\x00\x00\x00\x00\xF8\x7F')
- golden_message = unittest_pb2.TestPackedTypes()
- golden_message.ParseFromString(golden_data)
- self.assertTrue(isnan(golden_message.packed_float[0]))
- self.assertTrue(isnan(golden_message.packed_double[0]))
-
- serialized = golden_message.SerializeToString()
- message = unittest_pb2.TestPackedTypes()
- message.ParseFromString(serialized)
- self.assertTrue(isnan(message.packed_float[0]))
- self.assertTrue(isnan(message.packed_double[0]))
-
- def testExtremeFloatValues(self):
- message = unittest_pb2.TestAllTypes()
-
- # Most positive exponent, no significand bits set.
- kMostPosExponentNoSigBits = math.pow(2, 127)
- message.optional_float = kMostPosExponentNoSigBits
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_float == kMostPosExponentNoSigBits)
-
- # Most positive exponent, one significand bit set.
- kMostPosExponentOneSigBit = 1.5 * math.pow(2, 127)
- message.optional_float = kMostPosExponentOneSigBit
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_float == kMostPosExponentOneSigBit)
-
- # Repeat last two cases with values of same magnitude, but negative.
- message.optional_float = -kMostPosExponentNoSigBits
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_float == -kMostPosExponentNoSigBits)
-
- message.optional_float = -kMostPosExponentOneSigBit
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_float == -kMostPosExponentOneSigBit)
-
- # Most negative exponent, no significand bits set.
- kMostNegExponentNoSigBits = math.pow(2, -127)
- message.optional_float = kMostNegExponentNoSigBits
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_float == kMostNegExponentNoSigBits)
-
- # Most negative exponent, one significand bit set.
- kMostNegExponentOneSigBit = 1.5 * math.pow(2, -127)
- message.optional_float = kMostNegExponentOneSigBit
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_float == kMostNegExponentOneSigBit)
-
- # Repeat last two cases with values of the same magnitude, but negative.
- message.optional_float = -kMostNegExponentNoSigBits
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_float == -kMostNegExponentNoSigBits)
-
- message.optional_float = -kMostNegExponentOneSigBit
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_float == -kMostNegExponentOneSigBit)
-
- def testExtremeDoubleValues(self):
- message = unittest_pb2.TestAllTypes()
-
- # Most positive exponent, no significand bits set.
- kMostPosExponentNoSigBits = math.pow(2, 1023)
- message.optional_double = kMostPosExponentNoSigBits
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_double == kMostPosExponentNoSigBits)
-
- # Most positive exponent, one significand bit set.
- kMostPosExponentOneSigBit = 1.5 * math.pow(2, 1023)
- message.optional_double = kMostPosExponentOneSigBit
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_double == kMostPosExponentOneSigBit)
-
- # Repeat last two cases with values of same magnitude, but negative.
- message.optional_double = -kMostPosExponentNoSigBits
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_double == -kMostPosExponentNoSigBits)
-
- message.optional_double = -kMostPosExponentOneSigBit
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_double == -kMostPosExponentOneSigBit)
-
- # Most negative exponent, no significand bits set.
- kMostNegExponentNoSigBits = math.pow(2, -1023)
- message.optional_double = kMostNegExponentNoSigBits
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_double == kMostNegExponentNoSigBits)
-
- # Most negative exponent, one significand bit set.
- kMostNegExponentOneSigBit = 1.5 * math.pow(2, -1023)
- message.optional_double = kMostNegExponentOneSigBit
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_double == kMostNegExponentOneSigBit)
-
- # Repeat last two cases with values of the same magnitude, but negative.
- message.optional_double = -kMostNegExponentNoSigBits
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_double == -kMostNegExponentNoSigBits)
-
- message.optional_double = -kMostNegExponentOneSigBit
- message.ParseFromString(message.SerializeToString())
- self.assertTrue(message.optional_double == -kMostNegExponentOneSigBit)
-
- def testFloatPrinting(self):
- message = unittest_pb2.TestAllTypes()
- message.optional_float = 2.0
- self.assertEqual(str(message), 'optional_float: 2.0\n')
-
- def testHighPrecisionFloatPrinting(self):
- message = unittest_pb2.TestAllTypes()
- message.optional_double = 0.12345678912345678
- if sys.version_info.major >= 3:
- self.assertEqual(str(message), 'optional_double: 0.12345678912345678\n')
- else:
- self.assertEqual(str(message), 'optional_double: 0.123456789123\n')
-
- def testUnknownFieldPrinting(self):
- populated = unittest_pb2.TestAllTypes()
- test_util.SetAllNonLazyFields(populated)
- empty = unittest_pb2.TestEmptyMessage()
- empty.ParseFromString(populated.SerializeToString())
- self.assertEqual(str(empty), '')
-
- def testSortingRepeatedScalarFieldsDefaultComparator(self):
- """Check some different types with the default comparator."""
- message = unittest_pb2.TestAllTypes()
-
- # TODO(mattp): would testing more scalar types strengthen test?
- message.repeated_int32.append(1)
- message.repeated_int32.append(3)
- message.repeated_int32.append(2)
- message.repeated_int32.sort()
- self.assertEqual(message.repeated_int32[0], 1)
- self.assertEqual(message.repeated_int32[1], 2)
- self.assertEqual(message.repeated_int32[2], 3)
-
- message.repeated_float.append(1.1)
- message.repeated_float.append(1.3)
- message.repeated_float.append(1.2)
- message.repeated_float.sort()
- self.assertAlmostEqual(message.repeated_float[0], 1.1)
- self.assertAlmostEqual(message.repeated_float[1], 1.2)
- self.assertAlmostEqual(message.repeated_float[2], 1.3)
-
- message.repeated_string.append('a')
- message.repeated_string.append('c')
- message.repeated_string.append('b')
- message.repeated_string.sort()
- self.assertEqual(message.repeated_string[0], 'a')
- self.assertEqual(message.repeated_string[1], 'b')
- self.assertEqual(message.repeated_string[2], 'c')
-
- message.repeated_bytes.append(b'a')
- message.repeated_bytes.append(b'c')
- message.repeated_bytes.append(b'b')
- message.repeated_bytes.sort()
- self.assertEqual(message.repeated_bytes[0], b'a')
- self.assertEqual(message.repeated_bytes[1], b'b')
- self.assertEqual(message.repeated_bytes[2], b'c')
-
- def testSortingRepeatedScalarFieldsCustomComparator(self):
- """Check some different types with custom comparator."""
- message = unittest_pb2.TestAllTypes()
-
- message.repeated_int32.append(-3)
- message.repeated_int32.append(-2)
- message.repeated_int32.append(-1)
- message.repeated_int32.sort(key=abs)
- self.assertEqual(message.repeated_int32[0], -1)
- self.assertEqual(message.repeated_int32[1], -2)
- self.assertEqual(message.repeated_int32[2], -3)
-
- message.repeated_string.append('aaa')
- message.repeated_string.append('bb')
- message.repeated_string.append('c')
- message.repeated_string.sort(key=len)
- self.assertEqual(message.repeated_string[0], 'c')
- self.assertEqual(message.repeated_string[1], 'bb')
- self.assertEqual(message.repeated_string[2], 'aaa')
-
- def testSortingRepeatedCompositeFieldsCustomComparator(self):
- """Check passing a custom comparator to sort a repeated composite field."""
- message = unittest_pb2.TestAllTypes()
-
- message.repeated_nested_message.add().bb = 1
- message.repeated_nested_message.add().bb = 3
- message.repeated_nested_message.add().bb = 2
- message.repeated_nested_message.add().bb = 6
- message.repeated_nested_message.add().bb = 5
- message.repeated_nested_message.add().bb = 4
- message.repeated_nested_message.sort(key=operator.attrgetter('bb'))
- self.assertEqual(message.repeated_nested_message[0].bb, 1)
- self.assertEqual(message.repeated_nested_message[1].bb, 2)
- self.assertEqual(message.repeated_nested_message[2].bb, 3)
- self.assertEqual(message.repeated_nested_message[3].bb, 4)
- self.assertEqual(message.repeated_nested_message[4].bb, 5)
- self.assertEqual(message.repeated_nested_message[5].bb, 6)
-
- def testRepeatedCompositeFieldSortArguments(self):
- """Check sorting a repeated composite field using list.sort() arguments."""
- message = unittest_pb2.TestAllTypes()
-
- get_bb = operator.attrgetter('bb')
- cmp_bb = lambda a, b: cmp(a.bb, b.bb)
- message.repeated_nested_message.add().bb = 1
- message.repeated_nested_message.add().bb = 3
- message.repeated_nested_message.add().bb = 2
- message.repeated_nested_message.add().bb = 6
- message.repeated_nested_message.add().bb = 5
- message.repeated_nested_message.add().bb = 4
- message.repeated_nested_message.sort(key=get_bb)
- self.assertEqual([k.bb for k in message.repeated_nested_message],
- [1, 2, 3, 4, 5, 6])
- message.repeated_nested_message.sort(key=get_bb, reverse=True)
- self.assertEqual([k.bb for k in message.repeated_nested_message],
- [6, 5, 4, 3, 2, 1])
- if sys.version_info.major >= 3: return # No cmp sorting in PY3.
- message.repeated_nested_message.sort(sort_function=cmp_bb)
- self.assertEqual([k.bb for k in message.repeated_nested_message],
- [1, 2, 3, 4, 5, 6])
- message.repeated_nested_message.sort(cmp=cmp_bb, reverse=True)
- self.assertEqual([k.bb for k in message.repeated_nested_message],
- [6, 5, 4, 3, 2, 1])
-
- def testRepeatedScalarFieldSortArguments(self):
- """Check sorting a scalar field using list.sort() arguments."""
- message = unittest_pb2.TestAllTypes()
-
- message.repeated_int32.append(-3)
- message.repeated_int32.append(-2)
- message.repeated_int32.append(-1)
- message.repeated_int32.sort(key=abs)
- self.assertEqual(list(message.repeated_int32), [-1, -2, -3])
- message.repeated_int32.sort(key=abs, reverse=True)
- self.assertEqual(list(message.repeated_int32), [-3, -2, -1])
- if sys.version_info.major < 3: # No cmp sorting in PY3.
- abs_cmp = lambda a, b: cmp(abs(a), abs(b))
- message.repeated_int32.sort(sort_function=abs_cmp)
- self.assertEqual(list(message.repeated_int32), [-1, -2, -3])
- message.repeated_int32.sort(cmp=abs_cmp, reverse=True)
- self.assertEqual(list(message.repeated_int32), [-3, -2, -1])
-
- message.repeated_string.append('aaa')
- message.repeated_string.append('bb')
- message.repeated_string.append('c')
- message.repeated_string.sort(key=len)
- self.assertEqual(list(message.repeated_string), ['c', 'bb', 'aaa'])
- message.repeated_string.sort(key=len, reverse=True)
- self.assertEqual(list(message.repeated_string), ['aaa', 'bb', 'c'])
- if sys.version_info.major < 3: # No cmp sorting in PY3.
- len_cmp = lambda a, b: cmp(len(a), len(b))
- message.repeated_string.sort(sort_function=len_cmp)
- self.assertEqual(list(message.repeated_string), ['c', 'bb', 'aaa'])
- message.repeated_string.sort(cmp=len_cmp, reverse=True)
- self.assertEqual(list(message.repeated_string), ['aaa', 'bb', 'c'])
-
- def testRepeatedFieldsComparable(self):
- m1 = unittest_pb2.TestAllTypes()
- m2 = unittest_pb2.TestAllTypes()
- m1.repeated_int32.append(0)
- m1.repeated_int32.append(1)
- m1.repeated_int32.append(2)
- m2.repeated_int32.append(0)
- m2.repeated_int32.append(1)
- m2.repeated_int32.append(2)
- m1.repeated_nested_message.add().bb = 1
- m1.repeated_nested_message.add().bb = 2
- m1.repeated_nested_message.add().bb = 3
- m2.repeated_nested_message.add().bb = 1
- m2.repeated_nested_message.add().bb = 2
- m2.repeated_nested_message.add().bb = 3
-
- if sys.version_info.major >= 3: return # No cmp() in PY3.
-
- # These comparisons should not raise errors.
- _ = m1 < m2
- _ = m1.repeated_nested_message < m2.repeated_nested_message
-
- # Make sure cmp always works. If it wasn't defined, these would be
- # id() comparisons and would all fail.
- self.assertEqual(cmp(m1, m2), 0)
- self.assertEqual(cmp(m1.repeated_int32, m2.repeated_int32), 0)
- self.assertEqual(cmp(m1.repeated_int32, [0, 1, 2]), 0)
- self.assertEqual(cmp(m1.repeated_nested_message,
- m2.repeated_nested_message), 0)
- with self.assertRaises(TypeError):
- # Can't compare repeated composite containers to lists.
- cmp(m1.repeated_nested_message, m2.repeated_nested_message[:])
-
- # TODO(anuraag): Implement extensiondict comparison in C++ and then add test
-
- def testParsingMerge(self):
- """Check the merge behavior when a required or optional field appears
- multiple times in the input."""
- messages = [
- unittest_pb2.TestAllTypes(),
- unittest_pb2.TestAllTypes(),
- unittest_pb2.TestAllTypes() ]
- messages[0].optional_int32 = 1
- messages[1].optional_int64 = 2
- messages[2].optional_int32 = 3
- messages[2].optional_string = 'hello'
-
- merged_message = unittest_pb2.TestAllTypes()
- merged_message.optional_int32 = 3
- merged_message.optional_int64 = 2
- merged_message.optional_string = 'hello'
-
- generator = unittest_pb2.TestParsingMerge.RepeatedFieldsGenerator()
- generator.field1.extend(messages)
- generator.field2.extend(messages)
- generator.field3.extend(messages)
- generator.ext1.extend(messages)
- generator.ext2.extend(messages)
- generator.group1.add().field1.MergeFrom(messages[0])
- generator.group1.add().field1.MergeFrom(messages[1])
- generator.group1.add().field1.MergeFrom(messages[2])
- generator.group2.add().field1.MergeFrom(messages[0])
- generator.group2.add().field1.MergeFrom(messages[1])
- generator.group2.add().field1.MergeFrom(messages[2])
-
- data = generator.SerializeToString()
- parsing_merge = unittest_pb2.TestParsingMerge()
- parsing_merge.ParseFromString(data)
-
- # Required and optional fields should be merged.
- self.assertEqual(parsing_merge.required_all_types, merged_message)
- self.assertEqual(parsing_merge.optional_all_types, merged_message)
- self.assertEqual(parsing_merge.optionalgroup.optional_group_all_types,
- merged_message)
- self.assertEqual(parsing_merge.Extensions[
- unittest_pb2.TestParsingMerge.optional_ext],
- merged_message)
-
- # Repeated fields should not be merged.
- self.assertEqual(len(parsing_merge.repeated_all_types), 3)
- self.assertEqual(len(parsing_merge.repeatedgroup), 3)
- self.assertEqual(len(parsing_merge.Extensions[
- unittest_pb2.TestParsingMerge.repeated_ext]), 3)
-
- def ensureNestedMessageExists(self, msg, attribute):
- """Make sure that a nested message object exists.
-
- As soon as a nested message attribute is accessed, it will be present in the
- _fields dict, without being marked as actually being set.
- """
- getattr(msg, attribute)
- self.assertFalse(msg.HasField(attribute))
-
- def testOneofGetCaseNonexistingField(self):
- m = unittest_pb2.TestAllTypes()
- self.assertRaises(ValueError, m.WhichOneof, 'no_such_oneof_field')
-
- def testOneofSemantics(self):
- m = unittest_pb2.TestAllTypes()
- self.assertIs(None, m.WhichOneof('oneof_field'))
-
- m.oneof_uint32 = 11
- self.assertEqual('oneof_uint32', m.WhichOneof('oneof_field'))
- self.assertTrue(m.HasField('oneof_uint32'))
-
- m.oneof_string = u'foo'
- self.assertEqual('oneof_string', m.WhichOneof('oneof_field'))
- self.assertFalse(m.HasField('oneof_uint32'))
- self.assertTrue(m.HasField('oneof_string'))
-
- m.oneof_nested_message.bb = 11
- self.assertEqual('oneof_nested_message', m.WhichOneof('oneof_field'))
- self.assertFalse(m.HasField('oneof_string'))
- self.assertTrue(m.HasField('oneof_nested_message'))
-
- m.oneof_bytes = b'bb'
- self.assertEqual('oneof_bytes', m.WhichOneof('oneof_field'))
- self.assertFalse(m.HasField('oneof_nested_message'))
- self.assertTrue(m.HasField('oneof_bytes'))
-
- def testOneofCompositeFieldReadAccess(self):
- m = unittest_pb2.TestAllTypes()
- m.oneof_uint32 = 11
-
- self.ensureNestedMessageExists(m, 'oneof_nested_message')
- self.assertEqual('oneof_uint32', m.WhichOneof('oneof_field'))
- self.assertEqual(11, m.oneof_uint32)
-
- def testOneofHasField(self):
- m = unittest_pb2.TestAllTypes()
- self.assertFalse(m.HasField('oneof_field'))
- m.oneof_uint32 = 11
- self.assertTrue(m.HasField('oneof_field'))
- m.oneof_bytes = b'bb'
- self.assertTrue(m.HasField('oneof_field'))
- m.ClearField('oneof_bytes')
- self.assertFalse(m.HasField('oneof_field'))
-
- def testOneofClearField(self):
- m = unittest_pb2.TestAllTypes()
- m.oneof_uint32 = 11
- m.ClearField('oneof_field')
- self.assertFalse(m.HasField('oneof_field'))
- self.assertFalse(m.HasField('oneof_uint32'))
- self.assertIs(None, m.WhichOneof('oneof_field'))
-
- def testOneofClearSetField(self):
- m = unittest_pb2.TestAllTypes()
- m.oneof_uint32 = 11
- m.ClearField('oneof_uint32')
- self.assertFalse(m.HasField('oneof_field'))
- self.assertFalse(m.HasField('oneof_uint32'))
- self.assertIs(None, m.WhichOneof('oneof_field'))
-
- def testOneofClearUnsetField(self):
- m = unittest_pb2.TestAllTypes()
- m.oneof_uint32 = 11
- self.ensureNestedMessageExists(m, 'oneof_nested_message')
- m.ClearField('oneof_nested_message')
- self.assertEqual(11, m.oneof_uint32)
- self.assertTrue(m.HasField('oneof_field'))
- self.assertTrue(m.HasField('oneof_uint32'))
- self.assertEqual('oneof_uint32', m.WhichOneof('oneof_field'))
-
- def testOneofDeserialize(self):
- m = unittest_pb2.TestAllTypes()
- m.oneof_uint32 = 11
- m2 = unittest_pb2.TestAllTypes()
- m2.ParseFromString(m.SerializeToString())
- self.assertEqual('oneof_uint32', m2.WhichOneof('oneof_field'))
-
- def testSortEmptyRepeatedCompositeContainer(self):
- """Exercise a scenario that has led to segfaults in the past.
- """
- m = unittest_pb2.TestAllTypes()
- m.repeated_nested_message.sort()
-
- def testHasFieldOnRepeatedField(self):
- """Using HasField on a repeated field should raise an exception.
- """
- m = unittest_pb2.TestAllTypes()
- with self.assertRaises(ValueError) as _:
- m.HasField('repeated_int32')
-
-
-class ValidTypeNamesTest(basetest.TestCase):
-
- def assertImportFromName(self, msg, base_name):
- # Parse <type 'module.class_name'> to extra 'some.name' as a string.
- tp_name = str(type(msg)).split("'")[1]
- valid_names = ('Repeated%sContainer' % base_name,
- 'Repeated%sFieldContainer' % base_name)
- self.assertTrue(any(tp_name.endswith(v) for v in valid_names),
- '%r does end with any of %r' % (tp_name, valid_names))
-
- parts = tp_name.split('.')
- class_name = parts[-1]
- module_name = '.'.join(parts[:-1])
- __import__(module_name, fromlist=[class_name])
-
- def testTypeNamesCanBeImported(self):
- # If import doesn't work, pickling won't work either.
- pb = unittest_pb2.TestAllTypes()
- self.assertImportFromName(pb.repeated_int32, 'Scalar')
- self.assertImportFromName(pb.repeated_nested_message, 'Composite')
-
+ self.assertTrue(all_set.SerializeToString() == golden_data)
if __name__ == '__main__':
- basetest.main()
+ unittest.main()