Decompressing Concatenated GZIP Files in C# – Received From AWS CloudWatch Logs

I was writing a solution in C# that uses AWS Lambda and AWS CloudWatch Logs subscriptions to process and parse log files delivered from EC2 instances. The setup is simple enough: the SSM Agent or EC2Config service delivers the log files to CloudWatch Logs, and the Log Group has a Subscription that streams the log files to S3 via Kinesis Firehose. This could also be set up so that the logs are streamed to a CloudWatch Logs Destination in another account that is tied to a Kinesis Firehose Delivery Stream in that account. From there, the S3 bucket has an event that fires any time an object is created, and that event invokes a Lambda function. The Lambda function downloads the object delivered to S3 and processes it: this is where the complication happens.
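To give a sense of that last step, here is a minimal sketch of the kind of Lambda entry point I'm describing; the class name, handler shape, and file-path choices are assumptions for illustration, not the exact code from my project:

using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.S3Events;
using Amazon.S3;

public class LogProcessorFunction
{
  private static readonly IAmazonS3 S3Client = new AmazonS3Client();

  //Invoked by the S3 ObjectCreated event configured on the delivery bucket
  public async Task Handler(S3Event s3Event, ILambdaContext context)
  {
    foreach (var record in s3Event.Records)
    {
      string bucket = record.S3.Bucket.Name;
      string key = record.S3.Object.Key;
      string localPath = Path.Combine("/tmp", Path.GetFileName(key));

      //Download the newly created object into the Lambda /tmp space
      using (var response = await S3Client.GetObjectAsync(bucket, key))
      {
        await response.WriteResponseStreamToFileAsync(localPath, false, CancellationToken.None);
      }

      //The unzip, gunzip, and parsing described below happen here
    }
  }
}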

The Kinesis Firehose stream is configured to deliver the log files in a .ZIP format. That’s fine; I use the standard ZipArchive class to unzip that file and put its contents into a temp folder. There’s only ever one file in the .ZIP, but I do something like the following:

using (FileStream ReadStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
{
  using (ZipArchive Archive = new ZipArchive(ReadStream))
  {
    Archive.ExtractToDirectory("/tmp");
  }
}

The output extracted to /tmp, however, is also compressed by AWS using the GZIP compression algorithm (although the file has no extension). I thought fine, I’ll use the native C# GZipStream class. But the results I got back from that stream didn’t match the logs I knew were being sent to CloudWatch Logs; in fact, it appeared that I was losing numerous events. The reason is that AWS concatenates multiple GZIP files, each containing one or more log events, into a single binary object, which is what is delivered to S3 (or wrapped inside another layer of compression if that is what you configure for your Kinesis Firehose stream). GZipStream doesn’t handle this concatenation: it stops after it reaches the End Of File (EOF) of the first GZIP member in the overall file, so any remaining log events are lost. Other tools like 7Zip handle this correctly, but I wanted to keep my dependencies low and not have to bundle additional software in my Lambda function (which introduces more overhead).
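To make the failure concrete, the naive approach below is roughly what I started with (the file path is just a placeholder); it silently drops everything after the first member:

//Naive decompression: GZipStream stops at the end of the first GZIP member,
//so any log events in the concatenated members that follow are silently lost
using (FileStream ReadStream = new FileStream("/tmp/extracted-log-file", FileMode.Open, FileAccess.Read, FileShare.Read))
using (GZipStream GZStream = new GZipStream(ReadStream, CompressionMode.Decompress))
using (MemoryStream MStreamOut = new MemoryStream())
{
  GZStream.CopyTo(MStreamOut);

  //MStreamOut now contains only the contents of the first member
}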

I took a look at the byte contents of the entire GZIP file delivered by AWS, looking for a beginning-of-file or end-of-file pattern. I noticed the following pattern, repeated twice, which matched the two log events I could see when opening the file with 7Zip.

0x1F
0x8B
0x08
0x00
0x00
0x00
0x00
0x00
0x00
0x00

What does this pattern mean?

Bytes 0 – 1 : The GZIP signature; 0x1F and 0x8B indicate that what follows is a GZIP file

Byte 2 : The compression method (0x08 is DEFLATE)

Byte 3 : Flags

Bytes 4 – 7 : Last modification time

Byte 8 : Extra (compression) flags

Byte 9 : Operating System

The overall GZIP header is 10 bytes long. I also saw a pattern that looked like an end-of-file (EOF) marker:

0xF3
0x09
0x00
0x00

But I wasn’t as confident in the consistency of this marker, so I decided to use the GZIP signature bytes (plus the 0x08 DEFLATE method byte) to find the beginning of each member in the binary block delivered from AWS; I knew the signature bytes would be consistent regardless of the other header information. My process walks through all of the bytes in the file looking for this pattern and, whenever it is found, records the byte index of the first matching byte. I then chunk the source file into multiple byte arrays, wrap each one in a MemoryStream, and feed that to a GZipStream, which I can then write out to a file or process. The code for this looks like the following:


using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading.Tasks;

/// <summary>
/// Provides a workaround to decompressing gzip files that are concatenated
/// </summary>
/// <param name="filePath">The path to the gzip file</param>
/// <returns>The decompressed byte content of the gzip file</returns>
private static async Task<byte[]> GUnzipConcatenatedFile(string filePath)
{
  //Get the bytes of the file
  byte[] FileBytes = File.ReadAllBytes(filePath);

  List<int> StartIndexes = new List<int>();

  /*
  * This pattern indicates the start of a GZip file as found from looking at the files
  * The file header is 10 bytes in size
  * 0-1 Signature 0x1F, 0x8B
  * 2 Compression Method - 0x08 is for DEFLATE, 0-7 are reserved
  * 3 Flags
  * 4-7 Last Modification Time
  * 8 Compression Flags
  * 9 Operating System
  */

  byte[] StartOfFilePattern = new byte[] { 0x1F, 0x8B, 0x08 };

  //This will limit the last byte we check to make sure it doesn't exceed the end of the file
  //If the file is 100 bytes and the file pattern is 10 bytes, the last byte we want to check is
  //90 -> i.e. we will check index 90, 91, 92, 93, 94, 95, 96, 97, 98, 99 and index 99 is the last
  //index in the file bytes
  int TraversableLength = FileBytes.Length - StartOfFilePattern.Length;

  for (int i = 0; i <= TraversableLength; i++)
  {
    bool Match = true;

    //Test the next run of characters to see if they match
    for (int j = 0; j < StartOfFilePattern.Length; j++)
    {
      //If the character doesn't match, break out
      //We're making sure that i + j doesn't exceed the length as part
      //of the loop bounds
      if (FileBytes[i + j] != StartOfFilePattern[j])
      {
        Match = false;
        break;
      }
    }

    //If we did find a run of bytes matching the start-of-file pattern, record its index
    if (Match == true)
    {
      StartIndexes.Add(i);

      //Jump ahead past the pattern we just matched before continuing the scan
      i += StartOfFilePattern.Length;
    }
  }

  //In case the pattern doesn't match, just start from the beginning of the file
  if (!StartIndexes.Any())
  {
    StartIndexes.Add(0);
  }

  List<byte[]> Chunks = new List<byte[]>();

  for (int i = 0; i < StartIndexes.Count; i++)
  {
    int Start = StartIndexes.ElementAt(i);
    int Length = 0;

    //The last chunk runs to the end of the file, otherwise the chunk ends
    //where the next GZIP member begins
    if (i + 1 == StartIndexes.Count)
    {
      Length = FileBytes.Length - Start;
    }
    else
    {
      Length = StartIndexes.ElementAt(i + 1) - Start;
    }

    //Prevent adding an empty array, for example, if the pattern occurred
    //at the very end of the file there wouldn't be anything following it
    //to represent data
    if (Length > 0)
    {
      Chunks.Add(FileBytes.Skip(Start).Take(Length).ToArray());
    }
  }

  using (MemoryStream MStreamOut = new MemoryStream())
  {
    foreach (byte[] Chunk in Chunks)
    {
      using (MemoryStream MStream = new MemoryStream(Chunk))
      {
        using (GZipStream GZStream = new GZipStream(MStream, CompressionMode.Decompress))
        {
          await GZStream.CopyToAsync(MStreamOut);
        }
      }
    }

    return MStreamOut.ToArray();
  }
}
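Calling it looks something like this (the path and the UTF-8 assumption reflect my setup; adjust to yours):

//Decompress the concatenated GZIP file that was extracted from the .ZIP
byte[] Decompressed = await GUnzipConcatenatedFile("/tmp/extracted-log-file");

//Turn the decompressed bytes back into text so the log events can be parsed
string LogContent = System.Text.Encoding.UTF8.GetString(Decompressed);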

I also came up with a similar method to use in NodeJS because the native zlib library only reads the first stream as well.

var Q = require('q');
var fs = require('fs');
//Path deals with files on the system
var path = require('path');

var deferred = Q.defer();

//Launch the native gunzip function from linux
//"-c" keeps the compressed version and writes the contents to stdout instead
var gunzip = require('child_process').spawn("gunzip", ["-c", path.normalize(zipPath)]);
var buffer = [];

var count = 1;

//Read the gzip contents and add them to the buffer, each data read is a stream in the gzip
gunzip.stdout.on("data", function (data) {
  console.log("Read stream #" + count++ + " from the gzip.");
  buffer.push(data.toString());
});

gunzip.stderr.on("data", function (data) {
  console.log("Error reading gzip file " + zipPath + " with error " + data.toString());
});

gunzip.on("error", function (err) {
  console.log("There was an error in gunzip " + err);
  deferred.reject(err);
});

gunzip.on("close", function (code) {
  console.log("GUNZIP Exit Code: " + code);

  if (code === 0) {
    //Join all of the elements in the array into one string
    buffer = buffer.join("");
    console.log("Log Key: " + key + "\nFile data:\n" + buffer);

    //If the gzip was concatenated with multiple streams, there are several json objects but are not formatted as an array
    //This will parse the string and fix that
    console.log("Fixing up the JSON object.");

    try {
      var log = createJson(buffer.toString());
      console.log("Log Key: " + key + "\nFile data:\n" + log);
      deferred.resolve([log, key]);
    }
    catch (err) {
      console.log("Error creating the json: " + err);
      deferred.reject(err);
    }
  }
  else {
    console.log("Trying to read file as plain text.");
    fs.readFile(path.normalize(zipPath), 'utf8', function (err, data) {

      if (err) {
        console.log("Could not read as plain text with error " + err);
        deferred.reject(err);
      }
      else {
        console.log("Fixing up the JSON object from: " + data);

        try {
          var log = createJson(data);
          console.log("Log Key: " + key + "\nFile data:\n" + log);
          deferred.resolve([log, key]);
        }
        catch (err) {
          console.log("Error creating the json: " + err);
          deferred.reject(err);
        }
      }
    });
  }
});

The remaining issue to handle is that the extracted log event contents aren’t valid JSON; you just have two JSON objects back to back. In the Javascript version you can see I have a function, createJson, which brackets the objects into an array by appending “[” to the beginning and “]” to the end and putting a comma between the objects. That function is a totally separate discussion, but the need is there: you have to convert the one, two, or more JSON objects written across a single stream or multiple streams into a valid JSON string that you can then parse later.
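For illustration, here is a rough C# equivalent of that idea. This is my own sketch, not the actual createJson function, and it assumes the “}” closing one object is separated from the “{” opening the next only by whitespace, with that boundary never appearing inside a string value:

using System.Text.RegularExpressions;

//Wrap back-to-back JSON objects in "[" and "]" and separate them with commas
//so the combined content parses as a single JSON array
private static string CreateJsonArray(string concatenatedObjects)
{
  string joined = Regex.Replace(concatenatedObjects.Trim(), @"\}\s*\{", "},{");
  return "[" + joined + "]";
}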

Hope this helps someone else when they run into the same problem!

One thought on “Decompressing Concatenated GZIP Files in C# – Received From AWS CloudWatch Logs”

  1. Mike, you are an absolute lifesaver!

    I’m writing a simple program to process Apache raw access logs to find rogue IP addresses and the default GZipStream seemed to be truncating the log files.

    I doubt I would have figured it out without your help.

    Many thanks

    Dn
