Commit c6021f331c523fdbea2a69f906deba5992198cea

Authored by maliang
1 parent 64e57cf446
Exists in master

修改redis 序列化问题

Showing 10 changed files with 215 additions and 75 deletions

center.manager/src/main/java/com/lyms/cm/controller/sys/SysRemoteController.java View file @ c6021f3
... ... @@ -9,6 +9,8 @@
9 9 import org.springframework.web.bind.annotation.ResponseBody;
10 10  
11 11 import com.lyms.cm.entity.sys.SysUsers;
  12 +import com.lyms.synch.SyncUtils;
  13 +import com.lyms.synch.entity.AckObj;
12 14 import com.lyms.synch.entity.SyncnModel;
13 15 import com.lyms.web.controller.BaseController;
14 16  
... ... @@ -19,13 +21,14 @@
19 21 @RequestMapping(value = "/push")
20 22 @ResponseBody
21 23 public String push(HttpServletRequest request) throws ClassNotFoundException, IOException {
22   - SyncnModel model = conver(request);
23   - SysUsers user = (SysUsers) model.getData();
24   - System.out.println(user.getRoles().getName());
25   - // model
26   - // service
27   -
28   - System.out.println(user.getName() + " " + model.getMethod());
  24 + SyncnModel model = SyncUtils.conver(request);
  25 + if (model.isAck()) {
  26 + AckObj ack = SyncUtils.converDataToAck(model);
  27 + System.out.println("回执信息: " + ack.getData());
  28 + } else {
  29 + SysUsers user = SyncUtils.converData(model, SysUsers.class);
  30 + System.out.println("user: " + user.getName());
  31 + }
29 32 return "OK";
30 33 }
31 34 }
center.manager/src/test/java/center/manager/test/user/WorkTest.java View file @ c6021f3
... ... @@ -10,7 +10,6 @@
10 10 import com.lyms.synch.SyncnCenter.Work;
11 11 import com.lyms.synch.entity.ModelType;
12 12 import com.lyms.synch.entity.SyncnModel;
13   -import com.lyms.util.HttpUtils;
14 13 import com.lyms.util.StrUtils;
15 14  
16 15 /**
... ... @@ -25,8 +24,7 @@
25 24 @Autowired
26 25 public SyncnCenter center;
27 26  
28   - @Test
29   - public void workTest() throws InterruptedException {
  27 + public void push() {
30 28 SyncnModel model = new SyncnModel();
31 29 model.setId(StrUtils.uuid());
32 30 model.setModel("user2");
33 31  
34 32  
35 33  
36 34  
... ... @@ -47,32 +45,60 @@
47 45 user.setRoles(roles);
48 46  
49 47 center.buildWork().push(model, new CenterCallback() {
50   -
51 48 @Override
52 49 public void callBack(Object object) {
53 50 System.out.println("push callBack : " + object);
54   - tmp();
55 51 }
56 52 });
  53 + }
57 54  
  55 + @Test
  56 + public void workTest() throws InterruptedException {
  57 + new Thread(new Runnable() {
  58 +
  59 + @Override
  60 + public void run() {
  61 + tmp();
  62 + }
  63 + }).start();
  64 +
  65 + new Thread(new Runnable() {
  66 + @Override
  67 + public void run() {
  68 + try {
  69 + fix();
  70 + } catch (InterruptedException e) {
  71 + e.printStackTrace();
  72 + }
  73 + }
  74 + }).start();
  75 + while (true) {
  76 + push();
  77 + Thread.sleep(10000);
  78 + }
  79 +
58 80 }
59 81  
60 82 @Test
61 83 public void tmp() {
62   - Work work = center.buildWork();
63   - try {
64   - work.pullTmp(new CenterCallback() {
65   - @Override
66   - public void callBack(Object object) {
67   - System.out.println("pull callBack : " + object);
68   - SyncnModel model = (SyncnModel) object;
69   - HttpUtils.REMOTE.post(model);
70   - }
71   - });
72   - } catch (Exception e) {
73   - e.printStackTrace();
74   - // 异常数据反压
75   - work.backPressure();
  84 + while (true) {
  85 + Work work = center.buildWork();
  86 + try {
  87 + // final AckObj ack = new AckObj();
  88 + work.pullTmp(new CenterCallback() {
  89 + @Override
  90 + public void callBack(Object object) {
  91 + SyncnModel model = (SyncnModel) object;
  92 + // String result = HttpUtils.REMOTE.post(model);
  93 + // ack.setData(result);
  94 + System.out.println("tmp ....");
  95 + }
  96 + });
  97 + } catch (Exception e) {
  98 + e.printStackTrace();
  99 + // 异常数据反压
  100 + work.backPressure();
  101 + }
76 102 }
77 103 }
78 104  
79 105  
80 106  
81 107  
82 108  
83 109  
... ... @@ -83,41 +109,25 @@
83 109 while (true) {
84 110 Work work2 = center.buildWork();
85 111 try {
86   -
  112 + // final AckObj ack = new AckObj();
87 113 work2.pullFix(new CenterCallback() {
88 114 @Override
89 115 public void callBack(Object object) {
90 116 // remote
91   - // 处理实际业务
92 117 SyncnModel model = (SyncnModel) object;
  118 + // 处理实际业务
  119 + System.out.println(model.getRemote() + "====>fix");
  120 +
93 121 // String result = HttpUtils.REMOTE.post(model);
94   - System.out.println(model.getModel());
  122 + // System.out.println(model.getModel());
  123 + // ack.setData(model.getData());
95 124 }
96 125 });// .ackGet(params);// .ackGet(null)
97 126 } catch (Exception e) {
98 127 e.printStackTrace();
99 128 }
100   - Thread.sleep(3000);
  129 + Thread.sleep(15000);
101 130 }
102   - }
103   -
104   - public static void main(String[] args) {
105   - SyncnModel model = new SyncnModel();
106   - model.setId(StrUtils.uuid());
107   - model.setModel("user");
108   - model.setMethod("post");
109   - model.setType(ModelType.ADD.name());
110   - // 推送数据远程地址
111   - model.setRemote("http://127.0.0.1:9090/center.manager/remote/push");
112   - // 对于远程获取数据,回调地址
113   - model.setCallBack("http://www.baidu.com");
114   - model.setTs(System.currentTimeMillis());
115   -
116   - SysUsers user = new SysUsers();
117   - user.setName("你好");
118   - model.setData(user);
119   - String result = HttpUtils.REMOTE.post(model);
120   - System.out.println(result);
121 131 }
122 132 }
core.sdk/src/main/java/com/lyms/spring/redis/operation/RedisLock.java View file @ c6021f3
1 1 package com.lyms.spring.redis.operation;
2 2  
  3 +import java.io.UnsupportedEncodingException;
3 4 import java.util.concurrent.TimeUnit;
4 5  
5 6 import org.springframework.dao.DataAccessException;
... ... @@ -56,7 +57,7 @@
56 57 public boolean acquireLock(String lockName) throws Exception {
57 58 String redisKey = PREFIX_KEY + lockName;
58 59 long expire = 15;// 单位:秒
59   - long timeout = 5000;// 单位:毫秒
  60 + long timeout = 20000;// 单位:毫秒
60 61 long redisValue = System.currentTimeMillis() + timeout + 1;
61 62 // 通过SETNX试图获取一个lock
62 63 if (setNX(redisKey, String.valueOf(redisValue), expire)) {// SETNX成功,则成功获取一个锁
... ... @@ -69,6 +70,7 @@
69 70 if (oldValue < System.currentTimeMillis()) {
70 71 String getValue = getAndSet(redisKey, String.valueOf(redisValue));
71 72 // 获取锁成功
  73 + // System.out.println(getValue + " " + oldValue);
72 74 if (getValue.equals(oldValue)) {
73 75 this.lockNode = lockName;
74 76 return true;
75 77  
... ... @@ -90,10 +92,14 @@
90 92 private boolean setNX(final String key, final String value, final long expire) {
91 93 return (Boolean) redisTemplate.execute(new RedisCallback<Boolean>() {
92 94 public Boolean doInRedis(RedisConnection connection) {
93   - byte[] keyBytes = redisTemplate.getStringSerializer().serialize(key);
94   - boolean locked = connection.setNX(keyBytes, redisTemplate.getDefaultSerializer().serialize(value));
  95 + // byte[] keyBytes =
  96 + // redisTemplate.getStringSerializer().serialize(key);
  97 + // boolean locked = connection.setNX(keyBytes,
  98 + // redisTemplate.getDefaultSerializer().serialize(value));
  99 + byte[] kbytes = key.getBytes();
  100 + boolean locked = connection.setNX(kbytes, value.getBytes());
95 101 if (locked) {
96   - connection.expire(keyBytes, expire);
  102 + connection.expire(kbytes, expire);
97 103 }
98 104 return locked;
99 105 }
100 106  
... ... @@ -104,10 +110,18 @@
104 110 return (String) redisTemplate.execute(new RedisCallback<String>() {
105 111 @Override
106 112 public String doInRedis(RedisConnection connection) throws DataAccessException {
107   - byte[] result = connection.getSet(redisTemplate.getStringSerializer().serialize(key),
108   - redisTemplate.getDefaultSerializer().serialize(value));
  113 + // byte[] result =
  114 + // connection.getSet(redisTemplate.getStringSerializer().serialize(key),
  115 + // redisTemplate.getDefaultSerializer().serialize(value));
  116 +
  117 + byte[] result = connection.getSet(key.getBytes(), value.getBytes());
  118 +
109 119 if (result != null) {
110   - return new String(result);
  120 + try {
  121 + return new String(result, "UTF-8");
  122 + } catch (UnsupportedEncodingException e) {
  123 + e.printStackTrace();
  124 + }
111 125 }
112 126 return null;
113 127 }
core.sdk/src/main/java/com/lyms/synch/CenterCallback.java View file @ c6021f3
1 1 package com.lyms.synch;
2 2  
  3 +/**
  4 + * <li>@ClassName: CenterCallback
  5 + * <li>@Description: 数据回调
  6 + * <li>@author maliang
  7 + * <li>@date 2017年3月14日
  8 + * <li>
  9 + */
