Python API¶
All public symbols are importable directly from databricks_access_audit.
Client factory¶
create_client(...) → AuditClient¶
from databricks_access_audit import create_client
client = create_client(
cloud="azure", # "azure" | "aws" | "gcp"
client_id="...",
client_secret="...",
account_id="...",
prefer_sdk=True, # use databricks-sdk when available (default True)
max_retries=5,
base_delay=1.0,
max_delay=60.0,
)
Returns a DatabricksSDKClient when databricks-sdk is installed and prefer_sdk=True, otherwise a DatabricksAPIClient (raw HTTP).
Group audit¶
GroupMembershipResolver¶
Resolves a group and its full nested membership tree.
from databricks_access_audit import GroupMembershipResolver
resolver = GroupMembershipResolver(client)
group_node = resolver.resolve_group("data-engineers")
members = resolver.get_all_members_flat(group_node)
# members = {"users": [GroupMember, ...], "service_principals": [GroupMember, ...]}
CatalogPermissionScanner¶
Scans catalog-level Unity Catalog grants across all workspaces.
from databricks_access_audit import CatalogPermissionScanner, WorkspaceDiscovery
ws_disc = WorkspaceDiscovery(client, cloud_provider="azure")
workspaces = ws_disc.discover()
scanner = CatalogPermissionScanner(client, resolver)
catalog_grants = scanner.scan_all_workspaces(
workspaces, "data-engineers", group_node, members, max_workers=8
)
# Returns List[CatalogGrant]
SchemaPermissionScanner / TablePermissionScanner / VolumePermissionScanner¶
from databricks_access_audit import (
SchemaPermissionScanner,
TablePermissionScanner,
VolumePermissionScanner,
WorkspaceInfo,
)
sch_scanner = SchemaPermissionScanner(client)
schema_grants = sch_scanner.scan_schemas(workspace, catalog_name, group_name, members, upstream)
tbl_scanner = TablePermissionScanner(client)
table_grants = tbl_scanner.scan_tables(workspace, catalog_name, schema_name, group_name, members, upstream)
vol_scanner = VolumePermissionScanner(client)
volume_grants = vol_scanner.scan_volumes(workspace, catalog_name, schema_name, group_name, members, upstream)
# Returns List[VolumeGrant]
When using both scan_tables and scan_volumes, enumerate the schemas once and pass the same (catalog, schema) pairs to both scanners to avoid duplicate API calls.
RedundancyDetector¶
Compares member-direct grants against the group's effective privileges.
from databricks_access_audit import RedundancyDetector
detector = RedundancyDetector()
redundancy = detector.detect_redundancy(catalog_grants, "data-engineers")
# Returns List[RedundancyResult]
RevokeScriptGenerator¶
Generates REVOKE SQL for redundant grants.
from databricks_access_audit import RevokeScriptGenerator
sql = RevokeScriptGenerator.generate(redundancy, include_partial=True)
print(sql)
Principal audit¶
PrincipalAuditor¶
Resolves every workspace and UC permission reachable by a principal.
from databricks_access_audit import PrincipalAuditor, WorkspaceDiscovery
ws_disc = WorkspaceDiscovery(client, cloud_provider="azure")
auditor = PrincipalAuditor(client, workspace_discovery=ws_disc, cloud_provider="azure")
result = auditor.audit(
identifier="alice@company.com",
scan_schemas=True,
scan_workspace_objects=True,
max_workers=8,
)
# Returns PrincipalAuditResult
result fields:
| Field | Type | Description |
|---|---|---|
| `principal_name` | `str` | Email or display name |
| `principal_type` | `str` | USER, SERVICE_PRINCIPAL, or GROUP |
| `principal_source` | `PrincipalSource` | EXTERNAL (IdP) or INTERNAL (Databricks-managed) |
| `groups` | `List[GroupMembership]` | All group memberships (direct and transitive) |
| `workspace_roles` | `List[WorkspaceRole]` | Workspace access assignments |
| `permissions` | `List[EffectivePermission]` | Unity Catalog permissions |
| `workspace_object_grants` | `List[WorkspaceObjectGrant]` | Workspace object ACLs |
| `dead_end_groups` | `List[str]` | Groups with no workspace assignment AND no UC grants — they provide nothing to the principal |
| `uc_only_groups` | `List[str]` | Groups with no workspace assignment but with UC grants — intentional fine-grained access pattern |
| `escalation_findings` | `List[EscalationFinding]` | Populated after detect_escalations() |
Escalation detection¶
from databricks_access_audit import detect_escalations
result.escalation_findings = detect_escalations(result)
Flags ALL_PRIVILEGES and MANAGE grants in a PrincipalAuditResult.
Workspace object scanning¶
from databricks_access_audit import WorkspaceObjectScanner
obj_scanner = WorkspaceObjectScanner(client, resolver)
grants = obj_scanner.scan_all_workspaces(
workspaces, group_name, group_node, members,
object_types=["jobs", "clusters"], # None = all 13 types
max_workers=8,
)
# Returns List[WorkspaceObjectGrant]
Stale grant detection¶
from databricks_access_audit import StaleGrantChecker
checker = StaleGrantChecker(
client,
workspace_url="https://adb-xxx.azuredatabricks.net",
warehouse_id="abc123",
stale_days=90,
)
findings = checker.check_catalog_grants(catalog_grants, workspace_name, workspace_url)
# Returns List[StaleFinding]
Requires the system.access.audit system table to be enabled, and the service principal running the audit must have SELECT on it.
Workspace-local group detection¶
from databricks_access_audit import LocalGroupChecker
checker = LocalGroupChecker(client)
findings = checker.check_all_workspaces(workspaces)
# Returns List[LocalGroupFinding]
Permission elevation¶
from databricks_access_audit import PermissionElevator
with PermissionElevator(client, sp_client_id="...", dry_run=False) as elevator:
for ws in workspaces:
elevator.ensure_workspace_admin(ws.workspace_id, ws.workspace_name)
# ... run audit ...
# Prior permission state is restored on exit (even on exception)
Snapshots¶
from databricks_access_audit import (
build_group_snapshot, build_principal_snapshot,
save_snapshot, load_snapshot, diff_snapshots,
)
# Group mode
snap = build_group_snapshot(group_name, members, catalog_grants, schema_grants, table_grants)
save_snapshot(snap, "snapshots/data-engineers_2025-Q1.json")
# Principal mode
snap = build_principal_snapshot(result)
save_snapshot(snap, "snapshots/alice_2025-Q1.json")
# Diff
baseline = load_snapshot("snapshots/data-engineers_2025-Q1.json")
diff = diff_snapshots(baseline, snap)
if diff.has_changes:
print(f"{len(diff.grants_added)} added, {len(diff.grants_removed)} removed")
Resource audit¶
ResourceAuditor¶
Discovers every identity that has access to a given Unity Catalog resource or workspace — the resource-first inverse of PrincipalAuditor.
from databricks_access_audit import ResourceAuditor
auditor = ResourceAuditor(client, account_id="...", cloud="azure")
result = auditor.audit(
"main", # catalog, schema, table, or workspace name
resource_type=None, # auto-detect, or pass "catalog"/"schema"/"table"/"workspace"
expand_groups=True, # expand group grants to individual members (default True)
explicit_workspace_urls="", # comma-separated URLs; empty = auto-discover all workspaces
max_workers=8,
)
# Returns ResourceAuditResult
result fields:
| Field | Type | Description |
|---|---|---|
| `resource_type` | `str` | "CATALOG", "SCHEMA", "TABLE", or "WORKSPACE" |
| `resource_name` | `str` | Normalised resource name |
| `grants` | `List[ResourceGrant]` | All access grants found, deduplicated across workspaces |
Each ResourceGrant:
| Field | Type | Description |
|---|---|---|
| `principal_name` | `str` | User email, group display name, or SP name |
| `principal_type` | `str` | "USER", "GROUP", "SERVICE_PRINCIPAL" |
| `principal_source` | `PrincipalSource` | EXTERNAL (IdP-synced) or INTERNAL (Databricks-managed) |
| `privileges` | `List[str]` | e.g. ["USE_CATALOG", "SELECT"] |
| `via_group` | `Optional[str]` | Group name if inherited; None for direct grants |
| `workspace_name` | `str` | Workspace the grant was discovered in |
Resource type is auto-detected from the name format:
| Name format | Detected as |
|---|---|
| `main` (0 dots) | CATALOG |
| `main.analytics` (1 dot) | SCHEMA |
| `main.analytics.orders` (2+ dots) | TABLE |
| `https://...` or name containing "databricks" | WORKSPACE |
Pass resource_type="workspace" explicitly when the workspace name doesn't contain "databricks" — otherwise the name is queried as a UC catalog and returns empty results.
detect_resource_type(name)¶
from databricks_access_audit import detect_resource_type
detect_resource_type("main") # → "catalog"
detect_resource_type("main.analytics") # → "schema"
detect_resource_type("main.analytics.orders") # → "table"
detect_resource_type("https://adb-123.azuredatabricks.net") # → "workspace"
Principal comparison¶
PrincipalComparer¶
Pure-read comparison of group memberships between two principals.
from databricks_access_audit import PrincipalComparer
comparer = PrincipalComparer(client, cloud_provider="azure")
result = comparer.compare("alice@company.com", "bob@company.com")
# Returns CompareResult
print(f"Only Alice: {[g.group_name for g in result.only_in_a]}")
print(f"Only Bob: {[g.group_name for g in result.only_in_b]}")
print(f"Shared: {[g.group_name for g in result.in_both]}")
CompareResult fields:
| Field | Type | Description |
|---|---|---|
| `principal_a` | `str` | Identifier passed for A |
| `principal_b` | `str` | Identifier passed for B |
| `display_name_a` | `str` | Resolved display name for A |
| `display_name_b` | `str` | Resolved display name for B |
| `only_in_a` | `List[GroupComparison]` | Groups A has that B does not |
| `only_in_b` | `List[GroupComparison]` | Groups B has that A does not |
| `in_both` | `List[GroupComparison]` | Groups both principals share |
Each GroupComparison has: group_id, group_name, external_id, in_a, in_b, is_direct_in_a, is_direct_in_b, path_in_a, path_in_b, and a source property (PrincipalSource.EXTERNAL or INTERNAL).
Access cloning¶
AccessCloner¶
Builds a provisioning report and optionally applies SCIM changes to replicate one principal's group access onto another.
from databricks_access_audit import AccessCloner
cloner = AccessCloner(client, cloud_provider="azure")
# Dry-run: analyse source's direct group memberships
report = cloner.build_report(
source="alice@company.com",
target="bob@company.com",
scan_uc=False, # set True to detect UC-only groups (adds catalog-scan calls)
max_workers=8,
)
# Inspect the classified actions
for action in report.idp_actions:
print(f"[IdP required] {action.group_name}")
for action in report.databricks_actions:
print(f"[Databricks] {action.group_name} → workspaces: {action.workspace_accesses}")
for action in report.unverified_actions:
print(f"[Unverified] {action.group_name}")
# Apply: SCIM PATCH target into each DATABRICKS-classified group
# target_scim_id is the SCIM account-level ID of the target principal
cloner.apply(report, target_scim_id="12345678901234")
# action.applied=True on success, action.error=... on failure
CloneReport fields:
| Field | Type | Description |
|---|---|---|
| `source_principal` | `str` | Source identifier |
| `target_principal` | `str` | Target identifier |
| `source_display_name` | `str` | Resolved display name for source |
| `target_display_name` | `str` | Resolved display name for target |
| `actions` | `List[CloneAction]` | One entry per direct group membership of source |
| `idp_actions` | property | IDP_REQUIRED actions |
| `databricks_actions` | property | DATABRICKS actions |
| `unverified_actions` | property | UNVERIFIED actions |
| `skipped` | property | SKIPPED actions |
CloneAction fields: action_type (CloneActionType), group_id, group_name, external_id, path, workspace_accesses, uc_grants_summary, applied, error, and a source property.
CloneActionType values: DATABRICKS, IDP_REQUIRED, UNVERIFIED, SKIPPED.
CSV output¶
from databricks_access_audit import (
write_group_audit_csv, write_principal_audit_csv,
write_compare_csv, write_clone_report_csv,
)
from databricks_access_audit.csv_output import write_diff_csv, write_resource_audit_csv
write_group_audit_csv(catalog_grants, schema_grants, table_grants, redundancy)
write_principal_audit_csv(result, escalation_findings)
write_resource_audit_csv(resource_result) # 8 columns: resource_type, resource_name, principal_name, principal_type, principal_source, privileges, via_group, workspace_name
write_diff_csv(diff)
write_compare_csv(compare_result)
write_clone_report_csv(clone_report)
All of these functions write to sys.stdout by default — redirect or pipe the output in your shell to save it to a file.
Workspace discovery¶
from databricks_access_audit import WorkspaceDiscovery
ws_disc = WorkspaceDiscovery(client, cloud_provider="azure")
workspaces = ws_disc.discover() # all workspaces in account
workspaces = ws_disc.discover("https://adb-xxx.azuredatabricks.net") # explicit
# Returns List[WorkspaceInfo]
Data models reference¶
All models are in databricks_access_audit.models and re-exported from the package root.
| Model | Key fields |
|---|---|
| `GroupMember` | id, display_name, member_type, email, external_id, source |
| `GroupNode` | id, display_name, direct_users, nested_groups, external_id |
| `WorkspaceInfo` | workspace_id, workspace_name, workspace_url, cloud, region |
| `CatalogGrant` | catalog_name, principal, privileges, grant_source, workspace_name |
| `SchemaGrant` | catalog_name, schema_name, principal, privileges, grant_source |
| `TableGrant` | full_name, principal, privileges, grant_source |
| `WorkspaceObjectGrant` | object_type, object_name, permission_level, grant_source, workspace_name |
| `RedundancyResult` | principal, redundancy_level, redundant_privileges, additional_privileges |
| `PrincipalAuditResult` | see the result fields table under "Principal audit" |
| `GroupMembership` | group_name, is_direct, path, source |
| `WorkspaceRole` | workspace_name, permission_level, via_group |
| `EffectivePermission` | securable_type, securable_name, privileges, via_group |
| `EscalationFinding` | privilege, securable_type, securable_name, via_group, is_transitive |
| `StaleFinding` | principal, catalog_name, privileges, last_access, stale_days |
| `LocalGroupFinding` | group_name, workspace_name, member_count |
| `ResourceGrant` | resource_type, resource_name, principal_name, principal_type, principal_source, privileges, via_group, workspace_name |
| `ResourceAuditResult` | resource_type, resource_name, grants |
| `AuditDiff` | grants_added, grants_removed, members_added, members_removed, has_changes |
| `GroupComparison` | group_id, group_name, external_id, in_a, in_b, is_direct_in_a, is_direct_in_b, path_in_a, path_in_b, source |
| `CompareResult` | principal_a, principal_b, display_name_a, display_name_b, only_in_a, only_in_b, in_both |
| `CloneActionType` | Enum: DATABRICKS, IDP_REQUIRED, UNVERIFIED, SKIPPED |
| `CloneAction` | action_type, group_id, group_name, external_id, path, workspace_accesses, uc_grants_summary, applied, error, source |
| `CloneReport` | source_principal, target_principal, source_display_name, target_display_name, actions, plus idp_actions, databricks_actions, unverified_actions, skipped properties |