# Distributed set-expression cardinality estimation

Imported: 13 Feb '17 | Published: 18 Jan '11

Abhinandan Sujit Das, Sumit Ganguly, Minos N. Garofalakis, Rajeev Rastogi

USPTO - Utility Patents

## Abstract

A method and system for answering set-expression cardinality queries while lowering data communication costs by utilizing a coordinator site to provide global knowledge of the distribution of certain frequently occurring stream elements to significantly reduce the transmission of element state information to the central site and, optionally, capturing the semantics of the input set expression in a Boolean logic formula and using models of the formula to determine whether an element state change at a remote site can affect the set expression result.

## Description

### FIELD OF THE INVENTION

The invention relates generally to information processing systems and, more particularly, to query processing in a distributed streaming environment.

### BACKGROUND OF THE INVENTION

In distributed data streaming applications, rapid update streams originating at tens or hundreds of remote sites are continuously transmitted to a central processing system for online querying and analysis. Examples include monitoring of service provider network traffic statistics, telecommunication call detail records, Web usage logs, financial stock tickers, retail chain transactions, weather data, sensor data, and so on.

An important consideration in the above-mentioned monitoring applications is the communication overhead imposed by the distributed query processing architecture on the underlying network. Specifically, transmitting every update stream to a central site for processing can lead to inordinate amounts of message traffic, and thus have a crippling effect on the communication infrastructure as well as the central site processor.

For many distributed stream-oriented applications, exact answers are not required and approximations with guarantees on the amount of error suffice. The tradeoff between answer accuracy and communication overhead for specific classes of continuous queries over distributed update streams has been studied recently.

One approach considers aggregation queries that compute sums and averages of dynamically changing numeric values spread over multiple sources. In this approach, each site is assigned an interval of a certain width such that the sum of site interval widths is less than the application's total error tolerance. Thus, as long as the numeric value at each site stays within the interval for the site, no messages need to be sent by the sites in order to satisfy the application's accuracy requirements. However, in case the value at a site drifts outside the site's interval, the site is required to transmit the value to the central site and make appropriate adjustments to its interval.

Another approach focuses on the problem of continually tracking top-k values in distributed data streams; the developed techniques ensure the continuing validity of the current top-k set (at the central site) by installing arithmetic constraints at each site.

Unfortunately, most existing approaches for processing data streams are primarily concerned with exploring space-accuracy tradeoffs (mostly for single streams) rather than communication-accuracy tradeoffs in a distributed streams setting.

### SUMMARY

The invention addresses various deficiencies in the prior art by providing methodologies pertaining to the problem of approximately answering set-expression cardinality queries over distributed streams originating at tens or hundreds of remote sites.

Various embodiments provide a method and system for answering set-expression cardinality queries while lowering data communication costs by utilizing a coordinator site to provide global knowledge of the distribution of certain frequently occurring stream elements to significantly reduce the transmission of element state information to the central site and, optionally, capturing the semantics of the input set expression in a Boolean logic formula and using models of the formula to determine whether an element state change at a remote site can affect the set expression result.

The inventive methodologies estimate set-expression cardinality with guaranteed accuracy at a central processing site, while keeping data communication costs between the remote sites and the central processor at a minimum. The inventive solutions exploit global knowledge of the distribution of frequent elements as well as the semantics of set expressions to reduce data transmission overhead while preserving user-specified error guarantees. The inventive methodologies and protocols efficiently propagate global frequency information across sites, and provide a logic-based formulation for identifying the element state changes (at a remote site) that can affect the set expression result (at the central site). The methodologies are effective in reducing the volume of message traffic.

Specifically, in one embodiment of the invention, a distributed framework for processing set-expression cardinality queries is provided wherein: each update stream site is allocated an error budget adapted to determine when the update stream site communicates stream state information to a central site; each update stream site associates a charge with every stream element that is inserted or deleted since stream state was previously transmitted to the central site; and each update stream site transmits current stream state information when the sum of its respective element charges exceeds its respective error budget.

To facilitate understanding, identical reference numerals have been used, where possible, to designate identical elements that are common to the figures.

### DETAILED DESCRIPTION OF THE INVENTION

The invention will be described within the context of distributed stream-oriented applications in which a relatively large number of remote sites provide update streams to a central site for processing.

The present invention is adapted to providing a distributed framework for processing set-expression cardinality queries. Each site or stream source is allocated an error budget which governs when the site communicates stream state information to the central processing site (e.g., for estimating set-expression cardinality). Each remote site associates a charge with every stream element that is inserted or deleted since stream state was last transmitted to the central site. Only when the sum of element charges at a site exceeds the site's error budget does the site communicate the current stream state information to the central site. The framework allows for flexibility in how elements are assigned charges as well as how site error budgets are allocated. Specifically, methods for computing charges are only required to satisfy certain basic properties needed for correctness in terms of providing the stipulated error guarantees. Methods that return smaller charges for elements are more desirable since they result in lower communication overhead.

In another embodiment of the invention, additional techniques that incorporate global knowledge to reduce communication are provided. In many distributed streaming environments, the frequency distribution of stream elements will be skewed with certain elements occurring more frequently than others. For example, in the flows collected from an ISP's border routers, IP addresses corresponding to popular Web sites like Yahoo, Google, Amazon and the like will be contained in a disproportionately large number of flows. If such a frequently occurring element (e.g., an IP address or other stream element) is inserted into a stream at a site (where it does not appear previously), there is no need to charge for it since the element must already be present at the central site and, therefore, the insert has no effect on the set-expression cardinality at the central site. Similarly, the charge for the deletion of a frequent element can be distributed across all the sites where the element occurs since the element would need to be deleted at all these sites to truly go away. Thus, global knowledge of frequent stream elements can lead to lower overall communication costs due to reduced element charges at each site. In one embodiment, protocols for disseminating this global information to the various sites while incurring minimal message overhead are provided.

In another embodiment of the invention, additional techniques that exploit set-expression semantics to reduce communication are provided. In this embodiment, the semantics of set expressions are exploited to obtain further reductions in element charges. For example, in the expression S∪T, if an element e is already present in stream S, then inserts and deletes of e from T have no effect on the set-expression result. Therefore, there is no need to charge for them. In this embodiment, a logic-based approach is provided where the conditions for a change in a set-expression result are captured in a Boolean formula. Models for the Boolean formula then represent scenarios for result changes and are used to compute element charges. Finally, in order to address the (provably required) large time complexity of model enumeration, the methodologies develop an efficient heuristic for computing element charges whose running time is polynomial in the number of streams.

It is noted that while the primary focus of the invention is estimating set-expression cardinality, the described techniques are quite powerful, and can also be used to approximate set expression results (i.e., sets of data elements) at a central site. This can be used by the coordinator to run other potentially complex queries on top of it, which could be more useful than just cardinality queries. For example, in the below DDoS scenario, the results could be filtered to identify malicious hosts, or in the below Akamai example, to identify users corresponding to certain traffic patterns.

It will be appreciated by those skilled in the art that the invention has broad applicability to distributed stream-oriented applications and query processing. For example, in wireless sensor networks (e.g., for environmental monitoring, inventory tracking, etc.), sensors tend to have a very limited battery life, and radio communication is much more expensive in terms of power consumption compared to processing. To ensure longer lifetimes for sensor nodes, it is critical to reduce the amount of data transmitted, even if that implies additional processing at the sensor nodes. Generally speaking, the invention is adapted to enable sufficiently accurate approximations to queries while avoiding excessive communication overhead within the distributed query processing architecture.

Another application is the problem of detecting distributed denial-of-service (DDoS) attacks by analyzing network flow information collected from an ISP's border routers. In a typical DDoS attack scenario, hundreds of compromised “zombie” hosts flood a specific victim destination with large numbers of seemingly legitimate packets. Furthermore, in order to elude source identification, attackers typically forge, or “spoof”, the IP source address of each packet they send with a randomly-chosen address. Consequently, one approach for detecting DDoS attacks is to look for sudden spikes in the number of distinct IP source addresses observed in the flows across the ISP's border routers. The DDoS monitoring application does not require IP source address counts to be tracked with complete precision. Approximate counts can be equally effective for the purpose of discerning DDoS activity as long as errors are small enough so as to not mask abrupt changes in the true counts. Thus, depending on the accuracy requirements of the DDoS application, routers only need to transmit a subset of flow records to the central monitoring site.

As another example, consider a Web content delivery service such as that provided by Akamai (www.akamai.com). In this case, Web sites are replicated at a large number of geographically distributed servers, and users requesting access to a Web site are automatically redirected to the geographically closest server, or the least loaded server. Here, one might often be interested in tracking (approximately) the number of (distinct) users accessing a Web site (across all servers), the number of users who visit both a Web site A and Web site B, or the number of users who visit Web site A but not B. These statistics can be useful for determining the servers at which to replicate Web sites, deciding which advertisements to display at each Web site, and so on.

The problems of counting the number of distinct IP source addresses or Web-site users, as discussed above, are special cases of the more general set-expression cardinality estimation problem, which is discussed in more detail herein. In this more general problem, the goal is to estimate the number of distinct values in the result of an arbitrary set expression over distributed data streams. For example, in the DDoS scenario, one embodiment employs the set difference cardinality query |S−T| to detect significant traffic deviations. Specifically, S is the IP source address set for a sliding window spanning the past week (until now) and T is the set of IP source addresses from the week prior to that (e.g., two weeks ago). Similarly, in the Web example, if S and T are the sets of users who visit Web sites A and B, respectively, then the set intersection query |S∩T| yields the number of users who access both sites A and B.

System Model

This section describes a distributed update-stream processing architecture and formally defines the set-expression cardinality estimation problem addressed herein.

FIG. 1 depicts a distributed stream processing model according to an embodiment of the present invention. The distributed stream processing model 100 of FIG. 1 represents a distributed environment with m+1 sites and n update streams. Stream updates arrive continuously at remote sites 1, . . . , m and site 0 is a special coordinator site that is responsible for generating answers to user (set expression cardinality) queries. It is assumed that there is no direct communication among remote sites, as is typical in most network monitoring applications (see, e.g., NOC example below). Optionally, more or fewer remote sites may be used, more coordinator sites may be used, the number of update streams sent from different remote sites may be the same or different, and the sites may be arranged in a communication hierarchy where each site communicates only with its respective “parent” and “children” sites in the hierarchy. These and other modifications to the invention are envisioned by the inventor and will be appreciated by those skilled in the art and informed by the teachings of the present invention.

The model 100 of FIG. 1 provides a plurality of sites denoted as Site 0 (110₀), Site 1 (110₁) and so on up to Site m (110ₘ), collectively denoted as sites 110. In the model 100 of FIG. 1, the first site (i.e., Site 0) is utilized as a coordinator site, which receives a plurality of update state message (USM) streams from each of the remote sites 1 through m. Assuming n update state message streams for each site, the update state message streams are denoted for remote site 1 as USM0,1 through USMn-1,1, for remote site 2 as USM0,2 through USMn-1,2 and so on up to remote site m as USM0,m through USMn-1,m.

Each remote site exchanges messages only with the coordinator, providing it with state information for streams at the site. Note that this distributed communication model is representative of a large class of real-life applications including network monitoring where a central Network Operations Center (NOC) is responsible for processing network traffic statistics collected at the switches and routers distributed across the network.

FIG. 4 depicts a processing architecture according to an embodiment of the present invention. Specifically, the processing architecture 400 of FIG. 4 may be used for a central site or for a remote site, depending upon the software included therein. The architecture of FIG. 4 is presented as a general purpose computing element adapted to perform the various stream processing tasks described herein. Moreover, while not discussed in detail herein, appropriate systems and apparatus for practicing the data structures, methodology and other aspects of the invention may be found in any system benefiting from the data processing and other techniques described herein.

Specifically, FIG. 4 comprises a processing architecture 400 including a processor 420 as well as memory 440 for storing various control programs and other programs as well as data. The memory 440 may also store an operating system supporting the various programs.

The processor 420 cooperates with conventional support circuitry 430 such as power supplies, clock circuits, cache memory and the like as well as circuits that assist in executing the software routine stored in the memory 440. As such, it is contemplated that some of the steps discussed herein as software processes may be implemented within hardware, for example as circuitry that cooperates with the processor 420 to perform various steps. The processing architecture 400 also contains input/output (I/O) circuitry 410 which forms an interface between the various functional elements communicating with the architecture 400.

The architecture 400 may be advantageously employed within the context of a network management system (NMS), an element management system (EMS) or any other network management system. More specifically, the architecture 400 may be utilized within the context of a coordinator site (110) and/or a remote site (120) within the model 100 of FIG. 1.

The invention may be implemented as a computer program product wherein computer instructions, when processed by a computer, adapt the operation of the computer such that the methods, data structures and/or techniques of the present invention are invoked or otherwise provided. Instructions for invoking the inventive methods may be stored in fixed or removable media, transmitted via a data stream in a broadcast media, and/or stored within a working memory within a computing device operating according to the instructions.

Memory 440 is depicted as including, in one embodiment, coordinator site processing routines such as a transmission control routine 442, a set expression cardinality algorithm 444 and other programs and data 446. Memory 440 is depicted as including, in one embodiment, remote site processing routines 448. The various routines and related methodologies will be discussed below.

It will be appreciated by those skilled in the art and informed by the teachings of the present invention that while the memory 440 includes a plurality of data structures, algorithms and storage regions, there is no requirement within the context of the present invention that a single memory device as depicted be utilized within the context of the update-stream processing architecture. Specifically, any combination of internal, external and/or associated memory may be utilized to store the software instructions necessary to provide the various functions. Thus, while the architecture depicts the memory as an integral portion of a relatively unified structure, the memory 440 may in fact be distributed internal or external to the update-stream processing architecture 400.

Referring back to FIG. 1, at each remote site j, the n update streams render n distinct multi-sets S0,j, . . . , Sn-1,j of elements from the integer domain [M]={0, . . . , M−1}. It is noted that other non-integer domains may also be utilized within the context of the present invention. Generally speaking, any finite domain may be used.

Each stream update at remote site j is a triple of the form <i,e,±ν> where i identifies the multi-set Si,j being updated, e∈[M] is the specific data element whose frequency changes, and ±ν is the net change in the frequency of e in Si,j, i.e., “+ν” (“−ν”) denotes ν insertions (resp., deletions) of e. It is assumed that all deletions in the update streams are legal; that is, an update <i,e,−ν> can only be issued if the net frequency of e in Si,j is at least ν. Note that delete operations help to substantially enrich the streaming model; for example, with deletions, the methodologies can easily handle sliding window queries by simply issuing a delete operation for each expired stream update that is no longer in the window of interest. For each i=0, . . . , n−1, let Si=∪jSi,j. Thus, Si reflects the global state of the ith update stream, while each multi-set Si,j captures the local state of stream i at site j. For purposes of simplicity, the terms Si and Si,j will be referred to as streams, though the actual meaning of these terms is more precisely given as the current states of the underlying streams.
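The update model above can be illustrated with a minimal Python sketch. It assumes a substream is held in memory as a frequency map; the names `Substream`, `apply` and `slide`, and the use of timestamps to expire updates, are illustrative choices rather than part of the described system.

```python
from collections import Counter, deque


class Substream:
    """Local state of one update stream S_{i,j}, kept as a multi-set."""

    def __init__(self):
        self.freq = Counter()  # element -> net frequency

    def apply(self, element, delta):
        """Apply a net frequency change of +v or -v; reject illegal deletions."""
        if self.freq[element] + delta < 0:
            raise ValueError("illegal deletion: net frequency would go negative")
        self.freq[element] += delta
        if self.freq[element] == 0:
            del self.freq[element]  # element no longer present

    def distinct(self):
        """Distinct elements currently present in the substream."""
        return set(self.freq)


def slide(substream, window, now, width):
    """Sliding window via deletes: undo every update older than `width`.

    `window` is a deque of (timestamp, element, count) insertions in
    arrival order (a hypothetical bookkeeping structure for this sketch).
    """
    while window and window[0][0] <= now - width:
        _, element, count = window.popleft()
        substream.apply(element, -count)
```

Each expired insertion is turned into a matching deletion, so the substream always reflects only the updates inside the window of interest.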

The invention is adapted to the problem of answering set-expression cardinality queries over the underlying collection of distributed update streams. Specifically, given a set expression E over streams S0, . . . , Sn-1 (with the standard set operators ∪, ∩ and − as connectives), the methodologies seek to estimate |E|, the number of distinct elements in E. For example, |S0∩S1| is the number of distinct elements in the intersection of streams S0 and S1. If for m=2 remote sites, S0,1={a}, S0,2={a,b}, S1,1={b} and S1,2={c}, then S0={a,b} and S1={b,c}. Thus, E=S0∩S1={b} and |E|=1.
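The worked example above can be reproduced in a few lines of Python. This is only a sketch of the definitions: the dictionary layout `local[i]` (one set per remote site) and the helper name `global_stream` are assumptions made for illustration.

```python
def global_stream(substreams):
    """S_i is the union, over all remote sites j, of the substreams S_{i,j}."""
    result = set()
    for s in substreams:
        result |= s
    return result


# local[i] lists the distinct elements of S_{i,1} and S_{i,2} (m = 2 sites).
local = {
    0: [{"a"}, {"a", "b"}],
    1: [{"b"}, {"c"}],
}

S = {i: global_stream(subs) for i, subs in local.items()}
E = S[0] & S[1]          # set expression E = S0 ∩ S1
cardinality = len(E)     # |E|, the number of distinct elements in E
```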

The problem of estimating |E| at the coordinator is complicated because the substreams Si,j that comprise each distributed stream Si are distributed across the remote sites. Accurately tracking |E| by having remote sites continuously ship every stream update to the coordinator is clearly impractical for high data rate streams. Consequently, in order to reduce the burden on the communication infrastructure, the methodologies allow |E| to be approximated, but enforce a bound on the error in the final estimate. Specifically, for a pre-specified error tolerance ε, the methodologies seek to compute an estimate X̂ for X=|E| (at the coordinator) such that X−ε≦X̂≦X+ε. The ε error parameter provides system designers with a useful knob that enables them to trade accuracy for efficiency. Essentially, the larger the error tolerance of an application, the smaller the communication overhead required to ensure that the estimate X̂ meets the ε accuracy guarantee.

1. Estimating Single Stream Cardinality.

A distributed algorithm for the case when the expression E whose cardinality the methodologies wish to estimate is a single stream Si (which is the union of substreams Si,j at remote sites) will first be described. Thus, the methodologies are basically looking to estimate the number of distinct elements in stream Si. The scheme for the distinct elements estimation problem illustrates the key concepts underlying the approach as well as the overall structure of the distributed solutions. In the next section, the methodologies will generalize the solution for a single stream to handle arbitrary set expressions.

The objective of the distributed algorithm is to be able to continuously estimate |E| at the coordinator site with ε accuracy. To achieve this, the methodologies distribute the error tolerance of ε among the m remote sites. The methodologies denote the error budget allocated to site j by εj; thus, Σjεj=ε. While there are multiple ways to allocate error budgets to sites, a simple approach is to allocate these proportional to the stream update rates at the sites. Another approach is to distribute the error budget uniformly across the remote sites. In an alternate embodiment, any scheme may be used as long as the following invariant is satisfied: Σjεj=ε and for all j, εj≧0.
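The two allocation policies mentioned above can be sketched as follows. The function names and the fallback to a uniform split when all rates are zero are assumptions of this sketch; the only requirement taken from the text is the invariant that the budgets are non-negative and sum to ε.

```python
def allocate_uniform(epsilon, m):
    """Split the error tolerance evenly across m remote sites."""
    return [epsilon / m] * m


def allocate_proportional(epsilon, rates):
    """Split the error tolerance in proportion to per-site update rates.

    `rates` are assumed to be non-negative; if they are all zero the
    sketch falls back to a uniform split.
    """
    total = sum(rates)
    if total == 0:
        return allocate_uniform(epsilon, len(rates))
    return [epsilon * r / total for r in rates]
```

For instance, `allocate_proportional(10, [1, 3, 6])` assigns the largest share of the budget to the site with the highest update rate, so that the busiest site is not forced to transmit disproportionately often.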

The error parameter εj essentially dictates when site j sends the current states of substreams Si,j at site j to the coordinator. The methodologies denote by Ŝi,j the most recent state of substream Si,j communicated (by site j) to the coordinator. In addition to Si,j, site j also stores in its local memory the transmitted states Ŝi,j for substreams at the site. For each stream Si, the coordinator constructs the global state Ŝi by taking the union of all the local substream states Ŝi,j received from the remote sites. Thus, Ŝi=∪jŜi,j. Now let Ê be the result of evaluating expression E on the states Ŝi instead of Si. The coordinator estimates the cardinality of set expression E as |Ê|.

Generally speaking, if remote sites have limited memory, then the methodologies are optionally modified to store a compact sketch synopsis for each substream (instead of the complete substream state). Techniques associated with the compact sketch synopsis are described in more detail in U.S. patent application entitled METHOD FOR DISTINCT COUNT ESTIMATION OVER JOINS OF CONTINUOUS UPDATE STREAMS, Ser. No. 10/957,185 which is incorporated herein by reference in its entirety. In this embodiment each remote site keeps track of its respective substream states. Briefly, the JD Sketch techniques are adapted to maintaining a summary of a first continuous stream of tuple updates by hashing tuples received from the first continuous data stream according to at least one initial attribute, and for each bucket of the at least one initial attribute, generating a corresponding set of 2-level hash sketches according to at least one other attribute. The JD Sketch synopsis data structure is not a standard hash table. It may be conceptualized as a hash table having buckets which additionally include second level hash structures adapted to capture information pertaining to the set of elements of a stream. That is, only a summary of the set of information mapping to each hash table bucket is actually kept in the bucket. This hash table bucket summary may comprise, illustratively, a set of 2-level hash sketch synopses. Thus, each hash table bucket stores a corresponding set of hash tables of those data elements that map or “collide” into the hash table bucket.

Other synopsis structures may be used to maintain a summary of update stream data prior to transmission of that data from a remote site, such as 2-level or multiple level hash structures.

Generally speaking, any means of maintaining a summary or reduced size representation of the underlying update stream may be advantageously employed within the context of the present invention. In one embodiment, a delete resistant sampling scheme (such as the JD Sketch scheme) is used. A delete resistant sampling scheme comprises any sampling scheme which takes as an input a stream of inserts and deletes of elements and outputs a distinct sample (of a given size) of the stream which is identical (with high probability) to the sample that would be obtained if the deleted elements were never inserted to begin with.

In order to guarantee that the estimate |Ê| is correct, the methodologies ensure that |E|−ε≦|Ê|≦|E|+ε. A simple approach for ensuring this for E=Si is as follows. At each remote site j, if either of |Si,j−Ŝi,j| or |Ŝi,j−Si,j| exceeds εj, then site j sends the most recent state Si,j to the coordinator. One can easily show that this simple scheme guarantees that at all times, |E−Ê|≦ε and |Ê−E|≦ε, and is thus correct. For instance, consider an element e in E−Ê. The element must belong to Si,j−Ŝi,j at some site j, and since |Si,j−Ŝi,j|≦εj, it must be counted against the error budget εj at site j. As a result, since Σjεj=ε, the methodologies get that |E−Ê|≦ε. Further, since |E|−|Ê|≦|E−Ê|, the methodologies obtain that |E|−|Ê|≦ε. Similarly, it is possible to show that |Ê|−|E|≦ε, and thus the estimate |Ê| is within ε error of |E|.
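The argument above can be written out as a chain of inequalities for the single-stream case E=Si. The derivation assumes that, between transmissions, every site j satisfies |Si,j−Ŝi,j|≦εj, and uses only the definitions Si=∪jSi,j and Ŝi=∪jŜi,j.

```latex
\begin{align*}
E - \hat{E} \;=\; S_i - \hat{S}_i
  &\;\subseteq\; \bigcup_{j} \bigl( S_{i,j} - \hat{S}_{i,j} \bigr), \\
|E - \hat{E}|
  &\;\le\; \sum_{j} \bigl| S_{i,j} - \hat{S}_{i,j} \bigr|
   \;\le\; \sum_{j} \varepsilon_j \;=\; \varepsilon, \\
|E| - |\hat{E}|
  &\;\le\; |E - \hat{E}| \;\le\; \varepsilon .
\end{align*}
```

The first line holds because an element of Si that is absent from Ŝi is present in some Si,j and absent from every transmitted state, including Ŝi,j at that same site. The symmetric argument with the roles of Si,j and Ŝi,j exchanged gives |Ê|−|E|≦ε.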

The above scheme associates a charge φj(e) with each element e at every remote site j, and if the total of these charges exceeds εj, then the remote site communicates state information to the coordinator. More formally, let φj+(e)=1 if e∈(Si,j−Ŝi,j), φj−(e)=1 if e∈(Ŝi,j−Si,j), and φj+(e)=φj−(e)=0, otherwise. As a result, Σeφj+(e)=|Si,j−Ŝi,j| and Σeφj−(e)=|Ŝi,j−Si,j|. Thus, there is a message exchange between site j and the coordinator if either Σeφj+(e)>εj or Σeφj−(e)>εj.
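A minimal sketch of these local charges, assuming the current and last-transmitted substream states are available as Python sets of distinct elements (the function names are illustrative):

```python
def local_charges(current, sent):
    """Return the insert and delete charges for one substream at one site.

    current: distinct elements of S_{i,j}
    sent:    last state transmitted to the coordinator (S-hat_{i,j})
    Elements not listed in a dictionary carry a charge of 0.
    """
    phi_plus = {e: 1 for e in current - sent}    # inserted since last transmission
    phi_minus = {e: 1 for e in sent - current}   # deleted since last transmission
    return phi_plus, phi_minus


def must_transmit(current, sent, budget):
    """True when either total charge exceeds the site's error budget."""
    phi_plus, phi_minus = local_charges(current, sent)
    return sum(phi_plus.values()) > budget or sum(phi_minus.values()) > budget
```

With `current = {"a", "b", "c"}` and `sent = {"a", "d"}`, the insert charges total 2 and the delete charges total 1, so a budget of 1.5 triggers a transmission while a budget of 2 does not.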

In the above scheme, element charges are computed based entirely on the local state information available at each site. The methodologies next show that by exploiting global knowledge about element e, the methodologies can reduce the charge φj(e) for e, and as a consequence, the overall message traffic between remote sites and the coordinator.

A key observation made by the present inventors is that in many stream-oriented domains, there will be a certain subset of globally “popular” elements. For instance, in an IP network monitoring scenario, destination IP addresses corresponding to popular Web sites like Yahoo, Amazon, Google etc. will frequently appear in the flow records collected from network routers. An important characteristic of each such globally popular element is that, at any given point in time, it will appear in substreams at multiple sites although the exact sites that contain the element may vary over time.

Assume that, for a popular element e, each remote site (approximately) knows the number of substream states Ŝi,j that contain e. Specifically, for stream Si, let θi(e)≧1 be a lower bound on the number of sites for which e appears in the Ŝi,j states communicated to the coordinator. Then, even if element e is newly inserted into Si,j at site j (that is, e∈(Si,j−Ŝi,j)), the methodologies should not charge for it since e is already in Ŝi and, thus, cannot possibly be in Si−Ŝi. Similarly, if e is deleted from Si,j at site j (that is, e∈(Ŝi,j−Si,j)), then in order for e to be deleted from Si and thus be in Ŝi−Si, e must be deleted from Si,j at at least θi(e) sites. Thus, it suffices to charge φj(e)=1/θi(e) (instead of 1) for the local delete of e at each site j. This way, if local deletions at the ≧θi(e) sites cause e to be globally deleted (that is, e∈(Ŝi−Si)), then the cumulative charge Σjφj(e) for e across the sites is at least 1. As a result, since Σeφj(e)≦εj at each site j, this total charge of 1 is counted against the various εjs, and correctness is not compromised.
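The reduced charges can be sketched as below. The sketch assumes that each site holds a dictionary `theta` mapping every element it knows to be frequent to its lower bound θi(e)≧1, and that elements missing from the dictionary are charged as in the purely local scheme; the function names and the use of exact fractions are illustrative choices.

```python
from fractions import Fraction


def charge_plus(e, current, sent, theta):
    """Insert charge for element e at one site.

    A newly inserted element costs nothing when it is known to be
    frequent (it is already in the coordinator's global state).
    """
    if e not in current or e in sent:
        return Fraction(0)          # not a local insertion
    return Fraction(0) if e in theta else Fraction(1)


def charge_minus(e, current, sent, theta):
    """Delete charge for element e at one site.

    A frequent element must disappear from at least theta[e] sites
    before it leaves the global state, so each site pays 1/theta[e].
    """
    if e not in sent or e in current:
        return Fraction(0)          # not a local deletion
    return Fraction(1, theta[e]) if e in theta else Fraction(1)
```

If θi(e)=4 and e is deleted locally at four sites, the four charges of 1/4 add up to a cumulative charge of 1, matching the accounting described above.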

2. Distributed Algorithm.

The details of a distributed scheme for producing a correct cardinality estimate |Ê| at the coordinator will now be described. For each element e∈Ŝi, the coordinator maintains a count Ci(e) of the number of remote sites whose states Ŝi,j contain the element e. Elements whose counts Ci(e) exceed a threshold τ are considered to be frequent, and added to a frequent element set Fi for stream Si. The coordinator also uses the count Ci(e) for each element e∈Fi to compute a lower bound threshold θi(e) such that the invariant Ci(e)≧θi(e) always holds. It continuously communicates changes in the frequent element sets Fi and the threshold values θi(e) to the remote sites so that these can be used to compute element charges φj(e) at the sites (as described in the previous subsection). Thus, in order to keep the message overhead under control, the coordinator does not send exact element counts Ci(e) to remote sites, but rather disseminates the thresholds, as described in the paragraph below. Each remote site j keeps track of the sum of local element charges Σeφj(e) in variable Φj. Further, when Φj becomes greater than εj, it sends the deltas Δi+=Si,j−Ŝi,j and Δi−=Ŝi,j−Si,j that capture the local state changes for substream Si,j since site j last transmitted state information to the coordinator. (Note that the deltas are sets and not multi-sets).
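The remote-site side of this scheme might look like the following sketch for a single substream. The class name `RemoteSite`, the `send` callback standing in for the network, and the default charge of 1 per changed element are all assumptions of the sketch; a charge function that uses the frequent-element thresholds could be passed in instead.

```python
def deltas(current, sent):
    """Delta sets: elements added and removed since the last transmission."""
    return current - sent, sent - current


class RemoteSite:
    """Tracks one substream and transmits deltas when the budget is exceeded."""

    def __init__(self, budget, send):
        self.budget = budget    # epsilon_j
        self.send = send        # hypothetical callback: send(delta_plus, delta_minus)
        self.current = set()    # distinct elements of S_{i,j}
        self.sent = set()       # last transmitted state S-hat_{i,j}

    def phi_total(self, charge):
        """Phi_j: sum of charges over elements changed since the last transmission."""
        changed = (self.current - self.sent) | (self.sent - self.current)
        return sum(charge(e) for e in changed)

    def on_change(self, new_current, charge=lambda e: 1):
        """Record the new local state; transmit deltas if Phi_j exceeds the budget."""
        self.current = set(new_current)
        if self.phi_total(charge) > self.budget:
            plus, minus = deltas(self.current, self.sent)
            self.send(plus, minus)
            self.sent = set(self.current)   # Phi_j drops back to 0
            return True
        return False
```

Because the deltas are computed on distinct elements, repeated insertions of an element already present at the site do not increase the charge.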

Coordinator Site Actions.

FIG. 2 depicts the actions performed by the coordinator (e.g., Site 0 of FIG. 1) when it receives the deltas Δi+ and Δi− for substream Si,j from site j. The coordinator employs the received deltas to first update element counts Ci(e) and the stream state Ŝi stored at the coordinator. As previously noted, the sets Ŝi are used to generate the final estimate |Ê|. It then uses the new counts Ci(e) to adjust the frequent element set Fi, and the threshold values θi(e) for frequent elements. It also informs all the remote sites of changes to Fi and θi(e) by sending them “make frequent” and “adjust threshold” control messages, which trigger the remote sites to apply the same changes to their local copies of Fi and θi(e). The control messages thus ensure that the values of Fi and θi(e) are synchronized between the coordinator and remote sites.
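The first part of the coordinator's work, updating the counts Ci(e) and the global state Ŝi from received deltas, can be sketched for a single stream as follows. This is not a transcription of FIG. 2: the class and method names are hypothetical, and the maintenance of Fi and θi(e) is deliberately left out.

```python
from collections import Counter


class Coordinator:
    """Maintains per-element site counts and the global state for one stream."""

    def __init__(self):
        self.count = Counter()   # C_i(e): number of sites whose sent state contains e
        self.site_state = {}     # site id -> S-hat_{i,j} as last reported

    def receive(self, site, delta_plus, delta_minus):
        """Apply the delta sets reported by one remote site."""
        state = self.site_state.setdefault(site, set())
        for e in delta_plus - state:      # newly present at this site
            state.add(e)
            self.count[e] += 1
        for e in delta_minus & state:     # no longer present at this site
            state.discard(e)
            self.count[e] -= 1
            if self.count[e] == 0:
                del self.count[e]         # e left the global state

    def global_state(self):
        """S-hat_i: elements present in at least one reported substream state."""
        return set(self.count)

    def estimate(self):
        """Cardinality estimate for the single-stream expression E = S_i."""
        return len(self.count)
```

An element stays in the global state until every site that reported it has also reported its deletion, which is exactly why deletions of widely held elements can be charged at a reduced rate.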

The correctness of the distributed scheme follows from the fact that for each element e∈Fi, the threshold value θi(e) is always a lower bound on the number of sites j for which e is in the local state Ŝi,j sent to the coordinator. Thus, the scheme for modifying Fi and θi(e) needs to preserve the invariant Ci(e)≧θi(e) while controlling the number of messages between the coordinator and remote sites. Clearly, to maintain the invariant, the coordinator needs to send messages to all sites every time the count Ci(e) drops below the current threshold θi(e) for an element e∈Fi. Consequently, in order to prevent minor fluctuations in the value of Ci(e) from generating excessive amounts of control message traffic, one strategy is to keep a sufficient gap between Ci(e) and θi(e). Thus, for instance, if Ci(e) becomes less than θi(e), then the methodologies simply halve the value of θi(e) (or reduce it by some amount other than a factor of 2). Similarly, the methodologies double the value of θi(e) (or increase it by some amount other than a factor of 2) only when Ci(e) exceeds 4θi(e), and (conservatively) consider an element to be frequent only if Ci(e) exceeds 2τ. It is noted that other schemes can be used to propagate global knowledge of element counts, as long as the invariant Ci(e)≧θi(e) is satisfied. For efficiency purposes, it may be desirable to choose schemes that minimize the number of control messages used to propagate the global count information.

An additional mechanism used in an optional embodiment of the invention has been found by the inventors to be effective for keeping the volume of control messages low. Specifically, θi(e) is doubled only after the count Ci(e) is somewhat stable (that is, has stayed above 3θi(e) for a certain time period after crossing 4θi(e)). Using this strategy, the number of control messages is relatively insensitive to the value of the threshold parameter τ. Finally, observe that while increasing θi(e) is not required for preserving the invariant Ci(e)≧θi(e), larger θi(e) values are key to reducing the charges φj(e) that sites incur for elements.
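The halving and doubling rules above can be sketched as a small state-update function. This is only an illustration under stated assumptions: the function name `update_threshold` is hypothetical, the initial threshold θi(e)=τ assigned when an element becomes frequent is an assumption (the text does not fix it), the stability delay before doubling is omitted, and the case of a frequent element whose count drops to zero is not handled because the text does not describe it.

```python
def update_threshold(count, tau, frequent, theta):
    """Return (frequent, theta, messages) after C_i(e) has changed to `count`.

    `messages` lists the control messages the coordinator would broadcast.
    """
    messages = []
    if not frequent:
        # Conservative rule from the text: frequent only once C_i(e) > 2*tau.
        if count > 2 * tau:
            frequent, theta = True, tau  # ASSUMPTION: initial threshold is tau
            messages.append(("make frequent", theta))
        return frequent, theta, messages
    if count < 1:
        raise ValueError("sketch assumes C_i(e) >= 1 for frequent elements")
    # Restore the invariant C_i(e) >= theta_i(e) by halving.
    while count < theta:
        theta = theta / 2
        messages.append(("adjust threshold", theta))
    # Raise the threshold only when the count is far above it.
    if count > 4 * theta:
        theta = theta * 2
        messages.append(("adjust threshold", theta))
    return frequent, theta, messages
```

Leaving a gap of a factor of two to four between the count and the threshold means that small fluctuations in Ci(e) produce no control messages at all.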

Remote Site Actions.

FIG. 3 depicts the actions performed by a remote site j (e.g., the jth remote site of FIG. 1) when an element e is inserted into or deleted from Si,j (due to a stream update), or the frequent set Fi or threshold value θi(e) gets modified (due to a “make frequent” or “adjust threshold” control message for e from the coordinator). Essentially, remote site j computes new charges φj+(e) and φj−(e) for e, and appropriately adjusts the total site charges Φj+ and Φj−. Further, if either of these total charges exceeds εj, the deltas for all substreams Si,j are sent to the coordinator; thus, Ŝi,j=Si,j, and consequently, all charges φj+(e) and φj−(e) are reset to 0. It is noted that sending the deltas for all ‘other’ substreams to the coordinator is not required when the expression E=Si, since there is only one substream at each site, but is needed for the more general set expressions considered in the next section.

The procedure COMPUTECHARGE in FIG. 3 is adapted for use in the single stream case (that is, E=Si). Alternate charge computation procedures that apply to general set expressions are described later. Generally speaking, COMPUTECHARGE associates a charge of 1 with non-frequent elements that are newly inserted into or deleted from Si,j since the last message to the coordinator. For frequent elements e∈Fi, charge φj+(e)=0 if e is newly inserted, and charge φj−(e)=1/θi(e) if e is locally deleted.
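The single-stream charging rule just described can be sketched as follows. This is an illustrative reading of the text, not the pseudo code of FIG. 3; the function names `compute_charge` and `site_totals` and the representation of states as Python sets are assumptions.

```python
from fractions import Fraction


def compute_charge(e, local, shipped, frequent, theta):
    """Return (phi_plus, phi_minus) for element e when E = S_i.

    local = S_{i,j}, shipped = S-hat_{i,j}, frequent = F_i,
    theta = dict mapping frequent elements to theta_i(e).
    """
    inserted = e in local and e not in shipped  # e in S_{i,j} - S-hat_{i,j}
    deleted = e in shipped and e not in local   # e in S-hat_{i,j} - S_{i,j}
    if e in frequent:
        phi_plus = Fraction(0)                  # inserts of frequent elements are free
        phi_minus = Fraction(1) / Fraction(theta[e]) if deleted else Fraction(0)
    else:
        phi_plus = Fraction(1 if inserted else 0)
        phi_minus = Fraction(1 if deleted else 0)
    return phi_plus, phi_minus


def site_totals(elements, local, shipped, frequent, theta):
    """Total site charges (Phi_plus, Phi_minus) summed over `elements`."""
    charges = [compute_charge(e, local, shipped, frequent, theta) for e in elements]
    return sum(c[0] for c in charges), sum(c[1] for c in charges)
```

A site would compare each total against its budget εj and ship its deltas as soon as either total exceeds it.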

Correctness Argument.

For ease of exposition, in the arguments pertaining to the correctness of the distributed scheme, it is assumed that all message transmissions and the actions they trigger are performed instantaneously. While this assumption does not hold exactly in practice, the scheme can be extended to simulate such an instantaneous execution (at a logical level) by having sites send special acknowledgements for messages once all the actions triggered by the messages have completed. Additional details are provided in U.S. patent application entitled METHOD FOR DISTINCT COUNT ESTIMATION OVER JOINS OF CONTINUOUS UPDATE STREAMS, Ser. No. 10/957,185 which is incorporated herein by reference in its entirety.

The charges φj+(e) and φj−(e) computed by COMPUTECHARGE can be shown to satisfy the following two invariants:
For each e∈E−Ê, Σjφj+(e)≧1  (equation 1)
For each e∈Ê−E, Σjφj−(e)≧1  (equation 2)

Thus, the distributed scheme is correct because it can be shown that Equation (1) implies that |E|−ε≦|Ê| and Equation (2) implies that |Ê|≦|E|+ε. Moreover, as discussed earlier, the space usage of the distributed algorithm can be reduced by storing a compact sketch synopsis for each substream Si,j instead of the entire substream state. This scheme provides probabilistic as opposed to deterministic error guarantees. For instance, it can maintain a (delete-resistant) distinct sample for each substream, and use the substream samples in place of the substream states in our distributed scheme.

Estimating Cardinality of Arbitrary Set Expressions

The previously described methodologies will now be applied to the problem of estimating (to within ε absolute error) the cardinality of an arbitrary set expression E involving the distributed update streams S0, . . . , Sn-1. The distributed scheme for general set expressions is identical to the scheme for single streams except for the charging procedure COMPUTECHARGE. Thus, as previously noted, for each stream Si, the coordinator maintains the states Ŝi, the frequent sets Fi, and the threshold values θi(e) for the number of sites j whose shipped state Ŝi,j contains element e. The cardinality estimate at the coordinator is |Ê|, where Ê is the result of evaluating expression E using Ŝi instead of Si. The coordinator processes the deltas from a remote site for an arbitrary stream Si as described in procedure COORDINATOR (see FIG. 2). Similarly, site j executes the actions described in procedure REMOTE (see FIG. 3) every time there is a change in the substream state Si,j, the frequent set Fi, or the local threshold value θi(e).

In one embodiment of the charging procedure for the single stream case, the methodologies charged 1 for inserts and deletes of elements e∉Fi, and if e∈Fi, inserts were free (i.e., not charged) and deletes were charged 1/θi(e). However, when E contains multiple streams, computing the charge φj(e) for an element e is more involved since e may be concurrently inserted into or deleted from more than one substream Si,j at site j. In one embodiment, this complication is overcome by setting the charges φj+(e)=φj−(e)=1 if, for any of the substreams Si,j, either e∈(Si,j−Ŝi,j) or e∈(Ŝi,j−Si,j). However, while this scheme is correct, it may be too conservative in some applications and may end up overcharging in some situations. This, in turn, could lead to frequent state transmission messages from remote sites to the coordinator. It is noted that any scheme satisfying the invariants of equations 1 and 2 can be used to determine element charges. However, for efficiency purposes (i.e., to minimize communication) it is desired to use a scheme that allocates as low a charge as possible to inserts and deletes of elements.

### Example 1

Consider distributed streams S1, S2, and S3, and let expression E=S1∩(S2−S3). For element e at site j, let e∈Ŝ3,j and e∈S3,j. Clearly, e∈Ŝ3 and e∈S3, and thus e∉Ê and e∉E. As a result, even if e∈(Ŝ1,j−S1,j) or e∈(S2,j−Ŝ2,j), the methodologies should not charge for element e at site j since e cannot possibly be in either E−Ê or Ê−E; thus, based on the semantics of expression E, setting the charges φj+(e)=φj−(e)=0 will still ensure correctness.

As will be discussed below, for an arbitrary set expression E, the methodologies focus on the problem of computing the minimum possible charges φj+(e) and φj−(e) for a fixed element e at site j by leveraging the semantics of expression E. The various charging schemes ensure that charges φj+(e) and φj−(e) satisfy Equations (1) and (2), and thus provide an accuracy guarantee of ε for the final estimate |Ê|.

The first charging method is denoted as a Model-Based Charging Scheme, and is based on enumerating models for a Boolean formula corresponding to expression E. Finding the optimal charge under the scheme can be shown to be NP-hard and, as described, the scheme has an exponential time complexity. The methodologies therefore develop a heuristic that, at the expense of overcharging in some situations described later below, is able to eliminate model enumeration altogether and bring down the time complexity so that it is polynomial in the number of streams.

The methodologies developed by the inventors provide that a stream Si has a local state change at site j if either e∈(Si,j−Ŝi,j) or e∈(Ŝi,j−Si,j). Similarly, the methodologies provide that a stream Si has a global state change if either e∈(Si−Ŝi) or e∈(Ŝi−Si).

A Model-Based Charging Scheme

The charging procedure first constructs a Boolean formula Ψj that captures the semantics of expression E and local stream constraints at each site j. It then defines the charge φj(e) at site j in terms of the charges for models M that satisfy Ψj.

In one embodiment, to construct the Boolean formula, for each stream Si, let pi and p̂i be Boolean variables with semantics e∈Si and e∈Ŝi, respectively. The methodologies construct two Boolean formulae Ψj+ and Ψj− over the variables pi and p̂i. Intuitively, Ψj+ and Ψj− specify the conditions that stream states Si and Ŝi must satisfy for e∈(E−Ê) and e∈(Ê−E), respectively. The formulae also capture constraints on Si and Ŝi due to local knowledge at site j of the substream states Si,j, Ŝi,j, and threshold values θi. For example, if e∈Si,j, then it must be the case that e∈Si (since Si=∪jSi,j), and thus, variable pi must be true.

The formulae Ψj+ and Ψj− are built using the following three formulae: (1) an Expression formula FE representing the logic of expression E, (2) State formulae Ĝj, Gj that model the local knowledge that site j has about stream states Si and Ŝi, and (3) a Threshold formula H that captures the constraints due to the thresholds θi for each stream Si.

Expression Formula.

The expression formula FE is constructed recursively as follows:

1. For every stream Si in E, the methodologies replace its occurrence by the Boolean variable pi;

2. The expression E1∪E2 is translated as FE1∨FE2;

3. The expression E1∩E2 is translated as FE1∧FE2; and

4. The expression E1−E2 is translated as FE1∧(¬FE2).

For example, the set expression E=S1∩(S2−S3) is translated into the Boolean formula FE=p1∧(p2∧¬p3). It is then seen that element e∈E if FE is true for the stream states Si. For instance, e∈S1∩(S2−S3) if e∈S1∧(e∈S2∧e∉S3). Formula F̂E is constructed similarly, except that variables pi are replaced by p̂i.
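The recursive translation can be sketched as below. The nested-tuple encoding of set expressions and the function names `to_formula` and `evaluate` are illustrative assumptions; only the three translation rules come from the text.

```python
def to_formula(expr, var="p"):
    """Translate a set expression tree into a Boolean formula string.

    expr is ("stream", i) or (op, left, right) with op in
    {"union", "intersect", "minus"}; var="p̂" yields the formula for Ê.
    """
    kind = expr[0]
    if kind == "stream":
        return f"{var}{expr[1]}"
    left, right = to_formula(expr[1], var), to_formula(expr[2], var)
    if kind == "union":
        return f"({left} ∨ {right})"
    if kind == "intersect":
        return f"({left} ∧ {right})"
    if kind == "minus":
        return f"({left} ∧ ¬{right})"
    raise ValueError(f"unknown operator: {kind}")


def evaluate(expr, true_vars):
    """Evaluate the formula when exactly the streams in `true_vars` contain e."""
    kind = expr[0]
    if kind == "stream":
        return expr[1] in true_vars
    left, right = evaluate(expr[1], true_vars), evaluate(expr[2], true_vars)
    if kind == "union":
        return left or right
    if kind == "intersect":
        return left and right
    if kind == "minus":
        return left and not right
    raise ValueError(f"unknown operator: {kind}")


# E = S1 ∩ (S2 − S3)
E = ("intersect", ("stream", 1), ("minus", ("stream", 2), ("stream", 3)))
print(to_formula(E))  # (p1 ∧ (p2 ∧ ¬p3))
```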

State Formula.

The state formulae Gj and Ĝj are conjunctions of a subset of the Boolean variables pi and p̂i, respectively. In one embodiment, if e∈Si,j, then variable pi is added to Gj. Thus, Gj captures the constraints on streams Si for which the methodologies can infer that e∈Si based on local information that e∈Si,j at site j. Similarly, the methodologies construct Ĝj by adding variable p̂i to it if e∈Ŝi,j. Note that Gj and Ĝj may be different for the various remote sites depending on the substream states at each site.

Threshold Formula.

The threshold formula H only applies to Boolean variables p̂i. If e∈Fi for stream Si, then the methodologies add variable p̂i to H. Thus, H captures the constraints on stream states Ŝi for which the methodologies can deduce that e∈Ŝi from the frequent element sets. Note that formula H is identical at all sites since Fi is the same at all sites. That is, the above protocol generally is adapted to the condition in which the set of frequent elements Fi is identical across all sites. However, in an alternate protocol the set of frequent elements Fi,j need not be identical across all sites. In this alternate protocol, the coordinator needs to maintain the invariant Ci(e)≧θi,j(e) for all j.

The formulae Ψj+ and Ψj− at site j are constructed as follows.
Ψj+=(¬F̂E∧FE)∧(Ĝj∧Gj∧H)
Ψj−=(F̂E∧¬FE)∧(Ĝj∧Gj∧H)

The formulae Ψj+ and Ψj− comprise two parts. The first part, involving FE and F̂E, captures the conditions for one of e∈(E−Ê) or e∈(Ê−E) to hold. The second part (Ĝj∧Gj∧H) specifies the constraints on stream states Ŝi and Si due to local knowledge at site j of substream states and frequent element sets. Thus, for the Boolean formula Ψj+, it follows that e∈(E−Ê) only if Ψj+ is true for stream states Ŝi,Si. Consequently, if Ψj+ is unsatisfiable, then it is impossible that e∈(E−Ê), and so the methodologies can set φj+(e)=0. Similarly, if Ψj− is unsatisfiable, then charge φj−(e)=0.

Revisiting Example 1, where E=S1∩(S2−S3), and element e∈Ŝ3,j and e∈S3,j, the methodologies get that Ψj+=¬(p̂1∧(p̂2∧¬p̂3))∧(p1∧(p2∧¬p3))∧(p̂3∧p3). It is noted that Ψj+ is unsatisfiable (due to ¬p3∧p3), and thus, charge φj+(e)=0. In the following discussion, the methodologies show how models for Ψj+ can be used to compute the charge φj+(e) when Ψj+ is satisfiable.
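The unsatisfiability claim for Example 1 can be checked by brute force over all truth assignments. The sketch below hard-codes E=S1∩(S2−S3); the function names `f_e` and `psi_plus_models`, and the encoding of local knowledge as sets of stream indices, are assumptions made for illustration.

```python
from itertools import product

STREAMS = (1, 2, 3)


def f_e(v):
    """F_E for E = S1 ∩ (S2 − S3), given a dict stream index -> truth value."""
    return v[1] and (v[2] and not v[3])


def psi_plus_models(local, shipped, frequent):
    """Enumerate all models of Psi_j^+ as (p, p_hat) assignment pairs.

    local / shipped / frequent are the sets of stream indices i for which
    e is in S_{i,j} / S-hat_{i,j} / F_i, i.e. the contents of G_j, G-hat_j, H.
    """
    models = []
    for bits in product([False, True], repeat=2 * len(STREAMS)):
        p = dict(zip(STREAMS, bits[:3]))
        p_hat = dict(zip(STREAMS, bits[3:]))
        constraints = (all(p[i] for i in local)
                       and all(p_hat[i] for i in shipped | frequent))
        # Psi^+ = (not F-hat_E and F_E) and (G-hat_j and G_j and H)
        if constraints and f_e(p) and not f_e(p_hat):
            models.append((p, p_hat))
    return models


# Example 1: e is in both S-hat_{3,j} and S_{3,j}; nothing else is known locally.
print(psi_plus_models(local={3}, shipped={3}, frequent=set()))  # []
```

With no local knowledge at all, the same function returns seven models: the single assignment p=(true, true, false) paired with each of the seven assignments to p̂ under which F̂E is false.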

Computing Charges Using Formula.

As an overview, consider the problem of computing the charge φj+(e). For an arbitrary Boolean formula over p̂i,pi, the methodologies define a model to be an arbitrary subset of ∪i{pi,p̂i}. Each model M assigns truth values to variables pi,p̂i, with variable pi (resp., p̂i) being assigned true if pi∈M (resp., p̂i∈M); otherwise, pi (resp., p̂i) is assigned false. The methodologies say that model M satisfies a Boolean formula if the formula evaluates to true for the truth assignment specified by M. For example, model {p̂1,p2} satisfies the formula p̂1∧p2, but the model {p̂1} does not. Now, each model M represents a specific scenario for states Ŝi,Si. Essentially, e∈Si (resp., e∈Ŝi) if pi∈M (resp., p̂i∈M). If e∈(E−Ê) for stream states Si,Ŝi, then the model corresponding to these states must satisfy Ψj+. Further, every model M that satisfies Ψj+ represents (from the local viewpoint of site j) a possible scenario for states Ŝi,Si that is consistent with local substream states at site j, and in which e∈(E−Ê).

This model-based approach assigns a charge φj(M) to each model M that satisfies Ψj+ at site j. Furthermore, since as far as site j is concerned, any of these models can potentially occur and cause e∈(E−Ê), the methodologies set charge φj+(e) as follows.
φj+(e)=max{φj(M): Model M satisfies Ψj+}  (equation 3)

As previously noted, if e∈(E−Ê), then it is required that Σjφj+(e)≧1. Thus, by choosing the charge φj(M) for each model M such that Σjφj(M)≧1 if M were to occur, the methodologies can ensure that Σjφj+(e)≧1 if e∈(E−Ê) due to some model M that satisfies Ψj+.

To compute the charge φj(M) for a model M that satisfies Ψj+, let P be the set of streams Si such that exactly one of pi or p̂i belongs to M, i.e., either {pi,p̂i}∩M={pi} or {pi,p̂i}∩M={p̂i}. Thus, P is the set of streams that experience a global state change in model M. In an exemplary model-based scheme, site j selects a single “culprit” stream Si from P using a selection mechanism that satisfies the following property:

UNIFORM CULPRIT SELECTION PROPERTY: Given a model M and a set P of streams with global state changes in M, every site selects the same culprit stream Si∈P for M.

As further discussed below, a specific culprit selection scheme satisfying the above property that attempts to minimize the magnitude of the charge φj+(e) at site j will be provided. For the selected culprit stream Si, let charge φ(Si) be defined as follows:

$\phi(S_i) = \begin{cases} 1/\theta_i(e) & \text{if } e \in F_i \\ 1 & \text{otherwise} \end{cases} \qquad \text{(equation 4)}$

The reciprocal of this charge, 1/φ(Si), is a lower bound on the number of sites where stream Si must have local state changes for it to have a global state change. For instance, if e∈Fi, then for e to be in (Ŝi−Si), e must be in (Ŝi,j−Si,j) for at least 1/φ(Si)=θi(e) sites. The methodologies define the charge φj(M) for model M in terms of the charge φ(Si) for the culprit stream Si, as follows:

$\phi_j(M) = \begin{cases} \phi(S_i) & \text{if culprit stream } S_i \text{ has a local state change at site } j \\ 0 & \text{otherwise} \end{cases} \qquad \text{(equation 5)}$

Thus, the methodologies are able to ensure that if model M indeed does occur, then since the culprit stream Si has a global state change in M, at least 1/φ(Si) sites j (those at which Si has local state changes) choose φj(M)=φ(Si), and thus Σjφj(M)≧(1/φ(Si))φ(Si)≧1.

The correctness of the charging scheme follows from Lemma 1 below.

Lemma 1: Let charge φj+(e) be computed as described in Equations (3), (4) and (5), and let the culprit stream Si for each model M be selected using a scheme that satisfies the uniform culprit selection property. If e∈(E−Ê), then Σjφj+(e)≧1. (An analogous lemma holds for φj−(e).) □

Culprit Selection. For a model M, in one embodiment a culprit selection scheme is implemented as follows: lexicographically order streams Si∈P based on charge, index pairs <φ(Si),i>, and choose the smallest stream in the lexicographic ordering as the culprit. In other words, the culprit stream is the stream with the minimum charge φ(Si), with ties being broken in favor of the stream with the smallest index. Since the charge φ(Si) for stream Si is the same across all the sites, the simple culprit selection scheme satisfies the uniform culprit selection property. Thus, due to Lemma 1, the charging procedure is correct. Also, since the charging procedure selects the stream with the smallest charge as the culprit for model M, it minimizes the maximum charge incurred for M across the sites.

### Example 2

Consider distributed streams S1, S2 and S3, and let expression E=S1∩(S2−S3). At some site j, let the substream states be as shown in the table below.

| | i = 1 | i = 2 | i = 3 |
|---|---|---|---|
| Ŝi,j | | e | e |
| Si,j | e | e | |

Thus, element e is in all substream states except for Ŝ1,j and S3,j. Also, let e∈F3, e∉F1, e∉F2 and θ3(e)=4; the meaning here is that e is contained in at least 4 substream states for S3 transmitted to the coordinator. It follows that φ(S1)=φ(S2)=1 and φ(S3)=1/4. Also, the formula Ψj+ for E at site j is given as:
¬(p̂1∧(p̂2∧¬p̂3))∧(p1∧(p2∧¬p3))∧(p1∧p̂2∧p̂3)

Thus, for any model M that satisfies Ψj+, it must be the case that {p̂3,p3}∩M={p̂3}. As a result, S3∈P and since the charge φ(S3) for S3 is the smallest, it is chosen as the culprit for all models. Consequently, since S3 has a local state change at site j, φj(M)=φ(S3)=1/4 for all models M that satisfy Ψj+, and thus, the charge φj+(e)=1/4. Furthermore, since Ψj− is unsatisfiable, charge φj−(e)=0.

If the stream S3 does not have a local state change at site j (i.e., e is neither in Ŝ3,j nor in S3,j), then, since e∈F3, Ψj+ will remain the same as before and S3 will still be chosen as the culprit stream for all models M that satisfy Ψj+. However, since S3 does not have a local state change at site j, φj(M) will be 0 for the models, and thus charge φj+(e)=0.
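The charges of Example 2 can be reproduced with a brute-force sketch of Equations (3), (4) and (5). This is not the pseudo code of FIG. 6: the function name `model_charges` and the set-of-indices encoding are assumptions, and the expression E=S1∩(S2−S3) is hard-coded.

```python
from fractions import Fraction
from itertools import product

STREAMS = (1, 2, 3)


def f_e(v):
    """F_E for E = S1 ∩ (S2 − S3)."""
    return v[1] and v[2] and not v[3]


def model_charges(local, shipped, frequent, theta):
    """Return (phi_plus, phi_minus) for a fixed element e at site j.

    local / shipped / frequent: stream indices i with e in S_{i,j} /
    S-hat_{i,j} / F_i; theta: dict i -> theta_i(e) for frequent streams.
    """
    # Equation (4): per-stream charge.
    phi = {i: Fraction(1, theta[i]) if i in frequent else Fraction(1)
           for i in STREAMS}
    changed_locally = local ^ shipped  # streams with a local state change
    best = {"plus": Fraction(0), "minus": Fraction(0)}
    for bits in product([False, True], repeat=2 * len(STREAMS)):
        p = dict(zip(STREAMS, bits[:3]))
        p_hat = dict(zip(STREAMS, bits[3:]))
        if not (all(p[i] for i in local)
                and all(p_hat[i] for i in shipped | frequent)):
            continue  # violates G_j, G-hat_j or H
        in_e, in_e_hat = f_e(p), f_e(p_hat)
        if in_e == in_e_hat:
            continue  # model satisfies neither Psi^+ nor Psi^-
        changed = [i for i in STREAMS if p[i] != p_hat[i]]  # the set P
        culprit = min(changed, key=lambda i: (phi[i], i))
        # Equation (5): charge only if the culprit changed locally.
        charge = phi[culprit] if culprit in changed_locally else Fraction(0)
        side = "plus" if in_e else "minus"
        best[side] = max(best[side], charge)  # Equation (3)
    return best["plus"], best["minus"]


# Example 2: e in S_{1,j}, S_{2,j}, S-hat_{2,j}, S-hat_{3,j}; e in F_3, theta_3(e) = 4.
print(model_charges({1, 2}, {2, 3}, {3}, {3: 4}))  # (Fraction(1, 4), Fraction(0, 1))
```

Removing stream 3 from `shipped` (so that S3 has no local state change) drops the first charge to 0, as stated above.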

Computational Complexity.

In order to determine the complexity of the model-based approach, the following decision problem for φj+(e) is considered.

PROBLEM (MAXIMUM CHARGE MODEL): Given expression E, site j, element e, and constant k, does there exist a model M that satisfies Ψj+ and for which φj(M)≧k?

The following theorem can be proved using a reduction from 3-SAT: The MAXIMUM CHARGE MODEL problem is NP-complete. Therefore, it follows that since φj+(e) is the maximum charge for models M that satisfy Ψj+, computing φj+(e) is intractable.

Heuristic for Charge Computation.

The model-based charging procedure enumerates all models M in the worst case, and thus has a worst-case time complexity of O(2^(2n)). While this may be reasonable for small values of n (e.g., 3 or 4 streams), the model enumeration-based approach will clearly not scale when set expressions involve a moderately large number of streams, a scenario likely in practice (e.g., in the Akamai case). In this section, the methodologies present a heuristic solution for computing the charges φj+(e) and φj−(e) for an element e at site j. The heuristic procedure has a time complexity that is polynomial in the number of streams n, and computes the same charge values as the model-based approach as long as every stream appears at most once in the expression E. However, the heuristic may overcharge for element e in certain cases when there are duplicate occurrences of streams in expression E.

Overview.

The model-based charging procedure computes φj+(e) as the maximum stream charge φ(Si) such that (1) Si has a local state change at site j, and (2) Si is the culprit stream for some model M that satisfies Ψj+. (Recall that the culprit stream Si for model M is the stream with the smallest charge, index pair <φ(Si),i> from among streams with a global state change in M.) Thus, for a stream Si, if the methodologies can develop a test for quickly determining if Si is the culprit stream for some model that satisfies Ψj+, then the methodologies can speed up the computation of charge φj+(e). This is a key idea underlying the heuristic.

Let T denote the expression tree for E with leaves and internal nodes corresponding to streams and set operators in E, respectively. For each node V of T, let E(V) be the subexpression for the subtree rooted at node V, and F̂E(V) and FE(V) be the formulae for E(V) as defined above under Expression Formula. For example, in the expression tree for E=S1∩(S2−S3), the subexpression for the subtree rooted at V=“−” is E(V)=S2−S3, and FE(V)=p2∧¬p3. Now, in order to quickly test if a stream Si is the culprit stream for some model satisfying Ψj+, the heuristic keeps track of culprit streams (for models) at each node of the expression tree using the notion of charge triples. Formally, suppose that M is a model that satisfies the local constraints (Gj∧Ĝj∧H) at site j. At node V in T, the methodologies define the charge triple for model M, denoted by t(M,V), as the triple (a,b,x) with the following values:

If M satisfies F̂E(V), then bit a=1; otherwise, a=0.

Similarly, if M satisfies FE(V), then bit b=1; otherwise b=0.

If none of the streams in V's subtree have a global state change in model M, then x=∞. (The charge, index pair <φ(S∞),∞> is considered to be greater than <φ(Si),i> for all streams Si.) Otherwise, x is the index of the culprit stream for M in V's subtree; that is, x=i, where Si is the stream with the smallest charge, index pair <φ(Si),i> from among streams (in V's subtree) with a global state change in M.

For example, consider a model M that satisfies ¬F̂E(V)∧FE(V) (in addition to local constraints). Then, if the culprit stream Si for M in V's subtree is defined, the charge triple t(M,V) for M at node V is (0,1,i); otherwise, t(M,V)=(0,1,∞). The charging heuristic computes, in a bottom-up fashion, a set C of charge triples for each node V of T. Furthermore, it ensures that for every model M that satisfies (Gj∧Ĝj∧H), the computed set C for node V contains the triple t(M,V). Here, it is important to note that the size of C (in the worst case) is linear in the number of streams n; this is because there are at most O(n) distinct charge triples t(M,V) (one for each combination of a, b and x).

Now, consider the charge triple set C for the root V of T. Clearly, since E(V)=E, if a model M satisfies Ψj+=(¬F̂E∧FE)∧(Ĝj∧Gj∧H) and has culprit stream Si, then triple t(M,V)=(0,1,i) must be in C. Thus, the methodologies can quickly determine if a stream Si is the culprit stream for some model satisfying Ψj+ by checking if C contains the triple (0,1,i). Hence, by selecting φj+(e) to be the maximum stream charge φ(Si) such that (1) Si has a local state change at site j, and (2) triple (0,1,i)∈C, the methodologies can ensure that φj+(e)≧max{φj(M): Model M satisfies Ψj+} and thus, due to Lemma 1, the charging heuristic is correct.

The following example illustrates the execution of a bottom up charge triple computation (a more detailed explanation is provided below with respect to FIG. 7 and its associated text).

### Example 3

Consider the distributed scenario described in Example 2 involving streams S1, S2 and S3, and expression E=S1∩(S2−S3). Suppose that element e is in all substream states except for Ŝ1,j and S3,j, and also e∈F3 and θ3(e)=4. Thus, φ(S1)=φ(S2)=1 and φ(S3)=1/4. FIG. 5 graphically illustrates the charge triple sets computed for the nodes of the expression tree for E by the charging heuristic.

The charge triple set for each leaf Si is first initialized to contain t(M,Si) for models M that satisfy local constraints. For example, since e is in S1,j but not in Ŝ1,j, it follows that p1∈Gj and thus for models M that satisfy (Gj∧Ĝj∧H), p1∈M but p̂1 may or may not be in M; so the charge triple set for S1 contains the triples (1,1,∞) (for models that contain p̂1) and (0,1,1) (for models that do not contain p̂1).

Next, the charge triple (a,b,x) for each internal node V is computed by combining pairs of triples (a1,b1,x1) and (a2,b2,x2) from V's two children. Suppose that op is the Boolean operation corresponding to the set operation for V; the Boolean operations for ∪, ∩ and − are ∨, ∧ and ∧¬, respectively. Then a=a1 op a2, b=b1 op b2, and x is set to one of x1 or x2, whichever has the smaller charge, index pair <φ(Sxi),xi>. For example, the charge triples for node “−” of T are generated by combining triples for nodes S2 and S3. Triples (1,1,∞) and (1,1,∞) when combined result in the triple (0,0,∞) (since 1∧¬1=0). Similarly, combining triples (1,1,∞) and (1,0,3) results in the triple (0,1,3) (since 1∧¬1=0, 1∧¬0=1, and <φ(S3),3> is less than <φ(S∞),∞>). Finally, the sets for S1 and “−” are combined to obtain the charge triple set C for the root node “∩”, which is then used by the charging heuristic to compute the charges φj+(e) and φj−(e). Since C contains the triple (0,1,3) and S3 has a local state change at site j, charge φj+(e)=φ(S3)=1/4. Further, since C does not contain a triple of the form (1,0,x), φj−(e)=0.
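The bottom-up combination in Example 3 can be traced with the short sketch below. The leaf sets are typed in directly from the example, and the helper names `combine` and `charge` are hypothetical; this is an illustration of the combination rule, not the procedure of FIG. 7.

```python
import math
from fractions import Fraction

INF = math.inf  # stands for x = ∞ (no culprit in the subtree)

BOOL_OP = {
    "union": lambda u, v: u | v,            # ∨
    "intersect": lambda u, v: u & v,        # ∧
    "minus": lambda u, v: u & (1 - v),      # ∧¬
}


def combine(op, c1, c2, phi):
    """Combine the charge triple sets of two children under set operator op."""
    f = BOOL_OP[op]

    def key(x):
        # Charge, index pair <phi(S_x), x>; the pair for ∞ sorts last.
        return (phi.get(x, INF), x)

    return {(f(a1, a2), f(b1, b2), min(x1, x2, key=key))
            for (a1, b1, x1) in c1 for (a2, b2, x2) in c2}


def charge(root, want, local_change, phi):
    """Largest phi(S_x) over root triples (a, b, x) with (a, b) == want
    whose culprit S_x has a local state change; 0 if there is none."""
    hits = [phi[x] for (a, b, x) in root
            if (a, b) == want and x in local_change]
    return max(hits, default=Fraction(0))


phi = {1: Fraction(1), 2: Fraction(1), 3: Fraction(1, 4)}
leaf1 = {(1, 1, INF), (0, 1, 1)}   # e in S_{1,j}, not in S-hat_{1,j}
leaf2 = {(1, 1, INF)}              # e in both states of S2
leaf3 = {(1, 1, INF), (1, 0, 3)}   # e in S-hat_{3,j} and F_3, not in S_{3,j}

minus = combine("minus", leaf2, leaf3, phi)
root = combine("intersect", leaf1, minus, phi)
phi_plus = charge(root, (0, 1), {1, 3}, phi)   # Fraction(1, 4)
phi_minus = charge(root, (1, 0), {1, 3}, phi)  # Fraction(0, 1)
```

Streams 1 and 3 are passed as the locally changed streams because, in Example 3, e differs between Si,j and Ŝi,j exactly for i=1 and i=3.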

Correctness Argument. The following lemma establishes the correctness of the charging heuristic.

Lemma 2: Consider a model M that satisfies local constraints (Gj∧Ĝj∧H) at site j. Then, for an arbitrary node V in T, charge triple t(M,V) is in the set of charge triples for V computed by the heuristic.

Computational Complexity. The maximum size of a charge triple set for a node is O(n), and thus, the worst-case time complexity of the charging heuristic can be shown to be O(n²s), where s is the size of set expression E. By using the following pruning optimization, the maximum size of a charge triple set for a node can be reduced to O(1), and thus the time complexity of the charging heuristic can be reduced to O(s):

At each node having charge triples of the form (a,b,x), at most two distinct charge triples are maintained for each of the four possible combinations of values a and b can take:

• (1) The charge triple with the largest charge, index pair <φ(Sx),x>; and
• (2) The charge triple with the largest charge, index pair <φ(Sy),y> amongst all charge triples where the index y corresponds to a culprit stream having a local state change. (Note that the charge triples corresponding to (1) and (2) could be identical.)
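One possible reading of this pruning rule is sketched below. The function name `prune`, the use of `math.inf` for x=∞, and the treatment of <φ(S∞),∞> as the largest pair are assumptions layered on the text; the sketch keeps at most two triples per (a,b) combination and makes no claim beyond that.

```python
import math
from fractions import Fraction


def prune(triples, phi, local_change):
    """Keep at most two charge triples for each (a, b) combination.

    phi: dict stream index -> phi(S_i); local_change: set of stream
    indices with a local state change at this site.
    """
    def key(x):
        # Charge, index pair <phi(S_x), x>; x = ∞ maps to (∞, ∞).
        return (phi.get(x, math.inf), x)

    kept = set()
    for ab in {(a, b) for (a, b, _) in triples}:
        group = [t for t in triples if t[:2] == ab]
        # (1) the triple with the largest charge, index pair
        kept.add(max(group, key=lambda t: key(t[2])))
        # (2) the largest among triples whose culprit changed locally
        local = [t for t in group if t[2] in local_change]
        if local:
            kept.add(max(local, key=lambda t: key(t[2])))
    return kept


phi = {1: Fraction(1), 2: Fraction(1), 3: Fraction(1, 4)}
triples = {(0, 1, 1), (0, 1, 2), (0, 1, 3), (0, 1, math.inf), (1, 1, math.inf)}
pruned = prune(triples, phi, local_change={3})
```

Since there are four (a,b) combinations, the pruned set never holds more than eight triples, independent of n.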

The following lemma implies that when E contains no duplicate streams, the heuristic returns the same charge values as the model based approach.

Lemma 3: Let E be a set expression in which each stream appears at most once. For an arbitrary node V in T, charge triple t is in the set of charge triples for V computed by the heuristic if and only if t=t(M,V) for some model M satisfying (Gj∧Ĝj∧H) at site j.

The following discussion is directed to more detailed descriptions of alternate embodiments of the invention.

Charging Schemes

Correctness Invariants

The charging schemes presented herein (such as the procedure COMPUTECHARGE provided in FIG. 3, the model-based charging scheme and the tree-based charging heuristic) satisfy the following two invariants:
For each e∈E−Ê, Σjφj+(e)≧1  (equation 6)
For each e∈Ê−E, Σjφj−(e)≧1  (equation 7)

These invariants are necessary for correctness of any distributed charging scheme. In this section, it will be shown that maintaining these invariants is also sufficient to ensure correctness of the charging schemes. In particular, Equation (7) implies that |Ê|≦|E|+ε and (analogously) Equation (6) implies that |E|−ε≦|Ê|. Suppose Equation (7) holds. Then, we have

$\begin{aligned} |\hat{E}| &\le |E| + |\hat{E} - E| \\ &\le |E| + \sum_{e \in \hat{E} - E} \sum_j \phi_j^-(e) \quad \text{/* since Equation (7) holds */} \\ &\le |E| + \sum_j \sum_{e \in \hat{E} - E} \phi_j^-(e) \\ &\le |E| + \sum_j \Phi_j^- \\ &\le |E| + \sum_j \varepsilon_j \quad \text{/* since } \Phi_j^- \le \varepsilon_j \text{ */} \\ &\le |E| + \varepsilon \end{aligned}$

In an analogous manner, Equation (6) implies that |E|−ε≦|Ê|.
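For completeness, the analogous chain can be written out as follows. It is a sketch that mirrors the derivation for Equation (7) with the roles of E and Ê exchanged, and it assumes, as in that derivation, that Φj+≦εj at every site and that the site budgets sum to at most ε.

```latex
\begin{align*}
|E| &\le |\hat{E}| + |E - \hat{E}| \\
    &\le |\hat{E}| + \sum_{e \in E - \hat{E}} \sum_j \phi_j^+(e)
        && \text{since Equation (6) holds} \\
    &=   |\hat{E}| + \sum_j \sum_{e \in E - \hat{E}} \phi_j^+(e) \\
    &\le |\hat{E}| + \sum_j \Phi_j^+ \\
    &\le |\hat{E}| + \sum_j \varepsilon_j
        && \text{since } \Phi_j^+ \le \varepsilon_j \\
    &\le |\hat{E}| + \varepsilon
\end{align*}
```

Rearranging the last line gives |E|−ε≦|Ê|.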

FIGS. 6-7 depict pseudo code representations of remote site charge computation routines according to various embodiments of the invention.

Pseudo Code for Model-Based Charging Algorithm

FIG. 6 depicts a model-based algorithm for computing the charges φj+(e) and φj−(e) for element e at site j. Procedure COMPUTECHARGE2 invokes CHARGE to determine the charges φj(M) for models M that satisfy Ψj+ (resp., Ψj−), and selects φj+(e) (resp., φj−(e)) to be the maximum of the model charges. For a model M, CHARGE selects a single culprit stream Si from the set P of streams as described in Step 2. Essentially, streams Si∈P are lexicographically ordered based on charge, index pairs <φ(Si),i>, and the smallest stream in the lexicographic ordering is chosen as the culprit. In other words, the culprit stream is the stream with the minimum charge φ(Si), with ties being broken in favor of the stream with the smallest index. Clearly, since the charge φ(Si) for stream Si is the same across all the sites, this simple culprit selection scheme satisfies the uniform culprit selection property. Thus, due to Lemma 1, the charging procedure COMPUTECHARGE2 is correct. Also, observe that since procedure CHARGE selects the stream with the smallest charge as the culprit for model M, it minimizes the maximum charge incurred for M across the sites.

Pseudo Code of Procedure for Tree-Based Charging Heuristic

TABLE 1: Charge Triple Initialization Table

| Stream State | Charge Triples |
|---|---|
| (e ∈ Ŝi,j ∨ e ∈ Fi) ∧ e ∈ Si,j | (1, 1, ∞) |
| e ∈ Ŝi,j ∧ e ∈ Fi ∧ e ∉ Si,j | (1, 1, ∞), (1, 0, i) |
| e ∈ Ŝi,j ∧ e ∉ Fi ∧ e ∉ Si,j | (1, 1, ∞), (1, 0, i) |
| e ∉ Ŝi,j ∧ e ∈ Fi ∧ e ∉ Si,j | (1, 1, ∞), (1, 0, i) |
| e ∉ Ŝi,j ∧ e ∉ Fi ∧ e ∈ Si,j | (1, 1, ∞), (0, 1, i) |
| e ∉ Ŝi,j ∧ e ∉ Fi ∧ e ∉ Si,j | (1, 1, ∞), (1, 0, i), (0, 1, i), (0, 0, ∞) |

FIG. 7 depicts an expression tree-based charging heuristic. Specifically, the procedure COMPUTECHARGE3 relies on procedure CHARGETRIPLES to recursively compute charge triple sets C for each node V of T from the charge triple sets for the node's two children. If V is a leaf corresponding to stream Si, then C is initialized as shown in Table 1 based on the current local state Si,j of the stream at site j, the stream state Ŝi,j communicated by site j to the coordinator, and the values of Fi and θi for stream Si. Essentially, since FE(V)=pi and F̂E(V)=p̂i, models satisfying (Gj∧Ĝj∧H) can be grouped under one of four categories depending on whether or not p̂i∈M and pi∈M. The models M in each category map to the same charge triple (a,b,x), where a=1 iff p̂i∈M and b=1 iff pi∈M. Furthermore, if a=b, then since Si does not have a global state change in M, x=∞. On the other hand, if a≠b, then x=i since Si is the (only) culprit stream for M. For example, suppose that e∈Ŝi,j, e∉Si,j and e∈Fi. Then, p̂i∈Ĝj, p̂i∈H, and for the models M that satisfy (Gj∧Ĝj∧H), we have that p̂i∈M, while pi may or may not belong to M. Thus, for models M that contain pi, the charge triple is (1,1,∞), and for those that do not, the charge triple is (1,0,i).
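The leaf initialization can be derived directly from the reasoning in the preceding paragraph: p̂i is forced true when e∈Ŝi,j or e∈Fi, pi is forced true when e∈Si,j, and any variable that is not forced may take either value. The sketch below follows that reasoning; the function name `leaf_triples` and its boolean parameters are illustrative assumptions.

```python
import math


def leaf_triples(i, in_shipped, in_frequent, in_local):
    """Charge triple set for the leaf of stream S_i at site j.

    in_shipped: e in S-hat_{i,j}; in_frequent: e in F_i; in_local: e in S_{i,j}.
    """
    # a tracks p-hat_i, forced to 1 by G-hat_j or H; otherwise it is free.
    a_values = [1] if (in_shipped or in_frequent) else [0, 1]
    # b tracks p_i, forced to 1 by G_j; otherwise it is free.
    b_values = [1] if in_local else [0, 1]
    # x = ∞ when a == b (no global state change), else S_i is the culprit.
    return {(a, b, math.inf if a == b else i)
            for a in a_values for b in b_values}
```

Evaluating the function on the six stream states reproduces the rows of Table 1; for instance, `leaf_triples(3, True, True, False)` yields the set containing (1, 1, ∞) and (1, 0, 3).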

Next, consider the case when V is an internal node for one of the set operators in {∪,∩,−}. Let C1 and C2 be the charge triple sets for V's two children V1 and V2. Then, for a model M satisfying (Gj∧Ĝj∧H), the charge triple t(M,V) in the charge triple set C at node V can be obtained by combining the charge triples t(M,V1)∈C1 and t(M,V2)∈C2 for M at nodes V1 and V2, respectively. To see this, suppose that t(M,V1)=(a1,b1,x1) and t(M,V2)=(a2,b2,x2). Also, let op=bool_op(V), where bool_op(∪)=∨, bool_op(∩)=∧ and bool_op(−)=∧¬ are the Boolean operators corresponding to the set operators.

From the construction of FE(V), it is known that FE(V)=FE(V1) op FE(V2), and thus M satisfies FE(V) if and only if (M satisfies FE(V1)) op (M satisfies FE(V2)). As a result, if t(M,V)=(a,b,x), then a=a1 op a2 and b=b1 op b2. Further, the culprit stream for M in V's subtree is one of Sx1 (under node V1) or Sx2 (under node V2), depending on which stream has the smaller (charge, index) pair. Note that if an index, say x1, is equal to ∞, implying that there is no culprit under node V1, then the culprit under V is the same as the culprit for V2 (assuming it is defined).
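The combination step at an internal node can be sketched as follows. This is a minimal illustration, not the procedure of FIG. 7. It assumes that ∪ maps to OR, ∩ to AND and − to AND-NOT, and it combines every pair of triples from the two children, as recited in claim 9. The names `BOOL_OP`, `pick_culprit` and `combine`, and the `charge` dictionary mapping a stream index to φ(Si), are hypothetical.

```python
import math
from itertools import product

INF = math.inf  # x = infinity means "no culprit in this subtree"

# Boolean counterparts of the set operators, acting on 0/1 values.
BOOL_OP = {
    "union": lambda a, b: a | b,
    "intersect": lambda a, b: a & b,
    "minus": lambda a, b: a & (1 - b),
}


def pick_culprit(x1, x2, charge):
    """Return the index with the smaller (charge, index) pair, ignoring INF."""
    candidates = [x for x in (x1, x2) if x != INF]
    if not candidates:
        return INF
    return min(candidates, key=lambda x: (charge[x], x))


def combine(op, c1, c2, charge):
    """Charge triple set at an internal node from its children's sets."""
    f = BOOL_OP[op]
    return {
        (f(a1, a2), f(b1, b2), pick_culprit(x1, x2, charge))
        for (a1, b1, x1), (a2, b2, x2) in product(c1, c2)
    }


# Two leaves: stream 1 with charge 0.5 and stream 2 with charge 1.0.
charge = {1: 0.5, 2: 1.0}
c1 = {(1, 1, INF), (1, 0, 1)}
c2 = {(1, 1, INF), (0, 1, 2)}
root = combine("intersect", c1, c2, charge)
```

In this example, the pair (1,0,1) and (0,1,2) combines under intersection to (0,0,1): both components become 0, and stream 1 is kept as the culprit because its charge is smaller.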

While the foregoing is directed to various embodiments of the present invention, other and further embodiments of the invention may be devised without departing from the basic scope thereof. As such, the appropriate scope of the invention is to be determined according to the claims, which follow.

## Claims

establishing, for each of said update stream sites, a respective site charge budget, said site charge budget being determined by a coordinator site; and
communicating, by the coordinator site, which comprises at least a processor, said site charge budgets toward said update stream sites;
each site charge budget being adapted to control an initiation of update stream transmission by restricting transmission of said update stream until a sum of element charges at the respective update stream site exceeds the site charge budget, said element charges being attributed to stream element updates;
wherein said update stream elements comprise insertion elements and deletion elements, each of said insertion elements and deletion elements being associated with element charges wherein said insertion element charges and said deletion elements charges are determined in a manner satisfying the following invariants:

For each e∈E−Ê, Σjφj+(e)≧1; and

For each e∈Ê−E, Σjφj−(e)≧1,
where e is the specific data element whose frequency changes;
E is a set expression;
Ê is the result of evaluating the expression E; and
φj(e) is a charge associated with each element e at every remote site j.
2. The method of claim 1, wherein a total error budget comprises the sum of the site charge budgets allocated to each of said update sites, said total error budget being allocated among said update sites in proportion to stream update rates at the sites.
3. The method of claim 1, wherein a total error budget comprises the sum of the site charge budgets allocated to each of said update sites, said total error budget being allocated among said update sites in a manner satisfying the following invariant: Σjεj=ε and for all j, εj≧0.
4. The method of claim 1, wherein the element charges are determined by at least one of set expression semantics and global information of element counts.
5. The method of claim 1, wherein insertion element charges and deletion element charges are determined according to a Boolean logic formula.
6. The method of claim 5, wherein the Boolean logic formula is described as follows:

Ψj+=(¬F̂E∧FE)∧(Ĝj∧Gj∧H); and

Ψj−=(F̂E∧¬FE)∧(Ĝj∧Gj∧H).
7. The method of claim 6, wherein the Boolean logic formula is used to determine element charges φj+(e) and φj−(e) as follows:
$\phi_j^{+}(e) = \max\{\phi_j(M) : \text{Model } M \text{ satisfies } \Psi_j^{+}\}$; and
$\phi_j^{-}(e) = \max\{\phi_j(M) : \text{Model } M \text{ satisfies } \Psi_j^{-}\}$;
wherein:
φj(M) is given by
$\phi_j(M) = \begin{cases} \phi(S_i) & \text{if culprit stream } S_i \text{ has a local state change at site } j \\ 1 & \text{otherwise} \end{cases}$; and
φ(Si) is given by
$\phi(S_i) = \begin{cases} 1/\theta_i(e) & \text{if } e \in F_i \\ 1 & \text{otherwise} \end{cases}$.
8. The method of claim 1, wherein insertion element charges and deletion element charges are determined according to a tree-based heuristic that propagates charges in a bottom-up manner starting from leaf nodes of a set expression tree, wherein a charge at a root node is used to determine the element update charge at a given site.
9. The method of claim 8, wherein:
charge triples of the form (a,b,x) are propagated up the expression tree;
leaf nodes are initialized according to the following table:

| Stream State | Charge Triples |
| --- | --- |
| (e ∈ Ŝi,j ∨ e ∈ Fi) ∧ e ∈ Si,j | (1, 1, ∞) |
| e ∈ Ŝi,j ∧ e ∈ Fi ∧ e ∉ Si,j | (1, 1, ∞), (1, 0, i) |
| e ∈ Ŝi,j ∧ e ∉ Fi ∧ e ∉ Si,j | (1, 1, ∞), (1, 0, i) |
| e ∉ Ŝi,j ∧ e ∈ Fi ∧ e ∉ Si,j | (1, 1, ∞), (1, 0, i) |
| e ∉ Ŝi,j ∧ e ∉ Fi ∧ e ∈ Si,j | (1, 1, ∞), (0, 1, i) |
| e ∉ Ŝi,j ∧ e ∉ Fi ∧ e ∉ Si,j | (1, 1, ∞), (1, 0, i), (0, 1, i), (0, 0, ∞) |

and
the set C={(a,b,x)} of charge triples for each internal node V is computed as follows:
combining pairs of triples (a1,b1,x1) and (a2,b2,x2) from V's two children nodes;
calculating a=a1 op a2, b=b1 op b2, and setting x to one of x1 or x2, whichever has the smaller (charge, index) pair <φ(Sxi), xi>, where op is the Boolean operation corresponding to the set operation for V.
10. The method of claim 1, wherein:
each of said plurality of update stream sites maintains a summary of its respective update stream, said summary formed using a delete resistant scheme for maintaining distinct sample.
11. The method of claim 10, wherein said summary structure comprises a JD sketch synopsis data structure.
12. A distributed framework for providing a set expression estimation, comprising:
allocating, by a coordinator site, error budgets to each of a plurality of remote sites according to a first set of invariants, each of said remote sites providing a respective update stream to the coordinator site, said update streams including at least one of insert elements and delete elements; and
allocating update stream insert element and delete element charges according to a second set of invariants comprising:

For each e∈E−Ê, Σjφj+(e)≧1; and

For each e∈Ê−E, Σjφj−(e)≧1,
where e is the specific data element whose frequency changes;
E is a set expression;
Ê is the result of evaluating the expression E; and
φj(e) is a charge associated with each element e at every remote site j;
each of said remote sites accumulating element charges in response to the occurrence of respective update and delete elements;
each of said remote sites transmitting a respective update stream to said coordinator site when an accumulation of element charges exceeds a respective error budget.
13. The distributed framework of claim 12, wherein:
said coordinator site using update streams transmitted by said remote sites to estimate a set expression and its cardinality.
14. The distributed framework of claim 12, wherein said first set of invariants comprises:
Σjεj=ε and for all j, εj≧0.
15. The distributed framework of claim 12, further comprising:
adapting at least one of said site error budgets, said insert element charges and said delete element charges in response to coordinator site global information.
16. The distributed framework of claim 15, wherein said global information comprises lower bounds on a number of distinct sites at which a stream element is present.
17. The distributed framework of claim 16, wherein:
said site error budget θi(e) is reduced by a factor of 2 if Ci(e) becomes less than θi(e), and doubled if Ci(e) exceeds 4×θi(e).
18. The distributed framework of claim 16, further comprising:
propagating global knowledge of element counts according to a scheme that satisfies the invariant Ci(e)≧θi(e).
19. A system for processing set-expression cardinality queries, comprising:
a central site for establishing and communicating a site charge budget; and
a plurality of update stream sites, wherein:
each update stream site being allocated an error budget satisfying the following invariants:

For each e∈E−Ê, Σjφj+(e)≧1; and

For each e∈Ê−E, Σjφj−(e)≧1,
where e is the specific data element whose frequency changes;
E is a set expression;
Ê is the result of evaluating the expression E; and
φj(e) is a charge associated with each element e at every remote site j;
said error budgets adapted to determine when the update stream site communicates stream state information to the central site, said site error budget θi (e) is reduced by a factor of 2 if Ci (e) becomes less than θi (e) and doubled if Ci (e) exceeds 4×θi (e).
each update stream site associates a charge with every stream element that is inserted or deleted since stream state was previously transmitted to the central site; and each update stream site transmits current stream state information when the sum of its respective element charges exceeds its respective error budget.