From 2fee7384f2e2c5df231982c93324b2854f5abc8e Mon Sep 17 00:00:00 2001 From: maliang Date: Thu, 16 Mar 2017 10:55:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=8F=90=E4=BA=A4=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=EF=BC=8C=E7=BC=96=E5=86=99=E6=B5=8B=E8=AF=95=E7=94=A8?= =?UTF-8?q?=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- center.manager/src/main/resources/xml/app-sycn.xml | 21 +++ .../src/main/resources/xml/app-syncn.xml | 21 --- .../java/center/manager/test/user/WorkTest.java | 166 +++++---------------- .../src/main/java/com/lyms/sycn/SycnCallback.java | 10 ++ .../src/main/java/com/lyms/sycn/SycnCenter.java | 87 +++++++++++ .../src/main/java/com/lyms/sycn/SycnHandler.java | 46 ------ 6 files changed, 155 insertions(+), 196 deletions(-) create mode 100644 center.manager/src/main/resources/xml/app-sycn.xml delete mode 100644 center.manager/src/main/resources/xml/app-syncn.xml diff --git a/center.manager/src/main/resources/xml/app-sycn.xml b/center.manager/src/main/resources/xml/app-sycn.xml new file mode 100644 index 0000000..4798db4 --- /dev/null +++ b/center.manager/src/main/resources/xml/app-sycn.xml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/center.manager/src/main/resources/xml/app-syncn.xml b/center.manager/src/main/resources/xml/app-syncn.xml deleted file mode 100644 index 49a7fa5..0000000 --- a/center.manager/src/main/resources/xml/app-syncn.xml +++ /dev/null @@ -1,21 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/center.manager/src/test/java/center/manager/test/user/WorkTest.java b/center.manager/src/test/java/center/manager/test/user/WorkTest.java index 5316662..a382795 100644 --- a/center.manager/src/test/java/center/manager/test/user/WorkTest.java +++ b/center.manager/src/test/java/center/manager/test/user/WorkTest.java @@ -4,15 +4,9 @@ import org.apache.commons.lang3.builder.ToStringBuilder; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; -import com.alibaba.fastjson.JSON; -import com.lyms.cm.entity.sys.SysUsers; -import com.lyms.sycn.ParamsAdpter; import com.lyms.sycn.SycnCallback; import com.lyms.sycn.SycnCenter; -import com.lyms.sycn.SycnCenter.Work; import com.lyms.sycn.channel.ChannelData; -import com.lyms.util.HttpUtils; -import com.lyms.util.JsonUtils; import com.lyms.util.StrUtils; /** @@ -27,140 +21,54 @@ public class WorkTest extends BaseTest { @Autowired public SycnCenter center; + /** + *
  • @Description:测试往固定任务队列中添加任务 + *
  • + *
  • 创建人:maliang + *
  • 创建时间:2017年3月16日 + *
  • 修改人: + *
  • 修改时间: + */ @Test - public void push() { + public void pushFix() { + String id = StrUtils.uuid(); String remote = "http://127.0.0.1:9090/hospital.web/remote/sycn"; String remoteClazz = "com.lyms.hospital.service.sys.impl.SyncDataBasicServiceImpl"; String remoteMethod = "selectOne"; - Object data = null;// new Object[] { user, null }; - - boolean loop = true; - - ChannelData push = new ChannelData(remote, remoteClazz, remoteMethod, null, loop); - - center.buildWork().push(push, new SycnCallback() { - @Override - public void callBack(Object object) { - System.out.println("push callBack : " + object); - } - }); - } - - @Test - public void workTest() throws InterruptedException { - new Thread(new Runnable() { - - @Override - public void run() { - tmp(); - } - }).start(); - - /* - * new Thread(new Runnable() { - * - * @Override public void run() { try { fix(); } catch - * (InterruptedException e) { e.printStackTrace(); } } }).start(); - */ - // - while (true) { - push(); - Thread.sleep(30000); - } - + center.pushFix(id, remote, remoteClazz, remoteMethod, null); } + /** + *
  • @Description:测试往固定任务队列中添加任务 + *
  • + *
  • 创建人:maliang + *
  • 创建时间:2017年3月16日 + *
  • 修改人: + *
  • 修改时间: + */ @Test - public void tmp() { - while (true) { - final Work work = center.buildWork(); - try { - // final AckObj ack = new AckObj(); - - work.pullTmp(new SycnCallback() { - @Override - public void callBack(Object object) { - if (object == null) - return; - if (object instanceof ChannelData) { - ChannelData model = (ChannelData) object; - String result = HttpUtils.REMOTE.post(model); - System.out.println(result); - model = JsonUtils.jsonToBean(result, ChannelData.class); - System.out.println("回执: " + ToStringBuilder.reflectionToString(model)); - if (model != null) { - if (!model.getAck()) { - work.backPressure(); - } else { - System.out.println("远程数据处理成功"); - } - } - } - } - }); - } catch (Exception e) { - e.printStackTrace(); - // 异常数据反压 - System.out.println("数据反压"); - work.backPressure(); - } - } + public void pushTmp() { + String id = StrUtils.uuid(); + String remote = "http://127.0.0.1:9090/hospital.web/remote/sycn"; + String remoteClazz = "com.lyms.hospital.service.sys.impl.SyncDataBasicServiceImpl"; + String remoteMethod = "selectOne"; + // 参数构建如下注释方式 + // ParamsAdpter adpter = + // ParamsAdpter.builder().push(null).push(null).push(null);; + center.pushFix(id, remote, remoteClazz, remoteMethod, null); } @Test - public void fix() throws InterruptedException { - // 获取固定任务队列任务 - // 包含ack 回调方法 - // while (true) { - Work work2 = center.buildWork(); - try { - // final AckObj ack = new AckObj(); - work2.pullFix(new SycnCallback() { - @Override - public void callBack(Object object) { - if (object != null && object instanceof ChannelData) { - // remote - ChannelData model = (ChannelData) object; - // 处理实际业务 - System.out.println(ToStringBuilder.reflectionToString(model)); - System.out.println(model.getRemote() + "====>fix"); - String result = HttpUtils.REMOTE.post(model); - System.out.println(result); - // ack.setData(model.getData()); - } + public void pull() { + center.pull(true, new SycnCallback() { + @Override + public void callBack(Object object) { + if (object instanceof ChannelData) { + ChannelData data = (ChannelData) object; + System.out.println(ToStringBuilder.reflectionToString(data)); } - });// .ackGet(params);// .ackGet(null) - } catch (Exception e) { - e.printStackTrace(); - } - // Thread.sleep(15000); - // } - } - - public static void buildParams() throws ClassNotFoundException { - String uid = StrUtils.uuid(); - - ChannelData data = ChannelData.emtpy(); - data.setId(uid); - data.setRemote(null); - data.setRemoteClazz("com.lyms.cm.service.sys.impl.SysUsersServiceImpl"); - data.setRemoteMethod("addUser"); - SysUsers user = new SysUsers(); - user.setId(uid); - user.setName("拉取信息测试"); - // Object params = new Object[] { user, null }; - - // 设置参数 - ParamsAdpter params = ParamsAdpter.builder().push(user).push(null); - data.setData(params.toJsonString()); - - String str = JSON.toJSONString(data); - System.out.println(str); - - } - - public static void main(String[] args) throws ClassNotFoundException { - buildParams(); + } + }); } } diff --git a/core.sdk/src/main/java/com/lyms/sycn/SycnCallback.java b/core.sdk/src/main/java/com/lyms/sycn/SycnCallback.java index 4ccb979..31f59ad 100644 --- a/core.sdk/src/main/java/com/lyms/sycn/SycnCallback.java +++ b/core.sdk/src/main/java/com/lyms/sycn/SycnCallback.java @@ -1,5 +1,7 @@ package com.lyms.sycn; +import org.apache.ibatis.javassist.expr.Instanceof; + /** *
  • @ClassName: CenterCallback *
  • @Description: 数据回调 @@ -8,5 +10,13 @@ package com.lyms.sycn; *
  • */ public interface SycnCallback { + /** + *
  • @Description:回调方法 + *
  • @param object Instanceof {@link Instanceof ChannelData} + *
  • 创建人:maliang + *
  • 创建时间:2017年3月16日 + *
  • 修改人: + *
  • 修改时间: + */ public void callBack(Object object); } diff --git a/core.sdk/src/main/java/com/lyms/sycn/SycnCenter.java b/core.sdk/src/main/java/com/lyms/sycn/SycnCenter.java index 84cf269..33731f0 100644 --- a/core.sdk/src/main/java/com/lyms/sycn/SycnCenter.java +++ b/core.sdk/src/main/java/com/lyms/sycn/SycnCenter.java @@ -43,6 +43,93 @@ public class SycnCenter { } /** + *
  • @Description:在3次没有成功将会放弃,如果需要对失败处理 ,使用 + * {@link Work#push(ChannelData, SycnCallback)} + *

    + * 该方法使用尽量异步操作,避免阻塞 + *

  • @return + *
  • 创建人:maliang + *
  • 创建时间:2017年3月16日 + *
  • 修改人: + *
  • 修改时间: + */ + public boolean pushFix(String id, String remote, String remoteClazz, String remoteMethod, ParamsAdpter adpter) { + ChannelData data = new ChannelData(id, remote, remoteClazz, remoteMethod, adpter.toJsonString(), true); + + int tag = 0; + + do { + try { + buildWork().push(data, new SycnCallback() { + @Override + public void callBack(Object object) { + // nothing to do + } + }); + return true; + } catch (Exception e) { + e.printStackTrace(); + tag++; + } + } while (tag < 3); + + return false; + } + + /** + * + *
  • @Description: 获取数据,等价于下面两个方法 + *

    + * {@link Work#pullFix(SycnCallback)} + *

    + * {@link Work#pullTmp(SycnCallback)} + *

  • @param loop 从固定任务队列中获取任务 + *
  • @param callBack + *
  • 创建人:maliang + *
  • 创建时间:2017年3月16日 + *
  • 修改人: + *
  • 修改时间: + */ + public void pull(boolean loop, SycnCallback callBack) { + if (loop) + buildWork().pullFix(callBack); + else + buildWork().pullTmp(callBack); + } + + /** + *
  • @Description:在3次没有成功将会放弃,如果需要对失败处理 ,使用 + * {@link Work#push(ChannelData, SycnCallback)} + *

    + * 该方法使用尽量异步操作,避免阻塞 + *

  • @return + *
  • 创建人:maliang + *
  • 创建时间:2017年3月16日 + *
  • 修改人: + *
  • 修改时间: + */ + public boolean pushTmp(String id, String remote, String remoteClazz, String remoteMethod, ParamsAdpter adpter) { + ChannelData data = new ChannelData(id, remote, remoteClazz, remoteMethod, adpter.toJsonString(), false); + int tag = 0; + do { + try { + buildWork().push(data, new SycnCallback() { + @Override + public void callBack(Object object) { + // nothing to do + } + }); + return true; + } catch (Exception e) { + e.printStackTrace(); + tag++; + } + } while (tag < 3); + + return false; + } + + /** *
  • @Description:构建一个工作器 *
  • @return *
  • 创建人:maliang diff --git a/core.sdk/src/main/java/com/lyms/sycn/SycnHandler.java b/core.sdk/src/main/java/com/lyms/sycn/SycnHandler.java index 3b680fc..3cce4ad 100644 --- a/core.sdk/src/main/java/com/lyms/sycn/SycnHandler.java +++ b/core.sdk/src/main/java/com/lyms/sycn/SycnHandler.java @@ -114,50 +114,4 @@ public class SycnHandler { } return null; } - - /** - *
  • @Description:TODO(方法描述) - *
  • @param t - *
  • @param loop - *
  • @return - *
  • 创建人:maliang - *
  • 创建时间:2017年3月16日 - *
  • 修改人: - *
  • 修改时间: - */ - public static boolean pushFix(String id, String remote, String remoteClazz, String remoteMethod, - ParamsAdpter adpter) { - - ChannelData data = new ChannelData(id, remote, remoteClazz, remoteMethod, adpter.toJsonString(), true); - try { - center.buildWork().push(data, new SycnCallback() { - @Override - public void callBack(Object object) { - // nothing to do - } - }); - } catch (Exception e) { - e.printStackTrace(); - return false; - } - return true; - } - - public static boolean pushTmp(String id, String remote, String remoteClazz, String remoteMethod, - ParamsAdpter adpter) { - ChannelData data = new ChannelData(id, remote, remoteClazz, remoteMethod, adpter.toJsonString(), false); - try { - center.buildWork().push(data, new SycnCallback() { - @Override - public void callBack(Object object) { - // nothing to do - } - }); - } catch (Exception e) { - e.printStackTrace(); - return false; - } - return true; - } - } -- 1.8.3.1