Batch Data flow & Real-time Data flow in Pega

Summary:

This article discusses advanced invocations of the data flow rule and how Pega achieves maximum throughput using data flow processing. A complete understanding of this article will give you a head start on the Queue Processor post that follows.

This post might seem out of place in our series on background processing (Job Scheduler & Queue Processor), but we wanted to publish it now because the Queue Processor in Pega v8 uses real-time data flows to process the messages in its queue. A good understanding of real-time data flows in Pega and their execution will leave us better prepared to understand the Queue Processor and its back-end processing.

When we started analyzing real-time data flows in Pega, we identified many interesting and largely unnoticed features of the data flow. Knowing them in depth will help us use them in our day-to-day implementations and also explains why Pega has moved the entire Queue Processor execution to data flows.

Please make sure you go through our post on data flow, which covers the basics of the data flow rule and its associated components.

To keep it simple: a process flow in Pega defines the business process for work instances, while a data flow defines the processing of data instances.

Before getting into the discussion, let's make sure we are comfortable with the following Pega terminology, so the actual context is easier to follow.

Child Requestor in Pega

Refer to our article on requestors & requestor types in Pega to understand the basics. A child requestor is a requestor spun off from the current requestor session to optimize performance.

In most cases, a child requestor executes as a background requestor. The current requestor can communicate with the child requestor using a pool ID.

For example, consider a scenario where, on submission of an assignment, we want to validate the fields, send an email notification, update a data type, and route the case to a workbasket. We can implement this in multiple ways, most preferably using the two approaches below:

  • Execute all the actions synchronously in the same requestor context (without a child requestor).
  • Since validation and workbasket routing depend on each other, execute them sequentially in the same requestor context; the email notification and the data type update are stand-alone transactions that can be executed in a separate child requestor using the Queue method (with a child requestor), as sketched below.
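
To make the split concrete, here is a conceptual sketch in plain Java (not Pega rule syntax): the dependent steps run inline in the current requestor, while the stand-alone steps are handed off to a background worker, much like the Queue method hands work to a child requestor. All method names here are hypothetical placeholders.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AssignmentSubmission {

    // Background pool standing in for the child requestor
    private static final ExecutorService childRequestor =
            Executors.newSingleThreadExecutor();

    public static void submit(String caseId) {
        // Dependent steps: run sequentially in the current requestor
        validateFields(caseId);
        routeToWorkbasket(caseId);

        // Stand-alone steps: hand off to the "child requestor"
        childRequestor.submit(() -> sendEmailNotification(caseId));
        childRequestor.submit(() -> updateDataType(caseId));
    }

    // Hypothetical placeholders for the four actions
    private static void validateFields(String caseId)        { /* ... */ }
    private static void routeToWorkbasket(String caseId)     { /* ... */ }
    private static void sendEmailNotification(String caseId) { /* ... */ }
    private static void updateDataType(String caseId)        { /* ... */ }
}
```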

Partitioning in Pega

In general terms, partitioning means distributing or sharing work equally. In Pega it means the same, but in terms of records or instances.

For example, consider a scenario where we have two data types and need to copy 100k records from the first data type to the second. We can implement this in multiple ways, most preferably using the two approaches below:

  • Implement logic to copy the 100k records in the same requestor context (without partitioning).
  • Implement logic to copy the 100k records using 3 child requestors, with a proper partitioning key to distribute the records across the child requestors (with partitioning).

We will see in detail how to identify the partition key and configure the partitioning logic later in this post.

Horizontal scaling in Pega

As part of performance optimization, we can scale our environment by adding more application servers (nodes) without changing the size of the existing servers (nodes). This way of scaling out our environment is known as horizontal scaling.

Vertical scaling in Pega

As part of performance optimization, we can also scale our environment by increasing the size of the existing server (node) instead of adding new servers (nodes). This way of scaling up our environment is known as vertical scaling.

A few examples of vertical scaling are adding more resources such as CPU or RAM (heap size) to the server (node), increasing the number of requestors, raising other thresholds, and so on.

Publish-Subscribe Paradigm

In a common messaging system (synchronous communication), two or more systems communicate directly: each knows the identity of the other and whom it is communicating with.

Synchronous communication

The Publish-Subscribe paradigm, by contrast, is an asynchronous way of communicating in which the publisher (sender) and the subscriber (receiver) do not know each other or whom they are communicating with. Messages are posted to a messaging queue (topic), and subscribers request the messages on demand.

Asynchronous communication

The following four terms make up the publisher/subscriber model; a minimal sketch follows the list.

  1. Topic – an intermediary channel that maintains a list of subscribers and relays messages received from publishers to them
  2. Message – a serialized message sent to a topic by a publisher, which has no knowledge of the subscribers
  3. Publisher – the application that publishes a message to a topic
  4. Subscriber – an application that registers itself with the desired topic in order to receive the appropriate messages
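
For intuition, here is a minimal in-memory sketch of the paradigm in Java. A real broker adds persistence, distribution, and delivery guarantees, but the roles are the same.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// A minimal in-memory topic: keeps a list of subscribers and relays every
// published message to them. Publishers and subscribers only know the
// topic, never each other.
class Topic {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        subscribers.forEach(s -> s.accept(message));
    }
}

public class PubSubDemo {
    public static void main(String[] args) {
        Topic topic = new Topic();

        // Two subscribers register with the topic
        topic.subscribe(msg -> System.out.println("Subscriber A got: " + msg));
        topic.subscribe(msg -> System.out.println("Subscriber B got: " + msg));

        // The publisher posts to the topic without knowing who listens
        topic.publish("customer-created");
    }
}
```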

Let’s now get into the actual discussion on the data flows.

Based on the configuration of the data flow and the different ways of invoking it, data flow processing can be broadly classified into four types:

  • Batch Processing
  • Real-time Processing
  • Single case Processing
  • External Processing

Batch processing and real-time processing are discussed in this post; single case and external processing will be covered in a separate post.

Batch Processing

  • When we want to process more than one record using a data flow, the system considers it a batch data flow.
  • Batch processing data flow runs are created as instances of the class Pega-DM-DDF-Work.
  • A batch data flow will run only if at least one of the batch data flow nodes is up and running.
  • Batch data flow node status can be verified from Dev Studio -> Decisioning -> Infrastructure -> Services -> Data flow -> Batch.
  • For Pega Cloud-hosted environments, raise an SR with Pega GCS to map a node under the Batch processing node type.

Business scenario

Let's consider a scenario where all the customer-related information in a Claims application is stored in a data type (Customer). We now need to copy all the customer information from this Pega table to another table in an external database.

Let’s see how best we can achieve this using data flow in Pega with maximum throughput.

Implementation

The data flow to implement the given requirement would look like the one below. Please make sure you go through the data flow basics post to understand each component and its usage.

Execution

  • We can run the data flow rule directly from the Actions menu on the rule form, or we can use the DataFlow-Execute method in an activity rule to trigger the run. In both scenarios, a data flow work object is created and the data flow gets executed.
Execution result of the data flow rule: it has successfully processed 300 records.
  • Once the data flow run is completed, we can see the records copied successfully to the destination table. The data flow also keeps track of failed records, with the failure reason, as part of its execution.

Partitioning in Batch data flow processing

As examined in our earlier post, data flows perform much better than activities in terms of execution. Pega has further optimized data flow execution by using partitioning; the combination of data flows and partitioning helps us achieve maximum throughput.

To enable partitioning for a data flow, the following two factors should be considered:

  • Maximum no. of child requestors to be created.
  • Maximum no. of records to be distributed for each child requestor.

Number of Child Requestors to be created

  • The number of nodes mapped under the Batch data flow node type defines the number of batch processing nodes. In our scenario, it's only 1.
We can add any number of nodes based on our need.
  • The maximum number of threads allowed per batch data flow node defines the number of parallel requestors that can be used for processing. In our scenario, it's 5. This can be adjusted based on the business need.
  • Hence the maximum number of child requestors that can be created is calculated using the formula below:

Max no. of child requestors = No. of batch nodes × Thread (requestor) count

In our scenario,

Max no. of child requestors = 1 × 5 = 5

Number of records to be distributed for each child requestor (Partitioning)

  • The number of records distributed to each child requestor is determined by the partition key in the data set rule (the data set used as the source of the data flow).
  • CustomerType is a column in the customer table. For testing purposes, we set up 300 records in the customer table with different customer type values:
    • Silver – 100
    • Gold – 100
    • Platinum – 100
  • Since CustomerType is configured as the partitioning key, 3 child requestors are created and each processes 100 records.
  • As per our configuration, the system created 3 child requestors and processed 100 records in each. The distribution of records is based on the partition key specified in the data set mapped as the source of the data flow.

This way of processing records using partitioning drastically reduces the time required to process them. In our scenario, the processing time with partitioning is roughly 1/3 of the unpartitioned time. The sketch below shows the grouping step.
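
A minimal sketch of that grouping step in Java, assuming a hypothetical Customer record: records are bucketed by the partition key, and each bucket would then be handed to its own child requestor.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionByKey {

    // Hypothetical customer record; only the partition key matters here
    record Customer(String id, String customerType) {}

    public static void main(String[] args) {
        List<Customer> customers = List.of(
                new Customer("C1", "Silver"),
                new Customer("C2", "Gold"),
                new Customer("C3", "Platinum"),
                new Customer("C4", "Gold"));

        // Group records by the partition key (CustomerType); each group
        // would be processed by its own child requestor
        Map<String, List<Customer>> partitions = customers.stream()
                .collect(Collectors.groupingBy(Customer::customerType));

        partitions.forEach((key, records) ->
                System.out.println("Partition " + key + " -> "
                        + records.size() + " record(s)"));
    }
}
```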

Scaling up the batch data flows

Batch data flows support both horizontal and vertical scaling.

  • As part of horizontal scaling, we can add one more batch data flow node, so 10 child requestors can execute in parallel.

Max no. of child requestors = No. of batch nodes × Thread (requestor) count = 2 × 5 = 10

  • As part of vertical scaling, we can increase the thread (requestor) count of the existing batch node from 5 to 7, so 7 child requestors can execute in parallel.

Max no. of child requestors = No. of batch nodes × Thread (requestor) count = 1 × 7 = 7


Real-time Processing

The Queue Processor in Pega v8 uses real-time data flow processing for its execution, so understanding how real-time data flows work will help us understand how the Queue Processor works and how it differs from standard agents in terms of performance.

  • Real-time processing is similar to batch data flow processing, but differs in its invocation and in how it processes messages. Batch data flows are invoked internally using the DataFlow-Execute method, whereas real-time data flows are invoked based on a real-time event that happens in the system.
  • Real-time processing data flow runs are created as instances of the class Pega-DM-DDF-Work.
  • Real-time data flows will run only if at least one of the Stream data flow nodes is up and running.
  • Stream data flow node status can be verified from Dev Studio -> Decisioning -> Infrastructure -> Services -> Data flow -> Stream.
  • Unlike other data flows, a real-time data flow is always in the running state. It keeps running until we explicitly stop or disconnect it. Because it is always up and running, it can respond to every real-time event queued to it for processing.

Business Scenario

Let's consider a scenario where we need to parse all the customer-related information coming from an external system and store it in a Pega data type (Customer).

Let’s see how we can achieve the above requirement using real-time data flow and trigger the data flow based on a real-time event.

Implementation

The data flow to implement the given requirement would look like the one below. Please make sure you go through the data flow basics post to understand each component and its usage.

  • The source data set of a real-time data flow does not point to a database table; instead, it points to a stream. These kinds of data sets are called stream data sets and are used to transfer messages in a publish/subscribe paradigm.
  • A real-time event publishes a message to the stream data set, which holds the message. Subscribers can subscribe to the data set and receive the messages/records whenever required.
  • In our scenario, the external system publishes the message to the stream data set. The stream data set persists the message and keeps it ready for subscribers, who can then retrieve the required messages by browsing the data set, which contains all the messages.

The Queue Processor uses a Kafka (distributed streaming platform) data set to save incoming messages. Incoming messages are published to the Kafka data set, and the individual Queue Processor rules in the application act as subscribers of that data set: they subscribe, receive the messages back from Kafka, and proceed with their execution.
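
For intuition only, here is roughly what the publish side of such an interaction looks like with the standard Apache Kafka Java client (kafka-clients dependency). This is a sketch, not Pega's internal code: Pega manages its embedded Kafka topics itself, and the broker address and topic name below are assumptions.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PublishToStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Hypothetical topic name; Pega names its internal topics itself.
            // The record key influences which partition the message lands in.
            producer.send(new ProducerRecord<>("customer-events", "C1",
                    "{\"CustomerID\":\"C1\",\"CustomerType\":\"Gold\"}"));
        } // close() flushes any buffered messages
    }
}
```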

  • Each stream data set is by default exposed for real-time invocation (publishing messages) using REST (a stateless protocol) or WebSocket (a stateful protocol).
  • Any real-time event can be published to the stream data set, which triggers the data flow associated with it.

Execution

  • Real-time data flows are triggered when a specific event occurs. The event that triggers the real-time data flow can come from an external source or from within Pega.

In the Queue Processor, a message gets published to the Kafka data set when we use the Queue-For-Processing method in an activity rule. Here, Queue-For-Processing is the real-time event that publishes the message to the Kafka data set.

  • We can trigger the created real-time data flow by hitting the REST end-point URL generated on the stream data set from any REST client (e.g., Postman); a scripted alternative is sketched below. The corresponding hit triggers the data flow run.
  • For each invocation, the system queues a message and triggers the real-time data flow execution. The aggregate statistics keep increasing with each invocation.
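
As a scripted alternative to Postman, the sketch below posts a message using Java's built-in HTTP client. The endpoint URL and payload shape are assumptions; copy the actual URL generated on your stream data set rule form, and match the payload to the data set's class properties.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerRealTimeFlow {
    public static void main(String[] args) throws Exception {
        // Hypothetical endpoint: use the URL generated on the stream
        // data set rule form in your environment
        String endpoint = "https://your-pega-host/prweb/api/stream/v1/CustomerStream";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(endpoint))
                .header("Content-Type", "application/json")
                // Payload fields assumed to mirror the Customer data type
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"CustomerID\":\"C1\",\"CustomerType\":\"Gold\"}"))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```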

Partitioning in Real-time data flow processing

To enable partitioning for a real-time data flow, the following two factors should be considered:

  • Maximum no. of child requestors to be created.
  • Maximum no. of records to be distributed to each child requestor.

Number of Child Requestors to be created

  • The number of nodes mapped under the Real-time data flow node type defines the number of real-time processing nodes. In our scenario, it's 1.
  • The maximum thread (requestor) count allowed per real-time data flow node defines the number of parallel requestors. In our scenario, it's 5. This can be adjusted based on the business need.
  • Hence the maximum number of child requestors that can be created is calculated using the formula below:

Max no. of child requestors = No. of real-time nodes × Thread (requestor) count

In our scenario,

Max no. of child requestors = 1 × 5 = 5

Number of records to be distributed for each child requestor (Partitioning)

  • Unlike other data flows, we need not explicitly mention the partition key in the data set for real-time data flows. By default, each real-time data flow is enabled with 20 partitions.

From the above configuration, we see that our system can have a maximum of only 5 child requestors, yet here we talk about 20 partitions. How does that work 😳, and how do 20 partitions get shared between 5 requestors? 😳

Let's say we have records in 15 partitions (0-14). The system creates 5 child requestors and processes the first 5 partitions. Once a child requestor completes and becomes available, it picks up the next partition, and the cycle repeats until all 15 partitions are processed successfully. In the end, the system ensures that the records in every partition are processed. The Java analogy below shows this scheduling behaviour.
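
This scheduling behaves like a fixed-size worker pool draining a queue of partition tasks. A minimal Java analogy, with 5 threads standing in for the 5 child requestors:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PartitionScheduling {
    public static void main(String[] args) throws InterruptedException {
        // 5 child requestors, modeled as a fixed pool of 5 threads
        ExecutorService requestors = Executors.newFixedThreadPool(5);

        // 15 partitions (0-14) queued as independent units of work;
        // at most 5 run at a time, and a freed requestor picks up the next
        for (int partition = 0; partition < 15; partition++) {
            final int p = partition;
            requestors.submit(() ->
                    System.out.println("Processing partition " + p
                            + " on " + Thread.currentThread().getName()));
        }

        requestors.shutdown();
        requestors.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```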

  • The partition key in a real-time data flow is set in the property pxStreamPartition for every published message. Whenever a message gets published, the system by default assigns pxStreamPartition a value between 0 and 19 (20 partitions); an illustrative mapping is sketched after this list.
  • Messages with partition key 1 are processed in one requestor, messages with partition key 2 in another, and so on. The system tracks the success and failure counts of each partition, along with the failure reason in case of failure.
  • With 20 partitions, a real-time data flow can process records up to 20x faster than unpartitioned execution, provided enough requestors are available to run the partitions in parallel.
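
To illustrate how a key could map onto the 20 partitions, here is a hash-modulo sketch in Java. The exact function Pega uses to set pxStreamPartition is not documented here, so treat this as illustrative only.

```java
public class StreamPartitioning {

    // Illustrative only: map a message key to one of N partitions.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        for (String key : new String[]{"C1", "C2", "C3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 20));
        }
    }
}
```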

Scaling up the real-time data flows

Real-time data flows support both horizontal and vertical scaling.

  • As part of horizontal scaling, we can add one more real-time data flow node, so 10 child requestors can execute in parallel.

Max no. of child requestors = No. of real-time nodes × Thread (requestor) count = 2 × 5 = 10

  • As part of vertical scaling, we can increase the thread (requestor) count of the existing real-time node from 5 to 7, so 7 child requestors can execute in parallel.

Max no. of child requestors = No. of real-time nodes × Thread (requestor) count = 1 × 7 = 7


Now we are at the end of the post. We hope it has given you some insight into how Pega has enhanced and optimized processing using data flows.

This post may be a bit difficult to follow since it involves advanced, detailed technical information on data flow execution. We spent many days and nights collecting this information, so we encourage you to go through the post again until it is clear.

Queue processor becomes easy to understand when we are clear on what’s discussed above.

You can even start a discussion in the OSP forum if you face any issues when you explore or implement what’s discussed above. We will be happy to help you anytime.

Stay tuned for our next post on Queue Processor. 😎

Written by OSP Editorial Team


22 comments
  • Guys, awesome. I am really mesmerized by your work. You are posting even the things you identified during analysis to keep everyone aware of them. That is really so good of you. Keep it up, guys. You rock.

  • As I mentioned earlier, I have spent my time on a valuable post, which made me happy.
    I can understand that analysing new features, doing a POC, and explaining it with a crisp and clear example is really a big task.
    Hats off, team, for your effort.

  • As per one example in the article, we had the ability to create up to 5 child requestors to process the queue items. But based on the partition key, only 3 partitions could be created, so only 3 out of 5 child requestors were utilized.
    Why can't we use the remaining 2 child requestors? What conflict would we face?

    • That’s really an amazing question. The partition key is used to distribute records equally to each requestor. If we were to distribute 3 partitions across 5 child requestors, we might face duplicate processing of records when splitting the records of one partition key between two child requestors.

      It might end up complicating the entire process.

      • Thank you for your quick response.

        Just curious to know how standard agents executing on multiple nodes managed to process the queue items in a single partition (queue table) without duplication. Did the stored procedure sp_pr_reserve_queue_item take care of the duplication issue?

    • Stream nodes are used to publish and subscribe to messages (stream processing), whereas real-time nodes are used for processing the messages once the subscriber receives them. It’s just Pega’s architecture to ensure proper load balancing.

  • Is it true that batch data flows are executed in the case of job schedulers and real-time data flows in the case of queue processors? This article does not seem to state this explicitly.

    • Job schedulers are schedule-driven, and they are not required to run using batch data flows. But we can trigger a data flow run from a job scheduler based on the business need.

      Let us know if you need a clearer understanding of any specific points. We are happy to analyze and get back with the information.

      • Hi,

        Thank you so much for clarifying! Also, I do notice that you have written at the beginning of this article that items for queue processors are processed using real time data flows. You have also explained it.

        Moreover, I have formulated a technique for maximizing the horizontal and vertical scaling of the job schedulers that will replace the advanced agents after we complete the upgrade from 7 to 8. Here are the steps (the solution exploits data partitioning):
        1) Call a data flow from the JS activity (DataFlow-Execute method).
        2) Fetch the records for the data flow from a data set.
        3) Source the data set from the same report definition that the existing advanced agent is using.
        4) Use a suitable partitioning property in the data set so that it spawns off about 5 to 10 partitions.
        5) Assign a number of threads to the data flow service for the node type that is used by the JS that is greater than or equal to the number of partitions that are likely to be created. This will implement vertical scaling.
        6) Configure the destination of the data flow to be abstract.
        7) Insert the necessary shapes between the source and the destination shapes in the data flow. These shapes can implement the same processing logic that was being implemented by the advanced agent the JS is replacing. For example, we can call a data transform that in turn calls the existing JS activity. These activities can queue items to a queue processor. This will implement horizontal scaling.

        Please let me know your thoughts.

        Thanks!

        Sourav

  • Hello, guys! Thank you so much for this valuable information, especially when there is not much Pega help online. I have one question. You were saying that “With 20 partitions, real-time data flows process records 20x faster than the normal execution”. Does normal execution mean having 1 child requestor? And, if we consider the scenario you have created, there are 5 child requestors and 20 partitions and only 5 partitions can run in parallel, therefore the processing time is only 4 times faster, not 20 times, correct? I want to make sure I understand the performance impact when talking about partitions.

  • Hi Team,

    Thanks for this nice post!! I have a requirement where we are trying to generate a very complex PDF document comprising 100+ pages using a queue processor. Inside the queue processor activity there is also a condition to generate 2 complex PDFs of 100+ pages each. When the 2 huge PDFs are getting generated, we sometimes get a 504 error or lock issues, as these PDFs are supposed to be attached to the work object post-generation. We have used the pxCreatePDF activity inside the queue processor activity. Please suggest how to leverage any of the existing options available in the queue processor to enhance parallel processing and also avoid the locking issue.

    NB: we tried increasing the Thread option given in the queue processor, but the locking issue still persists.

    Thanks,
    Naveen P.

    • When the attachment size is huge, it’s always recommended to use a repository rather than Pega storage. See if you can use any cloud-based storage for the documents and link the public URL to the work object.

      Keep the PDF generation alone in a queue processor and move the logic of attaching it to the work object into another background process. That will give you a more stable process.

  • If two users execute the same real-time data flow (queue processor), how do we identify which failed messages belong to user1?

  • Thank you so much for sharing such valuable information.
    Can you please share the single case processing and external processing data flows as well?

  • Hats off to the effort. Pega documentation should have details at this level, so we know the topics at the core level and can leverage features to the maximum extent. I see we design solutions based on our knowledge and sometimes end up limiting their usage.
    By reading this and the Queue Processor post, I am more confident about designing high-throughput jobs in a more efficient way.