輕量級工作流引擎的設計與實現

一、什麼是工作流引擎

工作流引擎是驅動工作流執行的一套代碼。

至於什麼是工作流、爲什麼要有工作流、工作流的應用景,同學們可以看一看網上的資料,在此處不在展開。

二、爲什麼要重複造輪子

開源的工作流引擎很多,比如 activiti、flowable、Camunda 等,那麼,爲什麼沒有選它們呢?基於以下幾點考慮:

最重要的,滿足不了業務需求,一些特殊的場景無法實現。

有些需求實現起來比較繞,更有甚者,需要直接修改引擎數據庫,這對於引擎的穩定運行帶來了巨大的隱患,也對以後引擎的版本升級製造了一些困難。

資料、代碼量、API繁多,學習成本較高,維護性較差。

經過分析與評估,我們的業務場景需要的BPMN元素較少,開發實現的代價不大。

因此,重複造了輪子,其實,還有一個更深層次的戰略上的考慮,即:作爲科技公司,我們一定要有我們自己的核心底層技術!這樣,才能不受制於人(參考最近的芯片問題)。

三、怎麼造的輪子

對於一次學習型分享來講,過程比結果更重要,那些只說結果,不細說過程甚至不說的分享,我認爲是秀肌肉,而不是真正意義上的分享。因此,接下來,本文將重點描述造輪子的主要過程。

一個成熟的工作流引擎的構建是很複雜的,如何應對這種複雜性呢?一般來講,有以下三種方法:

確定性交付:弄清楚需求是什麼,驗收標準是什麼,最好能夠寫出測試用例,這一步是爲了明確目標。

迭代式開發:先從小的問題集的解決開始,逐步過渡到解決大的問題集上來,羅馬不是一天建成的,人也不是一天就能成熟的,是需要個過程的。

分而治之:把大的問題拆成小的問題,小問題的解決會推動大問題的解決(這個思想適用場景比較多,同學們可以用心體會和理解哈)。

如果按照上述方法,一步一步的詳細展開,那麼可能需要一本書。爲了縮減篇幅而又不失乾貨,本文會描述重點幾個迭代,進而闡述輕量級工作流引擎的設計與主要實現。

那麼,輕量級又是指什麼呢?這裡,主要是指以下幾點

少依賴:代碼的java實現上,除了jdk8以外,不依賴與其他第三方jar包,從而可以更好的減少依賴帶來的問題。

內核化:設計上,採用了微內核架構模式,內核小巧,實用,同時提供了一定的擴展性。從而可以更好地理解與應用本引擎。

輕規範:並沒有完全實現BPMN規範,也沒有完全按照BPMN規範進行設計,而只是參考了該規範,且只實現以一小部分必須實現的元素。從而降低了學習成本,可以按照需求自由發揮。

工具化:代碼上,只是一個工具(UTIL),不是一個應用程序。從而你可以簡單的運行它,擴展你自己的數據層、節點層,更加方便的集成到其他應用中去。

好,廢話說完了,開始第一個迭代......

四、Hello ProcessEngine

按照國際慣例,第一個迭代用來實現 hello world 。

1、需求

作爲一個流程管理員,我希望流程引擎可以運行如下圖所示的流程,以便我能夠配置流程來打印不同的字符串。

2、分析

第一個流程,可以打印Hello ProcessEngine,第二個流程可以打印ProcessEngine Hello,這兩個流程的區別是隻有順序不同,藍色的節點與紅色的節點的本身功能沒有發生變化

藍色的節點與紅色的節點都是節點,它們的功能是不一樣的,即:紅色的節點打印Hello,藍色的節點打印ProcessEngine

開始與結束節點是兩個特殊的節點,一個開始流程,一個結束流程

節點與節點之間是通過線來連接的,一個節點執行完畢後,是通過箭頭來確定下一個要執行的節點

需要一種表示流程的方式,或是XML、或是JSON、或是其他,而不是圖片

3、設計

(1)流程的表示

