Sahina Bose has uploaded a new change for review. Change subject: engine: [WIP] Support for gluster task monitoring ......................................................................
engine: [WIP] Support for gluster task monitoring Added GlusterTaskManager that periodically polls all running gluster tasks and updates the corresponding step and job with status and description. Change-Id: Ica67b56804cbb892886160f0b22043687277d476 Signed-off-by: Sahina Bose <sab...@redhat.com> --- M backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/GlusterJobsManager.java A backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksService.java A backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksSyncJob.java 3 files changed, 306 insertions(+), 0 deletions(-) git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/65/17965/1 diff --git a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/GlusterJobsManager.java b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/GlusterJobsManager.java index 615739b..8a182bb 100644 --- a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/GlusterJobsManager.java +++ b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/GlusterJobsManager.java @@ -2,6 +2,7 @@ import java.util.concurrent.TimeUnit; +import org.ovirt.engine.core.bll.gluster.tasks.GlusterTasksSyncJob; import org.ovirt.engine.core.common.config.Config; import org.ovirt.engine.core.common.config.ConfigValues; import org.ovirt.engine.core.common.mode.ApplicationMode; @@ -55,6 +56,13 @@ getRefreshRate(ConfigValues.GlusterRefreshRateLight), getRefreshRate(ConfigValues.GlusterRefreshRateLight), TimeUnit.SECONDS); + + scheduler.scheduleAFixedDelayJob(GlusterTasksSyncJob.getInstance(), + "gluster_async_task_poll_event", new Class[] {}, new Class [] {}, 30, + 30, TimeUnit.SECONDS); + scheduler.scheduleAFixedDelayJob(GlusterTasksSyncJob.getInstance(), + "gluster_async_task_clear_event", new Class[] {}, new Class [] {}, 300, + 300, TimeUnit.SECONDS); } private static 
boolean glusterModeSupported() { diff --git a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksService.java b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksService.java new file mode 100644 index 0000000..555bed3 --- /dev/null +++ b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksService.java @@ -0,0 +1,94 @@ +package org.ovirt.engine.core.bll.gluster.tasks; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.ovirt.engine.core.bll.utils.ClusterUtils; +import org.ovirt.engine.core.common.VdcObjectType; +import org.ovirt.engine.core.common.asynctasks.gluster.GlusterAsyncTask; +import org.ovirt.engine.core.common.businessentities.VDS; +import org.ovirt.engine.core.common.businessentities.VDSGroup; +import org.ovirt.engine.core.common.businessentities.VdsStatic; +import org.ovirt.engine.core.common.businessentities.gluster.GlusterVolumeEntity; +import org.ovirt.engine.core.common.errors.VdcBLLException; +import org.ovirt.engine.core.common.errors.VdcBllErrors; +import org.ovirt.engine.core.common.job.ExternalSystemType; +import org.ovirt.engine.core.common.job.Job; +import org.ovirt.engine.core.common.job.Step; +import org.ovirt.engine.core.compat.Guid; +import org.ovirt.engine.core.dal.dbbroker.DbFacade; +import org.ovirt.engine.core.utils.log.Log; +import org.ovirt.engine.core.utils.log.LogFactory; + +public class GlusterTasksService { + private static final Log log = LogFactory.getLog(GlusterTasksService.class); + + public Map<Guid, GlusterAsyncTask> getAllTasks() { + Map<Guid,GlusterAsyncTask> map = new HashMap<Guid,GlusterAsyncTask>(); + // For each cluster, query gluster for list of running tasks. 
+ List<VDSGroup> clusters = DbFacade.getInstance().getVdsGroupDao().getAll(); + for (VDSGroup cluster: clusters) { + Map<Guid,GlusterAsyncTask> clusterTasks = getTaskListForCluster(cluster.getId()); + if (clusterTasks != null) { + map.putAll(clusterTasks); // reuse the null-checked result instead of querying the cluster a second time + } + } + return map; + } + + private Map<Guid, GlusterAsyncTask> getTaskListForCluster(Guid id) { + // TODO Auto-generated method stub + return null; + } + + public GlusterAsyncTask getTask(Guid taskId) { + //Get the cluster associated with task and see if host is UP + Guid clusterId = getClusterIdForTaskId(taskId); + VDS vds = ClusterUtils.getInstance().getUpServer(clusterId); + if (vds == null) { + log.error("No UP server in cluster"); + throw new VdcBLLException(VdcBllErrors.NO_UP_SERVER_FOUND); + } + //TODO: Call VDS command to get status + return null; + } + + @SuppressWarnings("incomplete-switch") + private Guid getClusterIdForTaskId(Guid taskId) { + List<Step> steps = DbFacade.getInstance().getStepDao().getStepsByExternalId(taskId); + for (Step step:steps) { + Job job = DbFacade.getInstance().getJobDao().get(step.getJobId()); + if (job.getJobSubjectEntities() != null) { + Iterator<Entry<Guid, VdcObjectType>> iter = job.getJobSubjectEntities().entrySet().iterator(); + while (iter.hasNext()) { + Entry<Guid, VdcObjectType> entry = iter.next(); + switch (entry.getValue()) { + case GlusterVolume : //get cluster from glusterVolume + GlusterVolumeEntity vol= DbFacade.getInstance().getGlusterVolumeDao().getById(entry.getKey()); + return vol.getClusterId(); + case VDS : + VdsStatic vds = DbFacade.getInstance().getVdsStaticDao().get(entry.getKey()); + return vds.getVdsGroupId(); + } + } + } + } + return null; + } + + /** + * Gets the list of stored tasks in database where the job is not ended + * + * @return + */ + public List<Guid> getMonitoredTaskIDsInDB() { + List<Guid> externalIds = DbFacade.getInstance().getStepDao(). 
+ getExternalIdsForRunningSteps(ExternalSystemType.GLUSTER); + return externalIds; + } + + +} diff --git a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksSyncJob.java b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksSyncJob.java new file mode 100644 index 0000000..99d4267 --- /dev/null +++ b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/gluster/tasks/GlusterTasksSyncJob.java @@ -0,0 +1,204 @@ +package org.ovirt.engine.core.bll.gluster.tasks; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.ovirt.engine.core.bll.gluster.GlusterJob; +import org.ovirt.engine.core.bll.job.ExecutionContext; +import org.ovirt.engine.core.bll.job.ExecutionHandler; +import org.ovirt.engine.core.bll.job.JobRepositoryFactory; +import org.ovirt.engine.core.common.asynctasks.gluster.GlusterAsyncTask; +import org.ovirt.engine.core.common.asynctasks.gluster.GlusterTaskStatus; +import org.ovirt.engine.core.common.constants.gluster.GlusterConstants; +import org.ovirt.engine.core.common.errors.VdcBLLException; +import org.ovirt.engine.core.common.errors.VdcBllErrors; +import org.ovirt.engine.core.common.job.JobExecutionStatus; +import org.ovirt.engine.core.common.job.Step; +import org.ovirt.engine.core.compat.Guid; +import org.ovirt.engine.core.dal.dbbroker.DbFacade; +import org.ovirt.engine.core.dal.job.ExecutionMessageDirector; +import org.ovirt.engine.core.utils.log.Log; +import org.ovirt.engine.core.utils.log.LogFactory; +import org.ovirt.engine.core.utils.timer.OnTimerMethodAnnotation; + +public final class GlusterTasksSyncJob extends GlusterJob { + private static final Log log = LogFactory.getLog(GlusterTasksSyncJob.class); + + private static final 
Map<Guid,GlusterAsyncTask> glusterTasks = new ConcurrentHashMap<Guid,GlusterAsyncTask>(); + + //List of tasks stored in DB but not in cluster + private static final List<Guid> orphanTasks = new ArrayList<Guid>(); + + private static GlusterTasksSyncJob instance = new GlusterTasksSyncJob(); + + private final GlusterTasksService provider = new GlusterTasksService(); + + public static GlusterTasksSyncJob getInstance() { + return instance; + } + + + public void init() { + log.info("Gluster task manager has been initialized"); + } + + + @OnTimerMethodAnnotation("gluster_async_task_poll_event") + public void updateJobStepStatusForTasks() { + log.debug("polling for async gluster tasks"); + //Get list of running tasks for each cluster and update glusterTasks map + reloadTasks(); + + Iterator<Entry<Guid, GlusterAsyncTask>> iter = glusterTasks.entrySet().iterator(); + while (iter.hasNext()) { + Entry<Guid, GlusterAsyncTask> entry = iter.next(); + Guid taskId = entry.getKey(); + GlusterAsyncTask task = entry.getValue(); + //get step from db for the corresponding gluster task + List<Step> steps = DbFacade.getInstance().getStepDao().getStepsByExternalId(taskId); + //update status in step table + for (Step step: steps) { + step.setDescription(getTaskMessage(step,task)); + if (hasTaskCompleted(task)) { + step.markStepEnded(getJobCompletedStatus(task)); + endStepJob(step); + } else { + JobRepositoryFactory.getJobRepository().updateStep(step); + } + } + if (hasTaskCompleted(task)) { iter.remove(); } // drop the entry once per task: a second Iterator.remove() for the same entry throws IllegalStateException + } + + } + + @OnTimerMethodAnnotation("gluster_async_task_clear_event") + public void cleanupOrphanTasks() { + List<Guid> tasksToBeCleared; synchronized (this) { tasksToBeCleared = new ArrayList<Guid>(orphanTasks); } // snapshot under the lock reloadTasks() holds while it clears and refills orphanTasks + //Gluster tasks may have completed without the engine being aware of it + // or gluster host may not be responding. + for (Guid taskId: tasksToBeCleared) { + try { + GlusterAsyncTask task = provider.getTask(taskId); + if (task == null) { + //server has no knowledge of this task. + //TODO: Mark job as completed (?) 
with status unknown for each such task + + //remove from tasks list + removeTask(taskId); + } + } catch(VdcBLLException e) { + if (VdcBllErrors.NO_UP_SERVER_FOUND.equals(e.getErrorCode())) { + //host cannot be reached so ignore. will try again + } + } + + } + + } + + private void removeTask(Guid taskId) { + glusterTasks.remove(taskId); + } + + public void addTask(GlusterAsyncTask task) { + glusterTasks.put(task.getTaskId(), task); + } + + private static void endStepJob(Step step) { + JobRepositoryFactory.getJobRepository().updateStep(step); + ExecutionContext finalContext = ExecutionHandler.createFinalizingContext(step.getId()); + ExecutionHandler.endTaskJob(finalContext, isTaskSuccess(step)); + } + + private static boolean isTaskSuccess(Step step) { + switch (step.getStatus()) { + case ABORTED: + case FAILED: + return false; + case FINISHED: + return true; + default: + return false; + } + } + + private static JobExecutionStatus getJobCompletedStatus(GlusterAsyncTask task) { + switch (task.getStatus()) { + case ABORTED : + return JobExecutionStatus.ABORTED; + case FAILED : + return JobExecutionStatus.FAILED; + case COMPLETED : + return JobExecutionStatus.FINISHED; + case RUNNING : + return JobExecutionStatus.STARTED; + default: + return JobExecutionStatus.UNKNOWN; + } + + } + + private static boolean hasTaskCompleted(GlusterAsyncTask task) { + if (GlusterTaskStatus.ABORTED.equals(task.getStatus()) || GlusterTaskStatus.COMPLETED.equals(task.getStatus()) + || GlusterTaskStatus.FAILED.equals(task.getStatus())) { + return true; + } + return false; + } + + private static String getTaskMessage(Step step, GlusterAsyncTask task) { + if (task==null) { + return null; + } + Map<String, String> values = new HashMap<String, String>(); + values.put(GlusterConstants.CLUSTER, "CL"); + values.put(GlusterConstants.VOLUME, "vol"); + values.put("status", task.getStatus().toString()); + values.put("info", task.getMessage()); + + return 
ExecutionMessageDirector.resolveStepMessage(step.getStepType(), values); + } + + private void reloadTasks() { + //Populate the list of tasks that need to be monitored from database + List<Guid> taskListInDB = provider.getMonitoredTaskIDsInDB(); + if (taskListInDB == null) { + taskListInDB = new ArrayList<Guid>(); + } + + //Populate gluster running tasks list from UP servers + Map<Guid,GlusterAsyncTask> runningTaskList = provider.getAllTasks(); + if (runningTaskList == null) { + runningTaskList = new HashMap<Guid, GlusterAsyncTask>(); + } + + //if task is in DB but not in running task list + final Set<Guid> tasksNotRunning = new HashSet<Guid>(taskListInDB); + tasksNotRunning.removeAll(runningTaskList.keySet()); + + //if task is in running task list but not in DB. + final Set<Guid> tasksNotInDB = new HashSet<Guid>(runningTaskList.keySet()); + tasksNotInDB.removeAll(taskListInDB); + + glusterTasks.putAll(runningTaskList); + + synchronized (this) { + orphanTasks.clear(); + orphanTasks.addAll(new ArrayList<Guid>(tasksNotRunning)); + } + + for (Guid taskId: tasksNotInDB) { + //TODO: + //for tasks that have been started but not monitored by engine + // add a job to monitor + } + + } +} -- To view, visit http://gerrit.ovirt.org/17965 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ica67b56804cbb892886160f0b22043687277d476 Gerrit-PatchSet: 1 Gerrit-Project: ovirt-engine Gerrit-Branch: master Gerrit-Owner: Sahina Bose <sab...@redhat.com> _______________________________________________ Engine-patches mailing list Engine-patches@ovirt.org http://lists.ovirt.org/mailman/listinfo/engine-patches