Im vorherigen Beitrag unserer Azure PowerShell Serie (Azure PowerShell Serie: Run Hive Scripts) sind wir ein PowerShell-Skript durchgegangen, mit dem sich Hive-Skripte bequem von PowerShell aus in einem HDInsight-Cluster ausführen lassen.

Wie sieht das in einem konkreten Fall aus? Nehmen wir einen Schritt aus der Blog-Serie Big Data Twitter Demo. Zur Erinnerung: Wir haben Tweets mittels StreamInsight in Echtzeit extrahiert und ihre Rohdaten im Windows Azure Blob Storage gespeichert. Anschließend haben wir mithilfe eines HDInsight-Clusters Hive-Skripte ausgeführt und weitere Hive-Tabellen erstellt, damit sich diese später schnell in Excel importieren und mit den mächtigen Power-BI-Tools auswerten lassen: Hive-Analyse starten.

Hier ist das dazugehörige PowerShell-Skript. Es lädt die lokal gespeicherten Hive-Skripte in den Blob Storage hoch, erstellt einen neuen HDInsight-Cluster und führt die Hive-Skripte per PowerShell auf diesem Cluster aus:

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101

#############################################
# HiveQL on Tweets in HDInsight-Cluster
#
# End-to-end run: upload the local HiveQL scripts to blob storage, create an
# HDInsight cluster, execute the scripts there and optionally clean up again.


#############################################
# 0. Azure Account Details

# Sign in interactively, then switch to the subscription used for this run.
Add-AzureAccount
$subName = "Internal Consumption"
Select-AzureSubscription -SubscriptionName $subName

# Azure account details automatically set:
# the ID of the subscription selected above (used when creating the cluster).
$subID = Get-AzureSubscription -Current | ForEach-Object { $_.SubscriptionId }


##################################################################
# 1. Input information
#
# Placeholders in angle brackets must be replaced before running the script.


$clusterName = "<HDInsightClusterName>"
$location = "<DatacenterLocation>" #e.g. North Europe, West Europe, etc.
$numNodes = 1 #start small
$storageAccount = "<StorageAccountName>"
$defaultContainer = "<StorageContainerName>"

# Variables automatically set for you.
# Each statement is kept on a single line: backtick continuations break as
# soon as the backtick is followed by whitespace or the line is re-wrapped.
$storageKey = Get-AzureStorageKey $storageAccount | ForEach-Object { $_.Primary }
$storageContext = New-AzureStorageContext -StorageAccountName $storageAccount -StorageAccountKey $storageKey
$fullStorage = "${storageAccount}.blob.core.windows.net"


# local HiveQL scripts (file names without the .hql extension).
# They are executed in exactly this order, because later scripts read the
# tables created by earlier ones.
$localFileNames = "1_create-tweet-input", "2_create-tweet-details", "3_insert-tweet-details", "4_create-others"
$localFolder = "C:\<scriptFilesPath>"
$blobFolder = "scripts"


##################################################################
# 2. Upload HiveQL scripts from local to Azure Blob Storage


foreach ($item in $localFileNames) {
    $hqlLocalFile = "$localFolder\$item.hql"
    $hqlBlobName = "$blobFolder/$item.hql"

    Write-Host "Copying $hqlLocalFile to $hqlBlobName" -BackgroundColor Green

    # Copy the file from local workstation to WASB.
    # -Force overwrites a blob left over from an earlier run instead of
    # stopping for an interactive overwrite confirmation.
    Set-AzureStorageBlobContent -File $hqlLocalFile -Container $defaultContainer -Blob $hqlBlobName -Context $storageContext -Force
}



##################################################################
# 3. Create HDInsight Cluster and configure it


$clusterCreds = Get-Credential -Message "New admin account to be created for your HDInsight cluster"

# Simple create.
# Parameters are splatted from a hashtable instead of using backtick line
# continuations: a backtick followed by whitespace no longer continues the
# line, and the following argument line then fails to parse.
# The classic cmdlet's documented examples pass the default storage account
# as its fully qualified blob endpoint (<account>.blob.core.windows.net).
$clusterParams = @{
    Name                        = $clusterName
    Subscription                = $subID
    Location                    = $location
    DefaultStorageAccountName   = "${storageAccount}.blob.core.windows.net"
    DefaultStorageAccountKey    = $storageKey
    DefaultStorageContainerName = $defaultContainer
    Credential                  = $clusterCreds
    ClusterSizeInNodes          = $numNodes
}
New-AzureHDInsightCluster @clusterParams



