SyncFixJob.java 4.89 KB
   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
package com.lyms.cm.job;

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;

import com.alibaba.fastjson.JSONException;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.lyms.base.common.entity.organ.OrganGroup;
import com.lyms.base.common.service.organ.OrganGroupService;
import com.lyms.base.common.service.sys.impl.SyncDataBasicServiceImpl;
import com.lyms.sync.ParamsAdpter;
import com.lyms.sync.SyncCallback;
import com.lyms.sync.SyncCenter;
import com.lyms.sync.SyncCenter.Work;
import com.lyms.sync.SyncHandler;
import com.lyms.sync.channel.ChannelData;
import com.lyms.sync.queue.SyncQueue;
import com.lyms.util.HttpUtils;
import com.lyms.util.InstanceUtils;
import com.lyms.util.JsonUtils;
import com.lyms.util.StrUtils;

/**
* <li>@ClassName: SycnJob
* <li>@Description: 同步任务执行器,依赖spring
* <li>@author maliang
* <li>@date 2017年3月14日
* <li>
*/
public class SyncFixJob {
private Logger LOG = LoggerFactory.getLogger(getClass());

@Autowired
public SyncCenter center;
@Autowired
public OrganGroupService organGroupService;
@Autowired
public RedisTemplate<Serializable, Serializable> template;
private static List<String> pullUrlList = null;
private static String remoteClazz = SyncDataBasicServiceImpl.class.getName();
private static String remoteMethod = "selectOne";

private final AtomicBoolean reset = new AtomicBoolean(false);

public void excute() {
if( pullUrlList == null ){
LOG.info("初始化固定拉取远程任务");
template.delete(SyncQueue.FIXATION_WORK);
LOG.info("清空固定拉取远程任务");
List<OrganGroup> groupList = organGroupService.selectList(new EntityWrapper<OrganGroup>().where("IFDEL=0"));
pullUrlList = InstanceUtils.newArrayList();
for(OrganGroup entity : groupList){
if(StrUtils.isNotEmpty(entity.getPulladdress()) && entity.getPulladdress().startsWith("http")){
LOG.info("添加固定拉取远程任务:医院组={},pullAddress={}",entity.getName(),entity.getPulladdress());
if( !pullUrlList.contains(entity.getPulladdress().trim()) ){
pullUrlList.add(entity.getPulladdress().trim());
center.pushFix(StrUtils.uuid(), entity.getPulladdress().trim(), remoteClazz, remoteMethod, null);
}
}
}
}
if (reset.get()) {
return;
}
reset.set(true);
// 拉取远端数据
//LOG.debug("执行拉取远程数据任务...");
final Work work = center.buildWork();
try {
work.pullFix(new SyncCallback() {
@Override
public void callBack(Object object) {
if (object instanceof ChannelData) {
ChannelData oldData = (ChannelData) object;
if (oldData != null && StringUtils.isNotEmpty(oldData.getId())) {
// 任务发送执行
//LOG.info("拉取到本地任务: " + ToStringBuilder.reflectionToString(oldData));
String result = HttpUtils.REMOTE.post(oldData);
//LOG.info("返回结果: " + result);
if (StringUtils.isBlank(result))
return;
ChannelData data = null;
try{
data = JsonUtils.jsonToBean(result, ChannelData.class);
}catch(JSONException e){
LOG.error("同步数据转换异常,请求结果:{} ",result);
return;
}
// 处理回执消息
if (data != null && data.getAck() && StrUtils.isNotEmpty(data.getRemoteClazz()) && StrUtils.isNotEmpty(data.getRemoteMethod())) {
// 拉取到数据执行本地方法
LOG.info("拉取到本地任务: " + ToStringBuilder.reflectionToString(oldData));
LOG.info("返回结果: " + result);
System.out.println("执行: " + ToStringBuilder.reflectionToString(data));
try{
Object handlResult = SyncHandler.handler(data);
// 要求处理成功都必须返回true | false;
Boolean tag = (Boolean) handlResult;
if (tag) {
data.setRemote(oldData.getRemote());
data.setData(ParamsAdpter.builder().push(data.getId()).toJsonString());
data.setAck(true);
// 处理成功回执消息
data.setRemoteClazz("com.lyms.hospital.service.sys.impl.SyncDataBasicServiceImpl");
data.setRemoteMethod("updateBasic");
work.ack(data);
}
}catch (Exception e) {
LOG.error("拉取任务执行失败:{}",e);
}
}
}
}
}
});
} catch (Exception e) {
LOG.error("同步数据错误 {}", e);
} finally {
reset.set(false);
}
}

}