From 610bf7d4629a9d9eae8ec8ed4b89b820366ab6cc Mon Sep 17 00:00:00 2001 From: fangcheng Date: Fri, 17 Mar 2017 13:46:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E7=B1=BB=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/lyms/sync/SyncCenter.java | 8 +- .../main/java/com/lyms/sync/queue/SycnQueue.java | 198 --------------------- .../main/java/com/lyms/sync/queue/SyncQueue.java | 198 +++++++++++++++++++++ 3 files changed, 202 insertions(+), 202 deletions(-) delete mode 100644 core.sdk/src/main/java/com/lyms/sync/queue/SycnQueue.java create mode 100644 core.sdk/src/main/java/com/lyms/sync/queue/SyncQueue.java diff --git a/core.sdk/src/main/java/com/lyms/sync/SyncCenter.java b/core.sdk/src/main/java/com/lyms/sync/SyncCenter.java index 87cc264..18f3ef3 100644 --- a/core.sdk/src/main/java/com/lyms/sync/SyncCenter.java +++ b/core.sdk/src/main/java/com/lyms/sync/SyncCenter.java @@ -4,7 +4,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import com.lyms.sync.channel.ChannelData; -import com.lyms.sync.queue.SycnQueue; +import com.lyms.sync.queue.SyncQueue; import com.lyms.util.HttpUtils; /** @@ -28,7 +28,7 @@ public class SyncCenter { /** * 数据缓冲队列 */ - private SycnQueue queue; + private SyncQueue queue; /** *
  • @Description:创建一个中心 @@ -238,11 +238,11 @@ public class SyncCenter { } - public SycnQueue getQueue() { + public SyncQueue getQueue() { return queue; } - public void setQueue(SycnQueue queue) { + public void setQueue(SyncQueue queue) { this.queue = queue; } diff --git a/core.sdk/src/main/java/com/lyms/sync/queue/SycnQueue.java b/core.sdk/src/main/java/com/lyms/sync/queue/SycnQueue.java deleted file mode 100644 index 956d1a6..0000000 --- a/core.sdk/src/main/java/com/lyms/sync/queue/SycnQueue.java +++ /dev/null @@ -1,198 +0,0 @@ -package com.lyms.sync.queue; - -import java.io.Serializable; - -import org.springframework.data.redis.core.RedisTemplate; - -import com.lyms.spring.redis.operation.RedisLock; -import com.lyms.sync.channel.ChannelData; - -/** - *
  • @ClassName: SyncnQueue - *
  • @Description: 数据队列 - *
  • @author maliang - *
  • @date 2017年3月13日 - *
  • - */ -public class SycnQueue { - - public RedisTemplate template; - - /** - * 构建临时任务队列,固定任务队列锁 - *

    - * 区分锁,避免高并发下锁释放的错误 - */ - public static RedisLock Fix_Lock; - - public static RedisLock Tmp_Lock; - - /** - * 针对数据获取的时候添加锁 - */ - private static final String LOCK_TMP_NAME = "TMP_lock"; - - private static final String LOCK_FIXATION_NAME = "FIXATION_lock"; - - /** - * 等待获取锁的时间5 S - */ - private static final Long WAIT_LOCK_TIME = 5000L; - - /** - * 固定任务 - */ - private static final String FIXATION_WORK = "FIXATION_WORK"; - - /** - * 临时任务 - */ - private static final String TMP_WORK = "TMP_WORK"; - - /** - * 回执消息任务 - */ - @Deprecated - private static final String ACK_WORK = "ACK_WORK"; - - public SycnQueue(RedisTemplate template) { - this.template = template; - Fix_Lock = new RedisLock(template); - Tmp_Lock = new RedisLock(template); - } - - /** - *

  • @Description:固定任务 {@link ModelType#GET} - *

    - * 固定任务执行效率跟任务链表的长度有关系 - *

  • @param model - *
  • @return - *
  • 创建人:maliang - *
  • 创建时间:2017年3月13日 - *
  • 修改人: - *
  • 修改时间: - */ - private Boolean pushFixationWork(ChannelData data) { - Long tag = template.opsForList().leftPush(FIXATION_WORK, data); - return tag >= 1; - } - - private ChannelData pullFixationData() { - // 添加锁 - try { - if (Fix_Lock.acquireLock(LOCK_FIXATION_NAME, WAIT_LOCK_TIME)) { - ChannelData obj = (ChannelData) template.opsForList().rightPopAndLeftPush(FIXATION_WORK, FIXATION_WORK); - return obj; - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - try { - if (Fix_Lock != null) { - Fix_Lock.releaseLock(); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - return null; - - } - - /** - *
  • @Description:临时任务 - *

    - * {@link ModelType#ADD ModelType#UPDATE ModelType#DEL} - *

  • @param model - *
  • @return - *
  • 创建人:maliang - *
  • 创建时间:2017年3月13日 - *
  • 修改人: - *
  • 修改时间: - */ - private boolean pushTempWork(ChannelData data) { - Long tag = template.opsForList().leftPush(TMP_WORK, data); - return tag >= 1; - } - - /** - *
  • @Description:临时队列数据弹出 - *
  • @return - *
  • 创建人:maliang - *
  • 创建时间:2017年3月13日 - *
  • 修改人: - *
  • 修改时间: - */ - private ChannelData pullTempData() { - - try { - if (Tmp_Lock.acquireLock(LOCK_TMP_NAME, WAIT_LOCK_TIME)) { - Serializable object = template.opsForList().rightPop(TMP_WORK); - return object != null ? (ChannelData) object : null; - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (Tmp_Lock != null) { - try { - Tmp_Lock.releaseLock(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - return null; - } - - /** - *
  • @Description:将任务信息添加到队列中,任务分为固定任务,临时任务,目前拉取数据的任务为固定任务 - * {@link ModelType#GET} - *
  • @param model - *
  • @return - *
  • 创建人:maliang - *
  • 创建时间:2017年3月13日 - *
  • 修改人: - *
  • 修改时间: - */ - public Boolean push(ChannelData data) { - if (template == null || data == null) - return Boolean.FALSE; - // 设置任务添加时间 - /* - * model.setTs(System.currentTimeMillis()); String modelType = - * model.getType(); if (ModelType.isGet(modelType)) { return - * this.pushFixationWork(model); } else { return - * this.pushTempWork(model); } - */ - boolean loop = data.getLoop(); - return loop ? this.pushFixationWork(data) : this.pushTempWork(data); - } - - /** - *
  • @Description:获取队列中的数据 - *

    - *

  • @param type - *
  • @return - *
  • 创建人:maliang - *
  • 创建时间:2017年3月13日 - *
  • 修改人: - *
  • 修改时间: - */ - public ChannelData pull(boolean loop) { - // boolean loop = data != null && data.getLoop(); - return loop ? this.pullFixationData() : this.pullTempData(); - /* - * if (ModelType.isGet(type)) { return this.pullFixationModel(); } else - * { return this.pullTempModel(); } - */ - } - - public RedisTemplate getTemplate() { - return template; - } - - public void setTemplate(RedisTemplate template) { - this.template = template; - } - -} diff --git a/core.sdk/src/main/java/com/lyms/sync/queue/SyncQueue.java b/core.sdk/src/main/java/com/lyms/sync/queue/SyncQueue.java new file mode 100644 index 0000000..d022e3c --- /dev/null +++ b/core.sdk/src/main/java/com/lyms/sync/queue/SyncQueue.java @@ -0,0 +1,198 @@ +package com.lyms.sync.queue; + +import java.io.Serializable; + +import org.springframework.data.redis.core.RedisTemplate; + +import com.lyms.spring.redis.operation.RedisLock; +import com.lyms.sync.channel.ChannelData; + +/** + *
  • @ClassName: SyncnQueue + *
  • @Description: 数据队列 + *
  • @author maliang + *
  • @date 2017年3月13日 + *
  • + */ +public class SyncQueue { + + public RedisTemplate template; + + /** + * 构建临时任务队列,固定任务队列锁 + *

    + * 区分锁,避免高并发下锁释放的错误 + */ + public static RedisLock Fix_Lock; + + public static RedisLock Tmp_Lock; + + /** + * 针对数据获取的时候添加锁 + */ + private static final String LOCK_TMP_NAME = "TMP_lock"; + + private static final String LOCK_FIXATION_NAME = "FIXATION_lock"; + + /** + * 等待获取锁的时间5 S + */ + private static final Long WAIT_LOCK_TIME = 5000L; + + /** + * 固定任务 + */ + private static final String FIXATION_WORK = "FIXATION_WORK"; + + /** + * 临时任务 + */ + private static final String TMP_WORK = "TMP_WORK"; + + /** + * 回执消息任务 + */ + @Deprecated + private static final String ACK_WORK = "ACK_WORK"; + + public SyncQueue(RedisTemplate template) { + this.template = template; + Fix_Lock = new RedisLock(template); + Tmp_Lock = new RedisLock(template); + } + + /** + *

  • @Description:固定任务 {@link ModelType#GET} + *

    + * 固定任务执行效率跟任务链表的长度有关系 + *

  • @param model + *
  • @return + *
  • 创建人:maliang + *
  • 创建时间:2017年3月13日 + *
  • 修改人: + *
  • 修改时间: + */ + private Boolean pushFixationWork(ChannelData data) { + Long tag = template.opsForList().leftPush(FIXATION_WORK, data); + return tag >= 1; + } + + private ChannelData pullFixationData() { + // 添加锁 + try { + if (Fix_Lock.acquireLock(LOCK_FIXATION_NAME, WAIT_LOCK_TIME)) { + ChannelData obj = (ChannelData) template.opsForList().rightPopAndLeftPush(FIXATION_WORK, FIXATION_WORK); + return obj; + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + if (Fix_Lock != null) { + Fix_Lock.releaseLock(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + return null; + + } + + /** + *
  • @Description:临时任务 + *

    + * {@link ModelType#ADD ModelType#UPDATE ModelType#DEL} + *

  • @param model + *
  • @return + *
  • 创建人:maliang + *
  • 创建时间:2017年3月13日 + *
  • 修改人: + *
  • 修改时间: + */ + private boolean pushTempWork(ChannelData data) { + Long tag = template.opsForList().leftPush(TMP_WORK, data); + return tag >= 1; + } + + /** + *
  • @Description:临时队列数据弹出 + *
  • @return + *
  • 创建人:maliang + *
  • 创建时间:2017年3月13日 + *
  • 修改人: + *
  • 修改时间: + */ + private ChannelData pullTempData() { + + try { + if (Tmp_Lock.acquireLock(LOCK_TMP_NAME, WAIT_LOCK_TIME)) { + Serializable object = template.opsForList().rightPop(TMP_WORK); + return object != null ? (ChannelData) object : null; + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (Tmp_Lock != null) { + try { + Tmp_Lock.releaseLock(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + return null; + } + + /** + *
  • @Description:将任务信息添加到队列中,任务分为固定任务,临时任务,目前拉取数据的任务为固定任务 + * {@link ModelType#GET} + *
  • @param model + *
  • @return + *
  • 创建人:maliang + *
  • 创建时间:2017年3月13日 + *
  • 修改人: + *
  • 修改时间: + */ + public Boolean push(ChannelData data) { + if (template == null || data == null) + return Boolean.FALSE; + // 设置任务添加时间 + /* + * model.setTs(System.currentTimeMillis()); String modelType = + * model.getType(); if (ModelType.isGet(modelType)) { return + * this.pushFixationWork(model); } else { return + * this.pushTempWork(model); } + */ + boolean loop = data.getLoop(); + return loop ? this.pushFixationWork(data) : this.pushTempWork(data); + } + + /** + *
  • @Description:获取队列中的数据 + *

    + *

  • @param type + *
  • @return + *
  • 创建人:maliang + *
  • 创建时间:2017年3月13日 + *
  • 修改人: + *
  • 修改时间: + */ + public ChannelData pull(boolean loop) { + // boolean loop = data != null && data.getLoop(); + return loop ? this.pullFixationData() : this.pullTempData(); + /* + * if (ModelType.isGet(type)) { return this.pullFixationModel(); } else + * { return this.pullTempModel(); } + */ + } + + public RedisTemplate getTemplate() { + return template; + } + + public void setTemplate(RedisTemplate template) { + this.template = template; + } + +} -- 1.8.3.1