This is an automated email from the ASF dual-hosted git repository.
DImuthuUpe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
The following commit(s) were added to refs/heads/master by this push:
new 034d0122b CILogon COmanage Provisioner Integration (#487)
034d0122b is described below
commit 034d0122ba21eb0a78413968907835e35f3a67c1
Author: Lahiru Jayathilake <[email protected]>
AuthorDate: Wed Jun 3 08:36:46 2026 -0400
CILogon COmanage Provisioner Integration (#487)
* Add posix username generation and per-cluster unique local_username
* throw an error when the first and last names are empty when building the
cluster username
* POSIX username generation and updated AMIE account provisioning to use
POSIX allocator
* COmanage connector implementation
* updated the baseline integration test for PI cluster provisioning and
audit
---
.../ACCESS/AMIE-Processor/config.yaml.example | 3 -
connectors/ACCESS/AMIE-Processor/config/config.go | 16 +-
.../ACCESS/AMIE-Processor/handler/handler.go | 12 -
.../ACCESS/AMIE-Processor/handler/posix_alloc.go | 79 ++++++
.../AMIE-Processor/handler/posix_alloc_test.go | 100 ++++++++
.../handler/request_account_create.go | 10 +-
.../handler/request_project_create.go | 23 +-
.../request_project_create_integration_test.go | 16 ++
.../pipeline/baseline_integration_test.go | 5 +-
.../testdata/scenarios/baseline.yaml | 19 +-
connectors/COmanage/Identity-Provisioner/README.md | 140 +++++++++++
.../Identity-Provisioner/config.example.yaml | 20 ++
.../Identity-Provisioner/internal/client/client.go | 144 +++++++++++
.../internal/client/client_test.go | 243 +++++++++++++++++++
.../internal/client/group_members.go | 121 ++++++++++
.../Identity-Provisioner/internal/client/groups.go | 154 ++++++++++++
.../internal/client/identifiers.go | 130 ++++++++++
.../Identity-Provisioner/internal/client/people.go | 154 ++++++++++++
.../internal/client/unix_cluster.go | 93 ++++++++
.../internal/operations/compose.go | 98 ++++++++
.../internal/operations/compose_test.go | 134 +++++++++++
.../internal/operations/ensure_posix_account.go | 265 +++++++++++++++++++++
.../internal/operations/lookup.go | 84 +++++++
.../internal/operations/orchestrator.go | 41 ++++
.../internal/subscribers/cluster_user.go | 70 ++++++
.../Identity-Provisioner/pkg/comanage/loader.go | 108 +++++++++
internal/connectors/loader.go | 7 +
.../db/migrations/000002_compute_clusters.up.sql | 1 +
pkg/posix/username.go | 83 +++++++
pkg/posix/username_test.go | 156 ++++++++++++
pkg/service/compute_cluster_user.go | 19 +-
31 files changed, 2507 insertions(+), 41 deletions(-)
diff --git a/connectors/ACCESS/AMIE-Processor/config.yaml.example
b/connectors/ACCESS/AMIE-Processor/config.yaml.example
index af0149ffd..528c9039f 100644
--- a/connectors/ACCESS/AMIE-Processor/config.yaml.example
+++ b/connectors/ACCESS/AMIE-Processor/config.yaml.example
@@ -19,6 +19,3 @@ amie:
log:
level: "info"
format: "text"
-
-provisioner:
- type: "noop"
diff --git a/connectors/ACCESS/AMIE-Processor/config/config.go
b/connectors/ACCESS/AMIE-Processor/config/config.go
index 7daa093c7..a365aae1e 100644
--- a/connectors/ACCESS/AMIE-Processor/config/config.go
+++ b/connectors/ACCESS/AMIE-Processor/config/config.go
@@ -27,11 +27,10 @@ import (
)
type Config struct {
- Server ServerConfig `yaml:"server"`
- Database DatabaseConfig `yaml:"database"`
- AMIE AMIEConfig `yaml:"amie"`
- Log LogConfig `yaml:"log"`
- Provisioner ProvisionerConfig `yaml:"provisioner"`
+ Server ServerConfig `yaml:"server"`
+ Database DatabaseConfig `yaml:"database"`
+ AMIE AMIEConfig `yaml:"amie"`
+ Log LogConfig `yaml:"log"`
}
type ServerConfig struct {
@@ -60,10 +59,6 @@ type LogConfig struct {
Format string `yaml:"format"` // "text" or "json"
}
-type ProvisionerConfig struct {
- Type string `yaml:"type"` // "noop" or "slurm"
-}
-
// Load reads config from a YAML file and applies environment variable
overrides.
func Load(path string) (*Config, error) {
data, err := os.ReadFile(path)
@@ -113,9 +108,6 @@ func applyDefaults(cfg *Config) {
if cfg.Log.Format == "" {
cfg.Log.Format = "text"
}
- if cfg.Provisioner.Type == "" {
- cfg.Provisioner.Type = "noop"
- }
}
func applyEnvOverrides(cfg *Config) {
diff --git a/connectors/ACCESS/AMIE-Processor/handler/handler.go
b/connectors/ACCESS/AMIE-Processor/handler/handler.go
index b4eca6db2..5153a4ac6 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/handler.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/handler.go
@@ -19,9 +19,7 @@ package handler
import (
"context"
- "crypto/rand"
"database/sql"
- "encoding/hex"
"errors"
"fmt"
"strconv"
@@ -193,16 +191,6 @@ func ensureOrganization(ctx context.Context, svc
*service.Service, code, name st
})
}
-// generateTempPosixUsername returns a placeholder posix username for a
-// freshly provisioned ComputeClusterUser.
-//
-// TODO: replace with a real policy
-func generateTempPosixUsername() string {
- var b [4]byte
- _, _ = rand.Read(b[:])
- return "amie-" + hex.EncodeToString(b[:])
-}
-
// getInt64 reads a string-encoded integer from a packet body field. AMIE
// transmits numeric fields like ServiceUnitsAllocated as JSON strings.
func getInt64(body map[string]any, key string) (int64, error) {
diff --git a/connectors/ACCESS/AMIE-Processor/handler/posix_alloc.go
b/connectors/ACCESS/AMIE-Processor/handler/posix_alloc.go
new file mode 100644
index 000000000..b1fb3ef6b
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/handler/posix_alloc.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package handler
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strconv"
+
+ "github.com/apache/airavata-custos/pkg/models"
+ "github.com/apache/airavata-custos/pkg/posix"
+ "github.com/apache/airavata-custos/pkg/service"
+)
+
+func allocateAndCreateClusterUser(ctx context.Context, svc *service.Service,
clusterID, userID string) (*models.ComputeClusterUser, error) {
+ user, err := svc.GetUser(ctx, userID)
+ if err != nil {
+ return nil, fmt.Errorf("lookup user %q: %w", userID, err)
+ }
+
+ base, truncated, err := posix.BuildBase(user, posix.Prefix())
+ if err != nil {
+ _, _ = svc.CreateAuditEvent(ctx, &models.AuditEvent{
+ EventType: "PosixUsernameUnbuildable",
+ EntityID: userID,
+ Details: err.Error(),
+ })
+ return nil, err
+ }
+ if truncated {
+ _, _ = svc.CreateAuditEvent(ctx, &models.AuditEvent{
+ EventType: "PosixUsernameTruncated",
+ EntityID: userID,
+ Details: base,
+ })
+ }
+
+ for n := 0; n < posix.MaxCollisionSuffix; n++ {
+ candidate := base
+ if n > 0 {
+ candidate = base + strconv.Itoa(n+1)
+ }
+ ccu, err := svc.CreateComputeClusterUser(ctx,
&models.ComputeClusterUser{
+ ComputeClusterID: clusterID,
+ UserID: userID,
+ LocalUsername: candidate,
+ })
+ if err == nil {
+ return ccu, nil
+ }
+ if errors.Is(err, service.ErrAlreadyExists) {
+ continue
+ }
+ return nil, err
+ }
+
+ _, _ = svc.CreateAuditEvent(ctx, &models.AuditEvent{
+ EventType: "PosixUsernameAllocatorExhausted",
+ EntityID: userID,
+ Details: base,
+ })
+ return nil, fmt.Errorf("posix username allocator exhausted for base
%q", base)
+}
diff --git a/connectors/ACCESS/AMIE-Processor/handler/posix_alloc_test.go
b/connectors/ACCESS/AMIE-Processor/handler/posix_alloc_test.go
new file mode 100644
index 000000000..b171c7cb6
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/handler/posix_alloc_test.go
@@ -0,0 +1,100 @@
+//go:build integration
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package handler
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "sync"
+ "testing"
+
+ "github.com/google/uuid"
+
+ "github.com/apache/airavata-custos/pkg/models"
+ "github.com/apache/airavata-custos/pkg/posix"
+)
+
+func TestAllocateAndCreateClusterUser_CollisionRetry(t *testing.T) {
+ database := setupTestDB(t)
+ svc := newTestCoreService(database)
+ ctx := context.Background()
+
+ org, err := svc.CreateOrganization(ctx, &models.Organization{
+ OriginatedID: uuid.NewString(),
+ Name: "posix-alloc-test-org",
+ })
+ if err != nil {
+ t.Fatalf("create org: %v", err)
+ }
+
+ const n = 10
+ userIDs := make([]string, n)
+ for i := range userIDs {
+ u, err := svc.CreateUser(ctx, &models.User{
+ OrganizationID: org.ID,
+ FirstName: "Collision",
+ LastName: "Target",
+ Email: fmt.Sprintf("col-%[email protected]",
uuid.NewString()),
+ })
+ if err != nil {
+ t.Fatalf("create user %d: %v", i, err)
+ }
+ userIDs[i] = u.ID
+ }
+
+ type result struct {
+ username string
+ err error
+ }
+ results := make([]result, n)
+ var wg sync.WaitGroup
+ for i, uid := range userIDs {
+ wg.Add(1)
+ go func(idx int, userID string) {
+ defer wg.Done()
+ ccu, err := allocateAndCreateClusterUser(ctx, svc,
testClusterID, userID)
+ if err != nil {
+ results[idx] = result{err: err}
+ return
+ }
+ results[idx] = result{username: ccu.LocalUsername}
+ }(i, uid)
+ }
+ wg.Wait()
+
+ seen := make(map[string]bool)
+ for i, r := range results {
+ if r.err != nil {
+ t.Errorf("goroutine %d: %v", i, r.err)
+ continue
+ }
+ if seen[r.username] {
+ t.Errorf("duplicate username %q across goroutines",
r.username)
+ }
+ seen[r.username] = true
+ if !strings.HasPrefix(r.username, posix.Prefix()+"-") {
+ t.Errorf("username %q does not start with %q",
r.username, posix.Prefix()+"-")
+ }
+ }
+ if len(seen) != n {
+ t.Errorf("distinct usernames: got %d, want %d", len(seen), n)
+ }
+}
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
b/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
index a42a3ac02..9896916f5 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
@@ -160,8 +160,8 @@ func (h *RequestAccountCreateHandler) ensureUser(ctx
context.Context, body map[s
return user, nil
}
-// ensureComputeClusterUser returns the user's existing cluster mapping on the
-// configured cluster, or provisions a fresh one with a temp posix username.
+// ensureComputeClusterUser returns the user's existing cluster mapping or
+// provisions a new one via the POSIX allocator.
func (h *RequestAccountCreateHandler) ensureComputeClusterUser(ctx
context.Context, userID string) (*models.ComputeClusterUser, error) {
existing, err := h.svc.ListComputeClusterUsersByUser(ctx, userID)
if err != nil {
@@ -172,11 +172,7 @@ func (h *RequestAccountCreateHandler)
ensureComputeClusterUser(ctx context.Conte
return &a, nil
}
}
- return h.svc.CreateComputeClusterUser(ctx, &models.ComputeClusterUser{
- UserID: userID,
- ComputeClusterID: h.clusterID,
- LocalUsername: generateTempPosixUsername(),
- })
+ return allocateAndCreateClusterUser(ctx, h.svc, h.clusterID, userID)
}
// ensureMembership returns the existing (allocation, user) membership or
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
index 3d7dff92e..178f2be4e 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
@@ -78,6 +78,14 @@ func (h *RequestProjectCreateHandler) Handle(ctx
context.Context, tx *sql.Tx, pa
return fmt.Errorf("request_project_create: audit CREATE_PERSON:
%w", err)
}
+ piClusterUser, err := h.ensurePIClusterUser(ctx, pi.ID)
+ if err != nil {
+ return fmt.Errorf("request_project_create: ensure PI cluster
user: %w", err)
+ }
+ if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID,
model.AuditCreateAccount, "compute_cluster_user", piClusterUser.ID,
piClusterUser.LocalUsername); err != nil {
+ return fmt.Errorf("request_project_create: audit CREATE_ACCOUNT
(PI): %w", err)
+ }
+
project, err := h.ensureProject(ctx, projectOriginatedID, grantNumber,
pi.ID)
if err != nil {
return fmt.Errorf("request_project_create: ensure project: %w",
err)
@@ -98,7 +106,7 @@ func (h *RequestProjectCreateHandler) Handle(ctx
context.Context, tx *sql.Tx, pa
"ProjectID": project.ID,
"GrantNumber": grantNumber,
"PiPersonID": pi.ID,
- "PiRemoteSiteLogin": piGlobalID,
+ "PiRemoteSiteLogin": piClusterUser.LocalUsername,
"ResourceList": getResourceList(body),
}
reply := map[string]any{"type": "notify_project_create", "body":
replyBody}
@@ -142,6 +150,19 @@ func (h *RequestProjectCreateHandler) ensurePIUser(ctx
context.Context, body map
return user, nil
}
+func (h *RequestProjectCreateHandler) ensurePIClusterUser(ctx context.Context,
userID string) (*models.ComputeClusterUser, error) {
+ existing, err := h.svc.ListComputeClusterUsersByUser(ctx, userID)
+ if err != nil {
+ return nil, fmt.Errorf("list compute cluster users: %w", err)
+ }
+ for _, a := range existing {
+ if a.ComputeClusterID == h.clusterID {
+ return &a, nil
+ }
+ }
+ return allocateAndCreateClusterUser(ctx, h.svc, h.clusterID, userID)
+}
+
func (h *RequestProjectCreateHandler) ensureProject(ctx context.Context,
originatedID, grantNumber, piID string) (*models.Project, error) {
if p, err := h.svc.GetProjectByOriginatedID(ctx, originatedID); err ==
nil {
return p, nil
diff --git
a/connectors/ACCESS/AMIE-Processor/handler/request_project_create_integration_test.go
b/connectors/ACCESS/AMIE-Processor/handler/request_project_create_integration_test.go
index 92b5e345c..08110d500 100644
---
a/connectors/ACCESS/AMIE-Processor/handler/request_project_create_integration_test.go
+++
b/connectors/ACCESS/AMIE-Processor/handler/request_project_create_integration_test.go
@@ -187,6 +187,7 @@ func TestRequestProjectCreate_HappyPath(t *testing.T) {
for _, action := range []model.AuditAction{
model.AuditCreatePerson,
+ model.AuditCreateAccount,
model.AuditCreateProject,
model.AuditCreateAllocation,
model.AuditReplySent,
@@ -196,6 +197,19 @@ func TestRequestProjectCreate_HappyPath(t *testing.T) {
}
}
+ var piCU struct {
+ LocalUsername string `db:"local_username"`
+ }
+ if err := database.Get(&piCU,
+ "SELECT local_username FROM compute_cluster_users WHERE user_id
= ? AND compute_cluster_id = ?",
+ user.ID, testClusterID,
+ ); err != nil {
+ t.Fatalf("read PI compute_cluster_user: %v", err)
+ }
+ if piCU.LocalUsername == "" {
+ t.Errorf("PI compute_cluster_user.local_username: empty")
+ }
+
if got, want := amie.lastReplyType(), "notify_project_create"; got !=
want {
t.Fatalf("reply type: got %q, want %q", got, want)
}
@@ -211,6 +225,8 @@ func TestRequestProjectCreate_HappyPath(t *testing.T) {
}
if got, _ := reply["PiRemoteSiteLogin"].(string); got == "" {
t.Errorf("reply.PiRemoteSiteLogin: empty; required value")
+ } else if got != piCU.LocalUsername {
+ t.Errorf("reply.PiRemoteSiteLogin: got %q, want %q", got,
piCU.LocalUsername)
}
}
diff --git
a/connectors/ACCESS/AMIE-Processor/pipeline/baseline_integration_test.go
b/connectors/ACCESS/AMIE-Processor/pipeline/baseline_integration_test.go
index 0ad15af13..a72678644 100644
--- a/connectors/ACCESS/AMIE-Processor/pipeline/baseline_integration_test.go
+++ b/connectors/ACCESS/AMIE-Processor/pipeline/baseline_integration_test.go
@@ -56,8 +56,8 @@ func TestPipeline_BaselineDeterminism(t *testing.T) {
{"compute_allocations", 2}, //
tables.compute_allocations.total_count
{"compute_allocation_diffs", 1}, //
tables.compute_allocation_diffs.total_count
{"amie_user_dns", 2}, //
tables.amie_user_dns.total_count
- {"amie_audit_log", 23}, // audit_log.total_count
- {"compute_cluster_users", 0}, // not_expected
+ {"amie_audit_log", 26}, // audit_log.total_count
+ {"compute_cluster_users", 1}, //
tables.compute_cluster_users.total_count (survivor's PI CCU; merge dedups Sam's)
{"compute_allocation_memberships", 0}, // not_expected
}
if decoded != 9 {
@@ -75,6 +75,7 @@ func TestPipeline_BaselineDeterminism(t *testing.T) {
want int
}{
{"CREATE_PERSON", 3},
+ {"CREATE_ACCOUNT", 3},
{"CREATE_PROJECT", 3},
{"CREATE_ALLOCATION", 3},
{"REPLY_SENT", 8},
diff --git a/connectors/ACCESS/AMIE-Processor/testdata/scenarios/baseline.yaml
b/connectors/ACCESS/AMIE-Processor/testdata/scenarios/baseline.yaml
index 6768f4e66..878da6dd6 100644
--- a/connectors/ACCESS/AMIE-Processor/testdata/scenarios/baseline.yaml
+++ b/connectors/ACCESS/AMIE-Processor/testdata/scenarios/baseline.yaml
@@ -10,8 +10,10 @@
# NOT covered (handlers need the Custos-assigned project UUID, which the mock
# can't supply): request_account_create, request_account_inactivate,
# request_account_reactivate, request_project_inactivate,
request_project_reactivate.
-# Tables left empty as a result: compute_cluster_users,
compute_allocation_memberships.
+# Tables left empty as a result: compute_allocation_memberships.
# Status-flip code paths on projects/allocations/memberships are also
unexercised.
+# compute_cluster_users IS populated because request_project_create provisions
+# a PI cluster user via the POSIX allocator.
scenario:
name: baseline
@@ -121,10 +123,22 @@ expectations:
- { dn: "/C=US/O=Baseline Org/CN=Sam Second",
user_id: { from: "users.id where
[email protected]" } }
+ compute_cluster_users:
+ # request_project_create now provisions a CCU for the PI via the POSIX
+ # allocator. BL-001 (#1 + supplement) creates Pat's CCU once. BL-002
+ # (#3) creates Sam's CCU. The merge then deletes Sam's CCU on Pat's
+ # cluster (cluster overlap dedup), leaving Pat's row only.
+ total_count: 1
+ rows:
+ - { compute_cluster_id: "${AMIE_CLUSTER_ID}",
+ local_username: custos-pfirst,
+ user_id: { from: "users.id where
[email protected]" } }
+
audit_log:
- total_count: 23
+ total_count: 26
by_action:
CREATE_PERSON: 3 # all 3 request_project_create packets audit
CREATE_PERSON (re-use audited too)
+ CREATE_ACCOUNT: 3 # PI CCU provisioning audited on every
request_project_create (re-use audited too)
CREATE_PROJECT: 3
CREATE_ALLOCATION: 3 # supplement-delivery still audits
CREATE_ALLOCATION (known M1: diff is the actual write)
REPLY_SENT: 8 # inform_transaction_complete does not reply
@@ -145,7 +159,6 @@ expectations:
- PERMANENTLY_FAILED
not_expected:
- - compute_cluster_users
- compute_allocation_memberships
- amie_processing_errors
diff --git a/connectors/COmanage/Identity-Provisioner/README.md
b/connectors/COmanage/Identity-Provisioner/README.md
new file mode 100644
index 000000000..9df9cf4be
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/README.md
@@ -0,0 +1,140 @@
+# COmanage Identity-Provisioner
+
+Connector that bridges Custos to a COmanage Registry. When the core service
+emits `ComputeClusterUserCreateEvent` for a cluster this connector services,
+the orchestrator ensures the user has a fully provisioned POSIX identity in
+COmanage: a CoPerson, a per-user CoGroup, the matching Identifiers, and a
+`UnixClusterAccount` block on the target UnixCluster.
+
+## How it loads
+
+The loader is wired from `internal/connectors/loader.go` alongside SLURM and
+AMIE. On startup:
+
+1. `comanage.LoadConnector` reads seven required env vars:
+ `COMANAGE_REGISTRY_URL`, `COMANAGE_CO_ID`, `COMANAGE_API_USER`,
+ `COMANAGE_API_KEY`, `COMANAGE_PERSON_ID_TYPE`,
+ `COMANAGE_UNIX_CLUSTER_ID`, `CUSTOS_CLUSTER_ID`.
+2. If any are missing it logs `comanage provisioner: required env vars not set; skipping`
+ and returns `nil`. This is the documented way to disable the connector in a
+ deployment without code changes.
+
+`COMANAGE_PERSON_ID_TYPE` is the Identifier Type configured in the registry's
+Identifier Types screen that the connector uses to look up and tag a CoPerson.
+Set it to whatever string your COmanage instance uses for the per-person ID.
+3. If all seven are present it constructs an HTTP client and a
+ `ClusterUserSubscriber`, then registers it against the shared event bus.
+ The subscriber filters by `CustosClusterID` so a deployment that services
+ multiple clusters can run multiple connector instances side by side.
+
+See `config.example.yaml` for the full env var reference (required + optional).
+
+## What the orchestrator does
+
+For each event, the orchestrator runs the following calls in order. Steps that
+find an existing record are short-circuited; steps that create something
+tolerate "already exists" responses so re-runs after a partial failure are
+idempotent.
+
+| # | Step | What it does | Endpoint |
+|---|------|--------------|----------|
+| 1 | Lookup CoPerson | Look up `comanage_id` stored in `user_identities(source="comanage")`, fall back to REST email search, otherwise POST a new CoPerson. | Core `GET /people/<comanage_id>`, REST `co_people.json`, Core `POST /people` |
+| 2 | Store `comanage_id` | Persist the COmanage identifier in `user_identities` so step 1 finds it next time. | Core service `CreateUserIdentity` |
+| 3 | Read composite | Pull the full Core composite to read `uidnumber` and `CoPerson.meta.id`. | Core `GET /people/<comanage_id>` |
+| 4 | Per-user CoGroup | Find or create a CoGroup named after the user's local username (`GroupType:"CL"`, `Auto:false`). | REST `co_groups.json` |
+| 5 | groupname Identifier | Attach a `type:"uid"` Identifier (the local username string) to the CoGroup. | REST `identifiers.json` |
+| 6 | gidnumber Identifier | Attach a `type:"gidnumber"` Identifier (numeric, mirrors `uidnumber`) to the CoGroup. | REST `identifiers.json` |
+| 7 | CoGroupMember | Join the CoPerson to the CoGroup as member + owner. | REST `co_group_members.json` |
+| 8 | UnixClusterGroup | Attach the CoGroup to the configured UnixCluster. A 4xx here is treated as "already attached". | REST `unix_cluster_groups.json` |
+| 9 | UnixClusterAccount | Re-GET a fresh composite, merge a `UnixClusterAccount` block, then full-composite PUT. The merge round-trips unmodeled fields as `json.RawMessage` so `deleteOmitted` cannot drop attributes the connector does not understand. | Core `GET /people/<comanage_id>` then `PUT /people/<comanage_id>` |
+
+Step 9 uses `sync_mode:"M"` (Manual) so a downstream provisioning plugin
+cannot overwrite the block when Custos is the source of truth.
+
+## Audit events
+
+The connector emits the following `audit_events.event_type` values via the
+core service. `audit_events.entity_id` is always the
`compute_cluster_users.id`.
+
+| Event type | When |
+|------------|------|
+| `ComanageCoPersonCreated` | A new CoPerson was POSTed in step 1. Lookups that resolved to an existing CoPerson do not fire this. |
+| `ComanageClusterAccountAttached` | The sequence completed and the user has a UnixClusterAccount block on the configured UnixCluster. |
+| `ComanageProvisioningFailed` | Any step returned an error. The `details` field carries `step=<name> err=<message>` so the dead-letter is tractable from the audit log alone. |
+
+The AMIE-Processor side of this flow also emits:
+
+| Event type | When |
+|------------|------|
+| `PosixUsernameTruncated` | The base username derived from the user's name was longer than the POSIX cap and got truncated. |
+| `PosixUsernameUnbuildable` | First and last name both normalized to empty so no candidate could be built. |
+| `PosixUsernameAllocatorExhausted` | All collision suffixes were exhausted. The cluster-user row is not created. |
+
+## End-to-end audit walkthrough
+
+A successful provisioning of a new user produces this sequence for a single
+`compute_cluster_users.id`:
+
+1. AMIE side: `PosixUsernameTruncated` may fire if the user's given and family
+ name exceeded the POSIX cap.
+2. AMIE side: the cluster-user row is committed and the event bus delivers
+ `ComputeClusterUserCreateEvent`.
+3. COmanage side: `ComanageCoPersonCreated` fires if step 1 went through the
+ POST path. Idempotent re-runs against an existing CoPerson skip this event.
+4. COmanage side: `ComanageClusterAccountAttached` fires after step 9, with
+ `details` of the form `comanage_id=<id> username=<local> uid=<n>`.
+
+A failure produces `ComanageProvisioningFailed` with
+`details=step=<name> err=<message>` in place of the success event.
+
+## Username allocation
+
+Local POSIX usernames are allocated by `pkg/posix`:
+
+1. `BuildBase` derives a candidate from `Prefix() + "-" + first-initial + last-name` (e.g. `custos-pfirst`),
+ normalised to lowercase ASCII, capped at the POSIX length limit. Truncation
+ fires a `PosixUsernameTruncated` audit event. If both names normalize to
+ empty, the allocator emits `PosixUsernameUnbuildable` and returns an error.
+2. The AMIE handler attempts `CreateComputeClusterUser` with the base. If the
+ composite UNIQUE on `(compute_cluster_id, local_username)` rejects it, the
+ handler retries with `base + "2"`, `base + "3"`, and so on up to
+ `MaxCollisionSuffix`.
+3. If suffixes are exhausted a `PosixUsernameAllocatorExhausted` event is
+ emitted and the handler returns an error.
+
+## Local development
+
+The connector reads config from env vars. To run it against a test registry:
+
+```bash
+export COMANAGE_REGISTRY_URL=https://<your-registry>/registry
+export COMANAGE_CO_ID=<numeric CO id>
+export COMANAGE_API_USER=<api user for that CO>
+export COMANAGE_API_KEY=<api key for that user>
+export COMANAGE_PERSON_ID_TYPE=<CoPerson Identifier Type, per your registry>
+export COMANAGE_UNIX_CLUSTER_ID=<numeric UnixCluster id>
+export CUSTOS_CLUSTER_ID=<UUID from the compute_clusters table>
+go run ./cmd/server
+```
+
+Without those vars set the connector logs a notice and skips registration, so the rest
+of Custos runs unaffected.
+
+## Test surface
+
+```bash
+# unit
+go test ./connectors/COmanage/Identity-Provisioner/...
+
+# integration (requires a real registry; skipped by default)
+go test -tags integration ./connectors/COmanage/Identity-Provisioner/...
+```
+
+The unit tests cover:
+
+- HTTP client transport: 5xx retry with exponential backoff, error
+ classification (`ErrNotFound`, `ErrAuth401`, `HTTPError`).
+- Core API and REST wrappers via `httptest.NewServer`.
+- The `compose.go` merge layer: a fixture composite is merged with a
+ `UnixClusterAccount` block, then asserted to preserve every key from the
+ original.
diff --git a/connectors/COmanage/Identity-Provisioner/config.example.yaml
b/connectors/COmanage/Identity-Provisioner/config.example.yaml
new file mode 100644
index 000000000..9ed3b990b
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/config.example.yaml
@@ -0,0 +1,20 @@
+# COmanage Identity-Provisioner connector
+#
+# This connector is configured via environment variables. Below is the full
+# list with example values; see README.md for semantics. If any required
+# var is missing, the loader logs a notice and skips subscriber registration.
+
+# REQUIRED ---------------------------------------------------------------
+# COMANAGE_REGISTRY_URL=https://<your-registry>/registry
+# COMANAGE_CO_ID=<numeric CO id>
+# COMANAGE_API_USER=<api user for that CO>
+# COMANAGE_API_KEY=<api key for that user>
+# COMANAGE_PERSON_ID_TYPE=<CoPerson Identifier Type configured in the registry>
+# COMANAGE_UNIX_CLUSTER_ID=<numeric UnixCluster id>
+# CUSTOS_CLUSTER_ID=<UUID of the compute_cluster row this connector services>
+
+# OPTIONAL ---------------------------------------------------------------
+# COMANAGE_DEFAULT_SHELL=/bin/bash
+# COMANAGE_HOMEDIR_PREFIX=/home/
+# COMANAGE_HTTP_TIMEOUT=30s
+# POSIX_USERNAME_PREFIX=custos # read by pkg/posix; default "custos"
diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/client.go
b/connectors/COmanage/Identity-Provisioner/internal/client/client.go
new file mode 100644
index 000000000..bf95a998c
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/client.go
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package client wraps the COmanage Core API and REST surfaces. Auth is HTTP
+// Basic.
+package client
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "time"
+)
+
+// restAPIVersion is the COmanage REST v1 model version.
+const restAPIVersion = "1.0"
+
+type Config struct {
+ RegistryURL string
+ COID int
+ APIUser string
+ APIKey string
+ // PersonIDType is the COmanage Identifier Type used to look up and tag
a
+ // CoPerson (e.g. the type name configured in the registry's Identifier
+ // Types). Required value.
+ PersonIDType string
+ UnixClusterID int
+ CustosClusterID string
+ DefaultShell string
+ HomedirPrefix string
+ HTTPTimeout time.Duration
+}
+
+type Client struct {
+ cfg Config
+ http *http.Client
+}
+
+func New(cfg Config) *Client {
+ timeout := cfg.HTTPTimeout
+ if timeout == 0 {
+ timeout = 30 * time.Second
+ }
+ return &Client{
+ cfg: cfg,
+ http: &http.Client{Timeout: timeout},
+ }
+}
+
+func (c *Client) Config() Config { return c.cfg }
+
+// coreAPI and restAPI are the two URL families. Centralized so an upstream
+// version bump flips one line.
+func (c *Client) coreAPI(path string) string {
+ return fmt.Sprintf("%s/api/co/%d/core/v1%s", c.cfg.RegistryURL,
c.cfg.COID, path)
+}
+
+func (c *Client) restAPI(path string) string {
+ return c.cfg.RegistryURL + path
+}
+
+// Do issues an authenticated request and retries 5xx responses with backoff.
+func (c *Client) Do(method, url string, body []byte) (*http.Response, []byte,
error) {
+ resp, respBody, err := c.doOnce(method, url, body, c.cfg.APIKey)
+ if err == nil && resp.StatusCode >= 500 && resp.StatusCode < 600 {
+ // 1s, 2s, 4s
+ for attempt := 1; attempt <= 3; attempt++ {
+ _ = resp.Body.Close()
+ time.Sleep(time.Duration(1<<uint(attempt-1)) *
time.Second)
+ resp, respBody, err = c.doOnce(method, url, body,
c.cfg.APIKey)
+ if err != nil || resp.StatusCode < 500 {
+ break
+ }
+ }
+ }
+ return resp, respBody, err
+}
+
+func (c *Client) doOnce(method, url string, body []byte, apiKey string)
(*http.Response, []byte, error) {
+ var rdr io.Reader
+ if body != nil {
+ rdr = bytes.NewReader(body)
+ }
+ req, err := http.NewRequest(method, url, rdr)
+ if err != nil {
+ return nil, nil, fmt.Errorf("build request: %w", err)
+ }
+ if body != nil {
+ req.Header.Set("Content-Type", "application/json")
+ }
+ req.SetBasicAuth(c.cfg.APIUser, apiKey)
+
+ resp, err := c.http.Do(req)
+ if err != nil {
+ return nil, nil, fmt.Errorf("http: %w", err)
+ }
+ respBody, readErr := io.ReadAll(resp.Body)
+ _ = resp.Body.Close()
+ if readErr != nil {
+ return resp, nil, fmt.Errorf("read response: %w", readErr)
+ }
+ return resp, respBody, nil
+}
+
+var (
+ ErrAuth401 = errors.New("comanage: 401 unauthorized")
+ ErrNotFound = errors.New("comanage: not found")
+)
+
+type HTTPError struct {
+ Method string
+ URL string
+ StatusCode int
+ Body string
+}
+
+func (e *HTTPError) Error() string {
+ return fmt.Sprintf("comanage %s %s: %d: %s", e.Method, e.URL,
e.StatusCode, truncate(e.Body, 200))
+}
+
+func truncate(s string, n int) string {
+ s = strings.TrimSpace(s)
+ if len(s) <= n {
+ return s
+ }
+ return s[:n] + "..."
+}
diff --git
a/connectors/COmanage/Identity-Provisioner/internal/client/client_test.go
b/connectors/COmanage/Identity-Provisioner/internal/client/client_test.go
new file mode 100644
index 000000000..471aa606b
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/client_test.go
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+ "encoding/json"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+// newTestClient returns a Client pointed at baseURL with fixed test
+// credentials (CO 2, user "co_2.testuser", key "key-primary"), Unix cluster 1
+// and a 5 second HTTP timeout.
+func newTestClient(t *testing.T, baseURL string) *Client {
+	t.Helper()
+
+	cfg := Config{
+		RegistryURL:   baseURL,
+		COID:          2,
+		APIUser:       "co_2.testuser",
+		APIKey:        "key-primary",
+		UnixClusterID: 1,
+		HTTPTimeout:   5 * time.Second,
+	}
+	return New(cfg)
+}
+
+// TestCreatePerson_SendsBasicAuthAndDecodes runs CreatePerson against a stub
+// server that checks the request is a POST to .../api/co/2/core/v1/people
+// carrying the test client's Basic auth credentials, answers 201 with a
+// one-element identifier array, and then verifies the decoded result.
+func TestCreatePerson_SendsBasicAuthAndDecodes(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter,
r *http.Request) {
+ if r.Method != http.MethodPost ||
!strings.HasSuffix(r.URL.Path, "/api/co/2/core/v1/people") {
+ t.Errorf("unexpected request: %s %s", r.Method,
r.URL.Path)
+ }
+ user, pass, ok := r.BasicAuth()
+ if !ok || user != "co_2.testuser" || pass != "key-primary" {
+ t.Errorf("auth header: ok=%v user=%q pass=%q", ok,
user, pass)
+ }
+ w.WriteHeader(http.StatusCreated)
+ _, _ = io.WriteString(w,
`[{"identifier":"Person100099","type":"comanage_id","login":false,"status":"A"}]`)
+ }))
+ defer srv.Close()
+
+ c := newTestClient(t, srv.URL+"/registry")
+ c.cfg.RegistryURL = srv.URL // override to match server root
+ out, err := c.CreatePerson([]byte(`{}`))
+ if err != nil {
+ t.Fatalf("CreatePerson: %v", err)
+ }
+ if len(out) != 1 || out[0].Identifier != "Person100099" || out[0].Type
!= "comanage_id" {
+ t.Errorf("got %+v", out)
+ }
+}
+
+// TestDo_RetriesOn5xx verifies that Do keeps retrying while the server
+// answers 5xx: the stub returns 500 for the first two calls and 200 "ok" on
+// the third, and the test asserts the final status, body and call count.
+// Because Do sleeps 1s and then 2s between these attempts, the test takes
+// roughly three seconds of wall-clock time.
+func TestDo_RetriesOn5xx(t *testing.T) {
+ var calls atomic.Int32
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter,
r *http.Request) {
+ n := calls.Add(1)
+ if n < 3 {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ _, _ = io.WriteString(w, "ok")
+ }))
+ defer srv.Close()
+
+ c := New(Config{
+ RegistryURL: srv.URL,
+ COID: 2,
+ APIUser: "u",
+ APIKey: "k",
+ HTTPTimeout: 2 * time.Second,
+ })
+ resp, body, err := c.Do("GET", srv.URL+"/anything", nil)
+ if err != nil {
+ t.Fatalf("Do: %v", err)
+ }
+ if resp.StatusCode != http.StatusOK {
+ t.Errorf("status: got %d", resp.StatusCode)
+ }
+ if got := string(body); got != "ok" {
+ t.Errorf("body: %q", got)
+ }
+ if calls.Load() != 3 {
+ t.Errorf("calls: got %d, want 3", calls.Load())
+ }
+}
+
+// TestCreateCoGroup_EnvelopeShape checks that CreateCoGroup posts a CoGroups
+// envelope with the expected group fields to /co_groups.json and parses the
+// numeric id out of the NewObject response.
+func TestCreateCoGroup_EnvelopeShape(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if !strings.HasSuffix(r.URL.Path, "/co_groups.json") {
+			t.Errorf("unexpected path: %s", r.URL.Path)
+		}
+		body, _ := io.ReadAll(r.Body)
+		var req CoGroupCreateRequest
+		if err := json.Unmarshal(body, &req); err != nil {
+			// The handler runs on the server's goroutine, where t.Fatalf
+			// (FailNow) must not be called. Record the failure and answer
+			// with an error status so the client call fails visibly.
+			t.Errorf("decode request: %v", err)
+			w.WriteHeader(http.StatusBadRequest)
+			return
+		}
+		if req.RequestType != "CoGroups" || req.Version != "1.0" || len(req.CoGroups) != 1 {
+			t.Errorf("envelope: %+v", req)
+			// Stop here: indexing req.CoGroups[0] below would panic on an
+			// empty slice.
+			w.WriteHeader(http.StatusBadRequest)
+			return
+		}
+		g := req.CoGroups[0]
+		if g.CoId != 2 || g.Name != "custos-test" || g.GroupType != "CL" || g.Auto {
+			t.Errorf("group fields: %+v", g)
+		}
+		w.WriteHeader(http.StatusCreated)
+		_, _ = io.WriteString(w,
+			`{"ResponseType":"NewObject","Version":"1.0","ObjectType":"CoGroup","Id":"42"}`)
+	}))
+	defer srv.Close()
+
+	c := newTestClient(t, srv.URL)
+	c.cfg.RegistryURL = srv.URL
+	id, err := c.CreateCoGroup("custos-test", "Primary group for custos-test")
+	if err != nil {
+		t.Fatalf("CreateCoGroup: %v", err)
+	}
+	if id != 42 {
+		t.Errorf("id: got %d, want 42", id)
+	}
+}
+
+// TestCreateIdentifierOnGroup_UsesNestedPersonShape checks that
+// CreateIdentifierOnGroup attaches the identifier to the group through the
+// nested Person block ({Type: "Group", Id: <group id>}) and parses the id
+// from the NewObject response.
+func TestCreateIdentifierOnGroup_UsesNestedPersonShape(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		body, _ := io.ReadAll(r.Body)
+		var req IdentifierCreateRequest
+		if err := json.Unmarshal(body, &req); err != nil {
+			// The handler runs on the server's goroutine, where t.Fatalf
+			// (FailNow) must not be called. Record the failure and answer
+			// with an error status so the client call fails visibly.
+			t.Errorf("decode: %v", err)
+			w.WriteHeader(http.StatusBadRequest)
+			return
+		}
+		// Guard the index below so a malformed request fails the test
+		// instead of panicking inside the handler.
+		if len(req.Identifiers) != 1 {
+			t.Errorf("identifiers: got %d entries, want 1", len(req.Identifiers))
+			w.WriteHeader(http.StatusBadRequest)
+			return
+		}
+		ident := req.Identifiers[0]
+		if ident.Person.Type != "Group" || ident.Person.Id != 25 {
+			t.Errorf("Person field: %+v (need {Type:Group,Id:25})", ident.Person)
+		}
+		w.WriteHeader(http.StatusCreated)
+		_, _ = io.WriteString(w,
+			`{"ResponseType":"NewObject","Version":"1.0","ObjectType":"Identifier","Id":"7"}`)
+	}))
+	defer srv.Close()
+
+	c := newTestClient(t, srv.URL)
+	c.cfg.RegistryURL = srv.URL
+	id, err := c.CreateIdentifierOnGroup("custos-alice", "uid", 25)
+	if err != nil {
+		t.Fatalf("CreateIdentifierOnGroup: %v", err)
+	}
+	if id != 7 {
+		t.Errorf("id: %d", id)
+	}
+}
+
+// TestCreateUnixClusterGroup_URLAndBody checks that CreateUnixClusterGroup
+// posts to /unix_cluster/unix_cluster_groups.json with the configured
+// cluster id and the given group id, and parses the id from the NewObject
+// response.
+func TestCreateUnixClusterGroup_URLAndBody(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if !strings.HasSuffix(r.URL.Path, "/unix_cluster/unix_cluster_groups.json") {
+			t.Errorf("unexpected path: %s", r.URL.Path)
+		}
+		body, _ := io.ReadAll(r.Body)
+		var req UnixClusterGroupCreateRequest
+		if err := json.Unmarshal(body, &req); err != nil {
+			// The handler runs on the server's goroutine, where t.Fatalf
+			// (FailNow) must not be called. Record the failure and answer
+			// with an error status so the client call fails visibly.
+			t.Errorf("decode: %v", err)
+			w.WriteHeader(http.StatusBadRequest)
+			return
+		}
+		// Guard the index below so a malformed request fails the test
+		// instead of panicking inside the handler.
+		if len(req.UnixClusterGroups) != 1 {
+			t.Errorf("unix cluster groups: got %d entries, want 1", len(req.UnixClusterGroups))
+			w.WriteHeader(http.StatusBadRequest)
+			return
+		}
+		one := req.UnixClusterGroups[0]
+		if one.UnixClusterId != 1 || one.CoGroupId != 25 {
+			t.Errorf("fields: %+v", one)
+		}
+		w.WriteHeader(http.StatusCreated)
+		_, _ = io.WriteString(w,
+			`{"ResponseType":"NewObject","Version":"1.0","ObjectType":"UnixClusterGroup","Id":"3"}`)
+	}))
+	defer srv.Close()
+
+	c := newTestClient(t, srv.URL)
+	c.cfg.RegistryURL = srv.URL
+	id, err := c.CreateUnixClusterGroup(25)
+	if err != nil {
+		t.Fatalf("CreateUnixClusterGroup: %v", err)
+	}
+	if id != 3 {
+		t.Errorf("id: %d", id)
+	}
+}
+
+// TestGetPersonComposite_PreservesRawJSON checks that GetPersonComposite
+// issues a GET for .../api/co/2/core/v1/people/Person100099 and hands back
+// the server's JSON payload; the assertion looks for a substring of the
+// stubbed response in the returned raw message.
+func TestGetPersonComposite_PreservesRawJSON(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter,
r *http.Request) {
+ if r.Method != http.MethodGet || !strings.Contains(r.URL.Path,
"/api/co/2/core/v1/people/Person100099") {
+ t.Errorf("unexpected request: %s %s", r.Method,
r.URL.Path)
+ }
+ w.Header().Set("Content-Type", "application/json")
+ _, _ = io.WriteString(w,
`{"CoPerson":{"meta":{"id":99}},"Name":[],"Identifier":[]}`)
+ }))
+ defer srv.Close()
+
+ c := newTestClient(t, srv.URL)
+ c.cfg.RegistryURL = srv.URL
+ raw, err := c.GetPersonComposite("Person100099")
+ if err != nil {
+ t.Fatalf("GetPersonComposite: %v", err)
+ }
+ if !strings.Contains(string(raw), `"id":99`) {
+ t.Errorf("raw missing expected substring: %s", string(raw))
+ }
+}
+
+// TestFindCoGroupByName_ExactMatchOnly serves a fixed two-group listing and
+// checks that FindCoGroupByName returns the id of the group whose name
+// matches, and 0 with a nil error when no group has the requested name.
+func TestFindCoGroupByName_ExactMatchOnly(t *testing.T) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter,
r *http.Request) {
+ _, _ = io.WriteString(w, `{
+ "ResponseType":"CoGroups","Version":"1.0",
+ "CoGroups":[
+ {"Version":"1.0","Id":11,"CoId":2,"Name":"custos-alice"},
+ {"Version":"1.0","Id":12,"CoId":2,"Name":"custos-bob"}
+ ]
+ }`)
+ }))
+ defer srv.Close()
+
+ c := newTestClient(t, srv.URL)
+ c.cfg.RegistryURL = srv.URL
+
+ id, err := c.FindCoGroupByName("custos-bob")
+ if err != nil {
+ t.Fatalf("FindCoGroupByName: %v", err)
+ }
+ if id != 12 {
+ t.Errorf("id: got %d, want 12", id)
+ }
+ id, err = c.FindCoGroupByName("custos-nope")
+ if err != nil {
+ t.Fatalf("FindCoGroupByName miss: %v", err)
+ }
+ if id != 0 {
+ t.Errorf("miss should return 0, got %d", id)
+ }
+}
diff --git
a/connectors/COmanage/Identity-Provisioner/internal/client/group_members.go
b/connectors/COmanage/Identity-Provisioner/internal/client/group_members.go
new file mode 100644
index 000000000..922cc8984
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/group_members.go
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+)
+
+// CoGroupMemberCreateRequest is the body envelope that CreateCoGroupMember
+// marshals and POSTs to /co_group_members.json.
+type CoGroupMemberCreateRequest struct {
+ RequestType string `json:"RequestType"`
+ Version string `json:"Version"`
+ CoGroupMembers []CoGroupMemberCreateOne `json:"CoGroupMembers"`
+}
+
+// CoGroupMemberCreateOne is a single membership entry inside
+// CoGroupMemberCreateRequest: the person, the group, and the member/owner
+// flags for that pairing.
+type CoGroupMemberCreateOne struct {
+ Version string `json:"Version"`
+ Person IdentifierParent `json:"Person"`
+ CoGroupId int `json:"CoGroupId"`
+ Member bool `json:"Member"`
+ Owner bool `json:"Owner"`
+}
+
+// CreateCoGroupMember POSTs a membership that makes the given CoPerson both
+// a member and an owner of the given CoGroup, and returns the new
+// membership id parsed from the NewObject response.
+//
+// It returns ErrAuth401 on a 401 response and an *HTTPError for any status
+// other than 200, 201 or 401.
+func (c *Client) CreateCoGroupMember(coPersonId, coGroupId int) (int, error) {
+ body, err := json.Marshal(CoGroupMemberCreateRequest{
+ RequestType: "CoGroupMembers",
+ Version: restAPIVersion,
+ CoGroupMembers: []CoGroupMemberCreateOne{{
+ Version: restAPIVersion,
+ Person: IdentifierParent{Type: "CO", Id: coPersonId},
+ CoGroupId: coGroupId,
+ Member: true,
+ Owner: true,
+ }},
+ })
+ if err != nil {
+ return 0, fmt.Errorf("marshal co_group_member: %w", err)
+ }
+
+ u := c.restAPI("/co_group_members.json")
+ resp, respBody, err := c.Do(http.MethodPost, u, body)
+ if err != nil {
+ return 0, err
+ }
+ switch resp.StatusCode {
+ case http.StatusCreated, http.StatusOK:
+ var out CoGroupCreateResponse
+ if err := json.Unmarshal(respBody, &out); err != nil {
+ return 0, fmt.Errorf("decode co_group_member response:
%w: %s", err, string(respBody))
+ }
+ // The response carries the id as a string; convert it to an int.
+ var id int
+ if _, err := fmt.Sscanf(out.Id, "%d", &id); err != nil {
+ return 0, fmt.Errorf("parse co_group_member id %q: %w",
out.Id, err)
+ }
+ return id, nil
+ case http.StatusUnauthorized:
+ return 0, ErrAuth401
+ default:
+ return 0, &HTTPError{Method: "POST", URL: u, StatusCode:
resp.StatusCode, Body: string(respBody)}
+ }
+}
+
+// CoGroupMemberListResponse is the envelope FindCoGroupMember decodes from a
+// GET on /co_group_members.json.
+type CoGroupMemberListResponse struct {
+ ResponseType string `json:"ResponseType"`
+ Version string `json:"Version"`
+ CoGroupMembers []CoGroupMemberListOne `json:"CoGroupMembers"`
+}
+
+// CoGroupMemberListOne is a single membership row in
+// CoGroupMemberListResponse. Unlike the NewObject create response, Id is
+// decoded as a JSON number here.
+type CoGroupMemberListOne struct {
+ Version string `json:"Version"`
+ Id int `json:"Id"`
+ Person IdentifierParent `json:"Person"`
+ CoGroupId int `json:"CoGroupId"`
+ Member bool `json:"Member"`
+ Owner bool `json:"Owner"`
+}
+
+// FindCoGroupMember returns the membership id for a (group, person) pair, or
+// 0 if none. Both ids are sent as query parameters and the returned rows are
+// additionally filtered client-side, so a row is only accepted when its
+// group and person ids both match. A 204 response is treated as "no
+// membership". It returns ErrAuth401 on 401 and an *HTTPError for any other
+// unexpected status.
+func (c *Client) FindCoGroupMember(coGroupId, coPersonId int) (int, error) {
+ u :=
c.restAPI(fmt.Sprintf("/co_group_members.json?cogroupid=%d&copersonid=%d",
coGroupId, coPersonId))
+ resp, respBody, err := c.Do(http.MethodGet, u, nil)
+ if err != nil {
+ return 0, err
+ }
+ switch resp.StatusCode {
+ case http.StatusOK:
+ var out CoGroupMemberListResponse
+ if err := json.Unmarshal(respBody, &out); err != nil {
+ return 0, fmt.Errorf("decode co_group_members list:
%w", err)
+ }
+ for _, m := range out.CoGroupMembers {
+ if m.CoGroupId == coGroupId && m.Person.Id ==
coPersonId {
+ return m.Id, nil
+ }
+ }
+ return 0, nil
+ case http.StatusNoContent:
+ return 0, nil
+ case http.StatusUnauthorized:
+ return 0, ErrAuth401
+ default:
+ return 0, &HTTPError{Method: "GET", URL: u, StatusCode:
resp.StatusCode, Body: string(respBody)}
+ }
+}
diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/groups.go
b/connectors/COmanage/Identity-Provisioner/internal/client/groups.go
new file mode 100644
index 000000000..d52e52ad7
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/groups.go
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+)
+
+// CoGroupCreateRequest is the body envelope for POST /co_groups.json.
+// GroupType "CL" = GroupEnum::Clusters (per-user cluster primary group).
+type CoGroupCreateRequest struct {
+ RequestType string `json:"RequestType"`
+ Version string `json:"Version"`
+ CoGroups []CoGroupCreateOne `json:"CoGroups"`
+}
+
+// CoGroupCreateOne is a single group definition inside CoGroupCreateRequest.
+// CreateCoGroup fills it with the configured CO id, Open=false,
+// Status="Active", GroupType="CL" and Auto=false.
+type CoGroupCreateOne struct {
+ Version string `json:"Version"`
+ CoId int `json:"CoId"`
+ Name string `json:"Name"`
+ Description string `json:"Description"`
+ Open bool `json:"Open"`
+ Status string `json:"Status"`
+ GroupType string `json:"GroupType"`
+ Auto bool `json:"Auto"`
+}
+
+// CoGroupCreateResponse is the standard NewObject shape returned by COmanage
+// REST POSTs. Id is decoded as a string; the create methods in this package
+// convert it to an int before returning it.
+type CoGroupCreateResponse struct {
+ ResponseType string `json:"ResponseType"`
+ Version string `json:"Version"`
+ ObjectType string `json:"ObjectType"`
+ Id string `json:"Id"`
+}
+
+// CreateCoGroup POSTs a new closed, active, non-automatic group of type "CL"
+// with the given name and description in the configured CO, and returns the
+// new group id parsed from the NewObject response.
+//
+// It returns ErrAuth401 on a 401 response and an *HTTPError for any status
+// other than 200, 201 or 401.
+func (c *Client) CreateCoGroup(name, description string) (int, error) {
+ body, err := json.Marshal(CoGroupCreateRequest{
+ RequestType: "CoGroups",
+ Version: restAPIVersion,
+ CoGroups: []CoGroupCreateOne{{
+ Version: restAPIVersion,
+ CoId: c.cfg.COID,
+ Name: name,
+ Description: description,
+ Open: false,
+ Status: "Active",
+ GroupType: "CL",
+ Auto: false,
+ }},
+ })
+ if err != nil {
+ return 0, fmt.Errorf("marshal co_group: %w", err)
+ }
+
+ u := c.restAPI("/co_groups.json")
+ resp, respBody, err := c.Do(http.MethodPost, u, body)
+ if err != nil {
+ return 0, err
+ }
+ switch resp.StatusCode {
+ case http.StatusCreated, http.StatusOK:
+ var out CoGroupCreateResponse
+ if err := json.Unmarshal(respBody, &out); err != nil {
+ return 0, fmt.Errorf("decode co_group create response:
%w: %s", err, string(respBody))
+ }
+ // The response carries the id as a string; convert it to an int.
+ var id int
+ if _, err := fmt.Sscanf(out.Id, "%d", &id); err != nil {
+ return 0, fmt.Errorf("parse co_group id %q: %w",
out.Id, err)
+ }
+ return id, nil
+ case http.StatusUnauthorized:
+ return 0, ErrAuth401
+ default:
+ return 0, &HTTPError{Method: "POST", URL: u, StatusCode:
resp.StatusCode, Body: string(respBody)}
+ }
+}
+
+// CoGroupListResponse is the envelope FindCoGroupByName decodes from a GET
+// on /co_groups.json.
+type CoGroupListResponse struct {
+ ResponseType string `json:"ResponseType"`
+ Version string `json:"Version"`
+ CoGroups []CoGroupReadOne `json:"CoGroups"`
+}
+
+// CoGroupReadOne holds the fields of a listed group that this client reads.
+type CoGroupReadOne struct {
+ Version string `json:"Version"`
+ Id int `json:"Id"`
+ CoId int `json:"CoId"`
+ Name string `json:"Name"`
+}
+
+// FindCoGroupByName lists the groups of the configured CO and returns the id
+// of the first one whose name equals name exactly. It returns 0 with a nil
+// error when no such group exists or the server answers 204, ErrAuth401 on
+// 401, and an *HTTPError for any other unexpected status.
+func (c *Client) FindCoGroupByName(name string) (int, error) {
+	endpoint := c.restAPI(fmt.Sprintf("/co_groups.json?coid=%d", c.cfg.COID))
+	resp, payload, err := c.Do(http.MethodGet, endpoint, nil)
+	if err != nil {
+		return 0, err
+	}
+
+	status := resp.StatusCode
+	if status == http.StatusNoContent {
+		return 0, nil
+	}
+	if status == http.StatusUnauthorized {
+		return 0, ErrAuth401
+	}
+	if status != http.StatusOK {
+		return 0, &HTTPError{Method: "GET", URL: endpoint, StatusCode: status, Body: string(payload)}
+	}
+
+	var listing CoGroupListResponse
+	if err := json.Unmarshal(payload, &listing); err != nil {
+		return 0, fmt.Errorf("decode co_groups list: %w", err)
+	}
+	for i := range listing.CoGroups {
+		if listing.CoGroups[i].Name == name {
+			return listing.CoGroups[i].Id, nil
+		}
+	}
+	return 0, nil
+}
+
+// DeleteCoGroup issues DELETE /co_groups/<id>.json. A 200 or 204 response is
+// success; 401 maps to ErrAuth401, 404 to ErrNotFound, and any other status
+// to an *HTTPError.
+func (c *Client) DeleteCoGroup(id int) error {
+	endpoint := c.restAPI(fmt.Sprintf("/co_groups/%d.json", id))
+	resp, payload, err := c.Do(http.MethodDelete, endpoint, nil)
+	if err != nil {
+		return err
+	}
+
+	status := resp.StatusCode
+	if status == http.StatusOK || status == http.StatusNoContent {
+		return nil
+	}
+	if status == http.StatusUnauthorized {
+		return ErrAuth401
+	}
+	if status == http.StatusNotFound {
+		return ErrNotFound
+	}
+	return &HTTPError{Method: "DELETE", URL: endpoint, StatusCode: status, Body: string(payload)}
+}
diff --git
a/connectors/COmanage/Identity-Provisioner/internal/client/identifiers.go
b/connectors/COmanage/Identity-Provisioner/internal/client/identifiers.go
new file mode 100644
index 000000000..f562c2915
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/identifiers.go
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+)
+
+// IdentifierParent is the nested {"Type":"Group"|"CO","Id":<n>} block that
+// attaches an Identifier to a CoGroup or CoPerson. The same shape is reused
+// as the Person block of group membership requests and listings.
+type IdentifierParent struct {
+ Type string `json:"Type"`
+ Id int `json:"Id"`
+}
+
+// IdentifierCreateRequest is the body envelope that CreateIdentifierOnGroup
+// marshals and POSTs to /identifiers.json.
+type IdentifierCreateRequest struct {
+ RequestType string `json:"RequestType"`
+ Version string `json:"Version"`
+ Identifiers []IdentifierCreateOne `json:"Identifiers"`
+}
+
+// IdentifierCreateOne is a single identifier inside IdentifierCreateRequest.
+// Person names the object the identifier is attached to.
+type IdentifierCreateOne struct {
+ Version string `json:"Version"`
+ Identifier string `json:"Identifier"`
+ Type string `json:"Type"`
+ Login bool `json:"Login"`
+ Status string `json:"Status"`
+ Person IdentifierParent `json:"Person"`
+}
+
+// CreateIdentifierOnGroup POSTs an Identifier attached to a CoGroup. The
+// identifier is created as active and non-login, with the given value and
+// type, and the new identifier id parsed from the NewObject response is
+// returned.
+//
+// It returns ErrAuth401 on a 401 response and an *HTTPError for any status
+// other than 200, 201 or 401.
+func (c *Client) CreateIdentifierOnGroup(value, identifierType string,
coGroupId int) (int, error) {
+ body, err := json.Marshal(IdentifierCreateRequest{
+ RequestType: "Identifiers",
+ Version: restAPIVersion,
+ Identifiers: []IdentifierCreateOne{{
+ Version: restAPIVersion,
+ Identifier: value,
+ Type: identifierType,
+ Login: false,
+ Status: "Active",
+ Person: IdentifierParent{Type: "Group", Id:
coGroupId},
+ }},
+ })
+ if err != nil {
+ return 0, fmt.Errorf("marshal identifier: %w", err)
+ }
+
+ u := c.restAPI("/identifiers.json")
+ resp, respBody, err := c.Do(http.MethodPost, u, body)
+ if err != nil {
+ return 0, err
+ }
+ switch resp.StatusCode {
+ case http.StatusCreated, http.StatusOK:
+ var out CoGroupCreateResponse // same NewObject shape
+ if err := json.Unmarshal(respBody, &out); err != nil {
+ return 0, fmt.Errorf("decode identifier create
response: %w: %s", err, string(respBody))
+ }
+ // The response carries the id as a string; convert it to an int.
+ var id int
+ if _, err := fmt.Sscanf(out.Id, "%d", &id); err != nil {
+ return 0, fmt.Errorf("parse identifier id %q: %w",
out.Id, err)
+ }
+ return id, nil
+ case http.StatusUnauthorized:
+ return 0, ErrAuth401
+ default:
+ return 0, &HTTPError{Method: "POST", URL: u, StatusCode:
resp.StatusCode, Body: string(respBody)}
+ }
+}
+
+// IdentifierListResponse is the envelope FindIdentifierOnGroup decodes from
+// a GET on /identifiers.json.
+type IdentifierListResponse struct {
+ ResponseType string `json:"ResponseType"`
+ Version string `json:"Version"`
+ Identifiers []IdentifierListOne `json:"Identifiers"`
+}
+
+// IdentifierListOne holds the fields of a listed identifier that this
+// client reads.
+type IdentifierListOne struct {
+ Version string `json:"Version"`
+ Id int `json:"Id"`
+ Identifier string `json:"Identifier"`
+ Type string `json:"Type"`
+ Status string `json:"Status"`
+}
+
+// FindIdentifierOnGroup lists the identifiers attached to the given CoGroup
+// and returns the id of the first one whose type equals identifierType. It
+// returns 0 with a nil error when there is no such identifier or the server
+// answers 204, ErrAuth401 on 401, and an *HTTPError for any other
+// unexpected status.
+func (c *Client) FindIdentifierOnGroup(coGroupId int, identifierType string) (int, error) {
+	endpoint := c.restAPI(fmt.Sprintf("/identifiers.json?cogroupid=%d", coGroupId))
+	resp, payload, err := c.Do(http.MethodGet, endpoint, nil)
+	if err != nil {
+		return 0, err
+	}
+
+	status := resp.StatusCode
+	if status == http.StatusNoContent {
+		return 0, nil
+	}
+	if status == http.StatusUnauthorized {
+		return 0, ErrAuth401
+	}
+	if status != http.StatusOK {
+		return 0, &HTTPError{Method: "GET", URL: endpoint, StatusCode: status, Body: string(payload)}
+	}
+
+	var listing IdentifierListResponse
+	if err := json.Unmarshal(payload, &listing); err != nil {
+		return 0, fmt.Errorf("decode identifiers list: %w", err)
+	}
+	for i := range listing.Identifiers {
+		if listing.Identifiers[i].Type == identifierType {
+			return listing.Identifiers[i].Id, nil
+		}
+	}
+	return 0, nil
+}
diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/people.go
b/connectors/COmanage/Identity-Provisioner/internal/client/people.go
new file mode 100644
index 000000000..8968c1f8a
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/people.go
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/url"
+)
+
+// CreatePersonResponseEntry is one Identifier echoed by POST /people. COmanage
+// returns one row per identifier type configured on the binding, typically
+// including the auto-assigned ids. Note the lower-case JSON keys, unlike the
+// capitalised keys of the other request and response types in this package.
+type CreatePersonResponseEntry struct {
+ Identifier string `json:"identifier"`
+ Type string `json:"type"`
+ Login bool `json:"login"`
+ Status string `json:"status"`
+}
+
+// CreatePerson POSTs the composite body (CoPerson + Name + Email +
+// Identifier blocks) to /people and returns the resulting Identifier list.
+// The body is sent as given; this method does not validate it.
+//
+// It returns ErrAuth401 on a 401 response and an *HTTPError for any status
+// other than 200, 201 or 401.
+func (c *Client) CreatePerson(body []byte) ([]CreatePersonResponseEntry,
error) {
+ u := c.coreAPI("/people")
+ resp, respBody, err := c.Do(http.MethodPost, u, body)
+ if err != nil {
+ return nil, err
+ }
+ switch resp.StatusCode {
+ case http.StatusCreated, http.StatusOK:
+ var out []CreatePersonResponseEntry
+ if err := json.Unmarshal(respBody, &out); err != nil {
+ return nil, fmt.Errorf("decode create-person response:
%w: %s", err, string(respBody))
+ }
+ return out, nil
+ case http.StatusUnauthorized:
+ return nil, ErrAuth401
+ default:
+ return nil, &HTTPError{Method: "POST", URL: u, StatusCode:
resp.StatusCode, Body: string(respBody)}
+ }
+}
+
+// GetPersonComposite fetches /people/<identifier> from the core API and
+// returns the raw composite JSON, suitable for round-tripping back into
+// UpdatePerson.
+//
+// It returns ErrAuth401 on 401, ErrNotFound on 404 and an *HTTPError for any
+// other non-200 status.
+func (c *Client) GetPersonComposite(identifier string) (json.RawMessage, error) {
+	// Escape the identifier so characters such as '/', '?' or '#' cannot
+	// change which resource the request addresses.
+	u := c.coreAPI("/people/" + url.PathEscape(identifier))
+	resp, respBody, err := c.Do(http.MethodGet, u, nil)
+	if err != nil {
+		return nil, err
+	}
+	switch resp.StatusCode {
+	case http.StatusOK:
+		return json.RawMessage(respBody), nil
+	case http.StatusUnauthorized:
+		return nil, ErrAuth401
+	case http.StatusNotFound:
+		return nil, ErrNotFound
+	default:
+		return nil, &HTTPError{Method: "GET", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)}
+	}
+}
+
+// UpdatePerson PUTs a full-composite body to /people/<identifier> and
+// returns the raw response JSON. The body MUST include every related model
+// (Name, EmailAddress, Identifier, etc.) or COmanage's deleteOmitted
+// behavior will wipe them.
+//
+// It returns ErrAuth401 on 401, ErrNotFound on 404 and an *HTTPError for any
+// other non-200 status.
+func (c *Client) UpdatePerson(identifier string, body []byte) (json.RawMessage, error) {
+	// Escape the identifier so characters such as '/', '?' or '#' cannot
+	// change which resource the request addresses.
+	u := c.coreAPI("/people/" + url.PathEscape(identifier))
+	resp, respBody, err := c.Do(http.MethodPut, u, body)
+	if err != nil {
+		return nil, err
+	}
+	switch resp.StatusCode {
+	case http.StatusOK:
+		return json.RawMessage(respBody), nil
+	case http.StatusUnauthorized:
+		return nil, ErrAuth401
+	case http.StatusNotFound:
+		return nil, ErrNotFound
+	default:
+		return nil, &HTTPError{Method: "PUT", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)}
+	}
+}
+
+// DeletePerson issues DELETE /people/<identifier> on the core API. A 200 or
+// 204 response is success; 401 maps to ErrAuth401, 404 to ErrNotFound, and
+// any other status to an *HTTPError.
+func (c *Client) DeletePerson(identifier string) error {
+	// Escape the identifier so characters such as '/', '?' or '#' cannot
+	// change which resource the request addresses.
+	u := c.coreAPI("/people/" + url.PathEscape(identifier))
+	resp, respBody, err := c.Do(http.MethodDelete, u, nil)
+	if err != nil {
+		return err
+	}
+	switch resp.StatusCode {
+	case http.StatusOK, http.StatusNoContent:
+		return nil
+	case http.StatusUnauthorized:
+		return ErrAuth401
+	case http.StatusNotFound:
+		return ErrNotFound
+	default:
+		return &HTTPError{Method: "DELETE", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)}
+	}
+}
+
+// CoPersonListResponse is the envelope FindCoPersonByEmail decodes from a
+// GET on /co_people.json.
+type CoPersonListResponse struct {
+ ResponseType string `json:"ResponseType"`
+ Version string `json:"Version"`
+ CoPeople []CoPersonListOne `json:"CoPeople"`
+}
+
+// CoPersonListOne holds the fields of a listed CoPerson that this client
+// reads.
+type CoPersonListOne struct {
+ Version string `json:"Version"`
+ Id int `json:"Id"`
+ CoId int `json:"CoId"`
+}
+
+// FindCoPersonByEmail queries /co_people.json in the configured CO with the
+// query-escaped email as the search.mail parameter and returns every row the
+// server reports. search.mail is a LIKE match in COmanage, so callers must
+// post-filter for exact equality before trusting a hit.
+//
+// A 204 response yields a nil slice and nil error. It returns ErrAuth401 on
+// 401, ErrNotFound on 404 and an *HTTPError for any other unexpected status.
+func (c *Client) FindCoPersonByEmail(email string) ([]CoPersonListOne, error) {
+	query := fmt.Sprintf("/co_people.json?coid=%d&search.mail=%s", c.cfg.COID, url.QueryEscape(email))
+	endpoint := c.restAPI(query)
+	resp, payload, err := c.Do(http.MethodGet, endpoint, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	status := resp.StatusCode
+	if status == http.StatusNoContent {
+		return nil, nil
+	}
+	if status == http.StatusUnauthorized {
+		return nil, ErrAuth401
+	}
+	if status == http.StatusNotFound {
+		return nil, ErrNotFound
+	}
+	if status != http.StatusOK {
+		return nil, &HTTPError{Method: "GET", URL: endpoint, StatusCode: status, Body: string(payload)}
+	}
+
+	var listing CoPersonListResponse
+	if err := json.Unmarshal(payload, &listing); err != nil {
+		return nil, fmt.Errorf("decode co_people list: %w", err)
+	}
+	return listing.CoPeople, nil
+}
diff --git
a/connectors/COmanage/Identity-Provisioner/internal/client/unix_cluster.go
b/connectors/COmanage/Identity-Provisioner/internal/client/unix_cluster.go
new file mode 100644
index 000000000..2a9eaa99d
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/unix_cluster.go
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+)
+
+// UnixClusterGroupCreateRequest is the body envelope that
+// CreateUnixClusterGroup marshals and POSTs to
+// /unix_cluster/unix_cluster_groups.json.
+type UnixClusterGroupCreateRequest struct {
+ RequestType string `json:"RequestType"`
+ Version string `json:"Version"`
+ UnixClusterGroups []UnixClusterGroupCreateOne `json:"UnixClusterGroups"`
+}
+
+// UnixClusterGroupCreateOne is a single binding inside
+// UnixClusterGroupCreateRequest, pairing a UnixCluster id with a CoGroup id.
+type UnixClusterGroupCreateOne struct {
+ Version string `json:"Version"`
+ UnixClusterId int `json:"UnixClusterId"`
+ CoGroupId int `json:"CoGroupId"`
+}
+
+// CreateUnixClusterGroup binds a CoGroup to a UnixCluster. The URL takes no
+// named params for POST. The cluster is the one configured on the client
+// (cfg.UnixClusterID); the new binding id parsed from the NewObject response
+// is returned.
+//
+// It returns ErrAuth401 on a 401 response and an *HTTPError for any status
+// other than 200, 201 or 401.
+func (c *Client) CreateUnixClusterGroup(coGroupId int) (int, error) {
+ body, err := json.Marshal(UnixClusterGroupCreateRequest{
+ RequestType: "UnixClusterGroups",
+ Version: restAPIVersion,
+ UnixClusterGroups: []UnixClusterGroupCreateOne{{
+ Version: restAPIVersion,
+ UnixClusterId: c.cfg.UnixClusterID,
+ CoGroupId: coGroupId,
+ }},
+ })
+ if err != nil {
+ return 0, fmt.Errorf("marshal unix_cluster_group: %w", err)
+ }
+
+ u := c.restAPI("/unix_cluster/unix_cluster_groups.json")
+ resp, respBody, err := c.Do(http.MethodPost, u, body)
+ if err != nil {
+ return 0, err
+ }
+ switch resp.StatusCode {
+ case http.StatusCreated, http.StatusOK:
+ var out CoGroupCreateResponse // NewObject shape
+ if err := json.Unmarshal(respBody, &out); err != nil {
+ return 0, fmt.Errorf("decode unix_cluster_group
response: %w: %s", err, string(respBody))
+ }
+ // The response carries the id as a string; convert it to an int.
+ var id int
+ if _, err := fmt.Sscanf(out.Id, "%d", &id); err != nil {
+ return 0, fmt.Errorf("parse unix_cluster_group id %q:
%w", out.Id, err)
+ }
+ return id, nil
+ case http.StatusUnauthorized:
+ return 0, ErrAuth401
+ default:
+ return 0, &HTTPError{Method: "POST", URL: u, StatusCode:
resp.StatusCode, Body: string(respBody)}
+ }
+}
+
+// DeleteUnixClusterGroup issues DELETE
+// /unix_cluster/unix_cluster_groups/<id>.json. A 200 or 204 response is
+// success; 401 maps to ErrAuth401, 404 to ErrNotFound, and any other status
+// to an *HTTPError.
+func (c *Client) DeleteUnixClusterGroup(id int) error {
+	endpoint := c.restAPI(fmt.Sprintf("/unix_cluster/unix_cluster_groups/%d.json", id))
+	resp, payload, err := c.Do(http.MethodDelete, endpoint, nil)
+	if err != nil {
+		return err
+	}
+
+	status := resp.StatusCode
+	if status == http.StatusOK || status == http.StatusNoContent {
+		return nil
+	}
+	if status == http.StatusUnauthorized {
+		return ErrAuth401
+	}
+	if status == http.StatusNotFound {
+		return ErrNotFound
+	}
+	return &HTTPError{Method: "DELETE", URL: endpoint, StatusCode: status, Body: string(payload)}
+}
diff --git
a/connectors/COmanage/Identity-Provisioner/internal/operations/compose.go
b/connectors/COmanage/Identity-Provisioner/internal/operations/compose.go
new file mode 100644
index 000000000..e080acbf5
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/operations/compose.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package operations
+
+import (
+ "encoding/json"
+ "fmt"
+)
+
+// UnixClusterAccountBlock is appended to the composite PUT body. Field names
+// mirror COmanage's JSON attributes.
+//
+// mergeUnixClusterAccount wraps a single block in a JSON array and stores it
+// under the top-level "UnixClusterAccount" key.
+type UnixClusterAccountBlock struct {
+ UnixClusterId int `json:"unix_cluster_id"` // id of the UnixCluster the account belongs to
+ SyncMode string `json:"sync_mode"` // the provisioner sends "M"; meaning is defined by COmanage (TODO confirm)
+ Status string `json:"status"` // the provisioner sends "A"; presumably "active" (TODO confirm)
+ Username string `json:"username"` // local POSIX username
+ Uid int64 `json:"uid"` // numeric uid; the provisioner parses it from the "uidnumber" identifier
+ Gecos string `json:"gecos"` // the provisioner sends an empty string
+ LoginShell string `json:"login_shell"` // login shell path
+ HomeDirectory string `json:"home_directory"` // the provisioner builds it as homedir prefix + username
+ PrimaryCoGroupId int `json:"primary_co_group_id"` // id of the CoGroup used as the primary group
+}
+
+// mergeUnixClusterAccount sets "UnixClusterAccount" on the composite while
+// preserving every other top-level key.
+//
+// composite must be a JSON object. The block is encoded as a one-element
+// array and replaces any existing "UnixClusterAccount" value. An error is
+// returned when composite is not valid JSON, is not an object, or is the
+// JSON literal null.
+func mergeUnixClusterAccount(composite json.RawMessage, block UnixClusterAccountBlock) ([]byte, error) {
+	var top map[string]json.RawMessage
+	if err := json.Unmarshal(composite, &top); err != nil {
+		return nil, fmt.Errorf("decode composite: %w", err)
+	}
+	// JSON null decodes into a nil map without an error; writing to it below
+	// would panic, so reject it explicitly.
+	if top == nil {
+		return nil, fmt.Errorf("decode composite: expected a JSON object, got null")
+	}
+	blockJSON, err := json.Marshal([]UnixClusterAccountBlock{block})
+	if err != nil {
+		return nil, fmt.Errorf("encode UnixClusterAccount block: %w", err)
+	}
+	top["UnixClusterAccount"] = blockJSON
+	out, err := json.Marshal(top)
+	if err != nil {
+		return nil, fmt.Errorf("encode merged composite: %w", err)
+	}
+	return out, nil
+}
+
+// extractIdentifier scans the composite's top-level "Identifier" array and
+// returns the identifier value of the first entry whose type equals
+// identifierType. It returns "" with a nil error when the array is absent or
+// holds no entry of that type, and an error when the JSON cannot be decoded.
+func extractIdentifier(composite json.RawMessage, identifierType string) (string, error) {
+	var doc map[string]json.RawMessage
+	if err := json.Unmarshal(composite, &doc); err != nil {
+		return "", fmt.Errorf("decode composite: %w", err)
+	}
+
+	section, present := doc["Identifier"]
+	if !present {
+		return "", nil
+	}
+
+	type entry struct {
+		Identifier string `json:"identifier"`
+		Type       string `json:"type"`
+	}
+	var entries []entry
+	if err := json.Unmarshal(section, &entries); err != nil {
+		return "", fmt.Errorf("decode Identifier array: %w", err)
+	}
+
+	for i := range entries {
+		if entries[i].Type != identifierType {
+			continue
+		}
+		return entries[i].Identifier, nil
+	}
+	return "", nil
+}
+
+// extractCoPersonID reads the numeric id at CoPerson.meta.id in the composite.
+// This is the numeric record id, not the PersonIDType identifier string. A
+// composite without that path yields 0 and a nil error.
+func extractCoPersonID(composite json.RawMessage) (int, error) {
+	type metaBlock struct {
+		Id int `json:"id"`
+	}
+	type coPersonBlock struct {
+		Meta metaBlock `json:"meta"`
+	}
+	var envelope struct {
+		CoPerson coPersonBlock `json:"CoPerson"`
+	}
+
+	err := json.Unmarshal(composite, &envelope)
+	if err != nil {
+		return 0, fmt.Errorf("decode composite for CoPerson.id: %w", err)
+	}
+	id := envelope.CoPerson.Meta.Id
+	return id, nil
+}
diff --git
a/connectors/COmanage/Identity-Provisioner/internal/operations/compose_test.go
b/connectors/COmanage/Identity-Provisioner/internal/operations/compose_test.go
new file mode 100644
index 000000000..c7d0b12d4
--- /dev/null
+++
b/connectors/COmanage/Identity-Provisioner/internal/operations/compose_test.go
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package operations
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/apache/airavata-custos/pkg/models"
+)
+
+// sampleComposite mirrors a real CoPerson composite trimmed to the fields the
+// operations layer reads. It carries a numeric CoPerson.meta.id (98), two
+// CoGroupMember rows, and one Identifier entry per type exercised by the
+// tests in this file (oidcsub, comanage_id, comanage_number, uid, uidnumber,
+// gidnumber).
+const sampleComposite = `{
+ "CoPerson":{"meta":{"id":98},"co_id":2,"status":"A"},
+
"Name":[{"given":"GoalE2E","family":"Throwaway","type":"official","primary_name":true}],
+
"EmailAddress":[{"mail":"[email protected]","type":"official","verified":false}],
+ "CoGroupMember":[
+ {"co_group_id":7,"member":true,"owner":false},
+ {"co_group_id":25,"member":true,"owner":true}
+ ],
+ "Identifier":[
+
{"identifier":"http://test.invalid/sub","type":"oidcsub","login":true,"status":"A"},
+
{"identifier":"Person100016","type":"comanage_id","login":false,"status":"A"},
+
{"identifier":"100016","type":"comanage_number","login":false,"status":"A"},
+ {"identifier":"vspectes2","type":"uid","login":false,"status":"A"},
+ {"identifier":"2000016","type":"uidnumber","login":false,"status":"A"},
+ {"identifier":"2000016","type":"gidnumber","login":false,"status":"A"}
+ ]
+}`
+
+func TestExtractIdentifier(t *testing.T) {
+ tests := []struct {
+ typeName string
+ want string
+ }{
+ {"comanage_id", "Person100016"},
+ {"uidnumber", "2000016"},
+ {"gidnumber", "2000016"},
+ {"oidcsub", "http://test.invalid/sub"},
+ {"missing-type", ""},
+ }
+ for _, tc := range tests {
+ t.Run(tc.typeName, func(t *testing.T) {
+ got, err := extractIdentifier([]byte(sampleComposite),
tc.typeName)
+ if err != nil {
+ t.Fatalf("extractIdentifier: %v", err)
+ }
+ if got != tc.want {
+ t.Errorf("got %q, want %q", got, tc.want)
+ }
+ })
+ }
+}
+
+func TestExtractCoPersonID(t *testing.T) {
+ got, err := extractCoPersonID([]byte(sampleComposite))
+ if err != nil {
+ t.Fatalf("extractCoPersonID: %v", err)
+ }
+ if got != 98 {
+ t.Errorf("got %d, want 98", got)
+ }
+}
+
+// TestMergeUnixClusterAccount_PreservesAllKeysAndAppendsBlock merges a block
+// into sampleComposite and verifies that the original top-level keys survive
+// and that "UnixClusterAccount" is a one-element array echoing the block.
+func TestMergeUnixClusterAccount_PreservesAllKeysAndAppendsBlock(t *testing.T) {
+	input := UnixClusterAccountBlock{
+		UnixClusterId:    1,
+		SyncMode:         "M",
+		Status:           "A",
+		Username:         "custos-gthrowa",
+		Uid:              2000016,
+		Gecos:            "",
+		LoginShell:       "/bin/bash",
+		HomeDirectory:    "/home/custos-gthrowa",
+		PrimaryCoGroupId: 25,
+	}
+
+	encoded, err := mergeUnixClusterAccount([]byte(sampleComposite), input)
+	if err != nil {
+		t.Fatalf("mergeUnixClusterAccount: %v", err)
+	}
+
+	var merged map[string]json.RawMessage
+	if err := json.Unmarshal(encoded, &merged); err != nil {
+		t.Fatalf("decode merged: %v", err)
+	}
+
+	expectedKeys := []string{"CoPerson", "Name", "EmailAddress", "Identifier", "CoGroupMember", "UnixClusterAccount"}
+	for _, key := range expectedKeys {
+		_, present := merged[key]
+		if !present {
+			t.Errorf("merged composite missing key %q", key)
+		}
+	}
+
+	// UnixClusterAccount must be an array with one block matching what we sent.
+	var accounts []UnixClusterAccountBlock
+	if err := json.Unmarshal(merged["UnixClusterAccount"], &accounts); err != nil {
+		t.Fatalf("decode UnixClusterAccount: %v", err)
+	}
+	matches := len(accounts) == 1 &&
+		accounts[0].Username == "custos-gthrowa" &&
+		accounts[0].Uid == 2000016 &&
+		accounts[0].PrimaryCoGroupId == 25
+	if !matches {
+		t.Errorf("unix block: %+v", accounts)
+	}
+}
+
+// TestBuildCreatePersonBody_Shape verifies that the create-person body is a
+// JSON object containing the CoPerson, Name and EmailAddress keys.
+func TestBuildCreatePersonBody_Shape(t *testing.T) {
+	person := &models.User{
+		FirstName: "GoalE2E",
+		LastName:  "Throwaway",
+		Email:     "[email protected]",
+	}
+
+	encoded, err := buildCreatePersonBody(2, person)
+	if err != nil {
+		t.Fatalf("buildCreatePersonBody: %v", err)
+	}
+
+	var decoded map[string]json.RawMessage
+	if err := json.Unmarshal(encoded, &decoded); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+
+	required := []string{"CoPerson", "Name", "EmailAddress"}
+	for _, key := range required {
+		_, present := decoded[key]
+		if !present {
+			t.Errorf("body missing %q", key)
+		}
+	}
+}
diff --git
a/connectors/COmanage/Identity-Provisioner/internal/operations/ensure_posix_account.go
b/connectors/COmanage/Identity-Provisioner/internal/operations/ensure_posix_account.go
new file mode 100644
index 000000000..7c2d8c964
--- /dev/null
+++
b/connectors/COmanage/Identity-Provisioner/internal/operations/ensure_posix_account.go
@@ -0,0 +1,265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package operations
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "log/slog"
+ "strconv"
+
+
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client"
+ "github.com/apache/airavata-custos/pkg/models"
+)
+
+// ensurePOSIXAccountImpl provisions the COmanage side of a compute-cluster
+// account for cu. In order it:
+//
+//  1. loads the Custos user and resolves (or creates) their CoPerson;
+//  2. stores the CoPerson identifier locally (a failure here is only logged);
+//  3. reads the "uidnumber" identifier and the numeric CoPerson.meta.id from
+//     the composite;
+//  4. finds or creates a CoGroup named cu.LocalUsername, gives it "uid" and
+//     "gidnumber" identifiers, and adds the CoPerson as a member;
+//  5. attaches the CoGroup to the configured UnixCluster;
+//  6. re-fetches the composite and PUTs it back with a UnixClusterAccount
+//     block merged in.
+//
+// Each find-then-create pair skips the create when the lookup returns a
+// non-zero id. Every failure after the initial user lookup is recorded via
+// o.dlq before the error is returned.
+func (o *Orchestrator) ensurePOSIXAccountImpl(ctx context.Context, cu *models.ComputeClusterUser) error {
+	log := slog.With("correlation_id", cu.ID, "custos_user_id", cu.UserID, "unix_cluster_id", o.c.Config().UnixClusterID)
+
+	user, err := o.core.GetUser(ctx, cu.UserID)
+	if err != nil {
+		return fmt.Errorf("get custos user: %w", err)
+	}
+
+	personID, composite, created, err := o.lookupOrCreateCoPerson(ctx, user)
+	if err != nil {
+		o.dlq(ctx, cu, "lookup_or_create_coperson", err)
+		return err
+	}
+	log = log.With("comanage_person_id", personID)
+	log.Info("comanage: CoPerson resolved", "created", created)
+	if created {
+		o.audit(ctx, cu, "ComanageCoPersonCreated", fmt.Sprintf("comanage_id=%s email=%s", personID, user.Email))
+	}
+
+	// Best-effort: provisioning continues even if the id cannot be stored.
+	if err := o.storePersonID(ctx, cu.UserID, personID); err != nil {
+		log.Warn("comanage: failed to store CoPerson id", "err", err)
+	}
+
+	// The create path returns no composite, so fetch it now.
+	if composite == nil {
+		composite, err = o.c.GetPersonComposite(personID)
+		if err != nil {
+			o.dlq(ctx, cu, "get_composite", err)
+			return err
+		}
+	}
+
+	uidnumber, err := extractIdentifier(composite, "uidnumber")
+	if err != nil {
+		o.dlq(ctx, cu, "extract_uidnumber", err)
+		return err
+	}
+	if uidnumber == "" {
+		err := fmt.Errorf("uidnumber identifier missing on CoPerson %s", personID)
+		o.dlq(ctx, cu, "missing_uidnumber", err)
+		return err
+	}
+	uidInt, err := strconv.ParseInt(uidnumber, 10, 64)
+	if err != nil {
+		o.dlq(ctx, cu, "parse_uidnumber", err)
+		return err
+	}
+
+	// A decode failure and a missing (zero) id are reported separately so the
+	// dead-letter record and the returned error never wrap a nil error.
+	coPersonID, err := extractCoPersonID(composite)
+	if err != nil {
+		o.dlq(ctx, cu, "extract_coperson_id", err)
+		return fmt.Errorf("extract CoPerson.meta.id: %w", err)
+	}
+	if coPersonID == 0 {
+		err := fmt.Errorf("extract CoPerson.meta.id: id missing or zero on CoPerson %s", personID)
+		o.dlq(ctx, cu, "extract_coperson_id", err)
+		return err
+	}
+
+	coGroupID, err := o.c.FindCoGroupByName(cu.LocalUsername)
+	if err != nil {
+		o.dlq(ctx, cu, "find_co_group", err)
+		return err
+	}
+	if coGroupID == 0 {
+		coGroupID, err = o.c.CreateCoGroup(cu.LocalUsername, "Primary group for "+cu.LocalUsername)
+		if err != nil {
+			o.dlq(ctx, cu, "create_co_group", err)
+			return err
+		}
+		log.Info("comanage: CoGroup created", "co_group_id", coGroupID)
+	}
+
+	if existing, err := o.c.FindIdentifierOnGroup(coGroupID, "uid"); err != nil {
+		o.dlq(ctx, cu, "find_groupname_identifier", err)
+		return err
+	} else if existing == 0 {
+		if _, err := o.c.CreateIdentifierOnGroup(cu.LocalUsername, "uid", coGroupID); err != nil {
+			o.dlq(ctx, cu, "create_groupname_identifier", err)
+			return err
+		}
+	}
+
+	// The group's gidnumber reuses the person's uidnumber value.
+	if existing, err := o.c.FindIdentifierOnGroup(coGroupID, "gidnumber"); err != nil {
+		o.dlq(ctx, cu, "find_gidnumber_identifier", err)
+		return err
+	} else if existing == 0 {
+		if _, err := o.c.CreateIdentifierOnGroup(uidnumber, "gidnumber", coGroupID); err != nil {
+			o.dlq(ctx, cu, "create_gidnumber_identifier", err)
+			return err
+		}
+	}
+
+	if existing, err := o.c.FindCoGroupMember(coGroupID, coPersonID); err != nil {
+		o.dlq(ctx, cu, "find_co_group_member", err)
+		return err
+	} else if existing == 0 {
+		if _, err := o.c.CreateCoGroupMember(coPersonID, coGroupID); err != nil {
+			o.dlq(ctx, cu, "create_co_group_member", err)
+			return err
+		}
+	}
+
+	// UnixClusterGroup attach: re-attempt is the idempotency mechanism. A 4xx
+	// here means the pair already exists; continue. Errors that are not an
+	// *HTTPError, or whose status is outside 400-499, are fatal.
+	if _, err := o.c.CreateUnixClusterGroup(coGroupID); err != nil {
+		var httpErr *client.HTTPError
+		if !errors.As(err, &httpErr) || httpErr.StatusCode < 400 || httpErr.StatusCode >= 500 {
+			o.dlq(ctx, cu, "create_unix_cluster_group", err)
+			return err
+		}
+		log.Info("comanage: UnixClusterGroup attach returned 4xx (already attached)", "status", httpErr.StatusCode)
+	}
+
+	// Re-fetch so the PUT body reflects the group membership added above.
+	fresh, err := o.c.GetPersonComposite(personID)
+	if err != nil {
+		o.dlq(ctx, cu, "refetch_composite", err)
+		return err
+	}
+	block := UnixClusterAccountBlock{
+		UnixClusterId:    o.c.Config().UnixClusterID,
+		SyncMode:         "M",
+		Status:           "A",
+		Username:         cu.LocalUsername,
+		Uid:              uidInt,
+		Gecos:            "",
+		LoginShell:       o.c.Config().DefaultShell,
+		HomeDirectory:    o.c.Config().HomedirPrefix + cu.LocalUsername,
+		PrimaryCoGroupId: coGroupID,
+	}
+	putBody, err := mergeUnixClusterAccount(fresh, block)
+	if err != nil {
+		o.dlq(ctx, cu, "merge_composite", err)
+		return err
+	}
+	if _, err := o.c.UpdatePerson(personID, putBody); err != nil {
+		o.dlq(ctx, cu, "put_composite", err)
+		return err
+	}
+	log.Info("comanage: UnixClusterAccount attached", "username", cu.LocalUsername, "uid", uidInt, "co_group_id", coGroupID)
+	o.audit(ctx, cu, "ComanageClusterAccountAttached", fmt.Sprintf("comanage_id=%s username=%s uid=%d", personID, cu.LocalUsername, uidInt))
+	return nil
+}
+
+// lookupOrCreateCoPerson resolves the user's CoPerson, returning the COmanage
+// person identifier, the composite (if the GET path was used), and whether a
+// new CoPerson was created.
+//
+// Resolution order: (1) a locally stored identifier, (2) an email search
+// within the configured CO, (3) creating a new CoPerson. On the create path
+// the returned composite is nil and the caller must fetch it.
+func (o *Orchestrator) lookupOrCreateCoPerson(ctx context.Context, user
*models.User) (string, json.RawMessage, bool, error) {
+ personIDType := o.c.Config().PersonIDType
+
+ // Step 1: try the identifier stored from a previous run.
+ if stored, err := o.findStoredPersonID(ctx, user.ID); err != nil {
+ return "", nil, false, fmt.Errorf("stored lookup: %w", err)
+ } else if stored != "" {
+ composite, err := o.c.GetPersonComposite(stored)
+ if err == nil {
+ return stored, composite, false, nil
+ }
+ if !errors.Is(err, client.ErrNotFound) {
+ return "", nil, false, fmt.Errorf("get composite for
stored id: %w", err)
+ }
+ // stored id no longer resolves; fall through to email search
+ }
+
+ // Step 2: search by email. findByEmailExact yields a numeric CoPerson id,
+ // which is then translated to the configured identifier type.
+ if user.Email != "" {
+ coPersonID, err := o.findByEmailExact(user.Email)
+ if err != nil && !errors.Is(err, client.ErrNotFound) {
+ return "", nil, false, fmt.Errorf("email search: %w",
err)
+ }
+ if coPersonID != 0 {
+ composite, err :=
o.c.GetPersonComposite(strconv.Itoa(coPersonID))
+ if err != nil {
+ return "", nil, false, fmt.Errorf("get
composite by numeric id: %w", err)
+ }
+ personID, err := extractIdentifier(composite,
personIDType)
+ if err != nil {
+ return "", nil, false, fmt.Errorf("extract %s
from composite: %w", personIDType, err)
+ }
+ if personID == "" {
+ return "", nil, false, fmt.Errorf("CoPerson %d
has no %s identifier", coPersonID, personIDType)
+ }
+ return personID, composite, false, nil
+ }
+ }
+
+ // Step 3: nothing matched; create a new CoPerson and pick the identifier
+ // of the configured type out of the create response.
+ body, err := buildCreatePersonBody(o.c.Config().COID, user)
+ if err != nil {
+ return "", nil, false, fmt.Errorf("build create body: %w", err)
+ }
+ resp, err := o.c.CreatePerson(body)
+ if err != nil {
+ return "", nil, false, fmt.Errorf("create coperson: %w", err)
+ }
+ for _, r := range resp {
+ if r.Type == personIDType {
+ return r.Identifier, nil, true, nil
+ }
+ }
+ return "", nil, false, fmt.Errorf("POST /people returned no %s
identifier: %+v", personIDType, resp)
+}
+
+// buildCreatePersonBody encodes the JSON body for creating a CoPerson in the
+// CO identified by coID. The body has three top-level keys: "CoPerson"
+// (co_id and status "A"), "Name" (one official primary name built from the
+// user's first and last name) and "EmailAddress" (one unverified official
+// address).
+func buildCreatePersonBody(coID int, user *models.User) ([]byte, error) {
+	person := map[string]interface{}{
+		"co_id":  coID,
+		"status": "A",
+	}
+	officialName := map[string]interface{}{
+		"given":        user.FirstName,
+		"family":       user.LastName,
+		"type":         "official",
+		"primary_name": true,
+		"language":     "en",
+	}
+	officialEmail := map[string]interface{}{
+		"mail":     user.Email,
+		"type":     "official",
+		"verified": false,
+	}
+
+	payload := map[string]interface{}{
+		"CoPerson":     person,
+		"Name":         []map[string]interface{}{officialName},
+		"EmailAddress": []map[string]interface{}{officialEmail},
+	}
+	return json.Marshal(payload)
+}
+
+// audit records an audit event of the given type against cu. Recording is
+// best-effort: a failure to write the event is logged and never propagated,
+// so it cannot abort provisioning.
+func (o *Orchestrator) audit(ctx context.Context, cu *models.ComputeClusterUser, eventType, details string) {
+	_, err := o.core.CreateAuditEvent(ctx, &models.AuditEvent{
+		EventType: eventType,
+		EntityID:  cu.ID,
+		Details:   details,
+	})
+	if err != nil {
+		slog.Warn("comanage: failed to record audit event",
+			"event_type", eventType, "entity_id", cu.ID, "err", err)
+	}
+}
+
+// dlq records a "ComanageProvisioningFailed" audit event naming the failed
+// step and its error. Recording is best-effort: if the event cannot be
+// written, the failure is logged together with the original step and cause
+// so the information is not lost.
+func (o *Orchestrator) dlq(ctx context.Context, cu *models.ComputeClusterUser, step string, err error) {
+	details := fmt.Sprintf("step=%s err=%v", step, err)
+	_, auditErr := o.core.CreateAuditEvent(ctx, &models.AuditEvent{
+		EventType: "ComanageProvisioningFailed",
+		EntityID:  cu.ID,
+		Details:   details,
+	})
+	if auditErr != nil {
+		slog.Warn("comanage: failed to record provisioning failure",
+			"step", step, "entity_id", cu.ID, "cause", err, "err", auditErr)
+	}
+	// TODO: admin endpoint + CLI to re-fire ComputeClusterUserCreateEvent for a
+	// specific user, and to clean up orphan CoGroups from terminal dead-letters.
+}
diff --git
a/connectors/COmanage/Identity-Provisioner/internal/operations/lookup.go
b/connectors/COmanage/Identity-Provisioner/internal/operations/lookup.go
new file mode 100644
index 000000000..cb851a28b
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/operations/lookup.go
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package operations
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client"
+ "github.com/apache/airavata-custos/pkg/models"
+ _ "github.com/apache/airavata-custos/pkg/service"
+)
+
+// ErrNotFoundShim is the same error value as client.ErrNotFound;
+// findByEmailExact matches against it with errors.Is.
+var ErrNotFoundShim = client.ErrNotFound
+
+// comanageIdentitySource is the Source value of the user-identity rows that
+// hold a COmanage CoPerson identifier.
+const comanageIdentitySource = "comanage"
+
+// findStoredPersonID returns the ExternalID of the first identity of userID
+// whose Source is comanageIdentitySource and whose ExternalID is non-empty.
+// It returns "" with a nil error when the user has no such identity.
+func (o *Orchestrator) findStoredPersonID(ctx context.Context, userID string) (string, error) {
+	identities, err := o.core.ListUserIdentitiesForUser(ctx, userID)
+	if err != nil {
+		return "", fmt.Errorf("list user identities: %w", err)
+	}
+	for _, identity := range identities {
+		if identity.Source != comanageIdentitySource || identity.ExternalID == "" {
+			continue
+		}
+		return identity.ExternalID, nil
+	}
+	return "", nil
+}
+
+// storePersonID records personID as the user's COmanage identity. When the
+// user already has a stored COmanage identifier (whatever its value), nothing
+// is written and nil is returned.
+func (o *Orchestrator) storePersonID(ctx context.Context, userID, personID string) error {
+	current, err := o.findStoredPersonID(ctx, userID)
+	if err != nil {
+		return err
+	}
+	if current != "" {
+		return nil
+	}
+
+	identity := &models.UserIdentity{
+		UserID:     userID,
+		Source:     comanageIdentitySource,
+		ExternalID: personID,
+	}
+	_, err = o.core.CreateUserIdentity(ctx, identity)
+	return err
+}
+
+// findByEmailExact looks up CoPersons by email and returns the numeric id of
+// the first candidate that belongs to the configured CO, or 0 when there is
+// none (including an empty email or a not-found result from the client).
+//
+// Despite the name, the match is only as exact as the underlying search:
+// COmanage's search.mail is a LIKE match, so very similar addresses could
+// collide. Callers accept that trade-off for a single round-trip.
+func (o *Orchestrator) findByEmailExact(email string) (int, error) {
+	if email == "" {
+		return 0, nil
+	}
+
+	matches, err := o.c.FindCoPersonByEmail(email)
+	switch {
+	case err == nil:
+		// fall out of the switch and scan the candidates
+	case errors.Is(err, ErrNotFoundShim):
+		return 0, nil
+	default:
+		return 0, err
+	}
+
+	for _, candidate := range matches {
+		if candidate.CoId != o.c.Config().COID {
+			continue
+		}
+		return candidate.Id, nil
+	}
+	return 0, nil
+}
diff --git
a/connectors/COmanage/Identity-Provisioner/internal/operations/orchestrator.go
b/connectors/COmanage/Identity-Provisioner/internal/operations/orchestrator.go
new file mode 100644
index 000000000..f48c062dc
--- /dev/null
+++
b/connectors/COmanage/Identity-Provisioner/internal/operations/orchestrator.go
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package operations provisions POSIX identity in COmanage for a
+// (CoPerson, UnixCluster) pair.
+package operations
+
+import (
+ "context"
+
+
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client"
+ "github.com/apache/airavata-custos/pkg/models"
+ "github.com/apache/airavata-custos/pkg/service"
+)
+
+// Orchestrator drives POSIX account provisioning by combining the COmanage
+// REST client with the Custos core service.
+type Orchestrator struct {
+ c *client.Client // COmanage REST client; also the source of connector config via Config()
+ core *service.Service // Custos core service: users, user identities, audit events
+}
+
+// New returns an Orchestrator that uses c for COmanage calls and core for
+// Custos lookups and audit events.
+func New(c *client.Client, core *service.Service) *Orchestrator {
+ return &Orchestrator{c: c, core: core}
+}
+
+// EnsurePOSIXAccount is the exported entry point for provisioning the
+// COmanage POSIX account for cu; it delegates to ensurePOSIXAccountImpl.
+func (o *Orchestrator) EnsurePOSIXAccount(ctx context.Context, cu
*models.ComputeClusterUser) error {
+ return o.ensurePOSIXAccountImpl(ctx, cu)
+}
diff --git
a/connectors/COmanage/Identity-Provisioner/internal/subscribers/cluster_user.go
b/connectors/COmanage/Identity-Provisioner/internal/subscribers/cluster_user.go
new file mode 100644
index 000000000..a3fac2901
--- /dev/null
+++
b/connectors/COmanage/Identity-Provisioner/internal/subscribers/cluster_user.go
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package subscribers
+
+import (
+ "context"
+ "log/slog"
+
+
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client"
+
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/operations"
+ "github.com/apache/airavata-custos/pkg/events"
+ "github.com/apache/airavata-custos/pkg/models"
+ "github.com/apache/airavata-custos/pkg/service"
+)
+
+// ClusterUserSubscriber listens for ComputeClusterUserCreateEvent and drives
+// the orchestrator. Events whose ComputeClusterID does not match
+// CustosClusterID are dropped.
+type ClusterUserSubscriber struct {
+ ops *operations.Orchestrator // performs the provisioning for each accepted event
+ bus *events.Bus // event bus the handler is registered on
+ core *service.Service // stored at construction; not read by the methods visible in this file
+ custosClusterID string // only events for this compute cluster id are handled
+}
+
+// NewClusterUserSubscriber builds a subscriber whose orchestrator is created
+// from c and core. It does not register anything on bus; call
+// RegisterSubscribers for that.
+func NewClusterUserSubscriber(c *client.Client, bus *events.Bus, core
*service.Service, custosClusterID string) *ClusterUserSubscriber {
+ return &ClusterUserSubscriber{
+ ops: operations.New(c, core),
+ bus: bus,
+ core: core,
+ custosClusterID: custosClusterID,
+ }
+}
+
+// RegisterSubscribers subscribes handleClusterUserCreate to
+// ComputeClusterUserCreateEvent on the subscriber's bus.
+func (s *ClusterUserSubscriber) RegisterSubscribers() {
+ s.bus.Subscribe(events.ComputeClusterUserCreateEvent,
s.handleClusterUserCreate)
+}
+
+func (s *ClusterUserSubscriber) handleClusterUserCreate(_ events.Event,
payload interface{}) {
+ cu, ok := payload.(*models.ComputeClusterUser)
+ if !ok {
+ slog.Error("comanage subscriber: payload is not
*ComputeClusterUser", "type", payload)
+ return
+ }
+ if cu.ComputeClusterID != s.custosClusterID {
+ return
+ }
+ ctx := context.Background()
+ // TODO: move to a transactional scope. In-process delivery loses events
+ // if the process crashes between the core commit and subscriber pickup.
+ if err := s.ops.EnsurePOSIXAccount(ctx, cu); err != nil {
+ slog.Error("comanage subscriber: EnsurePOSIXAccount failed",
+ "compute_cluster_user_id", cu.ID, "user_id", cu.UserID,
"err", err)
+ }
+}
diff --git a/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go
b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go
new file mode 100644
index 000000000..9c9628dce
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package comanage is the COmanage Identity-Provisioner entry point. Wired
+// from internal/connectors/loader.go.
+package comanage
+
+import (
+ "context"
+ "log/slog"
+ "os"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/jmoiron/sqlx"
+
+
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client"
+
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/subscribers"
+ "github.com/apache/airavata-custos/pkg/events"
+ "github.com/apache/airavata-custos/pkg/service"
+)
+
+// LoadConnector wires the subscriber to the event bus. If any required env
+// var is missing, the loader logs and returns nil without registering.
+func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus,
coreService *service.Service, _ *sync.WaitGroup) error {
+ cfg, ok := loadConfigFromEnv()
+ if !ok {
+ slog.Info("comanage provisioner: required env vars not set;
skipping")
+ return nil
+ }
+ httpClient := client.New(cfg)
+ subscribers.NewClusterUserSubscriber(httpClient, eventBus, coreService,
cfg.CustosClusterID).RegisterSubscribers()
+ slog.Info("comanage provisioner: subscriber registered",
+ "registry", cfg.RegistryURL, "co_id", cfg.COID, "cluster_id",
cfg.CustosClusterID)
+ // Custos is the source of truth: CoPerson and UnixClusterAccount
records
+ // for users provisioned via this connector must not be edited directly
in
+ // COmanage. There is no drift reconciliation; out-of-band edits will be
+ // invisible to Custos.
+ return nil
+}
+
+func loadConfigFromEnv() (client.Config, bool) {
+ registryURL := os.Getenv("COMANAGE_REGISTRY_URL")
+ coIDStr := os.Getenv("COMANAGE_CO_ID")
+ apiUser := os.Getenv("COMANAGE_API_USER")
+ apiKey := os.Getenv("COMANAGE_API_KEY")
+ personIDType := os.Getenv("COMANAGE_PERSON_ID_TYPE")
+ unixClusterStr := os.Getenv("COMANAGE_UNIX_CLUSTER_ID")
+ custosCluster := os.Getenv("CUSTOS_CLUSTER_ID")
+
+ if registryURL == "" || coIDStr == "" || apiUser == "" || apiKey == ""
|| personIDType == "" || unixClusterStr == "" || custosCluster == "" {
+ return client.Config{}, false
+ }
+ coID, err := strconv.Atoi(coIDStr)
+ if err != nil {
+ slog.Error("comanage provisioner: COMANAGE_CO_ID is not an
int", "value", coIDStr, "err", err)
+ return client.Config{}, false
+ }
+ unixClusterID, err := strconv.Atoi(unixClusterStr)
+ if err != nil {
+ slog.Error("comanage provisioner: COMANAGE_UNIX_CLUSTER_ID is
not an int", "value", unixClusterStr, "err", err)
+ return client.Config{}, false
+ }
+
+ timeout := 30 * time.Second
+ if v := os.Getenv("COMANAGE_HTTP_TIMEOUT"); v != "" {
+ if d, err := time.ParseDuration(v); err == nil {
+ timeout = d
+ }
+ }
+
+ defaultShell := os.Getenv("COMANAGE_DEFAULT_SHELL")
+ if defaultShell == "" {
+ defaultShell = "/bin/bash"
+ }
+ homedirPrefix := os.Getenv("COMANAGE_HOMEDIR_PREFIX")
+ if homedirPrefix == "" {
+ homedirPrefix = "/home/"
+ }
+
+ return client.Config{
+ RegistryURL: registryURL,
+ COID: coID,
+ APIUser: apiUser,
+ APIKey: apiKey,
+ PersonIDType: personIDType,
+ UnixClusterID: unixClusterID,
+ CustosClusterID: custosCluster,
+ DefaultShell: defaultShell,
+ HomedirPrefix: homedirPrefix,
+ HTTPTimeout: timeout,
+ }, true
+}
diff --git a/internal/connectors/loader.go b/internal/connectors/loader.go
index 99fb221c0..ea070208c 100644
--- a/internal/connectors/loader.go
+++ b/internal/connectors/loader.go
@@ -25,6 +25,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/pkg/amie"
+
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/pkg/comanage"
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/pkg/smapper"
"github.com/apache/airavata-custos/pkg/events"
"github.com/apache/airavata-custos/pkg/service"
@@ -45,6 +46,12 @@ func LoadConnectors(ctx context.Context, database *sqlx.DB,
eventBus *events.Bus
return err
}
+ slog.Info("loading COmanage Identity-Provisioner connector")
+ if err := comanage.LoadConnector(ctx, database, eventBus, coreService,
wg); err != nil {
+ slog.Error("failed to load COmanage Identity-Provisioner
connector", "error", err)
+ return err
+ }
+
slog.Info("finished loading connectors")
return nil
}
diff --git a/internal/db/migrations/000002_compute_clusters.up.sql
b/internal/db/migrations/000002_compute_clusters.up.sql
index f0c268482..c150758db 100644
--- a/internal/db/migrations/000002_compute_clusters.up.sql
+++ b/internal/db/migrations/000002_compute_clusters.up.sql
@@ -38,6 +38,7 @@ CREATE TABLE IF NOT EXISTS compute_cluster_users
updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON
UPDATE CURRENT_TIMESTAMP(6),
PRIMARY KEY (id),
UNIQUE KEY uq_compute_cluster_users_pair (compute_cluster_id, user_id),
+ UNIQUE KEY uq_compute_cluster_users_local_username (compute_cluster_id,
local_username),
KEY idx_compute_cluster_users_user (user_id),
CONSTRAINT fk_compute_cluster_users_cluster FOREIGN KEY
(compute_cluster_id) REFERENCES compute_clusters (id) ON DELETE CASCADE,
CONSTRAINT fk_compute_cluster_users_user FOREIGN KEY (user_id) REFERENCES
users (id) ON DELETE CASCADE
diff --git a/pkg/posix/username.go b/pkg/posix/username.go
new file mode 100644
index 000000000..e5fbd34c9
--- /dev/null
+++ b/pkg/posix/username.go
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package posix builds and validates POSIX-conformant usernames for HPC
+// account provisioning.
+package posix
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "strings"
+ "unicode"
+
+ "github.com/apache/airavata-custos/pkg/models"
+)
+
+const MaxCollisionSuffix = 999
+
+var ErrUnbuildableUsername = errors.New("posix: cannot build username from
empty first and last name")
+
+// BuildBase returns the unsuffixed username. 'truncated' is set when the name
+// portion was shortened to fit the 32-char POSIX login cap.
+func BuildBase(u *models.User, prefix string) (string, bool, error) {
+ first := Normalize(u.FirstName)
+ last := Normalize(u.LastName)
+
+ var local string
+ switch {
+ case first != "" && last != "":
+ local = string(first[0]) + last
+ case last != "":
+ local = last
+ case first != "":
+ local = first
+ default:
+ return "", false, fmt.Errorf("%w: user %q (first=%q last=%q)",
ErrUnbuildableUsername, u.ID, u.FirstName, u.LastName)
+ }
+
+ // 32 = POSIX login cap; -1 separator, -3 reserved for collision suffix
(up to "999").
+ maxLocal := 32 - len(prefix) - 1 - 3
+ truncated := false
+ if len(local) > maxLocal {
+ local = local[:maxLocal]
+ truncated = true
+ }
+ return prefix + "-" + local, truncated, nil
+}
+
// Normalize reduces s to the characters allowed in the name portion of a
// username: ASCII letters (lowercased) and ASCII digits. Everything else,
// including punctuation, whitespace and every non-ASCII rune, is dropped; no
// transliteration is attempted.
//
// Non-ASCII runes are discarded before any case mapping. Lowercasing the
// whole string first would let code points whose Unicode lowercase form is an
// ASCII letter (for example U+212A KELVIN SIGN -> 'k') slip through the
// filter, while every other non-ASCII rune is removed.
func Normalize(s string) string {
	var b strings.Builder
	b.Grow(len(s)) // output is never longer than the input
	for _, r := range s {
		if r > unicode.MaxASCII {
			continue
		}
		switch {
		case r >= 'A' && r <= 'Z':
			b.WriteRune(r + ('a' - 'A'))
		case (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9'):
			b.WriteRune(r)
		}
	}
	return b.String()
}
+
// Prefix returns the username prefix configured through the
// POSIX_USERNAME_PREFIX environment variable. When the variable is unset or
// empty it falls back to "custos". The value is returned as-is.
func Prefix() string {
	const fallback = "custos"

	configured := os.Getenv("POSIX_USERNAME_PREFIX")
	if configured == "" {
		return fallback
	}
	return configured
}
diff --git a/pkg/posix/username_test.go b/pkg/posix/username_test.go
new file mode 100644
index 000000000..57b4f28f4
--- /dev/null
+++ b/pkg/posix/username_test.go
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package posix
+
+import (
+ "errors"
+ "strings"
+ "testing"
+
+ "github.com/apache/airavata-custos/pkg/models"
+)
+
+func TestBuildBase(t *testing.T) {
+ tests := []struct {
+ name string
+ first string
+ middle string
+ last string
+ prefix string
+ wantBase string
+ wantTrunc bool
+ }{
+ {
+ name: "standard",
+ first: "Alice", last: "Smith", prefix: "custos",
+ wantBase: "custos-asmith", wantTrunc: false,
+ },
+ {
+ name: "middle ignored",
+ first: "Alice", middle: "Marie", last: "Smith", prefix:
"custos",
+ wantBase: "custos-asmith", wantTrunc: false,
+ },
+ {
+ name: "single name as last",
+ first: "", last: "Madonna", prefix: "custos",
+ wantBase: "custos-madonna", wantTrunc: false,
+ },
+ {
+ name: "first only",
+ first: "Alice", last: "", prefix: "custos",
+ wantBase: "custos-alice", wantTrunc: false,
+ },
+ {
+ name: "non-ASCII stripped",
+ first: "Aña", last: "Şəkili", prefix: "custos",
+ wantBase: "custos-akili", wantTrunc: false,
+ },
+ {
+ name: "prefix swap",
+ first: "Alice", last: "Smith", prefix: "nexus",
+ wantBase: "nexus-asmith", wantTrunc: false,
+ },
+ {
+ name: "truncation at 32-char limit",
+ first: "L", last: strings.Repeat("a", 50), prefix:
"custos",
+ wantTrunc: true,
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ u := &models.User{FirstName: tc.first, MiddleName:
tc.middle, LastName: tc.last}
+ got, trunc, err := BuildBase(u, tc.prefix)
+ if err != nil {
+ t.Fatalf("BuildBase: %v", err)
+ }
+
+ if trunc != tc.wantTrunc {
+ t.Errorf("truncated = %v, want %v", trunc,
tc.wantTrunc)
+ }
+ if tc.wantBase != "" && got != tc.wantBase {
+ t.Errorf("base = %q, want %q", got, tc.wantBase)
+ }
+ if len(got) > 32 {
+ t.Errorf("base len %d > 32: %q", len(got), got)
+ }
+ if !strings.HasPrefix(got, tc.prefix+"-") {
+ t.Errorf("base %q does not start with prefix
%q", got, tc.prefix+"-")
+ }
+ })
+ }
+}
+
+func TestBuildBase_UnbuildableReturnsError(t *testing.T) {
+ cases := []struct {
+ name string
+ first string
+ last string
+ }{
+ {"both empty", "", ""},
+ {"both non-ASCII only", "李", "王"},
+ {"both punctuation only", "...", "---"},
+ }
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ u := &models.User{ID: "u-1", FirstName: tc.first,
LastName: tc.last}
+ got, _, err := BuildBase(u, "custos")
+ if !errors.Is(err, ErrUnbuildableUsername) {
+ t.Fatalf("err = %v, want
ErrUnbuildableUsername", err)
+ }
+ if got != "" {
+ t.Errorf("expected empty username on error, got
%q", got)
+ }
+ })
+ }
+}
+
+func TestNormalize(t *testing.T) {
+ tests := []struct {
+ in string
+ want string
+ }{
+ {"Alice", "alice"},
+ {"ALLCAPS", "allcaps"},
+ {"abc123", "abc123"},
+ {"Aña", "aa"},
+ {"Şəkili", "kili"},
+ {"hello-world", "helloworld"},
+ {"O'Brien", "obrien"},
+ {"", ""},
+ }
+ for _, tc := range tests {
+ t.Run(tc.in, func(t *testing.T) {
+ got := Normalize(tc.in)
+ if got != tc.want {
+ t.Errorf("Normalize(%q) = %q, want %q", tc.in,
got, tc.want)
+ }
+ })
+ }
+}
+
+func TestPrefix(t *testing.T) {
+ t.Setenv("POSIX_USERNAME_PREFIX", "")
+ if got := Prefix(); got != "custos" {
+ t.Errorf("default = %q, want %q", got, "custos")
+ }
+ t.Setenv("POSIX_USERNAME_PREFIX", "nexus")
+ if got := Prefix(); got != "nexus" {
+ t.Errorf("override = %q, want %q", got, "nexus")
+ }
+}
diff --git a/pkg/service/compute_cluster_user.go
b/pkg/service/compute_cluster_user.go
index b8311bbf4..0aeca1f27 100644
--- a/pkg/service/compute_cluster_user.go
+++ b/pkg/service/compute_cluster_user.go
@@ -21,6 +21,7 @@ import (
"context"
"database/sql"
"fmt"
+ "strings"
"github.com/apache/airavata-custos/pkg/events"
"github.com/apache/airavata-custos/pkg/models"
@@ -67,13 +68,29 @@ func (s *Service) CreateComputeClusterUser(ctx
context.Context, cu *models.Compu
if err := s.inTx(ctx, func(tx *sql.Tx) error {
return s.clusterUsers.Create(ctx, tx, cu)
}); err != nil {
- return nil, fmt.Errorf("create compute cluster user: %w", err)
+ switch {
+ case isLocalUsernameDuplicate(err):
+ return nil, fmt.Errorf("%w: %s", ErrAlreadyExists,
cu.LocalUsername)
+ case isPairDuplicate(err):
+ return nil, fmt.Errorf("%w: user %q is already mapped
on cluster %q",
+ ErrAlreadyExists, cu.UserID,
cu.ComputeClusterID)
+ default:
+ return nil, fmt.Errorf("create compute cluster user:
%w", err)
+ }
}
s.eventBus.Publish(events.ComputeClusterUserCreateEvent, cu)
return cu, nil
}
// isLocalUsernameDuplicate reports whether err's message names the
// uq_compute_cluster_users_local_username unique key. A nil error is never a
// duplicate.
func isLocalUsernameDuplicate(err error) bool {
	if err == nil {
		return false
	}
	const constraint = "uq_compute_cluster_users_local_username"
	return strings.Contains(err.Error(), constraint)
}
+
// isPairDuplicate reports whether err's message names the
// uq_compute_cluster_users_pair unique key. A nil error is never a duplicate.
func isPairDuplicate(err error) bool {
	if err == nil {
		return false
	}
	const constraint = "uq_compute_cluster_users_pair"
	return strings.Contains(err.Error(), constraint)
}
+
// GetComputeClusterUser retrieves a compute-cluster user by its ID.
func (s *Service) GetComputeClusterUser(ctx context.Context, id string)
(*models.ComputeClusterUser, error) {
c, err := s.clusterUsers.FindByID(ctx, id)