<?xml version="1.0" encoding="UTF-8"?><rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:media="http://search.yahoo.com/mrss/"><channel><title><![CDATA[codedependant]]></title><description><![CDATA[I write codez. I share codez. I complain about codez]]></description><link>http://www.codedependant.net/</link><image><url>http://www.codedependant.net/favicon.png</url><title>codedependant</title><link>http://www.codedependant.net/</link></image><generator>Ghost 4.9</generator><lastBuildDate>Tue, 14 Apr 2026 16:50:24 GMT</lastBuildDate><atom:link href="http://www.codedependant.net/rss/" rel="self" type="application/rss+xml"/><ttl>60</ttl><item><title><![CDATA[A Herd Of Rabbits Part 2: RabbitMQ Data Pipelines]]></title><description><![CDATA[A look at how we used Node.js streams and RabbitMQ topic routing to build real time streaming data pipelines.]]></description><link>http://www.codedependant.net/2020/04/01/a-herd-of-rabbits-part-2-rabbitmq-data-pipelines/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa69</guid><category><![CDATA[herd-of-rabbits]]></category><category><![CDATA[node.js]]></category><category><![CDATA[rabbitmq]]></category><category><![CDATA[postgres]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Wed, 01 Apr 2020 00:21:01 GMT</pubDate><media:content url="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/herd-of-rabbits-med-1.png" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: html--><div class="first-letter"><span>R</span></div>
<img src="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/herd-of-rabbits-med-1.png" alt="A Herd Of Rabbits Part 2: RabbitMQ Data Pipelines"><p>
	abbitMQ is a powerful message broker allowing engineers to implement complex messaging topologies with relative ease. At the day job we used RabbitMQ as the backbone of our real time data infrastructure. In the <a href="http://www.codedependant.net/2020/03/27/heard-of-rabbits-1-postgres-change-data-capture-and-rabbitmq/" target="_blank">previous post</a> we set up a simple PostgreSQL trigger to send change capture messages to a RabbitMQ exchange. Conceptually, this is where we left off:
</p><!--kg-card-end: html--><figure class="kg-card kg-image-card kg-width-wide"><img src="https://s3.amazonaws.com/codedependant-blog/content/images/2020/04/postgres-amqp.png" class="kg-image" alt="A Herd Of Rabbits Part 2: RabbitMQ Data Pipelines" loading="lazy"></figure><p>In this early stage, we basically have a fire-hose that we can selectively tap into, but we have no way to control the flow of data. </p><p>To recap a bit before we get too deep, we had a simple and manual way of handling real time operations. Effectively, we just baked all of the logic into the specific application code path. This became complex and difficult to manage. We wanted to automate as much as possible and stop doing it manually. </p><!--kg-card-begin: html--><img src="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/leaky-hose.jpg" alt="A Herd Of Rabbits Part 2: RabbitMQ Data Pipelines" width="50%" class="fr p_all-4"><!--kg-card-end: html--><p>We had 3 primary goals:</p><ol><li>Continue sending writes to PostgreSQL and not manage &quot;real-time&quot; inline</li><li>Capture and funnel changes through a single point</li><li>Apply various actions in response to the changes</li></ol><p>Staying with the <code>articles</code> theme of the previous post, we can talk in terms of some basic business rules:</p><ul><li>Anytime someone adds or modifies an article, we want to cache its current state</li><li>Anytime someone adds or modifies an article, we want to push a websocket message</li><li>Anytime someone adds a comment to an article, we want to push a websocket message</li><li>When an article goes live the first time, we want to send some data to a reporting system.</li></ul><!--kg-card-begin: markdown--><table>
<thead>
<tr>
<th style="text-align:center">RabbitMQ</th>
<th style="text-align:center">Kafka</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:center">Smart Server + Dumb Consumers</td>
<td style="text-align:center">Dumb Server + Smart Consumers</td>
</tr>
<tr>
<td style="text-align:center">No long term persistence</td>
<td style="text-align:center">Messages stored indefinitely</td>
</tr>
</tbody>
</table>
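<p>To make the "Smart Server" row a little more concrete: with a topic exchange, the broker decides which queues receive a message by comparing the message's routing key against each binding pattern. The snippet below is only a rough sketch of that matching in plain JavaScript. The <code>topicMatches</code> helper and the example routing keys are hypothetical, written for illustration; they are not part of RabbitMQ or any client library, and the real matching happens inside the broker.</p>

```javascript
'use strict'

// Hypothetical helper for illustration only (not a RabbitMQ API).
// Returns true when a dot-separated routing key satisfies a binding pattern.
//   '*' stands in for exactly one word
//   '#' stands in for zero or more words
function topicMatches(pattern, routingKey) {
  const match = (p, k) => {
    // pattern exhausted: it is a match only if the key is exhausted too
    if (p.length === 0) return k.length === 0

    const [head, ...rest] = p
    if (head === '#') {
      // '#' may absorb any number of words, so try every split point
      for (let i = k.length; i >= 0; i--) {
        if (match(rest, k.slice(i))) return true
      }
      return false
    }

    // every other pattern word needs a key word to consume
    if (k.length === 0) return false
    if (head === '*' || head === k[0]) return match(rest, k.slice(1))
    return false
  }

  return match(pattern.split('.'), routingKey.split('.'))
}

// Assumed example keys in a resource.action style
console.log(topicMatches('cdc.*.*', 'cdc.article.update'))       // true
console.log(topicMatches('cdc.article.*', 'cdc.comment.insert')) // false
console.log(topicMatches('cdc.#', 'cdc'))                        // true
```

<p>The consumer never runs logic like this itself. It only reads from a queue, which is why so little client code is needed.</p>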
<!--kg-card-end: markdown--><h4 id="topic-routing">Topic Routing</h4><p>RabbitMQ takes the Smart Server / Dumb Client approach. Most of your interactions with rabbit are with the server: creating resources, configuring them, stitching them together, etc. It has two basic constructs, <code>exchange</code> and <code>queue</code>, and provides a way to bind them together in interesting ways.</p><p>Exchanges allow for complex message routing patterns like pub-sub, pattern matching and point-to-point. It is not uncommon to see rather elaborate server setups. RabbitMQ clients read messages from a queue, but it is possible for a message to arrive at a queue from multiple sources. RabbitMQ servers do not have any other dependencies. </p><p>Think of an exchange as a post office, and a queue as a mailbox. In the case of a topic exchange, a routing pattern can be used to define how messages are routed to queues based on a <code>routing key</code>. Routing keys are words separated by a period ( <code>.</code> ). Routing patterns use the same format and can also contain wildcards: an asterisk ( <code>*</code> ) matches exactly one word, and a pound ( <code>#</code> ) matches zero or more words.</p><figure class="kg-card kg-image-card"><img src="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/topic-exchange.png" class="kg-image" alt="A Herd Of Rabbits Part 2: RabbitMQ Data Pipelines" loading="lazy"></figure><p>For our purposes, this results in having to write a lot less code. Most of what we are going to be building sits on top of RabbitMQ <a href="https://www.rabbitmq.com/tutorials/tutorial-five-python.html">topic routing</a>. In a nutshell, topic routing allows us to route a single message through multiple exchanges and have it arrive at multiple queues for consumers to handle. 
In our setup, PostgreSQL publishes everything to the single <code>pg-messaging</code> exchange, and all we need to do is define how things get routed around and what needs to happen when a consumer receives a message.</p><h4 id="realtime-engine">Realtime Engine</h4><p>Internally, we referred to the system that defined this logic as the Realtime Engine (RTE). It is a collection of &quot;pipeline&quot; instances. Each pipeline is a RabbitMQ connection that holds one or more Node.js streams housed in a <a href="https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback">Node.js Pipeline</a> instance. The pipeline receives a message from a RabbitMQ queue and sends it through its streams. </p><!--kg-card-begin: html--><svg xmlns="http://www.w3.org/2000/svg" id="mermaid-1584820824894" width="100%" height="400" viewbox="-10 -10 288.6640625 289.40625">
<!--
    https://mermaid-js.github.io/mermaid-live-editor/#/edit/eyJjb2RlIjoiY2xhc3NEaWFncmFtXG5cdEFNUVBDb25uZWN0aW9uIDx8LS0gUlRFUGlwZWxpbmVcblx0QU1RUENvbm5lY3Rpb24gOiArQXJyYXkgcXVldWVzXG5cdEFNUVBDb25uZWN0aW9uOiArY29ubmVjdCgpXG5cdEFNUVBDb25uZWN0aW9uOiArY29uc3VtZSgpXG4gIEFNUVBDb25uZWN0aW9uOiArY2xvc2UoKVxuICBBTVFQQ29ubmVjdGlvbjogK3NodXRkb3duKClcbiAgQU1RUENvbm5lY3Rpb246ICthc3NlcnRFeGNoYW5nZSgpXG4gIEFNUVBDb25uZWN0aW9uOiArYXNzZXJ0UXVldWUoKVxuICBBTVFQQ29ubmVjdGlvbjogK3B1Ymxpc2goKVxuXHRjbGFzcyBSVEVQaXBlbGluZXtcblx0XHQrU3RyaW5nIG5hbWVcbiAgICArVHJhbnNmb3JtU3RyZWFtW10gc3RyZWFtc1xuXHRcdCtzdGFydCgpXG5cdFx0K2VuZCgpXG4gICAgK3dyaXRlKClcbiAgICArb25GaW5pc2goKVxuXHR9XG4gIGNsYXNzIFRyYW5zZm9ybVN0cmVhbSB7XG4gICAgK2Jvb2xlYW4gcmVhZGFibGVcbiAgICArYm9vbGVhbiB3cml0YWJsZVxuICAgICtfdHJhbnNmb3JtKClcbiAgfVxuXHRcdFx0XHRcdCIsIm1lcm1haWQiOnsidGhlbWUiOiJmb3Jlc3QifSwidXBkYXRlRWRpdG9yIjpmYWxzZX0
-->
<style>



#mermaid-1584820824894 .label {
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family);
  color: #333; }

#mermaid-1584820824894 .label text {
  fill: #333; }

#mermaid-1584820824894 .node rect,
#mermaid-1584820824894 .node circle,
#mermaid-1584820824894 .node ellipse,
#mermaid-1584820824894 .node polygon,
#mermaid-1584820824894 .node path {
  fill: #cde498;
  stroke: #13540c;
  stroke-width: 1px; }

#mermaid-1584820824894 .node .label {
  text-align: center; }

#mermaid-1584820824894 .node.clickable {
  cursor: pointer; }

#mermaid-1584820824894 .arrowheadPath {
  fill: green; }

#mermaid-1584820824894 .edgePath .path {
  stroke: green;
  stroke-width: 1.5px; }

#mermaid-1584820824894 .edgeLabel {
  background-color: #e8e8e8;
  text-align: center; }

#mermaid-1584820824894 .cluster rect {
  fill: #cdffb2;
  stroke: #6eaa49;
  stroke-width: 1px; }

#mermaid-1584820824894 .cluster text {
  fill: #333; }

#mermaid-1584820824894 div.mermaidTooltip {
  position: absolute;
  text-align: center;
  max-width: 200px;
  padding: 2px;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family);
  font-size: 12px;
  background: #cdffb2;
  border: 1px solid #6eaa49;
  border-radius: 2px;
  pointer-events: none;
  z-index: 100; }

#mermaid-1584820824894 .actor {
  stroke: #13540c;
  fill: #cde498; }

#mermaid-1584820824894 text.actor {
  fill: black;
  stroke: none; }

#mermaid-1584820824894 .actor-line {
  stroke: grey; }

#mermaid-1584820824894 .messageLine0 {
  stroke-width: 1.5;
  stroke-dasharray: '2 2';
  stroke: #333; }

#mermaid-1584820824894 .messageLine1 {
  stroke-width: 1.5;
  stroke-dasharray: '2 2';
  stroke: #333; }

#mermaid-1584820824894 #arrowhead {
  fill: #333; }

#mermaid-1584820824894 .sequenceNumber {
  fill: white; }

#mermaid-1584820824894 #sequencenumber {
  fill: #333; }

#mermaid-1584820824894 #crosshead path {
  fill: #333 !important;
  stroke: #333 !important; }

#mermaid-1584820824894 .messageText {
  fill: #333;
  stroke: none; }

#mermaid-1584820824894 .labelBox {
  stroke: #326932;
  fill: #cde498; }

#mermaid-1584820824894 .labelText {
  fill: black;
  stroke: none; }

#mermaid-1584820824894 .loopText {
  fill: black;
  stroke: none; }

#mermaid-1584820824894 .loopLine {
  stroke-width: 2;
  stroke-dasharray: '2 2';
  stroke: #326932; }

#mermaid-1584820824894 .note {
  stroke: #6eaa49;
  fill: #fff5ad; }

#mermaid-1584820824894 .noteText {
  fill: black;
  stroke: none;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family);
  font-size: 14px; }

#mermaid-1584820824894 .activation0 {
  fill: #f4f4f4;
  stroke: #666; }

#mermaid-1584820824894 .activation1 {
  fill: #f4f4f4;
  stroke: #666; }

#mermaid-1584820824894 .activation2 {
  fill: #f4f4f4;
  stroke: #666; }


#mermaid-1584820824894 .mermaid-main-font {
  font-family: "trebuchet ms", verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1584820824894 .section {
  stroke: none;
  opacity: 0.2; }

#mermaid-1584820824894 .section0 {
  fill: #6eaa49; }

#mermaid-1584820824894 .section2 {
  fill: #6eaa49; }

#mermaid-1584820824894 .section1,
#mermaid-1584820824894 .section3 {
  fill: white;
  opacity: 0.2; }

#mermaid-1584820824894 .sectionTitle0 {
  fill: #333; }

#mermaid-1584820824894 .sectionTitle1 {
  fill: #333; }

#mermaid-1584820824894 .sectionTitle2 {
  fill: #333; }

#mermaid-1584820824894 .sectionTitle3 {
  fill: #333; }

#mermaid-1584820824894 .sectionTitle {
  text-anchor: start;
  font-size: 11px;
  text-height: 14px;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }


#mermaid-1584820824894 .grid .tick {
  stroke: lightgrey;
  opacity: 0.8;
  shape-rendering: crispEdges; }
#mermaid-1584820824894   .grid .tick text {
    font-family: 'trebuchet ms', verdana, arial;
    font-family: var(--mermaid-font-family); }

#mermaid-1584820824894 .grid path {
  stroke-width: 0; }


#mermaid-1584820824894 .today {
  fill: none;
  stroke: red;
  stroke-width: 2px; }



#mermaid-1584820824894 .task {
  stroke-width: 2; }

#mermaid-1584820824894 .taskText {
  text-anchor: middle;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1584820824894 .taskText:not([font-size]) {
  font-size: 11px; }

#mermaid-1584820824894 .taskTextOutsideRight {
  fill: black;
  text-anchor: start;
  font-size: 11px;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1584820824894 .taskTextOutsideLeft {
  fill: black;
  text-anchor: end;
  font-size: 11px; }


#mermaid-1584820824894 .task.clickable {
  cursor: pointer; }

#mermaid-1584820824894 .taskText.clickable {
  cursor: pointer;
  fill: #003163 !important;
  font-weight: bold; }

#mermaid-1584820824894 .taskTextOutsideLeft.clickable {
  cursor: pointer;
  fill: #003163 !important;
  font-weight: bold; }

#mermaid-1584820824894 .taskTextOutsideRight.clickable {
  cursor: pointer;
  fill: #003163 !important;
  font-weight: bold; }


#mermaid-1584820824894 .taskText0,
#mermaid-1584820824894 .taskText1,
#mermaid-1584820824894 .taskText2,
#mermaid-1584820824894 .taskText3 {
  fill: white; }

#mermaid-1584820824894 .task0,
#mermaid-1584820824894 .task1,
#mermaid-1584820824894 .task2,
#mermaid-1584820824894 .task3 {
  fill: #487e3a;
  stroke: #13540c; }

#mermaid-1584820824894 .taskTextOutside0,
#mermaid-1584820824894 .taskTextOutside2 {
  fill: black; }

#mermaid-1584820824894 .taskTextOutside1,
#mermaid-1584820824894 .taskTextOutside3 {
  fill: black; }


#mermaid-1584820824894 .active0,
#mermaid-1584820824894 .active1,
#mermaid-1584820824894 .active2,
#mermaid-1584820824894 .active3 {
  fill: #cde498;
  stroke: #13540c; }

#mermaid-1584820824894 .activeText0,
#mermaid-1584820824894 .activeText1,
#mermaid-1584820824894 .activeText2,
#mermaid-1584820824894 .activeText3 {
  fill: black !important; }


#mermaid-1584820824894 .done0,
#mermaid-1584820824894 .done1,
#mermaid-1584820824894 .done2,
#mermaid-1584820824894 .done3 {
  stroke: grey;
  fill: lightgrey;
  stroke-width: 2; }

#mermaid-1584820824894 .doneText0,
#mermaid-1584820824894 .doneText1,
#mermaid-1584820824894 .doneText2,
#mermaid-1584820824894 .doneText3 {
  fill: black !important; }


#mermaid-1584820824894 .crit0,
#mermaid-1584820824894 .crit1,
#mermaid-1584820824894 .crit2,
#mermaid-1584820824894 .crit3 {
  stroke: #ff8888;
  fill: red;
  stroke-width: 2; }

#mermaid-1584820824894 .activeCrit0,
#mermaid-1584820824894 .activeCrit1,
#mermaid-1584820824894 .activeCrit2,
#mermaid-1584820824894 .activeCrit3 {
  stroke: #ff8888;
  fill: #cde498;
  stroke-width: 2; }

#mermaid-1584820824894 .doneCrit0,
#mermaid-1584820824894 .doneCrit1,
#mermaid-1584820824894 .doneCrit2,
#mermaid-1584820824894 .doneCrit3 {
  stroke: #ff8888;
  fill: lightgrey;
  stroke-width: 2;
  cursor: pointer;
  shape-rendering: crispEdges; }

#mermaid-1584820824894 .milestone {
  transform: rotate(45deg) scale(0.8, 0.8); }

#mermaid-1584820824894 .milestoneText {
  font-style: italic; }

#mermaid-1584820824894 .doneCritText0,
#mermaid-1584820824894 .doneCritText1,
#mermaid-1584820824894 .doneCritText2,
#mermaid-1584820824894 .doneCritText3 {
  fill: black !important; }

#mermaid-1584820824894 .activeCritText0,
#mermaid-1584820824894 .activeCritText1,
#mermaid-1584820824894 .activeCritText2,
#mermaid-1584820824894 .activeCritText3 {
  fill: black !important; }

#mermaid-1584820824894 .titleText {
  text-anchor: middle;
  font-size: 18px;
  fill: black;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1584820824894 g.classGroup text {
  fill: #13540c;
  stroke: none;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family);
  font-size: 10px; }
#mermaid-1584820824894   g.classGroup text .title {
    font-weight: bolder; }

#mermaid-1584820824894 g.clickable {
  cursor: pointer; }

#mermaid-1584820824894 g.classGroup rect {
  fill: #cde498;
  stroke: #13540c; }

#mermaid-1584820824894 g.classGroup line {
  stroke: #13540c;
  stroke-width: 1; }

#mermaid-1584820824894 .classLabel .box {
  stroke: none;
  stroke-width: 0;
  fill: #cde498;
  opacity: 0.5; }

#mermaid-1584820824894 .classLabel .label {
  fill: #13540c;
  font-size: 10px; }

#mermaid-1584820824894 .relation {
  stroke: #13540c;
  stroke-width: 1;
  fill: none; }

#mermaid-1584820824894 .dashed-line {
  stroke-dasharray: 3; }

#mermaid-1584820824894 #compositionStart {
  fill: #13540c;
  stroke: #13540c;
  stroke-width: 1; }

#mermaid-1584820824894 #compositionEnd {
  fill: #13540c;
  stroke: #13540c;
  stroke-width: 1; }

#mermaid-1584820824894 #aggregationStart {
  fill: #cde498;
  stroke: #13540c;
  stroke-width: 1; }

#mermaid-1584820824894 #aggregationEnd {
  fill: #cde498;
  stroke: #13540c;
  stroke-width: 1; }

#mermaid-1584820824894 #dependencyStart {
  fill: #13540c;
  stroke: #13540c;
  stroke-width: 1; }

#mermaid-1584820824894 #dependencyEnd {
  fill: #13540c;
  stroke: #13540c;
  stroke-width: 1; }

#mermaid-1584820824894 #extensionStart {
  fill: #13540c;
  stroke: #13540c;
  stroke-width: 1; }

#mermaid-1584820824894 #extensionEnd {
  fill: #13540c;
  stroke: #13540c;
  stroke-width: 1; }

#mermaid-1584820824894 .commit-id,
#mermaid-1584820824894 .commit-msg,
#mermaid-1584820824894 .branch-label {
  fill: lightgrey;
  color: lightgrey;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1584820824894 .pieTitleText {
  text-anchor: middle;
  font-size: 25px;
  fill: black;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1584820824894 .slice {
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1584820824894 g.stateGroup text {
  fill: #13540c;
  stroke: none;
  font-size: 10px;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1584820824894 g.stateGroup text {
  fill: #13540c;
  stroke: none;
  font-size: 10px; }

#mermaid-1584820824894 g.stateGroup .state-title {
  font-weight: bolder;
  fill: black; }

#mermaid-1584820824894 g.stateGroup rect {
  fill: #cde498;
  stroke: #13540c; }

#mermaid-1584820824894 g.stateGroup line {
  stroke: #13540c;
  stroke-width: 1; }

#mermaid-1584820824894 .transition {
  stroke: #13540c;
  stroke-width: 1;
  fill: none; }

#mermaid-1584820824894 .stateGroup .composit {
  fill: white;
  border-bottom: 1px; }

#mermaid-1584820824894 .stateGroup .alt-composit {
  fill: #e0e0e0;
  border-bottom: 1px; }

#mermaid-1584820824894 .state-note {
  stroke: #6eaa49;
  fill: #fff5ad; }
#mermaid-1584820824894   .state-note text {
    fill: black;
    stroke: none;
    font-size: 10px; }

#mermaid-1584820824894 .stateLabel .box {
  stroke: none;
  stroke-width: 0;
  fill: #cde498;
  opacity: 0.5; }

#mermaid-1584820824894 .stateLabel text {
  fill: black;
  font-size: 10px;
  font-weight: bold;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

:root {
  --mermaid-font-family: '"trebuchet ms", verdana, arial';
  --mermaid-font-family: "Comic Sans MS", "Comic Sans", cursive; }

:root { --mermaid-font-family: "trebuchet ms", verdana, arial;}</style><style>#mermaid-1584820824894 {
    color: rgb(0, 0, 0);
    font: 16px "trebuchet ms", verdana, arial;
  }</style><g/><defs><marker id="extensionStart" class="extension" refx="0" refy="7" markerwidth="190" markerheight="240" orient="auto"><path d="M 1,7 L18,13 V 1 Z"/></marker></defs><defs><marker id="extensionEnd" refx="19" refy="7" markerwidth="20" markerheight="28" orient="auto"><path d="M 1,1 V 13 L18,7 Z"/></marker></defs><defs><marker id="compositionStart" class="extension" refx="0" refy="7" markerwidth="190" markerheight="240" orient="auto"><path d="M 18,7 L9,13 L1,7 L9,1 Z"/></marker></defs><defs><marker id="compositionEnd" refx="19" refy="7" markerwidth="20" markerheight="28" orient="auto"><path d="M 18,7 L9,13 L1,7 L9,1 Z"/></marker></defs><defs><marker id="aggregationStart" class="extension" refx="0" refy="7" markerwidth="190" markerheight="240" orient="auto"><path d="M 18,7 L9,13 L1,7 L9,1 Z"/></marker></defs><defs><marker id="aggregationEnd" refx="19" refy="7" markerwidth="20" markerheight="28" orient="auto"><path d="M 18,7 L9,13 L1,7 L9,1 Z"/></marker></defs><defs><marker id="dependencyStart" class="extension" refx="0" refy="7" markerwidth="190" markerheight="240" orient="auto"><path d="M 5,7 L9,13 L1,7 L9,1 Z"/></marker></defs><defs><marker id="dependencyEnd" refx="19" refy="7" markerwidth="20" markerheight="28" orient="auto"><path d="M 18,7 L9,13 L14,7 L9,1 Z"/></marker></defs><g id="classid-AMQPConnection-2521" class="classGroup " transform="translate(22.4765625,0 )"><rect x="0" y="0" width="96.890625" height="119.703125"/><text y="15" x="0"><tspan class="title" x="8.203125">AMQPConnection</tspan></text><line x1="0" y1="21.34375" y2="21.34375" x2="96.890625"/><text x="5" y="31.34375" fill="white" class="classText"><tspan x="5">+Array queues</tspan></text><line x1="0" y1="37.6875" y2="37.6875" x2="96.890625"/><text x="5" y="52.6875" fill="white" class="classText"><tspan x="5">+connect()</tspan><tspan x="5" dy="10">+consume()</tspan><tspan x="5" dy="10">+close()</tspan><tspan x="5" dy="10">+shutdown()</tspan><tspan x="5" 
dy="10">+assertExchange()</tspan><tspan x="5" dy="10">+assertQueue()</tspan><tspan x="5" dy="10">+publish()</tspan></text></g><g id="classid-RTEPipeline-2522" class="classGroup " transform="translate(0,169.703125 )"><rect x="0" y="0" width="141.84375" height="99.703125"/><text y="15" x="0"><tspan class="title" x="43.03125">RTEPipeline</tspan></text><line x1="0" y1="21.34375" y2="21.34375" x2="141.84375"/><text x="5" y="31.34375" fill="white" class="classText"><tspan x="5">+String name</tspan><tspan x="5" dy="10">+TransformStream[] streams</tspan></text><line x1="0" y1="47.6875" y2="47.6875" x2="141.84375"/><text x="5" y="62.6875" fill="white" class="classText"><tspan x="5">+start()</tspan><tspan x="5" dy="10">+end()</tspan><tspan x="5" dy="10">+write()</tspan><tspan x="5" dy="10">+onFinish()</tspan></text></g><g id="classid-TransformStream-2523" class="classGroup " transform="translate(169.3671875,25 )"><rect x="0" y="0" width="99.296875" height="69.703125"/><text y="15" x="0"><tspan class="title" x="9.359375">TransformStream</tspan></text><line x1="0" y1="21.34375" y2="21.34375" x2="99.296875"/><text x="5" y="31.34375" fill="white" class="classText"><tspan x="5">+boolean readable</tspan><tspan x="5" dy="10">+boolean writable</tspan></text><line x1="0" y1="47.6875" y2="47.6875" x2="99.296875"/><text x="5" y="62.6875" fill="white" class="classText"><tspan x="5">+_transform()</tspan></text></g><path d="M70.921875,119.703125L70.921875,123.86979166666667C70.921875,128.03645833333334,70.921875,136.36979166666666,70.921875,144.703125C70.921875,153.03645833333334,70.921875,161.36979166666666,70.921875,165.53645833333334L70.921875,169.703125" id="edge1052" class="relation" marker-start="url(#extensionStart)"/></svg><!--kg-card-end: html--><p>A simplified RTE pipeline class looks like this:</p><pre><code class="language-js:git">const {pipeline, Writable, PassThrough} = require(&apos;stream&apos;)

class RTEPipeline extends RabbitMQConnection {
  constructor(opts) {
    super(opts)
    if (!Array.isArray(opts.streams)) {
      throw new TypeError(
        &apos;streams is required and must be an array&apos;
      )
    }
     
    // setup exchanges
    // setup queues
    
    this.writer = new PassThrough()
    this.pipeline = pipeline(
      this.writer
    , ...this.opts.streams
    , new Writable({objectMode: true, write: this._drain.bind(this)})
    , this.onFinish.bind(this)
    )
    
    for(const queue of this.opts.queues) {
      queue.onMessage(this._onMessage.bind(this))
    }
  }

  write(buffer) {
    if (buffer === undefined) return true
    return this.writer.write(buffer)
  }

  onFinish(err) {
    // called when pipeline finishes or errors
    if (err) {
      console.error(err)
    } else {
      console.log(`${this.name} pipeline shutdown success`)
    }
  }

  _onMessage(buffer, msg) {
    this.ack(msg)
    return this.write(buffer)
  }

  _drain(buffer, enc, cb) {
    // things to do as the last stage of the pipeline
    // metrics
    // logging
    // other clean up
    
    cb()
  }
}</code></pre><p>The constructor does the following:</p><ol><li>Sets up any declared RabbitMQ exchanges and bindings</li><li>Given an array of <a href="https://nodejs.org/api/stream.html#stream_duplex_and_transform_streams">Duplex Streams</a>, creates a stream pipeline. A dummy stream is used as the writable stream to push messages to as they come in from RabbitMQ. A single write stream is added to the end of the pipeline to drain it</li><li>Sets up any declared queues and adds a message handler to each</li><li>Adds all queue bindings from the primary exchange to each of the queues.</li></ol><p>In practice, a pipeline instance would look like this:</p><pre><code class="language-js:git">&apos;use strict&apos;
const Pipeline = require(&apos;./pipeline&apos;)
const {PassThrough} = require(&apos;stream&apos;)

new Pipeline({
  name: &apos;CRUD-Pipeline&apos;
, exchange: &apos;cdc&apos;
, exchange_routing: { // exchange (pg-messaging) &lt;&gt; exchange (cdc) routing
    from: &apos;pg-messaging&apos;
  , to: &apos;cdc&apos;
  , pattern: &apos;cdc.*.*&apos;
  }  
, queues: [{
    name: &apos;rte-pipeline-crud&apos;
  , pattern: [ // cdc -&gt; rte-pipeline-crud bindings
      &apos;cdc.article.*&apos;
    , &apos;cdc.author.*&apos;
    , &apos;cdc.comment.*&apos;
    ]
  }]
, streams: [
    new PassThrough()
  , new PassThrough()
  , new PassThrough()
  ]
})</code></pre><p>This does the following:</p><ol><li>Creates two exchanges, <code>cdc</code> and <code>pg-messaging</code>, if they do not already exist.</li><li>Routes all messages with a <code>cdc</code> routing key prefix to the <code>cdc</code> exchange.</li><li>Sets up and consumes from a queue named <code>rte-pipeline-crud</code>.</li><li>Routes all CDC messages for the <code>article</code>, <code>author</code>, and <code>comment</code> resources to the <code>rte-pipeline-crud</code> queue.</li><li>Pushes every message received through each of the defined streams.</li></ol><p>In this case all of the streams are pass-through streams, so nothing actually happens. Visually, this snippet of code looks like this:</p><!--kg-card-begin: html--><!--
https://mermaid-js.github.io/mermaid-live-editor/#/edit/eyJjb2RlIjoiZ3JhcGggTFJcblx0cG9zdGdyZXNbKFBvc3RncmVTUUwpXSAtLT58U1FMIFRyaWdnZXJ8IFJBQkJJVE1RKFJhYmJpdE1RKVxuICAgIFJBQkJJVE1RLS0-IG1lc3NhZ2luZ3twZy1tZXNzYWdpbmd9XG5cbiAgICBzdWJncmFwaCBSZWFsIFRpbWUgRW5naW5lXG4gICAgICBtZXNzYWdpbmctLT4gfGNkYy4qLip8IHJ0ZXtjZGN9XG5cblxuICAgICAgcnRlIC0tPiB8Y2RjLmFydGljbGUuKnwgbm9vcD5ydGUtcGlwZWxpbmUtY3J1ZF1cbiAgICAgIHJ0ZSAtLT4gfGNkYy5hdXRob3IuKnwgbm9vcD5ydGUtcGlwZWxpbmUtY3J1ZF1cbiAgICAgIHJ0ZSAtLT4gfGNkYy5jb21tZW50Lip8IG5vb3A-cnRlLXBpcGVsaW5lLWNydWRdXG5cbiAgICAgIHN1YmdyYXBoIG5vZGUtcGlwZWxpbmUtMVxuICAgICAgICBub29wIC0tPiBwaXBlbGluZTFbQ1JVRC1QaXBlbGluZV1cbiAgICAgICAgcGlwZWxpbmUxIC0uLT4gc3RyZWFtMXt7c3RyZWFtMX19XG4gICAgICAgIHN0cmVhbTEtLi0-IHN0cmVhbTJ7e3N0cmVhbTJ9fVxuICAgICAgICBzdHJlYW0yLS4tPiBzdHJlYW0ze3tzdHJlYW0zfX1cbiAgICAgIGVuZFxuXG5cbiAgICBlbmRcbmNsYXNzRGVmIHJhYmJpdCBmaWxsOiNlYjdkMmYsY29sb3I6d2hpdGU7XG5jbGFzc0RlZiBwb3N0Z3JlcyBmaWxsOiMzMzY3OTEsY29sb3I6d2hpdGUsc3Ryb2tlOndoaXRlO1xuY2xhc3NEZWYgbm9kZWpzIGZpbGw6IzcwQkE1MSxjb2xvcjp3aGl0ZTtcbmNsYXNzRGVmIHN0cmVhbSBmaWxsOiNhYWUyYjUsY29sb3I6YmxhY2ssZm9udC13ZWlnaHQ6Ym9sZDtcbmNsYXNzRGVmIHBpcGVsaW5lIGZpbGw6d2hpdGU7XG5cbmNsYXNzIHBvc3RncmVzIHBvc3RncmVzO1xuY2xhc3MgbWVzc2FnaW5nIHJhYmJpdDtcbmNsYXNzIHJ0ZSByYWJiaXQ7XG5jbGFzcyBjYXRjaGFsbCByYWJiaXQ7XG5jbGFzcyBub29wIHJhYmJpdDtcbmNsYXNzIHJlb3AgcmFiYml0O1xuY2xhc3MgUkFCQklUTVEgcmFiYml0O1xuY2xhc3MgcGlwZWxpbmUxIG5vZGVqcztcbmNsYXNzIHBpcGVsaW5lMiBub2RlanM7XG5jbGFzcyBwaXBlbGluZTMgbm9kZWpzO1xuY2xhc3MgZGVidWcgcmFiYml0O1xuY2xhc3Mgc3RyZWFtMSBzdHJlYW07XG5jbGFzcyBzdHJlYW0yIHN0cmVhbTtcbmNsYXNzIHN0cmVhbTMgc3RyZWFtO1xuY2xhc3Mgc3RyZWFtNCBzdHJlYW07XG5jbGFzcyBzdHJlYW01IHN0cmVhbTtcbmNsYXNzIHN0cmVhbTYgc3RyZWFtO1xuY2xhc3Mgc3RyZWFtNyBzdHJlYW07XG5cbmNsYXNzIG5vZGUtcGlwZWxpbmUtMSBwaXBlbGluZTtcbmNsYXNzIG5vZGUtcGlwZWxpbmUtMiBwaXBlbGluZTtcbmNsYXNzIG5vZGUtcGlwZWxpbmUtMyBwaXBlbGluZTsiLCJtZXJtYWlkIjp7InRoZW1lIjoiZGVmYXVsdCJ9fQ
--><!--kg-card-end: html--><!--kg-card-begin: html--><svg id="mermaid-1585418391945" width="100%" xmlns="http://www.w3.org/2000/svg" height="150px" viewbox="0 0 1974.05322265625 254.06640625"><style>



#mermaid-1585418391945 .label {
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family);
  color: #333; }

#mermaid-1585418391945 .label text {
  fill: #333; }

#mermaid-1585418391945 .node rect,
#mermaid-1585418391945 .node circle,
#mermaid-1585418391945 .node ellipse,
#mermaid-1585418391945 .node polygon,
#mermaid-1585418391945 .node path {
  fill: #ECECFF;
  stroke: #9370DB;
  stroke-width: 1px; }

#mermaid-1585418391945 .node .label {
  text-align: center; }

#mermaid-1585418391945 .node.clickable {
  cursor: pointer; }

#mermaid-1585418391945 .arrowheadPath {
  fill: #333333; }

#mermaid-1585418391945 .edgePath .path {
  stroke: #333333;
  stroke-width: 1.5px; }

#mermaid-1585418391945 .edgeLabel {
  background-color: #e8e8e8;
  text-align: center; }

#mermaid-1585418391945 .cluster rect {
  fill: #ffffde;
  stroke: #aaaa33;
  stroke-width: 1px; }

#mermaid-1585418391945 .cluster text {
  fill: #333; }

#mermaid-1585418391945 div.mermaidTooltip {
  position: absolute;
  text-align: center;
  max-width: 200px;
  padding: 2px;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family);
  font-size: 12px;
  background: #ffffde;
  border: 1px solid #aaaa33;
  border-radius: 2px;
  pointer-events: none;
  z-index: 100; }

#mermaid-1585418391945 .actor {
  stroke: #CCCCFF;
  fill: #ECECFF; }

#mermaid-1585418391945 text.actor {
  fill: black;
  stroke: none; }

#mermaid-1585418391945 .actor-line {
  stroke: grey; }

#mermaid-1585418391945 .messageLine0 {
  stroke-width: 1.5;
  stroke-dasharray: '2 2';
  stroke: #333; }

#mermaid-1585418391945 .messageLine1 {
  stroke-width: 1.5;
  stroke-dasharray: '2 2';
  stroke: #333; }

#mermaid-1585418391945 #arrowhead {
  fill: #333; }

#mermaid-1585418391945 .sequenceNumber {
  fill: white; }

#mermaid-1585418391945 #sequencenumber {
  fill: #333; }

#mermaid-1585418391945 #crosshead path {
  fill: #333 !important;
  stroke: #333 !important; }

#mermaid-1585418391945 .messageText {
  fill: #333;
  stroke: none; }

#mermaid-1585418391945 .labelBox {
  stroke: #CCCCFF;
  fill: #ECECFF; }

#mermaid-1585418391945 .labelText {
  fill: black;
  stroke: none; }

#mermaid-1585418391945 .loopText {
  fill: black;
  stroke: none; }

#mermaid-1585418391945 .loopLine {
  stroke-width: 2;
  stroke-dasharray: '2 2';
  stroke: #CCCCFF; }

#mermaid-1585418391945 .note {
  stroke: #aaaa33;
  fill: #fff5ad; }

#mermaid-1585418391945 .noteText {
  fill: black;
  stroke: none;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family);
  font-size: 14px; }

#mermaid-1585418391945 .activation0 {
  fill: #f4f4f4;
  stroke: #666; }

#mermaid-1585418391945 .activation1 {
  fill: #f4f4f4;
  stroke: #666; }

#mermaid-1585418391945 .activation2 {
  fill: #f4f4f4;
  stroke: #666; }


#mermaid-1585418391945 .mermaid-main-font {
  font-family: "trebuchet ms", verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585418391945 .section {
  stroke: none;
  opacity: 0.2; }

#mermaid-1585418391945 .section0 {
  fill: rgba(102, 102, 255, 0.49); }

#mermaid-1585418391945 .section2 {
  fill: #fff400; }

#mermaid-1585418391945 .section1,
#mermaid-1585418391945 .section3 {
  fill: white;
  opacity: 0.2; }

#mermaid-1585418391945 .sectionTitle0 {
  fill: #333; }

#mermaid-1585418391945 .sectionTitle1 {
  fill: #333; }

#mermaid-1585418391945 .sectionTitle2 {
  fill: #333; }

#mermaid-1585418391945 .sectionTitle3 {
  fill: #333; }

#mermaid-1585418391945 .sectionTitle {
  text-anchor: start;
  font-size: 11px;
  text-height: 14px;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }


#mermaid-1585418391945 .grid .tick {
  stroke: lightgrey;
  opacity: 0.8;
  shape-rendering: crispEdges; }
#mermaid-1585418391945   .grid .tick text {
    font-family: 'trebuchet ms', verdana, arial;
    font-family: var(--mermaid-font-family); }

#mermaid-1585418391945 .grid path {
  stroke-width: 0; }


#mermaid-1585418391945 .today {
  fill: none;
  stroke: red;
  stroke-width: 2px; }



#mermaid-1585418391945 .task {
  stroke-width: 2; }

#mermaid-1585418391945 .taskText {
  text-anchor: middle;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585418391945 .taskText:not([font-size]) {
  font-size: 11px; }

#mermaid-1585418391945 .taskTextOutsideRight {
  fill: black;
  text-anchor: start;
  font-size: 11px;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585418391945 .taskTextOutsideLeft {
  fill: black;
  text-anchor: end;
  font-size: 11px; }


#mermaid-1585418391945 .task.clickable {
  cursor: pointer; }

#mermaid-1585418391945 .taskText.clickable {
  cursor: pointer;
  fill: #003163 !important;
  font-weight: bold; }

#mermaid-1585418391945 .taskTextOutsideLeft.clickable {
  cursor: pointer;
  fill: #003163 !important;
  font-weight: bold; }

#mermaid-1585418391945 .taskTextOutsideRight.clickable {
  cursor: pointer;
  fill: #003163 !important;
  font-weight: bold; }


#mermaid-1585418391945 .taskText0,
#mermaid-1585418391945 .taskText1,
#mermaid-1585418391945 .taskText2,
#mermaid-1585418391945 .taskText3 {
  fill: white; }

#mermaid-1585418391945 .task0,
#mermaid-1585418391945 .task1,
#mermaid-1585418391945 .task2,
#mermaid-1585418391945 .task3 {
  fill: #8a90dd;
  stroke: #534fbc; }

#mermaid-1585418391945 .taskTextOutside0,
#mermaid-1585418391945 .taskTextOutside2 {
  fill: black; }

#mermaid-1585418391945 .taskTextOutside1,
#mermaid-1585418391945 .taskTextOutside3 {
  fill: black; }


#mermaid-1585418391945 .active0,
#mermaid-1585418391945 .active1,
#mermaid-1585418391945 .active2,
#mermaid-1585418391945 .active3 {
  fill: #bfc7ff;
  stroke: #534fbc; }

#mermaid-1585418391945 .activeText0,
#mermaid-1585418391945 .activeText1,
#mermaid-1585418391945 .activeText2,
#mermaid-1585418391945 .activeText3 {
  fill: black !important; }


#mermaid-1585418391945 .done0,
#mermaid-1585418391945 .done1,
#mermaid-1585418391945 .done2,
#mermaid-1585418391945 .done3 {
  stroke: grey;
  fill: lightgrey;
  stroke-width: 2; }

#mermaid-1585418391945 .doneText0,
#mermaid-1585418391945 .doneText1,
#mermaid-1585418391945 .doneText2,
#mermaid-1585418391945 .doneText3 {
  fill: black !important; }


#mermaid-1585418391945 .crit0,
#mermaid-1585418391945 .crit1,
#mermaid-1585418391945 .crit2,
#mermaid-1585418391945 .crit3 {
  stroke: #ff8888;
  fill: red;
  stroke-width: 2; }

#mermaid-1585418391945 .activeCrit0,
#mermaid-1585418391945 .activeCrit1,
#mermaid-1585418391945 .activeCrit2,
#mermaid-1585418391945 .activeCrit3 {
  stroke: #ff8888;
  fill: #bfc7ff;
  stroke-width: 2; }

#mermaid-1585418391945 .doneCrit0,
#mermaid-1585418391945 .doneCrit1,
#mermaid-1585418391945 .doneCrit2,
#mermaid-1585418391945 .doneCrit3 {
  stroke: #ff8888;
  fill: lightgrey;
  stroke-width: 2;
  cursor: pointer;
  shape-rendering: crispEdges; }

#mermaid-1585418391945 .milestone {
  transform: rotate(45deg) scale(0.8, 0.8); }

#mermaid-1585418391945 .milestoneText {
  font-style: italic; }

#mermaid-1585418391945 .doneCritText0,
#mermaid-1585418391945 .doneCritText1,
#mermaid-1585418391945 .doneCritText2,
#mermaid-1585418391945 .doneCritText3 {
  fill: black !important; }

#mermaid-1585418391945 .activeCritText0,
#mermaid-1585418391945 .activeCritText1,
#mermaid-1585418391945 .activeCritText2,
#mermaid-1585418391945 .activeCritText3 {
  fill: black !important; }

#mermaid-1585418391945 .titleText {
  text-anchor: middle;
  font-size: 18px;
  fill: black;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585418391945 g.classGroup text {
  fill: #9370DB;
  stroke: none;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family);
  font-size: 10px; }
#mermaid-1585418391945   g.classGroup text .title {
    font-weight: bolder; }

#mermaid-1585418391945 g.clickable {
  cursor: pointer; }

#mermaid-1585418391945 g.classGroup rect {
  fill: #ECECFF;
  stroke: #9370DB; }

#mermaid-1585418391945 g.classGroup line {
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585418391945 .classLabel .box {
  stroke: none;
  stroke-width: 0;
  fill: #ECECFF;
  opacity: 0.5; }

#mermaid-1585418391945 .classLabel .label {
  fill: #9370DB;
  font-size: 10px; }

#mermaid-1585418391945 .relation {
  stroke: #9370DB;
  stroke-width: 1;
  fill: none; }

#mermaid-1585418391945 .dashed-line {
  stroke-dasharray: 3; }

#mermaid-1585418391945 #compositionStart {
  fill: #9370DB;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585418391945 #compositionEnd {
  fill: #9370DB;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585418391945 #aggregationStart {
  fill: #ECECFF;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585418391945 #aggregationEnd {
  fill: #ECECFF;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585418391945 #dependencyStart {
  fill: #9370DB;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585418391945 #dependencyEnd {
  fill: #9370DB;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585418391945 #extensionStart {
  fill: #9370DB;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585418391945 #extensionEnd {
  fill: #9370DB;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585418391945 .commit-id,
#mermaid-1585418391945 .commit-msg,
#mermaid-1585418391945 .branch-label {
  fill: lightgrey;
  color: lightgrey;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585418391945 .pieTitleText {
  text-anchor: middle;
  font-size: 25px;
  fill: black;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585418391945 .slice {
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585418391945 g.stateGroup text {
  fill: #9370DB;
  stroke: none;
  font-size: 10px;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585418391945 g.stateGroup text {
  fill: #9370DB;
  stroke: none;
  font-size: 10px; }

#mermaid-1585418391945 g.stateGroup .state-title {
  font-weight: bolder;
  fill: black; }

#mermaid-1585418391945 g.stateGroup rect {
  fill: #ECECFF;
  stroke: #9370DB; }

#mermaid-1585418391945 g.stateGroup line {
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585418391945 .transition {
  stroke: #9370DB;
  stroke-width: 1;
  fill: none; }

#mermaid-1585418391945 .stateGroup .composit {
  fill: white;
  border-bottom: 1px; }

#mermaid-1585418391945 .stateGroup .alt-composit {
  fill: #e0e0e0;
  border-bottom: 1px; }

#mermaid-1585418391945 .state-note {
  stroke: #aaaa33;
  fill: #fff5ad; }
#mermaid-1585418391945   .state-note text {
    fill: black;
    stroke: none;
    font-size: 10px; }

#mermaid-1585418391945 .stateLabel .box {
  stroke: none;
  stroke-width: 0;
  fill: #ECECFF;
  opacity: 0.5; }

#mermaid-1585418391945 .stateLabel text {
  fill: black;
  font-size: 10px;
  font-weight: bold;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

:root {
  --mermaid-font-family: '"trebuchet ms", verdana, arial';
  --mermaid-font-family: "Comic Sans MS", "Comic Sans", cursive; }

:root { --mermaid-font-family: "trebuchet ms", verdana, arial;}
#mermaid-1585418391945 .rabbit &gt; * { fill:#eb7d2f !important; color:white !important; }
#mermaid-1585418391945 .rabbit tspan { fill:white !important; }
#mermaid-1585418391945 .postgres &gt; * { fill:#336791 !important; color:white !important; stroke:white !important; }
#mermaid-1585418391945 .postgres tspan { fill:white !important; }
#mermaid-1585418391945 .nodejs &gt; * { fill:#70BA51 !important; color:white !important; }
#mermaid-1585418391945 .nodejs tspan { fill:white !important; }
#mermaid-1585418391945 .stream &gt; * { fill:#aae2b5 !important; color:black !important; font-weight:bold !important; }
#mermaid-1585418391945 .stream tspan { fill:black !important; }
#mermaid-1585418391945 .pipeline &gt; * { fill:white !important; }
#mermaid-1585418391945 .pipeline tspan { }</style><style>#mermaid-1585418391945 {
    color: rgb(0, 0, 0);
    font: normal normal 400 normal 16px / normal "trebuchet ms", verdana, arial;
  }</style><g transform="translate(0, 0)"><g class="output"><g class="clusters"><g class="cluster" id="subGraph1" transform="translate(1199.8624992370605,127.033203125)" style="opacity: 1;"><rect width="1532.381248474121" height="238.06640625" x="-766.1906242370605" y="-119.033203125"/><g class="label" transform="translate(0, -105.033203125)" id="mermaid-1585418391945Text"><g transform="translate(-76.703125,-11.875)"><foreignobject width="153.41796875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">Real Time Engine</div></foreignobject></g></g></g><g class="cluster pipeline" id="node-pipeline-1" transform="translate(1473.373435974121,122.033203125)" style="opacity: 1;"><rect width="935.359375" height="174.375" x="-467.6796875" y="-87.1875"/><g class="label" transform="translate(0, -73.1875)" id="mermaid-1585418391945Text"><g transform="translate(-70.609375,-11.875)"><foreignobject width="141.23046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">node-pipeline-1</div></foreignobject></g></g></g></g><g class="edgePaths"><g class="edgePath" style="opacity: 1;"><path class="path" d="M126.953125,127.033203125L202.421875,127.033203125L277.890625,127.033203125" marker-end="url(#arrowhead229941)" style="fill:none"/><defs><marker id="arrowhead229941" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M383.671875,127.033203125L408.671875,127.033203125L433.671875,127.033203125L459.171875,127.533203125" marker-end="url(#arrowhead229942)" style="fill:none"/><defs><marker id="arrowhead229942" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 
0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M624.0546875,127.533203125L678.71875,127.033203125L734.3828132629396,127.53320312500001" marker-end="url(#arrowhead229943)" style="fill:none"/><defs><marker id="arrowhead229943" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M809.2376057201302,117.15518460806966L912.4046859741211,83.283203125L1005.6937484741211,83.283203125L1067.506248474121,105.658203125" marker-end="url(#arrowhead229944)" style="fill:none"/><defs><marker id="arrowhead229944" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M819.6156242370605,127.53320312500001L912.4046859741211,127.033203125L1005.6937484741211,127.033203125L1053.068748474121,127.533203125" marker-end="url(#arrowhead229945)" style="fill:none"/><defs><marker id="arrowhead229945" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M809.2376068744243,137.91122201351487L912.4046859741211,170.783203125L1005.6937484741211,170.783203125L1067.506248474121,149.408203125" marker-end="url(#arrowhead229946)" style="fill:none"/><defs><marker id="arrowhead229946" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" 
markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1226.443748474121,127.533203125L1250.943748474121,127.033203125L1275.943748474121,127.033203125" marker-end="url(#arrowhead229947)" style="fill:none"/><defs><marker id="arrowhead229947" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1423.537498474121,127.033203125L1448.537498474121,127.033203125L1474.037498474121,127.533203125" marker-end="url(#arrowhead229948)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead229948" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1588.209373474121,127.533203125L1612.709373474121,127.033203125L1638.209373474121,127.533203125" marker-end="url(#arrowhead229949)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead229949" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1752.381248474121,127.533203125L1776.881248474121,127.033203125L1802.381248474121,127.533203125" marker-end="url(#arrowhead229950)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead229950" viewbox="0 0 10 10" refx="9" 
refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g></g><g class="edgeLabels"><g class="edgeLabel" transform="translate(202.421875,127.033203125)" style="opacity: 1;"><g transform="translate(-50.46875,-11.875)" class="label"><foreignobject width="100.9375" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">SQL Trigger</span></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="translate(678.71875,127.033203125)" style="opacity: 1;"><g transform="translate(-30.1640625,-11.875)" class="label"><foreignobject width="60.33203125" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">cdc.*.*</span></div></foreignobject></g></g><g class="edgeLabel" transform="translate(912.4046859741211,83.283203125)" style="opacity: 1;"><g transform="translate(-55.796875,-11.875)" class="label"><foreignobject width="111.6015625" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">cdc.article.*</span></div></foreignobject></g></g><g class="edgeLabel" transform="translate(912.4046859741211,127.033203125)" style="opacity: 1;"><g transform="translate(-54.5546875,-11.875)" class="label"><foreignobject width="109.12109375" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">cdc.author.*</span></div></foreignobject></g></g><g 
class="edgeLabel" transform="translate(912.4046859741211,170.783203125)" style="opacity: 1;"><g transform="translate(-68.2890625,-11.875)" class="label"><foreignobject width="136.58203125" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">cdc.comment.*</span></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g></g><g class="nodes"><g class="node rabbit" id="rte" transform="translate(776.4992179870605,127.033203125)" style="opacity: 1;"><polygon points="42.616406250000004,0 85.23281250000001,-42.616406250000004 42.616406250000004,-85.23281250000001 0,-42.616406250000004" transform="translate(-42.616406250000004,42.616406250000004)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-15.4765625,-11.875)"><foreignobject width="30.95703125" 
height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">cdc</div></foreignobject></g></g></g><g class="node rabbit" id="messaging" transform="translate(541.11328125,127.033203125)" style="opacity: 1;"><polygon points="82.44140625,0 164.8828125,-82.44140625 82.44140625,-164.8828125 0,-82.44140625" transform="translate(-82.44140625,82.44140625)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-59.7265625,-11.875)"><foreignobject width="119.453125" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">pg-messaging</div></foreignobject></g></g></g><g class="node rabbit" id="noop" transform="translate(1128.318748474121,127.033203125)" style="opacity: 1;"><polygon points="-21.875,0 173.375,0 173.375,-43.75 -21.875,-43.75 0,-21.875" transform="translate(-86.6875,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-76.6875,-11.875)"><foreignobject width="153.37890625" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">rte-pipeline-crud</div></foreignobject></g></g></g><g class="node nodejs" id="pipeline1" transform="translate(1349.740623474121,127.033203125)" style="opacity: 1;"><rect rx="0" ry="0" x="-73.796875" y="-21.875" width="147.59375" height="43.75" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-63.796875,-11.875)"><foreignobject width="127.59765625" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">CRUD-Pipeline</div></foreignobject></g></g></g><g class="node stream" id="stream1" transform="translate(1530.623435974121,127.033203125)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 0,-21.875" transform="translate(-57.0859375,21.875)" 
class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">stream1</div></foreignobject></g></g></g><g class="node stream" id="stream2" transform="translate(1694.795310974121,127.033203125)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 0,-21.875" transform="translate(-57.0859375,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">stream2</div></foreignobject></g></g></g><g class="node stream" id="stream3" transform="translate(1858.967185974121,127.033203125)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 0,-21.875" transform="translate(-57.0859375,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">stream3</div></foreignobject></g></g></g><g class="node postgres" id="postgres" label-offset-y="12.190162044450139" transform="translate(67.4765625,127.033203125)" style="opacity: 1;"><path d="M 0,12.190162044450139 a 59.4765625,12.190162044450139 0,0,0 118.953125 0 a 59.4765625,12.190162044450139 0,0,0 -118.953125 0 l 0,55.94016204445014 a 59.4765625,12.190162044450139 0,0,0 118.953125 0 l 0,-55.94016204445014" transform="translate(-59.4765625,-40.16024306667521)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-49.4765625,-11.875)"><foreignobject width="98.96484375" 
height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">PostgreSQL</div></foreignobject></g></g></g><g class="node rabbit" id="RABBITMQ" transform="translate(330.78125,127.033203125)" style="opacity: 1;"><rect rx="5" ry="5" x="-52.890625" y="-21.875" width="105.78125" height="43.75" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-42.890625,-11.875)"><foreignobject width="85.78125" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">RabbitMQ</div></foreignobject></g></g></g></g></g></g></svg><!--kg-card-end: html--><p>Here is a slightly more involved and concrete setup to illustrate some use cases a bit better:</p><pre><code class="language-js:git">&apos;use strict&apos;

// Pipeline and the stream classes are assumed to be in scope already
new Pipeline({
  name: &apos;crud-Pipeline&apos;
, queues: [{
    // a single queue, bound to the exchange once per routing key pattern
    name: &apos;rte-pipeline-crud&apos;
  , pattern: [
      &apos;cdc.article.*&apos;
    , &apos;cdc.author.*&apos;
    , &apos;cdc.comment.*&apos;
    ]
  }]
  // every message passes through these streams in order
, streams: [
    new AMQPParseStream()
  , new CacheStream([&apos;article&apos;, &apos;comment&apos;])
  , new ParallelStream([
      new EmailStream([&apos;author&apos;])
    , new GraphQLSubscriptionStream([&apos;article&apos;, &apos;comment&apos;])
    ])
  ]
})
})</code></pre><ol><li>Parse the incoming message</li><li>Cache all articles + comments</li><li>Send an update message via GraphQL Subscriptions for article + comment updates</li><li>Send an email notification to the authors.</li></ol><p>This is also very easy to scale out. Running more processes with the same pipelines adds more consumers to the same queues, which gives us higher throughput and more workers to process messages.</p><!--kg-card-begin: html--><!--
https://mermaid-js.github.io/mermaid-live-editor/#/edit/eyJjb2RlIjoiZ3JhcGggTFJcblx0cG9zdGdyZXNbKFBvc3RncmVTUUwpXSAtLT58U1FMIFRyaWdnZXJ8IFJBQkJJVE1RKFJhYmJpdE1RKVxuICAgIFJBQkJJVE1RLS0-IG1lc3NhZ2luZ3twZy1tZXNzYWdpbmd9XG5cbiAgICBzdWJncmFwaCBSZWFsIFRpbWUgRW5naW5lXG4gICAgICBtZXNzYWdpbmctLT4gfGNkYy4qLip8IHJ0ZXtjZGN9XG4gICAgICBtZXNzYWdpbmctLT4gfGNkYy4jfCBjYXRjaGFsbHtjYXRjaGFsbH1cblxuICAgICAgcnRlIC0tPiB8Y2RjLmFydGljbGUuKnwgbm9vcD5ydGUtcGlwZWxpbmUtY3J1ZF1cbiAgICAgIHJ0ZSAtLT4gfGNkYy5hdXRob3IuKnwgbm9vcD5ydGUtcGlwZWxpbmUtY3J1ZF1cbiAgICAgIHJ0ZSAtLT4gfGNkYy5jb21tZW50Lip8IG5vb3A-cnRlLXBpcGVsaW5lLWNydWRdXG5cbiAgICAgIHN1YmdyYXBoIG5vZGUtcGlwZWxpbmUtMVxuICAgICAgICBub29wIC0tPiBwaXBlbGluZTFbQ1JVRC1QaXBlbGluZV1cbiAgICAgICAgcGlwZWxpbmUxIC0uLT4gc3RyZWFtMXt7c3RyZWFtMX19XG4gICAgICAgIHN0cmVhbTEtLi0-IHN0cmVhbTJ7e3N0cmVhbTJ9fVxuICAgICAgICBzdHJlYW0yLS4tPiBzdHJlYW0ze3tzdHJlYW0zfX1cbiAgICAgIGVuZFxuXG4gICAgICBydGUgLS0-IHxjZGMubG9naW4uKnwgcmVvcD5ydGUtcGlwZWxpbmUtcmVwb3J0aW5nXVxuICAgICAgcnRlIC0tPiB8Y2RjLmFidXNlLip8IHJlb3A-cnRlLXBpcGVsaW5lLXJlcG9ydGluZ11cbiAgICAgIHJ0ZSAtLT4gfGNkYy5hcnRpY2xlLip8IHJlb3A-cnRlLXBpcGVsaW5lLXJlcG9ydGluZ11cblxuICAgICAgc3ViZ3JhcGggbm9kZS1waXBlbGluZS0yXG4gICAgICAgIHJlb3AgLS0-IHBpcGVsaW5lMltSRVBPUlQtUGlwZWxpbmVdXG4gICAgICAgIHBpcGVsaW5lMiAtLi0-IHN0cmVhbTR7e3N0cmVhbTR9fVxuICAgICAgICBzdHJlYW00LS4tPiBzdHJlYW01e3tzdHJlYW01fX1cbiAgICAgICAgc3RyZWFtNS0uLT4gc3RyZWFtNnt7c3RyZWFtNn19XG4gICAgICBlbmRcblxuICAgICAgY2F0Y2hhbGwgLS0-IHxjZGMuI3wgZGVidWc-cnRlLXBpcGVsaW5lLWRlYnVnXVxuXG4gICAgICBzdWJncmFwaCBub2RlLXBpcGVsaW5lLTNcbiAgICAgICAgZGVidWcgLS0-IHBpcGVsaW5lM1tERUJVRy1QaXBlbGluZV1cbiAgICAgICAgcGlwZWxpbmUzIC0uLT4gc3RyZWFtN3t7c3RyZWFtN319XG4gICAgICBlbmRcbiAgICBlbmRcbmNsYXNzRGVmIHJhYmJpdCBmaWxsOiNlYjdkMmYsY29sb3I6d2hpdGU7XG5jbGFzc0RlZiBwb3N0Z3JlcyBmaWxsOiMzMzY3OTEsY29sb3I6d2hpdGUsc3Ryb2tlOndoaXRlO1xuY2xhc3NEZWYgbm9kZWpzIGZpbGw6IzcwQkE1MSxjb2xvcjp3aGl0ZTtcbmNsYXNzRGVmIHN0cmVhbSBmaWxsOiNhYWUyYjUsY29sb3I6YmxhY2ssZm9udC13ZWlnaHQ6Ym9sZDtcbmNsYXNzRGVmIHBpcGVsaW5lIGZpbGw6d2hpdGU7XG5cbmNsYXNzIHBvc3RncmVz
IHBvc3RncmVzO1xuY2xhc3MgbWVzc2FnaW5nIHJhYmJpdDtcbmNsYXNzIHJ0ZSByYWJiaXQ7XG5jbGFzcyBjYXRjaGFsbCByYWJiaXQ7XG5jbGFzcyBub29wIHJhYmJpdDtcbmNsYXNzIHJlb3AgcmFiYml0O1xuY2xhc3MgUkFCQklUTVEgcmFiYml0O1xuY2xhc3MgcGlwZWxpbmUxIG5vZGVqcztcbmNsYXNzIHBpcGVsaW5lMiBub2RlanM7XG5jbGFzcyBwaXBlbGluZTMgbm9kZWpzO1xuY2xhc3MgZGVidWcgcmFiYml0O1xuY2xhc3Mgc3RyZWFtMSBzdHJlYW07XG5jbGFzcyBzdHJlYW0yIHN0cmVhbTtcbmNsYXNzIHN0cmVhbTMgc3RyZWFtO1xuY2xhc3Mgc3RyZWFtNCBzdHJlYW07XG5jbGFzcyBzdHJlYW01IHN0cmVhbTtcbmNsYXNzIHN0cmVhbTYgc3RyZWFtO1xuY2xhc3Mgc3RyZWFtNyBzdHJlYW07XG5cbmNsYXNzIG5vZGUtcGlwZWxpbmUtMSBwaXBlbGluZTtcbmNsYXNzIG5vZGUtcGlwZWxpbmUtMiBwaXBlbGluZTtcbmNsYXNzIG5vZGUtcGlwZWxpbmUtMyBwaXBlbGluZTsiLCJtZXJtYWlkIjp7InRoZW1lIjoiZGVmYXVsdCJ9fQ
--><!--kg-card-end: html--><!--kg-card-begin: html--><a href="https://mermaid-js.github.io/mermaid-live-editor/#/view/eyJjb2RlIjoiZ3JhcGggTFJcblx0cG9zdGdyZXNbKFBvc3RncmVTUUwpXSAtLT58U1FMIFRyaWdnZXJ8IFJBQkJJVE1RKFJhYmJpdE1RKVxuICAgIFJBQkJJVE1RLS0-IG1lc3NhZ2luZ3twZy1tZXNzYWdpbmd9XG5cbiAgICBzdWJncmFwaCBSZWFsIFRpbWUgRW5naW5lXG4gICAgICBtZXNzYWdpbmctLT4gfGNkYy4qLip8IHJ0ZXtjZGN9XG4gICAgICBtZXNzYWdpbmctLT4gfGNkYy4jfCBjYXRjaGFsbHtjYXRjaGFsbH1cblxuICAgICAgcnRlIC0tPiB8Y2RjLmFydGljbGUuKnwgbm9vcD5ydGUtcGlwZWxpbmUtY3J1ZF1cbiAgICAgIHJ0ZSAtLT4gfGNkYy5hdXRob3IuKnwgbm9vcD5ydGUtcGlwZWxpbmUtY3J1ZF1cbiAgICAgIHJ0ZSAtLT4gfGNkYy5jb21tZW50Lip8IG5vb3A-cnRlLXBpcGVsaW5lLWNydWRdXG5cbiAgICAgIHN1YmdyYXBoIG5vZGUtcGlwZWxpbmUtMVxuICAgICAgICBub29wIC0tPiBwaXBlbGluZTFbQ1JVRC1QaXBlbGluZV1cbiAgICAgICAgcGlwZWxpbmUxIC0uLT4gc3RyZWFtMXt7c3RyZWFtMX19XG4gICAgICAgIHN0cmVhbTEtLi0-IHN0cmVhbTJ7e3N0cmVhbTJ9fVxuICAgICAgICBzdHJlYW0yLS4tPiBzdHJlYW0ze3tzdHJlYW0zfX1cblxuICAgICAgICBub29wIC0tPiBwaXBlbGluZTFhW0NSVUQtUGlwZWxpbmVdXG4gICAgICAgIHBpcGVsaW5lMWEgLS4tPiBzdHJlYW0xYXt7c3RyZWFtMX19XG4gICAgICAgIHN0cmVhbTFhLS4tPiBzdHJlYW0yYXt7c3RyZWFtMn19XG4gICAgICAgIHN0cmVhbTJhLS4tPiBzdHJlYW0zYXt7c3RyZWFtM319XG4gICAgICBlbmRcblxuICAgICAgcnRlIC0tPiB8Y2RjLmxvZ2luLip8IHJlb3A-cnRlLXBpcGVsaW5lLXJlcG9ydGluZ11cbiAgICAgIHJ0ZSAtLT4gfGNkYy5hYnVzZS4qfCByZW9wPnJ0ZS1waXBlbGluZS1yZXBvcnRpbmddXG4gICAgICBydGUgLS0-IHxjZGMuYXJ0aWNsZS4qfCByZW9wPnJ0ZS1waXBlbGluZS1yZXBvcnRpbmddXG5cbiAgICAgIHN1YmdyYXBoIG5vZGUtcGlwZWxpbmUtMlxuICAgICAgICByZW9wIC0tPiBwaXBlbGluZTJbUkVQT1JULVBpcGVsaW5lXVxuICAgICAgICBwaXBlbGluZTIgLS4tPiBzdHJlYW00e3tzdHJlYW00fX1cbiAgICAgICAgc3RyZWFtNC0uLT4gc3RyZWFtNXt7c3RyZWFtNX19XG4gICAgICAgIHN0cmVhbTUtLi0-IHN0cmVhbTZ7e3N0cmVhbTZ9fVxuICAgICAgZW5kXG5cbiAgICAgIGNhdGNoYWxsIC0tPiB8Y2RjLiN8IGRlYnVnPnJ0ZS1waXBlbGluZS1kZWJ1Z11cblxuICAgICAgc3ViZ3JhcGggbm9kZS1waXBlbGluZS0zXG4gICAgICAgIGRlYnVnIC0tPiBwaXBlbGluZTNbREVCVUctUGlwZWxpbmVdXG4gICAgICAgIHBpcGVsaW5lMyAtLi0-IHN0cmVhbTd7e3N0cmVhbTd9fVxuICAgICAgZW5kXG4gICAgZW5kXG5jbGFzc0RlZiByYWJiaXQgZmlsbDojZWI3ZDJmLG
NvbG9yOndoaXRlO1xuY2xhc3NEZWYgcG9zdGdyZXMgZmlsbDojMzM2NzkxLGNvbG9yOndoaXRlLHN0cm9rZTp3aGl0ZTtcbmNsYXNzRGVmIG5vZGVqcyBmaWxsOiM3MEJBNTEsY29sb3I6d2hpdGU7XG5jbGFzc0RlZiBzdHJlYW0gZmlsbDojYWFlMmI1LGNvbG9yOmJsYWNrLGZvbnQtd2VpZ2h0OmJvbGQ7XG5jbGFzc0RlZiBwaXBlbGluZSBmaWxsOndoaXRlO1xuXG5jbGFzcyBwb3N0Z3JlcyBwb3N0Z3JlcztcbmNsYXNzIG1lc3NhZ2luZyByYWJiaXQ7XG5jbGFzcyBydGUgcmFiYml0O1xuY2xhc3MgY2F0Y2hhbGwgcmFiYml0O1xuY2xhc3Mgbm9vcCByYWJiaXQ7XG5jbGFzcyByZW9wIHJhYmJpdDtcbmNsYXNzIFJBQkJJVE1RIHJhYmJpdDtcbmNsYXNzIHBpcGVsaW5lMSBub2RlanM7XG5jbGFzcyBwaXBlbGluZTFhIG5vZGVqcztcbmNsYXNzIHBpcGVsaW5lMiBub2RlanM7XG5jbGFzcyBwaXBlbGluZTMgbm9kZWpzO1xuY2xhc3MgZGVidWcgcmFiYml0O1xuY2xhc3Mgc3RyZWFtMSBzdHJlYW07XG5jbGFzcyBzdHJlYW0yIHN0cmVhbTtcbmNsYXNzIHN0cmVhbTMgc3RyZWFtO1xuY2xhc3Mgc3RyZWFtMWEgc3RyZWFtO1xuY2xhc3Mgc3RyZWFtMmEgc3RyZWFtO1xuY2xhc3Mgc3RyZWFtM2Egc3RyZWFtO1xuY2xhc3Mgc3RyZWFtNCBzdHJlYW07XG5jbGFzcyBzdHJlYW01IHN0cmVhbTtcbmNsYXNzIHN0cmVhbTYgc3RyZWFtO1xuY2xhc3Mgc3RyZWFtNyBzdHJlYW07XG5cbmNsYXNzIG5vZGUtcGlwZWxpbmUtMSBwaXBlbGluZTtcbmNsYXNzIG5vZGUtcGlwZWxpbmUtMiBwaXBlbGluZTtcbmNsYXNzIG5vZGUtcGlwZWxpbmUtMyBwaXBlbGluZTsiLCJtZXJtYWlkIjp7InRoZW1lIjoiZGVmYXVsdCJ9LCJ1cGRhdGVFZGl0b3IiOmZhbHNlfQ" target="_blank">
<svg id="mermaid-1585420574418" width="100%" xmlns="http://www.w3.org/2000/svg" height="300" viewbox="0 0 2097.34375 621.3984375"><style>



#mermaid-1585420574418 .label {
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family);
  color: #333; }

#mermaid-1585420574418 .label text {
  fill: #333; }

#mermaid-1585420574418 .node rect,
#mermaid-1585420574418 .node circle,
#mermaid-1585420574418 .node ellipse,
#mermaid-1585420574418 .node polygon,
#mermaid-1585420574418 .node path {
  fill: #ECECFF;
  stroke: #9370DB;
  stroke-width: 1px; }

#mermaid-1585420574418 .node .label {
  text-align: center; }

#mermaid-1585420574418 .node.clickable {
  cursor: pointer; }

#mermaid-1585420574418 .arrowheadPath {
  fill: #333333; }

#mermaid-1585420574418 .edgePath .path {
  stroke: #333333;
  stroke-width: 1.5px; }

#mermaid-1585420574418 .edgeLabel {
  background-color: #e8e8e8;
  text-align: center; }

#mermaid-1585420574418 .cluster rect {
  fill: #ffffde;
  stroke: #aaaa33;
  stroke-width: 1px; }

#mermaid-1585420574418 .cluster text {
  fill: #333; }

#mermaid-1585420574418 div.mermaidTooltip {
  position: absolute;
  text-align: center;
  max-width: 200px;
  padding: 2px;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family);
  font-size: 12px;
  background: #ffffde;
  border: 1px solid #aaaa33;
  border-radius: 2px;
  pointer-events: none;
  z-index: 100; }

#mermaid-1585420574418 .actor {
  stroke: #CCCCFF;
  fill: #ECECFF; }

#mermaid-1585420574418 text.actor {
  fill: black;
  stroke: none; }

#mermaid-1585420574418 .actor-line {
  stroke: grey; }

#mermaid-1585420574418 .messageLine0 {
  stroke-width: 1.5;
  stroke-dasharray: '2 2';
  stroke: #333; }

#mermaid-1585420574418 .messageLine1 {
  stroke-width: 1.5;
  stroke-dasharray: '2 2';
  stroke: #333; }

#mermaid-1585420574418 #arrowhead {
  fill: #333; }

#mermaid-1585420574418 .sequenceNumber {
  fill: white; }

#mermaid-1585420574418 #sequencenumber {
  fill: #333; }

#mermaid-1585420574418 #crosshead path {
  fill: #333 !important;
  stroke: #333 !important; }

#mermaid-1585420574418 .messageText {
  fill: #333;
  stroke: none; }

#mermaid-1585420574418 .labelBox {
  stroke: #CCCCFF;
  fill: #ECECFF; }

#mermaid-1585420574418 .labelText {
  fill: black;
  stroke: none; }

#mermaid-1585420574418 .loopText {
  fill: black;
  stroke: none; }

#mermaid-1585420574418 .loopLine {
  stroke-width: 2;
  stroke-dasharray: '2 2';
  stroke: #CCCCFF; }

#mermaid-1585420574418 .note {
  stroke: #aaaa33;
  fill: #fff5ad; }

#mermaid-1585420574418 .noteText {
  fill: black;
  stroke: none;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family);
  font-size: 14px; }

#mermaid-1585420574418 .activation0 {
  fill: #f4f4f4;
  stroke: #666; }

#mermaid-1585420574418 .activation1 {
  fill: #f4f4f4;
  stroke: #666; }

#mermaid-1585420574418 .activation2 {
  fill: #f4f4f4;
  stroke: #666; }


#mermaid-1585420574418 .mermaid-main-font {
  font-family: "trebuchet ms", verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585420574418 .section {
  stroke: none;
  opacity: 0.2; }

#mermaid-1585420574418 .section0 {
  fill: rgba(102, 102, 255, 0.49); }

#mermaid-1585420574418 .section2 {
  fill: #fff400; }

#mermaid-1585420574418 .section1,
#mermaid-1585420574418 .section3 {
  fill: white;
  opacity: 0.2; }

#mermaid-1585420574418 .sectionTitle0 {
  fill: #333; }

#mermaid-1585420574418 .sectionTitle1 {
  fill: #333; }

#mermaid-1585420574418 .sectionTitle2 {
  fill: #333; }

#mermaid-1585420574418 .sectionTitle3 {
  fill: #333; }

#mermaid-1585420574418 .sectionTitle {
  text-anchor: start;
  font-size: 11px;
  text-height: 14px;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }


#mermaid-1585420574418 .grid .tick {
  stroke: lightgrey;
  opacity: 0.8;
  shape-rendering: crispEdges; }
#mermaid-1585420574418   .grid .tick text {
    font-family: 'trebuchet ms', verdana, arial;
    font-family: var(--mermaid-font-family); }

#mermaid-1585420574418 .grid path {
  stroke-width: 0; }


#mermaid-1585420574418 .today {
  fill: none;
  stroke: red;
  stroke-width: 2px; }



#mermaid-1585420574418 .task {
  stroke-width: 2; }

#mermaid-1585420574418 .taskText {
  text-anchor: middle;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585420574418 .taskText:not([font-size]) {
  font-size: 11px; }

#mermaid-1585420574418 .taskTextOutsideRight {
  fill: black;
  text-anchor: start;
  font-size: 11px;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585420574418 .taskTextOutsideLeft {
  fill: black;
  text-anchor: end;
  font-size: 11px; }


#mermaid-1585420574418 .task.clickable {
  cursor: pointer; }

#mermaid-1585420574418 .taskText.clickable {
  cursor: pointer;
  fill: #003163 !important;
  font-weight: bold; }

#mermaid-1585420574418 .taskTextOutsideLeft.clickable {
  cursor: pointer;
  fill: #003163 !important;
  font-weight: bold; }

#mermaid-1585420574418 .taskTextOutsideRight.clickable {
  cursor: pointer;
  fill: #003163 !important;
  font-weight: bold; }


#mermaid-1585420574418 .taskText0,
#mermaid-1585420574418 .taskText1,
#mermaid-1585420574418 .taskText2,
#mermaid-1585420574418 .taskText3 {
  fill: white; }

#mermaid-1585420574418 .task0,
#mermaid-1585420574418 .task1,
#mermaid-1585420574418 .task2,
#mermaid-1585420574418 .task3 {
  fill: #8a90dd;
  stroke: #534fbc; }

#mermaid-1585420574418 .taskTextOutside0,
#mermaid-1585420574418 .taskTextOutside2 {
  fill: black; }

#mermaid-1585420574418 .taskTextOutside1,
#mermaid-1585420574418 .taskTextOutside3 {
  fill: black; }


#mermaid-1585420574418 .active0,
#mermaid-1585420574418 .active1,
#mermaid-1585420574418 .active2,
#mermaid-1585420574418 .active3 {
  fill: #bfc7ff;
  stroke: #534fbc; }

#mermaid-1585420574418 .activeText0,
#mermaid-1585420574418 .activeText1,
#mermaid-1585420574418 .activeText2,
#mermaid-1585420574418 .activeText3 {
  fill: black !important; }


#mermaid-1585420574418 .done0,
#mermaid-1585420574418 .done1,
#mermaid-1585420574418 .done2,
#mermaid-1585420574418 .done3 {
  stroke: grey;
  fill: lightgrey;
  stroke-width: 2; }

#mermaid-1585420574418 .doneText0,
#mermaid-1585420574418 .doneText1,
#mermaid-1585420574418 .doneText2,
#mermaid-1585420574418 .doneText3 {
  fill: black !important; }


#mermaid-1585420574418 .crit0,
#mermaid-1585420574418 .crit1,
#mermaid-1585420574418 .crit2,
#mermaid-1585420574418 .crit3 {
  stroke: #ff8888;
  fill: red;
  stroke-width: 2; }

#mermaid-1585420574418 .activeCrit0,
#mermaid-1585420574418 .activeCrit1,
#mermaid-1585420574418 .activeCrit2,
#mermaid-1585420574418 .activeCrit3 {
  stroke: #ff8888;
  fill: #bfc7ff;
  stroke-width: 2; }

#mermaid-1585420574418 .doneCrit0,
#mermaid-1585420574418 .doneCrit1,
#mermaid-1585420574418 .doneCrit2,
#mermaid-1585420574418 .doneCrit3 {
  stroke: #ff8888;
  fill: lightgrey;
  stroke-width: 2;
  cursor: pointer;
  shape-rendering: crispEdges; }

#mermaid-1585420574418 .milestone {
  transform: rotate(45deg) scale(0.8, 0.8); }

#mermaid-1585420574418 .milestoneText {
  font-style: italic; }

#mermaid-1585420574418 .doneCritText0,
#mermaid-1585420574418 .doneCritText1,
#mermaid-1585420574418 .doneCritText2,
#mermaid-1585420574418 .doneCritText3 {
  fill: black !important; }

#mermaid-1585420574418 .activeCritText0,
#mermaid-1585420574418 .activeCritText1,
#mermaid-1585420574418 .activeCritText2,
#mermaid-1585420574418 .activeCritText3 {
  fill: black !important; }

#mermaid-1585420574418 .titleText {
  text-anchor: middle;
  font-size: 18px;
  fill: black;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585420574418 g.classGroup text {
  fill: #9370DB;
  stroke: none;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family);
  font-size: 10px; }
#mermaid-1585420574418   g.classGroup text .title {
    font-weight: bolder; }

#mermaid-1585420574418 g.clickable {
  cursor: pointer; }

#mermaid-1585420574418 g.classGroup rect {
  fill: #ECECFF;
  stroke: #9370DB; }

#mermaid-1585420574418 g.classGroup line {
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585420574418 .classLabel .box {
  stroke: none;
  stroke-width: 0;
  fill: #ECECFF;
  opacity: 0.5; }

#mermaid-1585420574418 .classLabel .label {
  fill: #9370DB;
  font-size: 10px; }

#mermaid-1585420574418 .relation {
  stroke: #9370DB;
  stroke-width: 1;
  fill: none; }

#mermaid-1585420574418 .dashed-line {
  stroke-dasharray: 3; }

#mermaid-1585420574418 #compositionStart {
  fill: #9370DB;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585420574418 #compositionEnd {
  fill: #9370DB;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585420574418 #aggregationStart {
  fill: #ECECFF;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585420574418 #aggregationEnd {
  fill: #ECECFF;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585420574418 #dependencyStart {
  fill: #9370DB;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585420574418 #dependencyEnd {
  fill: #9370DB;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585420574418 #extensionStart {
  fill: #9370DB;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585420574418 #extensionEnd {
  fill: #9370DB;
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585420574418 .commit-id,
#mermaid-1585420574418 .commit-msg,
#mermaid-1585420574418 .branch-label {
  fill: lightgrey;
  color: lightgrey;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585420574418 .pieTitleText {
  text-anchor: middle;
  font-size: 25px;
  fill: black;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585420574418 .slice {
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585420574418 g.stateGroup text {
  fill: #9370DB;
  stroke: none;
  font-size: 10px;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

#mermaid-1585420574418 g.stateGroup .state-title {
  font-weight: bolder;
  fill: black; }

#mermaid-1585420574418 g.stateGroup rect {
  fill: #ECECFF;
  stroke: #9370DB; }

#mermaid-1585420574418 g.stateGroup line {
  stroke: #9370DB;
  stroke-width: 1; }

#mermaid-1585420574418 .transition {
  stroke: #9370DB;
  stroke-width: 1;
  fill: none; }

#mermaid-1585420574418 .stateGroup .composit {
  fill: white;
  border-bottom: 1px; }

#mermaid-1585420574418 .stateGroup .alt-composit {
  fill: #e0e0e0;
  border-bottom: 1px; }

#mermaid-1585420574418 .state-note {
  stroke: #aaaa33;
  fill: #fff5ad; }
#mermaid-1585420574418   .state-note text {
    fill: black;
    stroke: none;
    font-size: 10px; }

#mermaid-1585420574418 .stateLabel .box {
  stroke: none;
  stroke-width: 0;
  fill: #ECECFF;
  opacity: 0.5; }

#mermaid-1585420574418 .stateLabel text {
  fill: black;
  font-size: 10px;
  font-weight: bold;
  font-family: 'trebuchet ms', verdana, arial;
  font-family: var(--mermaid-font-family); }

:root {
  --mermaid-font-family: '"trebuchet ms", verdana, arial';
  --mermaid-font-family: "Comic Sans MS", "Comic Sans", cursive; }

:root { --mermaid-font-family: "trebuchet ms", verdana, arial;}
#mermaid-1585420574418 .rabbit &gt; * { fill:#eb7d2f !important; color:white !important; }
#mermaid-1585420574418 .rabbit tspan { fill:white !important; }
#mermaid-1585420574418 .postgres &gt; * { fill:#336791 !important; color:white !important; stroke:white !important; }
#mermaid-1585420574418 .postgres tspan { fill:white !important; }
#mermaid-1585420574418 .nodejs &gt; * { fill:#70BA51 !important; color:white !important; }
#mermaid-1585420574418 .nodejs tspan { fill:white !important; }
#mermaid-1585420574418 .stream &gt; * { fill:#aae2b5 !important; color:black !important; font-weight:bold !important; }
#mermaid-1585420574418 .stream tspan { fill:black !important; }
#mermaid-1585420574418 .pipeline &gt; * { fill:white !important; }
</style><style>#mermaid-1585420574418 {
    color: rgb(0, 0, 0);
    font: normal normal 400 normal 16px / normal "trebuchet ms", verdana, arial;
  }</style><g transform="translate(0, 0)"><g class="output"><g class="clusters"><g class="cluster" id="subGraph3" transform="translate(1261.5078125,310.69921875)" style="opacity: 1;"><rect width="1655.671875" height="605.3984375" x="-827.8359375" y="-302.69921875"/><g class="label" transform="translate(0, -288.69921875)" id="mermaid-1585420574418Text"><g transform="translate(-76.703125,-11.875)"><foreignobject width="153.41796875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">Real Time Engine</div></foreignobject></g></g></g><g class="cluster pipeline" id="node-pipeline-1" transform="translate(1553.2578125,131.75)" style="opacity: 1;"><rect width="1022.171875" height="207.5" x="-511.0859375" y="-103.75"/><g class="label" transform="translate(0, -89.75)" id="mermaid-1585420574418Text"><g transform="translate(-70.609375,-11.875)"><foreignobject width="141.23046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">node-pipeline-1</div></foreignobject></g></g></g><g class="cluster pipeline" id="node-pipeline-2" transform="translate(1553.2578125,342.6875)" style="opacity: 1;"><rect width="1022.171875" height="174.375" x="-511.0859375" y="-87.1875"/><g class="label" transform="translate(0, -73.1875)" id="mermaid-1585420574418Text"><g transform="translate(-70.609375,-11.875)"><foreignobject width="141.23046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">node-pipeline-2</div></foreignobject></g></g></g><g class="cluster pipeline" id="node-pipeline-3" transform="translate(1376.5859375,506.75)" style="opacity: 1;"><rect width="668.828125" height="113.75" x="-334.4140625" y="-56.875"/><g class="label" transform="translate(0, -42.875)" id="mermaid-1585420574418Text"><g transform="translate(-70.609375,-11.875)"><foreignobject width="141.23046875" height="23.75"><div 
xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">node-pipeline-3</div></foreignobject></g></g></g></g><g class="edgePaths"><g class="edgePath" style="opacity: 1;"><path class="path" d="M126.953125,366.125L202.421875,366.125L277.890625,366.125" marker-end="url(#arrowhead13746)" style="fill:none"/><defs><marker id="arrowhead13746" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M383.671875,366.125L408.671875,366.125L433.671875,366.125L459.171875,366.625" marker-end="url(#arrowhead13747)" style="fill:none"/><defs><marker id="arrowhead13747" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M587.3143122659213,329.8846247659213L678.71875,255.5L752.6218765258791,256" marker-end="url(#arrowhead13748)" style="fill:none"/><defs><marker id="arrowhead13748" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M582.3866314959566,408.2930560040434L678.71875,506.75L734.3828125,507.25" marker-end="url(#arrowhead13749)" style="fill:none"/><defs><marker id="arrowhead13749" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g 
class="edgePath" style="opacity: 1;"><path class="path" d="M814.9530091785265,233.0983216785268L948.8828125,76.4375L1042.171875,76.4375L1140.5621946180868,123.15859413146973" marker-end="url(#arrowhead13750)" style="fill:none"/><defs><marker id="arrowhead13750" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M821.6164696870317,239.7617821870319L948.8828125,160.60976600646973L1042.171875,160.60976600646973L1101.637382304638,154.53683682683163" marker-end="url(#arrowhead13751)" style="fill:none"/><defs><marker id="arrowhead13751" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M827.4388198801652,245.58413238016536L948.8828125,205.63906288146973L1042.171875,205.63906288146973L1135.2621010595794,166.90859413146973" marker-end="url(#arrowhead13752)" style="fill:none"/><defs><marker id="arrowhead13752" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1239.7712736672784,123.15859413146973L1330.609375,84.875L1364.921875,84.875" marker-end="url(#arrowhead13753)" style="fill:none"/><defs><marker id="arrowhead13753" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 
0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1512.515625,84.875L1546.828125,84.875L1572.328125,85.375" marker-end="url(#arrowhead13754)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead13754" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1686.5,85.375L1711,84.875L1736,84.875L1761.5,85.375" marker-end="url(#arrowhead13755)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead13755" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1875.671875,85.375L1900.171875,84.875L1925.671875,85.375" marker-end="url(#arrowhead13756)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead13756" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1279.4296410915065,166.90859413146973L1330.609375,178.625L1364.921875,178.625" marker-end="url(#arrowhead13757)" style="fill:none"/><defs><marker id="arrowhead13757" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" 
d="M1512.515625,178.625L1546.828125,178.625L1572.328125,179.125" marker-end="url(#arrowhead13758)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead13758" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1686.5,179.125L1711,178.625L1736,178.625L1761.5,179.125" marker-end="url(#arrowhead13759)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead13759" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1875.671875,179.125L1900.171875,178.625L1925.671875,179.125" marker-end="url(#arrowhead13760)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead13760" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M827.6650772478397,266.18961177803936L948.8828125,303.9375L1042.171875,303.9375L1114.78125,326.3125" marker-end="url(#arrowhead13761)" style="fill:none"/><defs><marker id="arrowhead13761" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" 
d="M821.905889870922,271.948799154957L948.8828125,347.6875L1042.171875,347.6875L1089.546875,348.1875" marker-end="url(#arrowhead13762)" style="fill:none"/><defs><marker id="arrowhead13762" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M817.8838969465831,275.97079207929585L948.8828125,391.4375L1042.171875,391.4375L1114.78125,370.0625" marker-end="url(#arrowhead13763)" style="fill:none"/><defs><marker id="arrowhead13763" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1306.109375,348.1875L1330.609375,347.6875L1355.609375,347.6875" marker-end="url(#arrowhead13764)" style="fill:none"/><defs><marker id="arrowhead13764" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1521.828125,347.6875L1546.828125,347.6875L1572.328125,348.1875" marker-end="url(#arrowhead13765)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead13765" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1686.5,348.1875L1711,347.6875L1736,347.6875L1761.5,348.1875" 
marker-end="url(#arrowhead13766)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead13766" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1875.671875,348.1875L1900.171875,347.6875L1925.671875,348.1875" marker-end="url(#arrowhead13767)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead13767" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M856.09375,507.25L948.8828125,506.75L1042.171875,506.75L1103.9375,507.25" marker-end="url(#arrowhead13768)" style="fill:none"/><defs><marker id="arrowhead13768" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1291.71875,507.25L1330.609375,506.75L1358.9453125,506.75" marker-end="url(#arrowhead13769)" style="fill:none"/><defs><marker id="arrowhead13769" viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g><g class="edgePath" style="opacity: 1;"><path class="path" d="M1518.4921875,506.75L1546.828125,506.75L1572.328125,507.25" marker-end="url(#arrowhead13770)" style="fill:none;stroke-width:2px;stroke-dasharray:3;"/><defs><marker id="arrowhead13770" 
viewbox="0 0 10 10" refx="9" refy="5" markerunits="strokeWidth" markerwidth="8" markerheight="6" orient="auto"><path d="M 0 0 L 10 5 L 0 10 z" class="arrowheadPath" style="stroke-width: 1; stroke-dasharray: 1, 0;"/></marker></defs></g></g><g class="edgeLabels"><g class="edgeLabel" transform="translate(202.421875,366.125)" style="opacity: 1;"><g transform="translate(-50.46875,-11.875)" class="label"><foreignobject width="100.9375" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">SQL Trigger</span></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="translate(678.71875,255.5)" style="opacity: 1;"><g transform="translate(-30.1640625,-11.875)" class="label"><foreignobject width="60.33203125" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">cdc.*.*</span></div></foreignobject></g></g><g class="edgeLabel" transform="translate(678.71875,506.75)" style="opacity: 1;"><g transform="translate(-24.390625,-11.875)" class="label"><foreignobject width="48.7890625" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">cdc.#</span></div></foreignobject></g></g><g class="edgeLabel" transform="translate(948.8828125,76.4375)" style="opacity: 1;"><g transform="translate(-55.796875,-11.875)" class="label"><foreignobject width="111.6015625" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">cdc.article.*</span></div></foreignobject></g></g><g class="edgeLabel" 
transform="translate(948.8828125,160.60976600646973)" style="opacity: 1;"><g transform="translate(-54.5546875,-11.875)" class="label"><foreignobject width="109.12109375" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">cdc.author.*</span></div></foreignobject></g></g><g class="edgeLabel" transform="translate(948.8828125,205.63906288146973)" style="opacity: 1;"><g transform="translate(-68.2890625,-11.875)" class="label"><foreignobject width="136.58203125" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">cdc.comment.*</span></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div 
xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="translate(948.8828125,303.9375)" style="opacity: 1;"><g transform="translate(-48.140625,-11.875)" class="label"><foreignobject width="96.2890625" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">cdc.login.*</span></div></foreignobject></g></g><g class="edgeLabel" transform="translate(948.8828125,347.6875)" style="opacity: 1;"><g transform="translate(-52.28125,-11.875)" class="label"><foreignobject width="104.5703125" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">cdc.abuse.*</span></div></foreignobject></g></g><g class="edgeLabel" transform="translate(948.8828125,391.4375)" style="opacity: 1;"><g transform="translate(-55.796875,-11.875)" class="label"><foreignobject width="111.6015625" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: 
nowrap;"><span class="edgeLabel">cdc.article.*</span></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="translate(948.8828125,506.75)" style="opacity: 1;"><g transform="translate(-24.390625,-11.875)" class="label"><foreignobject width="48.7890625" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel">cdc.#</span></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g><g class="edgeLabel" transform="" style="opacity: 1;"><g transform="translate(0,0)" class="label"><foreignobject width="0" height="0"><div 
xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;"><span class="edgeLabel"/></div></foreignobject></g></g></g><g class="nodes"><g class="node rabbit" id="rte" transform="translate(794.73828125,255.5)" style="opacity: 1;"><polygon points="42.616406250000004,0 85.23281250000001,-42.616406250000004 42.616406250000004,-85.23281250000001 0,-42.616406250000004" transform="translate(-42.616406250000004,42.616406250000004)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-15.4765625,-11.875)"><foreignobject width="30.95703125" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">cdc</div></foreignobject></g></g></g><g class="node rabbit" id="messaging" transform="translate(541.11328125,366.125)" style="opacity: 1;"><polygon points="82.44140625,0 164.8828125,-82.44140625 82.44140625,-164.8828125 0,-82.44140625" transform="translate(-82.44140625,82.44140625)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-59.7265625,-11.875)"><foreignobject width="119.453125" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">pg-messaging</div></foreignobject></g></g></g><g class="node rabbit" id="catchall" transform="translate(794.73828125,506.75)" style="opacity: 1;"><polygon points="60.85546875,0 121.7109375,-60.85546875 60.85546875,-121.7109375 0,-60.85546875" transform="translate(-60.85546875,60.85546875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-35.7421875,-11.875)"><foreignobject width="71.484375" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">catchall</div></foreignobject></g></g></g><g class="node rabbit" id="noop" transform="translate(1186.390625,144.53359413146973)" style="opacity: 1;"><polygon points="-21.875,0 173.375,0 
173.375,-43.75 -21.875,-43.75 0,-21.875" transform="translate(-86.6875,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-76.6875,-11.875)"><foreignobject width="153.37890625" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">rte-pipeline-crud</div></foreignobject></g></g></g><g class="node rabbit" id="reop" transform="translate(1186.390625,347.6875)" style="opacity: 1;"><polygon points="-21.875,0 216.5625,0 216.5625,-43.75 -21.875,-43.75 0,-21.875" transform="translate(-108.28125,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-98.28125,-11.875)"><foreignobject width="196.5625" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">rte-pipeline-reporting</div></foreignobject></g></g></g><g class="node rabbit" id="debug" transform="translate(1186.390625,506.75)" style="opacity: 1;"><polygon points="-21.875,0 187.78125,0 187.78125,-43.75 -21.875,-43.75 0,-21.875" transform="translate(-93.890625,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-83.890625,-11.875)"><foreignobject width="167.79296875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">rte-pipeline-debug</div></foreignobject></g></g></g><g class="node nodejs" id="pipeline3" transform="translate(1438.71875,506.75)" style="opacity: 1;"><rect rx="0" ry="0" x="-79.7734375" y="-21.875" width="159.546875" height="43.75" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-69.7734375,-11.875)"><foreignobject width="139.55078125" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">DEBUG-Pipeline</div></foreignobject></g></g></g><g class="node stream" id="stream7" 
transform="translate(1628.9140625,506.75)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 0,-21.875" transform="translate(-57.0859375,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">stream7</div></foreignobject></g></g></g><g class="node nodejs" id="pipeline2" transform="translate(1438.71875,347.6875)" style="opacity: 1;"><rect rx="0" ry="0" x="-83.109375" y="-21.875" width="166.21875" height="43.75" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-73.109375,-11.875)"><foreignobject width="146.23046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">REPORT-Pipeline</div></foreignobject></g></g></g><g class="node stream" id="stream4" transform="translate(1628.9140625,347.6875)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 0,-21.875" transform="translate(-57.0859375,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">stream4</div></foreignobject></g></g></g><g class="node stream" id="stream5" transform="translate(1818.0859375,347.6875)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 0,-21.875" transform="translate(-57.0859375,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" 
style="display: inline-block; white-space: nowrap;">stream5</div></foreignobject></g></g></g><g class="node stream" id="stream6" transform="translate(1982.2578125,347.6875)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 0,-21.875" transform="translate(-57.0859375,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">stream6</div></foreignobject></g></g></g><g class="node nodejs" id="pipeline1" transform="translate(1438.71875,84.875)" style="opacity: 1;"><rect rx="0" ry="0" x="-73.796875" y="-21.875" width="147.59375" height="43.75" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-63.796875,-11.875)"><foreignobject width="127.59765625" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">CRUD-Pipeline</div></foreignobject></g></g></g><g class="node stream" id="stream1" transform="translate(1628.9140625,84.875)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 0,-21.875" transform="translate(-57.0859375,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">stream1</div></foreignobject></g></g></g><g class="node stream" id="stream2" transform="translate(1818.0859375,84.875)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 0,-21.875" transform="translate(-57.0859375,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g 
transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">stream2</div></foreignobject></g></g></g><g class="node stream" id="stream3" transform="translate(1982.2578125,84.875)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 0,-21.875" transform="translate(-57.0859375,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">stream3</div></foreignobject></g></g></g><g class="node nodejs" id="pipeline1a" transform="translate(1438.71875,178.625)" style="opacity: 1;"><rect rx="0" ry="0" x="-73.796875" y="-21.875" width="147.59375" height="43.75" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-63.796875,-11.875)"><foreignobject width="127.59765625" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">CRUD-Pipeline</div></foreignobject></g></g></g><g class="node stream" id="stream1a" transform="translate(1628.9140625,178.625)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 0,-21.875" transform="translate(-57.0859375,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">stream1</div></foreignobject></g></g></g><g class="node stream" id="stream2a" transform="translate(1818.0859375,178.625)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 
0,-21.875" transform="translate(-57.0859375,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">stream2</div></foreignobject></g></g></g><g class="node stream" id="stream3a" transform="translate(1982.2578125,178.625)" style="opacity: 1;"><polygon points="10.9375,0 103.234375,0 114.171875,-21.875 103.234375,-43.75 10.9375,-43.75 0,-21.875" transform="translate(-57.0859375,21.875)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-36.1484375,-11.875)"><foreignobject width="72.3046875" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">stream3</div></foreignobject></g></g></g><g class="node postgres" id="postgres" label-offset-y="12.190162044450139" transform="translate(67.4765625,366.125)" style="opacity: 1;"><path d="M 0,12.190162044450139 a 59.4765625,12.190162044450139 0,0,0 118.953125 0 a 59.4765625,12.190162044450139 0,0,0 -118.953125 0 l 0,55.94016204445014 a 59.4765625,12.190162044450139 0,0,0 118.953125 0 l 0,-55.94016204445014" transform="translate(-59.4765625,-40.16024306667521)" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-49.4765625,-11.875)"><foreignobject width="98.96484375" height="23.75"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">PostgreSQL</div></foreignobject></g></g></g><g class="node rabbit" id="RABBITMQ" transform="translate(330.78125,366.125)" style="opacity: 1;"><rect rx="5" ry="5" x="-52.890625" y="-21.875" width="105.78125" height="43.75" class="label-container"/><g class="label" transform="translate(0,0)"><g transform="translate(-42.890625,-11.875)"><foreignobject width="85.78125" height="23.75"><div 
xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; white-space: nowrap;">RabbitMQ</div></foreignobject></g></g></g></g></g></g></svg></a><!--kg-card-end: html--><p>With the <code>Pipeline</code> class we are able to automate real time data workflows in a rather modular way. More importantly, it frees us from having to juggle business logic, database transactions and real time messaging logic throughout the codebase. Once a pipeline is set up, we concern ourselves with the business logic; anytime a particular database record is touched (successfully), it will automatically trigger all of the associated data pipelines. </p><p>In the next part we&apos;ll take a closer look at writing streams to handle data.</p>]]></content:encoded></item><item><title><![CDATA[A Herd Of Rabbits Part 1:  Postgres Change Capture]]></title><description><![CDATA[Building a real time data engine with PostgreSQL, RabbitMQ, and Node.js. In this first part we'll see how to send change capture data directly to RabbitMQ in about 80 lines of SQL]]></description><link>http://www.codedependant.net/2020/03/27/heard-of-rabbits-1-postgres-change-data-capture-and-rabbitmq/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa68</guid><category><![CDATA[rabbitmq]]></category><category><![CDATA[postgres]]></category><category><![CDATA[change-data-capture]]></category><category><![CDATA[database]]></category><category><![CDATA[herd-of-rabbits]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Fri, 27 Mar 2020 18:38:24 GMT</pubDate><media:content url="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/herd-of-rabbits-med-2.png" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: html--><img src="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/herd-of-rabbits-med-2.png" alt="A Herd Of Rabbits Part 1:  Postgres Change Capture"><p>
  <div class="first-letter"><span>P</span></div>ostgres is no longer &quot;just a database.&quot; It has become a data integration and distribution platform. It has hooks for custom data types, data formats, remote data stores and remote indexes, along with a rich extension ecosystem and cascading logical replication facilities. It is practically an application server. A proverbial Swiss Army knife, to say the least.
</p><!--kg-card-end: html--><p>At the day job, we use Postgres as the primary database. As a communication platform (chat), we do a good deal of real-time whiz-bangery. Being that we are an early stage start-up, we try to follow the keep it simple, stupid (k.i.s.s.) approach. Do the simplest thing possible until it isn&apos;t simple anymore. &#xA0;The simple approach to real time was very manual, and you might see something like this quite a bit:</p><pre><code class="language-js:git">async function doMessage(user, opts, db) {
  const {message, resource_id} = opts
  const txn = await db.transaction()
  try {
    const resource = await txn.query({
      text: &apos;INSERT INTO table ...&apos;
    , values: [resource_id, message]
    })
    await txn.commit()
    realtime.publish(user, resource)
    return resource
  } catch (err) {
    txn.rollback()
    log(err)
    // handle errors
  }
  // Other assorted logics
}</code></pre><p>This is simple - I can clearly see what it is trying to do. It adds a message by inserting it into a table, and publishes a message over some real-time channel. There are a couple of problems here. The biggest being that the act of adding a message and dispatching some real-time event are tied together. This innocent little function is a function that cannot really be reused. For example:</p><pre><code class="language-js:git">const doMessage = require(&apos;./do-message&apos;)

async function doThingAndAddMessage(user, opts, db) {
  const txn = await db.transaction()
  try {
    const result = await txn.query(...)
    await doMessage(user, {
      resource_id: result.id
    , message: opts.message
    }, txn)
    await someOtherThing(user, ...)
  } catch (err) {
    txn.rollback()
    log(err)
    // handle errors
  }
}</code></pre><p>If <strong><code>someOtherThing</code> </strong>fails, there is a pretty good chance the real-time message is still sent even though there is an error. This generally leads to people writing different but very similar functions and queries to get around this problem. When this happens, there is more than one function that can add messages, and they don&apos;t always do the real-time bits, which is another problem. At the end of the day, the basic pattern is pretty simple:</p><ol><li>Modify something in the database.</li><li>Push some message to end-users in real-time telling them what happened so they don&apos;t have to reload pages.</li></ol><p>Generally speaking, these real-time messages tell end users what changed / how it changed. This is usually pretty application specific, so it made sense in the simple phase to implement this logic in the code paths that handled user interactions. At some point we needed to understand how things changed: the <code>previous</code> value and the <code>current</code> value. That usually requires a read of the data before the write. You can also do this in a single query if you are using a database that implements <a href="https://en.wikipedia.org/wiki/Multiversion_concurrency_control">MVCC</a>, but it gets complicated and cumbersome.</p><figure class="kg-card kg-image-card"><img src="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/kafka_connect-1.png" class="kg-image" alt="A Herd Of Rabbits Part 1:  Postgres Change Capture" loading="lazy"></figure><p>We came to realize that the best way to de-couple and simplify these problems would be the <a href="https://martinfowler.com/eaaDev/EventSourcing.html">event sourcing pattern</a>. At the time, our applications were interacting with PostgreSQL so heavily that inverting the application to send writes to Kafka would have been too much of an undertaking. 
What we could do, however, is use PostgreSQL as an event source - set up <a href="https://debezium.io">Debezium</a>, funnel data into Kafka, layer on <a href="http://www.codedependant.net/2020/03/27/heard-of-rabbits-1-postgres-change-data-capture-and-rabbitmq/www.confluent.io/product/ksql">KSQL </a>to pull data back together and sync it back into a database or some other process. That would be a pretty sustainable way to &quot;do real-time&quot;. &#xA0;</p><p>So, why not Kafka Connect? The idea is sound, but as a small team at a small start-up we had a number of issues with the approach.</p><ol><li>We aren&apos;t really comfortable with the Java stack / ecosystem, nor do we want to be.</li><li>We only had 1 devops guy, and running Kafka in production requires people.</li><li>We were already using <a href="https://www.rabbitmq.com/">RabbitMQ </a>and had people with operational experience.</li><li>The number of moving parts and the complexity that Kafka + Kafka Connect adds didn&apos;t seem worth it (added ~8 servers for a small cluster).</li><li>We had a number of partitioned tables that made things complicated with Debezium: a topic per table partition, which is hard to reason about downstream.</li></ol><p>Postgres has facilities for sending messages to remote connections with <a href="https://www.postgresql.org/docs/current/sql-notify.html">LISTEN / NOTIFY</a>. However, there are some scalability concerns there. Namely, it requires a single connection ( no pooling ). It is also not supported by background workers, which can be a bit problematic. It isn&apos;t really intended for high scale messaging. Additionally, we were already using RabbitMQ and we wanted to keep our messaging there. As luck would have it, there is a <a href="https://github.com/omniti-labs/pg_amqp">PostgreSQL extension</a> for sending messages via RabbitMQ. With this single extension we can implement basic change capture and use PostgreSQL and RabbitMQ as a real time engine. 
&#xA0;</p><p>A change capture message is fairly simple: it tells you <em>where </em>the change is coming from ( a table name or resource name ), and <em>how </em>it changed. The <strong>HOW</strong> can be thought of as a diff of data. All we want to do is get this diff out of PostgreSQL and into some remote service automatically so we don&apos;t have to do it by hand all the time. Here is how we did it.</p><h4 id="postgresql-setup">PostgreSQL Setup </h4><p>To start we need to install the amqp extension and the hstore extension, which ships with most distributions of PostgreSQL. More on hstore later; it will make more sense in a moment. The amqp package can be installed from a source build or from <a href="https://pgxn.org/dist/pg_amqp/doc/amqp.html">PGXN</a>.</p><pre><code class="language-sql:git">CREATE EXTENSION IF NOT EXISTS &quot;amqp&quot;;
CREATE EXTENSION IF NOT EXISTS &quot;hstore&quot;;
GRANT USAGE ON SCHEMA amqp to &lt;USER&gt;;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA amqp to &lt;USER&gt;;</code></pre><p>The amqp extension will set up a table where the config for your RabbitMQ cluster goes. The easiest thing to do is add these settings to your PostgreSQL config and read them in the DB setup migrations:</p><pre><code class="language-shell:git">## postgresql.conf

amqp.host = rabbitmq # rabbit host name
amqp.port = 5672
amqp.vhost = &apos;/&apos;
amqp.username = guest
amqp.password = guest
amqp.exchange = &apos;pg-messaging&apos;</code></pre><p>You can use the <code>CURRENT_SETTING</code> SQL function to read arbitrary values out of the PostgreSQL config.</p><pre><code class="language-sql:git">INSERT INTO amqp.broker (host, port, vhost, username, password)(
  SELECT
    COALESCE(current_setting(&apos;amqp.host&apos;), NULL)
  , COALESCE(current_setting(&apos;amqp.port&apos;)::INT, NULL)
  , COALESCE(current_setting(&apos;amqp.vhost&apos;), NULL)
  , COALESCE(current_setting(&apos;amqp.username&apos;), NULL)
  , COALESCE(current_setting(&apos;amqp.password&apos;), NULL)
);</code></pre><p>If you have ever written a ROW level trigger for PostgreSQL, you are probably familiar with the <a href="https://www.postgresql.org/docs/current/plpgsql-trigger.html">special variables</a> <code>OLD</code> and <code>NEW</code>, which represent the previous state and the current state of the row being modified. We convert this data to JSON, and send it to a RabbitMQ exchange. Our change capture message looks like this:</p><pre><code class="language-js:git">{
  &quot;previous&quot;: Object // JSON representation of the previous row
, &quot;current&quot;: Object // JSON representation of the current row
, &quot;targets&quot;: Array //  A list of columns that actually changed
, &quot;timestamp&quot;: Date // Date / time of the operation
, &quot;operation&quot;: String // WHAT happened (insert, update, delete)
, &quot;resource&quot;: String // An application specific name of the thing being changed
, &quot;table&quot;: String // The name of the table
, &quot;routing_key&quot;: String // The rabbitmq routing key that was used
}</code></pre><p>This gives us everything we need to know to make intelligent decisions in applications that may want to consume it:</p><ol><li>WHAT changed ( the operation, table + resource )</li><li>WHEN it changed ( a timestamp )</li><li>HOW it changed ( previous / current + targets )</li></ol><h4 id="change-data-trigger">Change Data Trigger</h4><p>All that is left is to write the trigger function to send it out into the world. I&apos;m going to focus on the update operation, because it&apos;s the most interesting.</p><pre><code class="language-sql:git">CREATE OR REPLACE FUNCTION AMQP_CHANGE_DATA_CAPTURE()
  RETURNS TRIGGER AS $$
    DECLARE
      routing_key_prefix TEXT;
      routing_key TEXT;
      resource TEXT;
    BEGIN
      resource := COALESCE(TG_ARGV[0], TG_TABLE_NAME);
      routing_key_prefix := COALESCE(TG_ARGV[1], &apos;cdc&apos;);
      routing_key := LOWER(FORMAT(&apos;%s.%s.%s&apos;, routing_key_prefix, resource, TG_OP));
    
      IF TG_OP = &apos;INSERT&apos; THEN
      -- SNIP
      RETURN NEW;
      END IF;

      IF TG_OP = &apos;UPDATE&apos; THEN
        PERFORM amqp.publish(
          1 -- id of broker in amqp.broker table
        , CURRENT_SETTING(&apos;amqp.exchange&apos;) -- from postgresql.conf
        , routing_key
        , JSON_BUILD_OBJECT(
            &apos;previous&apos;, ROW_TO_JSON(OLD)
          , &apos;current&apos;,  ROW_TO_JSON(NEW)
          , &apos;targets&apos;,  AKEYS(HSTORE(NEW) - HSTORE(OLD))
          , &apos;timestamp&apos;, CURRENT_TIMESTAMP
          , &apos;operation&apos;, LOWER(TG_OP)
          , &apos;resource&apos;, resource
          , &apos;table&apos;, TG_TABLE_NAME
          , &apos;routing_key&apos;, routing_key
          )::TEXT
        );
        RETURN NEW;
      END IF;

      IF TG_OP = &apos;DELETE&apos; THEN
      -- SNIP
      RETURN NULL;
      END IF;

    END;
  $$ LANGUAGE plpgsql;</code></pre><p>There are a couple of things to note here. </p><!--kg-card-begin: html--><img src="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/Asset-1@0.5x.png" class="fr p_all-4" alt="A Herd Of Rabbits Part 1:  Postgres Change Capture"><!--kg-card-end: html--><pre><code class="language-sql:git">, &apos;targets&apos;,  AKEYS(HSTORE(NEW) - HSTORE(OLD))</code></pre><ul><li><a href="https://www.postgresql.org/docs/current/hstore.html">HSTORE</a> is used because it implements subtraction ( <code>hstore - hstore</code> ), which removes from the left operand any K/V pairs that match the right operand. &#xA0;The <code>AKEYS</code> function just returns an array of keys of the hstore value. In other words, this gives us the columns that were actually changed.</li></ul><pre><code class="language-sql:git">resource := COALESCE(TG_ARGV[0], TG_TABLE_NAME);</code></pre><ul><li><strong>TG_ARGV</strong> is an array of arguments passed to the trigger definition. In this case you can optionally pass a resource name and a routing key prefix if you want to route messages to a different destination. This solves our partitioned table problem. We can give them a common name. For example, instead of <code>_table_1</code> and <code>_table_2</code> we can name and handle them in code as just <code>table</code>.</li></ul><!--kg-card-begin: html--><blockquote>
  <h4>trigger [trig&apos;er] -n, noun</h4>
  <p>1. a device, as a lever, the pulling or pressing of which releases a detent or spring</p>
  <p>2. procedural code that is automatically executed in response to certain events on a particular table</p>
  <p>3. special stored procedure that is run when specific actions occur within a database.</p>
</blockquote><!--kg-card-end: html--><p>With the trigger function defined, we can selectively add change capture to any table in our database. We can specify which events trigger the messages ( or all of them ), what they are called, and how they get routed around. </p><pre><code class="language-sql:git">CREATE TRIGGER article_change_data_capture_trig
  AFTER INSERT OR DELETE OR UPDATE
  ON blog_articles
  FOR EACH ROW
    EXECUTE PROCEDURE AMQP_CHANGE_DATA_CAPTURE(&apos;article&apos;);</code></pre><p><strong>NOTE</strong>: Always use <code>AFTER</code> events for these triggers. This ensures that the message is only sent when the transaction succeeds.</p><p>Our RabbitMQ routing keys will look like this:</p><ul><li><code>cdc.article.insert</code></li><li><code>cdc.article.delete</code></li><li><code>cdc.article.update</code></li></ul><p>Now for every insert, update and delete on the <code>blog_articles</code> table, a CDC message will be sent to the <code>pg-messaging</code> exchange on our RabbitMQ server. &#xA0;This falls in line with the Keep It Simple, Stupid mentality. Not only did we successfully avoid any additional servers, complicated infrastructure or expensive hosted services, it only took about 80 lines of SQL. &#xA0;There are certainly some trade-offs you&apos;ll need to consider with a setup like this:</p><!--kg-card-begin: markdown--><table>
<thead>
<tr>
<th>Pros</th>
<th>Cons</th>
</tr>
</thead>
<tbody>
<tr>
<td>Low complexity</td>
<td>Non-Standard extension</td>
</tr>
<tr>
<td>No additional servers</td>
<td>Config requires a table</td>
</tr>
<tr>
<td>Uses SQL</td>
<td>Additional networking to connect to amqp server</td>
</tr>
<tr>
<td>Low DB Overhead</td>
<td></td>
</tr>
<tr>
<td>Granular and selective</td>
<td></td>
</tr>
<tr>
<td>Trigger Based + replication friendly</td>
<td></td>
</tr>
</tbody>
</table>
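<p>To make the payload a little more tangible, here is a rough sketch of what a consumer might do with one of these messages once it arrives. It only assumes the message fields described earlier in this post ( <code>previous</code>, <code>current</code>, <code>targets</code>, <code>operation</code>, <code>resource</code> ). The <code>summarizeChange</code> function and the sample article are made up for illustration, and no RabbitMQ client is involved.</p><pre><code class="language-js:git">// Hypothetical helper: turn a raw CDC payload into a per-column summary.
// Only the field names come from the message format in this post.
function summarizeChange(raw) {
  const message = JSON.parse(raw)
  const previous = message.previous || {}
  const current = message.current || {}
  const changes = {}

  // targets lists the columns that actually changed
  for (const column of message.targets || []) {
    changes[column] = {from: previous[column], to: current[column]}
  }

  return {
    resource: message.resource
  , operation: message.operation
  , changes
  }
}

// A made up update message for an article row
const example = JSON.stringify({
  previous: {id: 1, title: "Hello", views: 10}
, current: {id: 1, title: "Hello World", views: 10}
, targets: ["title"]
, operation: "update"
, resource: "article"
})

console.log(summarizeChange(example))</code></pre><p>Because <code>targets</code> is computed in the database, a consumer like this never has to diff the two rows itself; it can simply skip messages that don&apos;t touch the columns it cares about.</p>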
<!--kg-card-end: markdown--><p>The biggest downside here is that it makes use of a non-standard ( albeit a very stable one ) extension. In most cases, managed services ( AWS Aurora, Google Cloud SQL, etc. ) do not support these types of extensions, nor do they give you access to install them. If you are currently using a managed service, or do not want to run your own database servers, this is likely not the most appropriate option.</p><p>With 1 trigger and 1 extension we&apos;ve made a data firehose. In <a href="http://codedependant.net/2020/04/01/a-herd-of-rabbits-part-2-rabbitmq-data-pipelines/">Part 2</a>, we&apos;ll take a look at how to make the data work for us with a little bit of help from Node.js.</p>]]></content:encoded></item><item><title><![CDATA[Fun With Postgres: Custom Constraints]]></title><description><![CDATA[Learn how to implement rich data constraints using triggers inside of PostgreSQL. Perfect for those situations when simple check constraints aren't enough.]]></description><link>http://www.codedependant.net/2019/09/29/fun-with-postgresql-custom-constraints/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa67</guid><category><![CDATA[postgres]]></category><category><![CDATA[database]]></category><category><![CDATA[trigger]]></category><category><![CDATA[fun-with-postgres]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Sun, 29 Sep 2019 20:58:21 GMT</pubDate><media:content url="http://www.codedependant.net/content/images/2019/09/postgres-splash-small.jpg" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: html--><img src="http://www.codedependant.net/content/images/2019/09/postgres-splash-small.jpg" alt="Fun With Postgres: Custom Constraints"><p>
<div class="first-letter"><span>P</span></div>ostgres comes with a rich set of tools to help developers maintain data integrity. Many of them come in the form of Constraints. Foreign Key, Not Null, Exclusion, and Check constraints allow developers to offload what would usually be a lot of application code to the database server with very little effort.
</p><!--kg-card-end: html--><p>Between the Foreign key, which verifies values in another table, and the Check constraint, which verifies values in a specific column, you can typically accomplish just about everything you need to do rather easily. The main problem that you&apos;ll run into is that these kinds of constraints are restricted to a single column on a table. Additionally, they only apply to the current value in that column. This restriction doesn&apos;t apply to a custom constraint, which is a special kind of trigger. This effectively gives you the full power of SQL to implement much more complex constraint conditions.</p><pre><code class="language-sql:git">CREATE TYPE states AS ENUM (&apos;one&apos;, &apos;two&apos;, &apos;three&apos;);
CREATE TABLE example (
  id INT GENERATED ALWAYS AS IDENTITY
, state states NOT NULL  
);</code></pre><p>A simple example to illustrate the utility of a constraint trigger is a simple state machine. Here, our table has a <code>state</code> column that uses a custom ENUM type. This ensures that the column can only contain <code>one</code>, <code>two</code>, or <code>three</code>. However, I want to make sure that the initial value is always <strong>one</strong>. Additionally, I want to make sure that, during update operations, the <code>state</code> value can only be set to <code>two</code> if the previous value was <code>one</code> and can only be set to <code>three</code> if the previous value was <code>two</code>. If any of those conditions are not met, that operation should fail and abort the transaction.</p><figure class="kg-card kg-code-card"><pre><code class="language-shell:git">+ -- ONE --&gt; -- TWO --&gt; -- THREE -- +</code></pre><figcaption>Restricted Data Transitions</figcaption></figure><p>This kind of logic is generally pretty difficult to capture with traditional check constraints. This is usually the point where most application developers turn to doing the work at the application layer by pulling the current record, comparing the input value to the existing value and making a decision, being careful, of course, to account for the situation where there is no record and to take the appropriate action. The problem here is that doing it this way creates a data race between the running instances of the application in the time it takes to read / check the record and update or insert the value. It is an unnecessary bit of complexity when Postgres can do all of that in a single step with a constraint trigger.</p><!--kg-card-begin: markdown--><blockquote>
<h4 id="triggertrigernnoun">Trigger [tri&apos;ger] -n., --noun</h4>
<ol>
<li>special stored procedure that is run when specific actions occur within a database.</li>
<li>cause an event or situation to happen or exist.</li>
</ol>
</blockquote>
<!--kg-card-end: markdown--><p>To do this we create a function like any other function that <code>RETURNS TRIGGER</code>. The function must do one of three things:</p><ol><li>Raise an exception to abort the transaction</li><li>Return a record/row value having exactly the structure of the table the trigger was fired for</li><li>Return <code>NULL</code></li></ol><pre><code class="language-sql:git">CREATE OR REPLACE FUNCTION example_fsm_check()
RETURNS TRIGGER AS 
$$
  BEGIN
    RETURN NEW;
  END;
$$ LANGUAGE PLPGSQL VOLATILE;</code></pre><p>Right now, this is a no-op. What we need to do is determine if the operation is an <strong>INSERT</strong> and check for the value <code>one</code>, or check for the transition values if it is an <strong>UPDATE</strong>. Postgres injects some additional information into trigger functions that you can inspect to make these kinds of determinations. In this case we want <a href="https://www.postgresql.org/docs/current/plpgsql-trigger.html">TG_OP</a>.</p><pre><code class="language-sql:git">CREATE OR REPLACE FUNCTION example_fsm_check()
RETURNS TRIGGER AS 
$$
  BEGIN
    IF TG_OP = &apos;INSERT&apos; AND NEW.state != &apos;one&apos; THEN
      -- RAISE uses a bare % as its placeholder, not %s
      RAISE EXCEPTION &apos;Invalid FSM State %&apos;, NEW.state;
    END IF;
    RETURN NEW;
  END;
$$ LANGUAGE PLPGSQL VOLATILE;</code></pre><p>Now things are getting interesting. In the above snippet, if the operation is an insert, and the state column is <strong>NOT</strong> <code>one</code>, raise an exception. We can make use of <code>TG_OP</code> to tell us that the operation is an insert, and the special <code>NEW</code> record to inspect the data being inserted. This prevents the initial state from being anything other than the value we want and raises an exception if it is.</p><!--kg-card-begin: markdown--><pre><code class="language-sql:git">INSERT INTO example (state)
VALUES (&apos;two&apos;);
</code></pre>
<pre><code class="language-shell:git">ERROR:  Invalid FSM State two
CONTEXT:  PL/pgSQL function example_fsm_check() line 5 at RAISE
SQL state: P0001
</code></pre>
<!--kg-card-end: markdown--><p>The next thing to do is handle the allowable state transitions during updates. Like most other programming languages, plpgsql has some facilities for logic branching. Additionally, during an <code>UPDATE</code> operation, postgres gives trigger functions access to the <code>OLD</code> record as well as the <code>NEW</code> record.</p><pre><code class="language-sql:git">-- on update
IF TG_OP = &apos;UPDATE&apos; THEN
  -- if state changes
  IF OLD.state IS DISTINCT FROM NEW.state THEN
    -- from one, only one -&gt; two is allowed
    IF OLD.state = &apos;one&apos; AND NEW.state != &apos;two&apos; THEN
      RAISE EXCEPTION &apos;Invalid state transition&apos;;
    END IF;

    -- from two, only two -&gt; three is allowed
    IF OLD.state = &apos;two&apos; AND NEW.state != &apos;three&apos; THEN
      RAISE EXCEPTION &apos;Invalid state transition&apos;;
    END IF;
  END IF;
END IF;
-- end update</code></pre><p>Here, we add a couple of <code>if</code> checks. The main if condition checks whether the <code>state</code> column has changed at all; if not, there is nothing to do. If it has changed, there are two additional conditions that make up the final logic of our little state machine. If the old state was <strong>ONE</strong> and the new value is not <strong>TWO</strong>, raise an exception. Similarly, if the old state was <strong>TWO</strong> and the new value is not <strong>THREE</strong>, also raise an exception.</p><p>Lastly, we can attach the function to a table as a <code>CONSTRAINT</code> trigger like normal.</p><!--kg-card-begin: markdown--><pre><code class="language-sql:git">CREATE CONSTRAINT TRIGGER check_sample_fsm
AFTER INSERT OR UPDATE
ON example
FOR EACH ROW
EXECUTE PROCEDURE example_fsm_check();
</code></pre>
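<p>For reference, the fragments above can be assembled into a single function along these lines. Treat it as a sketch rather than the definitive version: the last check, which treats <code>three</code> as a terminal state, is an assumption based on the transition diagram and is not spelled out in the snippets. The statements at the end assume an empty <code>example</code> table with the constraint trigger attached to this function.</p>
<pre><code class="language-sql:git">CREATE OR REPLACE FUNCTION example_fsm_check()
RETURNS TRIGGER AS
$$
  BEGIN
    -- new rows must start in the first state
    IF TG_OP = &apos;INSERT&apos; AND NEW.state != &apos;one&apos; THEN
      RAISE EXCEPTION &apos;Invalid FSM State %&apos;, NEW.state;
    END IF;

    -- updates may only move one step forward
    IF TG_OP = &apos;UPDATE&apos; AND OLD.state IS DISTINCT FROM NEW.state THEN
      IF OLD.state = &apos;one&apos; AND NEW.state != &apos;two&apos; THEN
        RAISE EXCEPTION &apos;Invalid state transition&apos;;
      END IF;

      IF OLD.state = &apos;two&apos; AND NEW.state != &apos;three&apos; THEN
        RAISE EXCEPTION &apos;Invalid state transition&apos;;
      END IF;

      -- assumption: three is terminal, nothing may follow it
      IF OLD.state = &apos;three&apos; THEN
        RAISE EXCEPTION &apos;Invalid state transition&apos;;
      END IF;
    END IF;

    RETURN NEW;
  END;
$$ LANGUAGE PLPGSQL VOLATILE;

INSERT INTO example (state) VALUES (&apos;one&apos;);              -- accepted
UPDATE example SET state = &apos;three&apos; WHERE state = &apos;one&apos;;  -- rejected: one -&gt; three
UPDATE example SET state = &apos;two&apos;   WHERE state = &apos;one&apos;;  -- accepted: one -&gt; two
</code></pre>
<p>Because the check runs inside the same transaction as the write, two application instances racing on the same row cannot both sneak an invalid transition through.</p>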
<!--kg-card-end: markdown--><p>That&apos;s it! We&apos;ve defined our own constraint. Unlike a regular trigger, a constraint trigger can be deferred, which lets you control when in the transaction it fires, and like any row-level trigger it can carry a condition that decides whether it runs at all. For example, if we want the constraint to fire at the end of a transaction and only when the state is set to <code>one</code>, we can add these instructions to the trigger using the <code>DEFERRABLE</code> option and the <code>WHEN</code> condition clause.</p><pre><code class="language-sql:git">CREATE CONSTRAINT TRIGGER check_sample_fsm
AFTER INSERT OR UPDATE
ON example
DEFERRABLE INITIALLY DEFERRED 
FOR EACH ROW
WHEN (NEW.state = &apos;one&apos;)
EXECUTE PROCEDURE example_fsm_check();</code></pre><p>Trigger constraints give us the ability to define very complex triggers that extend well beyond what one could do with CHECK constraints. They could even be used, for example, to build out FOREIGN KEY-like constraints that check multiple tables or other points of data that can&apos;t be accomplished otherwise.</p><p><a href="https://www.postgresql.org/docs/current/sql-createtrigger.html">Trigger Constraints</a> - They are pretty cool.</p>]]></content:encoded></item><item><title><![CDATA[Flexible Schemas with PostgreSQL and Elasticsearch]]></title><description><![CDATA[Combine the power of postgres and elasticsearch to create a flexible database schema without sacrifice.]]></description><link>http://www.codedependant.net/2018/12/26/flexible-schemas-postgresql-elasticsearch/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa66</guid><category><![CDATA[postgres]]></category><category><![CDATA[elasticsearch]]></category><category><![CDATA[json]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Wed, 26 Dec 2018 23:18:00 GMT</pubDate><media:content url="http://www.codedependant.net/content/images/2018/12/json-merge-postgresql.png" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: html--><img src="http://www.codedependant.net/content/images/2018/12/json-merge-postgresql.png" alt="Flexible Schemas with PostgreSQL and Elasticsearch"><p>
  <div class="first-letter"><span>R</span></div>elational Databases typically make use of a rigid schema: predefined tables containing typed columns, allowing for a rich set of functionality that would otherwise be impossible. It is both a major strength and a major weakness. On one hand, strong typing allows databases to expose a rich set of operators, functions and functionality for each of the types. For postgres, this usually presents itself in the form of column types and the SQL syntax to interact with them. On the other hand, it means that all of the data in the table is uniform, and deviations or alterations are rather difficult to do.
</p>
<!--kg-card-end: html--><p>At the day job, I am in the process of migrating a number of applications off of a document store to postgres. One of the features we had was custom attributes, which effectively allowed end users to associate arbitrary data with various entities. In addition to porting the ability to store user defined data, we now need to allow end users to query for the associated entity via the attributes they have defined. This includes the ability to sort by them, apply range (numeric or date) filters and do full text phrase matching over text values.</p><p>For example, <strong>ACME Co</strong>., may want to define attributes about their users like this:</p><ul><li>ip_address (text)</li><li>name (text)</li><li>login_date (timestamp)</li><li>max_sale_value (number)</li><li>total_purchases (number)</li><li>vip_customer (boolean)</li></ul><p>They might want to ask arbitrary questions of this data to find users and gain insights about them. For example:</p><h5 id="find-all-users-who-have-logged-in-within-the-last-3-months-have-more-than-10-total-purchases-but-are-not-vip-and-whose-name-starts-with-j">Find all <code>users</code> who have logged in <code>within the last 3 months</code>, have <code>more than 10 total purchases</code> but are <code>not vip</code> and whose <code>name starts with J</code></h5><p>Translated roughly to SQL:</p><!--kg-card-begin: markdown--><pre><code class="language-sql:git">SELECT *
FROM user_attributes
WHERE
  login_date BETWEEN NOW() - INTERVAL &apos;90 DAY&apos; AND NOW()
AND total_purchases &gt; 10
AND name ILIKE &apos;j%&apos;
AND vip = FALSE
</code></pre>
<!--kg-card-end: markdown--><p>Another company, <strong>WizzBang LLC</strong>., wants to store a different set of data:</p><ul><li>name (text)</li><li>phone_number (text)</li><li>dog_lover (boolean)</li><li>cat_lover (boolean)</li><li>pet_count (number)</li><li>first_pet (timestamp)</li><li>favorite_dog_breed (text)</li><li>favorite_cat_breed (text)</li><li>award_count (number)</li></ul><p>Not only is the data they store very different both physically and contextually, but they are going to ask very different questions of it than ACME. Of course, they are both large companies with an estimated ~10M users. This means we are going to need a way to index their data to support the queries they will need to run so the application doesn&apos;t grind to a stop. There are some options out there.</p><h4 id="eav">EAV</h4><p>The Entity Attribute Value (EAV) design has been around for a while. It has pros and cons. For our situation, on the upside, the column that stores the actual value is a physical, typed column that we can perform a variety of operations on. Additionally, because it is a column, we could index it for better performance.</p><figure class="kg-card kg-image-card"><img src="http://www.codedependant.net/content/images/2018/12/simple-eav.png" class="kg-image" alt="Flexible Schemas with PostgreSQL and Elasticsearch" loading="lazy"></figure><p>EAV schemas try to take the rigid relational model and make it a bit more flexible. An <code>entities</code> table defines basic things (Car). An <code>attributes</code> table defines the properties of an entity (weight), and the <code>attribute_values</code> table holds the value for a property associated with an entity. For our needs, the primary problem is that there is only a single value column, meaning every value has to be stored as the same type. That makes it hard to support dates, booleans, and numeric types side by side.</p><!--kg-card-begin: markdown--><blockquote>
<h4 id="jsonjaysunnnoun">JSON (JAY&apos;sun) -n., noun</h4>
<p>We can serialize our data as JSON and everything will be easy</p>
<ol>
<li>An open-standard file format that uses human-readable text to transmit data consisting of attribute-value pairs and array data types</li>
</ol>
</blockquote>
<!--kg-card-end: markdown--><h2 id="json">JSON</h2><p>You might be thinking JSON! Think again. If all we needed to do was store the data, you might be on to something. But we have to do a bit more with it and this gets hard. In particular:</p><ol><li>Field values come back out as text, which means we have to do a lot of manual type casting of specific fields, and doing that won&apos;t use the index on the JSON column (slow)</li><li>To be able to do any kind of searching, range queries, etc., we need to know what kind of data is in there anyway. And if we have to control that, we might as well just use a table</li><li>Doing joins off of values in JSON columns is tough. We either have to add columns to the table to help out, or put the JSON column on the source table to avoid having to do a join, like a doc store</li></ol><p>The real deal breaker is that queries targeting specific fields within the JSON need to know what kind of data is in the field and explicitly cast it to perform any kind of filtering other than exact matching.</p><!--kg-card-begin: markdown--><pre><code class="language-sql:git">-- SAMPLE TABLE w/ JSON COLUMN
CREATE TABLE sample(
  id SERIAL
, word TEXT
, json_data JSONB NOT NULL
);
CREATE INDEX json_data_gin_indx ON sample USING GIN(json_data);
</code></pre>
<!--kg-card-end: markdown--><!--kg-card-begin: markdown--><pre><code class="language-sql:git">--- INSERT SOME RANDOM DATA
INSERT INTO sample (word, json_data)(
  SELECT
    encode(gen_random_bytes(10), &apos;hex&apos;)
  , JSON_BUILD_OBJECT(
      &apos;value&apos;, iter
    , &apos;sample&apos;, encode(gen_random_bytes(10), &apos;hex&apos;)
  ) from generate_series(1, 2000000) as iter
)
-- &gt; Affected rows: 2000000
-- &gt; Time: 76.108s
</code></pre>
<!--kg-card-end: markdown--><!--kg-card-begin: markdown--><pre><code class="language-sql:git">-- TRY TO QUERY FOR SOME VALUES
select *
from sample
where json_data-&gt;&gt;&apos;value&apos; &lt; 2

-- &gt; ERROR:  operator does not exist: text &lt; integer
</code></pre>
<!--kg-card-end: markdown--><p>While the <code>-&gt;&gt;</code> operator gets you access to field values within the JSON, they come back as their string representation, which greatly limits the ways they can be queried. To get around the issue you would have to manually cast every field you filter on.</p><!--kg-card-begin: markdown--><pre><code class="language-sql:git">select *
from sample
where
  CAST(json_data-&gt;&gt;&apos;value&apos; AS INTEGER) &lt; 2
</code></pre>
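<p>One partial workaround, sketched here as an aside rather than something from the original setup, is a B-tree expression index over the cast value. The index name is made up for illustration. It only helps for a key and a type that are known ahead of time, which is exactly what user defined attributes don&apos;t give us, and every new key would need its own index.</p>
<pre><code class="language-sql:git">-- index the integer form of one known key
CREATE INDEX sample_value_int_idx
  ON sample (((json_data-&gt;&gt;&apos;value&apos;)::INTEGER));

-- the filter has to use the same expression for the planner to consider the index
SELECT *
FROM sample
WHERE (json_data-&gt;&gt;&apos;value&apos;)::INTEGER &lt; 2;
</code></pre>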
<!--kg-card-end: markdown--><p>The cast isn&apos;t very complicated to do itself, and it will work for a short while. The problem here is that this kind of query can&apos;t make use of the index we created. While it is possible to index JSON columns with <a href="https://www.postgresql.org/docs/current/gin.html">GIN</a> indexes, you are fairly limited when it comes to the kinds of queries that will make use of them: containment and key existence checks, not range comparisons or sorting. Once the table grows large enough, we are going to need a different solution.</p><figure class="kg-card kg-image-card kg-width-wide kg-card-hascaption"><img src="http://www.codedependant.net/content/images/2018/12/gin.png" class="kg-image" alt="Flexible Schemas with PostgreSQL and Elasticsearch" loading="lazy"><figcaption>The GIN index</figcaption></figure><p>What we really want is an actual postgres table with typed columns that gives us all of the powerful query predicates and indexing power that we know and love. I want a flexible schema. To get the functionality we need to implement the described feature, we&apos;re going to need to rethink this.</p><h2 id="attribute-table">Attribute Table</h2><p>This is actually a pretty good use case for table inheritance. PostgreSQL <a href="https://www.postgresql.org/docs/current/tutorial-inheritance.html">table inheritance</a> takes a page directly from object-oriented programming practices. Tables can inherit columns and attributes from a common parent table. Every child table picks up any column that is added to or removed from the parent, but is still able to define its own columns to make up a unique schema. This is how we can achieve a flexible schema with physical tables. We can make this process mostly automatic with triggers.</p><!--kg-card-begin: markdown--><pre><code class="language-sql:git">-- base table
CREATE TABLE custom_attributes (
  id UUID NOT NULL default gen_random_uuid()
, user_id UUID NOT NULL
, created_at TIMESTAMPTZ DEFAULT NOW()::TIMESTAMPTZ
, company_id UUID NOT NULL
);

-- TRIGGER: When a new company record is inserted
-- Generate an attribute table for their custom data
CREATE OR REPLACE FUNCTION generate_custom_attribute_table()
  RETURNS TRIGGER AS $$
  DECLARE
    tablename TEXT;
    ident TEXT;
    company_id TEXT;
  BEGIN
    company_id := COALESCE(NEW.id, gen_random_uuid())::TEXT;
    -- strip the dashes so the uuid can be used in an identifier
    ident := REPLACE(company_id, E&apos;\-&apos;, &apos;&apos;::TEXT);
    tablename := &apos;user_attributes_&apos; || ident;
    EXECUTE FORMAT(
      &apos;CREATE TABLE %s (
        CONSTRAINT %s_pkey PRIMARY KEY (user_id, company_id)
      ) INHERITS (custom_attributes)&apos;
    , tablename, tablename
    );
    RETURN NULL;
  END;
  $$ LANGUAGE plpgsql VOLATILE;

CREATE TRIGGER on_company_insert
AFTER INSERT
ON company FOR EACH ROW
EXECUTE procedure generate_custom_attribute_table();
</code></pre>
<!--kg-card-end: markdown--><p>When a new record is inserted into the <code>company</code> table, a trigger creates a table that inherits from the <code>custom_attributes</code> table. This is the table that will contain the physical columns to store data points defined by a company. Initially, this table doesn&apos;t have any columns that deviate from the base table.</p><p>Now, we are going to need a way to allow administrators to manage their schema without giving them direct access to the internal table. For that we are going to need a catalog that keeps a record of the field names and their respective types that can be used as the public interface.</p><h2 id="catalog-table">Catalog Table</h2><p>We&apos;ll store the attribute definitions our companies want in a table that we can simply add records to and remove records from. Basically all we need is the name of the field and the type of data that will be stored. In this case we are going to constrain that to just <code>number</code>, <code>boolean</code>, <code>text</code> and <code>date</code> fields.</p><!--kg-card-begin: markdown--><pre><code class="language-sql:git">CREATE DOMAIN user_attribute_type AS TEXT
  CONSTRAINT user_attribute_field_chk
    NOT NULL CHECK(VALUE IN (&apos;boolean&apos;, &apos;date&apos;, &apos;number&apos;, &apos;text&apos;));

CREATE TABLE IF NOT EXISTS user_attribute_catalog (
  id UUID NOT NULL DEFAULT gen_random_uuid()
, company_id UUID NOT NULL REFERENCES company(id)
, field_name text NOT NULL
, field_type user_attribute_type
, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
, CONSTRAINT catalog_attribute_name_uniq UNIQUE (company_id, field_name)
);
</code></pre>
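<p>A quick way to see what the domain buys us, assuming the <code>user_attribute_type</code> definition above has been created: any value outside the allowed list is rejected by the domain&apos;s CHECK before it can reach the catalog table, so the triggers that read <code>field_type</code> should only ever see one of the four known values.</p>
<pre><code class="language-sql:git">-- one of the four allowed values: accepted
SELECT &apos;number&apos;::user_attribute_type;

-- not in the allowed list: fails the domain CHECK
SELECT &apos;json&apos;::user_attribute_type;
</code></pre>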
<!--kg-card-end: markdown--><p>When we want to show all of the defined attributes for a company, it is a pretty simple query</p><!--kg-card-begin: markdown--><pre><code class="language-sql:git">SELECT field_name, field_type
FROM user_attribute_catalog
WHERE company_id = $1
</code></pre>
<!--kg-card-end: markdown--><p>The last piece to this is going to be a couple of triggers to add and remove the appropriate columns to the child table when records are added or removed from the catalog table.</p><p>Inserting a record into the catalog table adds a column to the respective attribute table for the company. The type of the column is determined by the <code>field_type</code> column of the inserted record</p><!--kg-card-begin: html--><table>
  <thead>
    <tr>
      <th>Field Type</th>
      <th>Column Type</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>date</td><td>TIMESTAMPTZ</td>
    </tr>
    <tr>
      <td>boolean</td><td>BOOLEAN</td>
    </tr>
    <tr>
      <td>number</td><td>FLOAT8</td>
    </tr>
    <tr>
      <td>text</td>
      <td>
        <a href="https://github.com/zombodb/zombodb/blob/master/TYPE-MAPPING.md">zdb.fulltext</a>
      </td>
    </tr>
  </tbody>
</table><!--kg-card-end: html--><!--kg-card-begin: markdown--><pre><code class="language-sql:git">-- ADD ATTRIBUTE
CREATE OR REPLACE FUNCTION public.add_user_attribute()
RETURNS TRIGGER AS $$
  DECLARE
    statements TEXT;
    tablename TEXT;
    ident TEXT;
    organization_id TEXT;
    field TEXT;
    BEGIN
    ident := REPLACE(NEW.company_id::TEXT, E&apos;\-&apos;, &apos;&apos;::TEXT);
    tablename = &apos;user_attributes_&apos; || ident;
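    -- suffix the column name with the catalog record id, as remove_user_attribute() does
    field := NEW.field_name || &apos;_&apos; || REPLACE(NEW.id::TEXT, E&apos;\-&apos;, &apos;&apos;::TEXT);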

    IF
      NEW.field_type = &apos;boolean&apos; THEN
        statements := FORMAT(&apos;
          ALTER TABLE %s
              ADD COLUMN IF NOT EXISTS %s BOOLEAN
        &apos;, tablename, field);

      ELSEIF NEW.field_type = &apos;number&apos; THEN
        statements := FORMAT(&apos;
          ALTER TABLE %s
              ADD COLUMN IF NOT EXISTS %s FLOAT8
        &apos;, tablename, field);

      ELSEIF NEW.field_type = &apos;date&apos; THEN
        statements := FORMAT(&apos;
          ALTER TABLE %s
              ADD COLUMN IF NOT EXISTS %s TIMESTAMPTZ
        &apos;, tablename, field);
      ELSEIF NEW.field_type = &apos;text&apos; THEN
        statements := FORMAT(&apos;
          ALTER TABLE %s
              ADD COLUMN IF NOT EXISTS %s zdb.fulltext
                CONSTRAINT ua_%s_chk CHECK(char_length(%s) &lt;= 512)
        &apos;, tablename, field, field, field);
      ELSE
        RAISE EXCEPTION &apos;Unknown attribute type: %&apos;, NEW.field_type;
    END IF;
    EXECUTE statements;
    RETURN NULL;
  END;
  $$
  LANGUAGE plpgsql VOLATILE;

CREATE TRIGGER on_insert_user_attribute_fields
 AFTER INSERT
 ON user_attribute_catalog FOR EACH ROW
 EXECUTE PROCEDURE add_user_attribute();
</code></pre>
<!--kg-card-end: markdown--><!--kg-card-begin: markdown--><pre><code class="language-sql:git">-- REMOVE ATTRIBUTE
CREATE OR REPLACE FUNCTION public.remove_user_attribute()
  RETURNS TRIGGER AS $$
  DECLARE
    tablename TEXT;
    ident TEXT;
    field TEXT;
    BEGIN
      ident := REPLACE(OLD.company_id::TEXT, E&apos;\-&apos;, &apos;&apos;::TEXT);
      tablename := &apos;user_attributes_&apos; || ident;
      field := OLD.field_name || &apos;_&apos; || REPLACE(OLD.id::TEXT, E&apos;\-&apos;, &apos;&apos;::TEXT);
      EXECUTE FORMAT(&apos;
        ALTER TABLE %s
          DROP COLUMN IF EXISTS %s
      &apos;, tablename, field);
    RETURN NULL;
    END;
  $$ LANGUAGE plpgsql VOLATILE;

CREATE TRIGGER on_delete_user_attribute_fields
  AFTER DELETE
  ON user_attribute_catalog
  FOR EACH ROW
  EXECUTE PROCEDURE remove_user_attribute();
</code></pre>
<!--kg-card-end: markdown--><p>These two trigger functions handle managing the internal table definition that will hold the actual attribute data. When a record is inserted into the catalog table, a column is added. Conversely, when a record is deleted from the catalog, its corresponding column is removed.</p><p>It is important to note that the columns being added have no defaults and, apart from the length check on text fields, no constraints. Doing it this way avoids having to rewrite entire tables and is mostly a change in the metadata about the table. These operations can be performed on production tables with multiple millions of records in milliseconds.</p><p>Inserting into the <code>company</code> table will generate an attributes table specifically for that company:</p><!--kg-card-begin: markdown--><pre><code class="language-sql:git">INSERT INTO company (id, name)
VALUES (&apos;02335c58-a66f-4b53-9801-cb8045e2e848&apos;, &apos;ACME&apos;);
</code></pre>
<pre><code class="language-shell:git"># list tables
\dt
company
custom_attributes
user_attribute_catalog
user_attributes_02335c58a66f4b539801cb8045e2e848
</code></pre>
<!--kg-card-end: markdown--><p>By default, it will only have the columns that were defined by the parent attributes table.</p><!--kg-card-begin: markdown--><pre><code class="language-shell:git"># user_attributes_02335c58a66f4b539801cb8045e2e848

+-------------+------+
| Column Name | Type |
| ----------- | ---- |
| id          | UUID |
| company_id  | UUID |
| user_id     | UUID |
+-------------+------+
</code></pre>
<!--kg-card-end: markdown--><p>Adding a field definition to the catalog adds a column of the appropriate type to the child table that was set up for the specific company.</p><!--kg-card-begin: markdown--><pre><code class="language-sql:git">INSERT INTO user_attribute_catalog
(company_id, field_name, field_type)
VALUES (
  &apos;02335c58-a66f-4b53-9801-cb8045e2e848&apos;
, &apos;fake_number&apos;
, &apos;number&apos; 
);
</code></pre>
<!--kg-card-end: markdown--><!--kg-card-begin: markdown--><pre><code class="language-shell:git"># user_attributes_02335c58a66f4b539801cb8045e2e848

+----------------------------------------------+--------+
| Column Name                                  | Type   |
|----------------------------------------------|--------|
| id                                           | UUID   |
| company_id                                   | UUID   |
| user_id                                      | UUID   |
| fake_number_6ad3e88314e04832b39daef8fa7ff730 | DOUBLE |
+----------------------------------------------+--------+
</code></pre>
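<p>With the column in place it behaves like any other typed column. The statements below are a sketch that reuses the table and column names from the listing above: first a numeric filter and sort that needs no casting, then the reverse operation, where deleting the catalog row fires the delete trigger and drops the column again.</p>
<pre><code class="language-sql:git">-- typed filtering and sorting, no casts required
SELECT user_id
FROM user_attributes_02335c58a66f4b539801cb8045e2e848
WHERE fake_number_6ad3e88314e04832b39daef8fa7ff730 &gt; 10
ORDER BY fake_number_6ad3e88314e04832b39daef8fa7ff730 DESC;

-- removing the definition drops the generated column
DELETE FROM user_attribute_catalog
WHERE company_id = &apos;02335c58-a66f-4b53-9801-cb8045e2e848&apos;
  AND field_name = &apos;fake_number&apos;;
</code></pre>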
<!--kg-card-end: markdown--><p>Removing records from the catalog table will additionally drop the column from the attribute table.</p><h2 id="indexing">Indexing</h2><!--kg-card-begin: html--><p>
<img src="https://codedependant-blog.s3.amazonaws.com/content/images/2019/12/zombodb-illustration.png" style="max-height:180px" class="fr" alt="Flexible Schemas with PostgreSQL and Elasticsearch"> We&apos;ve solved the flexible schema problem, but we still have the issue of needing to perform complex user defined queries on a large data set. We need to index this, and to do that we are going to use elasticsearch via the fantastic zombodb extension.
</p><!--kg-card-end: html--><!--kg-card-begin: markdown--><p>Once the extension is installed and configured, we need to update the first trigger that generates the attributes table to add an <a href="https://elastic.co">elasticsearch</a> index.</p>
<!--kg-card-end: markdown--><!--kg-card-begin: markdown--><blockquote>
<h2 id="itisanindexlikeanyotherpostgresindexitjusthappenstoliveinelasticsearch">It is an index like any other postgres index, it just happens to live in elasticsearch</h2>
</blockquote>
<!--kg-card-end: markdown--><!--kg-card-begin: markdown--><pre><code class="language-sql:git">  BEGIN
    company_id := COALESCE(NEW.id, gen_random_uuid())::TEXT;
    ident := REPLACE(company_id, E&apos;\-&apos;, &apos;&apos;::TEXT);
    tablename := &apos;user_attributes_&apos; || ident;
    EXECUTE FORMAT(
      &apos;CREATE TABLE %s (
        CONSTRAINT %s_pkey PRIMARY KEY (user_id, company_id)
      ) INHERITS (custom_attributes);
     CREATE INDEX IF NOT EXISTS %s_zdx ON %s USING zombodb((%s.*)) WITH (alias = &apos;&apos;%s&apos;&apos;)
      &apos;
    , tablename, tablename, tablename, tablename, tablename, tablename
    );
    RETURN NULL;
  END;
</code></pre>
<!--kg-card-end: markdown--><figure class="kg-card kg-image-card"><img src="http://www.codedependant.net/content/images/2018/12/pg_zombodb_index.png" class="kg-image" alt="Flexible Schemas with PostgreSQL and Elasticsearch" loading="lazy"></figure><p>That is all there is to it. The simplest way to think about how zombodb and postgres work together is this: it is an index like any other postgres index, it just happens to live in elasticsearch. In the same way postgres manages a BTREE index on disk, it will manage a zombodb index in elasticsearch. Zombodb indexes the entire row, so as columns are added and values are populated, the index is kept up to date. This means we can perform just about any kind of search query elasticsearch supports. We can update the SQL query for the data set defined for ACME to use zombodb:</p><!--kg-card-begin: markdown--><pre><code class="language-sql:git">SELECT *
FROM user_attributes_02335c58a66f4b539801cb8045e2e848
WHERE user_attributes_02335c58a66f4b539801cb8045e2e848 ==&gt;
dsl.limit(25, 
  dsl.must(
    dsl.term(&apos;vip&apos;, &apos;false&apos;)
  , dsl.range(field=&gt;&apos;total_purchases&apos;, gt=&gt;10)
  , dsl.range(
      field=&gt;&apos;login_date&apos;
    , lte=&gt;&apos;2018-12-30&apos;
    , gte=&gt;&apos;2018-09-30&apos;
    )
  , dsl.wildcard(&apos;name&apos;, &apos;j*&apos;)
  )
)
</code></pre>
<!--kg-card-end: markdown--><p>This tells PG to use the zombodb index, which makes the appropriate query to your elasticsearch cluster and still returns the results out of the table. You can still use joins, procedures and everything else postgres provides. We get a strongly typed flexible schema in a relational database backed by a blazing fast elasticsearch index.</p><p>It may not be magic - but it is damn close</p>]]></content:encoded></item><item><title><![CDATA[Exact Match OR Null Queries With Elasticsearch]]></title><description><![CDATA[Elasticsearch doesn't index null values which makes searching for null values rather tricky. Here is a simple way to find documents that might have null fields]]></description><link>http://www.codedependant.net/2017/12/19/exact-match-or-null-queries-with-elasticsearch/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa65</guid><category><![CDATA[elasticsearch]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Tue, 19 Dec 2017 13:56:42 GMT</pubDate><media:content url="http://www.codedependant.net/content/images/2017/12/Elasticsearch-Logo-Trans-med.png" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: html--><div class="first-letter"><span>E</span></div><img src="http://www.codedependant.net/content/images/2017/12/Elasticsearch-Logo-Trans-med.png" alt="Exact Match OR Null Queries With Elasticsearch"><p>lasticsearch is an extremely powerful searchable document store. The more you use it, the more you realize how deep the rabbit hole of possibility goes. Except when it comes to <code>null</code> values, the Achilles heel of elasticsearch. What it really boils down to is that elasticsearch doesn&apos;t index <code>null</code> values or fields that are missing from a document,
even if you have set up an index mapping and told elasticsearch about your field.</p><!--kg-card-end: html--><p>As you might think, it can get a little tricky searching documents for a field value or for where that field is null / missing - especially when <a href="http://www.codedependant.net/2017/11/11/sql-like-search-queries-with-elasticsearch/">combined with other field queries</a>. For example, let&apos;s say I have some documents that look something like this:</p><pre><code class="language-js:git">[{
  &quot;name&quot;: &quot;bill&quot;
, &quot;age&quot;: 25
, &quot;fav_color&quot;: &quot;blue&quot;
}, {
  &quot;name&quot;: &quot;sam&quot;
, &quot;age&quot;: 30
, &quot;fav_color&quot;: null
}, {
  &quot;name&quot;: &quot;fred&quot;
, &quot;age&quot;: 21
, &quot;fav_color&quot;: &quot;red&quot;
}]
</code></pre><p>If I want to find all documents where <code>age</code> is greater than or equal to <code>25</code> and <code>fav_color</code> is <code>blue</code> <strong>or</strong> <code>null</code>, you could think of this in terms of SQL:</p><h4 id="sql-s-kw-l-n-noun">SQL &#x2C8;s&#x113;-kw&#x259;l -n --noun</h4><ol><li>domain-specific language used in programming and designed for managing data held in a relational database management system</li><li>the next installment (as of a speech or story); especially : a literary, cinematic, or televised work continuing the course of a story begun in a preceding one</li></ol><pre><code class="language-sql:git">SELECT
 id
, name
, age
, fav_color
FROM
  users
WHERE
  age &gt;= 25
AND
  (fav_color = &apos;blue&apos; OR fav_color IS NULL)
</code></pre><p>You <em>could</em> do this with <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-bool-query.html">bool</a> queries, but it gets a little complex - even for small queries.</p><pre><code class="language-js:git">{
  &quot;query&quot;: {
    &quot;constant_score&quot;: {
      &quot;filter&quot;: {
        &quot;bool&quot;: {
          &quot;must&quot;: [{
            &quot;range&quot;: {
             &quot;age&quot;: {
               &quot;gte&quot;: 25
             }
            }
          }, {
            &quot;bool&quot;: {
             &quot;should&quot;: [{
               &quot;bool&quot;: {
                &quot;must&quot;: {
                  &quot;term&quot;: {
                   &quot;fav_color&quot;: &quot;blue&quot;
                  }
                }
               }
             }, {
               &quot;bool&quot;: {
                 &quot;must_not&quot;: {
                   &quot;exists&quot;: { &quot;field&quot;: &quot;fav_color&quot; }
                 }
              }
             }]
            }
          }]
        }
      }
    }
  }
}
</code></pre><p>This style of query is really nice if you want to create a generic query builder for elasticsearch, as everything can be reduced to a <strong>bool</strong> query at the end of the day. But elasticsearch has a shortcut for this with the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-dis-max-query.html">dis_max</a> query, which allows you to combine queries. A document matches if it matches any of the listed queries, so they are effectively <code>OR</code>&apos;d together, and you can even specify a <code>tie_breaker</code> value if you need to control how much the other matching queries contribute to the score. So we could combine the above like so:</p><pre><code class="language-js:git">{
  &quot;dis_max&quot;: {
    &quot;queries&quot;: [{
      &quot;term&quot;: {
        &quot;fav_color&quot;: &quot;blue&quot;
      }
    }, {
      &quot;bool&quot;: {
        &quot;must_not&quot;: {
          &quot;exists&quot;: {
            &quot;field&quot;: &quot;fav_color&quot;
          }
        }
      }
    }]
  }
}
</code></pre><p>Unfortunately, there is not a <code>missing</code> query type, so we still have to use a <code>bool</code> with a <code>must_not</code> clause. But now we can replace most of the nested <strong>bool</strong> queries with the <code>dis_max</code> query and elasticsearch will do the work of combining them for us.</p><pre><code class="language-js:git">{
  &quot;query&quot;: {
    &quot;constant_score&quot;: {
      &quot;filter&quot;: {
        &quot;bool&quot;: {
          &quot;must&quot;: [{
            &quot;range&quot;: {
              &quot;age&quot;: {
                &quot;gte&quot;: 25
              }
            }
          }, {
            &quot;dis_max&quot;: {
              &quot;queries&quot;: [{
                &quot;term&quot;:{
                  &quot;fav_color&quot;:&quot;blue&quot;
                }
              }, {
                &quot;bool&quot;: {
                  &quot;must_not&quot;: {
                    &quot;exists&quot;: {
                      &quot;field&quot;: &quot;fav_color&quot;
                    }
                  }
                }
              }]
            }
          }]
        }
      }
    }
  }
}
</code></pre>]]></content:encoded></item><item><title><![CDATA[SQL Like Search Queries With Elasticsearch]]></title><description><![CDATA[Learn how to combine elasticsearch queries to create search criteria of arbitrary complexity in a similar way to finding data with the SQL query language.]]></description><link>http://www.codedependant.net/2017/11/11/sql-like-search-queries-with-elasticsearch/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa64</guid><category><![CDATA[elasticsearch]]></category><category><![CDATA[sql]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Sat, 11 Nov 2017 20:51:22 GMT</pubDate><media:content url="http://www.codedependant.net/content/images/2017/11/elastic-1.png" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: markdown--><div class="first-letter"><span>E</span></div><img src="http://www.codedependant.net/content/images/2017/11/elastic-1.png" alt="SQL Like Search Queries With Elasticsearch"><p>lasticsearch is an amazing piece of technology. Built on top of Lucene, it offers all of the incredible search facilities that you&apos;d expect from a full featured search engine. What makes elasticsearch so powerful, however, is the fact that it stores the actual data that was originally indexed as JSON documents. Basically, it is a Full Text Search Database more so than a search engine. This allows elasticsearch to do things that other search engines can&apos;t do, like aggregations, scripted queries, multi-query searches, etc., all in addition to the expected searching capabilities like suggestions, spelling corrections, faceting, and so on. For these reasons people are using elasticsearch as the primary data store for massive amounts of data.</p>
<p>One thing that is also different about elasticsearch is that its query language is just JSON objects. You&apos;ll see examples like this:</p>
<pre><code class="language-js:git">{
  &quot;query&quot;: {
    &quot;match&quot; : {
      &quot;message&quot; : &quot;this is a test&quot;
    }
  }
}
</code></pre>
<p>This is a pretty simple search query. <code>match</code> is a full text query that analyzes its input and supports fuzzy matching, partial word matching (depending on the analyzer) and the like. Many queries work this way, which is great when your search criteria are broad, very much like how you would expect any search engine to work today. If you are using elasticsearch as a primary data store, you&apos;ll want to do the more complicated things you are probably used to doing with SQL. At first glance it is not obvious that elasticsearch can do this. However, the query DSL has a fantastic gem of a query type, the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/5.6/query-dsl-bool-query.html">compound bool</a>, that lets you combine multiple query fragments together. The meat of it looks like this:</p>
<pre><code class="language-js:git">{
  &quot;query&quot;: {
    &quot;constant_score&quot;: {
      &quot;filter&quot;: {
        &quot;bool&quot;: {
          &quot;must&quot;: [],
          &quot;should&quot;: []
        }
      }
    }
  }
}
</code></pre>
<blockquote>
<h4 id="searchsrchvverbnnoun">Search (s&#x259;rCH) -v, verb., -n, noun;</h4>
<ol>
<li>try to find something by looking or otherwise seeking carefully and thoroughly</li>
<li>an act of searching for someone or something</li>
</ol>
</blockquote>
<p>The two important parts of the <em>bool</em> query are the <code>should</code> and <code>must</code> keys. They have very SQL-like behaviors behind them.</p>
<ul>
<li><code>should</code> behaves like SQL <code>OR</code></li>
<li><code>must</code> behaves like SQL <code>AND</code></li>
</ul>
<p>Let&apos;s take a look at some more concrete examples.</p>
<h3 id="shirtdocument">Shirt Document</h3>
<p>Let&apos;s say we have an index of shirts for some made up e-commerce application. It&apos;s a basic data set, but we want to let our users find <strong>exactly</strong> what they are looking for with laser-like precision.</p>
<pre><code class="language-js:git">{
  &quot;brand&quot;: &quot;Gildan&quot;
, &quot;size&quot;: &quot;XL&quot;
, &quot;color&quot;: &quot;black&quot;
, &quot;type&quot;: &quot;long-sleeve&quot;
, &quot;price&quot;: 19.00
, &quot;gender&quot;: &quot;men&quot;
, &quot;age&quot;: &quot;adult&quot;
}, {
  &quot;brand&quot;: &quot;Hanes&quot;
, &quot;size&quot;: &quot;M&quot;
, &quot;color&quot;: &quot;purple&quot;
, &quot;type&quot;: &quot;hooded-sweatshirt&quot;
, &quot;price&quot;: 22.50
, &quot;gender&quot;: &quot;female&quot;
, &quot;age&quot;: &quot;toddler&quot;
}
</code></pre>
<p>If we want to find all of the <em>Large</em> <em>men&apos;s</em> apparel, we could add two <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-term-query.html">term</a> queries to the <code>must</code> block.</p>
<pre><code class="language-js:git">{
  &quot;query&quot;: {
    &quot;constant_score&quot;: {
      &quot;filter&quot;: {
        &quot;bool&quot;: {
          &quot;must&quot;: [{
            &quot;term&quot;: {
              &quot;size&quot;: &quot;L&quot;
            }
          }, { 
            &quot;term&quot;: {
              &quot;gender&quot;: &quot;men&quot;
            }
          }]
        }
      }
    }
  }
}
</code></pre>
<p>This translates to SQL almost literally, and will return documents for Men&apos;s Large apparel.</p>
<pre><code class="language-sql:git">SELECT *
FROM shirt
WHERE (size = &apos;L&apos; AND  gender = &apos;men&apos;)
</code></pre>
<p>The queries are going to get a little complex, so we can leave off the query envelope bits and deal with the filter block itself. From here, if we just move the query fragments from the <code>must</code> block to the <code>should</code> block, we&apos;ll get a slightly different query.</p>
<h2 id="elasticsearch">Elasticsearch</h2>
<pre><code class="language-js:git">{
  &quot;bool&quot;: {
    &quot;should&quot;: [{
      &quot;term&quot;: {
        &quot;size&quot;: &quot;L&quot;
      }
    }, {
      &quot;term&quot;: {
        &quot;gender&quot;: &quot;men&quot;
      }
    }]
  }
}
</code></pre>
<h2 id="sql">SQL</h2>
<p>This will return documents for apparel of size Large, OR apparel for men</p>
<pre><code class="language-sql:git">SELECT *
FROM shirt
WHERE (size = &apos;L&apos; OR  gender = &apos;men&apos;)
</code></pre>
<blockquote>
<h4 id="redpillrdpilnnoun">Red Pill (r&#x259;d&apos; pil) -n., --noun;</h4>
<p>Take the red pill - you stay in wonderland and we shall see how deep the rabbit hole goes</p>
<ol>
<li>A dose of the truth</li>
<li>An eye opening experience</li>
</ol>
</blockquote>
<p>Simple enough, and there is a lot that you can do with this. Now, there is one simple fact that isn&apos;t immediately obvious, and it is a bit of a <a href="https://en.wikipedia.org/wiki/Red_pill_and_blue_pill">red pill</a>. The <code>should</code> and <code>must</code> arrays take any number of query fragments. <strong>Bool</strong> is a query fragment. This means they are nestable, and the rabbit hole runs deep.</p>
<h2 id="elasticsearch">Elasticsearch</h2>
<pre><code class="language-js:git">{
  &quot;bool&quot;: {
    &quot;should&quot;: [{
      &quot;bool&quot;: {
        &quot;must&quot;: [{
          &quot;term&quot;: {
            &quot;color&quot;: &quot;black&quot;
          }
        }, {
          &quot;term&quot;: {
            &quot;gender&quot;: &quot;men&quot;
          }
        }]
      }
    }, {
      &quot;bool&quot;: {
        &quot;must&quot;: [{
          &quot;term&quot;: {
            &quot;size&quot;: &quot;XL&quot;
          }
        }, {
          &quot;term&quot;: {
            &quot;type&quot;: &quot;hooded-sweatshirt&quot;
          }
        }]
      }
    }]
  }
}
</code></pre>
<h2 id="sql">SQL</h2>
<pre><code class="language-sql:git">SELECT *
FROM shirt
WHERE  ( 
  ( color = &apos;black&apos; AND gender = &apos;men&apos;)
  OR
  ( size = &apos;XL&apos; AND type = &apos;hooded-sweatshirt&apos;)
)
</code></pre>
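<p>Since every level is just another <code>bool</code>, the nesting can be generated rather than typed out by hand. What follows is only a minimal sketch: the helper names <code>term</code>, <code>and</code> and <code>or</code> are made up for this example and are not part of any elasticsearch client. The <code>or</code> helper also sets <code>minimum_should_match</code> so that at least one clause has to match.</p>

```javascript
'use strict'

// Hypothetical helpers, named for this example only.

// Leaf query: exact match on a single field
const term = (field, value) => ({ term: { [field]: value } })

// AND: every clause must match
const and = (...clauses) => ({ bool: { must: clauses } })

// OR: at least one clause must match
const or = (...clauses) => ({
  bool: { should: clauses, minimum_should_match: 1 }
})

// (color = 'black' AND gender = 'men') OR (size = 'XL' AND type = 'hooded-sweatshirt')
const query = or(
  and(term('color', 'black'), term('gender', 'men')),
  and(term('size', 'XL'), term('type', 'hooded-sweatshirt'))
)

console.log(JSON.stringify(query, null, 2))
```

<p>Because the helpers return plain objects, they can be nested to any depth and dropped into the <code>filter</code> block as-is.</p>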
<p>You aren&apos;t limited to single <code>term</code> queries either. You can use any legal query - <a href="https://www.elastic.co/guide/en/elasticsearch/reference/5.6/query-dsl-terms-query.html">terms</a>, <a href="https://www.elastic.co/guide/en/elasticsearch/reference/5.6/query-dsl-multi-match-query.html">multi_match</a>, <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-range-query.html">range</a>, etc., at any level.</p>
<h2 id="elasticsearch">Elasticsearch</h2>
<pre><code class="language-js:git">{
  &quot;bool&quot;: {
    &quot;should&quot;: [{
      &quot;bool&quot;: {
        &quot;must&quot;: [{
          &quot;terms&quot;: {
            &quot;color&quot;: [ &quot;red&quot;, &quot;black&quot;, &quot;white&quot;]
          }
        }, {
          &quot;term&quot;: {
            &quot;gender&quot;: &quot;men&quot;
          }
        }, {
          &quot;terms&quot;: {
            &quot;type&quot;: [&quot;hooded-sweatshirt&quot;, &quot;long-sleeve&quot;]
          }
        }]
      }      
    }, {
      &quot;bool&quot;: {
        &quot;must&quot;: [{
          &quot;term&quot;: {
            &quot;color&quot;: &quot;blue&quot;
          }
        }]
      , &quot;should&quot;: [{
          &quot;bool&quot;: {
            &quot;must&quot;: [{
              &quot;term&quot;: {
                &quot;gender&quot;: &quot;female&quot;
              }
            }, {
              &quot;term&quot;: {
                &quot;type&quot;: &quot;hooded-sweatshirt&quot;
              }
            }, {
              &quot;range&quot;: {
                &quot;price&quot;: {
                  &quot;lte&quot;: 45.00
                }
              }
            }]
          }
        }, {
          &quot;bool&quot;: {
            &quot;must&quot;: [{
              &quot;term&quot;: {
                &quot;age&quot;: &quot;toddler&quot;
              }
            }, {
              &quot;term&quot;: {
                &quot;brand&quot;: &quot;Hanes&quot;
              }
            }]
          }
        }]
      , &quot;minimum_should_match&quot;: 1
      }
    }]
  }
}
</code></pre>
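<p>Complex, but still legal. This translates rather easily and still does what you think it would. One caveat: when a <code>bool</code> has both <code>must</code> and <code>should</code> clauses, recent versions of elasticsearch treat the <code>should</code> clauses as optional unless <code>minimum_should_match</code> is set to at least <code>1</code>.</p>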
<h2 id="sql">SQL</h2>
<pre><code class="language-sql:git">SELECT * 
FROM shirt
WHERE (
    ( 
      color IN (&apos;red&apos;, &apos;black&apos;, &apos;white&apos;) 
      AND
        gender = &apos;men&apos;
      AND
        type IN (&apos;hooded-sweatshirt&apos;, &apos;long-sleeve&apos;)
    )
    OR 
    (
      (color = &apos;blue&apos;)
      AND (
        ( gender = &apos;female&apos; AND type = &apos;hooded-sweatshirt&apos; AND price &lt;= 45.00)
        OR 
        (age = &apos;toddler&apos; AND brand = &apos;Hanes&apos;)
      )
    )
)
</code></pre>
<p>Crazy! Moreover, these are still search queries using your indexed content. Queries like these can come back in a handful of milliseconds, even over millions of records. Its power and speed make <a href="https://www.elastic.co">elasticsearch</a> one of the most popular data stores for real-time big data analytics, monitoring and sensor data aggregations, and big commerce.</p>
<p>Elasticsearch - It&apos;s pretty good.</p>
<!--kg-card-end: markdown-->]]></content:encoded></item><item><title><![CDATA[Live Coding On Twitch]]></title><description><![CDATA[<!--kg-card-begin: markdown--><div class="first-letter"><span>G</span></div><p>oing to start doing some live coding on twitch! I&apos;ve been in the open source space for a long while and it seems fitting that this is the next step. I don&apos;t really have a schedule in mind or much else lined out. Other than</p>]]></description><link>http://www.codedependant.net/2017/09/14/live-coding-on-twitch/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa5f</guid><category><![CDATA[twitch]]></category><category><![CDATA[streaming]]></category><category><![CDATA[live coding]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Thu, 14 Sep 2017 12:07:58 GMT</pubDate><media:content url="http://www.codedependant.net/content/images/2017/09/dependant-by-design-splash-small.png" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: markdown--><div class="first-letter"><span>G</span></div><img src="http://www.codedependant.net/content/images/2017/09/dependant-by-design-splash-small.png" alt="Live Coding On Twitch"><p>oing to start doing some live coding on twitch! I&apos;ve been in the open source space for a long while and it seems fitting that this is the next step. I don&apos;t really have a schedule in mind or much else lined out. Other than -
 It&apos;s probably happening</p>
<p>Check out my <a href="https://www.twitch.tv/codedependant">channel</a> on twitch. I&apos;ll do my best to send out a heads up when I plan on doing something. I&apos;ll probably work on <a href="https://github.com/esatterwhite/skyring">Skyring</a>, <a href="https://github.com/node-tastypie">Tastypie</a> or maybe dust off some stuff from my <a href="https://bitbucket.org/megadoomer">megadoomer</a> collection of things.</p>
<p>I&apos;m <strong>Twitchin</strong>&apos;!</p>
<!--kg-card-end: markdown-->]]></content:encoded></item><item><title><![CDATA[Exactly Once Execution In A Distributed System]]></title><description><![CDATA[Skyring is a distributed system for managing timers, or delayed execution similar to `setTimeout` in javascript but delivers exactly once semantics]]></description><link>http://www.codedependant.net/2017/09/05/exactly-once-execution-in-distributed-system/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa62</guid><category><![CDATA[javascript]]></category><category><![CDATA[skyring]]></category><category><![CDATA[node.js]]></category><category><![CDATA[leveldb]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Tue, 05 Sep 2017 03:33:08 GMT</pubDate><media:content url="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/skyring-logo-med-1.png" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: html--><div class="first-letter"><span>S</span></div><img src="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/skyring-logo-med-1.png" alt="Exactly Once Execution In A Distributed System"><p>kyring is a <a href="https://github.com/esatterwhite/skyring">distributed system for managing timers</a>, or delayed execution similar to <code>setTimeout</code> in javascript. The difference is that it is handled in a reliable and fault tolerant way. <code>setTimeout</code> in javascript is transient. If the running application is restarted or crashes, any pending timers are lost forever. The one guarantee that skyring provides is that a timer will execute after the specified delay, and that it only executes once. Exactly once is an interesting challenge in distributed systems, and Skyring makes use of a number of mechanisms at the node level to achieve this. 
From a high level, this is what the behavior on individual nodes looks like.</p><!--kg-card-end: html--><!--kg-card-begin: markdown--><p><img src="http://www.codedependant.net/content/images/2017/09/skyring-exactly-once.png" alt="Exactly Once Execution In A Distributed System" loading="lazy"></p>
<h4 id="sharednothing">Shared Nothing</h4>
<p>Skyring follows the shared nothing mantra similar to <a href="http://cassandra.apache.org">Cassandra</a> and other distributed software systems. There is no <code>master</code> / <code>slave</code> setup. There is not a central controller or broker.<img src="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/consistent-hashing.png" class="fr" style="max-width:30%" alt="Exactly Once Execution In A Distributed System"> Every node in a cluster is the same as the next and effectively on an island all to itself, with little insight into what other nodes might be doing. It really just knows that there are other nodes in the cluster, the IP address of each node and whether it is currently active.</p>
<h4 id="consistenthashring">Consistent Hashring</h4>
<p>Skyring uses consistent hashing to assign a specific subset of keys to specific nodes in the cluster. Nodes connect to each other using an internal TCP mesh called <a href="https://github.com/uber/tchannel">tchannel</a>. When a node in the cluster handles an HTTP request for a timer, it hashes the ID and looks up the node responsible for that timer. If the node that handled the request is not the owner, it will immediately proxy the request to the node that is, and it is handled on the respective node just like every other HTTP request. The benefits gained from this are many. Most notable is the ability to scale horizontally by just starting more instances. More instances <code>=</code> better performance. There are no noisy neighbors in a skyring cluster. Additionally, as nodes are added, the workload (memory, CPU, disk usage, etc) is spread evenly across them.</p>
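<p>To make the lookup step a little more concrete, here is a generic consistent hashing sketch. It is not skyring&apos;s actual ring implementation: the md5 hash, the number of virtual points per node, the node addresses and the function names are all assumptions chosen for illustration.</p>

```javascript
'use strict'

const crypto = require('crypto')

// Map a string onto the ring as an unsigned 32-bit integer
function position(key) {
  return crypto.createHash('md5').update(key).digest().readUInt32BE(0)
}

// Give every node several virtual points so keys spread out evenly
function createRing(nodes, replicas = 50) {
  const points = []
  for (const node of nodes) {
    for (let i = 0; i !== replicas; i++) {
      points.push({ pos: position(`${node}#${i}`), node })
    }
  }
  return points.sort((a, b) => a.pos - b.pos)
}

// The owner is the first point at or after the key, wrapping around at the end
function lookup(ring, id) {
  const pos = position(id)
  const point = ring.find((p) => p.pos >= pos)
  return (point || ring[0]).node
}

const ring = createRing(['10.0.0.1:3455', '10.0.0.2:3455', '10.0.0.3:3455'])
console.log(lookup(ring, 'timer-1234'))
```

<p>Every node that builds the ring from the same member list comes up with the same owner for a given ID, which is what lets any node accept a request and forward it to the right place.</p>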
<h4>Write Ahead Persistence</h4>
<p>Internally, each skyring node uses a <a href="https://www.npmjs.com/package/levelup">levelup</a> database (w/ a <a href="https://www.npmjs.com/package/leveldown">leveldown</a> backend) where timer data is stored. <img src="http://www.codedependant.net/content/images/2017/09/leveldb.png" alt="Exactly Once Execution In A Distributed System" style="max-width:13%; padding: 10px" class="fl">Before the timer is actually started, the node writes a record to leveldb so there is a disk backed record of the timer. Updates to existing timers follow the same pattern - write to disk first. Aside from a simple persistence layer, levelup gives skyring some limited backend abstraction as levelup allows the persistence layer to be configured. Skyring itself ships with <a href="https://github.com/Level/memdown">memdown</a> (the default) and <a href="https://www.npmjs.com/package/leveldown">leveldown</a> support. But you could configure it to use rocksdb, lmdb, or any of the <a href="https://github.com/level/levelup/wiki/Modules">levelup plugins</a>.</p>
<p>Using an embedded database is a critical component of the system. To really achieve a shared nothing model, each node must have its own database. More traditional central database server solutions, by their very nature, are shared, in that all clients connect to the same server where the data lives. If the data is shared, there is a risk of timers being executed more than once, or ownership slipping across node boundaries. With a dedicated database for each node, this is an impossibility. It also greatly simplifies the application logic as all of the data can be trusted, and all operations are reduced to simple <code>key</code> operations.</p>
<h4 id="internalrebalancing">Internal Rebalancing</h4>
<p>Distributing the actual timers throughout the cluster is a little tricky as <a href="https://nodejs.org/dist/latest/docs/api/timers.html">timers</a> in javascript are maintained entirely in memory. There is no in-code representation that could be used to re-create a timer. And timers cannot be serialized in a way that would make them transferable. This makes the act of moving them from <code>Node A</code> to <code>Node B</code> a challenge. Each node will handle this in two distinct ways. The first is at the edge and is handled by the HTTP API and the internal <code>tchannel</code>. When an HTTP request comes in, the node checks for, or assigns, a unique ID for the prospective timer. It hashes the ID and determines if it is responsible for that ID. If it is not, it will proxy the request over the internal <code>tchannel</code>. The receiving node unwraps the proxied request and injects it into the HTTP server, and it is handled like every other HTTP request. In that respect, timers over the HTTP API are distributed as they are handled. <img src="http://www.codedependant.net/content/images/2017/09/s01.png" alt="Exactly Once Execution In A Distributed System" class="fr" style="max-width:35%; padding:4px"></p>
<p>This is very similar to how a rebalance of timers is handled when a new node is added to the cluster. Say for example, we have a two node cluster and they each have some pending timers on them.</p>
<p>When a new node is added, we need to re-evaluate the existing timer ids in the cluster to determine if the new node should take them on. Similar to what we talked about previously, because all nodes are currently active members of the <code>tchannel</code> mesh we can do the following:</p>
<ul>
<li>Hash the timer ID</li>
<li>Clear the timer from memory</li>
<li>Pull &amp; Delete the record from leveldb</li>
<li>Wrap it in a mock HTTP request</li>
<li>Proxy it to the right node via <code>tchannel</code></li>
</ul>
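<p>The steps above can be sketched roughly like this. Everything here is a stand-in: plain <code>Map</code> objects play the part of the in-memory timers and the leveldb store, and the <code>lookup</code> and <code>proxy</code> functions, along with the shape of the mock request, are hypothetical rather than skyring&apos;s real internals.</p>

```javascript
'use strict'

// self:   address of the node running the rebalance
// timers: Map of timer id to timeout handle
// store:  Map standing in for leveldb (timer id to timer record)
// lookup: hypothetical function returning the owning node for an id
// proxy:  hypothetical function standing in for the tchannel call
function rebalance({ self, timers, store, lookup, proxy }) {
  for (const [id, handle] of timers) {
    // Hash the timer ID and find out who owns it now
    const owner = lookup(id)
    if (owner === self) continue

    // Clear the timer from memory
    clearTimeout(handle)
    timers.delete(id)

    // Pull and delete the record from the store
    const record = store.get(id)
    store.delete(id)

    // Wrap it in a mock HTTP request and send it to the right node
    proxy(owner, { method: 'POST', url: `/timer/${id}`, body: record })
  }
}
```

<p>Timers that still hash to the current node are left alone, so only the slice of work claimed by the new node actually moves.</p>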
<img src="http://www.codedependant.net/content/images/2017/09/s02.png" alt="Exactly Once Execution In A Distributed System" class="fl" style="max-width:35%; padding:4px">
<p>As before, when the new node receives a message over the <code>tchannel</code>, it is injected into the HTTP router and the server handles it as normal. When a node is removed ( intentionally or not ) some interesting things have to happen. Most importantly, the node is no longer an active member of the hashring, cannot accept HTTP requests, and is no longer connected to the <code>tchannel</code> mesh. It does this so it will not be sent any more work during the shutdown process that would just be lost. It also means that it cannot send other nodes the pending timers it does have over the normal avenues.</p>
<p>To circumvent the problem, each node is connected to a <a href="http://nats.io/">nats</a> cluster of one or more nats servers that it uses to publish rebalance events with timer data. Nats will round robin each event to a different node in the cluster, which will perform the normal operations as laid out above. If it is responsible for the timer, it will just funnel it back through the <strong>create</strong> operations, setting the timeout to the remaining time left, or executing immediately if that timer has lapsed. In the case that the node is not responsible for that timer, it will just wrap the timer data in a mock HTTP request and proxy it over the <code>tchannel</code> mesh. Business as usual.</p>
<img src="http://www.codedependant.net/content/images/2017/09/s03.png" alt="Exactly Once Execution In A Distributed System" class="fr" style="max-width:35%; padding:4px">
<p>As nodes leave the cluster, timers are redistributed between the active nodes in the cluster. When the last node in the cluster shuts down ( or crashes for that matter ) it will skip over the rebalance phase leaving the data in leveldb intact. When the node starts again, it will start the recovery phase.</p>
<h4 id="restartrecovery">Restart Recovery</h4>
<p>When a node starts ( or restarts ), before anything else, it streams any and all records out of the leveldb instance. If there are any records in the DB, it is safe to assume that this node was, at one time, responsible for these timers, and it will immediately start passing them through the normal <strong>create</strong> operations. If the original delay has already lapsed, it will just execute the timer immediately.</p>
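<p>The &quot;remaining time&quot; part of recovery is simple arithmetic. Here is a small sketch of it, assuming each stored record carries a <code>created</code> timestamp and the original <code>timeout</code>, both in milliseconds. Those field names, and the <code>remaining</code> and <code>recover</code> functions, are made up for the example.</p>

```javascript
'use strict'

// How long a recovered timer still has to wait.
// `created` and `timeout` are assumed field names, both in milliseconds.
function remaining(record, now = Date.now()) {
  const elapsed = now - record.created
  return Math.max(0, record.timeout - elapsed)
}

// Re-create a timer from a stored record.
// A timer that has already lapsed gets a delay of 0 and runs right away.
function recover(record, execute, now = Date.now()) {
  return setTimeout(() => execute(record), remaining(record, now))
}
```

<p>Clamping at zero is what turns &quot;this timer should have fired while the node was down&quot; into &quot;fire it as soon as the node is back&quot;.</p>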
<p>Without the balance and proxy logic, the nuts and bolts of what a skyring node does, is actually very simple. In fact it comes down to about 10 lines of code - literally! It is simple enough that a first year developer could understand.</p>
<pre><code class="language-js:git">const timers = new Map()
function set(id, delay, fn) {
  if (!timers.has(id)) {
    timers.set(id, setTimeout( fn, delay ).unref());
  }
}
function cancel(id) {
  const ref = timers.get(id);
  clearTimeout( ref );
  return timers.delete(id);
}
</code></pre>
<p>Simple. Fast. Scalable</p>
<!--kg-card-end: markdown-->]]></content:encoded></item><item><title><![CDATA[Custom Transports For Skyring]]></title><description><![CDATA[Skyring is a distributed system for managing timers. When a timer lapses, a message is delivered to defined destination via a configurable transport layer ]]></description><link>http://www.codedependant.net/2017/05/30/custom-skyring-transports/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa63</guid><category><![CDATA[zmq]]></category><category><![CDATA[skyring]]></category><category><![CDATA[timers]]></category><category><![CDATA[node.js]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Tue, 30 May 2017 02:27:50 GMT</pubDate><media:content url="https://raw.githubusercontent.com/esatterwhite/skyring/master/assets/skyring.png" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: html--><div class="first-letter"><span>S</span></div><img src="https://raw.githubusercontent.com/esatterwhite/skyring/master/assets/skyring.png" alt="Custom Transports For Skyring"><p>kyring is a distributed system for managing timers. When a timer lapses, a message is delivered to a destination that you have defined. <em>How</em> that message is delivered is configurable. Out of the box, Skyring comes with an <code>HTTP</code> transport, and there is an official package enabling <a href="https://www.npmjs.com/package/@skyring/tcp-transport">tcp delivery</a> of messages with connection pooling. Transports are pretty easy to write, and you can use any of the tools you are currently used to using.</p><!--kg-card-end: html--><h4 id="stdout-transport">STDOUT Transport</h4><p>To illustrate the process, we&apos;re going to make a simple transport handler to write the data to <code>stdout</code>. 
Basically speaking, a transport is just a node.js module that exports a named <code>function</code></p><h4 id="module-m-jo-ol-n-noun">Module [&#x2C8;m&#xE4;jo&#x35E;ol] -n., --noun</h4><p>any of a number of distinct but interrelated units from which a program may be built up or into which a complex activity may be analyzed.</p><pre><code class="language-js:git">&apos;use strict&apos;

const os = require(&apos;os&apos;)

module.exports = function stdout(method, url, payload, id, storage) {
  // deliver the message
  process.stdout.write(payload);
  process.stdout.write(os.EOL);

  // clear the timer
  storage.remove(id);
};
</code></pre><p>Pretty simple. We just write the data to the stdout stream attached to the process and, of course, remove the timer reference from the <a href="https://github.com/esatterwhite/skyring">skyring</a> internal <code>storage</code>. To load your transport into a skyring server, you can pass an array of <code>transports</code> when instantiating a server instance. The array can contain references to the transport itself, or a string that can be passed to <a href="https://nodejs.org/api/globals.html#globals_require">require</a></p><pre><code class="language-js:git">&apos;use strict&apos;

const Skyring = require(&apos;skyring&apos;)
const stdout = require(&apos;./transports/stdout&apos;)
const server = new Skyring({
  transports: [stdout]
, seeds: [&apos;localhost:3455&apos;]
})

server.load().listen(3000)
</code></pre><p>Done. Just be sure that <strong>every</strong> skyring node in the cluster has all of the same transports loaded so they have the capability to execute all of the timers. Other than that, we can start using our new transport by referencing it by name in the <code>transport</code> field of the request to create a new timer.</p><pre><code class="language-shell:git">curl -XPOST http://localhost:3000/timer -H &apos;Content-Type: application/json&apos; -d &apos;{
  &quot;timeout&quot;:3000
, &quot;data&quot;:&quot;hello world!&quot;
, &quot;callback&quot;: {
    &quot;transport&quot;: &quot;stdout&quot;
  , &quot;method&quot;:&quot;unused&quot;
  , &quot;uri&quot;: &quot;unused&quot;
  }
}&apos;
</code></pre><h4 id="zmq-transport">ZMQ Transport</h4><!--kg-card-begin: html--><img src="http://www.codedependant.net/content/images/2017/05/zeromq.png" class="fr m_all-4" alt="Custom Transports For Skyring"><!--kg-card-end: html--><p>We&apos;ve got our feet wet with this simple example. Let&apos;s expand a little bit. If you&apos;ve read through the <a href="http://www.codedependant.net/tag/summer-of-sockets">summer of sockets</a>, you may have noticed that I am fond of <a href="http://zeromq.org">ZeroMQ</a>. So instead of just writing to <code>stdout</code>, let&apos;s write a transport for <strong>zmq</strong>. I would want it to be able to send timer notifications to <code>1</code> or <code>more</code> servers, and to specify a message distribution pattern (<code>pub</code>, <code>sub</code>, <code>push</code>, <code>pull</code>, <code>etc</code>).</p><!--kg-card-begin: markdown--><blockquote>
<h4 id="zeromqziremkyoonnoun">ZeroMQ [&#x2C8;zir&#x14D; emkyo&#x35E;o] -n., --noun</h4>
<p>A messaging library, which allows you to design a complex communication system without much effort.</p>
</blockquote>
<!--kg-card-end: markdown--><p>For starters, let&apos;s start a new npm package.</p><pre><code class="language-shell:git">$ mkdir -p skyring-zmq-transport/lib
$ cd skyring-zmq-transport
$ touch index.js lib/zmq.js README.md
$ npm init
$ npm install zmq debug --save
$ npm install skyring --save-dev
</code></pre><p>You should have something like this</p><pre><code class="language-shell:git">|-- lib
|   `-- zmq.js
|-- package.json
|-- README.md
`-- index.js
</code></pre><p>Let&apos;s start easy and fill in the primary transport handler</p><pre><code class="language-js:git">&apos;use strict&apos;

const ZMQ = require(&apos;zmq&apos;)
const debug = require(&apos;debug&apos;)(&apos;skyring:transports:zmq&apos;)

module.exports = function zmq(method, url, payload, id, storage) {
  debug(&apos;execute zmq timer&apos;, &apos;timeout&apos;, payload)

  const conn = ZMQ.socket(&apos;push&apos;)
  conn.connect(url)
  conn.send(&apos;timeout&apos;, ZMQ.ZMQ_SNDMORE)
  conn.send(payload)
  storage.remove(id);
}
</code></pre><p>A simplistic start, but it does complete the basic task. When a timer lapses and was configured to use the <code>zmq</code> transport ( <em>the name of our function</em> ), we&apos;ll make a new zmq socket, connect to the requested url, and send the <code>payload</code>. The first problem with this is it will make a new connection for every timer execution. We really need to keep track of previously created connections. We can do that pretty easily with a <a href="https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Map">Map</a>.</p><pre><code class="language-js:git">&apos;use strict&apos;

const ZMQ = require(&apos;zmq&apos;)
const debug = require(&apos;debug&apos;)(&apos;skyring:transports:zmq&apos;)
const connections = new Map()

module.exports = function zmq(method, url, payload, id, storage) {
  debug(&apos;execute zmq timer&apos;, &apos;timeout&apos;, payload);

  const conn = getConnection(url);
  conn.send(&apos;timeout&apos;, ZMQ.ZMQ_SNDMORE);
  conn.send(payload);
  storage.remove(id);
}

function getConnection(addr) {
  // if we already have a connection, return it
  if (connections.has(addr)) return connections.get(addr);

  // otherwise make a new one
  debug(`creating socket to ${addr}`);
  const socket = ZMQ.socket(&apos;push&apos;);
  socket.on(&apos;error&apos;, (err) =&gt; {
    console.error(&apos;destroying socket %s&apos;, addr, err.message);
    socket.removeAllListeners();
    socket.disconnect();
    socket.close();
    connections.delete(addr);
  });
  connections.set(addr, socket);
  socket.connect(addr);
  return socket;
}
</code></pre><p>Now, we&apos;ve added a little helper function <code>getConnection</code>. It takes a connection url and returns the socket we already have for it. If there isn&apos;t one, it makes a new <code>ZMQ</code> <strong>push</strong> socket connected to the url, stores it, and returns it. Now we can reuse our connections, and all timers configured with the same url will use the same connection. This does the job, but we could expand a bit to allow for different socket types. ZMQ comes with <strong>PUB</strong> / <strong>SUB</strong>, <strong>REQ</strong> / <strong>REP</strong>, and a few others. But in the context of skyring, it doesn&apos;t make sense to use a <strong>SUB</strong> socket as it&apos;s read-only. And there is no real need to wait for a response, so both <code>req</code> and <code>rep</code> are out. We should restrict our types to just <code>push</code> and <code>pub</code>. Additionally, we could make it configurable whether the socket should <code>connect</code> or <code>bind</code> to the address.</p><pre><code class="language-js:git">&apos;use strict&apos;

const ZMQ = require(&apos;zmq&apos;);
const debug = require(&apos;debug&apos;)(&apos;skyring:transports:zmq&apos;);
const METHODS = new Set([&apos;push&apos;, &apos;pub&apos;]);
const ZMQ_BIND = !!process.env.ZMQ_BIND;
const connections = new Map();

module.exports = function zmq(method, url, payload, id, storage) {
  debug(&apos;execute zmq timer&apos;, &apos;timeout&apos;, payload);

  const conn = getConnection(url, method);
  conn.send(&apos;timeout&apos;, ZMQ.ZMQ_SNDMORE);
  conn.send(payload);
  storage.remove(id);
}

function getConnection(addr, type) {
  const key = `${type}:${addr}`;
  if (connections.has(key)) return connections.get(key);

  if (!METHODS.has(type)) {
    const error = new Error(`unsupported transport method ${type}`);
    error.code = &apos;EZMQMETHOD&apos;;
    throw error;
  }

  debug(`creating ${type} socket to ${addr}`);
  const socket = ZMQ.socket(type);
  socket.on(&apos;error&apos;, (err) =&gt; {
    console.error(&apos;destroying socket %s&apos;, key, err.message);
    socket.removeAllListeners();
    socket.disconnect();
    socket.close();
    connections.delete(key);
  });

  connections.set(key, socket);

  if (ZMQ_BIND) {
    socket.bindSync(addr);
  } else {
    socket.connect(addr);
  }
  return socket;
}
</code></pre><p>Just a couple of small adjustments. Before creating the socket we check to see if the socket type is allowed and <code>throw</code> if it is not. In <code>getConnection</code>, we use a composite key for storing connections, so it is possible to have multiple socket types for the same url.</p><p>We can plug this into a single node skyring cluster to see it in action.</p><pre><code class="language-js:git">&apos;use strict&apos;

const Skyring = require(&apos;skyring&apos;);
const server = new Skyring({
  seeds: [&apos;localhost:3455&apos;]
, transports: [require(&apos;./lib/zmq&apos;)]
});

server.load().listen(3000);

process.once(&apos;SIGINT&apos;, onSignal);
process.once(&apos;SIGTERM&apos;, onSignal);

function onSignal() {
  server.close(() =&gt; {
    console.log(&apos;skyring closed&apos;);
  });
}
</code></pre><p>That&apos;s it! We can start setting timers using our <code>zmq</code> transport.</p><pre><code class="language-shell:git">curl -XPOST http://localhost:3000/timer -H &apos;Content-Type: application/json&apos; -d &apos;{
  &quot;timeout&quot;:3000
, &quot;data&quot;:&quot;hello world!&quot;
, &quot;callback&quot;: {
    &quot;transport&quot;: &quot;zmq&quot;
  , &quot;method&quot;:&quot;push&quot;
  , &quot;uri&quot;: &quot;tcp://0.0.0.0:5555&quot;
  }
}&apos;
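</code></pre><p>To watch the timer fire, something needs to be listening on the other end of that <code>uri</code>. What follows is only a sketch, assuming the same <code>zmq</code> package used above and a transport that connects rather than binds ( <code>ZMQ_BIND</code> unset ). The file name <code>receiver.js</code> and the <code>framesToStrings</code> and <code>listen</code> helpers are made up for illustration. Start it with <code>node receiver.js</code> before posting the timer.</p><pre><code class="language-js:git">// receiver.js ( hypothetical )
&apos;use strict&apos;

// multipart messages arrive as one Buffer per frame;
// our transport sends two: the literal &apos;timeout&apos; and the payload
function framesToStrings(frames) {
  return frames.map((frame) =&gt; frame.toString(&apos;utf8&apos;))
}

function listen(addr) {
  // required lazily so the helper above works without the native module
  const ZMQ = require(&apos;zmq&apos;)
  const socket = ZMQ.socket(&apos;pull&apos;)

  socket.on(&apos;message&apos;, (...frames) =&gt; {
    const [event, payload] = framesToStrings(frames)
    console.log(&apos;%s: %s&apos;, event, payload)
  })

  // the transport connects, so the receiving side binds
  socket.bindSync(addr)
  return socket
}

if (require.main === module) listen(&apos;tcp://0.0.0.0:5555&apos;)

module.exports = { framesToStrings, listen }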
</code></pre><p>It is pretty simple and straightforward to build your own transport layer for <a href="https://github.com/esatterwhite/skyring">Skyring</a>. It can be a simple function, or for more complex use cases, you can build out entire packages using whatever tools you want. This makes the <code>transport</code> system in skyring very flexible, powerful, and easy to use.</p>]]></content:encoded></item><item><title><![CDATA[Getting Started With Skyring Distributed Timers]]></title><description><![CDATA[Set up a reliable asynchronous timers service with Skyring for node in less than 20 lines of code]]></description><link>http://www.codedependant.net/2017/05/02/getting-started-with-node-skyring-distributed-timers/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa61</guid><category><![CDATA[skyring]]></category><category><![CDATA[timers]]></category><category><![CDATA[node]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Tue, 02 May 2017 01:46:00 GMT</pubDate><media:content url="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/skyring-logo-med.png" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: html--><div class="first-letter"><span>T</span></div><img src="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/skyring-logo-med.png" alt="Getting Started With Skyring Distributed Timers"><p>he very idea of distributed timers is complex. Conceptually, it is full of race conditions and edge cases. <a href="https://github.com/esatterwhite/skyring" alt="Skyring" target="_blank">Skyring</a> for Node.js boils the problem space down to a simple to use library and API for building scalable services that need to perform time sensitive actions. That is a mouthful - think of an email gateway, a web-hook service, or auto-dialers for telephony systems. 
Or, in the most practical sense, any time you need functionality like <a href="https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/setTimeout" target="_blank" alt="setTimeout">setTimeout</a> that has to survive restarts / crashes, or you are using a language that doesn&apos;t support non-blocking timers. Skyring fills that gap, and it is easy to use. We can get something going in less than <strong>20</strong> lines of code.</p><!--kg-card-end: html--><p>To start, we just install the <a href="https://www.npmjs.com/package/skyring">skyring</a> package from npm.</p><pre><code class="language-shell:git">$ npm install skyring --save
</code></pre><p>The only external dependency is a running <a href="https://nats.io/">nats</a> instance. The defaults will do. You can certainly run nats as a cluster if you need high availability, but it isn&apos;t necessary for this</p><pre><code class="language-shell:git">docker run -d --name=nats -p 4222:4222 nats:latest
</code></pre><p>Now we can just require <code>skyring</code> and start it.</p><pre><code class="language-js:git">// index.js
&apos;use strict&apos;

const Skyring = require(&apos;skyring&apos;)
const HTTP_PORT = +(process.env.PORT || 3000)
const node = new Skyring({
  node: {
    app: &apos;demo&apos;
  }
}).load().listen( HTTP_PORT )

function onSignal () {
  // close triggers a rebalance and shuts down
  node.close((err) =&gt; {
    if (err) process.exitCode = 1
  })
}

process.on(&apos;SIGINT&apos;, onSignal)
process.on(&apos;SIGTERM&apos;, onSignal)
</code></pre><p>Pretty simple! We care about two ports here - the port for the <code>http</code> server, which we have defaulted to <code>3000</code>, and the port for the skyring cluster, which defaults to <code>3456</code>. The default configuration is a <strong>2 node</strong> cluster on ports <code>3456</code> and <code>3455</code>.</p><pre><code class="language-shell:git"># Start Node 1
$ DEBUG=skyring:* PORT=3001 node index.js
</code></pre><pre><code class="language-shell:git"># Start Node 2
$ DEBUG=skyring:* PORT=3002 node index.js --channel:port=3455
</code></pre><figure class="kg-card kg-image-card"><img src="https://raw.githubusercontent.com/esatterwhite/skyring/master/assets/skyring-arch.png" class="kg-image" alt="Getting Started With Skyring Distributed Timers" loading="lazy"></figure><p><br>That is it! You just made one of those! You have a running skyring cluster, and we can start making timers by posting to either node.</p><pre><code class="language-shell:git">$ curl -i -XPOST http://localhost:3001/timer -d &apos;{
  &quot;timeout&quot;: 5000,
  &quot;data&quot;: &quot;hello world!&quot;,
  &quot;callback&quot;: {
    &quot;transport&quot;: &quot;http&quot;,
    &quot;method&quot;: &quot;post&quot;,
    &quot;uri&quot;: &quot;http://0.0.0.0:5555/&quot;
  }
}&apos;
</code></pre><p>If everything goes as planned, you should get back a <code>201</code> and a location header to the timer instance if you need to cancel it, or update it.</p><pre><code class="language-shell:git">HTTP/1.1 201 Created
location: /timer/040a8101-9543-40a8-9634-e66de573abbc
Date: Tue, 02 May 2017 03:04:21 GMT
Connection: keep-alive
Content-Length: 0
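</code></pre><p>That location header is the only thing you need to hang on to. As a rough sketch, and assuming a <code>DELETE</code> request to that path is what cancels a timer, the cancel call could look like the following from node. The file name <code>cancel.js</code> and both helper functions are made up for illustration. Pass it the address of any node and the value of the header, e.g. <code>node cancel.js http://localhost:3000 /timer/040a8101-9543-40a8-9634-e66de573abbc</code>.</p><pre><code class="language-js:git">// cancel.js ( hypothetical )
&apos;use strict&apos;

const http = require(&apos;http&apos;)
const { URL } = require(&apos;url&apos;)

// combine a node address with the value of the location header
function cancelOptions(base, location) {
  const url = new URL(location, base)
  return {
    method: &apos;DELETE&apos;
  , hostname: url.hostname
  , port: url.port
  , path: url.pathname
  }
}

function cancel(base, location, cb) {
  const req = http.request(cancelOptions(base, location), (res) =&gt; {
    // there is no body to read, the status code is all we care about
    res.resume()
    cb(null, res.statusCode)
  })
  req.once(&apos;error&apos;, cb)
  req.end()
}

if (require.main === module) {
  cancel(process.argv[2], process.argv[3], (err, status) =&gt; {
    if (err) throw err
    console.log(&apos;cancel responded with&apos;, status)
  })
}

module.exports = { cancelOptions, cancel }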
</code></pre><p>For this to all come together, you&apos;ll want something to handle the timer when it triggers. We can do that, again, with a simple node echo server that listens on port <code>5555</code>:</p><pre><code class="language-js:git">// echo.js
&apos;use strict&apos;;

const http = require(&apos;http&apos;)

http.createServer( (req, res) =&gt; {
  res.writeHead(200)
  res.end();
  req.pipe(process.stdout)
}).listen(5555);

</code></pre><p>You can play around with hammering the skyring cluster with requests, stopping/starting one of the nodes, and any pending timers will find their way to a running node. The <a href="https://github.com/esatterwhite/skyring/blob/af65a04c5577d5bcd9d738acb0a7ee213fdc973d/test/server.spec.js">unit tests</a> do just that! The skyring api is very fast, which means it can be used efficiently from synchronous languages like python or ruby without the complexity of threading, or introducing bloated async frameworks. Skyring is <code>http &lt;&gt; http</code>, which almost all languages support out of the box. And without too much additional work, the default skyring server could be a pretty good web hooks microservice.</p><p>Skyring: Distributed, reliable timers made easy.<br>The code for this example can be <a href="https://bitbucket.org/esatterwhite/skyring-demo">found here</a></p>]]></content:encoded></item><item><title><![CDATA[Build JSON API Responses With Postgres CTEs]]></title><description><![CDATA[Paginating data sets from an RDBMS is a recurring and tricky problem that developers have to deal with. 
Postgres provides some clever ways to solve this.]]></description><link>http://www.codedependant.net/2017/04/30/build-json-api-responses-postregres-with-cte/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa60</guid><category><![CDATA[sql]]></category><category><![CDATA[postgres]]></category><category><![CDATA[node.js]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Sun, 30 Apr 2017 15:49:56 GMT</pubDate><media:content url="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/postgres-logo-1.png" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: html--><div class="first-letter"><span>P</span></div><img src="https://s3.amazonaws.com/codedependant-blog/content/images/2020/03/postgres-logo-1.png" alt="Build JSON API Responses With Postgres CTEs"><p>agination is a recurring problem that developers have to deal with when implementing data access layers for APIs. It can be particularly tricky with the more traditional RDBMS like <a href="https://www.mysql.com/" title="mysql">MySQL</a> or <a href="https://www.postgresql.org/" title="postgresql">Postgresql</a>. For example, let&apos;s say we had an API endpoint that allowed consumers to search a database of movies. We could search by title, director, starring actors, etc. Our database has millions of movies, and we know we don&apos;t want to return all the potential matches for every search request. 
</p><!--kg-card-end: html--><p>We only want to return the top 25 or so records and indicate in the response that there are more results to query for:</p><pre><code class="language-js:git">{
  meta: {
    total: 12000
  , limit: 25
  , next: &lt;URL TO NEXT PAGE&gt;
  , previous: &lt;URL TO PREVIOUS PAGE&gt;
  }
, data: [ ... ]
}
</code></pre><p>This sounds fairly simple and straight forward at first glance, however, in order to make pagination possible through the API, we need to indicate how many records would have been returned by the original query prior to any paging. Which effectively means we need to run the query twice as efficiently as possible. We don&apos;t want do this at the application level - pulling potentially hundreds of thousands of records into memory only to discard most of them and risk crashing the application. There are a couple of approaches.</p><p>We could just run two queries in parallel.</p><pre><code class="language-sql:git">SELECT
  count(*)
FROM
  movies
WHERE
  director = &apos;Michael Bay&apos;;
</code></pre><pre><code class="language-sql:git">SELECT
  title
, release_date
, director
FROM
  movies
WHERE
  director = &apos;Michael Bay&apos;
ORDER BY title
LIMIT 25
OFFSET 50;
</code></pre><p>Simple, easy to reason about. But it is two separate queries and two round trips to the DB, creating double the load on the DB. We will also still have to construct the response manually, combining the result from the count query with a <code>data</code> array built from the rows.</p><p>Alternatively, we can push the count query down into a sub query.</p><pre><code class="language-sql:git">SELECT
  title
, release_date
, director
, (
    SELECT
      count(*)
    FROM
      movies
    WHERE
      director = &apos;Michael Bay&apos;
  ) as total
FROM
  movies
WHERE
  director = &apos;Michael Bay&apos;
ORDER BY title
LIMIT 25
OFFSET 50;
</code></pre><p>This is a bit better, where we can send a single query, and every row will have a column called <code>total</code> indicating the total number of records in the query prior to pagination. But we will still have to check one of the records to grab the count and construct the <code>data</code> array. We can do better though.</p><h4 id="common-table-expressions">Common Table Expressions</h4><p><code>WITH</code> queries provide a way to write auxiliary statements for use in a larger query. These statements, which are often referred to as Common Table Expressions or CTEs, can be thought of as defining temporary tables that exist just for one query.</p><p>Postgres gives us a couple of interesting features that make this easy. We can use an inline <a href="https://www.postgresql.org/docs/9.1/static/queries-with.html">Common Table Expression</a> to get our count information, and we can use the <a href="https://www.postgresql.org/docs/9.5/static/functions-aggregate.html"><code>JSON_AGG</code></a> and <a href="https://www.postgresql.org/docs/9.5/static/functions-json.html#FUNCTIONS-JSON-CREATION-TABLE"><code>ROW_TO_JSON</code></a> functions to compile the <code>data</code> array of json serialized results.</p><pre><code class="language-sql:git">WITH 
  -- total number of matches, before any paging
  movie_count AS (
    SELECT COUNT(*) AS count
    FROM movies
    WHERE director = &apos;Michael Bay&apos;
  )
  -- the single page of results we actually want to return
, page AS (
    SELECT
      movie_id
    , title
    , release_date
    , director
    FROM
      movies
    WHERE
      director = &apos;Michael Bay&apos;
    ORDER BY title
    LIMIT 25
    OFFSET 50
  )

SELECT
  movie_count.count,
  COALESCE(
    (
      JSON_AGG(
        ROW_TO_JSON(page.*)
      ) FILTER ( WHERE page.movie_id IS NOT NULL )
    )
    , &apos;[]&apos;::json
  ) AS data
FROM movie_count
-- the one count row is paired with every row of the page,
-- and still comes back when the page is empty
LEFT JOIN page ON true
GROUP BY movie_count.count
</code></pre><p>This will give us a single result very close to the way we want it, and make sure that we still get back an empty array if there are no rows that match the query. This will keep our in app processing down to a minimum.</p><pre><code class="language-js:git">{
  count: 12000
, data: [{
    &quot;title&quot;: &quot;Transformers&quot;
  , &quot;director&quot;: &quot;Michael Bay&quot;
  , &quot;release_date&quot;: 2007
  }
  , ... ]
}
</code></pre><p>The SQL isn&apos;t terribly difficult to read or understand, and we can use the original parameters to determine if there are any additional pages based on the count. This keeps the vast majority of the work at the database layer, and in a single query.</p><pre><code class="language-js:git">const prefix = &apos;/api/v1/movies&apos;
const next_val = offset + limit
const prev_val = offset - limit

const next = next_val &lt; total
  ? `${prefix}?limit=${limit}&amp;offset=${next_val}`
  : null

const prev = prev_val &lt; 0
  ? null
  : `${prefix}?limit=${limit}&amp;offset=${prev_val}`
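</code></pre><p>Pulled together, that logic is enough to produce the <code>meta</code> block from the start of the post. Here is a small sketch of how it might be wrapped up. The <code>pageLinks</code> name is made up, and the values passed in are just the ones used in the examples above.</p><pre><code class="language-js:git">&apos;use strict&apos;

// hypothetical helper wrapping the next / previous logic
function pageLinks({prefix, limit, offset, total}) {
  const next_val = offset + limit
  const prev_val = offset - limit

  return {
    total
  , limit
    // no next page once the next offset runs past the total
  , next: next_val &lt; total
      ? `${prefix}?limit=${limit}&amp;offset=${next_val}`
      : null
    // no previous page when we are already on the first one
  , previous: prev_val &lt; 0
      ? null
      : `${prefix}?limit=${limit}&amp;offset=${prev_val}`
  }
}

const meta = pageLinks({
  prefix: &apos;/api/v1/movies&apos;
, limit: 25
, offset: 50
, total: 12000
})

console.log(meta.next)     // /api/v1/movies?limit=25&amp;offset=75
console.log(meta.previous) // /api/v1/movies?limit=25&amp;offset=25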
</code></pre><p>Not bad! It isn&apos;t always obvious, but if you find yourself trying to manipulate data structures in your application, it is probably a job for your database. In this case we were able to condense a tabular data structure spread across 2-3 queries into a single query that returned a nested <code>JSON</code> object with an array of objects. That is pretty cool.</p><p><strong>postgres++</strong></p>]]></content:encoded></item><item><title><![CDATA[Distributed Timers With Node.js and Skyring]]></title><description><![CDATA[Introducing Skyring for Node.js - A scalable, distributed service for managing timers]]></description><link>http://www.codedependant.net/2016/12/28/distributed-timers-with-node-js-and-skyring/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa36</guid><category><![CDATA[javascript]]></category><category><![CDATA[skyring]]></category><category><![CDATA[timers]]></category><category><![CDATA[architecture]]></category><category><![CDATA[node.js]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Wed, 28 Dec 2016 19:11:22 GMT</pubDate><media:content url="http://www.codedependant.net/content/images/2016/12/skyring.png" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: html--><div class="first-letter"><span>W</span></div><img src="http://www.codedependant.net/content/images/2016/12/skyring.png" alt="Distributed Timers With Node.js and Skyring"><p>
orking with timers in a distributed system is a really nasty problem that pops up more often than most people would like. Something as simple and useful as <code>setTimeout</code> / <code>clearTimeout</code> becomes brittle, unreliable and a bottleneck in today&apos;s stateless, scalable server mindset. Basically, I need to be able to set a timer somewhere in a cluster without knowing or caring about which server. Conversely, I need to be able to cancel that timer <strong>without</strong> having to know where in the cluster that timer lives. But before we can start to understand possible solutions, let&apos;s dive into a use case to understand the problem and why existing solutions aren&apos;t suitable replacements.
</p><!--kg-card-end: html--><h4 id="scenarios">Scenarios</h4><p><strong>Un-send an email</strong> &#xA0;- A simple example might be providing an email service that allows people to cancel an email they have sent if it is within a 10 second window. We can&apos;t really un-send an email, but we can just not send it until our window has lapsed. Or more accurately, send it after 10 seconds, giving the end user a way to cancel the act of sending. There are many ways we could do this, but for the purpose of this post - timers.</p><p><strong>Telemarketer auto dial</strong> - For a more complex example, we could consider a telemarketer using an automated dialer. Once the telemarketer has finished talking with <strong>Person A</strong>, they are given a small window of time to take some notes and dial <strong>Person B</strong>. If they don&apos;t start dialing within the given window, the system dials the next person on the list of numbers. The act of manual dialing stops the auto-dial timer so <strong>Person C</strong> isn&apos;t dialed at the same time <strong>Person B</strong> is dialed. Timers make a lot of sense here.</p><p>For these short to medium delays <code>setTimeout</code> is a really good fit. Timers are kept in memory, are asynchronous, give us a rather high level of precision, and deal with an arbitrary delay on an arbitrary action. More importantly, we can cancel one if we want to. Simple, to the point and very easy to reason about. However, in distributed systems, we lose track of which servers are managing which timers in the event we want to cancel one, which is the critical criterion of our solution. Let&apos;s take a look at a couple of existing solutions out there that we could utilize to work around this.</p><h4 id="existing-solutions">Existing solutions</h4><h2 id="message-brokers">Message brokers</h2><p>RabbitMQ is one of the go-to solutions for effectively queuing up work. But for situations like above, there are a couple of inherent problems. 
For one, messages are virtually impossible to delete. It just isn&apos;t a part of the AMQP protocol. Once a message goes into a queue, the only way to get it out is to consume the message, or delete / purge the queue. Not really what we are after.</p><!--kg-card-begin: markdown--><blockquote>
<h2 id="messagebroker">Message Broker</h2>
<h6 id="mesijbrkrnnoun">&#x2C8;mesij &#x2C8;br&#x14D;k&#x259;r -n, --noun</h6>
</blockquote>
<ol>
<li>arrange or negotiate (a settlement, deal, or plan).</li>
<li>an intermediary program module that translates a message from the formal messaging protocol of the sender to the formal messaging protocol of the receiver</li>
</ol>
<!--kg-card-end: markdown--><p>Secondly, there isn&apos;t a way to delay the delivery of a message - they are consumed as fast as a consumer can pull them off the queue. This is also not really great for our use cases. Another pain point is the scalability of message brokers. While it is possible, it tends to be a <a href="https://www.rabbitmq.com/clustering.html">bit complex</a>.</p><h2 id="pub-sub-message-bus">PUB-SUB / Message Bus</h2><p>This can come in various forms. I&apos;ve talked about doing this with <a href="http://www.codedependant.net/2016/09/30/distributed-timers-with-node-js-dgram-and-multicast">multicast</a>. Projects I have worked on and <a href="https://github.com/Metaswitch/chronos">similar projects out there</a> take an approach with some inherent flaws.</p><p>When a timer is set, the same message is broadcast to all or a limited number of nodes in the cluster, and a duplicate timer is created on each with a random amount of skew on the timeout. The hope here is that when one of them triggers, we have enough time to broadcast another message to cancel the timers which have not triggered yet, and only one of them executes. If you are thinking that this sounds like an intentional race condition - it is. So we have the ability to delay execution, but now we run the risk of executing the same timer more than once.</p><p>Another problem with using pub sub or the message bus pattern here is that as we add more nodes, the amount of traffic and work that needs to be done also goes up. If every node has to do the work of setting the timer and also look it up to cancel a timer if it is a duplicate - we don&apos;t get any performance gains by adding more nodes. It actually makes it <em>worse</em>.</p><h2 id="redis-queue">Redis Queue</h2><p><a href="https://redis.io">Redis</a> has proven itself to be both very reliable and very versatile. 
It can be used for a wide range of technological problems. In our use case, we could use redis as a queue for timers to be set, canceled or moved in a cluster. We could have all the nodes in the cluster call <a href="https://redis.io/commands/blpop">BLPOP</a> on a list key waiting for work. That would start a timer, cancel a timer or whatever it might be. <code>BLPOP</code> serves the client that has been waiting the longest and hands it the next element in the list. This gives us <strong>exactly once</strong> delivery of messages, which is what we are after.</p><p>However, canceling becomes a problem again. We have no way of ensuring that the node that gets the message is the node that has the pending timer to be canceled, or moved to a different server, etc. You could use this in combination with <code>pubsub</code> or multiple list keys so certain servers are responsible for a subset of actions. The complexity of these solutions greatly outweighs any benefit and still doesn&apos;t solve our problem. Additionally, <code>BLPOP</code> doesn&apos;t work like you would expect it to when you need to run redis as a cluster or behind sentinel. In these situations you run the risk of duplicated messages or missing messages altogether, which puts us right back where we started. And, personally speaking, I find using your data store as a messaging layer a bit misplaced.</p><h4 id="skyring">Skyring</h4><p>These sorts of problems and frustrations are what have led me to build <a href="https://github.com/esatterwhite/skyring">Skyring</a> - Distributed Timers as a service. 
Simply put - <em>Skyring exposes a simple API into a hashring of servers to manage timers</em></p><figure class="kg-card kg-image-card"><img src="http://www.codedependant.net/content/images/2016/12/skyring-arch.png" class="kg-image" alt="Distributed Timers With Node.js and Skyring" loading="lazy"></figure><p>When I started building skyring, I had only a couple of simple requirements.</p><ol><li>Simple and easy to reason about</li><li>Ability to cancel timers</li><li>Horizontally scalable</li><li>Plain old setTimeout where possible</li><li>Reliable - removing a node should not remove timers</li></ol><h4 id="simplicity">Simplicity</h4><p>Simplicity was a major driving factor. Complexity sucks. Complex systems ultimately buckle under their own weight. A dead simple API is all I want.</p><h5 id="set-a-timer">Set a timer</h5><p>Setting a timer is as simple as making an HTTP request to the <code>/timer</code> endpoint on any of the nodes in a <strong>Skyring</strong> cluster.</p><pre><code class="language-shell:git">curl -XPOST http://localhost:8080/timer -d &apos;{
  &quot;timeout&quot;:2000,
  &quot;data&quot;: &quot;hello world&quot;,
  &quot;callback&quot;: {
    &quot;transport&quot;: &quot;http&quot;,
    &quot;uri&quot;: &quot;http://api.someservice.com&quot;,
    &quot;method&quot;: &quot;POST&quot;
  }
}&apos;
</code></pre><h2 id="complexity-sucks">Complexity Sucks</h2><p>For performance considerations and simplicity&apos;s sake, skyring will not return any data payloads. Important information is always returned in the HTTP headers. In this situation, the URI for the timer is returned in the <code>location</code> header.</p><pre><code class="language-shell:git">HTTP/1.1 201 CREATED
location: /timer/4adb026b-6ef3-44a8-af16-4d6be0343ecf
Date: Fri, 23 Dec 2016 00:19:13 GMT
Connection: keep-alive
Content-Length: 0
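</code></pre><p>The same exchange from node is not much more work. This is only a sketch, using the core <code>http</code> module against the <code>localhost:8080</code> node from the curl example. The <code>timerBody</code> and <code>setTimer</code> names are made up for illustration, and the <code>Content-Type</code> header is an assumption on my part since the curl call doesn&apos;t set one.</p><pre><code class="language-js:git">&apos;use strict&apos;

const http = require(&apos;http&apos;)

// builds the same body as the curl example
function timerBody(timeout, data, uri) {
  return JSON.stringify({
    timeout
  , data
  , callback: { transport: &apos;http&apos;, uri, method: &apos;POST&apos; }
  })
}

function setTimer(body, cb) {
  const req = http.request({
    method: &apos;POST&apos;
  , hostname: &apos;localhost&apos;
  , port: 8080
  , path: &apos;/timer&apos;
  , headers: { &apos;Content-Type&apos;: &apos;application/json&apos; }
  }, (res) =&gt; {
    // nothing useful in the body, the timer URI lives in the header
    res.resume()
    cb(null, res.headers.location)
  })
  req.once(&apos;error&apos;, cb)
  req.end(body)
}

if (require.main === module) {
  const body = timerBody(2000, &apos;hello world&apos;, &apos;http://api.someservice.com&apos;)
  setTimer(body, (err, location) =&gt; {
    if (err) throw err
    console.log(&apos;timer created at&apos;, location)
  })
}

module.exports = { timerBody, setTimer }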
</code></pre><h5 id="cancel-a-timer">Cancel a timer</h5><p>The other critical piece to the equation is being able to cancel a timer. And it is just as simple as sending a <code>DELETE</code> request to the URI returned in the <code>location</code> header.</p><pre><code class="language-shell:git">curl -XDELETE http://localhost:8080/timer/4adb026b-6ef3-44a8-af16-4d6be0343ecf
</code></pre><h4 id="horizontal-scale">Horizontal scale</h4><p>Another problem that I wanted to resolve was being able to scale the service out horizontally. Adding servers shouldn&apos;t make things slower as it tends to do with pub sub and message bus. Skyring is a consistent hashring connected by a persistent <code>tcp</code> channel. As new nodes join the ring, they are responsible for only a specific portion of the timers.</p><figure class="kg-card kg-image-card"><img src="http://www.codedependant.net/content/images/2016/12/cassandra.png" class="kg-image" alt="Distributed Timers With Node.js and Skyring" loading="lazy"></figure><p>Requests can be issued to any node in the cluster. If the node the request lands on is not responsible for the key in question, it is immediately proxied to the node that is responsible over the persistent connection. That means that nothing is ever more than a single network hop away. One node or one thousand nodes - still one hop away. This is very similar to the way <a href="http://cassandra.apache.org">cassandra</a> and other distributed data stores work. This allows skyring to keep the work load evenly spread across the cluster - CPU load, network load, event loop load, etc.</p><h4 id="reliable">Reliable</h4><p>Another problem that needs to be solved is how to deal with a node that leaves the cluster - intentionally or not. The real fly in the gravy here is that the node has to both stop accepting http requests and be removed from the internal hashring so it stops accepting work. And we still need a way to redistribute any pending timers to the still active nodes. Under the hood, this is achieved over a <a href="http://nats.io">nats</a> queue group, which keeps things fast but still ensures exactly once delivery of messages, even in clustered setups. 
When a timer comes in over the nats channel it is funneled back through the normal internal machinery so the node that is now responsible for that timer gets it in, at most, 1 network hop.</p><p>Skyring makes it easy to use timers with confidence in a distributed system with a simple API providing functionality in line with <code>setTimeout</code> and <code>clearTimeout</code></p>]]></content:encoded></item><item><title><![CDATA[Distributed Timers with Node.js, Dgram and multicast]]></title><description><![CDATA[Node.js has had support for multicast baked in since the beginning and is exceptionally easy to do. Learn how to communicate to an entire cluster in 25 LOC]]></description><link>http://www.codedependant.net/2016/09/30/distributed-timers-with-node-js-dgram-and-multicast/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa56</guid><category><![CDATA[udp]]></category><category><![CDATA[multicast]]></category><category><![CDATA[dgram]]></category><category><![CDATA[networking]]></category><category><![CDATA[node]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Fri, 30 Sep 2016 23:45:00 GMT</pubDate><content:encoded><![CDATA[<!--kg-card-begin: html--><div class="first-letter"><span>O</span></div><p>ver the years I have had a handful of times where I have had the need for Multicasting. It usually comes about in service or node ( application ) discovery. Or if you are lucky enough, the dreaded <strong>distributed</strong> <code>setTimeout</code> / <code>clearTimeout</code>. Every time I find a need for it, I spend hours in the documentation trying to remember how to use it, only to remember that the Node.js Docs for <a href="https://nodejs.org/api/dgram.html" title="dgram" target="_blank">dgram</a> are virtually void of any useful explanation or examples. I finally resort to googling around for the one or two examples of multicasting out there and hack something together. It shouldn&apos;t be like that. 
It is actually really easy to do with Node.js. Let&apos;s give it a shot.</p><!--kg-card-end: html--><p>Let&apos;s say for our example you have 6 instances of an application running. An end user makes an HTTP request to your application that starts some long running process. We set a timeout that will kill the operation, and email the original user that the operation has failed to complete - <strong>unless</strong> the user manually kills it with another HTTP request, in which case we would need to call <a href="https://nodejs.org/api/timers.html#timers_cleartimeout_timeout">clearTimeout </a> on the timer reference.</p><p>The door to the rabbit hole here is that if the user does issue a request to cancel the job, we have no control over which server the request goes to, and as a result, we can&apos;t reliably clear the timer. Even if we were able to stop the underlying job, we&apos;ll never be able to prevent the email from going out, notifying the user that the job took too long and failed to complete.</p><!--kg-card-begin: markdown--><blockquote>
<h4 id="mltikstnnoun">&#x2C8;m&#x28C;lti&#x2CC;k&#x251;st -n., noun</h4>
</blockquote>
<ol>
<li>A broadcast from one source simultaneously to several receivers on a network</li>
</ol>
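<p>To make the problem described above concrete, here is a minimal sketch ( not taken from the original application ) in which two in-memory objects stand in for two separate server instances. The <code>createInstance</code> helper and its <code>start</code> / <code>cancel</code> methods are hypothetical names used only for illustration. The point is that a timer reference only exists in the process that created it, so a cancel request that lands anywhere else finds nothing to clear.</p>

```javascript
'use strict';

// Hypothetical stand-in for one running copy of the application.
// Each instance keeps its own private map of timer references,
// the same way separate processes would.
function createInstance( name ){
    const jobs = new Map();
    return {
        name,
        start( id, ms, onTimeout ){
            const timer = setTimeout(() => {
                jobs.delete( id );
                onTimeout( id );
            }, ms );
            timer.unref(); // do not keep the process alive just for this timer
            jobs.set( id, timer );
        },
        cancel( id ){
            const timer = jobs.get( id );
            if( !timer ) return false; // this instance never saw the job
            clearTimeout( timer );
            jobs.delete( id );
            return true;
        }
    };
}

const a = createInstance('a');
const b = createInstance('b');

// The job is started on instance a ...
a.start('acfe001', 1000 * 60 * 10, ( id ) => console.log(`job ${id} timed out`));

// ... but the cancel request happens to be routed to instance b.
const missed = b.cancel('acfe001');
// Only the instance that owns the timer can clear it.
const cleared = a.cancel('acfe001');

console.log({ missed, cleared });
```

<p>Sending the cancel to every instance at once sidesteps the routing question entirely, which is where multicast comes in.</p>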
<!--kg-card-end: markdown--><p>Node has had multicast support baked in from the beginning. While the documentation may not indicate so, it is rather simple to do. And in this situation of needing to issue a <code>clearTimeout</code> command to every node in a cluster - it&apos;s perfect.</p><pre><code class="language-js:git">// mcast.js
const dgram = require(&apos;dgram&apos;)
const socket = dgram.createSocket({reuseAddr:true,type:&apos;udp4&apos;});

socket.bind(45555);
socket.on(&apos;listening&apos;, function(){
    socket.setBroadcast( true );
    socket.addMembership(&apos;239.1.2.3&apos;);
});

socket.on(&apos;message&apos;, function( msg ){
    // do something here
    console.log( msg );
})
</code></pre><h4 id="user-datagram-protocol-udp-">User Datagram Protocol ( UDP )</h4><p>An alternative communications protocol to Transmission Control Protocol (<strong>TCP</strong>) used primarily for establishing low-latency and loss-tolerating connections between applications on the Internet.</p><p><strong>That&apos;s it</strong>! Done. You can start up as many of these as you want on different machines ( on a wired network - not WiFi ) and everyone will get every message sent by any machine. Make sure that broadcast and multicast are enabled on the network interface you are using. On Linux, type <code>ifconfig</code> in the terminal - you should see something like this:</p><pre><code class="language-shell:git">enp0s3    Link encap:Ethernet  HWaddr 08:00:27:c9:c1:14  
          inet addr:10.0.2.15  Bcast:10.0.2.255  Mask:255.255.255.0
          inet6 addr: fe80::55b1:c2be:3d23:3d4b/64 Scope:Link
    ---&gt;  UP BROADCAST RUNNING MULTICAST  MTU:1500  Metric:1
          RX packets:265510 errors:0 dropped:0 overruns:0 frame:0
          TX packets:126780 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:1000 
          RX bytes:238144376 (238.1 MB)  TX bytes:15633345 (15.6 MB)

</code></pre><p>Try it out, start a bunch of them up and then drop yourself into a Node REPL</p><pre><code class="language-shell:git">$ node
&gt; .load mcast.js
&gt; socket.send(&quot;hello world&quot;, 45555, &apos;239.1.2.3&apos;);
</code></pre><p>From here you could see how easy it would be to structure a message to look up a timer reference and cancel it if it exists. Our message could be something simple like <code>kill:&lt;id&gt;</code></p><pre><code class="language-js:git">var jobs = {};

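var job_id = &apos;acfe001&apos;; // example id; in practice it comes from the request that started the job
jobs[ job_id ] = setTimeout(function(){}, 1000 * 60 * 10 ).unref();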

socket.on(&apos;message&apos;, function( msg ){
   var bits = msg.toString().split(&apos;:&apos;);
   var kill = bits[0] === &apos;kill&apos;;
   var job = jobs[ bits[1] ];
   kill &amp;&amp; job &amp;&amp; clearTimeout( job );
});

</code></pre><p>And now you can kill a timer from anywhere in your cluster</p><pre><code class="language-shell:git">&gt; socket.send(&quot;kill:acfe001&quot;, 45555, &apos;239.1.2.3&apos;);
</code></pre><p>Distributed <code>clearTimeout</code> in under 25 lines of code. Yes, it is a very simplistic example, but it serves as the basis of virtually every package out on npm that does messaging over udp / multicast - service discovery / registration, leader elections, message bus implementations, etc. This little snippet is basically what they all boil down to.</p><p>Simple. Powerful. Efficient.</p>]]></content:encoded></item><item><title><![CDATA[Summer of Sockets Part 5: Node, Nanomsg and Websockets]]></title><description><![CDATA[<!--kg-card-begin: html--><div class="first-letter"><span>Z</span></div><p>eroMQ has its fair share of quirks and oddities. It manages everything in a global state, requires things be manually grouped into `Contexts`, allocates a thread per context (making it not thread-safe), transports are baked into the library, and so on. It can be a bit clunky to work</p>]]></description><link>http://www.codedependant.net/2016/09/22/summer-of-sockets-part-5-node-nanomsg-and-websockets/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa5e</guid><category><![CDATA[summer of sockets]]></category><category><![CDATA[nanomsg]]></category><category><![CDATA[websocket]]></category><category><![CDATA[node]]></category><category><![CDATA[zeromq]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Thu, 22 Sep 2016 16:56:07 GMT</pubDate><media:content url="http://www.codedependant.net/content/images/2016/09/nodejs-logo.png" medium="image"/><content:encoded><![CDATA[<!--kg-card-begin: html--><div class="first-letter"><span>Z</span></div><img src="http://www.codedependant.net/content/images/2016/09/nodejs-logo.png" alt="Summer of Sockets Part 5: Node, Nanomsg and Websockets"><p>eroMQ has its fair share of quirks and oddities. It manages everything in a global state, requires things be manually grouped into `Contexts`, allocates a thread per context (making it not thread-safe), transports are baked into the library, and so on. 
It can be a bit clunky to work with at times. As a result, one of the original developers on the ZMQ project, <a href="http://250bpm.com/" title="Martin Sustrik" target="_blank">Martin Sustrik</a>, started a new project that evolved into a complete re-write / re-realization of the ZMQ project, called <a href="http://nanomsg.org/" title="nanomsg" target="_blank">nanomsg</a>.</p><!--kg-card-end: html--><p>Nanomsg aims to resolve many of the underlying shortcomings of the zeromq library, but remain compliant with the ZMTP spec. It provides many of the messaging patterns ( which are referred to as scalability patterns ) found in ZMQ - like <a href="http://tim.dysinger.net/posts/2013-09-16-getting-started-with-nanomsg.html#pair-two-way-radio">Pair</a>, <a href="http://tim.dysinger.net/posts/2013-09-16-getting-started-with-nanomsg.html#pipeline-a-one-way-pipe">Pipeline</a>, <a href="http://tim.dysinger.net/posts/2013-09-16-getting-started-with-nanomsg.html#pubsub-topics-broadcast">Pub-Sub</a> and <a href="http://tim.dysinger.net/posts/2013-09-16-getting-started-with-nanomsg.html#requestreply-i-ask-you-answer">Req/Rep</a>.</p><p>It also introduces two new patterns, <code>Bus</code> - which is basically the <strong>peer-to-peer</strong> pattern ( many to many ) and <code>Survey</code> - which is much like <code>pub-sub</code> with <em>replies</em>. What is even more interesting is that the transport mechanism is plugin based, as opposed to ZeroMQ, where transports are vertically integrated, or baked in. This allows others to create transports outside of the library itself. Better still, HTML5 Websockets is one of the transports that ships with nanomsg, making the browser a viable and active participant in the network topology. The node project for nanomsg comes with a simple example of using websockets - it basically boils down to this:</p><pre><code class="language-js:git">// server.js
// modified example found in the nanomsg project
&apos;use strict&apos;;
const nanomsg = require(&apos;nanomsg&apos;);
const socket = nanomsg.socket(&apos;pair&apos;);
const http   = require(&apos;http&apos;);
const fs     = require(&apos;fs&apos;);

// Websocket
socket.bind(&apos;ws://0.0.0.0:3001&apos;);
socket.on(&apos;data&apos;, function(msg){
    socket.send(`received message ${&apos;&apos; + msg}`);
})

// Serve a webpage
http.createServer(function( req, res ){
  fs.createReadStream( &apos;index.html&apos; ).pipe( res )
}).listen( 3000 )
</code></pre><p>Make a new Websocket, give it a <a href="https://raw.githubusercontent.com/nanomsg/nanomsg/master/rfc/sp-websocket-mapping-01.txt">nanomsg socket type mapping</a>, and wait for data.</p><pre><code class="language-html:git">&lt;!DOCTYPE html&gt;
&lt;html&gt;
&lt;head&gt;&lt;/head&gt;
&lt;body&gt;
   &lt;div id=&quot;response&quot;&gt; 
      &lt;span&gt; &lt;/span&gt; 
   &lt;/div&gt;
   &lt;input type=&quot;text&quot;&gt;
   &lt;button&gt;send&lt;/button&gt;
   &lt;script&gt;
      var ws = new WebSocket(&apos;ws://127.0.0.1:3001&apos;,[
         &apos;pair.sp.nanomsg.org&apos;
      ])

      ws.onmessage = function( e ){
          var reader = new FileReader() // handle binary messages
          reader.addEventListener(&apos;loadend&apos;, function(){
             var result = reader.result;
             document.querySelector(&apos;span&apos;).textContent = result
          });
          reader.readAsText( e.data );
      }

      var button = document.querySelector(&apos;button&apos;);
      var input  = document.querySelector(&apos;input&apos;);

      button.addEventListener(&apos;click&apos;, function( e ){
         ws.send(input.value);
         input.value = &apos;&apos;;
      },false)
   &lt;/script&gt;
&lt;/body&gt;
&lt;/html&gt;
</code></pre><p><strong>Done</strong>. 2-way communication ( via the <code>Pair</code> socket type ) over websocket using nanomsg. The server code ( the important part anyway ) is only <strong>5 lines</strong>. Even the node package is less than <em>500 lines</em>. Contrast that with something like <a href="https://github.com/socketio/socket.io/blob/master/package.json#L23">socket.io</a>, which has 3 external dependent libraries ( a parser, adapter, and underlying engine ) and also requires a client side library before you can even make a connection.</p><h4 id="d-v-s-n-noun">d&#x259;&#x2C8;v&#x12B;s -n., --noun</h4><p>A piece of software that connects one or more sockets together to deliver messages; An application</p><p>Additionally, all nanomsg sockets are duplex streams. So, in the case of a proxy or forwarding device, we really don&apos;t need to do anything with the messages; we could just <a href="https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options">pipe</a> them to another nanomsg socket. Similar to the pair socket type in ZeroMQ, nanomsg will not load balance multiple clients, which means our example only works with the first open connection. But you could just as easily use the <code>pub/sub</code> or <code>req</code> and <code>rep</code> socket types, or open a second socket, to achieve something more practical.</p><h4 id="chat">Chat</h4><p>Chat is really a sync problem more than it is a real-time problem</p><p>Chat is a rather common use case for websockets these days. Unlike most, I like to think of chat as a sync problem more than it is a <em>real-time</em> problem. <code>Person A</code> writes to a log, <code>Person B</code> and <code>Person C</code> get a copy of the log, or an incremental update of the log. <code>Pub-Sub</code> makes these updates rather easy to do. Let&apos;s tweak our example to make use of the <code>Pub-Sub</code> model.</p><p>We need to change the socket type on the server.</p><pre><code class="language-js:git">// server.js
const socket = nanomsg.socket(&apos;pub&apos;);
</code></pre><p>We also need to change the websocket mapping in the browser code</p><pre><code class="language-js:git">var ws = new WebSocket(&apos;ws://127.0.0.1:3001&apos;,[
   &apos;pub.sp.nanomsg.org&apos;
]);
</code></pre><p>Yes, change both to <code>pub</code>. It feels backwards, but the websocket transport mapping needs to match that of the server and the library does the rest. Still, it is pretty simple. Now every browser that connects will get every message sent in a <code>fire and forget</code> style. You might be thinking, <em>subscribers can&apos;t send messages!</em> And you would be right.</p><p>This breaks our 2-way communication. Unfortunately, the nanomsg websocket transport does not double as an http server as you are probably used to with libraries like <a href="http://socket.io">socket.io</a>. However, this fits in with the way nanomsg works in general. Transports are pretty agnostic as to the use case - browser or not, it is just another participant in the messaging pattern and network topology.</p><!--kg-card-begin: markdown--><blockquote>
<h4 id="topologytpljnnoun">Topology (t&#x259;&#x2C8;p&#xE4;l&#x259;j&#x113;) -n., --noun.</h4>
</blockquote>
<ol>
<li>the way in which constituent parts are interrelated or arranged.</li>
</ol>
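<p>Since subscribers cannot send on the websocket, messages need some other way back to the server. A minimal sketch of one option, assuming a plain HTTP POST as the write path: the <code>createChatLog</code> and <code>createServer</code> helpers are hypothetical names, the array is only a stand-in for a real log, and <code>publish</code> stands in for whatever pushes data to the pub socket. None of this is taken from the nanomsg project itself.</p>

```javascript
'use strict';
const http = require('http');

// Hypothetical in-memory log. A real deployment would use something
// durable; an array keeps the sketch self-contained.
function createChatLog( publish ){
    const entries = [];
    return {
        entries,
        append( author, text ){
            const entry = { seq: entries.length + 1, author, text };
            entries.push( entry );
            publish( JSON.stringify( entry ) ); // fan out to every subscriber
            return entry;
        }
    };
}

// The write path: new messages arrive as an HTTP POST with a JSON body.
function createServer( log ){
    return http.createServer(( req, res ) => {
        if( req.method !== 'POST' ){
            res.writeHead( 405 );
            return res.end();
        }
        let body = '';
        req.on('data', ( chunk ) => { body += chunk; });
        req.on('end', () => {
            let msg;
            try {
                msg = JSON.parse( body );
            } catch ( err ) {
                res.writeHead( 400 );
                return res.end();
            }
            const entry = log.append( msg.author, msg.text );
            res.writeHead( 201, { 'Content-Type': 'application/json' });
            res.end( JSON.stringify( entry ) );
        });
    });
}

// Assumption: in the real application publish would write to the pub socket.
// Here it only collects what would have been sent.
const published = [];
const log = createChatLog(( msg ) => published.push( msg ));
const server = createServer( log ); // call server.listen( port ) to accept POSTs

const first = log.append('person-a', 'hello');
console.log( first.seq, published.length );
```

<p>Keeping the log separate from the HTTP handler means the same <code>append</code> call can be driven by any input, and the read side stays a one way broadcast.</p>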
<!--kg-card-end: markdown--><p>We could open up another socket connection to handle one way push back to the server, or start an http server - as far as I can tell, you are going to need two ports or multiple servers to handle each scenario. Out of curiosity, I have implemented a <a href="https://bitbucket.org/esatterwhite/nano-chat">silly chat app</a> using <a href="http://kafka.apache.org/">Kafka</a> as the keeper of the log. Basically an HTTP POST is used to write to the log, and Kafka notifies any consumers ( the same app in this case ), which publish the message down the websocket. This could easily be expanded to use Kafka topics to mirror additional rooms and IMs, tying server instances to partitions, and it would scale out pretty easily.</p><p><strong>Chat</strong>. <strong>Done</strong>.</p><p>Nanomsg builds on many of the great ideas that came out of ZeroMQ and opens the door for more interesting ideas with far less work. As we&apos;ve seen here, the amount of code required to include client side applications as a member of otherwise complex networking applications is rather small.<br>You can find the code for this example <a href="https://bitbucket.org/esatterwhite/summer-of-sockets/src/2e6870421ffad7d748025749ec7da9a0a71eb7f2/05.nanomsg_ws/?at=master">here</a> along with the rest of the <a href="http://www.codedependant.net/tag/summer-of-sockets">summer of sockets</a> posts</p>]]></content:encoded></item><item><title><![CDATA[Node Style Woes - Domains and Promises]]></title><description><![CDATA[Node.js support for new ES6 features expands by the day, but some of those new features don't work well with existing Core features. 
The Promise is one.]]></description><link>http://www.codedependant.net/2016/09/12/node-style-woes-domains-and-promises/</link><guid isPermaLink="false">5fa8ac718fa7604a827dfa5d</guid><category><![CDATA[class]]></category><category><![CDATA[es6]]></category><category><![CDATA[promises]]></category><category><![CDATA[domain]]></category><category><![CDATA[node]]></category><dc:creator><![CDATA[Eric Satterwhite]]></dc:creator><pubDate>Mon, 12 Sep 2016 22:44:54 GMT</pubDate><content:encoded><![CDATA[<!--kg-card-begin: html--><div class="first-letter"><span>D</span></div><p>omains have been the red-headed step child of error handling in Node.js. It is a library that has been deprecated since <code>v0.12</code> and has been awaiting a suitable replacement ever since ( we are at <code>v6.5</code> at the time of writing ). Until one has been implemented by the Node Core team, it is still the de facto way to deal with error propagation. As Node.js supports more and more ES6 features, I have been upgrading my open source projects where it seems appropriate. In my command line tool package, <a href="https://github.com/esatterwhite/node-seeli" target="_blank">seeli</a>, I was doing some updates and came across some exceptionally odd behavior around <a href="https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise" target="_blank">ES6 Promises</a> and <a href="https://nodejs.org/api/domain.html#domain_implicit_binding" target="_blank">implicit Domain binding</a>. In a nutshell - <strong>It&apos;s broke</strong>.</p> <!--kg-card-end: html--><h4 id="monkey-patch-m-ngk-pach-v-verb">Monkey Patch (&#x2C8;m&#x259;NGk&#x113; paCH) -v., 
--verb</h4><p>dynamic modifications of a class or module at runtime, motivated by the intent to patch existing third-party code as a workaround to a bug or feature which does not act as desired</p><p>Implicit binding is what happens when you require the <a href="https://nodejs.org/api/domain.html">domain module</a> from node - it does a little <a href="https://en.wikipedia.org/wiki/Monkey_patch">monkey patching</a> of the <a href="https://nodejs.org/api/events.html#events_class_eventemitter">EventEmitter</a> class so that it knows to propagate error events to a domain instance. All instances of the <code>EventEmitter</code> created after the domain module has been loaded are considered to be under the single domain, and all error events emitted will be forwarded to the domain handlers.</p><pre><code class="language-js:git">const domain = require(&apos;domain&apos;).create();
const events = require(&apos;events&apos;);

domain.on(&apos;error&apos;, ( err )=&gt;{
    console.error(&apos;An error has occurred&apos;);
    console.error( err );
    process.exit(1)
});

let e = new events.EventEmitter();
e.emit(&apos;error&apos;, new Error(&apos;I am an Error&apos;));
</code></pre><p>If you run this little snippet of code, the error is dumped to <code>stderr</code> and the process exits. Perfect, this is what I want, and Seeli has been dealing with errors this way for years without a problem. However, in <em>keeping up with the Jones&apos;</em> I upgraded the version of <a href="https://www.npmjs.com/package/inquirer">inquirer</a> for seeli, which introduces the use of Promises. No problem, just pass a <code>then</code> callback, and move the code down a notch - Simple, right? Nope! Everything stops working. Let&apos;s modify the above snippet to illustrate</p><pre><code class="language-js:git">const domain = require(&apos;domain&apos;).create();
const events = require(&apos;events&apos;);

domain.on(&apos;error&apos;, ( err )=&gt;{
    console.error(&apos;An error has occurred&apos;);
    console.error( err );
    process.exit(1)
});

let e = new events.EventEmitter();
Promise.resolve(true)
       .then(function( result ){
            e.emit(&apos;error&apos;, new Error(&apos;I am an Error&apos;));
       })
</code></pre><p>Nothing happens. Alright! OK! Hold on a minute - maybe the <code>Promise</code> is doing some magic for us and the catch handler is being called. Let&apos;s try that</p><pre><code class="language-js:git">const domain = require(&apos;domain&apos;).create();
const events = require(&apos;events&apos;);

domain.on(&apos;error&apos;, ( err )=&gt;{
    console.error(&apos;An error has occurred&apos;);
    console.error( err );
    process.exit(1)
});

let e = new events.EventEmitter();
Promise.resolve(true)
       .then(function( result ){
            e.emit(&apos;error&apos;, new Error(&apos;I am an Error&apos;));
       })
       .catch(function(err){
           console.log( &apos;catch handler called&apos; );
           e.emit(&apos;error&apos;, err);
       })
</code></pre><p>This prints <code>catch handler called</code> and exits. The error <strong>still</strong> does not propagate to the domain handler.</p><pre><code class="language-shell">$ catch handler called
</code></pre><p>Closer. What if we use the bind function on our domain to wrap the promise callback? This way if anything goes wrong in our original callback, we <em>should</em> still be notified of it</p><pre><code class="language-js:git">const domain = require(&apos;domain&apos;).create();
const events = require(&apos;events&apos;);

domain.on(&apos;error&apos;, ( err )=&gt;{
    console.error(&apos;An error has occurred&apos;);
    console.error( err );
    process.exit(1)
});

let e = new events.EventEmitter();
Promise.resolve(true)
       .then(domain.bind(function( result ){
            e.emit(&apos;error&apos;, new Error(&apos;I am an Error&apos;));
       }))
</code></pre><p>When we run this snippet, you guessed it - Nothing.</p><p>Basically, <a href="https://github.com/nodejs/node-v0.x-archive/issues/8648">implicit binding just doesn&apos;t jive with promises</a>. The v8 engine has what is called a microtask queue which is outside the control of node ( and domains ). If you want your errors to propagate out of the Promise internals, you need to explicitly add emitter instances to the domain in question.</p><pre><code class="language-js:git">const domain = require(&apos;domain&apos;).create();
const events = require(&apos;events&apos;);

domain.on(&apos;error&apos;, ( err )=&gt;{
    console.error(&apos;An error has occurred&apos;);
    console.error( err );
    process.exit(1)
});

let e = new events.EventEmitter();
domain.add( e ); // &lt;- explicit binding

Promise.resolve(true)
       .then(function( result ){
            e.emit(&apos;error&apos;, new Error(&apos;I am an Error&apos;));
       })
</code></pre><p>Now when we run this snippet things look a little bit better</p><pre><code class="language-shell:git">An error has occurred
{ Error: I am an Error
    at /home/esatterwhite/dev/js/node-seeli/kill.js:15:29
    at process._tickDomainCallback (internal/process/next_tick.js:129:7)
    at Module.runMain (module.js:577:11)
    at run (bootstrap_node.js:352:7)
    at startup (bootstrap_node.js:144:9)
    at bootstrap_node.js:467:3 }
</code></pre><p>The error handling story in Node.js is confusing at best and this only complicates the matter even more. I hope the Node Core team comes up with a solid solution to replace domains and solidify the error handling saga soon. Until then, plain old <a href="http://thenodeway.io/posts/understanding-error-first-callbacks/">Node-style callbacks</a> are still the most reliable solution.</p>]]></content:encoded></item></channel></rss>