/**
*任务管理类
*/
public class TaskManager {
/**
* 日志处理类
*/
private static Log logger = LogFactory.getLog(TaskManager.class);
/**
* 线程池任务执行对象
*/
private ThreadPoolTaskExecutor taskExecutor;
/**
* 任务结果的保存位置
*/
private final static String resultSavePath = Constants.getDownloadPath()
+ File.separator + Constants.get(Constants.TASK_TEMP_PATH);
/**
* 任务执行结果序列化成文件的文件名
*/
private static final String RESULT_FILE_NAME = "result";
/**
* 任务执行过程中产生的文件被保存的目录名
*/
private static final String FILES_DIR = "files";
/**
* 将任务添加至线程池
* @param task 任务对象
* @param params 执行任务所需的参数
* @param operatorId 当前的操作员编号
* @return 如果添加成功返回任务编号,如果服务器繁忙返回null
*/
public String addTask(AbstractTask task, Map<String, Object> params, String operatorId) {
if (resultSavePath == null || resultSavePath.length() < 1) {
if (logger.isErrorEnabled()) {
logger.error("未配置保存目录");
}
return null;
}
//如果保存目录不存在,则自动创建此目录
File saveDir = new File(resultSavePath);
if (!saveDir.exists()) {
saveDir.mkdirs();
if (saveDir.exists()) {
if (logger.isDebugEnabled()) {
logger.debug("成功创建保存目录");
}
} else {
if (logger.isErrorEnabled()) {
logger.error("配置目录出错");
return null;
}
}
}
//如果线程池已满
if (taskExecutor.getThreadPoolExecutor().getQueue().remainingCapacity() == 0) {
return null;
}
//设置任务执行时所需的参数
task.setParams(params);
//产生任务编号
String taskId = generateTaskId(operatorId);
//创建任务执行时产生的文件存放目录
String savePath = resultSavePath + File.separator + taskId + File.separator + FILES_DIR + File.separator;
createDir(savePath);
//设置任务执行时产生的文件存放目录
task.setSavePath(savePath);
//产生任务线程对象
TaskThread t = new TaskThread(taskId, task);
//将任务添加至连接池
taskExecutor.execute(t);
return taskId;
}
/**
* 获取任务执行的结果
* @param taskId 任务编号
* @return 如果结果不存在,则返回null,否则返回结果对象
* @throws IOException
*/
public TaskResult getTaskResult(String taskId) throws IOException {
File resultFile = new File(resultSavePath + File.separator + taskId + File.separator + RESULT_FILE_NAME + File.separator + RESULT_FILE_NAME);
if (!resultFile.exists()) {//如果结果文件不存在
return null;
}
FileInputStream fis = new FileInputStream(resultFile);
ObjectInputStream ois = new ObjectInputStream(fis);
try {
Object obj = ois.readObject();
ois.close();
fis.close();
if (obj == null) {//如果读取结果为空,则返回null
logger.info("result File content is null!");
return null;
} else {
TaskResult result = (TaskResult) obj;
return result;
}
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
} finally {
if (ois != null) {
ois.close();
}
if (fis != null) {
fis.close();
}
}
}
/**
* 获取任务执行过程中产生的文件
* @param taskId 任务编号
* @param fileName 要取的文件名
* @return 如果结果不存在,则返回null,否则返回文件对象
* @throws IOException
*/
public File getTaskResultFile(String taskId, String fileName) throws IOException {
File reportFile = new File(resultSavePath + File.separator + taskId + File.separator + FILES_DIR + File.separator + fileName);
if (!reportFile.exists()) {//如果文件不存在
File resultFile = new File(resultSavePath + File.separator + taskId + File.separator + RESULT_FILE_NAME + File.separator + RESULT_FILE_NAME);
FileInputStream fis = new FileInputStream(resultFile);
ObjectInputStream ois = new ObjectInputStream(fis);
try {
Object obj = ois.readObject();
ois.close();
fis.close();
if (obj == null) {//如果读取结果为空,则返回null
logger.info("result File content is null!");
return null;
} else {
TaskResult result = (TaskResult) obj;
logger.error("task " + result.getTaskClassName() + " not generate report file!");
}
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
} finally {
if (ois != null) {
ois.close();
}
if (fis != null) {
fis.close();
}
}
return null;
}
return reportFile;
}
/**
* 创建目录-支持多级
* @param path 目录的路径
*/
private void createDir(String path) {
File dir = new File(path);
dir.mkdirs();
}
/**
* 产生任务唯一的编号
* @param operatorId 当前操作员
* @return 返回生成的编号
*/
private String generateTaskId(String operatorId) {
Date now = new Date();
SimpleDateFormat sdf = new SimpleDateFormat(Constants.YYYY_MM_DD_SSS);
if (operatorId == null || operatorId.length() < 1) {
return sdf.format(now) + Toolkit.getUuidRandomizer().generate();
} else {
return sdf.format(now) + operatorId + Toolkit.getUuidRandomizer().generate();
}
}
public void setTaskExecutor(ThreadPoolTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
/**
* 封装任务的任务线程对象
* @author oscar.xie
*
*/
private class TaskThread implements Runnable {
/**
* 任务编号-由任务管理器分配
*/
private String taskId;
/**
* 任务对象
*/
private AbstractTask task;
public TaskThread(String taskId, AbstractTask task) {
this.taskId = taskId;
this.task = task;
}
/**
* 任务执行方法
*/
public void run() {
TaskResult result = task.execute();
if (result != null) {
result.setTaskClassName(task.getClass().getName());
String savePath = resultSavePath + File.separator + this.taskId + File.separator + RESULT_FILE_NAME;
createDir(savePath);
File resultFile = new File(savePath + File.separator + RESULT_FILE_NAME);
FileOutputStream fos = null;
ObjectOutputStream oos = null;
try {
fos = new FileOutputStream(resultFile);
oos = new ObjectOutputStream(fos);
oos.writeObject(result);
oos.flush();
} catch (Exception e) { //一般不会出现此异常
logger.error("write task result File failure:" + e.getMessage());
} finally {
try {
if (oos != null) {
oos.close();
}
if (fos != null) {
fos.close();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
} else {
logger.info("result is null************************************!");
}
}
}
}
</div>
<div>
<!-- 异步任务线程操作配置 -->
<bean id="threadPoolTaskExecutor" >
<property name="corePoolSize" value="30" /> <!-- 核心线程池大小 -->
<property name="keepAliveSeconds" value="200" /> <!-- 保持活跃的秒数 -->
<property name="maxPoolSize" value="35" /> <!-- 最大线程池大小 -->
<property name="queueCapacity" value="1" /> <!-- 等待队列大小 -->
</bean>
<!-- 异步任务管理器 -->
<bean id="taskManager" >
<property name="taskExecutor" ref="threadPoolTaskExecutor"></property>
</bean>
/**
* 抽象任务类-用于需要长时间运行的业务操作
* @author oscar.xie
*
*/
public abstract class AbstractTask {
/**
* 任务执行时的所需的参数
*/
protected Map<String, Object> params;
/**
* 任务执行过程中产生的文件的存放目录
*/
protected String savePath;
/**
* 要执行的任务操作
* @return 执行的结果对象
*/
public abstract TaskResult execute();
public Map<String, Object> getParams() {
return params;
}
public void setParams(Map<String, Object> params) {
this.params = params;
}
public void setSavePath(String savePath) {
this.savePath = savePath;
}
}
/**
* 任务执行的结果类
* @author oscar.xie
*
*/
public class TaskResult implements Serializable {
private static final long serialVersionUID = -4396986540034886297L;
/**
* 任务是否成功执行
*/
private Boolean success;
/**
* 任务执行的结果
*/
private String result;
/**
* 任务执行的类名-用于发生错误时找到处理类
*/
private String taskClassName;
public TaskResult() {
}
public TaskResult(Boolean success, String result) {
this.success = success;
this.result = result;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
public Boolean getSuccess() {
return success;
}
public void setSuccess(Boolean success) {
this.success = success;
}
public String getTaskClassName() {
return taskClassName;
}
public void setTaskClassName(String taskClassName) {
this.taskClassName = taskClassName;
}
}