1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19  import email 
 20  import os.path 
 21  import unittest 
 22  import time 
 23   
 24  import dkim 
 25   
 26   
 28      """Get the content of the given test data file. 
 29   
 30      The files live in dkim/tests/data. 
 31      """ 
 32      path = os.path.join(os.path.dirname(__file__), 'data', filename) 
 33      with open(path, 'rb') as f: 
 34          return f.read() 
  35   
 36   
 38   
 40          self.assertEqual( 
 41              b"foo", dkim.fold(b"foo")) 
  42   
 44           
 45           
 46          self.assertEqual( 
 47              b"foo" * 24 + b"\r\n foo", dkim.fold(b"foo" * 25)) 
   48   
 49   
 51      """End-to-end signature and verification tests.""" 
 52   
 56   
 58          sample_dns = """\ 
 59  k=rsa; \ 
 60  p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ 
 61  b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" 
 62   
 63          _dns_responses = { 
 64            'example._domainkey.canonical.com.': sample_dns, 
 65            'test._domainkey.example.com.': read_test_data("test.txt"), 
 66            '20120113._domainkey.gmail.com.': """k=rsa; \ 
 67  p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\ 
 68  +eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\ 
 69  s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\ 
 70  hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\ 
 71  MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\ 
 72  Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB""" 
 73          } 
 74          try: 
 75              domain = domain.decode('ascii') 
 76          except UnicodeDecodeError: 
 77              return None 
 78          self.assertTrue(domain in _dns_responses,domain) 
 79          return _dns_responses[domain] 
  80   
 82          sample_dns = """\ 
 83  k=rsa; \ 
 84  p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ 
 85  b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" 
 86   
 87          _dns_responses = { 
 88            'example._domainkey.canonical.com.': sample_dns, 
 89            'test._domainkey.example.com.': read_test_data("test2.txt"), 
 90            '20120113._domainkey.gmail.com.': """\ 
 91  p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\ 
 92  +eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\ 
 93  s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\ 
 94  hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\ 
 95  MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\ 
 96  Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB""" 
 97          } 
 98          try: 
 99              domain = domain.decode('ascii') 
100          except UnicodeDecodeError: 
101              return None 
102          self.assertTrue(domain in _dns_responses,domain) 
103          return _dns_responses[domain] 
 104   
106           
107          for header_algo in (b"simple", b"relaxed"): 
108              for body_algo in (b"simple", b"relaxed"): 
109                  sig = dkim.sign( 
110                      self.message, b"test", b"example.com", self.key, 
111                      canonicalize=(header_algo, body_algo)) 
112                  res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) 
113                  self.assertTrue(res) 
 114   
116           
117          for header_algo in (b"simple", b"relaxed"): 
118              for body_algo in (b"simple", b"relaxed"): 
119                  sig = dkim.sign( 
120                      self.message, b"test", b"example.com", self.key, 
121                      canonicalize=(header_algo, body_algo)) 
122                  res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc2) 
123                  self.assertTrue(res) 
 124   
126           
127          for header_algo in (b"simple", b"relaxed"): 
128               for body_algo in (b"simple", b"relaxed"): 
129                  sig = dkim.sign( 
130                      self.message, b"test", b"example.com", self.key, 
131                      canonicalize=(header_algo, body_algo), 
132                      include_headers=(b'from',) + dkim.DKIM.SHOULD) 
133                  res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) 
134                  self.assertTrue(res) 
 135   
137          sig = dkim.sign( 
138              self.message, b"test", b"example.com", self.key, length=True) 
139          msg = email.message_from_string(self.message.decode('utf-8')) 
140          self.assertIn('; l=%s' % len(msg.get_payload() + '\n'), sig.decode('utf-8')) 
141          res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) 
142          self.assertTrue(res) 
 143   
