Sahina Bose has uploaded a new change for review.

Change subject: engine: [WIP] Support for gluster task monitoring
......................................................................

engine: [WIP] Support for gluster task monitoring

Added GlusterTaskManager that periodically polls all running gluster tasks and
updates the corresponding step and job with status and description.

Change-Id: Ica67b56804cbb892886160f0b22043687277d476
Signed-off-by: Sahina Bose <sab...@redhat.com>
---
M 
backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/GlusterJobsManager.java
A 
backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksService.java
A 
backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksSyncJob.java
3 files changed, 306 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/65/17965/1

diff --git 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/GlusterJobsManager.java
 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/GlusterJobsManager.java
index 615739b..8a182bb 100644
--- 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/GlusterJobsManager.java
+++ 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/GlusterJobsManager.java
@@ -2,6 +2,7 @@
 
 import java.util.concurrent.TimeUnit;
 
+import org.ovirt.engine.core.bll.gluster.tasks.GlusterTasksSyncJob;
 import org.ovirt.engine.core.common.config.Config;
 import org.ovirt.engine.core.common.config.ConfigValues;
 import org.ovirt.engine.core.common.mode.ApplicationMode;
@@ -55,6 +56,13 @@
                 getRefreshRate(ConfigValues.GlusterRefreshRateLight),
                 getRefreshRate(ConfigValues.GlusterRefreshRateLight),
                 TimeUnit.SECONDS);