相較於JSON,XML的語義更豐富,可以表達更多的信息,因此這裡使用XML來對流程進行表示,如下所示

flow_1

flow_1

flow_2

flow_2

flow_3

flow_3

process表示一個流程

startEvent表示開始節點,endEvent表示結束節點

printHello表示打印hello節點,就是需求中的藍色節點

processEngine表示打印processEngine節點,就是需求中的紅色節點

sequenceFlow表示連線,從sourceRef開始,指向targetRef,例如:flow_3,表示一條從printProcessEngine_1到endEvent_1的連線。

(2)節點的表示

outgoing表示出邊,即節點執行完畢後,應該從那個邊出去。

incoming表示入邊,即從哪個邊進入到本節點。

一個節點只有outgoing而沒有incoming,如:startEvent,也可以 只有入邊而沒有出邊,如:endEvent,也可以既有入邊也有出邊,如:printHello、processEngine。

(3)流程引擎的邏輯

基於上述XML,流程引擎的運行邏輯如下

找到開始節點(startEvent)

找到startEvent的outgoing邊(sequenceFlow)

找到該邊(sequenceFlow)指向的節點(targetRef)

執行節點自身的邏輯

找到該節點的outgoing邊(sequenceFlow)

重複3-5,直到遇到結束節點(endEvent),流程結束

4、實現

首先要進行數據結構的設計,即:要把問題域中的信息映射到計算機中的數據。

可以看到,一個流程(PeProcess)由多個節點(PeNode)與邊(PeEdge)組成,節點有出邊(out)、入邊(in),邊有流入節點(from)、流出節點(to)。

具體的定義如下:

public class PeProcess { public String id; public PeNode start; public PeProcess(String id, PeNode start) { this.id = id; this.start = start; } } public class PeEdge { private String id; public PeNode from; public PeNode to; public PeEdge(String id) { this.id = id; } } public class PeNode { private String id; public String type; public PeEdge in; public PeEdge out; public PeNode(String id) { this.id=id; } }

PS : 爲了表述主要思想,在代碼上比較“奔放自由”,生產中不可直接複製粘貼!

接下來,構建流程圖,代碼如下:

