package com.lyms.cm.job; import java.io.Serializable; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import com.alibaba.fastjson.JSONObject; import com.lyms.sync.SyncCallback; import com.lyms.sync.SyncCenter; import com.lyms.sync.SyncCenter.Work; import com.lyms.sync.channel.ChannelData; import com.lyms.sync.queue.SyncQueue; import com.lyms.util.HttpUtils; import com.lyms.util.JsonUtils; /** *
  • @ClassName: SycnJob *
  • @Description: 同步任务执行器,依赖spring *
  • @author maliang *
  • @date 2017年3月14日 *
  • */ public class SyncTmpJob { private Logger LOG = LoggerFactory.getLogger(getClass()); private final AtomicBoolean reset = new AtomicBoolean(false); @Autowired public SyncCenter center; @Autowired public RedisTemplate template; /** * 初始化的时候将错误队列内容迁移到临时任务队列 */ private static boolean initLoadErrorToTempWork = true; public void excute() { if(initLoadErrorToTempWork){ errorWorkToTempWork(); } if (reset.get()) { return; } reset.set(true); // 拉取远端数据 //LOG.debug("执行推送远程数据任务..."); final Work work = center.buildWork(); try { work.pullTmp(new SyncCallback() { @Override public void callBack(Object object) { if (object instanceof ChannelData) { ChannelData model = (ChannelData) object; if (model != null) { // 任务发送执行 String result = HttpUtils.REMOTE.post(model); if(result == null){//远程无法访问等情况 work.backPressure(); return; } model = JsonUtils.jsonToBean(result, ChannelData.class); if (model != null && model.getAck()) { work.backPressure(); } } } } }); } catch (Exception e) { LOG.error("同步数据错误 {}", e); work.backPressure(); } finally { reset.set(false); } } /** *
  • @Description:错误队列迁移到临时任务队列 *
  • *
  • 创建人:方承 *
  • 创建时间:2017年5月5日 *
  • 修改人: *
  • 修改时间: */ public void errorWorkToTempWork(){ initLoadErrorToTempWork = false; LOG.info("=============开始==迁移同步队列【错误队列 转移到 临时任务队列】============"); boolean hasNext = true; while(hasNext){ Serializable object = template.opsForList().rightPop(SyncQueue.TMP_WORK_ERROR); if(object != null){ ChannelData data = (ChannelData) object; LOG.info(JSONObject.toJSONString(data)); data.setErrorRetryCount(0); template.opsForList().leftPush(SyncQueue.TMP_WORK, data); }else{ hasNext = false; } } LOG.info("=============结束==迁移同步队列【错误队列 转移到 临时任务队列】============"); return; } }