##################################################################
# 4. Execute HiveQL script


# Select the target cluster once; the Invoke-Hive calls below run against it.
# (Previously this was repeated on every loop iteration.)
Use-AzureHDInsightCluster $clusterName

# Run the scripts strictly in list order: each one builds on the tables
# created by the previous ones.
foreach ($item in $localFileNames) {
    # set script: WASB URI of the .hql file uploaded in section 2
    $hqlScriptFile = "wasb://${defaultContainer}@${storageAccount}.blob.core.windows.net/$blobFolder/$item.hql"

    Write-Host "Invoking HiveQL script $item" -BackgroundColor Green

    # execute HiveQL script
    Invoke-Hive -File $hqlScriptFile
}



##################################################################
# 5. OPTIONAL: Clean up

# Remove HDInsight cluster

Remove-AzureHDInsightCluster -Name $clusterName

# OPTIONAL: remove the uploaded scripts from blob storage
# (the local .hql files are left untouched)
foreach ($item in $localFileNames) {
    $hqlBlobName = "$blobFolder/$item.hql"

    Write-Host "Removing $hqlBlobName" -BackgroundColor Green

    # Remove the file from WASB
    Remove-AzureStorageBlob -Blob $hqlBlobName -Container $defaultContainer -Context $storageContext
}
 

Wie in Abschnitt 1 des Skripts zu sehen ist, haben wir die Hive-Skripte bereits erstellt und lokal gespeichert. Hier sind sie im Einzelnen:

--1_create-tweet-input.hql
-- External table over the raw tweet files, one raw JSON tweet per row.
-- Dropping an external table removes only its metadata, the files stay in storage.
DROP TABLE tweet_input;
CREATE EXTERNAL TABLE tweet_input (tweet string) PARTITIONED BY (date string);

-- One partition per day, each pointing at the folder holding that day of tweets.
ALTER TABLE tweet_input ADD
PARTITION (date = '2013-12-09') LOCATION '/twitter/2013-12-09'
PARTITION (date = '2013-12-10') LOCATION '/twitter/2013-12-10'
PARTITION (date = '2013-12-11') LOCATION '/twitter/2013-12-11'
PARTITION (date = '2013-12-12') LOCATION '/twitter/2013-12-12'
PARTITION (date = '2013-12-13') LOCATION '/twitter/2013-12-13';
-- Older days, currently disabled:
--ALTER TABLE tweet_input ADD PARTITION (date = '2013-11-05') LOCATION '/twitter/2013-11-05';
--ALTER TABLE tweet_input ADD PARTITION (date = '2013-11-06') LOCATION '/twitter/2013-11-06';
--ALTER TABLE tweet_input ADD PARTITION (date = '2013-11-07') LOCATION '/twitter/2013-11-07';
--ALTER TABLE tweet_input ADD PARTITION (date = '2013-11-08') LOCATION '/twitter/2013-11-08';

 

--2_create-tweet-details.hql
-- Target table for the flattened tweet fields, partitioned by tweet day.
-- The dynamic partition settings only matter for INSERTs into this table.
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

DROP TABLE tweet_details;
CREATE TABLE tweet_details
(
    id                      BIGINT,
    id_str                  STRING,
    created_at              STRING,
    created_at_date         STRING,
    created_at_year         STRING,
    created_at_month        STRING,
    created_at_day          STRING,
    created_at_time         STRING,
    in_reply_to_user_id_str STRING,
    text                    STRING,
    contributors            STRING,
    is_a_retweet            BOOLEAN,
    truncated               STRING,
    coordinates             STRING,
    source                  STRING,
    retweet_count           INT,
    url                     STRING,
    hashtags                ARRAY<STRING>,
    user_mentions           ARRAY<STRING>,
    first_hashtag           STRING,
    first_user_mention      STRING,
    screen_name             STRING,
    name                    STRING,
    followers_count         INT,
    listed_count            INT,
    friends_count           INT,
    lang                    STRING,
    user_location           STRING,
    time_zone               STRING,
    profile_image_url       STRING
)
PARTITIONED BY (partition_key STRING)
STORED AS SEQUENCEFILE;

 

--3_insert-tweet-details.hql
-- Flattens the raw JSON tweets of tweet_input into tweet_details.
-- partition_key (the last select column) is a dynamic partition column,
-- which is why the two settings below are required.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;