public class XmlPeProcessBuilder { private String xmlStr; private final Map

id2PeNode = new HashMap

(); private final Map

id2PeEdge = new HashMap

(); public XmlPeProcessBuilder(String xmlStr) { this.xmlStr = xmlStr; } public PeProcess build() throws Exception { //strToNode : 把一段xml轉換爲org.w3c.dom.Node Node definations = XmlUtil.strToNode(xmlStr); //childByName : 找到definations子節點中nodeName爲process的那個Node Node process = XmlUtil.childByName(definations, "process"); NodeList childNodes = process.getChildNodes(); for (int j = 0; j

startEventEntry = id2PeNode.entrySet().stream().filter(entry -> "startEvent".equals(entry.getValue().type)).findFirst().get(); return new PeProcess(startEventEntry.getKey(), startEventEntry.getValue()); } private void buildPeEdge(Node node) { //attributeValue : 找到node節點上屬性爲id的值 PeEdge peEdge = id2PeEdge.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeEdge(id)); peEdge.from = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "sourceRef"), id -> new PeNode(id)); peEdge.to = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "targetRef"), id -> new PeNode(id)); } private void buildPeNode(Node node) { PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id)); peNode.type = node.getNodeName(); Node inPeEdgeNode = XmlUtil.childByName(node, "incoming"); if (inPeEdgeNode != null) //text : 得到inPeEdgeNode的nodeValue peNode.in = id2PeEdge.computeIfAbsent(XmlUtil.text(inPeEdgeNode), id -> new PeEdge(id)); Node outPeEdgeNode = XmlUtil.childByName(node, "outgoing"); if (outPeEdgeNode != null) peNode.out = id2PeEdge.computeIfAbsent(XmlUtil.text(outPeEdgeNode), id -> new PeEdge(id)); } }

"startEvent".equals(entry.getValue().type)).findFirst().get(); return new PeProcess(startEventEntry.getKey(), startEventEntry.getValue()); } private void buildPeEdge(Node node) { //attributeValue : 找到node節點上屬性爲id的值 PeEdge peEdge = id2PeEdge.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeEdge(id)); peEdge.from = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "sourceRef"), id -> new PeNode(id)); peEdge.to = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "targetRef"), id -> new PeNode(id)); } private void buildPeNode(Node node) { PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id)); peNode.type = node.getNodeName(); Node inPeEdgeNode = XmlUtil.childByName(node, "incoming"); if (inPeEdgeNode != null) //text : 得到inPeEdgeNode的nodeValue peNode.in = id2PeEdge.computeIfAbsent(XmlUtil.text(inPeEdgeNode), id -> new PeEdge(id)); Node outPeEdgeNode = XmlUtil.childByName(node, "outgoing"); if (outPeEdgeNode != null) peNode.out = id2PeEdge.computeIfAbsent(XmlUtil.text(outPeEdgeNode), id -> new PeEdge(id)); } }

new PeEdge(id)); peEdge.from = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "sourceRef"), id -> new PeNode(id)); peEdge.to = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "targetRef"), id -> new PeNode(id)); } private void buildPeNode(Node node) { PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id)); peNode.type = node.getNodeName(); Node inPeEdgeNode = XmlUtil.childByName(node, "incoming"); if (inPeEdgeNode != null) //text : 得到inPeEdgeNode的nodeValue peNode.in = id2PeEdge.computeIfAbsent(XmlUtil.text(inPeEdgeNode), id -> new PeEdge(id)); Node outPeEdgeNode = XmlUtil.childByName(node, "outgoing"); if (outPeEdgeNode != null) peNode.out = id2PeEdge.computeIfAbsent(XmlUtil.text(outPeEdgeNode), id -> new PeEdge(id)); } }

new PeNode(id)); peEdge.to = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "targetRef"), id -> new PeNode(id)); } private void buildPeNode(Node node) { PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id)); peNode.type = node.getNodeName(); Node inPeEdgeNode = XmlUtil.childByName(node, "incoming"); if (inPeEdgeNode != null) //text : 得到inPeEdgeNode的nodeValue peNode.in = id2PeEdge.computeIfAbsent(XmlUtil.text(inPeEdgeNode), id -> new PeEdge(id)); Node outPeEdgeNode = XmlUtil.childByName(node, "outgoing"); if (outPeEdgeNode != null) peNode.out = id2PeEdge.computeIfAbsent(XmlUtil.text(outPeEdgeNode), id -> new PeEdge(id)); } }

new PeNode(id)); } private void buildPeNode(Node node) { PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id)); peNode.type = node.getNodeName(); Node inPeEdgeNode = XmlUtil.childByName(node, "incoming"); if (inPeEdgeNode != null) //text : 得到inPeEdgeNode的nodeValue peNode.in = id2PeEdge.computeIfAbsent(XmlUtil.text(inPeEdgeNode), id -> new PeEdge(id)); Node outPeEdgeNode = XmlUtil.childByName(node, "outgoing"); if (outPeEdgeNode != null) peNode.out = id2PeEdge.computeIfAbsent(XmlUtil.text(outPeEdgeNode), id -> new PeEdge(id)); } }

new PeNode(id)); peNode.type = node.getNodeName(); Node inPeEdgeNode = XmlUtil.childByName(node, "incoming"); if (inPeEdgeNode != null) //text : 得到inPeEdgeNode的nodeValue peNode.in = id2PeEdge.computeIfAbsent(XmlUtil.text(inPeEdgeNode), id -> new PeEdge(id)); Node outPeEdgeNode = XmlUtil.childByName(node, "outgoing"); if (outPeEdgeNode != null) peNode.out = id2PeEdge.computeIfAbsent(XmlUtil.text(outPeEdgeNode), id -> new PeEdge(id)); } }

