多线程任务设计

沙落雁 关注

收藏于 : 2017-11-22 10:20   被转藏 : 1   

/**

*任务管理类

*/

public class TaskManager {

 

/**

* 日志处理类

*/

private static Log logger = LogFactory.getLog(TaskManager.class);

 

/**

* 线程池任务执行对象

*/

private ThreadPoolTaskExecutor taskExecutor;

 

/**

* 任务结果的保存位置

*/

private final static String resultSavePath = Constants.getDownloadPath()

+ File.separator + Constants.get(Constants.TASK_TEMP_PATH);

 

/**

* 任务执行结果序列化成文件的文件名

*/

private static final String RESULT_FILE_NAME = "result";

 

/**

* 任务执行过程中产生的文件被保存的目录名

*/

private static final String FILES_DIR = "files";

 

/**

* 将任务添加至线程池

* @param task 任务对象

* @param params 执行任务所需的参数

* @param operatorId 当前的操作员编号

* @return 如果添加成功返回任务编号,如果服务器繁忙返回null

*/

public String addTask(AbstractTask task, Map<String, Object> params, String operatorId) {

if (resultSavePath == null || resultSavePath.length() < 1) {

if (logger.isErrorEnabled()) {

logger.error("未配置保存目录");

}

return null;

}

//如果保存目录不存在,则自动创建此目录

File saveDir = new File(resultSavePath);

if (!saveDir.exists()) {

saveDir.mkdirs();

if (saveDir.exists()) {

if (logger.isDebugEnabled()) {

logger.debug("成功创建保存目录");

}

} else {

if (logger.isErrorEnabled()) {

logger.error("配置目录出错");

return null;

}

}

}

 

//如果线程池已满

if (taskExecutor.getThreadPoolExecutor().getQueue().remainingCapacity() == 0) {

return null;

}

//设置任务执行时所需的参数

task.setParams(params);

//产生任务编号

String taskId = generateTaskId(operatorId);

//创建任务执行时产生的文件存放目录

String savePath = resultSavePath + File.separator + taskId + File.separator + FILES_DIR + File.separator;

createDir(savePath);

//设置任务执行时产生的文件存放目录

task.setSavePath(savePath);

//产生任务线程对象

TaskThread t = new TaskThread(taskId, task);

//将任务添加至连接池

taskExecutor.execute(t);

return taskId;

}

 

/**

* 获取任务执行的结果

* @param taskId 任务编号

* @return 如果结果不存在,则返回null,否则返回结果对象

* @throws IOException 

*/

public TaskResult getTaskResult(String taskId) throws IOException {

File resultFile = new File(resultSavePath + File.separator + taskId + File.separator + RESULT_FILE_NAME + File.separator + RESULT_FILE_NAME);

if (!resultFile.exists()) {//如果结果文件不存在

return null;

}

 

FileInputStream fis = new FileInputStream(resultFile);

ObjectInputStream ois = new ObjectInputStream(fis);

try {

Object obj = ois.readObject();

ois.close();

fis.close();

if (obj == null) {//如果读取结果为空,则返回null

logger.info("result File content is null!");

return null;

} else {

TaskResult result = (TaskResult) obj;

return result;

}

} catch (ClassNotFoundException e) {

throw new RuntimeException(e);

} finally {

if (ois != null) {

ois.close();

}

if (fis != null) {

fis.close();

}

}

}

 

/**

* 获取任务执行过程中产生的文件

* @param taskId 任务编号

* @param fileName 要取的文件名

* @return 如果结果不存在,则返回null,否则返回文件对象

* @throws IOException 

*/

public File getTaskResultFile(String taskId, String fileName) throws IOException {

File reportFile = new File(resultSavePath + File.separator + taskId + File.separator + FILES_DIR + File.separator + fileName);

if (!reportFile.exists()) {//如果文件不存在

File resultFile = new File(resultSavePath + File.separator + taskId + File.separator + RESULT_FILE_NAME + File.separator + RESULT_FILE_NAME);

FileInputStream fis = new FileInputStream(resultFile);

ObjectInputStream ois = new ObjectInputStream(fis);

try {

Object obj = ois.readObject();

ois.close();

fis.close();

if (obj == null) {//如果读取结果为空,则返回null

logger.info("result File content is null!");

return null;

} else {

TaskResult result = (TaskResult) obj;

logger.error("task " + result.getTaskClassName() + " not generate report file!");

}

} catch (ClassNotFoundException e) {

throw new RuntimeException(e);

} finally {

if (ois != null) {

ois.close();

}

if (fis != null) {

fis.close();

}

}

return null;

}

return reportFile;

}

 

/**

* 创建目录-支持多级

* @param path 目录的路径

*/

private void createDir(String path) {

File dir = new File(path);

dir.mkdirs();

}

 

/**

* 产生任务唯一的编号

* @param operatorId 当前操作员

* @return 返回生成的编号

*/

private String generateTaskId(String operatorId) {

Date now = new Date();

SimpleDateFormat sdf = new SimpleDateFormat(Constants.YYYY_MM_DD_SSS);

if (operatorId == null || operatorId.length() < 1) {

return sdf.format(now) + Toolkit.getUuidRandomizer().generate();

} else {

return sdf.format(now) + operatorId + Toolkit.getUuidRandomizer().generate();

}

}

 

public void setTaskExecutor(ThreadPoolTaskExecutor taskExecutor) {

this.taskExecutor = taskExecutor;

}

 

/**

* 封装任务的任务线程对象

* @author oscar.xie

*

*/

private class TaskThread implements Runnable {

/**

* 任务编号-由任务管理器分配

*/

private String taskId;

 

/**

* 任务对象

*/

private AbstractTask task;

 

public TaskThread(String taskId, AbstractTask task) {

this.taskId = taskId;

this.task = task;

}

 

/**

* 任务执行方法

*/

public void run() {

TaskResult result = task.execute();

if (result != null) {

result.setTaskClassName(task.getClass().getName());

String savePath = resultSavePath + File.separator + this.taskId + File.separator + RESULT_FILE_NAME;

createDir(savePath);

File resultFile = new File(savePath + File.separator + RESULT_FILE_NAME);

FileOutputStream fos = null;

ObjectOutputStream oos = null;

try {

fos = new FileOutputStream(resultFile);

oos = new ObjectOutputStream(fos);

oos.writeObject(result);

oos.flush();

} catch (Exception e) { //一般不会出现此异常

logger.error("write task result File failure:" + e.getMessage());

} finally {

try {

if (oos != null) {

oos.close();

}

if (fos != null) {

fos.close();

}

} catch (Exception e) {

throw new RuntimeException(e);

}

}

} else {

logger.info("result is null************************************!");

}

}

}

}