insert overwrite table tweet_details
partition (partition_key)
select
    cast(get_json_object(tweet, '$.id_str') as bigint) as id,
    get_json_object(tweet, '$.id_str') as id_str,
    get_json_object(tweet, '$.created_at') as created_at,
-- The substr positions assume the Twitter created_at layout,
-- e.g. Mon Dec 09 12:34:56 +0000 2013 (TODO confirm for all inputs).
    concat(substr (get_json_object(tweet, '$.created_at'),1,10),' ',
        substr (get_json_object(tweet, '$.created_at'),27,4)) as created_at_date,
    substr (get_json_object(tweet, '$.created_at'),27,4) as created_at_year,
    case substr (get_json_object(tweet, '$.created_at'),5,3)
        when "Jan" then "01"
        when "Feb" then "02"
        when "Mar" then "03"
        when "Apr" then "04"
        when "May" then "05"
        when "Jun" then "06"
        when "Jul" then "07"
        when "Aug" then "08"
        when "Sep" then "09"
        when "Oct" then "10"
        when "Nov" then "11"
        when "Dec" then "12" end as created_at_month,
    substr (get_json_object(tweet, '$.created_at'),9,2) as created_at_day,
    substr (get_json_object(tweet, '$.created_at'),12,8) as created_at_time,
    get_json_object(tweet, '$.in_reply_to_user_id_str') as in_reply_to_user_id_str,
    get_json_object(tweet, '$.text') as text,
    get_json_object(tweet, '$.contributors') as contributors,
-- NOTE(review): heuristic. A non-zero retweet_count means the tweet was
-- retweeted, not necessarily that it is a retweet itself. The retweeted_status
-- field identifies actual retweets - confirm which meaning is intended.
    (cast (get_json_object(tweet, '$.retweet_count') as int) != 0) as is_a_retweet,
    get_json_object(tweet, '$.truncated') as truncated,
    get_json_object(tweet, '$.coordinates') as coordinates,
    get_json_object(tweet, '$.source') as source,
    cast (get_json_object(tweet, '$.retweet_count') as int) as retweet_count,