new PeEdge(id)); Node outPeEdgeNode = XmlUtil.childByName(node, "outgoing"); if (outPeEdgeNode != null) peNode.out = id2PeEdge.computeIfAbsent(XmlUtil.text(outPeEdgeNode), id -> new PeEdge(id)); } }

new PeEdge(id)); } }

接下來,實現流程引擎主邏輯,代碼如下:

public class ProcessEngine { private String xmlStr; public ProcessEngine(String xmlStr) { this.xmlStr = xmlStr; } public void run() throws Exception { PeProcess peProcess = new XmlPeProcessBuilder(xmlStr).build(); PeNode node = peProcess.start; while (!node.type.equals("endEvent")) { if ("printHello".equals(node.type)) System.out.print("Hello "); if ("printProcessEngine".equals(node.type)) System.out.print("ProcessEngine "); node = node.out.to; } } }

就這?工作流引擎就這?同學們可千萬不要這樣簡單理解啊,畢竟這還只是hello world而已,各種代碼量就已經不少了。

另外,這裡面還有很多可以改進的空間,比如異常控制、泛化、設計模式等,但畢竟只是一個hello world而已,其目的是方便同學理解,讓同學入門。

那麼,接下來呢,就要稍微貼近一些具體的實際應用場景了,我們繼續第二個迭代。

五、簡單審批

一般來講工作流引擎屬於底層技術,在它之上可以構建審批流、業務流、數據流等類型的應用,那麼接下啦就以實際中的簡單審批場景爲例,繼續深入工作流引擎的設計,好,我們開始。

1、需求

作爲一個流程管理員,我希望流程引擎可以運行如下圖所示的流程,以便我能夠配置流程來實現簡單的審批流。

例如:小張提交了一個申請單,然後經過經理審批,審批結束後,不管通過還是不通過,都會經過第三步把結果發送給小張。

2、分析

總體上來講,這個流程還是線性順序類的,基本上可以沿用上次迭代的部分設計

審批節點的耗時可能會比較長,甚至會達到幾天時間,工作流引擎主動式的調取下一個節點的邏輯並不適合此場景

隨着節點類型的增多,工作流引擎裡寫死的那部分節點類型自由邏輯也不合適

審批時需要申請單信息、審批人,結果郵件通知還需要審批結果等信息,這些信息如何傳遞也是一個要考慮的問題

3、設計

採用註冊機制,把節點類型及其自有邏輯註冊進工作流引擎,以便能夠擴展更多節點,使得工作流引擎與節點解耦

工作流引擎增加被動式驅動邏輯,使得能夠通過外部來使工作流引擎執行下一個節點

增加上下文語義,作爲全局變量來使用,使得數據能夠流經各個節點

4、實現

新的XML定義如下:

flow_1

flow_1

flow_2

flow_2

flow_3

flow_3

flow_4

flow_4

首先要有一個上下文對象類,用於傳遞變量的,定義如下:

public class PeContext { private Map

info = new ConcurrentHashMap

(); public Object getValue(String key) { return info.get(key); } public void putValue(String key, Object value) { info.put(key, value); } }

每個節點的處理邏輯是不一樣的,此處應該進行一定的抽象,爲了強調流程中節點的作用是邏輯處理,引入了一種新的類型--算子(Operator),定義如下:

