Rebuilding Adobe Analytics Full Path Report With Spark

The new Analysis Workspace in Adobe Analytics is better than the old menu-driven Reporting experience in many respects. However, some features are still missing in Workspace, such as Path reporting.

There is a Flow visualisation in Workspace, but it requires manually adding dimension after dimension to build, and the performance is poor: it takes ages to render once there are around 5 layers of dimensions with each layer expanded to 10 or more items.

The Flow visualisation is still handy, especially since it can mix different dimensions in a single flow, yet it is sometimes difficult to explain. The old and simple Full Path report, on the other hand, lists out all paths for a single dimension, and it is not reproducible in Workspace using the Flow visualisation.

That said, I never liked the old Full Path report, and I still don't. The top paths with the most visits are usually bounced traffic where visitors viewed a single page and left, or viewed only a handful of pages and left, which makes the report not very useful. There may be some insightful paths, but they sit far down the list.

Moreover, the old Full Path report is only available for pages and page URLs, not for any other dimension on which path analysis may also be needed.

So I decided to rebuild the Full Path report using Data Feeds and Spark, which could also serve as a base for further path analysis that is difficult to do in Analysis Workspace.


The most intuitive full path report to rebuild is the page full path report, where page information is stored in the ‘pagename’ column of the hit data file from the Data Feed. Moreover, I want the full path to be scoped to each visit, which can be identified by four other columns in the hit data: “post_visid_high”, “post_visid_low”, “visit_num” and “visit_start_time_gmt”.

So the idea is simple, with the following data pipeline. The pagename dimension could be replaced by any other dimension to get its full path report, and the grouping by visit-identifying columns can be switched to the visitor-identifying columns (“post_visid_high” and “post_visid_low”) if we want a full path report at the visitor level instead of the visit level.

The full path report rebuilding process

It turns out that the above can be easily implemented in Spark.


Read Data Feed Hit Data

There are only two things to note when reading the hit data into Spark as a DataFrame:

  1. It is tab-separated
  2. Column headers are in a separate file

# Column headers are delivered in a separate file; the hit data itself has no header row
column_headers = spark.read.csv(os.path.join(data_feed_folder, 'column_headers.tsv'), sep=r'\t', header=True)
hit_data = spark.read.csv(files, sep=r'\t')
hit_data = hit_data.toDF(*column_headers.columns)
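
For reference, files is a list of the daily hit data file paths delivered by the Data Feed. A minimal sketch of building it from a date range (the same approach appears in the complete code below; rsid, start_date and end_date are placeholders for your report suite and reporting window):

# One gzipped hit data file per day in the reporting window
dates = pd.date_range(start_date, end_date)
files = [os.path.join(data_feed_folder, rsid, d.strftime("%Y-%m-%d") + '_hit_data.tsv.gz')
         for d in dates]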

Filter Rows With Valid ‘pagename’

I store the column name in a dimension variable so I can easily build a full path report for any dimension. The validation here is also quite simple: a row is kept as long as the pagename is not null.

path_by_visit = hit_data.filter(F.col(dimension).isNotNull())

Group By Visit And Get The Sequence Of ‘pagename’

This is amazingly simple with the collect_list method, which returns the list of pagename values per visit. One caveat: collect_list is non-deterministic under distributed computing and shuffling, which means the order of the returned pagename values is not guaranteed. That is a problem for a full path report, which must be ordered, so I collect both pagename and ‘visit_page_num’ in collect_list and later sort pagename by ‘visit_page_num’ to ensure the proper order.

path_by_visit = path_by_visit.groupBy("post_visid_high", "post_visid_low", "visit_num", "visit_start_time_gmt").agg(
F.collect_list(F.struct('visit_page_num', dimension)).alias('path_order'))
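
If a visitor-level report is needed instead, only the grouping changes. A minimal sketch, with the assumption that ‘visit_num’ is also collected, since ‘visit_page_num’ restarts at 1 in every visit and both would need to be cast to integers for proper ordering:

# Visitor-level variant: group by the visitor identifying columns only
path_by_visitor = hit_data.filter(F.col(dimension).isNotNull()).groupBy(
    "post_visid_high", "post_visid_low").agg(
    F.collect_list(F.struct('visit_num', 'visit_page_num', dimension)).alias('path_order'))

The sorter UDF in the next step would then sort on the first two fields (for example with operator.itemgetter(0, 1)) and take the dimension value from the last field.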

Group By Sequence And Get The Count

As mentioned above, the list returned by collect_list may not be in the proper order, so I created a UDF to sort pagename by ‘visit_page_num’. To make it a bit more useful, I also added a parameter to the UDF that decides whether repeating elements in the sequence should be consolidated into one. Repeated views of the same page, for instance when the user reloads it, may not be meaningful in a full path analysis.

One additional note: ‘visit_page_num’ should be cast to an integer type before it is used by the ‘sorted’ call in the UDF, otherwise the ordering would be lexicographic rather than numeric.
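
The cast itself is a one-liner (it also appears in the complete code at the end):

hit_data = hit_data.withColumn(
    "visit_page_num", hit_data["visit_page_num"].cast(IntegerType()))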

@F.udf(returnType=ArrayType(StringType()))
def sorter(l, removeDuplicate):
    # Sort the collected (visit_page_num, pagename) structs by visit_page_num
    path_order = sorted(l, key=operator.itemgetter(0))
    path = [item[1] for item in path_order]
    if removeDuplicate:
        # Collapse consecutive repeats of the same page (e.g. reloads) into one
        path = [x[0] for x in groupby(path)]
    return path

paths = path_by_visit.select(sorter('path_order', F.lit(True)).alias('path')).groupBy('path').count()
print('Paths (no dup):', paths.count())
paths = path_by_visit.select(sorter('path_order', F.lit(False)).alias('path')).groupBy('path').count()
print('Paths (with dup):', paths.count())

The resulting paths DataFrame has two columns: the first is the ordered array of pagename values for each full path, and the second is the number of visits that took that path.
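
To get something closer to a usable report, and to address the bounced-traffic complaint from earlier, one option is to drop single-page paths and sort by visit count. A minimal sketch on top of the paths DataFrame above:

# Keep only multi-page paths and list the most common ones first
top_paths = paths.filter(F.size('path') > 1).orderBy(F.desc('count'))
top_paths.show(20, truncate=False)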

Below is the (more or less) complete code for reference.

from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import IntegerType, ArrayType, StringType

import os
import pandas as pd
import operator
from itertools import groupby

data_feed_folder = os.getenv('data.feed.folder')
dimension = 'pagename'

conf = SparkConf()
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Placeholders (assumption): report suite id and reporting window for the feed files
rsid = 'your-report-suite-id'
start_date, end_date = '2021-01-01', '2021-01-31'

dates = pd.date_range(start_date, end_date)
files = [os.path.join(data_feed_folder, rsid, d.strftime(
    "%Y-%m-%d") + '_hit_data.tsv.gz') for d in dates]

column_headers = spark.read.csv(os.path.join(
    data_feed_folder,  'column_headers.tsv'), sep=r'\t', header=True)
hit_data = spark.read.csv(files, sep=r'\t')
hit_data = hit_data.toDF(*column_headers.columns)

hit_data = hit_data.withColumn(
    "visit_page_num", hit_data["visit_page_num"].cast(IntegerType()))

path_by_visit = hit_data.filter(F.col(dimension).isNotNull())
path_by_visit = path_by_visit.groupBy("post_visid_high", "post_visid_low", "visit_num", "visit_start_time_gmt").agg(
    F.collect_list(F.struct('visit_page_num', dimension)).alias('path_order'))


@F.udf(returnType=ArrayType(StringType()))
def sorter(l, removeDuplicate):
    path_order = sorted(l, key=operator.itemgetter(0))
    path = [item[1] for item in path_order]
    if removeDuplicate:
        path = [x[0] for x in groupby(path)]
    return path


paths = path_by_visit.select(
    sorter('path_order', F.lit(True)).alias('path')).groupBy('path').count()

print('Paths (no dup):', paths.count())

paths = path_by_visit.select(
    sorter('path_order', F.lit(False)).alias('path')).groupBy('path').count()

print('Paths (with dup):', paths.count())
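
From here the paths DataFrame can be persisted for the further path analysis mentioned at the start. A minimal sketch, with the output location being my own assumption:

# Persist the aggregated paths (array column plus count) for downstream analysis
paths.write.mode('overwrite').parquet(os.path.join(data_feed_folder, 'full_path_report'))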
