diff --git a/center.manager/src/main/java/com/lyms/cm/controller/sys/SysRemoteController.java b/center.manager/src/main/java/com/lyms/cm/controller/sys/SysRemoteController.java index a2c89bd..5c7bf49 100644 --- a/center.manager/src/main/java/com/lyms/cm/controller/sys/SysRemoteController.java +++ b/center.manager/src/main/java/com/lyms/cm/controller/sys/SysRemoteController.java @@ -9,6 +9,8 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import com.lyms.cm.entity.sys.SysUsers; +import com.lyms.synch.SyncUtils; +import com.lyms.synch.entity.AckObj; import com.lyms.synch.entity.SyncnModel; import com.lyms.web.controller.BaseController; @@ -19,13 +21,14 @@ public class SysRemoteController extends BaseController { @RequestMapping(value = "/push") @ResponseBody public String push(HttpServletRequest request) throws ClassNotFoundException, IOException { - SyncnModel model = conver(request); - SysUsers user = (SysUsers) model.getData(); - System.out.println(user.getRoles().getName()); - // model - // service - - System.out.println(user.getName() + " " + model.getMethod()); + SyncnModel model = SyncUtils.conver(request); + if (model.isAck()) { + AckObj ack = SyncUtils.converDataToAck(model); + System.out.println("回执信息: " + ack.getData()); + } else { + SysUsers user = SyncUtils.converData(model, SysUsers.class); + System.out.println("user: " + user.getName()); + } return "OK"; } } diff --git a/center.manager/src/test/java/center/manager/test/user/WorkTest.java b/center.manager/src/test/java/center/manager/test/user/WorkTest.java index 85c6505..b855561 100644 --- a/center.manager/src/test/java/center/manager/test/user/WorkTest.java +++ b/center.manager/src/test/java/center/manager/test/user/WorkTest.java @@ -10,7 +10,6 @@ import com.lyms.synch.SyncnCenter; import com.lyms.synch.SyncnCenter.Work; import com.lyms.synch.entity.ModelType; import com.lyms.synch.entity.SyncnModel; -import com.lyms.util.HttpUtils; import com.lyms.util.StrUtils; /** @@ -25,8 +24,7 @@ public class WorkTest extends BaseTest { @Autowired public SyncnCenter center; - @Test - public void workTest() throws InterruptedException { + public void push() { SyncnModel model = new SyncnModel(); model.setId(StrUtils.uuid()); model.setModel("user2"); @@ -47,32 +45,60 @@ public class WorkTest extends BaseTest { user.setRoles(roles); center.buildWork().push(model, new CenterCallback() { - @Override public void callBack(Object object) { System.out.println("push callBack : " + object); - tmp(); } }); + } + + @Test + public void workTest() throws InterruptedException { + new Thread(new Runnable() { + + @Override + public void run() { + tmp(); + } + }).start(); + + new Thread(new Runnable() { + @Override + public void run() { + try { + fix(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }).start(); + while (true) { + push(); + Thread.sleep(10000); + } } @Test public void tmp() { - Work work = center.buildWork(); - try { - work.pullTmp(new CenterCallback() { - @Override - public void callBack(Object object) { - System.out.println("pull callBack : " + object); - SyncnModel model = (SyncnModel) object; - HttpUtils.REMOTE.post(model); - } - }); - } catch (Exception e) { - e.printStackTrace(); - // 异常数据反压 - work.backPressure(); + while (true) { + Work work = center.buildWork(); + try { + // final AckObj ack = new AckObj(); + work.pullTmp(new CenterCallback() { + @Override + public void callBack(Object object) { + SyncnModel model = (SyncnModel) object; + // String result = HttpUtils.REMOTE.post(model); + // ack.setData(result); + System.out.println("tmp ...."); + } + }); + } catch (Exception e) { + e.printStackTrace(); + // 异常数据反压 + work.backPressure(); + } } } @@ -83,40 +109,24 @@ public class WorkTest extends BaseTest { while (true) { Work work2 = center.buildWork(); try { - + // final AckObj ack = new AckObj(); work2.pullFix(new CenterCallback() { @Override public void callBack(Object object) { // remote - // 处理实际业务 SyncnModel model = (SyncnModel) object; + // 处理实际业务 + System.out.println(model.getRemote() + "====>fix"); + // String result = HttpUtils.REMOTE.post(model); - System.out.println(model.getModel()); + // System.out.println(model.getModel()); + // ack.setData(model.getData()); } });// .ackGet(params);// .ackGet(null) } catch (Exception e) { e.printStackTrace(); } - Thread.sleep(3000); + Thread.sleep(15000); } } - - public static void main(String[] args) { - SyncnModel model = new SyncnModel(); - model.setId(StrUtils.uuid()); - model.setModel("user"); - model.setMethod("post"); - model.setType(ModelType.ADD.name()); - // 推送数据远程地址 - model.setRemote("http://127.0.0.1:9090/center.manager/remote/push"); - // 对于远程获取数据,回调地址 - model.setCallBack("http://www.baidu.com"); - model.setTs(System.currentTimeMillis()); - - SysUsers user = new SysUsers(); - user.setName("你好"); - model.setData(user); - String result = HttpUtils.REMOTE.post(model); - System.out.println(result); - } } diff --git a/core.sdk/src/main/java/com/lyms/spring/redis/operation/RedisLock.java b/core.sdk/src/main/java/com/lyms/spring/redis/operation/RedisLock.java index f63e297..4ba52bf 100644 --- a/core.sdk/src/main/java/com/lyms/spring/redis/operation/RedisLock.java +++ b/core.sdk/src/main/java/com/lyms/spring/redis/operation/RedisLock.java @@ -1,5 +1,6 @@ package com.lyms.spring.redis.operation; +import java.io.UnsupportedEncodingException; import java.util.concurrent.TimeUnit; import org.springframework.dao.DataAccessException; @@ -56,7 +57,7 @@ public class RedisLock { public boolean acquireLock(String lockName) throws Exception { String redisKey = PREFIX_KEY + lockName; long expire = 15;// 单位:秒 - long timeout = 5000;// 单位:毫秒 + long timeout = 20000;// 单位:毫秒 long redisValue = System.currentTimeMillis() + timeout + 1; // 通过SETNX试图获取一个lock if (setNX(redisKey, String.valueOf(redisValue), expire)) {// SETNX成功,则成功获取一个锁 @@ -69,6 +70,7 @@ public class RedisLock { if (oldValue < System.currentTimeMillis()) { String getValue = getAndSet(redisKey, String.valueOf(redisValue)); // 获取锁成功 + // System.out.println(getValue + " " + oldValue); if (getValue.equals(oldValue)) { this.lockNode = lockName; return true; @@ -90,10 +92,14 @@ public class RedisLock { private boolean setNX(final String key, final String value, final long expire) { return (Boolean) redisTemplate.execute(new RedisCallback() { public Boolean doInRedis(RedisConnection connection) { - byte[] keyBytes = redisTemplate.getStringSerializer().serialize(key); - boolean locked = connection.setNX(keyBytes, redisTemplate.getDefaultSerializer().serialize(value)); + // byte[] keyBytes = + // redisTemplate.getStringSerializer().serialize(key); + // boolean locked = connection.setNX(keyBytes, + // redisTemplate.getDefaultSerializer().serialize(value)); + byte[] kbytes = key.getBytes(); + boolean locked = connection.setNX(kbytes, value.getBytes()); if (locked) { - connection.expire(keyBytes, expire); + connection.expire(kbytes, expire); } return locked; } @@ -104,10 +110,18 @@ public class RedisLock { return (String) redisTemplate.execute(new RedisCallback() { @Override public String doInRedis(RedisConnection connection) throws DataAccessException { - byte[] result = connection.getSet(redisTemplate.getStringSerializer().serialize(key), - redisTemplate.getDefaultSerializer().serialize(value)); + // byte[] result = + // connection.getSet(redisTemplate.getStringSerializer().serialize(key), + // redisTemplate.getDefaultSerializer().serialize(value)); + + byte[] result = connection.getSet(key.getBytes(), value.getBytes()); + if (result != null) { - return new String(result); + try { + return new String(result, "UTF-8"); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } } return null; } diff --git a/core.sdk/src/main/java/com/lyms/synch/CenterCallback.java b/core.sdk/src/main/java/com/lyms/synch/CenterCallback.java index 4188de7..f99c637 100644 --- a/core.sdk/src/main/java/com/lyms/synch/CenterCallback.java +++ b/core.sdk/src/main/java/com/lyms/synch/CenterCallback.java @@ -1,5 +1,12 @@ package com.lyms.synch; +/** + *
  • @ClassName: CenterCallback + *
  • @Description: 数据回调 + *
  • @author maliang + *
  • @date 2017年3月14日 + *
  • + */ public interface CenterCallback { public void callBack(Object object); } diff --git a/core.sdk/src/main/java/com/lyms/synch/SyncUtils.java b/core.sdk/src/main/java/com/lyms/synch/SyncUtils.java new file mode 100644 index 0000000..266842d --- /dev/null +++ b/core.sdk/src/main/java/com/lyms/synch/SyncUtils.java @@ -0,0 +1,66 @@ +package com.lyms.synch; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import javax.servlet.http.HttpServletRequest; + +import com.lyms.synch.entity.AckObj; +import com.lyms.synch.entity.SyncnModel; + +/** + *
  • @ClassName: SyncUtils + *
  • @Description: 同步辅助处理类 + *
  • @author maliang + *
  • @date 2017年3月14日 + *
  • + */ +public class SyncUtils { + + public static SyncnModel conver(HttpServletRequest request) throws IOException, ClassNotFoundException { + request.setCharacterEncoding("utf-8"); + ObjectInputStream dis = new ObjectInputStream(request.getInputStream()); + SyncnModel model = (SyncnModel) dis.readObject(); + return model; + } + + /** + *
  • @Description:获取包装的Data + *
  • @param model + *
  • @return + *
  • 创建人:maliang + *
  • 创建时间:2017年3月14日 + *
  • 修改人: + *
  • 修改时间: + */ + @SuppressWarnings("unchecked") + public static T converData(SyncnModel model, Class t) { + if (model == null || model.getData() == null) + return null; + if (!Serializable.class.isAssignableFrom(t)) { + return null; + } + return (T) model.getData(); + } + + /** + *
  • @Description:直接转化为ACK Object + *
  • @param model + *
  • @return + *
  • 创建人:maliang + *
  • 创建时间:2017年3月14日 + *
  • 修改人: + *
  • 修改时间: + */ + public static AckObj converDataToAck(SyncnModel model) { + if (model == null || model.getData() == null) + return null; + if (!(model.getData() instanceof AckObj)) { + return null; + } + return (AckObj) model.getData(); + // return converData(model, AckObj.class); + } + +} diff --git a/core.sdk/src/main/java/com/lyms/synch/SyncnCenter.java b/core.sdk/src/main/java/com/lyms/synch/SyncnCenter.java index ff9a89a..9902e9a 100644 --- a/core.sdk/src/main/java/com/lyms/synch/SyncnCenter.java +++ b/core.sdk/src/main/java/com/lyms/synch/SyncnCenter.java @@ -6,6 +6,7 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.lang3.StringUtils; +import com.lyms.synch.entity.AckObj; import com.lyms.synch.entity.ModelType; import com.lyms.synch.entity.SyncnModel; import com.lyms.synch.queue.SyncnQueue; @@ -36,7 +37,7 @@ public class SyncnCenter { private SyncnQueue queue; /** - *
  • @Description:创建一个中心实力 + *
  • @Description:创建一个中心 *
  • @return *
  • 创建人:maliang *
  • 创建时间:2017年3月13日 @@ -144,13 +145,13 @@ public class SyncnCenter { *
  • 修改人: *
  • 修改时间: */ - public String ack(Object object) { - if (model != null && StringUtils.isNotBlank(model.getRemote())) { + public String ack(AckObj ack) { + if (model != null && !model.isAck() && StringUtils.isNotBlank(model.getRemote())) { model.setId(StrUtils.uuid()); - model.setMethod("POST"); model.setTs(System.currentTimeMillis()); - model.setData(object); + model.setData(ack); model.setType(ModelType.ACK.name()); + ack.setOldType(model.getType()); return HttpUtils.REMOTE.post(model); } return null; diff --git a/core.sdk/src/main/java/com/lyms/synch/entity/AckObj.java b/core.sdk/src/main/java/com/lyms/synch/entity/AckObj.java new file mode 100644 index 0000000..c6eb330 --- /dev/null +++ b/core.sdk/src/main/java/com/lyms/synch/entity/AckObj.java @@ -0,0 +1,43 @@ +package com.lyms.synch.entity; + +import java.io.Serializable; + +/** + *
  • @ClassName: AckObj + *
  • @Description: 回调实体,用于封装获取到的数据信息 + *
  • @author maliang + *
  • @date 2017年3月14日 + *
  • + */ +public class AckObj implements Serializable { + + /** + */ + private static final long serialVersionUID = -3939046868818014917L; + + /** + * {@link ModelType} 获取数据 + */ + private String oldType; + + /** + * remote、获取的数据信息,该参数需要自行封装 + */ + private Object data; + + public String getOldType() { + return oldType; + } + + public void setOldType(String oldType) { + this.oldType = oldType; + } + + public Object getData() { + return data; + } + + public void setData(Object data) { + this.data = data; + } +} diff --git a/core.sdk/src/main/java/com/lyms/synch/entity/SyncnModel.java b/core.sdk/src/main/java/com/lyms/synch/entity/SyncnModel.java index 25fb765..9068d9b 100644 --- a/core.sdk/src/main/java/com/lyms/synch/entity/SyncnModel.java +++ b/core.sdk/src/main/java/com/lyms/synch/entity/SyncnModel.java @@ -125,4 +125,8 @@ public class SyncnModel implements Serializable { return JsonUtils.jsonToBean(body, SyncnModel.class); } + public boolean isAck() { + return ModelType.isACK(getType()); + } + } diff --git a/core.sdk/src/main/java/com/lyms/synch/queue/SyncnQueue.java b/core.sdk/src/main/java/com/lyms/synch/queue/SyncnQueue.java index 5bd30c9..3634487 100644 --- a/core.sdk/src/main/java/com/lyms/synch/queue/SyncnQueue.java +++ b/core.sdk/src/main/java/com/lyms/synch/queue/SyncnQueue.java @@ -36,9 +36,9 @@ public class SyncnQueue { private static final String LOCK_FIXATION_NAME = "FIXATION_lock"; /** - * 等待获取锁的时间10 S + * 等待获取锁的时间5 S */ - private static final Long WAIT_LOCK_TIME = 10000L; + private static final Long WAIT_LOCK_TIME = 5000L; /** * 固定任务 @@ -81,16 +81,17 @@ public class SyncnQueue { private SyncnModel pullFixationModel() { // 添加锁 try { - if (Fix_Lock.acquireLock(LOCK_FIXATION_NAME, WAIT_LOCK_TIME)) { + if (Tmp_Lock.acquireLock(LOCK_FIXATION_NAME, WAIT_LOCK_TIME)) { SyncnModel obj = (SyncnModel) template.opsForList().rightPopAndLeftPush(FIXATION_WORK, FIXATION_WORK); // this.pushFixationWork(obj); return obj; } } catch (Exception e) { + e.printStackTrace(); } finally { try { - if (Fix_Lock != null) { - Fix_Lock.releaseLock(); + if (Tmp_Lock != null) { + Tmp_Lock.releaseLock(); } } catch (Exception e) { e.printStackTrace(); diff --git a/core.sdk/src/main/java/com/lyms/web/controller/BaseController.java b/core.sdk/src/main/java/com/lyms/web/controller/BaseController.java index 30a6bd8..a8cbba6 100644 --- a/core.sdk/src/main/java/com/lyms/web/controller/BaseController.java +++ b/core.sdk/src/main/java/com/lyms/web/controller/BaseController.java @@ -1,7 +1,6 @@ package com.lyms.web.controller; import java.io.IOException; -import java.io.ObjectInputStream; import java.io.PrintWriter; import java.text.SimpleDateFormat; import java.util.Date; @@ -24,7 +23,6 @@ import com.baomidou.mybatisplus.plugins.Page; import com.baomidou.mybatisplus.plugins.pagination.Pagination; import com.lyms.common.StringEscapeEditor; import com.lyms.exception.BusinessException; -import com.lyms.synch.entity.SyncnModel; import com.lyms.util.HttpUtils; import com.lyms.util.StrUtils; import com.lyms.web.bean.AjaxResult; @@ -54,13 +52,6 @@ public class BaseController { binder.registerCustomEditor(String.class, new StringEscapeEditor(true, false)); } - public SyncnModel conver(HttpServletRequest request) throws IOException, ClassNotFoundException { - request.setCharacterEncoding("utf-8"); - ObjectInputStream dis = new ObjectInputStream(request.getInputStream()); - SyncnModel model = (SyncnModel) dis.readObject(); - return model; - } - /** * 是否为 post 请求 */