-- display_url lives inside the entities.urls array. The former path
-- entities.display_url does not exist and always produced NULL.
    get_json_object(tweet, '$.entities.urls[0].display_url') as url,
    array(
        trim(lower(get_json_object(tweet, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(tweet, '$.entities.hashtags[1].text'))),
        trim(lower(get_json_object(tweet, '$.entities.hashtags[2].text'))),
        trim(lower(get_json_object(tweet, '$.entities.hashtags[3].text'))),
        trim(lower(get_json_object(tweet, '$.entities.hashtags[4].text')))) as hashtags,
    array(
        trim(lower(get_json_object(tweet, '$.entities.user_mentions[0].screen_name'))),
        trim(lower(get_json_object(tweet, '$.entities.user_mentions[1].screen_name'))),
        trim(lower(get_json_object(tweet, '$.entities.user_mentions[2].screen_name'))),
        trim(lower(get_json_object(tweet, '$.entities.user_mentions[3].screen_name'))),
        trim(lower(get_json_object(tweet, '$.entities.user_mentions[4].screen_name')))) as user_mentions,
    trim(lower(get_json_object(tweet, '$.entities.hashtags[0].text'))) as first_hashtag,
    trim(lower(get_json_object(tweet, '$.entities.user_mentions[0].screen_name'))) as first_user_mention,
    get_json_object(tweet, '$.user.screen_name') as screen_name,
    get_json_object(tweet, '$.user.name') as name,
    cast (get_json_object(tweet, '$.user.followers_count') as int) as followers_count,
    cast (get_json_object(tweet, '$.user.listed_count') as int) as listed_count,
    cast (get_json_object(tweet, '$.user.friends_count') as int) as friends_count,
    get_json_object(tweet, '$.user.lang') as lang,
    get_json_object(tweet, '$.user.location') as user_location,
    get_json_object(tweet, '$.user.time_zone') as time_zone,
    get_json_object(tweet, '$.user.profile_image_url') as profile_image_url,
-- Same value as created_at_date: one partition per tweet day.
    concat(substr (get_json_object(tweet, '$.created_at'),1,10),' ',
        substr (get_json_object(tweet, '$.created_at'),27,4)) as partition_key

from tweet_input
-- Presumably skips short non-tweet records such as delete notices (TODO confirm).
where (length(tweet) > 500);
--4_create-others.hql

--
-- Geo coordinates
--


drop table tweet_coordinates;
create table tweet_coordinates
(
    id_str string,
    coordinate_type string,
    longitude float,
    latitude float
)
STORED AS SEQUENCEFILE;


-- coordinates is the raw JSON string copied from the tweet. Its coordinates
-- array is read as longitude at index 0 and latitude at index 1.
-- get_json_object returns strings, so cast explicitly into the FLOAT columns
-- instead of relying on implicit conversion during the insert.
insert overwrite table tweet_coordinates
select
    id_str,
    get_json_object(coordinates, '$.type') as coordinate_type,
    cast(get_json_object(coordinates, '$.coordinates[0]') as float) as longitude,
    cast(get_json_object(coordinates, '$.coordinates[1]') as float) as latitude
from tweet_details

where get_json_object(coordinates, '$.coordinates[1]') is not null;


-- SELECT * FROM tweet_coordinates limit 1;




--
-- HASHTAGS
--
-- One row per (tweet, hashtag). Partition tag_index = i holds the i-th entry
-- of the hashtags array, so each insert must read the slot matching its
-- partition. Previously every insert read hashtags[0], which duplicated the
-- first hashtag and dropped all others.
--


drop table tweet_hashtags;

create table tweet_hashtags
(
    id_str string,
    hashtag string
)
partitioned by (tag_index int)
STORED AS SEQUENCEFILE;


insert overwrite table tweet_hashtags partition(tag_index=0)
select id_str, hashtags[0] as hashtag
from tweet_details where hashtags[0] is not null;

insert overwrite table tweet_hashtags partition(tag_index=1)
select id_str, hashtags[1] as hashtag
from tweet_details where hashtags[1] is not null;

insert overwrite table tweet_hashtags partition(tag_index=2)
select id_str, hashtags[2] as hashtag
from tweet_details where hashtags[2] is not null;

insert overwrite table tweet_hashtags partition(tag_index=3)
select id_str, hashtags[3] as hashtag
from tweet_details where hashtags[3] is not null;

insert overwrite table tweet_hashtags partition(tag_index=4)
select id_str, hashtags[4] as hashtag
from tweet_details where hashtags[4] is not null;

-- Test
-- select hashtag, count(*) as hashtag_count
-- from
-- tweet_hashtags
-- group by hashtag
-- order by hashtag_count;



--
-- USER MENTIONS
--
-- One row per (tweet, mentioned user). Partition mention_index = i holds the
-- i-th entry of the user_mentions array, so each insert reads its own slot.
-- No LIMIT here: a LIMIT without ORDER BY keeps an arbitrary subset of rows
-- and would distort the mention counts.
--

drop table tweet_user_mentions;

create table tweet_user_mentions
(
    id_str string,
    user_mention string
)
partitioned by (mention_index int)
STORED AS SEQUENCEFILE;


insert overwrite table tweet_user_mentions partition(mention_index=0)
select id_str, user_mentions[0] as user_mention
from tweet_details where user_mentions[0] is not null;

insert overwrite table tweet_user_mentions partition(mention_index=1)
select id_str, user_mentions[1] as user_mention
from tweet_details where user_mentions[1] is not null;

insert overwrite table tweet_user_mentions partition(mention_index=2)
select id_str, user_mentions[2] as user_mention
from tweet_details where user_mentions[2] is not null;

insert overwrite table tweet_user_mentions partition(mention_index=3)
select id_str, user_mentions[3] as user_mention
from tweet_details where user_mentions[3] is not null;

insert overwrite table tweet_user_mentions partition(mention_index=4)
select id_str, user_mentions[4] as user_mention
from tweet_details where user_mentions[4] is not null;


-- Test
-- select user_mention, count(*) as user_mention_count
-- from tweet_user_mentions
-- group by user_mention
-- order by user_mention_count;


--
-- Tweeter
--
-- One row per screen name, compared case-insensitively. For each user, keep
-- the lexicographically smallest spelling of the name and the highest
-- follower and friend counts seen across that user's tweets.
--


DROP TABLE tweeter;

CREATE TABLE tweeter
(
    screen_name   STRING,
    max_followers INT,
    max_friends   INT
);


INSERT OVERWRITE TABLE tweeter
SELECT
    MIN(screen_name),
    MAX(followers_count) AS max_followers,
    MAX(friends_count)   AS max_friends
FROM tweet_details
GROUP BY LOWER(screen_name);