public interface IOperator { //引擎可以據此來找到本算子 String getType(); //引擎調度本算子 void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext); }

對於引擎來講,當遇到一個節點時,需要調度之,但怎麼調度呢?首先需要各個節點算子註冊(registNodeProcessor())進來,這樣才能找到要調度的那個算子。

其次,引擎怎麼知道節點算子自有邏輯處理完了呢?一般來講,引擎是不知道的,只能是由算子告訴引擎,所以引擎要提供一個功能(nodeFinished()),這個功能由算子調用。

最後,把算子任務的調度和引擎的驅動解耦開來,放入不同的線程中。

修改後的ProcessEngine代碼如下:

public class ProcessEngine { private String xmlStr; //存儲算子 private Map

type2Operator = new ConcurrentHashMap

(); private PeProcess peProcess = null; private PeContext peContext = null; //任務數據暫存 public final BlockingQueue

arrayBlockingQueue = new LinkedBlockingQueue(); //任務調度線程 public final Thread dispatchThread = new Thread(() -> { while (true) { try { PeNode node = arrayBlockingQueue.take(); type2Operator.get(node.type).doTask(this, node, peContext); } catch (Exception e) { } } }); public ProcessEngine(String xmlStr) { this.xmlStr = xmlStr; } //算子註冊到引擎中,便於引擎調用之 public void registNodeProcessor(IOperator operator) { type2Operator.put(operator.getType(), operator); } public void start() throws Exception { peProcess = new XmlPeProcessBuilder(xmlStr).build(); peContext = new PeContext(); dispatchThread.setDaemon(true); dispatchThread.start(); executeNode(peProcess.start.out.to); } private void executeNode(PeNode node) { if (!node.type.equals("endEvent")) arrayBlockingQueue.add(node); else System.out.println("process finished!"); } public void nodeFinished(String peNodeID) { PeNode node = peProcess.peNodeWithID(peNodeID); executeNode(node.out.to); } }

{ while (true) { try { PeNode node = arrayBlockingQueue.take(); type2Operator.get(node.type).doTask(this, node, peContext); } catch (Exception e) { } } }); public ProcessEngine(String xmlStr) { this.xmlStr = xmlStr; } //算子註冊到引擎中,便於引擎調用之 public void registNodeProcessor(IOperator operator) { type2Operator.put(operator.getType(), operator); } public void start() throws Exception { peProcess = new XmlPeProcessBuilder(xmlStr).build(); peContext = new PeContext(); dispatchThread.setDaemon(true); dispatchThread.start(); executeNode(peProcess.start.out.to); } private void executeNode(PeNode node) { if (!node.type.equals("endEvent")) arrayBlockingQueue.add(node); else System.out.println("process finished!"); } public void nodeFinished(String peNodeID) { PeNode node = peProcess.peNodeWithID(peNodeID); executeNode(node.out.to); } }

接下來,簡單(簡陋)實現本示例所需的三個算子,代碼如下:

/** * 提交申請單 */ public class OperatorOfApprovalApply implements IOperator { @Override public String getType() { return "approvalApply"; } @Override public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) { peContext.putValue("form", "formInfo"); peContext.putValue("applicant", "小張"); processEngine.nodeFinished(node.id); } } /** * 審批 */ public class OperatorOfApproval implements IOperator { @Override public String getType() { return "approval"; } @Override public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) { peContext.putValue("approver", "經理"); peContext.putValue("message", "審批通過"); processEngine.nodeFinished(node.id); } } /** * 結果郵件通知 */ public class OperatorOfNotify implements IOperator { @Override public String getType() { return "notify"; } @Override public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) { System.out.println(String.format("%s 提交的申請單 %s 被 %s 審批,結果爲 %s", peContext.getValue("applicant"), peContext.getValue("form"), peContext.getValue("approver"), peContext.getValue("message"))); processEngine.nodeFinished(node.id); } }

運行一下,看看結果如何,代碼如下:

public class ProcessEngineTest { @Test public void testRun() throws Exception { //讀取文件內容到字符串 String modelStr = Tools.readResoucesFile("model/two/hello.xml"); ProcessEngine processEngine = new ProcessEngine(modelStr); processEngine.registNodeProcessor(new OperatorOfApproval()); processEngine.registNodeProcessor(new OperatorOfApprovalApply()); processEngine.registNodeProcessor(new OperatorOfNotify()); processEngine.start(); Thread.sleep(1000 * 1); } }

小張 提交的申請單 formInfo 被 經理 審批,結果爲 審批通過 process finished!

到此,輕量級工作流引擎的核心邏輯介紹的差不多了,然而,只支持順序結構是太單薄的,我們知道,程序流程的三種基本結構爲順序、分支、循環,有了這三種結構,基本上就可以表示絕大多數流程邏輯。循環可以看做一種組合結構,即:循環可以由順序與分支推導出來,我們已經實現了順序,那麼接下來只要實現分支即可,而分支有很多類型,如:二選一、N選一、N選M(1

六、一般審批

作爲一個流程管理員,我希望流程引擎可以運行如下圖所示的流程,以便我能夠配置流程來實現一般的審批流。

例如:小張提交了一個申請單,然後經過經理審批,審批結束後,如果通過,發郵件通知,不通過,則打回重寫填寫申請單,直到通過爲止。

1、分析

需要引入一種分支節點,可以進行簡單的二選一流轉

節點的入邊、出邊不只一條

需要一種邏輯表達式語義,可以配置分支節點

2、設計

節點要支持多入邊、多出邊

節點算子來決定從哪個出邊出

使用一種簡單的規則引擎,支持簡單的邏輯表達式的解析

簡單分支節點的XML定義

3、實現

新的XML定義如下:

flow_1

flow_1

flow_5

flow_2

flow_2

flow_3

flow_4

approvalResult

flow_3

flow_4

flow_5

flow_4

flow_6

flow_6

其中,加入了simpleGateway這個簡單分支節點,用於表示簡單的二選一分支,當expr中的表達式爲真時,走trueOutGoing中的出邊,否則走另一個出邊。

節點支持多入邊、多出邊,修改後的PeNode如下:

public class PeNode { public String id; public String type; public List

in = new ArrayList

(); public List

out = new ArrayList

(); public Node xmlNode; public PeNode(String id) { this.id = id; } public PeEdge onlyOneOut() { return out.get(0); } public PeEdge outWithID(String nextPeEdgeID) { return out.stream().filter(e -> e.id.equals(nextPeEdgeID)).findFirst().get(); } public PeEdge outWithOutID(String nextPeEdgeID) { return out.stream().filter(e -> !e.id.equals(nextPeEdgeID)).findFirst().get(); } }

e.id.equals(nextPeEdgeID)).findFirst().get(); } public PeEdge outWithOutID(String nextPeEdgeID) { return out.stream().filter(e -> !e.id.equals(nextPeEdgeID)).findFirst().get(); } }

!e.id.equals(nextPeEdgeID)).findFirst().get(); } }

以前只有一個出邊時,是由當前節點來決定下一節點的,現在多出邊了,該由邊來決定下一個節點是什麼,修改後的流程引擎代碼如下:

public class ProcessEngine { private String xmlStr; //存儲算子 private Map

type2Operator = new ConcurrentHashMap

(); private PeProcess peProcess = null; private PeContext peContext = null; //任務數據暫存 public final BlockingQueue

arrayBlockingQueue = new LinkedBlockingQueue(); //任務調度線程 public final Thread dispatchThread = new Thread(() -> { while (true) { try { PeNode node = arrayBlockingQueue.take(); type2Operator.get(node.type).doTask(this, node, peContext); } catch (Exception e) { e.printStackTrace(); } } }); public ProcessEngine(String xmlStr) { this.xmlStr = xmlStr; } //算子註冊到引擎中,便於引擎調用之 public void registNodeProcessor(IOperator operator) { type2Operator.put(operator.getType(), operator); } public void start() throws Exception { peProcess = new XmlPeProcessBuilder(xmlStr).build(); peContext = new PeContext(); dispatchThread.setDaemon(true); dispatchThread.start(); executeNode(peProcess.start.onlyOneOut().to); } private void executeNode(PeNode node) { if (!node.type.equals("endEvent")) arrayBlockingQueue.add(node); else System.out.println("process finished!"); } public void nodeFinished(PeEdge nextPeEdgeID) { executeNode(nextPeEdgeID.to); } }

{ while (true) { try { PeNode node = arrayBlockingQueue.take(); type2Operator.get(node.type).doTask(this, node, peContext); } catch (Exception e) { e.printStackTrace(); } } }); public ProcessEngine(String xmlStr) { this.xmlStr = xmlStr; } //算子註冊到引擎中,便於引擎調用之 public void registNodeProcessor(IOperator operator) { type2Operator.put(operator.getType(), operator); } public void start() throws Exception { peProcess = new XmlPeProcessBuilder(xmlStr).build(); peContext = new PeContext(); dispatchThread.setDaemon(true); dispatchThread.start(); executeNode(peProcess.start.onlyOneOut().to); } private void executeNode(PeNode node) { if (!node.type.equals("endEvent")) arrayBlockingQueue.add(node); else System.out.println("process finished!"); } public void nodeFinished(PeEdge nextPeEdgeID) { executeNode(nextPeEdgeID.to); } }