+
+        scheduler.scheduleAFixedDelayJob(GlusterTasksSyncJob.getInstance(),
+                "gluster_async_task_poll_event", new Class[] {}, new Class [] 
{}, 30,
+                30, TimeUnit.SECONDS);
+        scheduler.scheduleAFixedDelayJob(GlusterTasksSyncJob.getInstance(),
+                "gluster_async_task_clear_event", new Class[] {}, new Class [] 
{}, 300,
+                300, TimeUnit.SECONDS);
     }
 
     private static boolean glusterModeSupported() {
diff --git 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksService.java
 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksService.java
new file mode 100644
index 0000000..555bed3
--- /dev/null
+++ 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksService.java
@@ -0,0 +1,94 @@
+package org.ovirt.engine.core.bll.gluster.tasks;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.ovirt.engine.core.bll.utils.ClusterUtils;
+import org.ovirt.engine.core.common.VdcObjectType;
+import org.ovirt.engine.core.common.asynctasks.gluster.GlusterAsyncTask;
+import org.ovirt.engine.core.common.businessentities.VDS;
+import org.ovirt.engine.core.common.businessentities.VDSGroup;
+import org.ovirt.engine.core.common.businessentities.VdsStatic;
+import 
org.ovirt.engine.core.common.businessentities.gluster.GlusterVolumeEntity;
+import org.ovirt.engine.core.common.errors.VdcBLLException;
+import org.ovirt.engine.core.common.errors.VdcBllErrors;
+import org.ovirt.engine.core.common.job.ExternalSystemType;
+import org.ovirt.engine.core.common.job.Job;
+import org.ovirt.engine.core.common.job.Step;
+import org.ovirt.engine.core.compat.Guid;
+import org.ovirt.engine.core.dal.dbbroker.DbFacade;
+import org.ovirt.engine.core.utils.log.Log;
+import org.ovirt.engine.core.utils.log.LogFactory;
+
+public class GlusterTasksService {
+    private static final Log log = 
LogFactory.getLog(GlusterTasksService.class);
+
+    /**
+     * Collects the gluster tasks reported for every cluster returned by the cluster DAO.
+     *
+     * @return map of task id to task, merged across all clusters; empty when no
+     *         cluster reports any task
+     */
+    public Map<Guid, GlusterAsyncTask> getAllTasks() {
+        Map<Guid, GlusterAsyncTask> map = new HashMap<Guid, GlusterAsyncTask>();
+        // For each cluster, query gluster for list of running tasks.
+        List<VDSGroup> clusters = DbFacade.getInstance().getVdsGroupDao().getAll();
+        for (VDSGroup cluster : clusters) {
+            // Query once and reuse the result: calling getTaskListForCluster() again for
+            // putAll() would repeat the lookup and hand putAll() an unchecked (possibly
+            // null) second result.
+            Map<Guid, GlusterAsyncTask> clusterTasks = getTaskListForCluster(cluster.getId());
+            if (clusterTasks != null) {
+                map.putAll(clusterTasks);
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Placeholder for the per-cluster task query; in this revision it always returns null.
+     *
+     * @param id id of the cluster to query (unused until the query is implemented)
+     * @return always null for now
+     */
+    private Map<Guid, GlusterAsyncTask> getTaskListForCluster(Guid id) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /**
+     * Looks up a single gluster task. The actual status query is not implemented yet,
+     * so this returns null whenever an UP server is found for the task's cluster.
+     *
+     * @param taskId id of the gluster task
+     * @return always null in this revision (see the TODO below)
+     * @throws VdcBLLException with NO_UP_SERVER_FOUND when no UP server is returned
+     *             for the task's cluster
+     */
+    public GlusterAsyncTask getTask(Guid taskId) {
+        //Get the cluster associated with task and see if host is UP
+        // NOTE(review): clusterId is null when no step/job/entity resolves to a cluster;
+        // how getUpServer() treats a null id is not visible here - confirm.
+        Guid clusterId = getClusterIdForTaskId(taskId);
+        VDS vds = ClusterUtils.getInstance().getUpServer(clusterId);
+        if (vds == null) {
+            log.error("No UP server in cluster");
+            throw new VdcBLLException(VdcBllErrors.NO_UP_SERVER_FOUND);
+        }
+        //TODO: Call VDS command to get status
+        return null;
+    }
+
+    /**
+     * Resolves the cluster a gluster task belongs to, going through the subject
+     * entities of the job that owns each step carrying the task's external id.
+     *
+     * @param taskId external id of the gluster task
+     * @return cluster id taken from the first gluster volume or host subject entity
+     *         that can be loaded, or null when no step, job or entity leads to a cluster
+     */
+    @SuppressWarnings("incomplete-switch")
+    private Guid getClusterIdForTaskId(Guid taskId) {
+        List<Step> steps = DbFacade.getInstance().getStepDao().getStepsByExternalId(taskId);
+        for (Step step : steps) {
+            Job job = DbFacade.getInstance().getJobDao().get(step.getJobId());
+            // A missing job, or one without subject entities, cannot identify the
+            // cluster; move on to the next step instead of failing with an NPE.
+            if (job == null || job.getJobSubjectEntities() == null) {
+                continue;
+            }
+            Iterator<Entry<Guid, VdcObjectType>> iter = job.getJobSubjectEntities().entrySet().iterator();
+            while (iter.hasNext()) {
+                Entry<Guid, VdcObjectType> entry = iter.next();
+                if (entry.getValue() == null) {
+                    // switch on a null enum value would throw
+                    continue;
+                }
+                switch (entry.getValue()) {
+                case GlusterVolume: // get cluster from glusterVolume
+                    GlusterVolumeEntity vol =
+                            DbFacade.getInstance().getGlusterVolumeDao().getById(entry.getKey());
+                    if (vol != null) {
+                        return vol.getClusterId();
+                    }
+                    break;
+                case VDS:
+                    VdsStatic vds = DbFacade.getInstance().getVdsStaticDao().get(entry.getKey());
+                    if (vds != null) {
+                        return vds.getVdsGroupId();
+                    }
+                    break;
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Fetches the external ids of the running steps whose external system type is
+     * GLUSTER, as reported by the step DAO.
+     *
+     * @return the external ids returned by the step DAO
+     */
+    public List<Guid> getMonitoredTaskIDsInDB() {
+        return DbFacade.getInstance()
+                .getStepDao()
+                .getExternalIdsForRunningSteps(ExternalSystemType.GLUSTER);
+    }
+
+
+}
diff --git 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksSyncJob.java
 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksSyncJob.java
new file mode 100644
index 0000000..99d4267
--- /dev/null
+++ 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksSyncJob.java
@@ -0,0 +1,204 @@
+package org.ovirt.engine.core.bll.gluster.tasks;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.ovirt.engine.core.bll.gluster.GlusterJob;
+import org.ovirt.engine.core.bll.job.ExecutionContext;
+import org.ovirt.engine.core.bll.job.ExecutionHandler;
+import org.ovirt.engine.core.bll.job.JobRepositoryFactory;
+import org.ovirt.engine.core.common.asynctasks.gluster.GlusterAsyncTask;
+import org.ovirt.engine.core.common.asynctasks.gluster.GlusterTaskStatus;
+import org.ovirt.engine.core.common.constants.gluster.GlusterConstants;
+import org.ovirt.engine.core.common.errors.VdcBLLException;
+import org.ovirt.engine.core.common.errors.VdcBllErrors;
+import org.ovirt.engine.core.common.job.JobExecutionStatus;
+import org.ovirt.engine.core.common.job.Step;
+import org.ovirt.engine.core.compat.Guid;
+import org.ovirt.engine.core.dal.dbbroker.DbFacade;
+import org.ovirt.engine.core.dal.job.ExecutionMessageDirector;
+import org.ovirt.engine.core.utils.log.Log;
+import org.ovirt.engine.core.utils.log.LogFactory;
+import org.ovirt.engine.core.utils.timer.OnTimerMethodAnnotation;
+
+public final class GlusterTasksSyncJob extends GlusterJob  {
+    private static final Log log = 
LogFactory.getLog(GlusterTasksSyncJob.class);
+
+    private static final Map<Guid,GlusterAsyncTask> glusterTasks = new 
ConcurrentHashMap<Guid,GlusterAsyncTask>();
+
+    //List of tasks stored in DB but not in cluster
+    private static final List<Guid> orphanTasks = new ArrayList<Guid>();
+
+    private static GlusterTasksSyncJob instance = new GlusterTasksSyncJob();
+
+    private final GlusterTasksService provider = new GlusterTasksService();
+
+    /**
+     * @return the single instance held in the static {@code instance} field
+     */
+    public static GlusterTasksSyncJob getInstance() {
+        return instance;
+    }
+
+
+    /**
+     * Logs that the task manager has been initialized; performs no other work.
+     */
+    public void init() {
+        log.info("Gluster task manager has been initialized");
+    }
+
+
+    /**
+     * Timer callback: reloads the monitored task map and pushes each task's current
+     * state into the step(s) that reference it by external id.
+     * <p>
+     * For a task that has completed (aborted, completed or failed) every matching step
+     * is marked ended and its job is finalized, after which the task is dropped from
+     * the map. For any other task only the step description is refreshed.
+     */
+    @OnTimerMethodAnnotation("gluster_async_task_poll_event")
+    public void updateJobStepStatusForTasks() {
+        log.debug("polling for async gluster tasks");
+        //Get list of running tasks for each cluster and update glusterTasks map
+        reloadTasks();
+
+        Iterator<Entry<Guid, GlusterAsyncTask>> iter = glusterTasks.entrySet().iterator();
+        while (iter.hasNext()) {
+            Entry<Guid, GlusterAsyncTask> entry = iter.next();
+            Guid taskId = entry.getKey();
+            GlusterAsyncTask task = entry.getValue();
+            boolean completed = hasTaskCompleted(task);
+            //get step from db for the corresponding gluster task
+            List<Step> steps = DbFacade.getInstance().getStepDao().getStepsByExternalId(taskId);
+            //update status in step table
+            for (Step step : steps) {
+                step.setDescription(getTaskMessage(step, task));
+                if (completed) {
+                    step.markStepEnded(getJobCompletedStatus(task));
+                    endStepJob(step);
+                } else {
+                    JobRepositoryFactory.getJobRepository().updateStep(step);
+                }
+            }
+            // Stop monitoring exactly once, after all steps were ended. Removing inside
+            // the step loop calls Iterator.remove() a second time for a task with
+            // several steps, which throws IllegalStateException. As before, a completed
+            // task without any step stays in the map.
+            if (completed && !steps.isEmpty()) {
+                iter.remove();
+            }
+        }
+    }
+
+    /**
+     * Timer callback: re-checks the tasks that the last reload found in the database
+     * but not among the running tasks, and stops monitoring those the provider has no
+     * record of.
+     * <p>
+     * A failure to query one task is logged and does not prevent the remaining tasks
+     * from being checked.
+     */
+    @OnTimerMethodAnnotation("gluster_async_task_clear_event")
+    public void cleanupOrphanTasks() {
+        // Iterate over a snapshot taken under the same lock reloadTasks() holds while it
+        // clears and refills orphanTasks. Iterating the shared ArrayList directly can
+        // fail with ConcurrentModificationException if the two callbacks overlap.
+        List<Guid> tasksToBeCleared;
+        synchronized (this) {
+            tasksToBeCleared = new ArrayList<Guid>(orphanTasks);
+        }
+        //Gluster tasks may have completed without the engine being aware of it
+        // or gluster host may not be responding.
+        for (Guid taskId : tasksToBeCleared) {
+            try {
+                GlusterAsyncTask task = provider.getTask(taskId);
+                if (task == null) {
+                    //server has no knowledge of this task.
+                    //TODO: Mark job as completed (?) with status unknown for each such task
+
+                    //remove from tasks list
+                    removeTask(taskId);
+                }
+            } catch (VdcBLLException e) {
+                if (VdcBllErrors.NO_UP_SERVER_FOUND.equals(e.getErrorCode())) {
+                    //host cannot be reached so ignore. will try again
+                    log.debug("No UP server to query gluster task " + taskId + ", will retry");
+                } else {
+                    // Still best-effort, but do not hide unexpected failures.
+                    log.error("Failed to query status of gluster task " + taskId, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Drops the task with the given id from the glusterTasks map.
+     *
+     * @param taskId id of the task to stop tracking
+     */
+    private void removeTask(Guid taskId) {
+        glusterTasks.remove(taskId);
+    }
+
+    /**
+     * Puts the task into the glusterTasks map under its task id, replacing any entry
+     * already stored for that id.
+     *
+     * @param task task to track
+     */
+    public void addTask(GlusterAsyncTask task) {
+        glusterTasks.put(task.getTaskId(), task);
+    }
+
+    /**
+     * Persists the step, then ends its task job through a finalizing context created
+     * for the step id; the success flag passed along is derived from the step status.
+     *
+     * @param step step to persist and finalize
+     */
+    private static void endStepJob(Step step) {
+        JobRepositoryFactory.getJobRepository().updateStep(step);
+        ExecutionHandler.endTaskJob(
+                ExecutionHandler.createFinalizingContext(step.getId()),
+                isTaskSuccess(step));
+    }
+
+    /**
+     * Tells whether the step ended successfully.
+     *
+     * @param step step whose status is inspected
+     * @return true only for status FINISHED; false for every other status
+     */
+    private static boolean isTaskSuccess(Step step) {
+        switch (step.getStatus()) {
+        case FINISHED:
+            return true;
+        default:
+            // ABORTED, FAILED and anything else count as not successful
+            return false;
+        }
+    }
+
+    /**
+     * Translates a gluster task status into the matching job execution status.
+     *
+     * @param task task whose status is translated
+     * @return ABORTED, FAILED, FINISHED (for COMPLETED) or STARTED (for RUNNING);
+     *         UNKNOWN for any other status
+     */
+    private static JobExecutionStatus getJobCompletedStatus(GlusterAsyncTask task) {
+        final JobExecutionStatus result;
+        switch (task.getStatus()) {
+        case ABORTED:
+            result = JobExecutionStatus.ABORTED;
+            break;
+        case FAILED:
+            result = JobExecutionStatus.FAILED;
+            break;
+        case COMPLETED:
+            result = JobExecutionStatus.FINISHED;
+            break;
+        case RUNNING:
+            result = JobExecutionStatus.STARTED;
+            break;
+        default:
+            result = JobExecutionStatus.UNKNOWN;
+            break;
+        }
+        return result;
+    }
+
+    /**
+     * Tells whether the task has reached one of the end states.
+     *
+     * @param task task to inspect
+     * @return true when the task status is ABORTED, COMPLETED or FAILED
+     */
+    private static boolean hasTaskCompleted(GlusterAsyncTask task) {
+        return GlusterTaskStatus.ABORTED.equals(task.getStatus())
+                || GlusterTaskStatus.COMPLETED.equals(task.getStatus())
+                || GlusterTaskStatus.FAILED.equals(task.getStatus());
+    }
+
+    /**
+     * Builds the step description for a task by resolving the step type's message
+     * with the task's status and message as substitution values.
+     *
+     * @param step step whose type selects the message
+     * @param task task supplying status and info; may be null
+     * @return the resolved step message, or null when task is null
+     */
+    private static String getTaskMessage(Step step, GlusterAsyncTask task) {
+        if (task==null) {
+            return null;
+        }
+        Map<String, String> values = new HashMap<String, String>();
+        // NOTE(review): "CL" and "vol" look like work-in-progress placeholders for the
+        // real cluster and volume names - confirm before merging.
+        values.put(GlusterConstants.CLUSTER, "CL");
+        values.put(GlusterConstants.VOLUME, "vol");
+        // NOTE(review): throws NullPointerException if the task has no status - confirm
+        // that getStatus() can never be null here.
+        values.put("status", task.getStatus().toString());
+        values.put("info", task.getMessage());
+
+        return ExecutionMessageDirector.resolveStepMessage(step.getStepType(), 
values);
+    }
+
+    /**
+     * Refreshes the monitoring state from the two sources the provider offers: the
+     * task ids recorded in the database and the tasks currently reported as running.
+     * <p>
+     * All reported tasks are merged into glusterTasks, and orphanTasks is replaced by
+     * the ids that are in the database but were not reported. Tasks reported but not
+     * present in the database are computed but not acted upon yet.
+     */
+    private void reloadTasks() {
+        // Task ids the database still lists for monitoring
+        List<Guid> dbTaskIds = provider.getMonitoredTaskIDsInDB();
+        if (dbTaskIds == null) {
+            dbTaskIds = new ArrayList<Guid>();
+        }
+
+        // Tasks currently reported by the provider
+        Map<Guid, GlusterAsyncTask> reportedTasks = provider.getAllTasks();
+        if (reportedTasks == null) {
+            reportedTasks = new HashMap<Guid, GlusterAsyncTask>();
+        }
+
+        // In the database, but not reported as running
+        final Set<Guid> missingFromCluster = new HashSet<Guid>(dbTaskIds);
+        missingFromCluster.removeAll(reportedTasks.keySet());
+
+        // Reported as running, but unknown to the database
+        final Set<Guid> missingFromDb = new HashSet<Guid>(reportedTasks.keySet());
+        missingFromDb.removeAll(dbTaskIds);
+
+        glusterTasks.putAll(reportedTasks);
+
+        synchronized (this) {
+            orphanTasks.clear();
+            orphanTasks.addAll(missingFromCluster);
+        }
+
+        for (Guid taskId : missingFromDb) {
+            //TODO:
+            //for tasks that have been started but not monitored by engine
+            // add a job to monitor
+        }
+
+    }
+}


-- 
To view, visit http://gerrit.ovirt.org/17965
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ica67b56804cbb892886160f0b22043687277d476
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-engine
Gerrit-Branch: master
Gerrit-Owner: Sahina Bose <sab...@redhat.com>
_______________________________________________
Engine-patches mailing list
Engine-patches@ovirt.org
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to