145           
146          for header_algo in (b"simple", b"relaxed"): 
147              for body_algo in (b"simple", b"relaxed"): 
148                  sig = dkim.sign( 
149                      self.message, b"test", b"example.com", self.key) 
150                  res = dkim.verify( 
151                      sig + self.message + b"foo", dnsfunc=self.dnsfunc) 
152                  self.assertFalse(res) 
 153   
155           
156          sig = dkim.sign(self.message, b"test", b"example.com\xe9", self.key) 
157          res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) 
158          self.assertFalse(res) 
 159   
161         
162         
163         
164         
165         
166        sample_msg = b"""\ 
167  From: mbp@canonical.com 
168  To: scottk@example.com 
169  Subject: this is my 
170      test message 
171  """.replace(b'\n', b'\r\n') 
172   
173        sample_privkey = b"""\ 
174  -----BEGIN RSA PRIVATE KEY----- 
175  MIIBOwIBAAJBANmBe10IgY+u7h3enWTukkqtUD5PR52Tb/mPfjC0QJTocVBq6Za/ 
176  PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQJAYFUKsD+uMlcFu1D3YNaR 
177  EGYGXjJ6w32jYGJ/P072M3yWOq2S1dvDthI3nRT8MFjZ1wHDAYHrSpfDNJ3v2fvZ 
178  cQIhAPgRPmVYn+TGd59asiqG1SZqh+p+CRYHW7B8BsicG5t3AiEA4HYNOohlgWan 
179  8tKgqLJgUdPFbaHZO1nDyBgvV8hvWZUCIQDDdCq6hYKuKeYUy8w3j7cgJq3ih922 
180  2qNWwdJCfCWQbwIgTY0cBvQnNe0067WQIpj2pG7pkHZR6qqZ9SE+AjNTHX0CIQCI 
181  Mgq55Y9MCq5wqzy141rnxrJxTwK9ABo3IAFMWEov3g== 
182  -----END RSA PRIVATE KEY----- 
183  """ 
184   
185        sample_pubkey = """\ 
186  -----BEGIN PUBLIC KEY----- 
187  MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T 
188  b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ== 
189  -----END PUBLIC KEY----- 
190  """ 
191   
192        for header_mode in [dkim.Relaxed, dkim.Simple]: 
193   
194          dkim_header = dkim.sign(sample_msg, b'example', b'canonical.com', 
195              sample_privkey, canonicalize=(header_mode, dkim.Relaxed)) 
196           
197           
198           
199           
200           
201           
202          signed = dkim.fold(dkim_header) + sample_msg 
203          result = dkim.verify(signed,dnsfunc=self.dnsfunc, 
204                  minkey=512) 
205          self.assertTrue(result) 
206          dkim_header = dkim.fold(dkim_header) 
207           
208          pos = dkim_header.rindex(b'\r\n ') 
209          dkim_header = dkim_header[:pos]+b'\r\n\t'+dkim_header[pos+3:] 
210          result = dkim.verify(dkim_header + sample_msg, 
211                  dnsfunc=self.dnsfunc, minkey=512) 
212          self.assertTrue(result) 
 213   
221   
223           
224           
225           
226          message = read_test_data("message.mbox") 
227          for header_algo in (b"simple", b"relaxed"): 
228              for body_algo in (b"simple", b"relaxed"): 
229                  d = dkim.DKIM(message) 
230                   
231                  d.should_not_sign.remove(b'received') 
232                  sig = d.sign(b"test", b"example.com", self.key, 
233                      include_headers=d.all_sign_headers(), 
234                      canonicalize=(header_algo, body_algo)) 
235                  dv = dkim.DKIM(sig + message) 
236                  res = dv.verify(dnsfunc=self.dnsfunc) 
237                  self.assertEquals(d.include_headers,dv.include_headers) 
238                  s = dkim.select_headers(d.headers,d.include_headers) 
239                  sv = dkim.select_headers(dv.headers,dv.include_headers) 
240                  self.assertEquals(s,sv) 
241                  self.assertTrue(res) 
 242   
