// 「获取操作」伪代码While(synchronization state does not allow acquire) { // * 骨架扩展点 enqueue current thread if not already queued; // 线程结点入队 possibly block current thread; // 阻塞当前线程}dequeue current thread if it was queued; // 线程结点出队
// 「释放操作」伪代码update synchronization state // * 骨架扩展点if (state may permit a blocked thread to acquire) { // * 骨架扩展点 unblock one or more queued threads; // 唤起被阻塞的线程}
// AQS acquire/release 操作算法骨架代码public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 同步状态 synchronization state private volatile int state;
// 排他式「获取操作」 public final void acquire(int arg) { if (!tryAcquire(arg) && // * 骨架扩展点 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 线程结点入队 selfInterrupt(); } // 针对已入队线程结点的排他式「获取操作」 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // * 骨架扩展点 setHead(node); // 线程结点出队(队列head为哑结点) p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 阻塞当前线程 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
// 排他式「释放操作」 public final boolean release(int arg) { if (tryRelease(arg)) { // * 骨架扩展点 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 唤起被阻塞的线程 return true; } return false; }
// * 排他式「获取操作」骨架扩展点 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
// * 排他式「释放操作」骨架扩展点 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
}
public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>, BeanClassLoaderAware { // 管道模式命令执行,RedisCallback @Override public List<Object> executePipelined(RedisCallback> action, @Nullable RedisSerializer> resultSerializer) {
return execute((RedisCallback<List<Object>>) connection -> { connection.openPipeline(); // * 扩展点:开启管道模式 boolean pipelinedClosed = false; try { Object result = action.doInRedis(connection); // * 扩展点:回调执行用户自定义操作 if (result != null) { throw new InvalidDataAccessApiUsageException( "Callback cannot return a non-null value as it gets overwritten by the pipeline"); } List<Object> closePipeline = connection.closePipeline(); // * 扩展点:关闭管道模式 pipelinedClosed = true; return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer); } finally { if (!pipelinedClosed) { connection.closePipeline(); } } }); } // 事务+管道模式命令执行 @Override public List<Object> executePipelined(SessionCallback> session, @Nullable RedisSerializer> resultSerializer) { // 具体代码省略 } }
类似地,在数农WMS的库存操作重构中,我们定义了ContainerInventoryOperationTemplate「模板类」,作为承载库存操作业务逻辑的框架。下述为其中的库存操作核心代码片段。可以看到,框架统一定义了库存操作流程,并对其中的通用逻辑提供了支持,使各类不同的库存操作得以复用:如构建库存操作明细、持久化操作明细及同步任务、并发冲突重试等;而对于其中随不同库存操作类型变动的逻辑 —— 如操作库存数据、确认前置操作、持久化库存数据等 —— 则通过对ContainerInventoryOperationHandler接口实例的回调实现,它们可以被看作是库存操作框架代码中的扩展点。接口由不同类型的库存操作分别实现,如库存增加、库存占用、库存转移、库存释放等等。如此,如果我们后续需要添加某种新类型的库存操作,只需要实现ContainerInventoryOperationHandler接口中定义的个性化逻辑即可;而如果我们需要对整个库存操作流程进行迭代,也只需要修改ContainerInventoryOperationTemplate中的框架代码,而不是像先前那样,需要同时修改多处代码(这里模板类和库存操作handler的命名均以Container作为前缀,是因为数农WMS以容器托盘作为基本的库存管理单元)。
public class ContainerInventoryOperationTemplate { private Boolean doOperateInTransaction(OperationContext context) { final Boolean transactionSuccess = transactionTemplate.execute(transactionStatus -> { try { ContainerInventoryOperationHandler handler = context.getHandler(); // 库存操作回调handler handler.getAndCheckCurrentInventory(context); // 获取并校验库存数据 buildInventoryDetail(context); // 构建库存操作明细 handler.operateInventory(context); // * 扩展点:操作库存数据 handler.confirmPreOperationIfNecessary(context); // * 扩展点:确认前置操作(如库存占用) handler.persistInventoryOperation(context); // * 扩展点:持久化库存数据 persistInventoryDetailAndSyncTask(context); // 持久化操作明细及同步任务 doSyncOperationIfNecessary(context); // 库存同步操作 return Boolean.TRUE; } catch (WhException we) { context.setWhException(we); // 遇到并发冲突异常,需要重试 if (Objects.equals(we.getErrorCode(), ErrorCodeEnum.CAS_SAVE_ERROR.getCode())) { context.setCanRetry(true); } } // 省略部分代码 transactionStatus.setRollbackOnly(); return Boolean.FALSE; }); // 省略部分代码 return transactionSuccess; } }
// 库存增加并占用public class IncreaseAndOccupyOperationHandler implements ContainerInventoryOperationHandler {
private IncreaseOperationHandler increaseOperationHandler; // 组合「库存增加」操作handler
private OccupyOperationHandler occupyOperationHandler; // 组合「库存占用」操作handler
// 委托「库存占用」操作handler进行前置操作校验,判断是否单据占用已存在 public void checkPreOperationIfNecessary(ContainerInventoryOperationTemplate.OperationContext context) { occupyOperationHandler.checkPreOperationIfNecessary(context); } // 委托「库存增加」操作handler进行库存信息校验 public void getAndCheckCurrentInventory(ContainerInventoryOperationTemplate.OperationContext context) { increaseOperationHandler.getAndCheckCurrentInventory(context); } // 委托「库存增加」、「库存占用」操作handler进行「库存增加并占用」操作 public void operateInventory(ContainerInventoryOperationTemplate.OperationContext context) { increaseOperationHandler.operateInventory(context); occupyOperationHandler.operateInventory(context); } // 其余代码略
}
代码重构并且总结成文的过程要求不断地学习、思辨和实践,也让自己获益良多。
点击阅读原文查看详情!