1 # gp_cert_auto_enroll_ext samba group policy
2 # Copyright (C) David Mulder <dmulder@suse.com> 2021
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from samba.gp.gpclass import gp_pol_ext, gp_applier, GPOSTATE
22 from ldb import SCOPE_SUBTREE, SCOPE_BASE
23 from samba.auth import system_session
24 from samba.gp.gpclass import get_dc_hostname
26 from shutil import which
27 from subprocess import Popen, PIPE
30 from samba.gp.util.logging import log
33 from cryptography.hazmat.primitives.serialization.pkcs7 import \
34 load_der_pkcs7_certificates
35 except ModuleNotFoundError:
36 def load_der_pkcs7_certificates(x): return []
37 log.error('python cryptography missing pkcs7 support. '
38 'Certificate chain parsing will fail')
39 from cryptography.hazmat.primitives.serialization import Encoding
40 from cryptography.x509 import load_der_x509_certificate
41 from cryptography.hazmat.backends import default_backend
42 from samba.common import get_string
45 -----BEGIN CERTIFICATE-----
47 -----END CERTIFICATE-----"""
# Pattern for an HTTPS Certificate Enrollment Policy endpoint URL. The named
# groups capture the server host name ('server') and the authentication
# type embedded in the path ('auth').
endpoint_re = (
    '(https|HTTPS)://(?P<server>[a-zA-Z0-9.-]+)/ADPolicyProvider'
    '_CEP_(?P<auth>[a-zA-Z]+)/service.svc/CEP'
)

# Known system trust anchor directories, one per distribution family.
# The first entry doubles as the fallback when none of them exists.
global_trust_dirs = [
    '/etc/pki/trust/anchors',  # SUSE
    '/etc/pki/ca-trust/source/anchors',  # RHEL/Fedora
    '/usr/local/share/ca-certificates',  # Debian/Ubuntu
]
def octet_string_to_objectGUID(data):
    """Convert an octet string to an objectGUID.

    The first three fields are read as little-endian integers and the
    remaining bytes as big-endian, then rendered as lowercase hex in the
    8-4-4-4-12 layout.

    :param data: the 16-byte binary objectGUID value.
    :return: the GUID as a lowercase, hyphen-separated string (no braces).
    :raises struct.error: if data is not exactly 16 bytes long.
    """
    part1, part2, part3 = struct.unpack('<LHH', data[0:8])
    part4, part5_hi, part5_lo = struct.unpack('>HHL', data[8:])
    # Each field is zero-padded to its full width. A plain '%02x' drops
    # leading zero digits, yielding a shortened string that can never
    # compare equal to the textual form of the same GUID.
    return '%08x-%04x-%04x-%04x-%04x%08x' % (part1, part2, part3,
                                             part4, part5_hi, part5_lo)
64 def group_and_sort_end_point_information(end_point_information):
65 """Group and Sort End Point Information.
67 [MS-CAESO] 4.4.5.3.2.3
68 In this step autoenrollment processes the end point information by grouping
69 it by CEP ID and sorting in the order with which it will use the end point
70 to access the CEP information.
72 # Create groups of the CertificateEnrollmentPolicyEndPoint instances that
73 # have the same value of the EndPoint.PolicyID datum.
75 for e in end_point_information:
76 if e['PolicyID'] not in end_point_groups.keys():
77 end_point_groups[e['PolicyID']] = []
78 end_point_groups[e['PolicyID']].append(e)
80 # Sort each group by following these rules:
81 for end_point_group in end_point_groups.values():
82 # Sort the CertificateEnrollmentPolicyEndPoint instances in ascending
83 # order based on the EndPoint.Cost value.
84 end_point_group.sort(key=lambda e: e['Cost'])
86 # For instances that have the same EndPoint.Cost:
87 cost_list = [e['Cost'] for e in end_point_group]
88 costs = set(cost_list)
90 i = cost_list.index(cost)
91 j = len(cost_list)-operator.indexOf(reversed(cost_list), cost)-1
95 # Sort those that have EndPoint.Authentication equal to Kerberos
96 # first. Then sort those that have EndPoint.Authentication equal to
97 # Anonymous. The rest of the CertificateEnrollmentPolicyEndPoint
98 # instances follow in an arbitrary order.
101 if e['AuthFlags'] == 0x2:
104 elif e['AuthFlags'] == 0x1:
108 end_point_group[i:j+1] = sorted(end_point_group[i:j+1],
110 return list(end_point_groups.values())
112 def obtain_end_point_information(entries):
113 """Obtain End Point Information.
115 [MS-CAESO] 4.4.5.3.2.2
116 In this step autoenrollment initializes the
117 CertificateEnrollmentPolicyEndPoints table.
119 end_point_information = {}
120 section = 'Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\'
122 if not e.keyname.startswith(section):
124 name = e.keyname.replace(section, '')
125 if name not in end_point_information.keys():
126 end_point_information[name] = {}
127 end_point_information[name][e.valuename] = e.data
128 for ca in end_point_information.values():
129 m = re.match(endpoint_re, ca['URL'])
131 name = '%s-CA' % m.group('server').replace('.', '-')
133 ca['hostname'] = m.group('server')
134 ca['auth'] = m.group('auth')
135 elif ca['URL'].lower() != 'ldap:':
136 edata = { 'endpoint': ca['URL'] }
137 log.error('Failed to parse the endpoint', edata)
139 end_point_information = \
140 group_and_sort_end_point_information(end_point_information.values())
141 return end_point_information
143 def fetch_certification_authorities(ldb):
146 [MS-CAESO] 4.4.5.3.1.2
149 basedn = ldb.get_default_basedn()
150 # Autoenrollment MUST do an LDAP search for the CA information
151 # (pKIEnrollmentService) objects under the following container:
152 dn = 'CN=Enrollment Services,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn
153 attrs = ['cACertificate', 'cn', 'dNSHostName']
154 expr = '(objectClass=pKIEnrollmentService)'
155 res = ldb.search(dn, SCOPE_SUBTREE, expr, attrs)
159 data = { 'name': get_string(es['cn'][0]),
160 'hostname': get_string(es['dNSHostName'][0]),
161 'cACertificate': get_string(base64.b64encode(es['cACertificate'][0]))
166 def fetch_template_attrs(ldb, name, attrs=None):
168 attrs = ['msPKI-Minimal-Key-Size']
169 basedn = ldb.get_default_basedn()
170 dn = 'CN=Certificate Templates,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn
171 expr = '(cn=%s)' % name
172 res = ldb.search(dn, SCOPE_SUBTREE, expr, attrs)
173 if len(res) == 1 and 'msPKI-Minimal-Key-Size' in res[0]:
176 return {'msPKI-Minimal-Key-Size': ['2048']}
def format_root_cert(cert):
    """Wrap certificate text in the module's cert_wrap template.

    The text is encoded to bytes and a newline is inserted after every run
    of 64 characters before it is substituted into cert_wrap.

    :param cert: the certificate body as a str.
    :return: cert_wrap with the line-wrapped body substituted in
             (presumably bytes, since the body is bytes -- confirm against
             the cert_wrap definition).
    """
    # count and flags are given by keyword: passing them positionally to
    # re.sub is deprecated as of Python 3.13.
    wrapped = re.sub(b"(.{64})", b"\\1\n", cert.encode(),
                     count=0, flags=re.DOTALL)
    return cert_wrap % wrapped
def find_cepces_submit():
    """Locate the cepces-submit helper executable.

    The search covers the current PATH followed by the certmonger helper
    directories.

    :return: the path of cepces-submit, or None if it cannot be found.
    """
    # PATH can be unset; joining None would raise a TypeError, so fall back
    # to os.defpath, the same default shutil.which uses in that situation.
    search_path = os.environ.get("PATH", os.defpath)
    certmonger_dirs = [search_path, '/usr/lib/certmonger',
                       '/usr/libexec/certmonger']
    return which('cepces-submit', path=':'.join(certmonger_dirs))
186 def get_supported_templates(server):
187 cepces_submit = find_cepces_submit()
188 if not cepces_submit:
189 log.error('Failed to find cepces-submit')
193 env['CERTMONGER_OPERATION'] = 'GET-SUPPORTED-TEMPLATES'
194 p = Popen([cepces_submit, '--server=%s' % server, '--auth=Kerberos'],
195 env=env, stdout=PIPE, stderr=PIPE)
196 out, err = p.communicate()
197 if p.returncode != 0:
198 data = {'Error': err.decode()}
199 log.error('Failed to fetch the list of supported templates.', data)
200 return out.strip().split()
203 def getca(ca, url, trust_dir):
204 """Fetch Certificate Chain from the CA."""
205 root_cert = os.path.join(trust_dir, '%s.crt' % ca['name'])
209 r = requests.get(url=url, params={'operation': 'GetCACert',
210 'message': 'CAIdentifier'})
211 except requests.exceptions.ConnectionError:
212 log.warn('Failed to establish a new connection')
214 if r is None or r.content == b'' or r.headers['Content-Type'] == 'text/html':
215 log.warn('Failed to fetch the root certificate chain.')
216 log.warn('The Network Device Enrollment Service is either not' +
217 ' installed or not configured.')
218 if 'cACertificate' in ca:
219 log.warn('Installing the server certificate only.')
220 der_certificate = base64.b64decode(ca['cACertificate'])
222 cert = load_der_x509_certificate(der_certificate)
224 cert = load_der_x509_certificate(der_certificate,
226 cert_data = cert.public_bytes(Encoding.PEM)
227 with open(root_cert, 'wb') as w:
229 root_certs.append(root_cert)
232 if r.headers['Content-Type'] == 'application/x-x509-ca-cert':
233 # Older versions of load_der_x509_certificate require a backend param
235 cert = load_der_x509_certificate(r.content)
237 cert = load_der_x509_certificate(r.content, default_backend())
238 cert_data = cert.public_bytes(Encoding.PEM)
239 with open(root_cert, 'wb') as w:
241 root_certs.append(root_cert)
242 elif r.headers['Content-Type'] == 'application/x-x509-ca-ra-cert':
243 certs = load_der_pkcs7_certificates(r.content)
244 for i in range(0, len(certs)):
245 cert = certs[i].public_bytes(Encoding.PEM)
246 filename, extension = root_cert.rsplit('.', 1)
247 dest = '%s.%d.%s' % (filename, i, extension)
248 with open(dest, 'wb') as w:
250 root_certs.append(dest)
252 log.warn('getca: Wrong (or missing) MIME content type')
257 def find_global_trust_dir():
258 """Return the global trust dir using known paths from various Linux distros."""
259 for trust_dir in global_trust_dirs:
260 if os.path.isdir(trust_dir):
262 return global_trust_dirs[0]
def update_ca_command():
    """Find the tool that refreshes the system CA trust store.

    'update-ca-certificates' is preferred; 'update-ca-trust' is only
    looked up when the former is not available.

    :return: the path of the first tool found, or None if neither exists.
    """
    command = which('update-ca-certificates')
    if not command:
        command = which('update-ca-trust')
    return command
def changed(new_data, old_data):
    """Report whether a key shared by both dicts maps to different values.

    Keys present in only one of the two dicts are ignored.

    :param new_data: dict holding the current values.
    :param old_data: dict holding the previous values.
    :return: True if at least one common key differs, otherwise False.
    """
    for key in new_data:
        if key in old_data and new_data[key] != old_data[key]:
            return True
    return False
273 def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'):
274 """Install the root certificate chain."""
275 data = dict({'files': [], 'templates': []}, **ca)
276 url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname']
278 log.info("Try to get root or server certificates")
280 root_certs = getca(ca, url, trust_dir)
281 data['files'].extend(root_certs)
282 global_trust_dir = find_global_trust_dir()
283 for src in root_certs:
284 # Symlink the certs to global trust dir
285 dst = os.path.join(global_trust_dir, os.path.basename(src))
288 data['files'].append(dst)
289 log.info("Created symlink: %s -> %s" % (src, dst))
290 except PermissionError:
291 log.warn('Failed to symlink root certificate to the'
292 ' admin trust anchors')
293 except FileNotFoundError:
294 log.warn('Failed to symlink root certificate to the'
295 ' admin trust anchors.'
296 ' The directory was not found', global_trust_dir)
297 except FileExistsError:
298 # If we're simply downloading a renewed cert, the symlink
299 # already exists. Ignore the FileExistsError. Preserve the
300 # existing symlink in the unapply data.
301 data['files'].append(dst)
303 update = update_ca_command()
304 log.info("Running %s" % (update))
305 if update is not None:
306 ret = Popen([update]).wait()
308 log.error('Failed to run %s' % (update))
310 # Setup Certificate Auto Enrollment
311 getcert = which('getcert')
312 cepces_submit = find_cepces_submit()
313 if getcert is not None and cepces_submit is not None:
314 p = Popen([getcert, 'add-ca', '-c', ca['name'], '-e',
315 '%s --server=%s --auth=%s' % (cepces_submit,
316 ca['hostname'], auth)],
317 stdout=PIPE, stderr=PIPE)
318 out, err = p.communicate()
319 log.debug(out.decode())
320 if p.returncode != 0:
321 if p.returncode == 2:
322 log.info('The CA [%s] already exists' % ca['name'])
324 data = {'Error': err.decode(), 'CA': ca['name']}
325 log.error('Failed to add Certificate Authority', data)
327 supported_templates = get_supported_templates(ca['hostname'])
328 for template in supported_templates:
329 attrs = fetch_template_attrs(ldb, template)
330 nickname = '%s.%s' % (ca['name'], template.decode())
331 keyfile = os.path.join(private_dir, '%s.key' % nickname)
332 certfile = os.path.join(trust_dir, '%s.crt' % nickname)
333 p = Popen([getcert, 'request', '-c', ca['name'],
334 '-T', template.decode(),
335 '-I', nickname, '-k', keyfile, '-f', certfile,
336 '-g', attrs['msPKI-Minimal-Key-Size'][0]],
337 stdout=PIPE, stderr=PIPE)
338 out, err = p.communicate()
339 log.debug(out.decode())
340 if p.returncode != 0:
341 if p.returncode == 2:
342 log.info('The template [%s] already exists' % (nickname))
344 data = {'Error': err.decode(), 'Certificate': nickname}
345 log.error('Failed to request certificate', data)
347 data['files'].extend([keyfile, certfile])
348 data['templates'].append(nickname)
349 if update is not None:
350 ret = Popen([update]).wait()
352 log.error('Failed to run %s' % (update))
354 log.warn('certmonger and cepces must be installed for ' +
355 'certificate auto enrollment to work')
356 return json.dumps(data)
358 class gp_cert_auto_enroll_ext(gp_pol_ext, gp_applier):
360 return r'Cryptography\AutoEnrollment'
362 def unapply(self, guid, attribute, value):
363 ca_cn = base64.b64decode(attribute)
364 data = json.loads(value)
365 getcert = which('getcert')
366 if getcert is not None:
367 Popen([getcert, 'remove-ca', '-c', ca_cn]).wait()
368 for nickname in data['templates']:
369 Popen([getcert, 'stop-tracking', '-i', nickname]).wait()
370 for f in data['files']:
371 if os.path.exists(f):
372 if os.path.exists(f):
374 self.cache_remove_attribute(guid, attribute)
376 def apply(self, guid, ca, applier_func, *args, **kwargs):
377 attribute = base64.b64encode(ca['name'].encode()).decode()
378 # If the policy has changed, unapply, then apply new policy
379 old_val = self.cache_get_attribute_value(guid, attribute)
380 old_data = json.loads(old_val) if old_val is not None else {}
381 templates = ['%s.%s' % (ca['name'], t.decode()) for t in get_supported_templates(ca['hostname'])] \
382 if old_val is not None else []
383 new_data = { 'templates': templates, **ca }
384 if changed(new_data, old_data) or self.cache_get_apply_state() == GPOSTATE.ENFORCE:
385 self.unapply(guid, attribute, old_val)
386 # If policy is already applied and unchanged, skip application
387 if old_val is not None and not changed(new_data, old_data) and \
388 self.cache_get_apply_state() != GPOSTATE.ENFORCE:
391 # Apply the policy and log the changes
392 data = applier_func(*args, **kwargs)
393 self.cache_add_attribute(guid, attribute, data)
395 def process_group_policy(self, deleted_gpo_list, changed_gpo_list,
396 trust_dir=None, private_dir=None):
397 if trust_dir is None:
398 trust_dir = self.lp.cache_path('certs')
399 if private_dir is None:
400 private_dir = self.lp.private_path('certs')
401 if not os.path.exists(trust_dir):
402 os.mkdir(trust_dir, mode=0o755)
403 if not os.path.exists(private_dir):
404 os.mkdir(private_dir, mode=0o700)
406 for guid, settings in deleted_gpo_list:
407 if str(self) in settings:
408 for ca_cn_enc, data in settings[str(self)].items():
409 self.unapply(guid, ca_cn_enc, data)
411 for gpo in changed_gpo_list:
412 if gpo.file_sys_path:
413 section = r'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
414 pol_file = 'MACHINE/Registry.pol'
415 path = os.path.join(gpo.file_sys_path, pol_file)
416 pol_conf = self.parse(path)
419 for e in pol_conf.entries:
420 if e.keyname == section and e.valuename == 'AEPolicy':
421 # This policy applies as specified in [MS-CAESO] 4.4.5.1
423 continue # The policy is disabled
424 enroll = e.data & 0x1 == 0x1
425 manage = e.data & 0x2 == 0x2
426 retrive_pending = e.data & 0x4 == 0x4
428 ca_names = self.__enroll(gpo.name,
430 trust_dir, private_dir)
432 # Cleanup any old CAs that have been removed
433 ca_attrs = [base64.b64encode(n.encode()).decode()
435 self.clean(gpo.name, keep=ca_attrs)
437 # If enrollment has been disabled for this GPO,
438 # remove any existing policy
440 self.cache_get_all_attribute_values(gpo.name)
441 self.clean(gpo.name, remove=list(ca_attrs.keys()))
443 def __read_cep_data(self, guid, ldb, end_point_information,
444 trust_dir, private_dir):
447 [MS-CAESO] 4.4.5.3.2.4
448 In this step autoenrollment initializes instances of the
449 CertificateEnrollmentPolicy by accessing end points associated with CEP
450 groups created in the previous step.
452 # For each group created in the previous step:
453 for end_point_group in end_point_information:
454 # Pick an arbitrary instance of the
455 # CertificateEnrollmentPolicyEndPoint from the group
456 e = end_point_group[0]
458 # If this instance does not have the AutoEnrollmentEnabled flag set
459 # in the EndPoint.Flags, continue with the next group.
460 if not e['Flags'] & 0x10:
463 # If the current group contains a
464 # CertificateEnrollmentPolicyEndPoint instance with EndPoint.URI
466 if any([e['URL'] == 'LDAP:' for e in end_point_group]):
467 # Perform an LDAP search to read the value of the objectGuid
468 # attribute of the root object of the forest root domain NC. If
469 # any errors are encountered, continue with the next group.
470 res = ldb.search('', SCOPE_BASE, '(objectClass=*)',
471 ['rootDomainNamingContext'])
474 res2 = ldb.search(res[0]['rootDomainNamingContext'][0],
475 SCOPE_BASE, '(objectClass=*)',
480 # Compare the value read in the previous step to the
481 # EndPoint.PolicyId datum CertificateEnrollmentPolicyEndPoint
482 # instance. If the values do not match, continue with the next
484 objectGUID = '{%s}' % \
485 octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper()
486 if objectGUID != e['PolicyID']:
489 # For each CertificateEnrollmentPolicyEndPoint instance for that
492 for ca in end_point_group:
493 # If EndPoint.URI equals "LDAP":
494 if ca['URL'] == 'LDAP:':
495 # This is a basic configuration.
496 cas = fetch_certification_authorities(ldb)
498 self.apply(guid, _ca, cert_enroll, _ca, ldb, trust_dir,
500 ca_names.append(_ca['name'])
501 # If EndPoint.URI starts with "HTTPS//":
502 elif ca['URL'].lower().startswith('https://'):
503 self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
504 private_dir, auth=ca['auth'])
505 ca_names.append(ca['name'])
507 edata = { 'endpoint': ca['URL'] }
508 log.error('Unrecognized endpoint', edata)
511 def __enroll(self, guid, entries, trust_dir, private_dir):
512 url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
513 ldb = Ldb(url=url, session_info=system_session(),
514 lp=self.lp, credentials=self.creds)
517 end_point_information = obtain_end_point_information(entries)
518 if len(end_point_information) > 0:
519 ca_names.extend(self.__read_cep_data(guid, ldb,
520 end_point_information,
521 trust_dir, private_dir))
523 cas = fetch_certification_authorities(ldb)
525 self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
527 ca_names.append(ca['name'])
532 pol_file = 'MACHINE/Registry.pol'
533 section = r'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
534 if gpo.file_sys_path:
535 path = os.path.join(gpo.file_sys_path, pol_file)
536 pol_conf = self.parse(path)
539 for e in pol_conf.entries:
540 if e.keyname == section and e.valuename == 'AEPolicy':
541 enroll = e.data & 0x1 == 0x1
542 if e.data & 0x8000 or not enroll:
544 output['Auto Enrollment Policy'] = {}
545 url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
546 ldb = Ldb(url=url, session_info=system_session(),
547 lp=self.lp, credentials=self.creds)
548 end_point_information = \
549 obtain_end_point_information(pol_conf.entries)
550 cas = fetch_certification_authorities(ldb)
551 if len(end_point_information) > 0:
552 cas2 = [ep for sl in end_point_information for ep in sl]
553 if any([ca['URL'] == 'LDAP:' for ca in cas2]):
558 if 'URL' in ca and ca['URL'] == 'LDAP:':
560 policy = 'Auto Enrollment Policy'
562 if policy not in output:
564 output[policy][cn] = {}
565 if 'cACertificate' in ca:
566 output[policy][cn]['CA Certificate'] = \
567 format_root_cert(ca['cACertificate']).decode()
568 output[policy][cn]['Auto Enrollment Server'] = \
570 supported_templates = \
571 get_supported_templates(ca['hostname'])
572 output[policy][cn]['Templates'] = \
573 [t.decode() for t in supported_templates]