</div>

<div>

<!-- 异步任务线程操作配置 -->

<bean id="threadPoolTaskExecutor" >

<property name="corePoolSize" value="30" />  <!-- 核心线程池大小 -->

<property name="keepAliveSeconds" value="200" />  <!-- 保持活跃的秒数 -->

<property name="maxPoolSize" value="35" />  <!-- 最大线程池大小 -->

<property name="queueCapacity" value="1" />  <!-- 等待队列大小 -->

</bean>

 

<!-- 异步任务管理器 -->

<bean id="taskManager" >

<property name="taskExecutor" ref="threadPoolTaskExecutor"></property>

</bean>

 

 

/**

 * 抽象任务类-用于需要长时间运行的业务操作

 * @author oscar.xie

 *

 */

public abstract class AbstractTask {

 

/**

* 任务执行时的所需的参数

*/

protected Map<String, Object> params;

 

/**

* 任务执行过程中产生的文件的存放目录

*/

protected String savePath;

 

/**

* 要执行的任务操作

* @return 执行的结果对象

*/

public abstract TaskResult execute();

 

public Map<String, Object> getParams() {

return params;

}

 

public void setParams(Map<String, Object> params) {

this.params = params;

}

 

public void setSavePath(String savePath) {

this.savePath = savePath;

}

}

 

 

/**

 * 任务执行的结果类

 * @author oscar.xie

 *

 */

public class TaskResult implements Serializable {

 

private static final long serialVersionUID = -4396986540034886297L;

 

/**

* 任务是否成功执行

*/

private Boolean success;

 

/**

* 任务执行的结果

*/

private String result;

 

/**

* 任务执行的类名-用于发生错误时找到处理类

*/

private String taskClassName;

 

public TaskResult() {

 

}

 

public TaskResult(Boolean success, String result) {

this.success = success;

this.result = result;

}

 

public String getResult() {

return result;

}

 

public void setResult(String result) {

this.result = result;

}

 

public Boolean getSuccess() {

return success;

}

 

public void setSuccess(Boolean success) {

this.success = success;

}

 

public String getTaskClassName() {

return taskClassName;

}

 

public void setTaskClassName(String taskClassName) {

this.taskClassName = taskClassName;

}

}

 

 

 阅读文章全部内容  
点击查看
文章点评
相关文章
沙落雁 关注

文章收藏:1313

TA的最新收藏