Commit 3ef63df5dc6bedf462077d65951a345beff6d70d

Authored by maliang
1 parent 50965946ed
Exists in master

解决固定任务可能会丢失的情况

Showing 3 changed files with 53 additions and 42 deletions

center.manager/src/test/java/center/manager/test/user/WorkTest.java View file @ 3ef63df
... ... @@ -25,12 +25,12 @@
25 25 public SyncnCenter center;
26 26  
27 27 @Test
28   - public void workTest() {
  28 + public void workTest() throws InterruptedException {
29 29 SyncnModel model = new SyncnModel();
30 30 model.setId(StrUtils.uuid());
31   - model.setModel("user");
  31 + model.setModel("user2");
32 32 model.setMethod("post");
33   - model.setType(ModelType.ADD.name());
  33 + model.setType(ModelType.GET.name());
34 34 // 推送数据远程地址
35 35 model.setRemote("http://127.0.0.1:9090/center.manager/remote/push");
36 36 // 对于远程获取数据,回调地址
37 37  
38 38  
39 39  
... ... @@ -41,32 +41,45 @@
41 41 user.setName("乱码测试");
42 42 model.setData(user);
43 43  
44   - // 添加任务信息,临时任务信息
45   - center.buildWork().push(model, new CenterCallback() {
46   - @Override
47   - public void callBack(Object object) {
48   - System.out.println("push callBack : " + object);
49   - }
50   - });
  44 + // 添加任务信息,临时任务信息 ModelType.Get()固定任务
  45 + /*
  46 + * center.buildWork().push(model, new CenterCallback() {
  47 + *
  48 + * @Override public void callBack(Object object) {
  49 + * System.out.println("push callBack : " + object); } });
  50 + */
51 51  
52   - // 获取临时任务信息
  52 + /*
  53 + * Work work = center.buildWork(); try {
  54 + *
  55 + * work.pullFix(new CenterCallback() {
  56 + *
  57 + * @Override public void callBack(Object object) { // 处理实际业务 SyncnModel
  58 + * model = (SyncnModel) object; // String result =
  59 + * HttpUtils.REMOTE.post(model); System.out.println(model.getModel()); }
  60 + * }); } catch (Exception e) { e.printStackTrace(); // 临时队列数据,需要手动反压
  61 + * work.backPressure(); }
  62 + */
  63 +
  64 + // 获取固定任务队列任务
53 65 // 包含ack 回调方法
54   - Work work = center.buildWork();
55   - try {
  66 + while (true) {
  67 + Work work2 = center.buildWork();
  68 + try {
56 69  
57   - work.pullTmp(new CenterCallback() {
58   - @Override
59   - public void callBack(Object object) {
60   - // 处理实际业务
61   - SyncnModel model = (SyncnModel) object;
62   - String result = HttpUtils.REMOTE.post(model);
63   - System.out.println(result);
64   - }
65   - });
66   - } catch (Exception e) {
67   - e.printStackTrace();
68   - // 业务逻辑错误,反压数据,将会输重写写入队列
69   - work.backPressure();
  70 + work2.pullFix(new CenterCallback() {
  71 + @Override
  72 + public void callBack(Object object) {
  73 + // 处理实际业务
  74 + SyncnModel model = (SyncnModel) object;
  75 + // String result = HttpUtils.REMOTE.post(model);
  76 + System.out.println(model.getModel());
  77 + }
  78 + });// .ackGet(null)
  79 + } catch (Exception e) {
  80 + e.printStackTrace();
  81 + }
  82 + Thread.sleep(3000);
70 83 }
71 84 // .ackGet(null);
72 85 // System.out.println("callBack: " + get);
core.sdk/src/main/java/com/lyms/context/AppContext.java View file @ 3ef63df
... ... @@ -8,11 +8,11 @@
8 8 import org.springframework.stereotype.Component;
9 9 import org.springframework.web.context.ServletContextAware;
10 10  
11   -/**
12   - * <li>ClassName:AppContext <br/>
  11 +/**
  12 + * <li>ClassName:AppContext <br/>
13 13 * <li>@Description: TODO(类描述)
14   - * <li>@Date: 2016年11月29日 <br/>
15   - * <li>@author 方承
  14 + * <li>@Date: 2016年11月29日 <br/>
  15 + * <li>@author 方承
16 16 */
17 17 @Component
18 18 public class AppContext implements ServletContextAware {
19 19  
20 20  
21 21  
22 22  
... ... @@ -21,27 +21,26 @@
21 21 * (系统配置信息)- 在 StartupListener 类中加载
22 22 */
23 23 public Map<String, String> config;
24   -
  24 +
25 25 /**
26 26 * 容器全局变量
27 27 */
28 28 private ServletContext servletContext;
29   -
30   -
  29 +
31 30 public Map<String, String> getConfig() {
32 31 return config;
33 32 }
34 33  
35 34 public void setConfig(Map<String, String> config) {
36 35 this.config = config;
37   - //同步更新容器全局变量
  36 + // 同步更新容器全局变量
38 37 Iterator<Map.Entry<String, String>> iter = config.entrySet().iterator();
39   - while (iter.hasNext()){
  38 + while (iter.hasNext()) {
40 39 Map.Entry<String, String> e = iter.next();
41 40 servletContext.setAttribute(e.getKey(), e.getValue());
42 41 }
43 42 }
44   -
  43 +
45 44 @Override
46 45 public void setServletContext(ServletContext servletContext) {
47 46 this.servletContext = servletContext;
... ... @@ -50,7 +49,6 @@
50 49 public ServletContext getServletContext() {
51 50 return servletContext;
52 51 }
53   -
54 52  
55 53 }
core.sdk/src/main/java/com/lyms/synch/queue/SyncnQueue.java View file @ 3ef63df
... ... @@ -74,7 +74,7 @@
74 74 * <li>修改时间:
75 75 */
76 76 private Boolean pushFixationWork(SyncnModel model) {
77   - Long tag = template.opsForList().rightPush(FIXATION_WORK, model);
  77 + Long tag = template.opsForList().leftPush(FIXATION_WORK, model);
78 78 return tag >= 1;
79 79 }
80 80  
... ... @@ -82,8 +82,8 @@
82 82 // 添加锁
83 83 try {
84 84 if (Fix_Lock.acquireLock(LOCK_FIXATION_NAME, WAIT_LOCK_TIME)) {
85   - SyncnModel obj = (SyncnModel) template.opsForList().leftPop(FIXATION_WORK);
86   - this.pushFixationWork(obj);
  85 + SyncnModel obj = (SyncnModel) template.opsForList().rightPopAndLeftPush(FIXATION_WORK, FIXATION_WORK);
  86 + // this.pushFixationWork(obj);
87 87 return obj;
88 88 }
89 89 } catch (Exception e) {
... ... @@ -112,7 +112,7 @@
112 112 * <li>修改时间:
113 113 */
114 114 private boolean pushTempWork(SyncnModel model) {
115   - Long tag = template.opsForList().rightPush(TMP_WORK, model);
  115 + Long tag = template.opsForList().leftPush(TMP_WORK, model);
116 116 return tag >= 1;
117 117 }
118 118  
... ... @@ -128,7 +128,7 @@
128 128  
129 129 try {
130 130 if (Tmp_Lock.acquireLock(LOCK_TMP_NAME, WAIT_LOCK_TIME)) {
131   - Serializable object = template.opsForList().leftPop(TMP_WORK);
  131 + Serializable object = template.opsForList().rightPop(TMP_WORK);
132 132 return object != null ? (SyncnModel) object : null;
133 133 }
134 134 } catch (Exception e) {