This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch provisioner-integration in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 2c92022880517e96e4f0aa5f8a54b7d9c5ad2185 Author: lahiruj <[email protected]> AuthorDate: Tue Jun 2 20:23:36 2026 -0400 COmanage connector implementation --- connectors/COmanage/Identity-Provisioner/README.md | 140 +++++++++++ .../Identity-Provisioner/config.example.yaml | 20 ++ .../Identity-Provisioner/internal/client/client.go | 144 +++++++++++ .../internal/client/client_test.go | 243 +++++++++++++++++++ .../internal/client/group_members.go | 121 ++++++++++ .../Identity-Provisioner/internal/client/groups.go | 154 ++++++++++++ .../internal/client/identifiers.go | 130 ++++++++++ .../Identity-Provisioner/internal/client/people.go | 154 ++++++++++++ .../internal/client/unix_cluster.go | 93 ++++++++ .../internal/operations/compose.go | 98 ++++++++ .../internal/operations/compose_test.go | 134 +++++++++++ .../internal/operations/ensure_posix_account.go | 265 +++++++++++++++++++++ .../internal/operations/lookup.go | 84 +++++++ .../internal/operations/orchestrator.go | 41 ++++ .../internal/subscribers/cluster_user.go | 70 ++++++ .../Identity-Provisioner/pkg/comanage/loader.go | 108 +++++++++ internal/connectors/loader.go | 7 + 17 files changed, 2006 insertions(+) diff --git a/connectors/COmanage/Identity-Provisioner/README.md b/connectors/COmanage/Identity-Provisioner/README.md new file mode 100644 index 000000000..9df9cf4be --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/README.md @@ -0,0 +1,140 @@ +# COmanage Identity-Provisioner + +Connector that bridges Custos to a COmanage Registry. When the core service +emits `ComputeClusterUserCreateEvent` for a cluster this connector services, +the orchestrator ensures the user has a fully provisioned POSIX identity in +COmanage: a CoPerson, a per-user CoGroup, the matching Identifiers, and a +`UnixClusterAccount` block on the target UnixCluster. + +## How it loads + +The loader is wired from `internal/connectors/loader.go` alongside SLURM and +AMIE. On startup: + +1. 
`comanage.LoadConnector` reads seven required env vars: + `COMANAGE_REGISTRY_URL`, `COMANAGE_CO_ID`, `COMANAGE_API_USER`, + `COMANAGE_API_KEY`, `COMANAGE_PERSON_ID_TYPE`, + `COMANAGE_UNIX_CLUSTER_ID`, `CUSTOS_CLUSTER_ID`. +2. If any are missing it logs `comanage provisioner: required env vars not set; skipping` + and returns `nil`. This is the documented way to disable the connector in a + deployment without code changes. + +`COMANAGE_PERSON_ID_TYPE` is the Identifier Type configured in the registry's +Identifier Types screen that the connector uses to look up and tag a CoPerson. +Set it to whatever string your COmanage instance uses for the per-person ID. +3. If all seven are present it constructs an HTTP client and a + `ClusterUserSubscriber`, then registers it against the shared event bus. + The subscriber filters by `CustosClusterID` so a deployment that services + multiple clusters can run multiple connector instances side by side. + +See `config.example.yaml` for the full env var reference (required + optional). + +## What the orchestrator does + +For each event, the orchestrator runs the following calls in order. Steps that +find an existing record are short-circuited; steps that create something +tolerate "already exists" responses so re-runs after a partial failure are +idempotent. + +| # | Step | What it does | Endpoint | +|---|------|--------------|----------| +| 1 | Lookup CoPerson | Look up `comanage_id` stored in `user_identities(source="comanage")`, fall back to REST email search, otherwise POST a new CoPerson. | Core `GET /people/<comanage_id>`, REST `co_people.json`, Core `POST /people` | +| 2 | Store `comanage_id` | Persist the COmanage identifier in `user_identities` so step 1 finds it next time. | Core service `CreateUserIdentity` | +| 3 | Read composite | Pull the full Core composite to read `uidnumber` and `CoPerson.meta.id`.
| Core `GET /people/<comanage_id>` | +| 4 | Per-user CoGroup | Find or create a CoGroup named after the user's local username (`GroupType:"CL"`, `Auto:false`). | REST `co_groups.json` | +| 5 | groupname Identifier | Attach a `type:"uid"` Identifier (the local username string) to the CoGroup. | REST `identifiers.json` | +| 6 | gidnumber Identifier | Attach a `type:"gidnumber"` Identifier (numeric, mirrors `uidnumber`) to the CoGroup. | REST `identifiers.json` | +| 7 | CoGroupMember | Join the CoPerson to the CoGroup as member + owner. | REST `co_group_members.json` | +| 8 | UnixClusterGroup | Attach the CoGroup to the configured UnixCluster. A 4xx here is treated as "already attached". | REST `unix_cluster_groups.json` | +| 9 | UnixClusterAccount | Re-GET a fresh composite, merge a `UnixClusterAccount` block, then full-composite PUT. The merge round-trips unmodeled fields as `json.RawMessage` so `deleteOmitted` cannot drop attributes the connector does not understand. | Core `GET /people/<comanage_id>` then `PUT /people/<comanage_id>` | + +Step 9 uses `sync_mode:"M"` (Manual) so a downstream provisioning plugin +cannot overwrite the block when Custos is the source of truth. + +## Audit events + +The connector emits the following `audit_events.event_type` values via the +core service. `audit_events.entity_id` is always the `compute_cluster_users.id`. + +| Event type | When | +|------------|------| +| `ComanageCoPersonCreated` | A new CoPerson was POSTed in step 1. Lookups that resolved to an existing CoPerson do not fire this. | +| `ComanageClusterAccountAttached` | The sequence completed and the user has a UnixClusterAccount block on the configured UnixCluster. | +| `ComanageProvisioningFailed` | Any step returned an error. The `details` field carries `step=<name> err=<message>` so the dead-letter is tractable from the audit log alone. 
| + +The AMIE-Processor side of this flow also emits: + +| Event type | When | +|------------|------| +| `PosixUsernameTruncated` | The base username derived from the user's name was longer than the POSIX cap and got truncated. | +| `PosixUsernameUnbuildable` | First and last name both normalized to empty so no candidate could be built. | +| `PosixUsernameAllocatorExhausted` | All collision suffixes were exhausted. The cluster-user row is not created. | + +## End-to-end audit walkthrough + +A successful provisioning of a new user produces this sequence for a single +`compute_cluster_users.id`: + +1. AMIE side: `PosixUsernameTruncated` may fire if the user's given and family + name exceeded the POSIX cap. +2. AMIE side: the cluster-user row is committed and the event bus delivers + `ComputeClusterUserCreateEvent`. +3. COmanage side: `ComanageCoPersonCreated` fires if step 1 went through the + POST path. Idempotent re-runs against an existing CoPerson skip this event. +4. COmanage side: `ComanageClusterAccountAttached` fires after step 9, with + `details` of the form `comanage_id=<id> username=<local> uid=<n>`. + +A failure produces `ComanageProvisioningFailed` with +`details=step=<name> err=<message>` in place of the success event. + +## Username allocation + +Local POSIX usernames are allocated by `pkg/posix`: + +1. `BuildBase` derives a candidate from `Prefix() + first-initial + last-name`, + normalized to lowercase ASCII, capped at the POSIX length limit. Truncation + fires a `PosixUsernameTruncated` audit event. If both names normalize to + empty, the allocator emits `PosixUsernameUnbuildable` and returns an error. +2. The AMIE handler attempts `CreateComputeClusterUser` with the base. If the + composite UNIQUE on `(compute_cluster_id, local_username)` rejects it, the + handler retries with `base + "2"`, `base + "3"`, and so on up to + `MaxCollisionSuffix`. +3.
If suffixes are exhausted a `PosixUsernameAllocatorExhausted` event is + emitted and the handler returns an error. + +## Local development + +The connector reads config from env vars. To run it against a test registry: + +```bash +export COMANAGE_REGISTRY_URL=https://<your-registry>/registry +export COMANAGE_CO_ID=<numeric CO id> +export COMANAGE_API_USER=<api user for that CO> +export COMANAGE_API_KEY=<api key for that user> +export COMANAGE_PERSON_ID_TYPE=<CoPerson Identifier Type, per your registry> +export COMANAGE_UNIX_CLUSTER_ID=<numeric UnixCluster id> +export CUSTOS_CLUSTER_ID=<UUID from the compute_clusters table> +go run ./cmd/server +``` + +Without those vars set the connector logs a single notice and skips registration, so the rest +of Custos runs unaffected. + +## Test surface + +```bash +# unit +go test ./connectors/COmanage/Identity-Provisioner/... + +# integration (requires a real registry; skipped by default) +go test -tags integration ./connectors/COmanage/Identity-Provisioner/... +``` + +The unit tests cover: + +- HTTP client transport: 5xx retry with exponential backoff, error + classification (`ErrNotFound`, `ErrAuth401`, `HTTPError`). +- Core API and REST wrappers via `httptest.NewServer`. +- The `compose.go` merge layer: a fixture composite is merged with a + `UnixClusterAccount` block, then asserted to preserve every key from the + original. diff --git a/connectors/COmanage/Identity-Provisioner/config.example.yaml b/connectors/COmanage/Identity-Provisioner/config.example.yaml new file mode 100644 index 000000000..9ed3b990b --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/config.example.yaml @@ -0,0 +1,20 @@ +# COmanage Identity-Provisioner connector +# +# This connector is configured via environment variables. Below is the full +# list with example values; see README.md for semantics. Missing required +# vars cause the loader to log a notice and skip subscriber registration.
+ +# REQUIRED --------------------------------------------------------------- +# COMANAGE_REGISTRY_URL=https://<your-registry>/registry +# COMANAGE_CO_ID=<numeric CO id> +# COMANAGE_API_USER=<api user for that CO> +# COMANAGE_API_KEY=<api key for that user> +# COMANAGE_PERSON_ID_TYPE=<CoPerson Identifier Type configured in the registry> +# COMANAGE_UNIX_CLUSTER_ID=<numeric UnixCluster id> +# CUSTOS_CLUSTER_ID=<UUID of the compute_cluster row this connector services> + +# OPTIONAL --------------------------------------------------------------- +# COMANAGE_DEFAULT_SHELL=/bin/bash +# COMANAGE_HOMEDIR_PREFIX=/home/ +# COMANAGE_HTTP_TIMEOUT=30s +# POSIX_USERNAME_PREFIX=custos # read by pkg/posix; default "custos" diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/client.go b/connectors/COmanage/Identity-Provisioner/internal/client/client.go new file mode 100644 index 000000000..bf95a998c --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/client/client.go @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package client wraps the COmanage Core API and REST surfaces. Auth is HTTP +// Basic. 
+package client + +import ( + "bytes" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// restAPIVersion is the COmanage REST v1 model version. +const restAPIVersion = "1.0" + +type Config struct { + RegistryURL string + COID int + APIUser string + APIKey string + // PersonIDType is the COmanage Identifier Type used to look up and tag a + // CoPerson (e.g. the type name configured in the registry's Identifier + // Types). Required value. + PersonIDType string + UnixClusterID int + CustosClusterID string + DefaultShell string + HomedirPrefix string + HTTPTimeout time.Duration +} + +type Client struct { + cfg Config + http *http.Client +} + +func New(cfg Config) *Client { + timeout := cfg.HTTPTimeout + if timeout == 0 { + timeout = 30 * time.Second + } + return &Client{ + cfg: cfg, + http: &http.Client{Timeout: timeout}, + } +} + +func (c *Client) Config() Config { return c.cfg } + +// coreAPI and restAPI are the two URL families. Centralized so an upstream +// version bump flips one line. +func (c *Client) coreAPI(path string) string { + return fmt.Sprintf("%s/api/co/%d/core/v1%s", c.cfg.RegistryURL, c.cfg.COID, path) +} + +func (c *Client) restAPI(path string) string { + return c.cfg.RegistryURL + path +} + +// Do issues an authenticated request and retries 5xx with backoff.
+func (c *Client) Do(method, url string, body []byte) (*http.Response, []byte, error) { + resp, respBody, err := c.doOnce(method, url, body, c.cfg.APIKey) + if err == nil && resp.StatusCode >= 500 && resp.StatusCode < 600 { + // 1s, 2s, 4s + for attempt := 1; attempt <= 3; attempt++ { + _ = resp.Body.Close() + time.Sleep(time.Duration(1<<uint(attempt-1)) * time.Second) + resp, respBody, err = c.doOnce(method, url, body, c.cfg.APIKey) + if err != nil || resp.StatusCode < 500 { + break + } + } + } + return resp, respBody, err +} + +func (c *Client) doOnce(method, url string, body []byte, apiKey string) (*http.Response, []byte, error) { + var rdr io.Reader + if body != nil { + rdr = bytes.NewReader(body) + } + req, err := http.NewRequest(method, url, rdr) + if err != nil { + return nil, nil, fmt.Errorf("build request: %w", err) + } + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + req.SetBasicAuth(c.cfg.APIUser, apiKey) + + resp, err := c.http.Do(req) + if err != nil { + return nil, nil, fmt.Errorf("http: %w", err) + } + respBody, readErr := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if readErr != nil { + return resp, nil, fmt.Errorf("read response: %w", readErr) + } + return resp, respBody, nil +} + +var ( + ErrAuth401 = errors.New("comanage: 401 unauthorized") + ErrNotFound = errors.New("comanage: not found") +) + +type HTTPError struct { + Method string + URL string + StatusCode int + Body string +} + +func (e *HTTPError) Error() string { + return fmt.Sprintf("comanage %s %s: %d: %s", e.Method, e.URL, e.StatusCode, truncate(e.Body, 200)) +} + +func truncate(s string, n int) string { + s = strings.TrimSpace(s) + if len(s) <= n { + return s + } + return s[:n] + "..." 
+} diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/client_test.go b/connectors/COmanage/Identity-Provisioner/internal/client/client_test.go new file mode 100644 index 000000000..471aa606b --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/client/client_test.go @@ -0,0 +1,243 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package client + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" +) + +func newTestClient(t *testing.T, baseURL string) *Client { + t.Helper() + return New(Config{ + RegistryURL: baseURL, + COID: 2, + APIUser: "co_2.testuser", + APIKey: "key-primary", + UnixClusterID: 1, + HTTPTimeout: 5 * time.Second, + }) +} + +func TestCreatePerson_SendsBasicAuthAndDecodes(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost || !strings.HasSuffix(r.URL.Path, "/api/co/2/core/v1/people") { + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + } + user, pass, ok := r.BasicAuth() + if !ok || user != "co_2.testuser" || pass != "key-primary" { + t.Errorf("auth header: ok=%v user=%q pass=%q", ok, user, pass) + } + w.WriteHeader(http.StatusCreated) + _, _ = io.WriteString(w, `[{"identifier":"Person100099","type":"comanage_id","login":false,"status":"A"}]`) + })) + defer srv.Close() + + c := newTestClient(t, srv.URL+"/registry") + c.cfg.RegistryURL = srv.URL // override to match server root + out, err := c.CreatePerson([]byte(`{}`)) + if err != nil { + t.Fatalf("CreatePerson: %v", err) + } + if len(out) != 1 || out[0].Identifier != "Person100099" || out[0].Type != "comanage_id" { + t.Errorf("got %+v", out) + } +} + +func TestDo_RetriesOn5xx(t *testing.T) { + var calls atomic.Int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := calls.Add(1) + if n < 3 { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, "ok") + })) + defer srv.Close() + + c := New(Config{ + RegistryURL: srv.URL, + COID: 2, + APIUser: "u", + APIKey: "k", + HTTPTimeout: 2 * time.Second, + }) + resp, body, err := c.Do("GET", srv.URL+"/anything", nil) + if err != nil { + t.Fatalf("Do: %v", err) + } + if resp.StatusCode != http.StatusOK { + 
t.Errorf("status: got %d", resp.StatusCode) + } + if got := string(body); got != "ok" { + t.Errorf("body: %q", got) + } + if calls.Load() != 3 { + t.Errorf("calls: got %d, want 3", calls.Load()) + } +} + +func TestCreateCoGroup_EnvelopeShape(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasSuffix(r.URL.Path, "/co_groups.json") { + t.Errorf("unexpected path: %s", r.URL.Path) + } + body, _ := io.ReadAll(r.Body) + var req CoGroupCreateRequest + if err := json.Unmarshal(body, &req); err != nil { + t.Fatalf("decode request: %v", err) + } + if req.RequestType != "CoGroups" || req.Version != "1.0" || len(req.CoGroups) != 1 { + t.Errorf("envelope: %+v", req) + } + g := req.CoGroups[0] + if g.CoId != 2 || g.Name != "custos-test" || g.GroupType != "CL" || g.Auto { + t.Errorf("group fields: %+v", g) + } + w.WriteHeader(http.StatusCreated) + _, _ = io.WriteString(w, `{"ResponseType":"NewObject","Version":"1.0","ObjectType":"CoGroup","Id":"42"}`) + })) + defer srv.Close() + + c := newTestClient(t, srv.URL) + c.cfg.RegistryURL = srv.URL + id, err := c.CreateCoGroup("custos-test", "Primary group for custos-test") + if err != nil { + t.Fatalf("CreateCoGroup: %v", err) + } + if id != 42 { + t.Errorf("id: got %d, want 42", id) + } +} + +func TestCreateIdentifierOnGroup_UsesNestedPersonShape(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + var req IdentifierCreateRequest + if err := json.Unmarshal(body, &req); err != nil { + t.Fatalf("decode: %v", err) + } + ident := req.Identifiers[0] + if ident.Person.Type != "Group" || ident.Person.Id != 25 { + t.Errorf("Person field: %+v (need {Type:Group,Id:25})", ident.Person) + } + w.WriteHeader(http.StatusCreated) + _, _ = io.WriteString(w, `{"ResponseType":"NewObject","Version":"1.0","ObjectType":"Identifier","Id":"7"}`) + })) + defer srv.Close() + + c := newTestClient(t, 
srv.URL) + c.cfg.RegistryURL = srv.URL + id, err := c.CreateIdentifierOnGroup("custos-alice", "uid", 25) + if err != nil { + t.Fatalf("CreateIdentifierOnGroup: %v", err) + } + if id != 7 { + t.Errorf("id: %d", id) + } +} + +func TestCreateUnixClusterGroup_URLAndBody(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasSuffix(r.URL.Path, "/unix_cluster/unix_cluster_groups.json") { + t.Errorf("unexpected path: %s", r.URL.Path) + } + body, _ := io.ReadAll(r.Body) + var req UnixClusterGroupCreateRequest + if err := json.Unmarshal(body, &req); err != nil { + t.Fatalf("decode: %v", err) + } + one := req.UnixClusterGroups[0] + if one.UnixClusterId != 1 || one.CoGroupId != 25 { + t.Errorf("fields: %+v", one) + } + w.WriteHeader(http.StatusCreated) + _, _ = io.WriteString(w, `{"ResponseType":"NewObject","Version":"1.0","ObjectType":"UnixClusterGroup","Id":"3"}`) + })) + defer srv.Close() + + c := newTestClient(t, srv.URL) + c.cfg.RegistryURL = srv.URL + id, err := c.CreateUnixClusterGroup(25) + if err != nil { + t.Fatalf("CreateUnixClusterGroup: %v", err) + } + if id != 3 { + t.Errorf("id: %d", id) + } +} + +func TestGetPersonComposite_PreservesRawJSON(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet || !strings.Contains(r.URL.Path, "/api/co/2/core/v1/people/Person100099") { + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + } + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, `{"CoPerson":{"meta":{"id":99}},"Name":[],"Identifier":[]}`) + })) + defer srv.Close() + + c := newTestClient(t, srv.URL) + c.cfg.RegistryURL = srv.URL + raw, err := c.GetPersonComposite("Person100099") + if err != nil { + t.Fatalf("GetPersonComposite: %v", err) + } + if !strings.Contains(string(raw), `"id":99`) { + t.Errorf("raw missing expected substring: %s", string(raw)) + } +} + +func 
TestFindCoGroupByName_ExactMatchOnly(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.WriteString(w, `{ + "ResponseType":"CoGroups","Version":"1.0", + "CoGroups":[ + {"Version":"1.0","Id":11,"CoId":2,"Name":"custos-alice"}, + {"Version":"1.0","Id":12,"CoId":2,"Name":"custos-bob"} + ] + }`) + })) + defer srv.Close() + + c := newTestClient(t, srv.URL) + c.cfg.RegistryURL = srv.URL + + id, err := c.FindCoGroupByName("custos-bob") + if err != nil { + t.Fatalf("FindCoGroupByName: %v", err) + } + if id != 12 { + t.Errorf("id: got %d, want 12", id) + } + id, err = c.FindCoGroupByName("custos-nope") + if err != nil { + t.Fatalf("FindCoGroupByName miss: %v", err) + } + if id != 0 { + t.Errorf("miss should return 0, got %d", id) + } +} diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/group_members.go b/connectors/COmanage/Identity-Provisioner/internal/client/group_members.go new file mode 100644 index 000000000..922cc8984 --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/client/group_members.go @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package client + +import ( + "encoding/json" + "fmt" + "net/http" +) + +type CoGroupMemberCreateRequest struct { + RequestType string `json:"RequestType"` + Version string `json:"Version"` + CoGroupMembers []CoGroupMemberCreateOne `json:"CoGroupMembers"` +} + +type CoGroupMemberCreateOne struct { + Version string `json:"Version"` + Person IdentifierParent `json:"Person"` + CoGroupId int `json:"CoGroupId"` + Member bool `json:"Member"` + Owner bool `json:"Owner"` +} + +func (c *Client) CreateCoGroupMember(coPersonId, coGroupId int) (int, error) { + body, err := json.Marshal(CoGroupMemberCreateRequest{ + RequestType: "CoGroupMembers", + Version: restAPIVersion, + CoGroupMembers: []CoGroupMemberCreateOne{{ + Version: restAPIVersion, + Person: IdentifierParent{Type: "CO", Id: coPersonId}, + CoGroupId: coGroupId, + Member: true, + Owner: true, + }}, + }) + if err != nil { + return 0, fmt.Errorf("marshal co_group_member: %w", err) + } + + u := c.restAPI("/co_group_members.json") + resp, respBody, err := c.Do(http.MethodPost, u, body) + if err != nil { + return 0, err + } + switch resp.StatusCode { + case http.StatusCreated, http.StatusOK: + var out CoGroupCreateResponse + if err := json.Unmarshal(respBody, &out); err != nil { + return 0, fmt.Errorf("decode co_group_member response: %w: %s", err, string(respBody)) + } + var id int + if _, err := fmt.Sscanf(out.Id, "%d", &id); err != nil { + return 0, fmt.Errorf("parse co_group_member id %q: %w", out.Id, err) + } + return id, nil + case http.StatusUnauthorized: + return 0, ErrAuth401 + default: + return 0, &HTTPError{Method: "POST", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} + +type CoGroupMemberListResponse struct { + ResponseType string `json:"ResponseType"` + Version string `json:"Version"` + CoGroupMembers []CoGroupMemberListOne `json:"CoGroupMembers"` +} + +type CoGroupMemberListOne struct { + Version string `json:"Version"` + Id int `json:"Id"` + Person IdentifierParent `json:"Person"` + 
CoGroupId int `json:"CoGroupId"` + Member bool `json:"Member"` + Owner bool `json:"Owner"` +} + +// FindCoGroupMember returns the membership id for a (group, person) pair, or +// 0 if none. +func (c *Client) FindCoGroupMember(coGroupId, coPersonId int) (int, error) { + u := c.restAPI(fmt.Sprintf("/co_group_members.json?cogroupid=%d&copersonid=%d", coGroupId, coPersonId)) + resp, respBody, err := c.Do(http.MethodGet, u, nil) + if err != nil { + return 0, err + } + switch resp.StatusCode { + case http.StatusOK: + var out CoGroupMemberListResponse + if err := json.Unmarshal(respBody, &out); err != nil { + return 0, fmt.Errorf("decode co_group_members list: %w", err) + } + for _, m := range out.CoGroupMembers { + if m.CoGroupId == coGroupId && m.Person.Id == coPersonId { + return m.Id, nil + } + } + return 0, nil + case http.StatusNoContent: + return 0, nil + case http.StatusUnauthorized: + return 0, ErrAuth401 + default: + return 0, &HTTPError{Method: "GET", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/groups.go b/connectors/COmanage/Identity-Provisioner/internal/client/groups.go new file mode 100644 index 000000000..d52e52ad7 --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/client/groups.go @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +import ( + "encoding/json" + "fmt" + "net/http" +) + +// CoGroupCreateRequest is the body envelope for POST /co_groups.json. +// GroupType "CL" = GroupEnum::Clusters (per-user cluster primary group). +type CoGroupCreateRequest struct { + RequestType string `json:"RequestType"` + Version string `json:"Version"` + CoGroups []CoGroupCreateOne `json:"CoGroups"` +} + +type CoGroupCreateOne struct { + Version string `json:"Version"` + CoId int `json:"CoId"` + Name string `json:"Name"` + Description string `json:"Description"` + Open bool `json:"Open"` + Status string `json:"Status"` + GroupType string `json:"GroupType"` + Auto bool `json:"Auto"` +} + +// CoGroupCreateResponse is the standard NewObject shape returned by COmanage +// REST POSTs. 
+type CoGroupCreateResponse struct { + ResponseType string `json:"ResponseType"` + Version string `json:"Version"` + ObjectType string `json:"ObjectType"` + Id string `json:"Id"` +} + +func (c *Client) CreateCoGroup(name, description string) (int, error) { + body, err := json.Marshal(CoGroupCreateRequest{ + RequestType: "CoGroups", + Version: restAPIVersion, + CoGroups: []CoGroupCreateOne{{ + Version: restAPIVersion, + CoId: c.cfg.COID, + Name: name, + Description: description, + Open: false, + Status: "Active", + GroupType: "CL", + Auto: false, + }}, + }) + if err != nil { + return 0, fmt.Errorf("marshal co_group: %w", err) + } + + u := c.restAPI("/co_groups.json") + resp, respBody, err := c.Do(http.MethodPost, u, body) + if err != nil { + return 0, err + } + switch resp.StatusCode { + case http.StatusCreated, http.StatusOK: + var out CoGroupCreateResponse + if err := json.Unmarshal(respBody, &out); err != nil { + return 0, fmt.Errorf("decode co_group create response: %w: %s", err, string(respBody)) + } + var id int + if _, err := fmt.Sscanf(out.Id, "%d", &id); err != nil { + return 0, fmt.Errorf("parse co_group id %q: %w", out.Id, err) + } + return id, nil + case http.StatusUnauthorized: + return 0, ErrAuth401 + default: + return 0, &HTTPError{Method: "POST", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} + +type CoGroupListResponse struct { + ResponseType string `json:"ResponseType"` + Version string `json:"Version"` + CoGroups []CoGroupReadOne `json:"CoGroups"` +} + +type CoGroupReadOne struct { + Version string `json:"Version"` + Id int `json:"Id"` + CoId int `json:"CoId"` + Name string `json:"Name"` +} + +// FindCoGroupByName returns the group id, or 0 if no group with that name +// exists in the configured CO. 
+func (c *Client) FindCoGroupByName(name string) (int, error) { + u := c.restAPI(fmt.Sprintf("/co_groups.json?coid=%d", c.cfg.COID)) + resp, respBody, err := c.Do(http.MethodGet, u, nil) + if err != nil { + return 0, err + } + switch resp.StatusCode { + case http.StatusOK: + var out CoGroupListResponse + if err := json.Unmarshal(respBody, &out); err != nil { + return 0, fmt.Errorf("decode co_groups list: %w", err) + } + for _, g := range out.CoGroups { + if g.Name == name { + return g.Id, nil + } + } + return 0, nil + case http.StatusNoContent: + return 0, nil + case http.StatusUnauthorized: + return 0, ErrAuth401 + default: + return 0, &HTTPError{Method: "GET", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} + +func (c *Client) DeleteCoGroup(id int) error { + u := c.restAPI(fmt.Sprintf("/co_groups/%d.json", id)) + resp, respBody, err := c.Do(http.MethodDelete, u, nil) + if err != nil { + return err + } + switch resp.StatusCode { + case http.StatusOK, http.StatusNoContent: + return nil + case http.StatusUnauthorized: + return ErrAuth401 + case http.StatusNotFound: + return ErrNotFound + default: + return &HTTPError{Method: "DELETE", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/identifiers.go b/connectors/COmanage/Identity-Provisioner/internal/client/identifiers.go new file mode 100644 index 000000000..f562c2915 --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/client/identifiers.go @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +import ( + "encoding/json" + "fmt" + "net/http" +) + +// IdentifierParent is the nested {"Type":"Group"|"CO","Id":<n>} block that +// attaches an Identifier to a CoGroup or CoPerson. +type IdentifierParent struct { + Type string `json:"Type"` + Id int `json:"Id"` +} + +type IdentifierCreateRequest struct { + RequestType string `json:"RequestType"` + Version string `json:"Version"` + Identifiers []IdentifierCreateOne `json:"Identifiers"` +} + +type IdentifierCreateOne struct { + Version string `json:"Version"` + Identifier string `json:"Identifier"` + Type string `json:"Type"` + Login bool `json:"Login"` + Status string `json:"Status"` + Person IdentifierParent `json:"Person"` +} + +// CreateIdentifierOnGroup POSTs an Identifier attached to a CoGroup. 
+func (c *Client) CreateIdentifierOnGroup(value, identifierType string, coGroupId int) (int, error) { + body, err := json.Marshal(IdentifierCreateRequest{ + RequestType: "Identifiers", + Version: restAPIVersion, + Identifiers: []IdentifierCreateOne{{ + Version: restAPIVersion, + Identifier: value, + Type: identifierType, + Login: false, + Status: "Active", + Person: IdentifierParent{Type: "Group", Id: coGroupId}, + }}, + }) + if err != nil { + return 0, fmt.Errorf("marshal identifier: %w", err) + } + + u := c.restAPI("/identifiers.json") + resp, respBody, err := c.Do(http.MethodPost, u, body) + if err != nil { + return 0, err + } + switch resp.StatusCode { + case http.StatusCreated, http.StatusOK: + var out CoGroupCreateResponse // same NewObject shape + if err := json.Unmarshal(respBody, &out); err != nil { + return 0, fmt.Errorf("decode identifier create response: %w: %s", err, string(respBody)) + } + var id int + if _, err := fmt.Sscanf(out.Id, "%d", &id); err != nil { + return 0, fmt.Errorf("parse identifier id %q: %w", out.Id, err) + } + return id, nil + case http.StatusUnauthorized: + return 0, ErrAuth401 + default: + return 0, &HTTPError{Method: "POST", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} + +type IdentifierListResponse struct { + ResponseType string `json:"ResponseType"` + Version string `json:"Version"` + Identifiers []IdentifierListOne `json:"Identifiers"` +} + +type IdentifierListOne struct { + Version string `json:"Version"` + Id int `json:"Id"` + Identifier string `json:"Identifier"` + Type string `json:"Type"` + Status string `json:"Status"` +} + +// FindIdentifierOnGroup returns the existing Identifier id with the given +// type on a CoGroup, or 0 if none. 
+func (c *Client) FindIdentifierOnGroup(coGroupId int, identifierType string) (int, error) { + u := c.restAPI(fmt.Sprintf("/identifiers.json?cogroupid=%d", coGroupId)) + resp, respBody, err := c.Do(http.MethodGet, u, nil) + if err != nil { + return 0, err + } + switch resp.StatusCode { + case http.StatusOK: + var out IdentifierListResponse + if err := json.Unmarshal(respBody, &out); err != nil { + return 0, fmt.Errorf("decode identifiers list: %w", err) + } + for _, ident := range out.Identifiers { + if ident.Type == identifierType { + return ident.Id, nil + } + } + return 0, nil + case http.StatusNoContent: + return 0, nil + case http.StatusUnauthorized: + return 0, ErrAuth401 + default: + return 0, &HTTPError{Method: "GET", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/people.go b/connectors/COmanage/Identity-Provisioner/internal/client/people.go new file mode 100644 index 000000000..8968c1f8a --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/client/people.go @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package client + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" +) + +// CreatePersonResponseEntry is one Identifier echoed by POST /people. COmanage +// returns one row per identifier type configured on the binding, typically +// including the auto-assigned ids. +type CreatePersonResponseEntry struct { + Identifier string `json:"identifier"` + Type string `json:"type"` + Login bool `json:"login"` + Status string `json:"status"` +} + +// CreatePerson POSTs the composite body (CoPerson + Name + Email + +// Identifier blocks) to /people and returns the resulting Identifier list. +func (c *Client) CreatePerson(body []byte) ([]CreatePersonResponseEntry, error) { + u := c.coreAPI("/people") + resp, respBody, err := c.Do(http.MethodPost, u, body) + if err != nil { + return nil, err + } + switch resp.StatusCode { + case http.StatusCreated, http.StatusOK: + var out []CreatePersonResponseEntry + if err := json.Unmarshal(respBody, &out); err != nil { + return nil, fmt.Errorf("decode create-person response: %w: %s", err, string(respBody)) + } + return out, nil + case http.StatusUnauthorized: + return nil, ErrAuth401 + default: + return nil, &HTTPError{Method: "POST", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} + +// GetPersonComposite returns the raw composite JSON, suitable for round-tripping +// back into UpdatePerson. +func (c *Client) GetPersonComposite(identifier string) (json.RawMessage, error) { + u := c.coreAPI("/people/" + identifier) + resp, respBody, err := c.Do(http.MethodGet, u, nil) + if err != nil { + return nil, err + } + switch resp.StatusCode { + case http.StatusOK: + return json.RawMessage(respBody), nil + case http.StatusUnauthorized: + return nil, ErrAuth401 + case http.StatusNotFound: + return nil, ErrNotFound + default: + return nil, &HTTPError{Method: "GET", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} + +// UpdatePerson PUTs a full-composite body. 
The body MUST include every related +// model (Name, EmailAddress, Identifier, etc.) or COmanage's deleteOmitted +// behavior will wipe them. +func (c *Client) UpdatePerson(identifier string, body []byte) (json.RawMessage, error) { + u := c.coreAPI("/people/" + identifier) + resp, respBody, err := c.Do(http.MethodPut, u, body) + if err != nil { + return nil, err + } + switch resp.StatusCode { + case http.StatusOK: + return json.RawMessage(respBody), nil + case http.StatusUnauthorized: + return nil, ErrAuth401 + case http.StatusNotFound: + return nil, ErrNotFound + default: + return nil, &HTTPError{Method: "PUT", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} + +func (c *Client) DeletePerson(identifier string) error { + u := c.coreAPI("/people/" + identifier) + resp, respBody, err := c.Do(http.MethodDelete, u, nil) + if err != nil { + return err + } + switch resp.StatusCode { + case http.StatusOK, http.StatusNoContent: + return nil + case http.StatusUnauthorized: + return ErrAuth401 + case http.StatusNotFound: + return ErrNotFound + default: + return &HTTPError{Method: "DELETE", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} + +type CoPersonListResponse struct { + ResponseType string `json:"ResponseType"` + Version string `json:"Version"` + CoPeople []CoPersonListOne `json:"CoPeople"` +} + +type CoPersonListOne struct { + Version string `json:"Version"` + Id int `json:"Id"` + CoId int `json:"CoId"` +} + +// FindCoPersonByEmail searches for CoPeople. search.mail is a LIKE match in +// COmanage; callers must post-filter for exact equality before trusting a hit. 
+func (c *Client) FindCoPersonByEmail(email string) ([]CoPersonListOne, error) { + u := c.restAPI(fmt.Sprintf("/co_people.json?coid=%d&search.mail=%s", c.cfg.COID, url.QueryEscape(email))) + resp, respBody, err := c.Do(http.MethodGet, u, nil) + if err != nil { + return nil, err + } + switch resp.StatusCode { + case http.StatusOK: + var out CoPersonListResponse + if err := json.Unmarshal(respBody, &out); err != nil { + return nil, fmt.Errorf("decode co_people list: %w", err) + } + return out.CoPeople, nil + case http.StatusNoContent: + return nil, nil + case http.StatusUnauthorized: + return nil, ErrAuth401 + case http.StatusNotFound: + return nil, ErrNotFound + default: + return nil, &HTTPError{Method: "GET", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/unix_cluster.go b/connectors/COmanage/Identity-Provisioner/internal/client/unix_cluster.go new file mode 100644 index 000000000..2a9eaa99d --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/client/unix_cluster.go @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package client + +import ( + "encoding/json" + "fmt" + "net/http" +) + +type UnixClusterGroupCreateRequest struct { + RequestType string `json:"RequestType"` + Version string `json:"Version"` + UnixClusterGroups []UnixClusterGroupCreateOne `json:"UnixClusterGroups"` +} + +type UnixClusterGroupCreateOne struct { + Version string `json:"Version"` + UnixClusterId int `json:"UnixClusterId"` + CoGroupId int `json:"CoGroupId"` +} + +// CreateUnixClusterGroup binds a CoGroup to a UnixCluster. The URL takes no +// named params for POST. +func (c *Client) CreateUnixClusterGroup(coGroupId int) (int, error) { + body, err := json.Marshal(UnixClusterGroupCreateRequest{ + RequestType: "UnixClusterGroups", + Version: restAPIVersion, + UnixClusterGroups: []UnixClusterGroupCreateOne{{ + Version: restAPIVersion, + UnixClusterId: c.cfg.UnixClusterID, + CoGroupId: coGroupId, + }}, + }) + if err != nil { + return 0, fmt.Errorf("marshal unix_cluster_group: %w", err) + } + + u := c.restAPI("/unix_cluster/unix_cluster_groups.json") + resp, respBody, err := c.Do(http.MethodPost, u, body) + if err != nil { + return 0, err + } + switch resp.StatusCode { + case http.StatusCreated, http.StatusOK: + var out CoGroupCreateResponse // NewObject shape + if err := json.Unmarshal(respBody, &out); err != nil { + return 0, fmt.Errorf("decode unix_cluster_group response: %w: %s", err, string(respBody)) + } + var id int + if _, err := fmt.Sscanf(out.Id, "%d", &id); err != nil { + return 0, fmt.Errorf("parse unix_cluster_group id %q: %w", out.Id, err) + } + return id, nil + case http.StatusUnauthorized: + return 0, ErrAuth401 + default: + return 0, &HTTPError{Method: "POST", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} + +func (c *Client) DeleteUnixClusterGroup(id int) error { + u := c.restAPI(fmt.Sprintf("/unix_cluster/unix_cluster_groups/%d.json", id)) + resp, respBody, err := c.Do(http.MethodDelete, u, nil) + if err != nil { + return err + } + switch resp.StatusCode { + case 
http.StatusOK, http.StatusNoContent: + return nil + case http.StatusUnauthorized: + return ErrAuth401 + case http.StatusNotFound: + return ErrNotFound + default: + return &HTTPError{Method: "DELETE", URL: u, StatusCode: resp.StatusCode, Body: string(respBody)} + } +} diff --git a/connectors/COmanage/Identity-Provisioner/internal/operations/compose.go b/connectors/COmanage/Identity-Provisioner/internal/operations/compose.go new file mode 100644 index 000000000..e080acbf5 --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/operations/compose.go @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package operations + +import ( + "encoding/json" + "fmt" +) + +// UnixClusterAccountBlock is appended to the composite PUT body. Field names +// mirror COmanage's JSON attributes. 
type UnixClusterAccountBlock struct {
	UnixClusterId    int    `json:"unix_cluster_id"`
	SyncMode         string `json:"sync_mode"`
	Status           string `json:"status"`
	Username         string `json:"username"`
	Uid              int64  `json:"uid"`
	Gecos            string `json:"gecos"`
	LoginShell       string `json:"login_shell"`
	HomeDirectory    string `json:"home_directory"`
	PrimaryCoGroupId int    `json:"primary_co_group_id"`
}

// mergeUnixClusterAccount sets "UnixClusterAccount" on the composite while
// preserving every other top-level key. The value written is a one-element
// JSON array holding block; any existing "UnixClusterAccount" entry is
// replaced.
//
// It returns an error if composite is not a JSON object. A JSON null is
// rejected explicitly: it decodes into a nil map without error, and writing to
// that map would panic.
func mergeUnixClusterAccount(composite json.RawMessage, block UnixClusterAccountBlock) ([]byte, error) {
	var top map[string]json.RawMessage
	if err := json.Unmarshal(composite, &top); err != nil {
		return nil, fmt.Errorf("decode composite: %w", err)
	}
	if top == nil {
		return nil, fmt.Errorf("decode composite: expected a JSON object, got %s", composite)
	}
	blockJSON, err := json.Marshal([]UnixClusterAccountBlock{block})
	if err != nil {
		return nil, fmt.Errorf("encode UnixClusterAccount block: %w", err)
	}
	top["UnixClusterAccount"] = blockJSON
	out, err := json.Marshal(top)
	if err != nil {
		return nil, fmt.Errorf("encode merged composite: %w", err)
	}
	return out, nil
}

// extractIdentifier returns the first Identifier.identifier whose type matches,
// or "" if none. A composite without an "Identifier" key also yields "" with a
// nil error; malformed JSON yields an error.
func extractIdentifier(composite json.RawMessage, identifierType string) (string, error) {
	var top map[string]json.RawMessage
	if err := json.Unmarshal(composite, &top); err != nil {
		return "", fmt.Errorf("decode composite: %w", err)
	}
	rawIdents, ok := top["Identifier"]
	if !ok {
		return "", nil
	}
	var idents []struct {
		Identifier string `json:"identifier"`
		Type       string `json:"type"`
	}
	if err := json.Unmarshal(rawIdents, &idents); err != nil {
		return "", fmt.Errorf("decode Identifier array: %w", err)
	}
	for _, id := range idents {
		if id.Type == identifierType {
			return id.Identifier, nil
		}
	}
	return "", nil
}

// extractCoPersonID returns the numeric CoPerson.meta.id (distinct from the
// PersonIDType identifier string), or 0 if missing.
+func extractCoPersonID(composite json.RawMessage) (int, error) { + var top struct { + CoPerson struct { + Meta struct { + Id int `json:"id"` + } `json:"meta"` + } `json:"CoPerson"` + } + if err := json.Unmarshal(composite, &top); err != nil { + return 0, fmt.Errorf("decode composite for CoPerson.id: %w", err) + } + return top.CoPerson.Meta.Id, nil +} diff --git a/connectors/COmanage/Identity-Provisioner/internal/operations/compose_test.go b/connectors/COmanage/Identity-Provisioner/internal/operations/compose_test.go new file mode 100644 index 000000000..c7d0b12d4 --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/operations/compose_test.go @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package operations + +import ( + "encoding/json" + "testing" + + "github.com/apache/airavata-custos/pkg/models" +) + +// sampleComposite mirrors a real CoPerson composite trimmed to the fields the +// operations layer reads. 
+const sampleComposite = `{ + "CoPerson":{"meta":{"id":98},"co_id":2,"status":"A"}, + "Name":[{"given":"GoalE2E","family":"Throwaway","type":"official","primary_name":true}], + "EmailAddress":[{"mail":"[email protected]","type":"official","verified":false}], + "CoGroupMember":[ + {"co_group_id":7,"member":true,"owner":false}, + {"co_group_id":25,"member":true,"owner":true} + ], + "Identifier":[ + {"identifier":"http://test.invalid/sub","type":"oidcsub","login":true,"status":"A"}, + {"identifier":"Person100016","type":"comanage_id","login":false,"status":"A"}, + {"identifier":"100016","type":"comanage_number","login":false,"status":"A"}, + {"identifier":"vspectes2","type":"uid","login":false,"status":"A"}, + {"identifier":"2000016","type":"uidnumber","login":false,"status":"A"}, + {"identifier":"2000016","type":"gidnumber","login":false,"status":"A"} + ] +}` + +func TestExtractIdentifier(t *testing.T) { + tests := []struct { + typeName string + want string + }{ + {"comanage_id", "Person100016"}, + {"uidnumber", "2000016"}, + {"gidnumber", "2000016"}, + {"oidcsub", "http://test.invalid/sub"}, + {"missing-type", ""}, + } + for _, tc := range tests { + t.Run(tc.typeName, func(t *testing.T) { + got, err := extractIdentifier([]byte(sampleComposite), tc.typeName) + if err != nil { + t.Fatalf("extractIdentifier: %v", err) + } + if got != tc.want { + t.Errorf("got %q, want %q", got, tc.want) + } + }) + } +} + +func TestExtractCoPersonID(t *testing.T) { + got, err := extractCoPersonID([]byte(sampleComposite)) + if err != nil { + t.Fatalf("extractCoPersonID: %v", err) + } + if got != 98 { + t.Errorf("got %d, want 98", got) + } +} + +func TestMergeUnixClusterAccount_PreservesAllKeysAndAppendsBlock(t *testing.T) { + block := UnixClusterAccountBlock{ + UnixClusterId: 1, + SyncMode: "M", + Status: "A", + Username: "custos-gthrowa", + Uid: 2000016, + Gecos: "", + LoginShell: "/bin/bash", + HomeDirectory: "/home/custos-gthrowa", + PrimaryCoGroupId: 25, + } + out, err := 
mergeUnixClusterAccount([]byte(sampleComposite), block) + if err != nil { + t.Fatalf("mergeUnixClusterAccount: %v", err) + } + + var merged map[string]json.RawMessage + if err := json.Unmarshal(out, &merged); err != nil { + t.Fatalf("decode merged: %v", err) + } + + for _, key := range []string{"CoPerson", "Name", "EmailAddress", "Identifier", "CoGroupMember", "UnixClusterAccount"} { + if _, ok := merged[key]; !ok { + t.Errorf("merged composite missing key %q", key) + } + } + + // UnixClusterAccount must be an array with one block matching what we sent. + var unix []UnixClusterAccountBlock + if err := json.Unmarshal(merged["UnixClusterAccount"], &unix); err != nil { + t.Fatalf("decode UnixClusterAccount: %v", err) + } + if len(unix) != 1 || unix[0].Username != "custos-gthrowa" || unix[0].Uid != 2000016 || unix[0].PrimaryCoGroupId != 25 { + t.Errorf("unix block: %+v", unix) + } +} + +func TestBuildCreatePersonBody_Shape(t *testing.T) { + u := &models.User{FirstName: "GoalE2E", LastName: "Throwaway", Email: "[email protected]"} + raw, err := buildCreatePersonBody(2, u) + if err != nil { + t.Fatalf("buildCreatePersonBody: %v", err) + } + var got map[string]json.RawMessage + if err := json.Unmarshal(raw, &got); err != nil { + t.Fatalf("decode: %v", err) + } + for _, key := range []string{"CoPerson", "Name", "EmailAddress"} { + if _, ok := got[key]; !ok { + t.Errorf("body missing %q", key) + } + } +} diff --git a/connectors/COmanage/Identity-Provisioner/internal/operations/ensure_posix_account.go b/connectors/COmanage/Identity-Provisioner/internal/operations/ensure_posix_account.go new file mode 100644 index 000000000..7c2d8c964 --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/operations/ensure_posix_account.go @@ -0,0 +1,265 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package operations + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "strconv" + + "github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client" + "github.com/apache/airavata-custos/pkg/models" +) + +func (o *Orchestrator) ensurePOSIXAccountImpl(ctx context.Context, cu *models.ComputeClusterUser) error { + log := slog.With("correlation_id", cu.ID, "custos_user_id", cu.UserID, "unix_cluster_id", o.c.Config().UnixClusterID) + + user, err := o.core.GetUser(ctx, cu.UserID) + if err != nil { + return fmt.Errorf("get custos user: %w", err) + } + + personID, composite, created, err := o.lookupOrCreateCoPerson(ctx, user) + if err != nil { + o.dlq(ctx, cu, "lookup_or_create_coperson", err) + return err + } + log = log.With("comanage_person_id", personID) + log.Info("comanage: CoPerson resolved", "created", created) + if created { + o.audit(ctx, cu, "ComanageCoPersonCreated", fmt.Sprintf("comanage_id=%s email=%s", personID, user.Email)) + } + + if err := o.storePersonID(ctx, cu.UserID, personID); err != nil { + log.Warn("comanage: failed to store CoPerson id", "err", err) + } + + if composite == nil { + composite, err = o.c.GetPersonComposite(personID) + if err != nil { + o.dlq(ctx, cu, "get_composite", err) + return err + } + } + + uidnumber, err := extractIdentifier(composite, "uidnumber") + if err != nil { + o.dlq(ctx, cu, 
"extract_uidnumber", err) + return err + } + if uidnumber == "" { + err := fmt.Errorf("uidnumber identifier missing on CoPerson %s", personID) + o.dlq(ctx, cu, "missing_uidnumber", err) + return err + } + uidInt, err := strconv.ParseInt(uidnumber, 10, 64) + if err != nil { + o.dlq(ctx, cu, "parse_uidnumber", err) + return err + } + + coPersonID, err := extractCoPersonID(composite) + if err != nil || coPersonID == 0 { + o.dlq(ctx, cu, "extract_coperson_id", err) + return fmt.Errorf("extract CoPerson.meta.id: %w", err) + } + + coGroupID, err := o.c.FindCoGroupByName(cu.LocalUsername) + if err != nil { + o.dlq(ctx, cu, "find_co_group", err) + return err + } + if coGroupID == 0 { + coGroupID, err = o.c.CreateCoGroup(cu.LocalUsername, "Primary group for "+cu.LocalUsername) + if err != nil { + o.dlq(ctx, cu, "create_co_group", err) + return err + } + log.Info("comanage: CoGroup created", "co_group_id", coGroupID) + } + + if existing, err := o.c.FindIdentifierOnGroup(coGroupID, "uid"); err != nil { + o.dlq(ctx, cu, "find_groupname_identifier", err) + return err + } else if existing == 0 { + if _, err := o.c.CreateIdentifierOnGroup(cu.LocalUsername, "uid", coGroupID); err != nil { + o.dlq(ctx, cu, "create_groupname_identifier", err) + return err + } + } + + if existing, err := o.c.FindIdentifierOnGroup(coGroupID, "gidnumber"); err != nil { + o.dlq(ctx, cu, "find_gidnumber_identifier", err) + return err + } else if existing == 0 { + if _, err := o.c.CreateIdentifierOnGroup(uidnumber, "gidnumber", coGroupID); err != nil { + o.dlq(ctx, cu, "create_gidnumber_identifier", err) + return err + } + } + + if existing, err := o.c.FindCoGroupMember(coGroupID, coPersonID); err != nil { + o.dlq(ctx, cu, "find_co_group_member", err) + return err + } else if existing == 0 { + if _, err := o.c.CreateCoGroupMember(coPersonID, coGroupID); err != nil { + o.dlq(ctx, cu, "create_co_group_member", err) + return err + } + } + + // UnixClusterGroup attach: re-attempt is the idempotency mechanism. 
A 4xx + // here means the pair already exists; continue. + if _, err := o.c.CreateUnixClusterGroup(coGroupID); err != nil { + var httpErr *client.HTTPError + if !errors.As(err, &httpErr) || httpErr.StatusCode < 400 || httpErr.StatusCode >= 500 { + o.dlq(ctx, cu, "create_unix_cluster_group", err) + return err + } + log.Info("comanage: UnixClusterGroup attach returned 4xx (already attached)", "status", httpErr.StatusCode) + } + + fresh, err := o.c.GetPersonComposite(personID) + if err != nil { + o.dlq(ctx, cu, "refetch_composite", err) + return err + } + block := UnixClusterAccountBlock{ + UnixClusterId: o.c.Config().UnixClusterID, + SyncMode: "M", + Status: "A", + Username: cu.LocalUsername, + Uid: uidInt, + Gecos: "", + LoginShell: o.c.Config().DefaultShell, + HomeDirectory: o.c.Config().HomedirPrefix + cu.LocalUsername, + PrimaryCoGroupId: coGroupID, + } + putBody, err := mergeUnixClusterAccount(fresh, block) + if err != nil { + o.dlq(ctx, cu, "merge_composite", err) + return err + } + if _, err := o.c.UpdatePerson(personID, putBody); err != nil { + o.dlq(ctx, cu, "put_composite", err) + return err + } + log.Info("comanage: UnixClusterAccount attached", "username", cu.LocalUsername, "uid", uidInt, "co_group_id", coGroupID) + o.audit(ctx, cu, "ComanageClusterAccountAttached", fmt.Sprintf("comanage_id=%s username=%s uid=%d", personID, cu.LocalUsername, uidInt)) + return nil +} + +// lookupOrCreateCoPerson resolves the user's CoPerson, returning the COmanage +// person identifier, the composite (if the GET path was used), and whether a +// new CoPerson was created. 
+func (o *Orchestrator) lookupOrCreateCoPerson(ctx context.Context, user *models.User) (string, json.RawMessage, bool, error) { + personIDType := o.c.Config().PersonIDType + + if stored, err := o.findStoredPersonID(ctx, user.ID); err != nil { + return "", nil, false, fmt.Errorf("stored lookup: %w", err) + } else if stored != "" { + composite, err := o.c.GetPersonComposite(stored) + if err == nil { + return stored, composite, false, nil + } + if !errors.Is(err, client.ErrNotFound) { + return "", nil, false, fmt.Errorf("get composite for stored id: %w", err) + } + // stored id no longer resolves; fall through to email search + } + + if user.Email != "" { + coPersonID, err := o.findByEmailExact(user.Email) + if err != nil && !errors.Is(err, client.ErrNotFound) { + return "", nil, false, fmt.Errorf("email search: %w", err) + } + if coPersonID != 0 { + composite, err := o.c.GetPersonComposite(strconv.Itoa(coPersonID)) + if err != nil { + return "", nil, false, fmt.Errorf("get composite by numeric id: %w", err) + } + personID, err := extractIdentifier(composite, personIDType) + if err != nil { + return "", nil, false, fmt.Errorf("extract %s from composite: %w", personIDType, err) + } + if personID == "" { + return "", nil, false, fmt.Errorf("CoPerson %d has no %s identifier", coPersonID, personIDType) + } + return personID, composite, false, nil + } + } + + body, err := buildCreatePersonBody(o.c.Config().COID, user) + if err != nil { + return "", nil, false, fmt.Errorf("build create body: %w", err) + } + resp, err := o.c.CreatePerson(body) + if err != nil { + return "", nil, false, fmt.Errorf("create coperson: %w", err) + } + for _, r := range resp { + if r.Type == personIDType { + return r.Identifier, nil, true, nil + } + } + return "", nil, false, fmt.Errorf("POST /people returned no %s identifier: %+v", personIDType, resp) +} + +func buildCreatePersonBody(coID int, user *models.User) ([]byte, error) { + body := map[string]interface{}{ + "CoPerson": 
map[string]interface{}{ + "co_id": coID, + "status": "A", + }, + "Name": []map[string]interface{}{{ + "given": user.FirstName, + "family": user.LastName, + "type": "official", + "primary_name": true, + "language": "en", + }}, + "EmailAddress": []map[string]interface{}{{ + "mail": user.Email, + "type": "official", + "verified": false, + }}, + } + return json.Marshal(body) +} + +func (o *Orchestrator) audit(ctx context.Context, cu *models.ComputeClusterUser, eventType, details string) { + _, _ = o.core.CreateAuditEvent(ctx, &models.AuditEvent{ + EventType: eventType, + EntityID: cu.ID, + Details: details, + }) +} + +func (o *Orchestrator) dlq(ctx context.Context, cu *models.ComputeClusterUser, step string, err error) { + details := fmt.Sprintf("step=%s err=%v", step, err) + _, _ = o.core.CreateAuditEvent(ctx, &models.AuditEvent{ + EventType: "ComanageProvisioningFailed", + EntityID: cu.ID, + Details: details, + }) + // TODO: admin endpoint + CLI to re-fire ComputeClusterUserCreateEvent for a + // specific user, and to clean up orphan CoGroups from terminal dead-letters. +} diff --git a/connectors/COmanage/Identity-Provisioner/internal/operations/lookup.go b/connectors/COmanage/Identity-Provisioner/internal/operations/lookup.go new file mode 100644 index 000000000..cb851a28b --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/operations/lookup.go @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package operations + +import ( + "context" + "errors" + "fmt" + + "github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client" + "github.com/apache/airavata-custos/pkg/models" + _ "github.com/apache/airavata-custos/pkg/service" +) + +var ErrNotFoundShim = client.ErrNotFound + +const comanageIdentitySource = "comanage" + +func (o *Orchestrator) findStoredPersonID(ctx context.Context, userID string) (string, error) { + idents, err := o.core.ListUserIdentitiesForUser(ctx, userID) + if err != nil { + return "", fmt.Errorf("list user identities: %w", err) + } + for _, id := range idents { + if id.Source == comanageIdentitySource && id.ExternalID != "" { + return id.ExternalID, nil + } + } + return "", nil +} + +// storePersonID writes the COmanage CoPerson identifier into user_identities. +// No-op if a row already exists. +func (o *Orchestrator) storePersonID(ctx context.Context, userID, personID string) error { + if existing, err := o.findStoredPersonID(ctx, userID); err != nil { + return err + } else if existing != "" { + return nil + } + _, err := o.core.CreateUserIdentity(ctx, &models.UserIdentity{ + UserID: userID, + Source: comanageIdentitySource, + ExternalID: personID, + }) + return err +} + +// findByEmailExact searches by email and returns the first match in the +// configured CO, or 0 if none. COmanage's search.mail is a LIKE match: callers +// trading off precision for one round-trip should be aware that very similar +// emails could collide. 
+func (o *Orchestrator) findByEmailExact(email string) (int, error) { + if email == "" { + return 0, nil + } + candidates, err := o.c.FindCoPersonByEmail(email) + if err != nil { + if errors.Is(err, ErrNotFoundShim) { + return 0, nil + } + return 0, err + } + for _, p := range candidates { + if p.CoId == o.c.Config().COID { + return p.Id, nil + } + } + return 0, nil +} diff --git a/connectors/COmanage/Identity-Provisioner/internal/operations/orchestrator.go b/connectors/COmanage/Identity-Provisioner/internal/operations/orchestrator.go new file mode 100644 index 000000000..f48c062dc --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/operations/orchestrator.go @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package operations provisions POSIX identity in COmanage for a +// (CoPerson, UnixCluster) pair. 
+package operations + +import ( + "context" + + "github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client" + "github.com/apache/airavata-custos/pkg/models" + "github.com/apache/airavata-custos/pkg/service" +) + +type Orchestrator struct { + c *client.Client + core *service.Service +} + +func New(c *client.Client, core *service.Service) *Orchestrator { + return &Orchestrator{c: c, core: core} +} + +func (o *Orchestrator) EnsurePOSIXAccount(ctx context.Context, cu *models.ComputeClusterUser) error { + return o.ensurePOSIXAccountImpl(ctx, cu) +} diff --git a/connectors/COmanage/Identity-Provisioner/internal/subscribers/cluster_user.go b/connectors/COmanage/Identity-Provisioner/internal/subscribers/cluster_user.go new file mode 100644 index 000000000..a3fac2901 --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/internal/subscribers/cluster_user.go @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package subscribers + +import ( + "context" + "log/slog" + + "github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client" + "github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/operations" + "github.com/apache/airavata-custos/pkg/events" + "github.com/apache/airavata-custos/pkg/models" + "github.com/apache/airavata-custos/pkg/service" +) + +// ClusterUserSubscriber listens for ComputeClusterUserCreateEvent and drives +// the orchestrator. Events whose ComputeClusterID does not match +// CustosClusterID are dropped. +type ClusterUserSubscriber struct { + ops *operations.Orchestrator + bus *events.Bus + core *service.Service + custosClusterID string +} + +func NewClusterUserSubscriber(c *client.Client, bus *events.Bus, core *service.Service, custosClusterID string) *ClusterUserSubscriber { + return &ClusterUserSubscriber{ + ops: operations.New(c, core), + bus: bus, + core: core, + custosClusterID: custosClusterID, + } +} + +func (s *ClusterUserSubscriber) RegisterSubscribers() { + s.bus.Subscribe(events.ComputeClusterUserCreateEvent, s.handleClusterUserCreate) +} + +func (s *ClusterUserSubscriber) handleClusterUserCreate(_ events.Event, payload interface{}) { + cu, ok := payload.(*models.ComputeClusterUser) + if !ok { + slog.Error("comanage subscriber: payload is not *ComputeClusterUser", "type", payload) + return + } + if cu.ComputeClusterID != s.custosClusterID { + return + } + ctx := context.Background() + // TODO: move to a transactional scope. In-process delivery loses events + // if the process crashes between the core commit and subscriber pickup. 
+ if err := s.ops.EnsurePOSIXAccount(ctx, cu); err != nil { + slog.Error("comanage subscriber: EnsurePOSIXAccount failed", + "compute_cluster_user_id", cu.ID, "user_id", cu.UserID, "err", err) + } +} diff --git a/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go new file mode 100644 index 000000000..9c9628dce --- /dev/null +++ b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package comanage is the COmanage Identity-Provisioner entry point. Wired +// from internal/connectors/loader.go. +package comanage + +import ( + "context" + "log/slog" + "os" + "strconv" + "sync" + "time" + + "github.com/jmoiron/sqlx" + + "github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client" + "github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/subscribers" + "github.com/apache/airavata-custos/pkg/events" + "github.com/apache/airavata-custos/pkg/service" +) + +// LoadConnector wires the subscriber to the event bus. If any required env +// var is missing, the loader logs and returns nil without registering. 
+func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, coreService *service.Service, _ *sync.WaitGroup) error { + cfg, ok := loadConfigFromEnv() + if !ok { + slog.Info("comanage provisioner: required env vars not set; skipping") + return nil + } + httpClient := client.New(cfg) + subscribers.NewClusterUserSubscriber(httpClient, eventBus, coreService, cfg.CustosClusterID).RegisterSubscribers() + slog.Info("comanage provisioner: subscriber registered", + "registry", cfg.RegistryURL, "co_id", cfg.COID, "cluster_id", cfg.CustosClusterID) + // Custos is the source of truth: CoPerson and UnixClusterAccount records + // for users provisioned via this connector must not be edited directly in + // COmanage. There is no drift reconciliation; out-of-band edits will be + // invisible to Custos. + return nil +} + +func loadConfigFromEnv() (client.Config, bool) { + registryURL := os.Getenv("COMANAGE_REGISTRY_URL") + coIDStr := os.Getenv("COMANAGE_CO_ID") + apiUser := os.Getenv("COMANAGE_API_USER") + apiKey := os.Getenv("COMANAGE_API_KEY") + personIDType := os.Getenv("COMANAGE_PERSON_ID_TYPE") + unixClusterStr := os.Getenv("COMANAGE_UNIX_CLUSTER_ID") + custosCluster := os.Getenv("CUSTOS_CLUSTER_ID") + + if registryURL == "" || coIDStr == "" || apiUser == "" || apiKey == "" || personIDType == "" || unixClusterStr == "" || custosCluster == "" { + return client.Config{}, false + } + coID, err := strconv.Atoi(coIDStr) + if err != nil { + slog.Error("comanage provisioner: COMANAGE_CO_ID is not an int", "value", coIDStr, "err", err) + return client.Config{}, false + } + unixClusterID, err := strconv.Atoi(unixClusterStr) + if err != nil { + slog.Error("comanage provisioner: COMANAGE_UNIX_CLUSTER_ID is not an int", "value", unixClusterStr, "err", err) + return client.Config{}, false + } + + timeout := 30 * time.Second + if v := os.Getenv("COMANAGE_HTTP_TIMEOUT"); v != "" { + if d, err := time.ParseDuration(v); err == nil { + timeout = d + } + } + + defaultShell := 
os.Getenv("COMANAGE_DEFAULT_SHELL") + if defaultShell == "" { + defaultShell = "/bin/bash" + } + homedirPrefix := os.Getenv("COMANAGE_HOMEDIR_PREFIX") + if homedirPrefix == "" { + homedirPrefix = "/home/" + } + + return client.Config{ + RegistryURL: registryURL, + COID: coID, + APIUser: apiUser, + APIKey: apiKey, + PersonIDType: personIDType, + UnixClusterID: unixClusterID, + CustosClusterID: custosCluster, + DefaultShell: defaultShell, + HomedirPrefix: homedirPrefix, + HTTPTimeout: timeout, + }, true +} diff --git a/internal/connectors/loader.go b/internal/connectors/loader.go index 99fb221c0..ea070208c 100644 --- a/internal/connectors/loader.go +++ b/internal/connectors/loader.go @@ -25,6 +25,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/pkg/amie" + "github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/pkg/comanage" "github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/pkg/smapper" "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/service" @@ -45,6 +46,12 @@ func LoadConnectors(ctx context.Context, database *sqlx.DB, eventBus *events.Bus return err } + slog.Info("loading COmanage Identity-Provisioner connector") + if err := comanage.LoadConnector(ctx, database, eventBus, coreService, wg); err != nil { + slog.Error("failed to load COmanage Identity-Provisioner connector", "error", err) + return err + } + slog.Info("finished loading connectors") return nil }