3 10 public interface CenterCallback {
4 11 public void callBack(Object object);
5 12 }
core.sdk/src/main/java/com/lyms/synch/SyncUtils.java View file @ c6021f3
  1 +package com.lyms.synch;
  2 +
  3 +import java.io.IOException;
  4 +import java.io.ObjectInputStream;
  5 +import java.io.Serializable;
  6 +
  7 +import javax.servlet.http.HttpServletRequest;
  8 +
  9 +import com.lyms.synch.entity.AckObj;
  10 +import com.lyms.synch.entity.SyncnModel;
  11 +
  12 +/**
  13 + * <li>@ClassName: SyncUtils
  14 + * <li>@Description: 同步辅助处理类
  15 + * <li>@author maliang
  16 + * <li>@date 2017年3月14日
  17 + * <li>
  18 + */
  19 +public class SyncUtils {
  20 +
  21 + public static SyncnModel conver(HttpServletRequest request) throws IOException, ClassNotFoundException {
  22 + request.setCharacterEncoding("utf-8");
  23 + ObjectInputStream dis = new ObjectInputStream(request.getInputStream());
  24 + SyncnModel model = (SyncnModel) dis.readObject();
  25 + return model;
  26 + }
  27 +
  28 + /**
  29 + * <li>@Description:获取包装的Data
  30 + * <li>@param model
  31 + * <li>@return
  32 + * <li>创建人:maliang
  33 + * <li>创建时间:2017年3月14日
  34 + * <li>修改人:
  35 + * <li>修改时间:
  36 + */
  37 + @SuppressWarnings("unchecked")
  38 + public static <T> T converData(SyncnModel model, Class<T> t) {
  39 + if (model == null || model.getData() == null)
  40 + return null;
  41 + if (!Serializable.class.isAssignableFrom(t)) {
  42 + return null;
  43 + }
  44 + return (T) model.getData();
  45 + }
  46 +
  47 + /**
  48 + * <li>@Description:直接转化为ACK Object
  49 + * <li>@param model
  50 + * <li>@return
  51 + * <li>创建人:maliang
  52 + * <li>创建时间:2017年3月14日
  53 + * <li>修改人:
  54 + * <li>修改时间:
  55 + */
  56 + public static AckObj converDataToAck(SyncnModel model) {
  57 + if (model == null || model.getData() == null)
  58 + return null;
  59 + if (!(model.getData() instanceof AckObj)) {
  60 + return null;
  61 + }
  62 + return (AckObj) model.getData();
  63 + // return converData(model, AckObj.class);
  64 + }
  65 +
  66 +}
