template;
-
- /**
- * 构建临时任务队列,固定任务队列锁
- *
- * 区分锁,避免高并发下锁释放的错误
- */
- public static RedisLock Fix_Lock;
-
- public static RedisLock Tmp_Lock;
-
- /**
- * 针对数据获取的时候添加锁
- */
- private static final String LOCK_TMP_NAME = "TMP_lock";
-
- private static final String LOCK_FIXATION_NAME = "FIXATION_lock";
-
- /**
- * 等待获取锁的时间5 S
- */
- private static final Long WAIT_LOCK_TIME = 5000L;
-
- /**
- * 固定任务
- */
- private static final String FIXATION_WORK = "FIXATION_WORK";
-
- /**
- * 临时任务
- */
- private static final String TMP_WORK = "TMP_WORK";
-
- /**
- * 回执消息任务
- */
- @Deprecated
- private static final String ACK_WORK = "ACK_WORK";
-
- public SycnQueue(RedisTemplate template) {
- this.template = template;
- Fix_Lock = new RedisLock(template);
- Tmp_Lock = new RedisLock(template);
- }
-
- /**
- * @Description:固定任务 {@link ModelType#GET}
- *
- * 固定任务执行效率跟任务链表的长度有关系
- *
@param model
- * @return
- * 创建人:maliang
- * 创建时间:2017年3月13日
- * 修改人:
- * 修改时间:
- */
- private Boolean pushFixationWork(ChannelData data) {
- Long tag = template.opsForList().leftPush(FIXATION_WORK, data);
- return tag >= 1;
- }
-
- private ChannelData pullFixationData() {
- // 添加锁
- try {
- if (Fix_Lock.acquireLock(LOCK_FIXATION_NAME, WAIT_LOCK_TIME)) {
- ChannelData obj = (ChannelData) template.opsForList().rightPopAndLeftPush(FIXATION_WORK, FIXATION_WORK);
- return obj;
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- if (Fix_Lock != null) {
- Fix_Lock.releaseLock();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- return null;
-
- }
-
- /**
- * @Description:临时任务
- *
- * {@link ModelType#ADD ModelType#UPDATE ModelType#DEL}
- *
@param model
- * @return
- * 创建人:maliang
- * 创建时间:2017年3月13日
- * 修改人:
- * 修改时间:
- */
- private boolean pushTempWork(ChannelData data) {
- Long tag = template.opsForList().leftPush(TMP_WORK, data);
- return tag >= 1;
- }
-
- /**
- * @Description:临时队列数据弹出
- * @return
- * 创建人:maliang
- * 创建时间:2017年3月13日
- * 修改人:
- * 修改时间:
- */
- private ChannelData pullTempData() {
-
- try {
- if (Tmp_Lock.acquireLock(LOCK_TMP_NAME, WAIT_LOCK_TIME)) {
- Serializable object = template.opsForList().rightPop(TMP_WORK);
- return object != null ? (ChannelData) object : null;
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (Tmp_Lock != null) {
- try {
- Tmp_Lock.releaseLock();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- return null;
- }
-
- /**
- * @Description:将任务信息添加到队列中,任务分为固定任务,临时任务,目前拉取数据的任务为固定任务
- * {@link ModelType#GET}
- * @param model
- * @return
- * 创建人:maliang
- * 创建时间:2017年3月13日
- * 修改人:
- * 修改时间:
- */
- public Boolean push(ChannelData data) {
- if (template == null || data == null)
- return Boolean.FALSE;
- // 设置任务添加时间
- /*
- * model.setTs(System.currentTimeMillis()); String modelType =
- * model.getType(); if (ModelType.isGet(modelType)) { return
- * this.pushFixationWork(model); } else { return
- * this.pushTempWork(model); }
- */
- boolean loop = data.getLoop();
- return loop ? this.pushFixationWork(data) : this.pushTempWork(data);
- }
-
- /**
- * @Description:获取队列中的数据
- *
- *
@param type
- * @return
- * 创建人:maliang
- * 创建时间:2017年3月13日
- * 修改人:
- * 修改时间:
- */
- public ChannelData pull(boolean loop) {
- // boolean loop = data != null && data.getLoop();
- return loop ? this.pullFixationData() : this.pullTempData();
- /*
- * if (ModelType.isGet(type)) { return this.pullFixationModel(); } else
- * { return this.pullTempModel(); }
- */
- }
-
- public RedisTemplate getTemplate() {
- return template;
- }
-
- public void setTemplate(RedisTemplate template) {
- this.template = template;
- }
-
-}
diff --git a/core.sdk/src/main/java/com/lyms/sync/queue/SyncQueue.java b/core.sdk/src/main/java/com/lyms/sync/queue/SyncQueue.java
new file mode 100644
index 0000000..d022e3c
--- /dev/null
+++ b/core.sdk/src/main/java/com/lyms/sync/queue/SyncQueue.java
@@ -0,0 +1,198 @@
+package com.lyms.sync.queue;
+
+import java.io.Serializable;
+
+import org.springframework.data.redis.core.RedisTemplate;
+
+import com.lyms.spring.redis.operation.RedisLock;
+import com.lyms.sync.channel.ChannelData;
+
+/**
+ * @ClassName: SyncnQueue
+ * @Description: 数据队列
+ * @author maliang
+ * @date 2017年3月13日
+ *
+ */
+public class SyncQueue {
+
+ public RedisTemplate template;
+
+ /**
+ * 构建临时任务队列,固定任务队列锁
+ *
+ * 区分锁,避免高并发下锁释放的错误
+ */
+ public static RedisLock Fix_Lock;
+
+ public static RedisLock Tmp_Lock;
+
+ /**
+ * 针对数据获取的时候添加锁
+ */
+ private static final String LOCK_TMP_NAME = "TMP_lock";
+
+ private static final String LOCK_FIXATION_NAME = "FIXATION_lock";
+
+ /**
+ * 等待获取锁的时间5 S
+ */
+ private static final Long WAIT_LOCK_TIME = 5000L;
+
+ /**
+ * 固定任务
+ */
+ private static final String FIXATION_WORK = "FIXATION_WORK";
+
+ /**
+ * 临时任务
+ */
+ private static final String TMP_WORK = "TMP_WORK";
+
+ /**
+ * 回执消息任务
+ */
+ @Deprecated
+ private static final String ACK_WORK = "ACK_WORK";
+
+ public SyncQueue(RedisTemplate template) {
+ this.template = template;
+ Fix_Lock = new RedisLock(template);
+ Tmp_Lock = new RedisLock(template);
+ }
+
+ /**
+ * @Description:固定任务 {@link ModelType#GET}
+ *
+ * 固定任务执行效率跟任务链表的长度有关系
+ *
@param model
+ * @return
+ * 创建人:maliang
+ * 创建时间:2017年3月13日
+ * 修改人:
+ * 修改时间:
+ */
+ private Boolean pushFixationWork(ChannelData data) {
+ Long tag = template.opsForList().leftPush(FIXATION_WORK, data);
+ return tag >= 1;
+ }
+
+ private ChannelData pullFixationData() {
+ // 添加锁
+ try {
+ if (Fix_Lock.acquireLock(LOCK_FIXATION_NAME, WAIT_LOCK_TIME)) {
+ ChannelData obj = (ChannelData) template.opsForList().rightPopAndLeftPush(FIXATION_WORK, FIXATION_WORK);
+ return obj;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ if (Fix_Lock != null) {
+ Fix_Lock.releaseLock();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ return null;
+
+ }
+
+ /**
+ * @Description:临时任务
+ *
+ * {@link ModelType#ADD ModelType#UPDATE ModelType#DEL}
+ *
@param model
+ * @return
+ * 创建人:maliang
+ * 创建时间:2017年3月13日
+ * 修改人:
+ * 修改时间:
+ */
+ private boolean pushTempWork(ChannelData data) {
+ Long tag = template.opsForList().leftPush(TMP_WORK, data);
+ return tag >= 1;
+ }
+
+ /**
+ * @Description:临时队列数据弹出
+ * @return
+ * 创建人:maliang
+ * 创建时间:2017年3月13日
+ * 修改人:
+ * 修改时间:
+ */
+ private ChannelData pullTempData() {
+
+ try {
+ if (Tmp_Lock.acquireLock(LOCK_TMP_NAME, WAIT_LOCK_TIME)) {
+ Serializable object = template.opsForList().rightPop(TMP_WORK);
+ return object != null ? (ChannelData) object : null;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (Tmp_Lock != null) {
+ try {
+ Tmp_Lock.releaseLock();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * @Description:将任务信息添加到队列中,任务分为固定任务,临时任务,目前拉取数据的任务为固定任务
+ * {@link ModelType#GET}
+ * @param model
+ * @return
+ * 创建人:maliang
+ * 创建时间:2017年3月13日
+ * 修改人:
+ * 修改时间:
+ */
+ public Boolean push(ChannelData data) {
+ if (template == null || data == null)
+ return Boolean.FALSE;
+ // 设置任务添加时间
+ /*
+ * model.setTs(System.currentTimeMillis()); String modelType =
+ * model.getType(); if (ModelType.isGet(modelType)) { return
+ * this.pushFixationWork(model); } else { return
+ * this.pushTempWork(model); }
+ */
+ boolean loop = data.getLoop();
+ return loop ? this.pushFixationWork(data) : this.pushTempWork(data);
+ }
+
+ /**
+ * @Description:获取队列中的数据
+ *
+ *
@param type
+ * @return
+ * 创建人:maliang
+ * 创建时间:2017年3月13日
+ * 修改人:
+ * 修改时间:
+ */
+ public ChannelData pull(boolean loop) {
+ // boolean loop = data != null && data.getLoop();
+ return loop ? this.pullFixationData() : this.pullTempData();
+ /*
+ * if (ModelType.isGet(type)) { return this.pullFixationModel(); } else
+ * { return this.pullTempModel(); }
+ */
+ }
+
+ public RedisTemplate getTemplate() {
+ return template;
+ }
+
+ public void setTemplate(RedisTemplate template) {
+ this.template = template;
+ }
+
+}