新加入的simpleGateway節點算子如下:

/** * 簡單是非判斷 */ public class OperatorOfSimpleGateway implements IOperator { @Override public String getType() { return "simpleGateway"; } @Override public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) { ScriptEngineManager manager = new ScriptEngineManager(); ScriptEngine engine = manager.getEngineByName("js"); engine.put("approvalResult", peContext.getValue("approvalResult")); String expression = XmlUtil.childTextByName(node.xmlNode, "expr"); String trueOutGoingEdgeID = XmlUtil.childTextByName(node.xmlNode, "trueOutGoing"); PeEdge outPeEdge = null; try { outPeEdge = (Boolean) engine.eval(expression) ? node.outWithID(trueOutGoingEdgeID) : node.outWithOutID(trueOutGoingEdgeID); } catch (ScriptException e) { e.printStackTrace(); } processEngine.nodeFinished(outPeEdge); } }

其中簡單使用了js腳本作爲表達式,當然其中的弊端這裡就不展開了。

爲了方便同學們CC+CV,其他發生相應變化的代碼如下:

/** * 審批 */ public class OperatorOfApproval implements IOperator { @Override public String getType() { return "approval"; } @Override public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) { peContext.putValue("approver", "經理"); Integer price = (Integer) peContext.getValue("price"); //價格

id2PeNode = new HashMap

(); private final Map

id2PeEdge = new HashMap

(); public XmlPeProcessBuilder(String xmlStr) { this.xmlStr = xmlStr; } public PeProcess build() throws Exception { //strToNode : 把一段xml轉換爲org.w3c.dom.Node Node definations = XmlUtil.strToNode(xmlStr); //childByName : 找到definations子節點中nodeName爲process的那個Node Node process = XmlUtil.childByName(definations, "process"); NodeList childNodes = process.getChildNodes(); for (int j = 0; j

startEventEntry = id2PeNode.entrySet().stream().filter(entry -> "startEvent".equals(entry.getValue().type)).findFirst().get(); return new PeProcess(startEventEntry.getKey(), startEventEntry.getValue()); } private void buildPeEdge(Node node) { //attributeValue : 找到node節點上屬性爲id的值 PeEdge peEdge = id2PeEdge.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeEdge(id)); peEdge.from = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "sourceRef"), id -> new PeNode(id)); peEdge.to = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "targetRef"), id -> new PeNode(id)); } private void buildPeNode(Node node) { PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id)); peNode.type = node.getNodeName(); peNode.xmlNode = node; List

"startEvent".equals(entry.getValue().type)).findFirst().get(); return new PeProcess(startEventEntry.getKey(), startEventEntry.getValue()); } private void buildPeEdge(Node node) { //attributeValue : 找到node節點上屬性爲id的值 PeEdge peEdge = id2PeEdge.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeEdge(id)); peEdge.from = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "sourceRef"), id -> new PeNode(id)); peEdge.to = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "targetRef"), id -> new PeNode(id)); } private void buildPeNode(Node node) { PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id)); peNode.type = node.getNodeName(); peNode.xmlNode = node; List

new PeEdge(id)); peEdge.from = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "sourceRef"), id -> new PeNode(id)); peEdge.to = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "targetRef"), id -> new PeNode(id)); } private void buildPeNode(Node node) { PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id)); peNode.type = node.getNodeName(); peNode.xmlNode = node; List

