From 3ef63df5dc6bedf462077d65951a345beff6d70d Mon Sep 17 00:00:00 2001 From: maliang Date: Mon, 13 Mar 2017 16:55:17 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E5=9B=BA=E5=AE=9A=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=8F=AF=E8=83=BD=E4=BC=9A=E4=B8=A2=E5=A4=B1=E7=9A=84?= =?UTF-8?q?=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/center/manager/test/user/WorkTest.java | 65 +++++++++++++--------- .../src/main/java/com/lyms/context/AppContext.java | 20 +++---- .../main/java/com/lyms/synch/queue/SyncnQueue.java | 10 ++-- 3 files changed, 53 insertions(+), 42 deletions(-) diff --git a/center.manager/src/test/java/center/manager/test/user/WorkTest.java b/center.manager/src/test/java/center/manager/test/user/WorkTest.java index 6870ce6..37be4d0 100644 --- a/center.manager/src/test/java/center/manager/test/user/WorkTest.java +++ b/center.manager/src/test/java/center/manager/test/user/WorkTest.java @@ -25,12 +25,12 @@ public class WorkTest extends BaseTest { public SyncnCenter center; @Test - public void workTest() { + public void workTest() throws InterruptedException { SyncnModel model = new SyncnModel(); model.setId(StrUtils.uuid()); - model.setModel("user"); + model.setModel("user2"); model.setMethod("post"); - model.setType(ModelType.ADD.name()); + model.setType(ModelType.GET.name()); // 推送数据远程地址 model.setRemote("http://127.0.0.1:9090/center.manager/remote/push"); // 对于远程获取数据,回调地址 @@ -41,32 +41,45 @@ public class WorkTest extends BaseTest { user.setName("乱码测试"); model.setData(user); - // 添加任务信息,临时任务信息 - center.buildWork().push(model, new CenterCallback() { - @Override - public void callBack(Object object) { - System.out.println("push callBack : " + object); - } - }); + // 添加任务信息,临时任务信息 ModelType.Get()固定任务 + /* + * center.buildWork().push(model, new CenterCallback() { + * + * @Override public void callBack(Object object) { + * System.out.println("push callBack : " + object); } }); + */ - // 获取临时任务信息 + /* + * Work work = center.buildWork(); try { + * + * work.pullFix(new CenterCallback() { + * + * @Override public void callBack(Object object) { // 处理实际业务 SyncnModel + * model = (SyncnModel) object; // String result = + * HttpUtils.REMOTE.post(model); System.out.println(model.getModel()); } + * }); } catch (Exception e) { e.printStackTrace(); // 临时队列数据,需要手动反压 + * work.backPressure(); } + */ + + // 获取固定任务队列任务 // 包含ack 回调方法 - Work work = center.buildWork(); - try { + while (true) { + Work work2 = center.buildWork(); + try { - work.pullTmp(new CenterCallback() { - @Override - public void callBack(Object object) { - // 处理实际业务 - SyncnModel model = (SyncnModel) object; - String result = HttpUtils.REMOTE.post(model); - System.out.println(result); - } - }); - } catch (Exception e) { - e.printStackTrace(); - // 业务逻辑错误,反压数据,将会输重写写入队列 - work.backPressure(); + work2.pullFix(new CenterCallback() { + @Override + public void callBack(Object object) { + // 处理实际业务 + SyncnModel model = (SyncnModel) object; + // String result = HttpUtils.REMOTE.post(model); + System.out.println(model.getModel()); + } + });// .ackGet(null) + } catch (Exception e) { + e.printStackTrace(); + } + Thread.sleep(3000); } // .ackGet(null); // System.out.println("callBack: " + get); diff --git a/core.sdk/src/main/java/com/lyms/context/AppContext.java b/core.sdk/src/main/java/com/lyms/context/AppContext.java index 88b0d23..34bac32 100644 --- a/core.sdk/src/main/java/com/lyms/context/AppContext.java +++ b/core.sdk/src/main/java/com/lyms/context/AppContext.java @@ -8,11 +8,11 @@ import javax.servlet.ServletContext; import org.springframework.stereotype.Component; import org.springframework.web.context.ServletContextAware; -/** - *
  • ClassName:AppContext
    +/** + *
  • ClassName:AppContext
    *
  • @Description: TODO(类描述) - *
  • @Date: 2016年11月29日
    - *
  • @author 方承 + *
  • @Date: 2016年11月29日
    + *
  • @author 方承 */ @Component public class AppContext implements ServletContextAware { @@ -21,27 +21,26 @@ public class AppContext implements ServletContextAware { * (系统配置信息)- 在 StartupListener 类中加载 */ public Map config; - + /** * 容器全局变量 */ private ServletContext servletContext; - - + public Map getConfig() { return config; } public void setConfig(Map config) { this.config = config; - //同步更新容器全局变量 + // 同步更新容器全局变量 Iterator> iter = config.entrySet().iterator(); - while (iter.hasNext()){ + while (iter.hasNext()) { Map.Entry e = iter.next(); servletContext.setAttribute(e.getKey(), e.getValue()); } } - + @Override public void setServletContext(ServletContext servletContext) { this.servletContext = servletContext; @@ -51,5 +50,4 @@ public class AppContext implements ServletContextAware { return servletContext; } - } diff --git a/core.sdk/src/main/java/com/lyms/synch/queue/SyncnQueue.java b/core.sdk/src/main/java/com/lyms/synch/queue/SyncnQueue.java index 66b6341..5bd30c9 100644 --- a/core.sdk/src/main/java/com/lyms/synch/queue/SyncnQueue.java +++ b/core.sdk/src/main/java/com/lyms/synch/queue/SyncnQueue.java @@ -74,7 +74,7 @@ public class SyncnQueue { *
  • 修改时间: */ private Boolean pushFixationWork(SyncnModel model) { - Long tag = template.opsForList().rightPush(FIXATION_WORK, model); + Long tag = template.opsForList().leftPush(FIXATION_WORK, model); return tag >= 1; } @@ -82,8 +82,8 @@ public class SyncnQueue { // 添加锁 try { if (Fix_Lock.acquireLock(LOCK_FIXATION_NAME, WAIT_LOCK_TIME)) { - SyncnModel obj = (SyncnModel) template.opsForList().leftPop(FIXATION_WORK); - this.pushFixationWork(obj); + SyncnModel obj = (SyncnModel) template.opsForList().rightPopAndLeftPush(FIXATION_WORK, FIXATION_WORK); + // this.pushFixationWork(obj); return obj; } } catch (Exception e) { @@ -112,7 +112,7 @@ public class SyncnQueue { *
  • 修改时间: */ private boolean pushTempWork(SyncnModel model) { - Long tag = template.opsForList().rightPush(TMP_WORK, model); + Long tag = template.opsForList().leftPush(TMP_WORK, model); return tag >= 1; } @@ -128,7 +128,7 @@ public class SyncnQueue { try { if (Tmp_Lock.acquireLock(LOCK_TMP_NAME, WAIT_LOCK_TIME)) { - Serializable object = template.opsForList().leftPop(TMP_WORK); + Serializable object = template.opsForList().rightPop(TMP_WORK); return object != null ? (SyncnModel) object : null; } } catch (Exception e) { -- 1.8.3.1