244           
245           
246          hfrom = b'From: "Resident Evil" <sales@spammer.com>\r\n' 
247          h,b = self.message.split(b'\n\n',1) 
248          for header_algo in (b"simple", b"relaxed"): 
249              for body_algo in (b"simple", b"relaxed"): 
250                  sig = dkim.sign( 
251                      self.message, b"test", b"example.com", self.key) 
252                   
253                  h1 = h+b'\r\n'+b'X-Foo: bar' 
254                  message = b'\n\n'.join((h1,b)) 
255                  res = dkim.verify(sig+message, dnsfunc=self.dnsfunc) 
256                  self.assertTrue(res) 
257                   
258                  h1 = h+b'\r\n'+hfrom.strip() 
259                  message = b'\n\n'.join((h1,b)) 
260                  res = dkim.verify(sig+message, dnsfunc=self.dnsfunc) 
261                  self.assertFalse(res) 
262                   
263                  h1 = hfrom+h 
264                  message = b'\n\n'.join((h1,b)) 
265                  res = dkim.verify(sig+message, dnsfunc=self.dnsfunc) 
266                  self.assertFalse(res) 
 267   
269           
270          sigerror = False 
271          sig = '' 
272          message = read_test_data('test_nofrom.message') 
273          selector = 'test' 
274          domain = 'example.com' 
275          identity = None 
276          try: 
277              sig = dkim.sign(message, selector, domain, read_test_data('test.private'), identity = identity) 
278          except dkim.ParameterError as x: 
279              sigerror = True 
280          self.assertTrue(sigerror) 
 281   
283        sig = {b'v': b'1', 
284        b'a': b'rsa-sha256', 
285        b'b': b'K/UUOt8lCtgjp3kSTogqBm9lY1Yax/NwZ+bKm39/WKzo5KYe3L/6RoIA/0oiDX4kO\n \t Qut49HCV6ZUe6dY9V5qWBwLanRs1sCnObaOGMpFfs8tU4TWpDSVXaNZAqn15XVW0WH\n \t EzOzUfVuatpa1kF4voIgSbmZHR1vN3WpRtcTBe/I=', 
286        b'bh': b'n0HUwGCP28PkesXBPH82Kboy8LhNFWU9zUISIpAez7M=', 
287        b'c': b'simple/simple', 
288        b'd': b'kitterman.com', 
289        b'i': b'scott@Kitterman.com', 
290        b'h': b'From:To:Subject:Date:Cc:MIME-Version:Content-Type:\n \t Content-Transfer-Encoding:Message-Id', 
291        b's': b'2007-00', 
292        b't': b'1299525798'} 
293        dkim.validate_signature_fields(sig) 
294         
295        sigVer = sig.copy() 
296        sigVer[b'v'] = 2 
297        self.assertRaises(dkim.ValidationError, dkim.validate_signature_fields, sigVer) 
298         
299        sigX = sig.copy() 
300        sigX[b'x'] = b'1399525798' 
301        dkim.validate_signature_fields(sig) 
302         
303        sigX[b't'] = b'1400000000' 
304        self.assertRaises(dkim.ValidationError, dkim.validate_signature_fields, sigX) 
305         
306        now = int(time.time()) 
307        sigX[b'x'] = str(now+400000).encode('ascii') 
308        dkim.validate_signature_fields(sigX) 
309         
310        sigX[b'x'] = str(now - 24*3600).encode('ascii') 
311        self.assertRaises(dkim.ValidationError, dkim.validate_signature_fields, sigX) 
  312   
# NOTE(review): the `def` line is missing from this listing; the function
# name is reconstructed -- confirm.
def test_suite():
    """Return a suite holding every test that can be loaded from this module."""
    from unittest import TestLoader
    loader = TestLoader()
    suite = loader.loadTestsFromName(__name__)
    return suite
 316