// SyncTmpJob.java
package com.lyms.cm.job;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;

import com.alibaba.fastjson.JSONObject;
import com.lyms.sync.SyncCallback;
import com.lyms.sync.SyncCenter;
import com.lyms.sync.SyncCenter.Work;
import com.lyms.sync.channel.ChannelData;
import com.lyms.sync.queue.SyncQueue;
import com.lyms.util.HttpUtils;
import com.lyms.util.JsonUtils;

/**
 * Synchronisation job executor for the temporary work queue; relies on Spring
 * to inject its collaborators.
 *
 * <p>Each call to {@link #excute()} builds a {@link Work} from the injected
 * {@link SyncCenter}, hands a callback to {@code Work.pullTmp} and, for every
 * {@link ChannelData} the callback receives, posts it to the remote endpoint
 * via {@code HttpUtils.REMOTE}. The first call (per class, not per instance)
 * also moves everything from the error queue back to the temporary work queue.
 *
 * <p>Overlapping calls to {@link #excute()} on the same instance are skipped
 * rather than run concurrently.
 *
 * @author maliang
 * @date 2017-03-14
 */
public class SyncTmpJob {

    private final Logger LOG = LoggerFactory.getLogger(getClass());

    /** Re-entrancy guard: {@code true} while a run of {@link #excute()} is in progress. */
    private final AtomicBoolean reset = new AtomicBoolean(false);

    @Autowired
    public SyncCenter center;
    @Autowired
    public RedisTemplate<Serializable, Serializable> template;

    /**
     * {@code true} until the error queue has been migrated to the temporary
     * work queue. Shared by all instances (static), so the automatic migration
     * is triggered at most once per class load.
     */
    private static final AtomicBoolean initLoadErrorToTempWork = new AtomicBoolean(true);

    /**
     * Runs one synchronisation pass.
     *
     * <p>If the start-up migration has not happened yet it is performed first.
     * Then, unless another pass is already running on this instance, a
     * {@link Work} is built and asked to pull temporary tasks; each pulled
     * {@link ChannelData} is posted to the remote side. Any exception thrown
     * while pulling is logged and followed by {@code work.backPressure()}.
     *
     * <p>The guard is always released when the pass ends, including when
     * {@code center.buildWork()} itself throws (that exception still
     * propagates to the caller).
     */
    public void excute() {
        // Atomically claim the one-time migration so only one caller runs it.
        if (initLoadErrorToTempWork.compareAndSet(true, false)) {
            errorWorkToTempWork();
        }
        // Atomically claim the run; a concurrent pass means this one is skipped.
        if (!reset.compareAndSet(false, true)) {
            return;
        }
        try {
            final Work work = center.buildWork();
            try {
                work.pullTmp(new SyncCallback() {
                    @Override
                    public void callBack(Object object) {
                        if (object instanceof ChannelData) {
                            sendToRemote(work, (ChannelData) object);
                        }
                    }
                });
            } catch (Exception e) {
                LOG.error("同步数据错误", e);
                work.backPressure();
            }
        } finally {
            // Release the guard no matter how the pass ended.
            reset.set(false);
        }
    }

    /**
     * Posts one pulled item to the remote endpoint and reacts to the reply.
     *
     * <p>{@code work.backPressure()} is invoked when the post yields no result
     * (e.g. the remote side is unreachable) and also when the parsed reply
     * reports an ack. A reply that cannot be parsed into a {@link ChannelData}
     * is ignored.
     *
     * <p>NOTE(review): back-pressure on both "no response" and "ack received"
     * is carried over unchanged from the original code; confirm against
     * {@code Work.backPressure()} that this is intended.
     *
     * @param work  the work unit the item was pulled from
     * @param model the pulled item to send
     */
    private void sendToRemote(Work work, ChannelData model) {
        String result = HttpUtils.REMOTE.post(model);
        if (result == null) {
            work.backPressure();
            return;
        }
        ChannelData reply = JsonUtils.jsonToBean(result, ChannelData.class);
        if (reply != null && reply.getAck()) {
            work.backPressure();
        }
    }

    /**
     * Moves every entry of the error queue to the temporary work queue.
     *
     * <p>Entries are popped from the right of {@code SyncQueue.TMP_WORK_ERROR}
     * until the pop returns {@code null}; each one is logged as JSON, has its
     * error retry count reset to 0 and is pushed onto the left of
     * {@code SyncQueue.TMP_WORK}. Calling this method also marks the start-up
     * migration as done, so {@link #excute()} will not trigger it again.
     *
     * <p>Created by Fang Cheng, 2017-05-05.
     */
    public void errorWorkToTempWork() {
        initLoadErrorToTempWork.set(false);
        LOG.info("=============开始==迁移同步队列【错误队列 转移到 临时任务队列】============");
        Serializable object;
        while ((object = template.opsForList().rightPop(SyncQueue.TMP_WORK_ERROR)) != null) {
            ChannelData data = (ChannelData) object;
            LOG.info(JSONObject.toJSONString(data));
            data.setErrorRetryCount(0);
            template.opsForList().leftPush(SyncQueue.TMP_WORK, data);
        }
        LOG.info("=============结束==迁移同步队列【错误队列 转移到 临时任务队列】============");
    }
}