new PeNode(id)); peEdge.to = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "targetRef"), id -> new PeNode(id)); } private void buildPeNode(Node node) { PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id)); peNode.type = node.getNodeName(); peNode.xmlNode = node; List

new PeNode(id)); } private void buildPeNode(Node node) { PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id)); peNode.type = node.getNodeName(); peNode.xmlNode = node; List

new PeNode(id)); peNode.type = node.getNodeName(); peNode.xmlNode = node; List

inPeEdgeNodes = XmlUtil.childsByName(node, "incoming"); inPeEdgeNodes.stream().forEach(n -> peNode.in.add(id2PeEdge.computeIfAbsent(XmlUtil.text(n), id -> new PeEdge(id)))); List

peNode.in.add(id2PeEdge.computeIfAbsent(XmlUtil.text(n), id -> new PeEdge(id)))); List

new PeEdge(id)))); List

outPeEdgeNodes = XmlUtil.childsByName(node, "outgoing"); outPeEdgeNodes.stream().forEach(n -> peNode.out.add(id2PeEdge.computeIfAbsent(XmlUtil.text(n), id -> new PeEdge(id)))); } }

peNode.out.add(id2PeEdge.computeIfAbsent(XmlUtil.text(n), id -> new PeEdge(id)))); } }

new PeEdge(id)))); } }

運行一下,看看結果如何,代碼如下:

public class ProcessEngineTest { @Test public void testRun() throws Exception { //讀取文件內容到字符串 String modelStr = Tools.readResoucesFile("model/third/hello.xml"); ProcessEngine processEngine = new ProcessEngine(modelStr); processEngine.registNodeProcessor(new OperatorOfApproval()); processEngine.registNodeProcessor(new OperatorOfApprovalApply()); processEngine.registNodeProcessor(new OperatorOfNotify()); processEngine.registNodeProcessor(new OperatorOfSimpleGateway()); processEngine.start(); Thread.sleep(1000 * 1); } }

approvalResult :false,price : 400 approvalResult :false,price : 300 approvalResult :true,price : 200 小張 提交的申請單 200 被 經理 審批,結果爲 true process finished!

至此,本需求實現完畢,除了直接實現了分支語義外,我們看到,這裡還間接實現了循環語義。

作爲一個輕量級的工作流引擎,到此就基本講完了,接下來,我們做一下總結與展望。

七、總結與展望

經過以上三個迭代,我們可以得到一個相對穩定的工作流引擎的結構,如下圖所示:

通過此圖我們可知,這裡有一個相對穩定的引擎層,同時爲了提供擴展性,提供了一個節點算子層,所有的節點算子的新增都在此處中。

此外,進行了一定程度的控制反轉,即:由算子決定下一步走哪裡,而不是引擎。這樣,極大地提高了引擎的靈活性,更好的進行了封裝。

最後,使用了上下文,提供了一種全局變量的機制,便於節點之間的數據流動。

當然,以上的三個迭代距離實際的線上應用場景相距甚遠,還需實現與展望以下幾點纔可,如下:

一些異常情況的考慮與設計

應把節點抽象成一個函數,要有入參、出參,數據類型等

關鍵的地方加入埋點,用以控制引擎或吐出事件

圖的語義合法性檢查,xsd、自定義檢查技術等

圖的dag算法檢測

流程的流程歷史記錄,及回滾到任意節點

流程圖的動態修改,即:可以在流程開始後,對流程圖進行修改

併發修改情況下的考慮

效率上的考慮

防止重啓後流轉信息丟失,需要持久化機制的加入

流程的取消、重置、變量傳入等

更合適的規則引擎及多種規則引擎的實現、配置

前端的畫布、前後端流程數據結構定義及轉換

作者:劉洋