core.sdk/src/main/java/com/lyms/synch/SyncnCenter.java View file @ c6021f3
... ... @@ -6,6 +6,7 @@
6 6  
7 7 import org.apache.commons.lang3.StringUtils;
8 8  
  9 +import com.lyms.synch.entity.AckObj;
9 10 import com.lyms.synch.entity.ModelType;
10 11 import com.lyms.synch.entity.SyncnModel;
11 12 import com.lyms.synch.queue.SyncnQueue;
... ... @@ -36,7 +37,7 @@
36 37 private SyncnQueue queue;
37 38  
38 39 /**
39   - * <li>@Description:创建一个中心实力
  40 + * <li>@Description:创建一个中心
40 41 * <li>@return
41 42 * <li>创建人:maliang
42 43 * <li>创建时间:2017年3月13日
43 44  
44 45  
45 46  
... ... @@ -144,13 +145,13 @@
144 145 * <li>修改人:
145 146 * <li>修改时间:
146 147 */
147   - public String ack(Object object) {
148   - if (model != null && StringUtils.isNotBlank(model.getRemote())) {
  148 + public String ack(AckObj ack) {
  149 + if (model != null && !model.isAck() && StringUtils.isNotBlank(model.getRemote())) {
149 150 model.setId(StrUtils.uuid());
150   - model.setMethod("POST");
151 151 model.setTs(System.currentTimeMillis());
152   - model.setData(object);
  152 + model.setData(ack);
153 153 model.setType(ModelType.ACK.name());
  154 + ack.setOldType(model.getType());
154 155 return HttpUtils.REMOTE.post(model);
155 156 }
156 157 return null;
core.sdk/src/main/java/com/lyms/synch/entity/AckObj.java View file @ c6021f3
  1 +package com.lyms.synch.entity;
  2 +
  3 +import java.io.Serializable;
  4 +
  5 +/**
  6 + * <li>@ClassName: AckObj
  7 + * <li>@Description: 回调实体,用于封装获取到的数据信息
  8 + * <li>@author maliang
  9 + * <li>@date 2017年3月14日
  10 + * <li>
  11 + */
  12 +public class AckObj implements Serializable {
  13 +
  14 + /**
  15 + */
  16 + private static final long serialVersionUID = -3939046868818014917L;
  17 +
  18 + /**
  19 + * {@link ModelType} 获取数据
  20 + */
  21 + private String oldType;
  22 +
  23 + /**
  24 + * remote、获取的数据信息,该参数需要自行封装
  25 + */
  26 + private Object data;
  27 +
  28 + public String getOldType() {
  29 + return oldType;
  30 + }
  31 +
  32 + public void setOldType(String oldType) {
  33 + this.oldType = oldType;
  34 + }
  35 +
  36 + public Object getData() {
  37 + return data;
  38 + }
  39 +
  40 + public void setData(Object data) {
  41 + this.data = data;
  42 + }
  43 +}
core.sdk/src/main/java/com/lyms/synch/entity/SyncnModel.java View file @ c6021f3
... ... @@ -125,5 +125,9 @@
125 125 return JsonUtils.jsonToBean(body, SyncnModel.class);
126 126 }
127 127  
  128 + public boolean isAck() {
  129 + return ModelType.isACK(getType());
  130 + }
  131 +
128 132 }
core.sdk/src/main/java/com/lyms/synch/queue/SyncnQueue.java View file @ c6021f3
... ... @@ -36,9 +36,9 @@
36 36 private static final String LOCK_FIXATION_NAME = "FIXATION_lock";
37 37  
38 38 /**
39   - * 等待获取锁的时间10 S
  39 + * 等待获取锁的时间5 S
40 40 */
41   - private static final Long WAIT_LOCK_TIME = 10000L;
  41 + private static final Long WAIT_LOCK_TIME = 5000L;
42 42  
43 43 /**
44 44 * 固定任务
45 45  
46 46  
... ... @@ -81,16 +81,17 @@
81 81 private SyncnModel pullFixationModel() {
82 82 // 添加锁
83 83 try {
84   - if (Fix_Lock.acquireLock(LOCK_FIXATION_NAME, WAIT_LOCK_TIME)) {
  84 + if (Tmp_Lock.acquireLock(LOCK_FIXATION_NAME, WAIT_LOCK_TIME)) {
85 85 SyncnModel obj = (SyncnModel) template.opsForList().rightPopAndLeftPush(FIXATION_WORK, FIXATION_WORK);
86 86 // this.pushFixationWork(obj);
87 87 return obj;
88 88 }
89 89 } catch (Exception e) {
  90 + e.printStackTrace();
90 91 } finally {
91 92 try {
92   - if (Fix_Lock != null) {
93   - Fix_Lock.releaseLock();
  93 + if (Tmp_Lock != null) {
  94 + Tmp_Lock.releaseLock();
94 95 }
95 96 } catch (Exception e) {
96 97 e.printStackTrace();
core.sdk/src/main/java/com/lyms/web/controller/BaseController.java View file @ c6021f3
1 1 package com.lyms.web.controller;
2 2  
3 3 import java.io.IOException;
4   -import java.io.ObjectInputStream;
5 4 import java.io.PrintWriter;
6 5 import java.text.SimpleDateFormat;
7 6 import java.util.Date;
... ... @@ -24,7 +23,6 @@
24 23 import com.baomidou.mybatisplus.plugins.pagination.Pagination;
25 24 import com.lyms.common.StringEscapeEditor;
26 25 import com.lyms.exception.BusinessException;
27   -import com.lyms.synch.entity.SyncnModel;
28 26 import com.lyms.util.HttpUtils;
29 27 import com.lyms.util.StrUtils;
30 28 import com.lyms.web.bean.AjaxResult;
... ... @@ -52,13 +50,6 @@
52 50 * 防止XSS攻击
53 51 */
54 52 binder.registerCustomEditor(String.class, new StringEscapeEditor(true, false));
55   - }
56   -
57   - public SyncnModel conver(HttpServletRequest request) throws IOException, ClassNotFoundException {
58   - request.setCharacterEncoding("utf-8");
59   - ObjectInputStream dis = new ObjectInputStream(request.getInputStream());
60   - SyncnModel model = (SyncnModel) dis.readObject();
61   - return model;
62 53 }
63 54  
64 55 /**